diff --git a/model/dcrnn_model.py b/model/dcrnn_model.py
index deef895..30c74e4 100644
--- a/model/dcrnn_model.py
+++ b/model/dcrnn_model.py
@@ -2,12 +2,11 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import numpy as np
 import tensorflow as tf
 from tensorflow.contrib import legacy_seq2seq
 
-from lib.metrics import masked_mse_loss, masked_mae_loss, masked_rmse_loss
+from lib.metrics import masked_mae_loss
 from model.dcrnn_cell import DCGRUCell
 
 
@@ -21,12 +20,6 @@ class DCRNNModel(object):
         self._mae = None
         self._train_op = None
 
-        # Learning rate.
-        self._lr = tf.get_variable('learning_rate', shape=(), initializer=tf.constant_initializer(0.01),
-                                   trainable=False)
-        self._new_lr = tf.placeholder(tf.float32, shape=(), name='new_learning_rate')
-        self._lr_update = tf.assign(self._lr, self._new_lr, name='lr_update')
-
         max_diffusion_step = int(model_kwargs.get('max_diffusion_step', 2))
         cl_decay_steps = int(model_kwargs.get('cl_decay_steps', 1000))
         filter_type = model_kwargs.get('filter_type', 'laplacian')
@@ -46,7 +39,8 @@ class DCRNNModel(object):
         # Labels: (batch_size, timesteps, num_sensor, input_dim), same format with input except the temporal dimension.
         self._labels = tf.placeholder(tf.float32, shape=(batch_size, horizon, num_nodes, input_dim), name='labels')
 
-        GO_SYMBOL = tf.zeros(shape=(batch_size, num_nodes * input_dim))
+        # GO_SYMBOL = tf.zeros(shape=(batch_size, num_nodes * input_dim))
+        GO_SYMBOL = tf.zeros(shape=(batch_size, num_nodes * output_dim))
 
         cell = DCGRUCell(rnn_units, adj_mx, max_diffusion_step=max_diffusion_step, num_nodes=num_nodes,
                          filter_type=filter_type)
@@ -80,7 +74,7 @@ class DCRNNModel(object):
                 else:
                     # Return the prediction of the model in testing.
                     result = prev
-                    if aux_dim > 0:
+                    if False and aux_dim > 0:
                         result = tf.reshape(result, (batch_size, num_nodes, output_dim))
                         result = tf.concat([result, aux_info[i]], axis=-1)
                         result = tf.reshape(result, (batch_size, num_nodes * input_dim))
@@ -93,20 +87,6 @@ class DCRNNModel(object):
         # Project the output to output_dim.
         outputs = tf.stack(outputs[:-1], axis=1)
         self._outputs = tf.reshape(outputs, (batch_size, horizon, num_nodes, output_dim), name='outputs')
-
-        preds = self._outputs
-        labels = self._labels[..., :output_dim]
-
-        null_val = 0.
-        self._mae = masked_mae_loss(self._scaler, null_val)(preds=preds, labels=labels)
-        self._loss = masked_mae_loss(self._scaler, null_val)(preds=preds, labels=labels)
-        if is_training:
-            optimizer = tf.train.AdamOptimizer(self._lr)
-            tvars = tf.trainable_variables()
-            grads = tf.gradients(self._loss, tvars)
-            grads, _ = tf.clip_by_global_norm(grads, max_grad_norm)
-            self._train_op = optimizer.apply_gradients(zip(grads, tvars), global_step=global_step, name='train_op')
-
         self._merged = tf.summary.merge_all()
 
     @staticmethod
@@ -119,61 +99,6 @@ class DCRNNModel(object):
         """
         return tf.cast(k / (k + tf.exp(global_step / k)), tf.float32)
 
-    @staticmethod
-    def run_epoch_generator(sess, model, data_generator, return_output=False, train_op=None, writer=None):
-        losses = []
-        maes = []
-        outputs = []
-
-        fetches = {
-            'mae': model.mae,
-            'loss': model.loss,
-            'global_step': tf.train.get_or_create_global_step()
-        }
-        if train_op:
-            fetches.update({
-                'train_op': train_op,
-            })
-        merged = model.merged
-        if merged is not None:
-            fetches.update({'merged': merged})
-
-        if return_output:
-            fetches.update({
-                'outputs': model.outputs
-            })
-
-        for _, (x, y) in enumerate(data_generator):
-            feed_dict = {
-                model.inputs: x,
-                model.labels: y,
-            }
-
-            vals = sess.run(fetches, feed_dict=feed_dict)
-
-            losses.append(vals['loss'])
-            maes.append(vals['mae'])
-            if writer is not None and 'merged' in vals:
-                writer.add_summary(vals['merged'], global_step=vals['global_step'])
-            if return_output:
-                outputs.append(vals['outputs'])
-
-        results = {
-            'loss': np.mean(losses),
-            'mae': np.mean(maes)
-        }
-        if return_output:
-            results['outputs'] = outputs
-        return results
-
-    def get_lr(self, sess):
-        return np.asscalar(sess.run(self._lr))
-
-    def set_lr(self, sess, lr):
-        sess.run(self._lr_update, feed_dict={
-            self._new_lr: lr
-        })
-
     @property
     def inputs(self):
         return self._inputs
@@ -186,10 +111,6 @@ class DCRNNModel(object):
     def loss(self):
         return self._loss
 
-    @property
-    def lr(self):
-        return self._lr
-
     @property
     def mae(self):
         return self._mae
@@ -201,7 +122,3 @@ class DCRNNModel(object):
     @property
     def outputs(self):
         return self._outputs
-
-    @property
-    def train_op(self):
-        return self._train_op
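Note on the model-side change above: the decoder's GO symbol now matches the decoder output width (num_nodes * output_dim instead of num_nodes * input_dim), and all loss, optimizer, and learning-rate plumbing is removed from DCRNNModel. What stays in the model is the scheduled-sampling threshold computed in _compute_sampling_threshold, an inverse-sigmoid decay in the global step. The following is a minimal standalone NumPy sketch of that curve, not part of the patch; k mirrors the cl_decay_steps default of 1000 seen in the diff.

    import numpy as np

    def sampling_threshold(global_step, k=1000):
        # Inverse-sigmoid decay: close to 1.0 early in training (feed the ground
        # truth to the decoder), falling toward 0.0 as global_step grows (feed the
        # model's own previous prediction instead).
        return k / (k + np.exp(global_step / k))

    print(sampling_threshold(0), sampling_threshold(10000))  # about 0.999 vs about 0.04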
diff --git a/model/dcrnn_supervisor.py b/model/dcrnn_supervisor.py
index 585e377..b161f7a 100644
--- a/model/dcrnn_supervisor.py
+++ b/model/dcrnn_supervisor.py
@@ -10,6 +10,8 @@ import time
 import yaml
 
 from lib import utils, metrics
+from lib.AMSGrad import AMSGrad
+from lib.metrics import masked_mae_loss
 from lib.utils import StandardScaler, DataLoader
 from model.dcrnn_model import DCRNNModel
 
@@ -29,7 +31,7 @@ class DCRNNSupervisor(object):
 
         # logging.
         self._log_dir = self._get_log_dir(kwargs)
-        self._logger = utils.get_logger(self.log_dir, __name__, 'info.log')
+        self._logger = utils.get_logger(self._log_dir, __name__, 'info.log')
         self._writer = tf.summary.FileWriter(self._log_dir)
         self._logger.info(kwargs)
 
@@ -61,6 +63,37 @@ class DCRNNSupervisor(object):
                                               batch_size=self._data_kwargs['test_batch_size'],
                                               adj_mx=adj_mx, **self._model_kwargs)
 
+        # Calculate loss
+        output_dim = self._model_kwargs.get('output_dim')
+        preds = self._train_model.outputs
+        labels = self._train_model.labels[..., :output_dim]
+
+        null_val = 0.
+        self._loss_fn = masked_mae_loss(scaler, null_val)
+        self._train_loss = self._loss_fn(preds=preds, labels=labels)
+
+        # Learning rate.
+        self._lr = tf.get_variable('learning_rate', shape=(), initializer=tf.constant_initializer(0.01),
+                                   trainable=False)
+        self._new_lr = tf.placeholder(tf.float32, shape=(), name='new_learning_rate')
+        self._lr_update = tf.assign(self._lr, self._new_lr, name='lr_update')
+
+        # Configure optimizer
+        optimizer_name = self._train_kwargs.get('optimizer', 'adam').lower()
+        epsilon = float(self._train_kwargs.get('epsilon', 1e-3))
+        optimizer = tf.train.AdamOptimizer(self._lr, epsilon=epsilon)
+        if optimizer_name == 'sgd':
+            optimizer = tf.train.GradientDescentOptimizer(self._lr, )
+        elif optimizer_name == 'amsgrad':
+            optimizer = AMSGrad(self._lr, epsilon=epsilon)
+
+        tvars = tf.trainable_variables()
+        grads = tf.gradients(self._train_loss, tvars)
+        max_grad_norm = kwargs['train'].get('max_grad_norm', 1.)
+        grads, _ = tf.clip_by_global_norm(grads, max_grad_norm)
+        global_step = tf.train.get_or_create_global_step()
+        self._train_op = optimizer.apply_gradients(zip(grads, tvars), global_step=global_step, name='train_op')
+
         # Log model statistics.
         total_trainable_parameter = utils.get_total_trainable_parameter_size()
         self._logger.info('Total number of trainable parameters: %d' % total_trainable_parameter)
@@ -114,6 +147,63 @@ class DCRNNSupervisor(object):
 
         return data
 
+    def run_epoch_generator(self, sess, model, data_generator, return_output=False, training=False, writer=None):
+        losses = []
+        maes = []
+        outputs = []
+        output_dim = self._model_kwargs.get('output_dim')
+        preds = model.outputs
+        labels = model.labels[..., :output_dim]
+        loss = self._loss_fn(preds=preds, labels=labels)
+        fetches = {
+            'loss': loss,
+            'mae': loss,
+            'global_step': tf.train.get_or_create_global_step()
+        }
+        if training:
+            fetches.update({
+                'train_op': self._train_op
+            })
+        merged = model.merged
+        if merged is not None:
+            fetches.update({'merged': merged})
+
+        if return_output:
+            fetches.update({
+                'outputs': model.outputs
+            })
+
+        for _, (x, y) in enumerate(data_generator):
+            feed_dict = {
+                model.inputs: x,
+                model.labels: y,
+            }
+
+            vals = sess.run(fetches, feed_dict=feed_dict)
+
+            losses.append(vals['loss'])
+            maes.append(vals['mae'])
+            if writer is not None and 'merged' in vals:
+                writer.add_summary(vals['merged'], global_step=vals['global_step'])
+            if return_output:
+                outputs.append(vals['outputs'])
+
+        results = {
+            'loss': np.mean(losses),
+            'mae': np.mean(maes)
+        }
+        if return_output:
+            results['outputs'] = outputs
+        return results
+
+    def get_lr(self, sess):
+        return np.asscalar(sess.run(self._lr))
+
+    def set_lr(self, sess, lr):
+        sess.run(self._lr_update, feed_dict={
+            self._new_lr: lr
+        })
+
     def train(self, sess, **kwargs):
         kwargs.update(self._train_kwargs)
         return self._train(sess, **kwargs)
@@ -133,17 +223,18 @@ class DCRNNSupervisor(object):
             self._epoch = epoch + 1
         else:
             sess.run(tf.global_variables_initializer())
+        self._logger.info('Start training ...')
+
         while self._epoch <= epochs:
             # Learning rate schedule.
             new_lr = max(min_learning_rate, base_lr * (lr_decay_ratio ** np.sum(self._epoch >= np.array(steps))))
-            self._train_model.set_lr(sess=sess, lr=new_lr)
-            sys.stdout.flush()
+            self.set_lr(sess=sess, lr=new_lr)
 
             start_time = time.time()
-            train_results = self._train_model.run_epoch_generator(sess, self._train_model,
-                                                                  self._data['train_loader'].get_iterator(),
-                                                                  train_op=self._train_model.train_op,
-                                                                  writer=self._writer)
+            train_results = self.run_epoch_generator(sess, self._train_model,
+                                                     self._data['train_loader'].get_iterator(),
+                                                     training=True,
+                                                     writer=self._writer)
             train_loss, train_mae = train_results['loss'], train_results['mae']
             if train_loss > 1e5:
                 self._logger.warning('Gradient explosion detected. Ending...')
@@ -151,17 +242,17 @@ class DCRNNSupervisor(object):
 
             global_step = sess.run(tf.train.get_or_create_global_step())
             # Compute validation error.
-            val_results = self._val_model.run_epoch_generator(sess, self._val_model,
-                                                              self._data['val_loader'].get_iterator(),
-                                                              train_op=None)
+            val_results = self.run_epoch_generator(sess, self._val_model,
+                                                   self._data['val_loader'].get_iterator(),
+                                                   training=False)
             val_loss, val_mae = val_results['loss'], val_results['mae']
 
             utils.add_simple_summary(self._writer,
                                      ['loss/train_loss', 'metric/train_mae', 'loss/val_loss', 'metric/val_mae'],
                                      [train_loss, train_mae, val_loss, val_mae], global_step=global_step)
             end_time = time.time()
-            message = 'Epoch [{}] ({}) train_mae: {:.4f}, val_mae: {:.4f} lr:{:.6f} {:.1f}s'.format(
-                self._epoch, global_step, train_mae, val_mae, new_lr, (end_time - start_time))
+            message = 'Epoch [{}/{}] ({}) train_mae: {:.4f}, val_mae: {:.4f} lr:{:.6f} {:.1f}s'.format(
+                self._epoch, epochs, global_step, train_mae, val_mae, new_lr, (end_time - start_time))
             self._logger.info(message)
             if self._epoch % test_every_n_epochs == test_every_n_epochs - 1:
                 self.test_and_write_result(sess, global_step)
@@ -186,9 +277,10 @@ class DCRNNSupervisor(object):
         return np.min(history)
 
     def test_and_write_result(self, sess, global_step, **kwargs):
-        test_results = self._test_model.run_epoch_generator(sess, self._test_model,
-                                                            self._data['test_loader'].get_iterator(),
-                                                            return_output=True, train_op=None)
+        test_results = self.run_epoch_generator(sess, self._test_model,
+                                                self._data['test_loader'].get_iterator(),
+                                                return_output=True,
+                                                training=False)
 
         # y_preds: a list of (batch_size, horizon, num_nodes, output_dim)
         test_loss, y_preds = test_results['loss'], test_results['outputs']
@@ -196,14 +288,17 @@
 
         y_preds = np.concatenate(y_preds, axis=0)
         scaler = self._data['scaler']
-        outputs = []
+        predictions = []
+        y_truths = []
         for horizon_i in range(self._data['y_test'].shape[1]):
-            y_truth = np.concatenate(self._data['y_test'][:, horizon_i, :, 0], axis=0)
+            y_truth = self._data['y_test'][:, horizon_i, :, 0]
             y_truth = scaler.inverse_transform(y_truth)
-            y_pred = np.concatenate(y_preds[:, horizon_i, :, 0], axis=0)
-            y_pred = y_pred[:y_truth.shape[0], ...]  # Only take the batch number
+            y_truths.append(y_truth)
+
+            y_pred = y_preds[:y_truth.shape[0], horizon_i, :, 0]
             y_pred = scaler.inverse_transform(y_pred)
-            outputs.append(y_pred)
+            predictions.append(y_pred)
+
             mae = metrics.masked_mae_np(y_pred, y_truth, null_val=0)
             mape = metrics.masked_mape_np(y_pred, y_truth, null_val=0)
             rmse = metrics.masked_rmse_np(y_pred, y_truth, null_val=0)
@@ -217,7 +312,11 @@
                                       ['metric/rmse', 'metric/mape', 'metric/mae']],
                                      [rmse, mape, mae],
                                      global_step=global_step)
-        return y_preds
+        outputs = {
+            'predictions': predictions,
+            'groundtruth': y_truths
+        }
+        return outputs
 
     @staticmethod
     def restore(sess, config):
@@ -245,7 +344,3 @@
         with open(os.path.join(self._log_dir, config_filename), 'w') as f:
             yaml.dump(config, f, default_flow_style=False)
         return config['train']['model_filename']
-
-    @property
-    def log_dir(self):
-        return self._log_dir
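For reference, the learning-rate schedule that the supervisor now drives through set_lr is a simple staircase decay: the base rate is multiplied by lr_decay_ratio once for every milestone epoch already passed, and never drops below min_learning_rate. Below is a standalone sketch of the same arithmetic as the new_lr line in _train; the milestone and decay values are placeholders for illustration, not the repo's actual YAML config.

    import numpy as np

    def scheduled_lr(epoch, base_lr=0.01, lr_decay_ratio=0.1,
                     steps=(20, 30, 40, 50), min_learning_rate=2e-6):
        # Placeholder defaults; the real values come from the train section of the config.
        # Count how many milestone epochs have already been reached.
        num_decays = int(np.sum(epoch >= np.array(steps)))
        # Staircase decay with a lower bound, matching the supervisor's new_lr expression.
        return max(min_learning_rate, base_lr * lr_decay_ratio ** num_decays)

    print([scheduled_lr(e) for e in (1, 25, 45)])  # -> 0.01, 0.001, ~1e-05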