Refactor DCRNN Model.

commit 5dc36fed7c (parent 2f2d748b45)
model/dcrnn_model.py
@@ -2,12 +2,11 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import numpy as np
 import tensorflow as tf
 
 from tensorflow.contrib import legacy_seq2seq
 
-from lib.metrics import masked_mse_loss, masked_mae_loss, masked_rmse_loss
+from lib.metrics import masked_mae_loss
 from model.dcrnn_cell import DCGRUCell
 
 
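The import now keeps only the masked MAE loss; as the hunks below show, it is built as a closure over the scaler and a null value and then called with `preds`/`labels`. The body of `lib.metrics.masked_mae_loss` is not part of this diff; the following is a plausible sketch, assuming the usual null-masking pattern:

```python
import numpy as np
import tensorflow as tf

def masked_mae_loss(scaler, null_val):
    """Sketch: mean absolute error that ignores entries equal to null_val."""
    def loss(preds, labels):
        # Undo normalization so the error is measured in the original units.
        preds = scaler.inverse_transform(preds)
        labels = scaler.inverse_transform(labels)
        if np.isnan(null_val):
            mask = ~tf.is_nan(labels)
        else:
            mask = tf.not_equal(labels, null_val)
        mask = tf.cast(mask, tf.float32)
        mask /= tf.reduce_mean(mask)  # reweight so masking keeps the mean unbiased
        mae = tf.abs(preds - labels) * mask
        mae = tf.where(tf.is_nan(mae), tf.zeros_like(mae), mae)  # guard against 0/0
        return tf.reduce_mean(mae)
    return loss
```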
@@ -21,12 +20,6 @@ class DCRNNModel(object):
         self._mae = None
         self._train_op = None
 
-        # Learning rate.
-        self._lr = tf.get_variable('learning_rate', shape=(), initializer=tf.constant_initializer(0.01),
-                                   trainable=False)
-        self._new_lr = tf.placeholder(tf.float32, shape=(), name='new_learning_rate')
-        self._lr_update = tf.assign(self._lr, self._new_lr, name='lr_update')
-
         max_diffusion_step = int(model_kwargs.get('max_diffusion_step', 2))
         cl_decay_steps = int(model_kwargs.get('cl_decay_steps', 1000))
         filter_type = model_kwargs.get('filter_type', 'laplacian')
@@ -46,7 +39,8 @@ class DCRNNModel(object):
         # Labels: (batch_size, timesteps, num_sensor, input_dim), same format with input except the temporal dimension.
         self._labels = tf.placeholder(tf.float32, shape=(batch_size, horizon, num_nodes, input_dim), name='labels')
 
-        GO_SYMBOL = tf.zeros(shape=(batch_size, num_nodes * input_dim))
+        # GO_SYMBOL = tf.zeros(shape=(batch_size, num_nodes * input_dim))
+        GO_SYMBOL = tf.zeros(shape=(batch_size, num_nodes * output_dim))
 
         cell = DCGRUCell(rnn_units, adj_mx, max_diffusion_step=max_diffusion_step, num_nodes=num_nodes,
                          filter_type=filter_type)
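The start symbol's width changes from `num_nodes * input_dim` to `num_nodes * output_dim`: the decoder consumes one flattened frame per step, and with the labels now sliced to `output_dim` features per node, the `<GO>` token must match that per-step width. A self-contained sketch of the wiring this implies, using a plain `GRUCell` and a zero state as stand-ins for the model's `DCGRUCell` stack and encoder state (all sizes hypothetical):

```python
import tensorflow as tf
from tensorflow.contrib import legacy_seq2seq

# Hypothetical sizes for illustration only.
batch_size, horizon, num_nodes, output_dim, rnn_units = 8, 12, 207, 1, 64

labels = tf.placeholder(tf.float32, (batch_size, horizon, num_nodes, output_dim))
GO_SYMBOL = tf.zeros(shape=(batch_size, num_nodes * output_dim))

# Each decoder input is a flattened (batch_size, num_nodes * output_dim) frame;
# GO_SYMBOL is step 0, so it must have the same width as the label frames.
decoder_inputs = [GO_SYMBOL] + tf.unstack(
    tf.reshape(labels, (batch_size, horizon, num_nodes * output_dim)), axis=1)

cell = tf.nn.rnn_cell.GRUCell(rnn_units)
enc_state = cell.zero_state(batch_size, tf.float32)  # stand-in for the encoder state
outputs, final_state = legacy_seq2seq.rnn_decoder(decoder_inputs, enc_state, cell)
```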
@@ -80,7 +74,7 @@ class DCRNNModel(object):
             else:
                 # Return the prediction of the model in testing.
                 result = prev
-            if aux_dim > 0:
+            if False and aux_dim > 0:
                 result = tf.reshape(result, (batch_size, num_nodes, output_dim))
                 result = tf.concat([result, aux_info[i]], axis=-1)
                 result = tf.reshape(result, (batch_size, num_nodes * input_dim))
@@ -93,20 +87,6 @@ class DCRNNModel(object):
             # Project the output to output_dim.
             outputs = tf.stack(outputs[:-1], axis=1)
             self._outputs = tf.reshape(outputs, (batch_size, horizon, num_nodes, output_dim), name='outputs')
-
-            preds = self._outputs
-            labels = self._labels[..., :output_dim]
-
-            null_val = 0.
-            self._mae = masked_mae_loss(self._scaler, null_val)(preds=preds, labels=labels)
-            self._loss = masked_mae_loss(self._scaler, null_val)(preds=preds, labels=labels)
-            if is_training:
-                optimizer = tf.train.AdamOptimizer(self._lr)
-                tvars = tf.trainable_variables()
-                grads = tf.gradients(self._loss, tvars)
-                grads, _ = tf.clip_by_global_norm(grads, max_grad_norm)
-                self._train_op = optimizer.apply_gradients(zip(grads, tvars), global_step=global_step, name='train_op')
-
         self._merged = tf.summary.merge_all()
 
     @staticmethod
@@ -119,61 +99,6 @@ class DCRNNModel(object):
         """
         return tf.cast(k / (k + tf.exp(global_step / k)), tf.float32)
 
-    @staticmethod
-    def run_epoch_generator(sess, model, data_generator, return_output=False, train_op=None, writer=None):
-        losses = []
-        maes = []
-        outputs = []
-
-        fetches = {
-            'mae': model.mae,
-            'loss': model.loss,
-            'global_step': tf.train.get_or_create_global_step()
-        }
-        if train_op:
-            fetches.update({
-                'train_op': train_op,
-            })
-        merged = model.merged
-        if merged is not None:
-            fetches.update({'merged': merged})
-
-        if return_output:
-            fetches.update({
-                'outputs': model.outputs
-            })
-
-        for _, (x, y) in enumerate(data_generator):
-            feed_dict = {
-                model.inputs: x,
-                model.labels: y,
-            }
-
-            vals = sess.run(fetches, feed_dict=feed_dict)
-
-            losses.append(vals['loss'])
-            maes.append(vals['mae'])
-            if writer is not None and 'merged' in vals:
-                writer.add_summary(vals['merged'], global_step=vals['global_step'])
-            if return_output:
-                outputs.append(vals['outputs'])
-
-        results = {
-            'loss': np.mean(losses),
-            'mae': np.mean(maes)
-        }
-        if return_output:
-            results['outputs'] = outputs
-        return results
-
-    def get_lr(self, sess):
-        return np.asscalar(sess.run(self._lr))
-
-    def set_lr(self, sess, lr):
-        sess.run(self._lr_update, feed_dict={
-            self._new_lr: lr
-        })
-
     @property
     def inputs(self):
         return self._inputs
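The one piece of training logic that stays in the model is the curriculum-learning helper kept as context above: with `cl_decay_steps = k`, the probability of feeding the ground truth (rather than the decoder's own prediction) decays as `k / (k + exp(step / k))`. A quick numeric check of that exact expression with the default `k = 1000` (the script is illustrative only):

```python
import numpy as np

def sampling_threshold(global_step, k=1000):
    # Inverse sigmoid decay: ~1 early in training, ~0 once global_step >> k.
    return k / (k + np.exp(global_step / k))

for step in (0, 1000, 5000, 10000):
    print(step, round(float(sampling_threshold(step)), 4))
# 0 -> 0.999, 1000 -> 0.9973, 5000 -> 0.8708, 10000 -> 0.0434
```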
@@ -186,10 +111,6 @@ class DCRNNModel(object):
     def loss(self):
         return self._loss
 
-    @property
-    def lr(self):
-        return self._lr
-
     @property
     def mae(self):
         return self._mae
@@ -201,7 +122,3 @@ class DCRNNModel(object):
     @property
     def outputs(self):
         return self._outputs
-
-    @property
-    def train_op(self):
-        return self._train_op
model/dcrnn_supervisor.py
@@ -10,6 +10,8 @@ import time
 import yaml
 
 from lib import utils, metrics
+from lib.AMSGrad import AMSGrad
+from lib.metrics import masked_mae_loss
 from lib.utils import StandardScaler, DataLoader
 
 from model.dcrnn_model import DCRNNModel
@@ -29,7 +31,7 @@ class DCRNNSupervisor(object):
 
         # logging.
         self._log_dir = self._get_log_dir(kwargs)
-        self._logger = utils.get_logger(self.log_dir, __name__, 'info.log')
+        self._logger = utils.get_logger(self._log_dir, __name__, 'info.log')
         self._writer = tf.summary.FileWriter(self._log_dir)
         self._logger.info(kwargs)
@@ -61,6 +63,37 @@ class DCRNNSupervisor(object):
                                             batch_size=self._data_kwargs['test_batch_size'],
                                             adj_mx=adj_mx, **self._model_kwargs)
 
+        # Calculate loss
+        output_dim = self._model_kwargs.get('output_dim')
+        preds = self._train_model.outputs
+        labels = self._train_model.labels[..., :output_dim]
+
+        null_val = 0.
+        self._loss_fn = masked_mae_loss(scaler, null_val)
+        self._train_loss = self._loss_fn(preds=preds, labels=labels)
+
+        # Learning rate.
+        self._lr = tf.get_variable('learning_rate', shape=(), initializer=tf.constant_initializer(0.01),
+                                   trainable=False)
+        self._new_lr = tf.placeholder(tf.float32, shape=(), name='new_learning_rate')
+        self._lr_update = tf.assign(self._lr, self._new_lr, name='lr_update')
+
+        # Configure optimizer
+        optimizer_name = self._train_kwargs.get('optimizer', 'adam').lower()
+        epsilon = float(self._train_kwargs.get('epsilon', 1e-3))
+        optimizer = tf.train.AdamOptimizer(self._lr, epsilon=epsilon)
+        if optimizer_name == 'sgd':
+            optimizer = tf.train.GradientDescentOptimizer(self._lr, )
+        elif optimizer_name == 'amsgrad':
+            optimizer = AMSGrad(self._lr, epsilon=epsilon)
+
+        tvars = tf.trainable_variables()
+        grads = tf.gradients(self._train_loss, tvars)
+        max_grad_norm = kwargs['train'].get('max_grad_norm', 1.)
+        grads, _ = tf.clip_by_global_norm(grads, max_grad_norm)
+        global_step = tf.train.get_or_create_global_step()
+        self._train_op = optimizer.apply_gradients(zip(grads, tvars), global_step=global_step, name='train_op')
+
         # Log model statistics.
         total_trainable_parameter = utils.get_total_trainable_parameter_size()
         self._logger.info('Total number of trainable parameters: %d' % total_trainable_parameter)
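Loss, learning rate, and optimizer now live in the supervisor. The learning-rate variable is non-trainable and is changed through a placeholder/assign pair, so `set_lr` can adjust it at run time without rebuilding the graph; `optimizer` defaults to Adam and is switched to SGD or the repo's AMSGrad via the `optimizer` train kwarg. A standalone illustration of the assign pattern (demo names, not from the commit):

```python
import tensorflow as tf

lr = tf.get_variable('lr_demo', shape=(), trainable=False,
                     initializer=tf.constant_initializer(0.01))
new_lr = tf.placeholder(tf.float32, shape=(), name='new_lr_demo')
lr_update = tf.assign(lr, new_lr)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(lr))                             # 0.01
    sess.run(lr_update, feed_dict={new_lr: 0.001})  # what set_lr does each epoch
    print(sess.run(lr))                             # 0.001
```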
@@ -114,6 +147,63 @@ class DCRNNSupervisor(object):
 
         return data
 
+    def run_epoch_generator(self, sess, model, data_generator, return_output=False, training=False, writer=None):
+        losses = []
+        maes = []
+        outputs = []
+        output_dim = self._model_kwargs.get('output_dim')
+        preds = model.outputs
+        labels = model.labels[..., :output_dim]
+        loss = self._loss_fn(preds=preds, labels=labels)
+        fetches = {
+            'loss': loss,
+            'mae': loss,
+            'global_step': tf.train.get_or_create_global_step()
+        }
+        if training:
+            fetches.update({
+                'train_op': self._train_op
+            })
+        merged = model.merged
+        if merged is not None:
+            fetches.update({'merged': merged})
+
+        if return_output:
+            fetches.update({
+                'outputs': model.outputs
+            })
+
+        for _, (x, y) in enumerate(data_generator):
+            feed_dict = {
+                model.inputs: x,
+                model.labels: y,
+            }
+
+            vals = sess.run(fetches, feed_dict=feed_dict)
+
+            losses.append(vals['loss'])
+            maes.append(vals['mae'])
+            if writer is not None and 'merged' in vals:
+                writer.add_summary(vals['merged'], global_step=vals['global_step'])
+            if return_output:
+                outputs.append(vals['outputs'])
+
+        results = {
+            'loss': np.mean(losses),
+            'mae': np.mean(maes)
+        }
+        if return_output:
+            results['outputs'] = outputs
+        return results
+
+    def get_lr(self, sess):
+        return np.asscalar(sess.run(self._lr))
+
+    def set_lr(self, sess, lr):
+        sess.run(self._lr_update, feed_dict={
+            self._new_lr: lr
+        })
+
     def train(self, sess, **kwargs):
         kwargs.update(self._train_kwargs)
         return self._train(sess, **kwargs)
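`run_epoch_generator` moves here as an instance method; the `training` flag replaces the old `train_op` argument and decides whether `self._train_op` is fetched. All the method expects of `data_generator` is that it yields `(x, y)` batches matching the model's placeholders. A stand-in generator for smoke-testing the loop (shapes are hypothetical; `seq_len` is the encoder length, which this diff never names):

```python
import numpy as np

def dummy_batches(num_batches=3, batch_size=8, seq_len=12, horizon=12,
                  num_nodes=207, input_dim=2):
    """Yields (x, y) pairs shaped like the inputs/labels placeholders."""
    for _ in range(num_batches):
        x = np.random.randn(batch_size, seq_len, num_nodes, input_dim).astype(np.float32)
        y = np.random.randn(batch_size, horizon, num_nodes, input_dim).astype(np.float32)
        yield x, y
```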
@@ -133,17 +223,18 @@ class DCRNNSupervisor(object):
             self._epoch = epoch + 1
         else:
             sess.run(tf.global_variables_initializer())
+        self._logger.info('Start training ...')
+
         while self._epoch <= epochs:
             # Learning rate schedule.
             new_lr = max(min_learning_rate, base_lr * (lr_decay_ratio ** np.sum(self._epoch >= np.array(steps))))
-            self._train_model.set_lr(sess=sess, lr=new_lr)
-            sys.stdout.flush()
+            self.set_lr(sess=sess, lr=new_lr)
 
             start_time = time.time()
-            train_results = self._train_model.run_epoch_generator(sess, self._train_model,
+            train_results = self.run_epoch_generator(sess, self._train_model,
                                                      self._data['train_loader'].get_iterator(),
-                                                     train_op=self._train_model.train_op,
+                                                     training=True,
                                                      writer=self._writer)
             train_loss, train_mae = train_results['loss'], train_results['mae']
             if train_loss > 1e5:
                 self._logger.warning('Gradient explosion detected. Ending...')
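The schedule in this loop is a plain step decay: every threshold in `steps` that the current epoch has passed multiplies `base_lr` by `lr_decay_ratio`, floored at `min_learning_rate`. Worked through with hypothetical config values:

```python
import numpy as np

base_lr, lr_decay_ratio, min_learning_rate = 0.01, 0.1, 2e-6
steps = [20, 30, 40]  # hypothetical decay epochs

for epoch in (1, 20, 30, 45):
    new_lr = max(min_learning_rate,
                 base_lr * (lr_decay_ratio ** np.sum(epoch >= np.array(steps))))
    print(epoch, new_lr)
# 1 -> 0.01, 20 -> 0.001, 30 -> 0.0001, 45 -> 1e-05
```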
@@ -151,17 +242,17 @@ class DCRNNSupervisor(object):
 
             global_step = sess.run(tf.train.get_or_create_global_step())
             # Compute validation error.
-            val_results = self._val_model.run_epoch_generator(sess, self._val_model,
+            val_results = self.run_epoch_generator(sess, self._val_model,
                                                    self._data['val_loader'].get_iterator(),
-                                                   train_op=None)
+                                                   training=False)
             val_loss, val_mae = val_results['loss'], val_results['mae']
 
             utils.add_simple_summary(self._writer,
                                      ['loss/train_loss', 'metric/train_mae', 'loss/val_loss', 'metric/val_mae'],
                                      [train_loss, train_mae, val_loss, val_mae], global_step=global_step)
             end_time = time.time()
-            message = 'Epoch [{}] ({}) train_mae: {:.4f}, val_mae: {:.4f} lr:{:.6f} {:.1f}s'.format(
-                self._epoch, global_step, train_mae, val_mae, new_lr, (end_time - start_time))
+            message = 'Epoch [{}/{}] ({}) train_mae: {:.4f}, val_mae: {:.4f} lr:{:.6f} {:.1f}s'.format(
+                self._epoch, epochs, global_step, train_mae, val_mae, new_lr, (end_time - start_time))
             self._logger.info(message)
             if self._epoch % test_every_n_epochs == test_every_n_epochs - 1:
                 self.test_and_write_result(sess, global_step)
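`utils.add_simple_summary` is called with parallel lists of tag names and plain Python scalars; presumably it writes hand-built `tf.Summary` protos so metrics computed outside the graph still reach TensorBoard. A sketch consistent with that call signature (the body is inferred, not part of this diff):

```python
import tensorflow as tf

def add_simple_summary(writer, names, values, global_step):
    """Sketch: log Python scalars to TensorBoard without any graph ops."""
    for name, value in zip(names, values):
        summary = tf.Summary(value=[tf.Summary.Value(tag=name,
                                                     simple_value=float(value))])
        writer.add_summary(summary, global_step)
```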
@@ -186,9 +277,10 @@ class DCRNNSupervisor(object):
         return np.min(history)
 
     def test_and_write_result(self, sess, global_step, **kwargs):
-        test_results = self._test_model.run_epoch_generator(sess, self._test_model,
+        test_results = self.run_epoch_generator(sess, self._test_model,
                                                 self._data['test_loader'].get_iterator(),
-                                                return_output=True, train_op=None)
+                                                return_output=True,
+                                                training=False)
 
         # y_preds: a list of (batch_size, horizon, num_nodes, output_dim)
         test_loss, y_preds = test_results['loss'], test_results['outputs']
@@ -196,14 +288,17 @@ class DCRNNSupervisor(object):
 
         y_preds = np.concatenate(y_preds, axis=0)
         scaler = self._data['scaler']
-        outputs = []
+        predictions = []
+        y_truths = []
         for horizon_i in range(self._data['y_test'].shape[1]):
-            y_truth = np.concatenate(self._data['y_test'][:, horizon_i, :, 0], axis=0)
+            y_truth = self._data['y_test'][:, horizon_i, :, 0]
             y_truth = scaler.inverse_transform(y_truth)
-            y_pred = np.concatenate(y_preds[:, horizon_i, :, 0], axis=0)
-            y_pred = y_pred[:y_truth.shape[0], ...]  # Only take the batch number
+            y_truths.append(y_truth)
+
+            y_pred = y_preds[:y_truth.shape[0], horizon_i, :, 0]
             y_pred = scaler.inverse_transform(y_pred)
-            outputs.append(y_pred)
+            predictions.append(y_pred)
 
             mae = metrics.masked_mae_np(y_pred, y_truth, null_val=0)
             mape = metrics.masked_mape_np(y_pred, y_truth, null_val=0)
             rmse = metrics.masked_rmse_np(y_pred, y_truth, null_val=0)
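Inverse-transforming both truth and prediction before computing the masked metrics keeps the reported MAE/MAPE/RMSE in the original units. The `StandardScaler` imported from `lib.utils` is presumably the usual z-score pair; a minimal stand-in consistent with how `inverse_transform` is used here:

```python
class StandardScaler:
    """Sketch of lib.utils.StandardScaler, assuming z-score normalization."""

    def __init__(self, mean, std):
        self.mean, self.std = mean, std

    def transform(self, data):
        return (data - self.mean) / self.std

    def inverse_transform(self, data):
        return data * self.std + self.mean
```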
@@ -217,7 +312,11 @@ class DCRNNSupervisor(object):
                                      ['metric/rmse', 'metric/mape', 'metric/mae']],
                                     [rmse, mape, mae],
                                     global_step=global_step)
-        return y_preds
+        outputs = {
+            'predictions': predictions,
+            'groundtruth': y_truths
+        }
+        return outputs
 
     @staticmethod
     def restore(sess, config):
@@ -245,7 +344,3 @@ class DCRNNSupervisor(object):
         with open(os.path.join(self._log_dir, config_filename), 'w') as f:
             yaml.dump(config, f, default_flow_style=False)
         return config['train']['model_filename']
-
-    @property
-    def log_dir(self):
-        return self._log_dir