diff --git a/data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/config_75.yaml b/data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/config_75.yaml deleted file mode 100644 index aee8bc1..0000000 --- a/data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/config_75.yaml +++ /dev/null @@ -1,40 +0,0 @@ -base_dir: data/model -batch_size: 64 -cl_decay_steps: 2000 -data_type: ALL -dropout: 0 -epoch: 75 -epochs: 100 -filter_type: dual_random_walk -global_step: !!python/object/apply:numpy.core.multiarray.scalar -- !!python/object/apply:numpy.dtype - args: [i8, 0, 1] - state: !!python/tuple [3, <, null, null, null, -1, -1, 0] -- !!binary | - NGgAAAAAAAA= -graph_pkl_filename: data/sensor_graph/adj_mx.pkl -horizon: 12 -l1_decay: 0 -learning_rate: 0.01 -log_dir: data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/ -loss_func: MAE -lr_decay: 0.1 -lr_decay_epoch: 20 -lr_decay_interval: 10 -max_diffusion_step: 2 -max_grad_norm: 5 -min_learning_rate: 2.0e-06 -model_filename: data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/models-2.8476-26676 -null_val: 0 -num_rnn_layers: 2 -output_dim: 1 -patience: 50 -rnn_units: 64 -seq_len: 12 -test_every_n_epochs: 10 -test_ratio: 0.2 -use_cpu_only: false -use_curriculum_learning: true -validation_ratio: 0.1 -verbose: 0 -write_db: false diff --git a/data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/models-2.8476-26676.data-00000-of-00001 b/data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/models-2.8476-26676.data-00000-of-00001 deleted file mode 100644 index 275dc61..0000000 Binary files a/data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/models-2.8476-26676.data-00000-of-00001 and /dev/null differ diff --git a/data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/models-2.8476-26676.index b/data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/models-2.8476-26676.index deleted file mode 100644 index 77d2554..0000000 Binary files a/data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/models-2.8476-26676.index and /dev/null differ diff --git a/data/model/dcrnn_config.yaml b/data/model/dcrnn_config.yaml index a0109c6..60b1d5a 100644 --- a/data/model/dcrnn_config.yaml +++ b/data/model/dcrnn_config.yaml @@ -1,34 +1,36 @@ --- base_dir: data/model -batch_size: 64 -cl_decay_steps: 2000 -data_type: ALL -dropout: 0 -epoch: 0 -epochs: 100 -filter_type: dual_random_walk -global_step: 0 -graph_pkl_filename: data/sensor_graph/adj_mx.pkl -horizon: 12 -l1_decay: 0 -learning_rate: 0.01 -loss_func: MAE -lr_decay: 0.1 -lr_decay_epoch: 20 -lr_decay_interval: 10 -max_diffusion_step: 2 -max_grad_norm: 5 -min_learning_rate: 2.0e-06 -null_val: 0 -num_rnn_layers: 2 -output_dim: 1 -patience: 50 -rnn_units: 64 -seq_len: 12 -test_every_n_epochs: 10 -test_ratio: 0.2 -use_cpu_only: false -use_curriculum_learning: true -validation_ratio: 0.1 -verbose: 0 -write_db: false +data: + batch_size: 64 + dataset_dir: data/METR-LA + test_batch_size: 64 + val_batch_size: 64 + graph_pkl_filename: data/sensor_graph/adj_mx.pkl + +model: + cl_decay_steps: 2000 + filter_type: dual_random_walk + horizon: 12 + input_dim: 2 + l1_decay: 0 + max_diffusion_step: 2 + max_grad_norm: 5 + num_nodes: 207 + num_rnn_layers: 2 + output_dim: 1 + rnn_units: 64 + seq_len: 12 + use_curriculum_learning: true + +train: + base_lr: 0.01 + dropout: 0 + epoch: 0 + epochs: 100 + 
global_step: 0 + lr_decay_ratio: 0.1 + steps: [20, 30, 40, 50] + max_to_keep: 100 + min_learning_rate: 2.0e-06 + patience: 50 + test_every_n_epochs: 10 \ No newline at end of file diff --git a/data/model/dcrnn_config_u16_lap.yaml b/data/model/dcrnn_config_u16_lap.yaml new file mode 100644 index 0000000..d473341 --- /dev/null +++ b/data/model/dcrnn_config_u16_lap.yaml @@ -0,0 +1,36 @@ +--- +base_dir: data/model +data: + batch_size: 64 + dataset_dir: data/METR-LA + test_batch_size: 64 + val_batch_size: 64 + graph_pkl_filename: data/sensor_graph/adj_mx.pkl + +model: + cl_decay_steps: 2000 + filter_type: laplacian + horizon: 12 + input_dim: 2 + l1_decay: 0 + max_diffusion_step: 2 + max_grad_norm: 5 + num_nodes: 207 + num_rnn_layers: 2 + output_dim: 1 + rnn_units: 16 + seq_len: 12 + use_curriculum_learning: true + +train: + base_lr: 0.01 + dropout: 0 + epoch: 0 + epochs: 100 + global_step: 0 + lr_decay_ratio: 0.1 + steps: [20, 30, 40, 50] + max_to_keep: 100 + min_learning_rate: 2.0e-06 + patience: 50 + test_every_n_epochs: 10 \ No newline at end of file diff --git a/data/model/dcrnn_test_config.yaml b/data/model/dcrnn_test_config.yaml index b49b410..60b1d5a 100644 --- a/data/model/dcrnn_test_config.yaml +++ b/data/model/dcrnn_test_config.yaml @@ -1,35 +1,36 @@ --- base_dir: data/model -batch_size: 64 -cl_decay_steps: 2000 -data_type: ALL -dropout: 0 -epoch: 0 -epochs: 100 -filter_type: random_walk -global_step: 0 -graph_pkl_filename: data/sensor_graph/adj_mx.pkl -horizon: 3 -l1_decay: 0 -learning_rate: 0.01 -loss_func: MAE -lr_decay: 0.1 -lr_decay_epoch: 20 -lr_decay_interval: 10 -max_diffusion_step: 2 -max_grad_norm: 5 -method_type: GCRNN -min_learning_rate: 2.0e-06 -null_val: 0 -num_rnn_layers: 2 -output_dim: 1 -patience: 50 -rnn_units: 16 -seq_len: 3 -test_every_n_epochs: 10 -test_ratio: 0.2 -use_cpu_only: false -use_curriculum_learning: true -validation_ratio: 0.1 -verbose: 0 -write_db: false \ No newline at end of file +data: + batch_size: 64 + dataset_dir: data/METR-LA + test_batch_size: 64 + val_batch_size: 64 + graph_pkl_filename: data/sensor_graph/adj_mx.pkl + +model: + cl_decay_steps: 2000 + filter_type: dual_random_walk + horizon: 12 + input_dim: 2 + l1_decay: 0 + max_diffusion_step: 2 + max_grad_norm: 5 + num_nodes: 207 + num_rnn_layers: 2 + output_dim: 1 + rnn_units: 64 + seq_len: 12 + use_curriculum_learning: true + +train: + base_lr: 0.01 + dropout: 0 + epoch: 0 + epochs: 100 + global_step: 0 + lr_decay_ratio: 0.1 + steps: [20, 30, 40, 50] + max_to_keep: 100 + min_learning_rate: 2.0e-06 + patience: 50 + test_every_n_epochs: 10 \ No newline at end of file diff --git a/data/results/dcrnn_prediction_1.h5 b/data/results/dcrnn_prediction_1.h5 deleted file mode 100644 index 379fcce..0000000 Binary files a/data/results/dcrnn_prediction_1.h5 and /dev/null differ diff --git a/data/results/dcrnn_prediction_10.h5 b/data/results/dcrnn_prediction_10.h5 deleted file mode 100644 index 1a1a69a..0000000 Binary files a/data/results/dcrnn_prediction_10.h5 and /dev/null differ diff --git a/data/results/dcrnn_prediction_11.h5 b/data/results/dcrnn_prediction_11.h5 deleted file mode 100644 index 74c5455..0000000 Binary files a/data/results/dcrnn_prediction_11.h5 and /dev/null differ diff --git a/data/results/dcrnn_prediction_12.h5 b/data/results/dcrnn_prediction_12.h5 deleted file mode 100644 index 0abfb61..0000000 Binary files a/data/results/dcrnn_prediction_12.h5 and /dev/null differ diff --git a/data/results/dcrnn_prediction_2.h5 b/data/results/dcrnn_prediction_2.h5 deleted file mode 100644 index 
15f52a6..0000000 Binary files a/data/results/dcrnn_prediction_2.h5 and /dev/null differ diff --git a/data/results/dcrnn_prediction_3.h5 b/data/results/dcrnn_prediction_3.h5 deleted file mode 100644 index 3865876..0000000 Binary files a/data/results/dcrnn_prediction_3.h5 and /dev/null differ diff --git a/data/results/dcrnn_prediction_4.h5 b/data/results/dcrnn_prediction_4.h5 deleted file mode 100644 index aafd18c..0000000 Binary files a/data/results/dcrnn_prediction_4.h5 and /dev/null differ diff --git a/data/results/dcrnn_prediction_5.h5 b/data/results/dcrnn_prediction_5.h5 deleted file mode 100644 index 081f0ed..0000000 Binary files a/data/results/dcrnn_prediction_5.h5 and /dev/null differ diff --git a/data/results/dcrnn_prediction_6.h5 b/data/results/dcrnn_prediction_6.h5 deleted file mode 100644 index 30e68ff..0000000 Binary files a/data/results/dcrnn_prediction_6.h5 and /dev/null differ diff --git a/data/results/dcrnn_prediction_7.h5 b/data/results/dcrnn_prediction_7.h5 deleted file mode 100644 index e0f2bf7..0000000 Binary files a/data/results/dcrnn_prediction_7.h5 and /dev/null differ diff --git a/data/results/dcrnn_prediction_8.h5 b/data/results/dcrnn_prediction_8.h5 deleted file mode 100644 index 1b9aa0e..0000000 Binary files a/data/results/dcrnn_prediction_8.h5 and /dev/null differ diff --git a/data/results/dcrnn_prediction_9.h5 b/data/results/dcrnn_prediction_9.h5 deleted file mode 100644 index 221277c..0000000 Binary files a/data/results/dcrnn_prediction_9.h5 and /dev/null differ diff --git a/dcrnn_train.py b/dcrnn_train.py index 0615089..de75465 100644 --- a/dcrnn_train.py +++ b/dcrnn_train.py @@ -2,86 +2,35 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import pandas as pd +import argparse import tensorflow as tf import yaml - -from lib import log_helper -from lib.dcrnn_utils import load_graph_data +from lib.utils import load_graph_data from model.dcrnn_supervisor import DCRNNSupervisor -# flags -flags = tf.app.flags -FLAGS = flags.FLAGS -flags.DEFINE_integer('batch_size', -1, 'Batch size') -flags.DEFINE_integer('cl_decay_steps', -1, - 'Parameter to control the decay speed of probability of feeding groundth instead of model output.') -flags.DEFINE_string('config_filename', None, 'Configuration filename for restoring the model.') -flags.DEFINE_integer('epochs', -1, 'Maximum number of epochs to train.') -flags.DEFINE_string('filter_type', None, 'laplacian/random_walk/dual_random_walk.') -flags.DEFINE_string('graph_pkl_filename', 'data/sensor_graph/adj_mx.pkl', - 'Pickle file containing: sensor_ids, sensor_id_to_ind_map, dist_matrix') -flags.DEFINE_integer('horizon', -1, 'Maximum number of timestamps to prediction.') -flags.DEFINE_float('l1_decay', -1.0, 'L1 Regularization') -flags.DEFINE_float('lr_decay', -1.0, 'Learning rate decay.') -flags.DEFINE_integer('lr_decay_epoch', -1, 'The epoch that starting decaying the parameter.') -flags.DEFINE_integer('lr_decay_interval', -1, 'Interval beteween each deacy.') -flags.DEFINE_float('learning_rate', -1, 'Learning rate. 
-1: select by hyperopt tuning.') -flags.DEFINE_string('log_dir', None, 'Log directory for restoring the model from a checkpoint.') -flags.DEFINE_string('loss_func', None, 'MSE/MAPE/RMSE_MAPE: loss function.') -flags.DEFINE_float('min_learning_rate', -1, 'Minimum learning rate') -flags.DEFINE_integer('nb_weeks', 17, 'How many week\'s data should be used for train/test.') -flags.DEFINE_integer('patience', -1, - 'Maximum number of epochs allowed for non-improving validation error before early stopping.') -flags.DEFINE_integer('seq_len', -1, 'Sequence length.') -flags.DEFINE_integer('test_every_n_epochs', -1, 'Run model on the testing dataset every n epochs.') -flags.DEFINE_string('traffic_df_filename', 'data/df_highway_2012_4mon_sample.h5', - 'Path to hdf5 pandas.DataFrame.') -flags.DEFINE_bool('use_cpu_only', False, 'Set to true to only use cpu.') -flags.DEFINE_bool('use_curriculum_learning', None, 'Set to true to use Curriculum learning in decoding stage.') -flags.DEFINE_integer('verbose', -1, '1: to log individual sensor information.') - - -def main(): - # Reads graph data. - with open(FLAGS.config_filename) as f: +def main(args): + with open(args.config_filename) as f: supervisor_config = yaml.load(f) - logger = log_helper.get_logger(supervisor_config.get('base_dir'), 'info.log') - logger.info('Loading graph from: ' + FLAGS.graph_pkl_filename) - sensor_ids, sensor_id_to_ind, adj_mx = load_graph_data(FLAGS.graph_pkl_filename) - adj_mx[adj_mx < 0.1] = 0 - logger.info('Loading traffic data from: ' + FLAGS.traffic_df_filename) - traffic_df_filename = FLAGS.traffic_df_filename - traffic_reading_df = pd.read_hdf(traffic_df_filename) - traffic_reading_df = traffic_reading_df.ix[:, sensor_ids] - supervisor_config['use_cpu_only'] = FLAGS.use_cpu_only - if FLAGS.log_dir: - supervisor_config['log_dir'] = FLAGS.log_dir - if FLAGS.use_curriculum_learning is not None: - supervisor_config['use_curriculum_learning'] = FLAGS.use_curriculum_learning - if FLAGS.loss_func: - supervisor_config['loss_func'] = FLAGS.loss_func - if FLAGS.filter_type: - supervisor_config['filter_type'] = FLAGS.filter_type - # Overwrites space with specified parameters. 
- for name in ['batch_size', 'cl_decay_steps', 'epochs', 'horizon', 'learning_rate', 'l1_decay', - 'lr_decay', 'lr_decay_epoch', 'lr_decay_interval', 'learning_rate', 'min_learning_rate', - 'patience', 'seq_len', 'test_every_n_epochs', 'verbose']: - if getattr(FLAGS, name) >= 0: - supervisor_config[name] = getattr(FLAGS, name) + + graph_pkl_filename = supervisor_config['data'].get('graph_pkl_filename') + sensor_ids, sensor_id_to_ind, adj_mx = load_graph_data(graph_pkl_filename) tf_config = tf.ConfigProto() - if FLAGS.use_cpu_only: + if args.use_cpu_only: tf_config = tf.ConfigProto(device_count={'GPU': 0}) tf_config.gpu_options.allow_growth = True with tf.Session(config=tf_config) as sess: - supervisor = DCRNNSupervisor(traffic_reading_df=traffic_reading_df, adj_mx=adj_mx, - config=supervisor_config) + supervisor = DCRNNSupervisor(adj_mx=adj_mx, **supervisor_config) supervisor.train(sess=sess) if __name__ == '__main__': - main() + parser = argparse.ArgumentParser() + parser.add_argument('--config_filename', default=None, type=str, + help='Configuration filename for restoring the model.') + parser.add_argument('--use_cpu_only', default=False, type=bool, help='Set to true to only use cpu.') + args = parser.parse_args() + main(args) diff --git a/lib/dcrnn_utils.py b/lib/dcrnn_utils.py deleted file mode 100644 index ee4a34e..0000000 --- a/lib/dcrnn_utils.py +++ /dev/null @@ -1,56 +0,0 @@ -import pickle -import numpy as np -import scipy.sparse as sp - -from scipy.sparse import linalg -from lib.tf_utils import sparse_matrix_to_tf_sparse_tensor -from lib.utils import load_pickle - - -def load_graph_data(pkl_filename): - sensor_ids, sensor_id_to_ind, adj_mx = load_pickle(pkl_filename) - return sensor_ids, sensor_id_to_ind, adj_mx - - -def calculate_normalized_laplacian(adj): - """ - # L = D^-1/2 (D-A) D^-1/2 = I - D^-1/2 A D^-1/2 - # D = diag(A 1) - :param adj: - :return: - """ - adj = sp.coo_matrix(adj) - d = np.array(adj.sum(1)) - d_inv_sqrt = np.power(d, -0.5).flatten() - d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0. - d_mat_inv_sqrt = sp.diags(d_inv_sqrt) - normalized_laplacian = sp.eye(adj.shape[0]) - adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt).tocoo() - return normalized_laplacian - - -def calculate_random_walk_matrix(adj_mx): - adj_mx = sp.coo_matrix(adj_mx) - d = np.array(adj_mx.sum(1)) - d_inv = np.power(d, -1).flatten() - d_inv[np.isinf(d_inv)] = 0. 
- d_mat_inv = sp.diags(d_inv) - random_walk_mx = d_mat_inv.dot(adj_mx).tocoo() - return random_walk_mx - - -def calculate_reverse_random_walk_matrix(adj_mx): - return calculate_random_walk_matrix(np.transpose(adj_mx)) - - -def calculate_scaled_laplacian(adj_mx, lambda_max=2, undirected=True): - if undirected: - adj_mx = np.maximum.reduce([adj_mx, adj_mx.T]) - L = calculate_normalized_laplacian(adj_mx) - if lambda_max is None: - lambda_max, _ = linalg.eigsh(L, 1, which='LM') - lambda_max = lambda_max[0] - L = sp.csr_matrix(L) - M, _ = L.shape - I = sp.identity(M, format='csr', dtype=L.dtype) - L = (2 / lambda_max * L) - I - return L.astype(np.float32) diff --git a/lib/log_helper.py b/lib/log_helper.py index 586ea3d..52fbe81 100644 --- a/lib/log_helper.py +++ b/lib/log_helper.py @@ -24,3 +24,22 @@ def get_logger(log_dir, name): # Add google cloud log handler logger.info('Log directory: %s', log_dir) return logger + + +def config_logging(log_dir, log_filename='info.log', level=logging.INFO): + # Add file handler and stdout handler + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + # Create the log directory if necessary. + try: + os.makedirs(log_dir) + except OSError: + pass + file_handler = logging.FileHandler(os.path.join(log_dir, log_filename)) + file_handler.setFormatter(formatter) + file_handler.setLevel(level=level) + # Add console handler. + console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(console_formatter) + console_handler.setLevel(level=level) + logging.basicConfig(handlers=[file_handler, console_handler], level=level) diff --git a/lib/tf_utils.py b/lib/tf_utils.py deleted file mode 100644 index 0232c2b..0000000 --- a/lib/tf_utils.py +++ /dev/null @@ -1,97 +0,0 @@ -from __future__ import absolute_import, division, print_function - -import numpy as np -import scipy.sparse as sp -import tensorflow as tf - - -def add_simple_summary(writer, names, values, global_step): - """ - Writes summary for a list of scalars. - :param writer: - :param names: - :param values: - :param global_step: - :return: - """ - for name, value in zip(names, values): - summary = tf.Summary() - summary_value = summary.value.add() - summary_value.simple_value = value - summary_value.tag = name - writer.add_summary(summary, global_step) - - -def adj_tensor_dot(adj, y): - """ Computes the matrix multiplication for the adjacency matrix and the 3D dense matrix y. - :param adj: square matrix with shape(n_node, n_node) - :param y: 3D tensor, with shape (batch_size, n_node, output_dim) - """ - y_shape = [i.value for i in y.shape] - if len(y_shape) != 3: - raise Exception('Dimension of y must be 3, instead of: %d' % len(y_shape)) - - y_permute_dim = list(range(len(y_shape))) - y_permute_dim = [y_permute_dim.pop(-2)] + y_permute_dim - yt = tf.reshape(tf.transpose(y, perm=y_permute_dim), [y_shape[-2], -1]) - if isinstance(adj, tf.SparseTensor): - res = tf.sparse_tensor_dense_matmul(adj, yt) - else: - res = tf.matmul(adj, yt) - res = tf.reshape(res, [y_shape[-2], -1, y_shape[-1]]) - res = tf.transpose(res, perm=[1, 0, 2]) - return res - - -def dot(x, y): - """ - Wrapper for tf.matmul for x with rank >= 2. 
- :param x: matrix with rank >=2 - :param y: matrix with rank==2 - :return: - """ - [input_dim, output_dim] = y.get_shape().as_list() - - input_shape = tf.shape(x) - batch_rank = input_shape.get_shape()[0].value - 1 - batch_shape = input_shape[:batch_rank] - output_shape = tf.concat(0, [batch_shape, [output_dim]]) - - x = tf.reshape(x, [-1, input_dim]) - result_ = tf.matmul(x, y) - - result = tf.reshape(result_, output_shape) - - return result - - -def get_total_trainable_parameter_size(): - """ - Calculates the total number of trainable parameters in the current graph. - :return: - """ - total_parameters = 0 - for variable in tf.trainable_variables(): - # shape is an array of tf.Dimension - total_parameters += np.product([x.value for x in variable.get_shape()]) - return total_parameters - - -def sparse_matrix_to_tf_sparse_tensor(sparse_mx): - """Converts sparse matrix to tuple representation as required by tf.SparseTensor""" - - def to_tuple(mx): - if not sp.isspmatrix_coo(mx): - mx = mx.tocoo() - indices = np.vstack((mx.row, mx.col)).transpose() - values = mx.data - shape = mx.shape - return indices, values, shape - - if isinstance(sparse_mx, list): - for i in range(len(sparse_mx)): - sparse_mx[i] = to_tuple(sparse_mx[i]) - else: - sparse_mx = to_tuple(sparse_mx) - - return sparse_mx diff --git a/lib/tf_utils_test.py b/lib/tf_utils_test.py deleted file mode 100644 index 66df9d6..0000000 --- a/lib/tf_utils_test.py +++ /dev/null @@ -1,28 +0,0 @@ -import unittest - -import numpy as np -import tensorflow as tf - -from lib import tf_utils - - -class TensorDotTest(unittest.TestCase): - def test_adj_tensor_dot(self): - # adj: [[1, 0], [0, 1]] - # SparseTensor(indices=[[0, 0], [1, 2]], values=[1, 2], dense_shape=[3, 4]) - adj_indices = [[0, 0], [1, 1]] - adj_values = np.array([1, 1], dtype=np.float32) - adj_shape = [2, 2] - adj = tf.SparseTensor(adj_indices, adj_values, adj_shape) - # y: (2, 2, 2), [[[1, 0], [0, 1]], [[1, 1], [1, 1]]] - y = np.array([[[1, 0], [0, 1]], [[1, 1], [1, 1]]], dtype=np.float32) - y = tf.constant(y) - expected_result = np.array([[[1, 0], [0, 1]], [[1, 1], [1, 1]]], dtype=np.float32) - result = tf_utils.adj_tensor_dot(adj, y) - with tf.Session() as sess: - result_ = sess.run(result) - self.assertTrue(np.array_equal(expected_result, result_)) - - -if __name__ == '__main__': - unittest.main() diff --git a/lib/utils.py b/lib/utils.py index 20eb52f..d5c5fef 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -1,7 +1,49 @@ -import datetime import numpy as np -import pandas as pd import pickle +import scipy.sparse as sp +import tensorflow as tf + +from scipy.sparse import linalg + + +class DataLoader(object): + def __init__(self, xs, ys, batch_size, pad_with_last_sample=True, shuffle=False): + """ + + :param xs: + :param ys: + :param batch_size: + :param pad_with_last_sample: pad with the last sample to make number of samples divisible to batch_size. 
+ """ + self.batch_size = batch_size + self.current_ind = 0 + if pad_with_last_sample: + num_padding = (batch_size - (len(xs) % batch_size)) % batch_size + x_padding = np.repeat(xs[-1:], num_padding, axis=0) + y_padding = np.repeat(ys[-1:], num_padding, axis=0) + xs = np.concatenate([xs, x_padding], axis=0) + ys = np.concatenate([ys, y_padding], axis=0) + self.size = len(xs) + self.num_batch = int(self.size // self.batch_size) + if shuffle: + permutation = np.random.permutation(self.size) + xs, ys = xs[permutation], ys[permutation] + self.xs = xs + self.ys = ys + + def get_iterator(self): + self.current_ind = 0 + + def _wrapper(): + while self.current_ind < self.num_batch: + start_ind = self.batch_size * self.current_ind + end_ind = min(self.size, self.batch_size * (self.current_ind + 1)) + x_i = self.xs[start_ind: end_ind, ...] + y_i = self.ys[start_ind: end_ind, ...] + yield (x_i, y_i) + self.current_ind += 1 + + return _wrapper() class StandardScaler: @@ -20,193 +62,82 @@ class StandardScaler: return (data * self.std) + self.mean -def get_rush_hours_bool_index(df, hours=((7, 10), (17, 20)), weekdays=(0, 5)): +def add_simple_summary(writer, names, values, global_step): """ - Calculates predator of rush hours: 7:00am - 9:59am, 4:00pm-7:59am, Mon-Fri. - :param df: - :param hours: a tuple of two, (start_hour, end_hour) - :param weekdays: a tuple of two, (start_weekday, end_weekday) - """ - # Week day. - weekday_predate = (df.index.dayofweek >= weekdays[0]) & (df.index.dayofweek < weekdays[1]) - # Hours. - hour_predate = (df.index.time >= datetime.time(hours[0][0], 0)) & (df.index.time < datetime.time(hours[0][1], 0)) - hour_predate |= (df.index.time >= datetime.time(hours[1][0], 0)) & (df.index.time < datetime.time(hours[1][1], 0)) - - return weekday_predate & hour_predate - - -def generate_io_data(data, seq_len, horizon=1, scaler=None): - """ - Generates input, output data which are - Args: - :param data: tensor - :param seq_len: length of the sequence, or timesteps. - :param horizon: the horizon of prediction. - :param strides: - :param scaler: - :return (X, Y) i.e., input, output - """ - xs, ys = [], [] - total_seq_len, _ = data.shape - assert np.ndim(data) == 2 - if scaler: - data = scaler.transform(data) - for i in range(0, total_seq_len - horizon - seq_len + 1): - x_i = data[i: i + seq_len, :] - y_i = data[i + seq_len + horizon - 1, :] - xs.append(x_i) - ys.append(y_i) - xs = np.stack(xs, axis=0) - ys = np.stack(ys, axis=0) - return xs, ys - - -def generate_io_data_with_time(df, batch_size, seq_len, horizon, output_type='point', scaler=None, - add_time_in_day=True, add_day_in_week=False): - """ - - :param df: - :param batch_size: - :param seq_len: - :param horizon: - :param output_type: point, range, seq2seq - :param scaler: - :param add_time_in_day: - :param add_day_in_week: + Writes summary for a list of scalars. + :param writer: + :param names: + :param values: + :param global_step: :return: - x, y, both are 3-D tensors with size (epoch_size, batch_size, input_dim). 
""" - if scaler: - df = scaler.transform(df) - num_samples, num_nodes = df.shape - data = df.values - batch_len = num_samples // batch_size - data_list = [data] - if add_time_in_day: - time_ind = (df.index.values - df.index.values.astype('datetime64[D]')) / np.timedelta64(1, 'D') - data_list.append(time_ind.reshape(-1, 1)) - if add_day_in_week: - day_in_week = np.zeros(shape=(num_samples, 7)) - day_in_week[np.arange(num_samples), df.index.dayofweek] = 1 - data_list.append(day_in_week) - - data = np.concatenate(data_list, axis=-1) - data = data[:batch_size * batch_len, :].reshape((batch_size, batch_len, -1)) - xs, ys = [], [] - for i in range(seq_len, batch_len - horizon + 1): - x_i, y_i = None, None - if output_type == 'point': - x_i = data[:, i - seq_len: i, :].reshape((batch_size, -1)) - y_i = data[:, i + horizon - 1, :num_nodes].reshape((batch_size, -1)) - elif output_type == 'range': - x_i = data[:, i - seq_len: i, :].reshape((batch_size, -1)) - y_i = data[:, i: i + horizon, :num_nodes].reshape((batch_size, -1)) - elif output_type == 'seq2seq': - x_i = data[:, i - seq_len: i, :] - y_i = data[:, i: i + horizon, :] - xs.append(x_i) - ys.append(y_i) - xs = np.stack(xs, axis=0) - ys = np.stack(ys, axis=0) - return xs, ys + for name, value in zip(names, values): + summary = tf.Summary() + summary_value = summary.value.add() + summary_value.simple_value = value + summary_value.tag = name + writer.add_summary(summary, global_step) -def generate_graph_seq2seq_io_data_with_time(df, batch_size, seq_len, horizon, num_nodes, scaler=None, - add_time_in_day=True, add_day_in_week=False): +def calculate_normalized_laplacian(adj): """ - - :param df: - :param batch_size: - :param seq_len: - :param horizon: - :param scaler: - :param add_day_in_week: - :return: - x, y, both are 5-D tensors with size (epoch_size, batch_size, seq_len, num_sensors, input_dim). - Adjacent batches are continuous sequence, i.e., x[i, j, :, :] is before x[i+1, j, :, :] - """ - if scaler: - df = scaler.transform(df) - num_samples, _ = df.shape - data = df.values - batch_len = num_samples // batch_size - data = np.expand_dims(data, axis=-1) - data_list = [data] - if add_time_in_day: - time_ind = (df.index.values - df.index.values.astype('datetime64[D]')) / np.timedelta64(1, 'D') - time_in_day = np.tile(time_ind, [1, num_nodes, 1]).transpose((2, 1, 0)) - data_list.append(time_in_day) - if add_day_in_week: - day_in_week = np.zeros(shape=(num_samples, num_nodes, 7)) - day_in_week[np.arange(num_samples), :, df.index.dayofweek] = 1 - data_list.append(day_in_week) - - data = np.concatenate(data_list, axis=-1) - data = data[:batch_size * batch_len, :, :].reshape((batch_size, batch_len, num_nodes, -1)) - epoch_size = batch_len - seq_len - horizon + 1 - x, y = [], [] - for i in range(epoch_size): - x_i = data[:, i: i + seq_len, ...] - y_i = data[:, i + seq_len: i + seq_len + horizon, :, :] - x.append(x_i) - y.append(y_i) - x = np.stack(x, axis=0) - y = np.stack(y, axis=0) - return x, y - - -def generate_graph_seq2seq_io_data_with_time2(df, batch_size, seq_len, horizon, num_nodes, scaler=None, - add_time_in_day=True, add_day_in_week=False): - """ - - :param df: - :param batch_size: - :param seq_len: - :param horizon: - :param scaler: - :param add_time_in_day: - :param add_day_in_week: + # L = D^-1/2 (D-A) D^-1/2 = I - D^-1/2 A D^-1/2 + # D = diag(A 1) + :param adj: :return: - x, y, both are 5-D tensors with size (epoch_size, batch_size, seq_len, num_sensors, input_dim). 
- Adjacent batches are continuous sequence, i.e., x[i, j, :, :] is before x[i+1, j, :, :] """ - if scaler: - df = scaler.transform(df) - num_samples, _ = df.shape - assert df.shape[1] == num_nodes - data = df.values - data = np.expand_dims(data, axis=-1) - data_list = [data] - if add_time_in_day: - time_ind = (df.index.values - df.index.values.astype('datetime64[D]')) / np.timedelta64(1, 'D') - time_in_day = np.tile(time_ind, [1, num_nodes, 1]).transpose((2, 1, 0)) - data_list.append(time_in_day) - if add_day_in_week: - day_in_week = np.zeros(shape=(num_samples, num_nodes, 7)) - day_in_week[np.arange(num_samples), :, df.index.dayofweek] = 1 - data_list.append(day_in_week) + adj = sp.coo_matrix(adj) + d = np.array(adj.sum(1)) + d_inv_sqrt = np.power(d, -0.5).flatten() + d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0. + d_mat_inv_sqrt = sp.diags(d_inv_sqrt) + normalized_laplacian = sp.eye(adj.shape[0]) - adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt).tocoo() + return normalized_laplacian - # data: (num_samples, num_nodes, num_features) - data = np.concatenate(data_list, axis=-1) - num_features = data.shape[-1] - # Extract x and y - epoch_size = num_samples - seq_len - horizon + 1 - x, y = [], [] - for i in range(epoch_size): - x_i = data[i: i + seq_len, ...] - y_i = data[i + seq_len: i + seq_len + horizon, ...] - x.append(x_i) - y.append(y_i) - x = np.stack(x, axis=0) - y = np.stack(y, axis=0) - epoch_size //= batch_size - x = x[:batch_size * epoch_size, ...] - y = y[:batch_size * epoch_size, ...] - x = x.reshape(epoch_size, batch_size, seq_len, num_nodes, num_features) - y = y.reshape(epoch_size, batch_size, horizon, num_nodes, num_features) - return x, y +def calculate_random_walk_matrix(adj_mx): + adj_mx = sp.coo_matrix(adj_mx) + d = np.array(adj_mx.sum(1)) + d_inv = np.power(d, -1).flatten() + d_inv[np.isinf(d_inv)] = 0. + d_mat_inv = sp.diags(d_inv) + random_walk_mx = d_mat_inv.dot(adj_mx).tocoo() + return random_walk_mx + + +def calculate_reverse_random_walk_matrix(adj_mx): + return calculate_random_walk_matrix(np.transpose(adj_mx)) + + +def calculate_scaled_laplacian(adj_mx, lambda_max=2, undirected=True): + if undirected: + adj_mx = np.maximum.reduce([adj_mx, adj_mx.T]) + L = calculate_normalized_laplacian(adj_mx) + if lambda_max is None: + lambda_max, _ = linalg.eigsh(L, 1, which='LM') + lambda_max = lambda_max[0] + L = sp.csr_matrix(L) + M, _ = L.shape + I = sp.identity(M, format='csr', dtype=L.dtype) + L = (2 / lambda_max * L) - I + return L.astype(np.float32) + + +def get_total_trainable_parameter_size(): + """ + Calculates the total number of trainable parameters in the current graph. + :return: + """ + total_parameters = 0 + for variable in tf.trainable_variables(): + # shape is an array of tf.Dimension + total_parameters += np.product([x.value for x in variable.get_shape()]) + return total_parameters + + +def load_graph_data(pkl_filename): + sensor_ids, sensor_id_to_ind, adj_mx = load_pickle(pkl_filename) + return sensor_ids, sensor_id_to_ind, adj_mx def load_pickle(pickle_file): @@ -220,69 +151,3 @@ def load_pickle(pickle_file): print('Unable to load data ', pickle_file, ':', e) raise return pickle_data - - -def round_down(num, divisor): - return num - (num % divisor) - - -def separate_seasonal_trend_and_residual(df, period, test_ratio=0.2, null_val=0., epsilon=1e-4): - """ - - :param df: - :param period: - :param test_ratio: only use training part to calculate the average. - :param null_val: indicator of missing values. 
Assuming null_val - :param epsilon: - :return: - """ - n_sample, n_sensor = df.shape - n_test = int(round(n_sample * test_ratio)) - n_train = n_sample - n_test - seasonal_trend = np.zeros((period, n_sensor), dtype=np.float32) - for i in range(period): - inds = [j for j in range(i % period, n_train, period)] - historical = df.iloc[inds, :] - seasonal_trend[i, :] = historical[historical != null_val].mean() - n_repeat = (n_sample + period - 1) // period - data = np.tile(seasonal_trend, [n_repeat, 1])[:n_sample, :] - seasonal_df = pd.DataFrame(data, index=df.index, columns=df.columns) - # Records where null value is happening. - - missing_ind = df == null_val - residual_df = df - seasonal_df - residual_df[residual_df == null_val] += epsilon - residual_df[missing_ind] = null_val - return seasonal_df, residual_df - - -def train_test_split(x, y, test_ratio=0.2, random=False, granularity=1): - """ - This just splits data to training and testing parts. Default 80% train, 20% test - Format : data is in compressed sparse row format - - Args: - :param x data - :param y label - :param test_ratio: - :param random: whether to randomize the input data. - :param granularity: - - """ - perms = np.arange(0, x.shape[0]) - if random: - perms = np.random.permutation(np.arange(0, x.shape[0])) - n_train = round_down(int(round(x.shape[0] * (1 - test_ratio))), granularity) - n_test = round_down(x.shape[0] - n_train, granularity) - x_train, y_train = x.take(perms[:n_train], axis=0), y.take(perms[:n_train], axis=0) - x_test, y_test = x.take(perms[n_train:n_train + n_test], axis=0), y.take(perms[n_train:n_train + n_test], axis=0) - return (x_train, y_train), (x_test, y_test) - - -def train_val_test_split_df(df, val_ratio=0.1, test_ratio=0.2): - n_sample, _ = df.shape - n_val = int(round(n_sample * val_ratio)) - n_test = int(round(n_sample * test_ratio)) - n_train = n_sample - n_val - n_test - train_data, val_data, test_data = df.iloc[:n_train, :], df.iloc[n_train: n_train + n_val, :], df.iloc[-n_test:, :] - return train_data, val_data, test_data diff --git a/lib/utils_test.py b/lib/utils_test.py deleted file mode 100644 index b97b733..0000000 --- a/lib/utils_test.py +++ /dev/null @@ -1,122 +0,0 @@ -import unittest - -import numpy as np -import pandas as pd - -from lib import utils -from lib.utils import StandardScaler - - -class MyTestCase(unittest.TestCase): - def test_separate_seasonal_trend_and_residual(self): - data = np.array([ - [2, 1, 2, 3, 0, 1, 2, 1, 2, 3, 4, 3, 0, 3, 4, 1] - ], dtype=np.float32).T - trends = np.array([ - [1, 2, 3, 2, 1, 2, 3, 2, 1, 2, 3, 2, 1, 2, 3, 2] - ], dtype=np.float32).T - residual = np.array([ - [1, -1, -1, 1, -1, -1, -1, -1, 1, 1, 1, 1, -1, 1, 1, -1] - ], dtype=np.float32).T - df = pd.DataFrame(data) - df_trend, df_residual = utils.separate_seasonal_trend_and_residual(df, period=4, test_ratio=0, null_val=-1) - self.assertTrue(np.array_equal(df_trend.values, trends)) - self.assertTrue(np.array_equal(df_residual.values, residual)) - - def test_get_rush_hours_bool_index(self): - index = pd.date_range('2017-02-27', '2017-03-06', freq='1min') - data = np.zeros((len(index), 3)) - df = pd.DataFrame(data, index=index) - ind = utils.get_rush_hours_bool_index(df) - df = df[ind] - self.assertEqual(6 * 5 * 60, df.shape[0]) - - -class IODataPreparationTest(unittest.TestCase): - from lib import utils - def test_generate_io_data_with_time(self): - data = np.array([ - [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], - [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16] - ], 
dtype=np.float32).T - df = pd.DataFrame(data, index=pd.date_range('2017-10-18', '2017-10-19 23:59', freq='3h')) - xs, ys = utils.generate_io_data_with_time(df, batch_size=2, seq_len=3, horizon=3, output_type='range', ) - self.assertTupleEqual(xs.shape, (3, 2, 9)) - self.assertTupleEqual(ys.shape, (3, 2, 6)) - - def test_generate_graph_seq2seq_io_data_with_time(self): - data = np.array([ - [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], - [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16] - ], dtype=np.float32).T - df = pd.DataFrame(data, index=pd.date_range('2017-10-18', '2017-10-19 23:59', freq='3h')) - xs, ys = utils.generate_graph_seq2seq_io_data_with_time2(df, batch_size=2, seq_len=3, horizon=3, num_nodes=2) - self.assertTupleEqual(xs.shape, (5, 2, 3, 2, 2)) - self.assertTupleEqual(ys.shape, (5, 2, 3, 2, 2)) - - -class StandardScalerTest(unittest.TestCase): - def test_transform(self): - data = np.array([ - [35., 0.], - [0., 17.5], - [70., 35.]] - ) - expected_result = np.array([ - [0., -1.], - [-1, -0.5], - [1., 0.]] - ) - scaler = StandardScaler(mean=35., std=35.) - result = scaler.transform(data) - self.assertTrue(np.array_equal(expected_result, result)) - - def test_transform_df(self): - df = pd.DataFrame([ - [35., 0.], - [0., 17.5], - [70., 35.]] - ) - expected_result = np.array([ - [0., -1.], - [-1, -0.5], - [1., 0.]] - ) - scaler = StandardScaler(mean=35., std=35.) - result = scaler.transform(df) - - self.assertTrue(np.array_equal(expected_result, result.values)) - - def test_reverse_transform(self): - data = np.array([ - [0., -1.], - [-1, -0.5], - [1., 0.]] - ) - expected_result = np.array([ - [35., 0.], - [0., 17.5], - [70., 35.]] - ) - scaler = StandardScaler(mean=35., std=35.) - result = scaler.inverse_transform(data) - self.assertTrue(np.array_equal(expected_result, result)) - - def test_reverse_transform_df(self): - df = pd.DataFrame([ - [0., -1.], - [-1, -0.5], - [1., 0.]] - ) - expected_result = np.array([ - [35., 0.], - [0., 17.5], - [70., 35.]] - ) - scaler = StandardScaler(mean=35., std=35.) - result = scaler.inverse_transform(df) - self.assertTrue(np.array_equal(expected_result, result.values)) - - -if __name__ == '__main__': - unittest.main() diff --git a/model/dcrnn_cell.py b/model/dcrnn_cell.py index 7657f2a..49208d0 100644 --- a/model/dcrnn_cell.py +++ b/model/dcrnn_cell.py @@ -7,9 +7,7 @@ import tensorflow as tf from tensorflow.contrib.rnn import RNNCell -from tensorflow.python.platform import tf_logging as logging - -from lib import dcrnn_utils +from lib import utils class DCGRUCell(RNNCell): @@ -19,11 +17,11 @@ class DCGRUCell(RNNCell): def call(self, inputs, **kwargs): pass - def _compute_output_shape(self, input_shape): + def compute_output_shape(self, input_shape): pass - def __init__(self, num_units, adj_mx, max_diffusion_step, num_nodes, input_size=None, num_proj=None, - activation=tf.nn.tanh, reuse=None, filter_type="laplacian"): + def __init__(self, num_units, adj_mx, max_diffusion_step, num_nodes, num_proj=None, + activation=tf.nn.tanh, reuse=None, filter_type="laplacian", use_gc_for_ru=True): """ :param num_units: @@ -35,26 +33,26 @@ class DCGRUCell(RNNCell): :param activation: :param reuse: :param filter_type: "laplacian", "random_walk", "dual_random_walk". + :param use_gc_for_ru: whether to use Graph convolution to calculate the reset and update gates. 
""" super(DCGRUCell, self).__init__(_reuse=reuse) - if input_size is not None: - logging.warn("%s: The input_size parameter is deprecated.", self) self._activation = activation self._num_nodes = num_nodes self._num_proj = num_proj self._num_units = num_units self._max_diffusion_step = max_diffusion_step self._supports = [] + self._use_gc_for_ru = use_gc_for_ru supports = [] if filter_type == "laplacian": - supports.append(dcrnn_utils.calculate_scaled_laplacian(adj_mx, lambda_max=None)) + supports.append(utils.calculate_scaled_laplacian(adj_mx, lambda_max=None)) elif filter_type == "random_walk": - supports.append(dcrnn_utils.calculate_random_walk_matrix(adj_mx).T) + supports.append(utils.calculate_random_walk_matrix(adj_mx).T) elif filter_type == "dual_random_walk": - supports.append(dcrnn_utils.calculate_random_walk_matrix(adj_mx).T) - supports.append(dcrnn_utils.calculate_random_walk_matrix(adj_mx.T).T) + supports.append(utils.calculate_random_walk_matrix(adj_mx).T) + supports.append(utils.calculate_random_walk_matrix(adj_mx.T).T) else: - supports.append(dcrnn_utils.calculate_scaled_laplacian(adj_mx)) + supports.append(utils.calculate_scaled_laplacian(adj_mx)) for support in supports: self._supports.append(self._build_sparse_matrix(support)) @@ -87,13 +85,19 @@ class DCGRUCell(RNNCell): """ with tf.variable_scope(scope or "dcgru_cell"): with tf.variable_scope("gates"): # Reset gate and update gate. + output_size = 2 * self._num_units # We start with bias of 1.0 to not reset and not update. - value = tf.nn.sigmoid( - self._gconv(inputs, state, 2 * self._num_units, bias_start=1.0, scope=scope)) - r, u = tf.split(value=value, num_or_size_splits=2, axis=1) - # r, u = sigmoid(r), sigmoid(u) + if self._use_gc_for_ru: + fn = self._gconv + else: + fn = self._fc + value = tf.nn.sigmoid(fn(inputs, state, output_size, bias_start=1.0)) + value = tf.reshape(value, (-1, self._num_nodes, output_size)) + r, u = tf.split(value=value, num_or_size_splits=2, axis=-1) + r = tf.reshape(r, (-1, self._num_nodes * self._num_units)) + u = tf.reshape(u, (-1, self._num_nodes * self._num_units)) with tf.variable_scope("candidate"): - c = self._gconv(inputs, r * state, self._num_units, scope=scope) + c = self._gconv(inputs, r * state, self._num_units) if self._activation is not None: c = self._activation(c) output = new_state = u * state + (1 - u) * c @@ -110,7 +114,23 @@ class DCGRUCell(RNNCell): x_ = tf.expand_dims(x_, 0) return tf.concat([x, x_], axis=0) - def _gconv(self, inputs, state, output_size, bias_start=0.0, scope=None): + def _fc(self, inputs, state, output_size, bias_start=0.0): + dtype = inputs.dtype + batch_size = inputs.get_shape()[0].value + inputs = tf.reshape(inputs, (batch_size * self._num_nodes, -1)) + state = tf.reshape(state, (batch_size * self._num_nodes, -1)) + inputs_and_state = tf.concat([inputs, state], axis=-1) + input_size = inputs_and_state.get_shape()[-1].value + weights = tf.get_variable( + 'weights', [input_size, output_size], dtype=dtype, + initializer=tf.contrib.layers.xavier_initializer()) + value = tf.nn.sigmoid(tf.matmul(inputs_and_state, weights)) + biases = tf.get_variable("biases", [output_size], dtype=dtype, + initializer=tf.constant_initializer(bias_start, dtype=dtype)) + value = tf.nn.bias_add(value, biases) + return value + + def _gconv(self, inputs, state, output_size, bias_start=0.0): """Graph convolution between input and the graph matrix. :param args: a 2D Tensor or a list of 2D, batch x n, Tensors. 
@@ -157,10 +177,8 @@ class DCGRUCell(RNNCell): initializer=tf.contrib.layers.xavier_initializer()) x = tf.matmul(x, weights) # (batch_size * self._num_nodes, output_size) - biases = tf.get_variable( - "biases", [output_size], - dtype=dtype, - initializer=tf.constant_initializer(bias_start, dtype=dtype)) + biases = tf.get_variable("biases", [output_size], dtype=dtype, + initializer=tf.constant_initializer(bias_start, dtype=dtype)) x = tf.nn.bias_add(x, biases) # Reshape res back to 2D: (batch_size, num_node, state_dim) -> (batch_size, num_node * state_dim) return tf.reshape(x, [batch_size, self._num_nodes * output_size]) diff --git a/model/dcrnn_model.py b/model/dcrnn_model.py index e0820f2..deef895 100644 --- a/model/dcrnn_model.py +++ b/model/dcrnn_model.py @@ -2,34 +2,45 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import numpy as np import tensorflow as tf from tensorflow.contrib import legacy_seq2seq from lib.metrics import masked_mse_loss, masked_mae_loss, masked_rmse_loss from model.dcrnn_cell import DCGRUCell -from model.tf_model import TFModel -class DCRNNModel(TFModel): - def __init__(self, is_training, config, scaler=None, adj_mx=None): - super(DCRNNModel, self).__init__(config, scaler=scaler) - batch_size = int(config.get('batch_size')) - max_diffusion_step = int(config.get('max_diffusion_step', 2)) - cl_decay_steps = int(config.get('cl_decay_steps', 1000)) - filter_type = config.get('filter_type', 'laplacian') - horizon = int(config.get('horizon', 1)) - input_dim = int(config.get('input_dim', 1)) - loss_func = config.get('loss_func', 'MSE') - max_grad_norm = float(config.get('max_grad_norm', 5.0)) - num_nodes = int(config.get('num_nodes', 1)) - num_rnn_layers = int(config.get('num_rnn_layers', 1)) - output_dim = int(config.get('output_dim', 1)) - rnn_units = int(config.get('rnn_units')) - seq_len = int(config.get('seq_len')) - use_curriculum_learning = bool(config.get('use_curriculum_learning', False)) +class DCRNNModel(object): + def __init__(self, is_training, batch_size, scaler, adj_mx, **model_kwargs): + # Scaler for data normalization. + self._scaler = scaler + + # Train and loss + self._loss = None + self._mae = None + self._train_op = None + + # Learning rate. 
+ self._lr = tf.get_variable('learning_rate', shape=(), initializer=tf.constant_initializer(0.01), + trainable=False) + self._new_lr = tf.placeholder(tf.float32, shape=(), name='new_learning_rate') + self._lr_update = tf.assign(self._lr, self._new_lr, name='lr_update') + + max_diffusion_step = int(model_kwargs.get('max_diffusion_step', 2)) + cl_decay_steps = int(model_kwargs.get('cl_decay_steps', 1000)) + filter_type = model_kwargs.get('filter_type', 'laplacian') + horizon = int(model_kwargs.get('horizon', 1)) + max_grad_norm = float(model_kwargs.get('max_grad_norm', 5.0)) + num_nodes = int(model_kwargs.get('num_nodes', 1)) + num_rnn_layers = int(model_kwargs.get('num_rnn_layers', 1)) + rnn_units = int(model_kwargs.get('rnn_units')) + seq_len = int(model_kwargs.get('seq_len')) + use_curriculum_learning = bool(model_kwargs.get('use_curriculum_learning', False)) + input_dim = int(model_kwargs.get('input_dim', 1)) + output_dim = int(model_kwargs.get('output_dim', 1)) aux_dim = input_dim - output_dim - # assert input_dim == output_dim, 'input_dim: %d != output_dim: %d' % (input_dim, output_dim) + # Input (batch_size, timesteps, num_sensor, input_dim) self._inputs = tf.placeholder(tf.float32, shape=(batch_size, seq_len, num_nodes, input_dim), name='inputs') # Labels: (batch_size, timesteps, num_sensor, input_dim), same format with input except the temporal dimension. @@ -57,7 +68,7 @@ class DCRNNModel(TFModel): aux_info.insert(0, None) labels.insert(0, GO_SYMBOL) - def loop_function(prev, i): + def _loop_function(prev, i): if is_training: # Return either the model's prediction or the previous ground truth in training. if use_curriculum_learning: @@ -77,27 +88,18 @@ class DCRNNModel(TFModel): _, enc_state = tf.contrib.rnn.static_rnn(encoding_cells, inputs, dtype=tf.float32) outputs, final_state = legacy_seq2seq.rnn_decoder(labels, enc_state, decoding_cells, - loop_function=loop_function) + loop_function=_loop_function) # Project the output to output_dim. outputs = tf.stack(outputs[:-1], axis=1) self._outputs = tf.reshape(outputs, (batch_size, horizon, num_nodes, output_dim), name='outputs') - # preds = self._outputs[..., 0] preds = self._outputs labels = self._labels[..., :output_dim] - null_val = config.get('null_val', 0.) + null_val = 0. 
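# lib/metrics.py is not part of this diff; the sketch below only illustrates
# the masking convention implied by null_val = 0 in the lines that follow:
# entries whose ground truth equals null_val are treated as missing sensor
# readings and excluded from the mean absolute error (a NumPy analogue of the
# masked_mae_np used later in test_and_write_result).
import numpy as np

def masked_mae_sketch(preds, labels, null_val=0.0):
    # Keep only positions with a valid (non-null) ground-truth value.
    valid = labels != null_val
    return float(np.abs(preds[valid] - labels[valid]).mean())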
self._mae = masked_mae_loss(self._scaler, null_val)(preds=preds, labels=labels) - - if loss_func == 'MSE': - self._loss = masked_mse_loss(self._scaler, null_val)(preds=preds, labels=labels) - elif loss_func == 'MAE': - self._loss = masked_mae_loss(self._scaler, null_val)(preds=preds, labels=labels) - elif loss_func == 'RMSE': - self._loss = masked_rmse_loss(self._scaler, null_val)(preds=preds, labels=labels) - else: - self._loss = masked_mse_loss(self._scaler, null_val)(preds=preds, labels=labels) + self._loss = masked_mae_loss(self._scaler, null_val)(preds=preds, labels=labels) if is_training: optimizer = tf.train.AdamOptimizer(self._lr) tvars = tf.trainable_variables() @@ -116,3 +118,90 @@ class DCRNNModel(TFModel): :return: """ return tf.cast(k / (k + tf.exp(global_step / k)), tf.float32) + + @staticmethod + def run_epoch_generator(sess, model, data_generator, return_output=False, train_op=None, writer=None): + losses = [] + maes = [] + outputs = [] + + fetches = { + 'mae': model.mae, + 'loss': model.loss, + 'global_step': tf.train.get_or_create_global_step() + } + if train_op: + fetches.update({ + 'train_op': train_op, + }) + merged = model.merged + if merged is not None: + fetches.update({'merged': merged}) + + if return_output: + fetches.update({ + 'outputs': model.outputs + }) + + for _, (x, y) in enumerate(data_generator): + feed_dict = { + model.inputs: x, + model.labels: y, + } + + vals = sess.run(fetches, feed_dict=feed_dict) + + losses.append(vals['loss']) + maes.append(vals['mae']) + if writer is not None and 'merged' in vals: + writer.add_summary(vals['merged'], global_step=vals['global_step']) + if return_output: + outputs.append(vals['outputs']) + + results = { + 'loss': np.mean(losses), + 'mae': np.mean(maes) + } + if return_output: + results['outputs'] = outputs + return results + + def get_lr(self, sess): + return np.asscalar(sess.run(self._lr)) + + def set_lr(self, sess, lr): + sess.run(self._lr_update, feed_dict={ + self._new_lr: lr + }) + + @property + def inputs(self): + return self._inputs + + @property + def labels(self): + return self._labels + + @property + def loss(self): + return self._loss + + @property + def lr(self): + return self._lr + + @property + def mae(self): + return self._mae + + @property + def merged(self): + return self._merged + + @property + def outputs(self): + return self._outputs + + @property + def train_op(self): + return self._train_op diff --git a/model/dcrnn_supervisor.py b/model/dcrnn_supervisor.py index 7595269..03283c2 100644 --- a/model/dcrnn_supervisor.py +++ b/model/dcrnn_supervisor.py @@ -2,134 +2,250 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import time - +import logging import numpy as np -import pandas as pd +import os +import sys import tensorflow as tf +import time +import yaml + +from lib import utils, log_helper, metrics +from lib.utils import StandardScaler, DataLoader -from lib.utils import generate_graph_seq2seq_io_data_with_time from model.dcrnn_model import DCRNNModel -from model.tf_model_supervisor import TFModelSupervisor -class DCRNNSupervisor(TFModelSupervisor): +class DCRNNSupervisor(object): """ Do experiments using Graph Random Walk RNN model. """ - def __init__(self, traffic_reading_df, adj_mx, config): - self._adj_mx = adj_mx - super(DCRNNSupervisor, self).__init__(config, df_data=traffic_reading_df) + def __init__(self, adj_mx, **kwargs): - def _prepare_train_val_test_data(self): - # Parsing model parameters. 
- batch_size = self._get_config('batch_size') - horizon = self._get_config('horizon') - seq_len = self._get_config('seq_len') + self._kwargs = kwargs + self._data_kwargs = kwargs.get('data') + self._model_kwargs = kwargs.get('model') + self._train_kwargs = kwargs.get('train') - test_batch_size = 1 - add_time_in_day = self._get_config('add_time_in_day') + # logging. + self._log_dir = self._get_log_dir(kwargs) + log_helper.config_logging(log_dir=self.log_dir, log_filename='info.log', level=logging.DEBUG) + self._writer = tf.summary.FileWriter(self._log_dir) + logging.info(kwargs) - num_nodes = self._df_train.shape[-1] - x_train, y_train = generate_graph_seq2seq_io_data_with_time(self._df_train, - batch_size=batch_size, - seq_len=seq_len, - horizon=horizon, - num_nodes=num_nodes, - scaler=self._scaler, - add_time_in_day=add_time_in_day, - add_day_in_week=False) - x_val, y_val = generate_graph_seq2seq_io_data_with_time(self._df_val, batch_size=batch_size, - seq_len=seq_len, - horizon=horizon, - num_nodes=num_nodes, - scaler=self._scaler, - add_time_in_day=add_time_in_day, - add_day_in_week=False) - x_test, y_test = generate_graph_seq2seq_io_data_with_time(self._df_test, - batch_size=test_batch_size, - seq_len=seq_len, - horizon=horizon, - num_nodes=num_nodes, - scaler=self._scaler, - add_time_in_day=add_time_in_day, - add_day_in_week=False) - return x_train, y_train, x_val, y_val, x_test, y_test + # Data preparation + self._data = self._prepare_data(**self._data_kwargs) - def _build_train_val_test_models(self): - # Builds the model. - input_dim = self._x_train.shape[-1] - num_nodes = self._df_test.shape[-1] - output_dim = self._get_config('output_dim') - test_batch_size = self._get_config('test_batch_size') - train_config = dict(self._config) - train_config.update({ - 'input_dim': input_dim, - 'num_nodes': num_nodes, - 'output_dim': output_dim, - }) - test_config = dict(self._config) - test_config.update({ - 'batch_size': test_batch_size, - 'input_dim': input_dim, - 'num_nodes': num_nodes, - 'output_dim': output_dim, - }) + # Build models. 
+ scaler = self._data['scaler'] + self._epoch = 0 with tf.name_scope('Train'): with tf.variable_scope('DCRNN', reuse=False): - train_model = DCRNNModel(is_training=True, config=train_config, scaler=self._scaler, - adj_mx=self._adj_mx) + self._train_model = DCRNNModel(is_training=True, scaler=scaler, + batch_size=self._data_kwargs['batch_size'], + adj_mx=adj_mx, **self._model_kwargs) with tf.name_scope('Val'): with tf.variable_scope('DCRNN', reuse=True): - val_model = DCRNNModel(is_training=False, config=train_config, scaler=self._scaler, - adj_mx=self._adj_mx) + self._val_model = DCRNNModel(is_training=False, scaler=scaler, + batch_size=self._data_kwargs['batch_size'], + adj_mx=adj_mx, **self._model_kwargs) with tf.name_scope('Test'): with tf.variable_scope('DCRNN', reuse=True): - test_model = DCRNNModel(is_training=False, config=test_config, scaler=self._scaler, - adj_mx=self._adj_mx) + self._test_model = DCRNNModel(is_training=False, scaler=scaler, + batch_size=self._data_kwargs['test_batch_size'], + adj_mx=adj_mx, **self._model_kwargs) - return train_model, val_model, test_model - - def _convert_model_outputs_to_eval_df(self, y_preds): - y_preds = np.stack(y_preds, axis=1) - # y_preds: (batch_size, epoch_size, horizon, num_nodes, output_dim) - # horizon = y_preds.shape[2] - horizon = self._get_config('horizon') - num_nodes = self._df_train.shape[-1] - df_preds = {} - for horizon_i in range(horizon): - y_pred = np.reshape(y_preds[:, :, horizon_i, :, 0], self._eval_dfs[horizon_i].shape) - df_pred = pd.DataFrame(self._scaler.inverse_transform(y_pred), index=self._eval_dfs[horizon_i].index, - columns=self._eval_dfs[horizon_i].columns) - df_preds[horizon_i] = df_pred - return df_preds + # Log model statistics. + total_trainable_parameter = utils.get_total_trainable_parameter_size() + logging.info('Total number of trainable parameters: %d' % total_trainable_parameter) + for var in tf.global_variables(): + logging.info('%s, %s' % (var.name, var.get_shape())) @staticmethod - def _generate_run_id(config): - batch_size = config.get('batch_size') - dropout = config.get('dropout') - learning_rate = config.get('learning_rate') - loss_func = config.get('loss_func') - max_diffusion_step = config['max_diffusion_step'] - num_rnn_layers = config.get('num_rnn_layers') - rnn_units = config.get('rnn_units') - seq_len = config.get('seq_len') - structure = '-'.join( - ['%d' % rnn_units for _ in range(num_rnn_layers)]) - horizon = config.get('horizon') - filter_type = config.get('filter_type') - filter_type_abbr = 'L' - if filter_type == 'random_walk': - filter_type_abbr = 'R' - elif filter_type == 'dual_random_walk': - filter_type_abbr = 'DR' - run_id = 'dcrnn_%s_%d_h_%d_%s_lr_%g_bs_%d_d_%.2f_sl_%d_%s_%s/' % ( - filter_type_abbr, max_diffusion_step, horizon, - structure, learning_rate, batch_size, - dropout, seq_len, loss_func, - time.strftime('%m%d%H%M%S')) - return run_id + def _get_log_dir(kwargs): + log_dir = kwargs['train'].get('log_dir') + if log_dir is None: + batch_size = kwargs['data'].get('batch_size') + learning_rate = kwargs['train'].get('base_lr') + max_diffusion_step = kwargs['model'].get('max_diffusion_step') + num_rnn_layers = kwargs['model'].get('num_rnn_layers') + rnn_units = kwargs['model'].get('rnn_units') + structure = '-'.join( + ['%d' % rnn_units for _ in range(num_rnn_layers)]) + horizon = kwargs['model'].get('horizon') + filter_type = kwargs['model'].get('filter_type') + filter_type_abbr = 'L' + if filter_type == 'random_walk': + filter_type_abbr = 'R' + elif filter_type == 
'dual_random_walk': + filter_type_abbr = 'DR' + run_id = 'dcrnn_%s_%d_h_%d_%s_lr_%g_bs_%d_%s/' % ( + filter_type_abbr, max_diffusion_step, horizon, + structure, learning_rate, batch_size, + time.strftime('%m%d%H%M%S')) + base_dir = kwargs.get('base_dir') + log_dir = os.path.join(base_dir, run_id) + if not os.path.exists(log_dir): + os.makedirs(log_dir) + return log_dir + + @staticmethod + def _prepare_data(dataset_dir, **kwargs): + data = {} + for category in ['train', 'val', 'test']: + cat_data = np.load(os.path.join(dataset_dir, category + '.npz')) + data['x_' + category] = cat_data['x'] + data['y_' + category] = cat_data['y'] + scaler = StandardScaler(mean=data['x_train'][..., 0].mean(), std=data['x_train'][..., 0].std()) + # Data format + for category in ['train', 'val', 'test']: + data['x_' + category][..., 0] = scaler.transform(data['x_' + category][..., 0]) + data['y_' + category][..., 0] = scaler.transform(data['y_' + category][..., 0]) + for k, v in data.items(): + logging.info((k, v.shape)) + data['train_loader'] = DataLoader(data['x_train'], data['y_train'], kwargs['batch_size'], shuffle=True) + data['val_loader'] = DataLoader(data['x_val'], data['y_val'], kwargs['val_batch_size'], shuffle=False) + data['test_loader'] = DataLoader(data['x_test'], data['y_test'], kwargs['test_batch_size'], shuffle=False) + data['scaler'] = scaler + + return data + + def train(self, sess, **kwargs): + kwargs.update(self._train_kwargs) + return self._train(sess, **kwargs) + + def _train(self, sess, base_lr, epoch, steps, patience=50, epochs=100, + min_learning_rate=2e-6, lr_decay_ratio=0.1, save_model=1, + test_every_n_epochs=10, **train_kwargs): + history = [] + min_val_loss = float('inf') + wait = 0 + + max_to_keep = train_kwargs.get('max_to_keep', 100) + saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep) + model_filename = train_kwargs.get('model_filename') + if model_filename is not None: + saver.restore(sess, model_filename) + self._epoch = epoch + 1 + else: + sess.run(tf.global_variables_initializer()) + while self._epoch <= epochs: + # Learning rate schedule. + new_lr = max(min_learning_rate, base_lr * (lr_decay_ratio ** np.sum(self._epoch >= np.array(steps)))) + self._train_model.set_lr(sess=sess, lr=new_lr) + sys.stdout.flush() + + start_time = time.time() + train_results = self._train_model.run_epoch_generator(sess, self._train_model, + self._data['train_loader'].get_iterator(), + train_op=self._train_model.train_op, + writer=self._writer) + train_loss, train_mae = train_results['loss'], train_results['mae'] + if train_loss > 1e5: + logging.warning('Gradient explosion detected. Ending...') + break + + global_step = sess.run(tf.train.get_or_create_global_step()) + # Compute validation error. 
+ val_results = self._val_model.run_epoch_generator(sess, self._val_model, + self._data['val_loader'].get_iterator(), + train_op=None) + val_loss, val_mae = val_results['loss'], val_results['mae'] + + utils.add_simple_summary(self._writer, + ['loss/train_loss', 'metric/train_mae', 'loss/val_loss', 'metric/val_mae'], + [train_loss, train_mae, val_loss, val_mae], global_step=global_step) + end_time = time.time() + message = 'Epoch [{}] ({}) train_mae: {:.4f}, val_mae: {:.4f} lr:{:.6f} {:.1f}s'.format( + self._epoch, global_step, train_mae, val_mae, new_lr, (end_time - start_time)) + logging.info(message) + if self._epoch % test_every_n_epochs == test_every_n_epochs - 1: + self.test_and_write_result(sess, global_step) + if val_loss <= min_val_loss: + wait = 0 + if save_model > 0: + model_filename = self.save_model(sess, saver, val_loss) + logging.info( + 'Val loss decrease from %.4f to %.4f, saving to %s' % (min_val_loss, val_loss, model_filename)) + min_val_loss = val_loss + else: + wait += 1 + if wait > patience: + logging.warning('Early stopping at epoch: %d' % self._epoch) + break + + history.append(val_mae) + # Increases epoch. + self._epoch += 1 + + sys.stdout.flush() + return np.min(history) + + def test_and_write_result(self, sess, global_step, **kwargs): + test_results = self._test_model.run_epoch_generator(sess, self._test_model, + self._data['test_loader'].get_iterator(), + return_output=True, train_op=None) + + # y_preds: a list of (batch_size, horizon, num_nodes, output_dim) + test_loss, y_preds = test_results['loss'], test_results['outputs'] + utils.add_simple_summary(self._writer, ['loss/test_loss'], [test_loss], global_step=global_step) + + y_preds = np.concatenate(y_preds, axis=0) + scaler = self._data['scaler'] + outputs = [] + for horizon_i in range(self._data['y_test'].shape[1]): + y_truth = np.concatenate(self._data['y_test'][:, horizon_i, :, 0], axis=0) + y_truth = scaler.inverse_transform(y_truth) + y_pred = np.concatenate(y_preds[:, horizon_i, :, 0], axis=0) + y_pred = y_pred[:y_truth.shape[0], ...] # Only take the batch number + y_pred = scaler.inverse_transform(y_pred) + outputs.append(y_pred) + mae = metrics.masked_mae_np(y_pred, y_truth, null_val=0) + mape = metrics.masked_mape_np(y_pred, y_truth, null_val=0) + rmse = metrics.masked_rmse_np(y_pred, y_truth, null_val=0) + logging.info( + "Horizon {:02d}, MAE: {:.2f}, MAPE: {:.4f}, RMSE: {:.2f}".format( + horizon_i + 1, mae, mape, rmse + ) + ) + utils.add_simple_summary(self._writer, + ['%s_%d' % (item, horizon_i + 1) for item in + ['metric/rmse', 'metric/mape', 'metric/mae']], + [rmse, mape, mae], + global_step=global_step) + return y_preds + + @staticmethod + def restore(sess, config): + """ + Restore from saved model. 
+ :param sess: + :param config: + :return: + """ + model_filename = config['train'].get('model_filename') + max_to_keep = config['train'].get('max_to_keep', 100) + saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep) + saver.restore(sess, model_filename) + + def save_model(self, sess, saver, val_loss): + config_filename = 'config_{}.yaml'.format(self._epoch) + config = dict(self._kwargs) + global_step = np.asscalar(sess.run(tf.train.get_or_create_global_step())) + config['train']['epoch'] = self._epoch + config['train']['global_step'] = global_step + config['train']['log_dir'] = self._log_dir + config['train']['model_filename'] = saver.save(sess, + os.path.join(self._log_dir, 'models-{:.4f}'.format(val_loss)), + global_step=global_step, write_meta_graph=False) + with open(os.path.join(self._log_dir, config_filename), 'w') as f: + yaml.dump(config, f, default_flow_style=False) + return config['train']['model_filename'] + + @property + def log_dir(self): + return self._log_dir diff --git a/model/tf_model.py b/model/tf_model.py deleted file mode 100644 index 7a76468..0000000 --- a/model/tf_model.py +++ /dev/null @@ -1,130 +0,0 @@ -""" -Base class for tensorflow models for traffic forecasting. -""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import numpy as np -import tensorflow as tf - - -class TFModel(object): - def __init__(self, config, scaler=None, **kwargs): - """ - Initialization including placeholders, learning rate, - :param config: - :param scaler: data z-norm normalizer - :param kwargs: - """ - self._config = dict(config) - - # Placeholders for input and output. - self._inputs = None - self._labels = None - self._outputs = None - - # Scaler for data normalization. - self._scaler = scaler - - # Train and loss - self._loss = None - self._mae = None - self._train_op = None - - # Learning rate. 
- learning_rate = config.get('learning_rate', 0.001) - self._lr = tf.get_variable('learning_rate', shape=(), initializer=tf.constant_initializer(learning_rate), - trainable=False) - self._new_lr = tf.placeholder(tf.float32, shape=(), name='new_learning_rate') - self._lr_update = tf.assign(self._lr, self._new_lr, name='lr_update') - - # Log merged summary - self._merged = None - - @staticmethod - def run_epoch(sess, model, inputs, labels, return_output=False, train_op=None, writer=None): - losses = [] - maes = [] - outputs = [] - - fetches = { - 'mae': model.mae, - 'loss': model.loss, - 'global_step': tf.train.get_or_create_global_step() - } - if train_op: - fetches.update({ - 'train_op': train_op, - }) - merged = model.merged - if merged is not None: - fetches.update({'merged': merged}) - - if return_output: - fetches.update({ - 'outputs': model.outputs - }) - - for _, (x, y) in enumerate(zip(inputs, labels)): - feed_dict = { - model.inputs: x, - model.labels: y, - } - - vals = sess.run(fetches, feed_dict=feed_dict) - - losses.append(vals['loss']) - maes.append(vals['mae']) - if writer is not None and 'merged' in vals: - writer.add_summary(vals['merged'], global_step=vals['global_step']) - if return_output: - outputs.append(vals['outputs']) - - results = { - 'loss': np.mean(losses), - 'mae': np.mean(maes) - } - if return_output: - results['outputs'] = outputs - return results - - def get_lr(self, sess): - return np.asscalar(sess.run(self._lr)) - - def set_lr(self, sess, lr): - sess.run(self._lr_update, feed_dict={ - self._new_lr: lr - }) - - @property - def inputs(self): - return self._inputs - - @property - def labels(self): - return self._labels - - @property - def loss(self): - return self._loss - - @property - def lr(self): - return self._lr - - @property - def mae(self): - return self._mae - - @property - def merged(self): - return self._merged - - @property - def outputs(self): - return self._outputs - - @property - def train_op(self): - return self._train_op diff --git a/model/tf_model_supervisor.py b/model/tf_model_supervisor.py deleted file mode 100644 index 89f74a3..0000000 --- a/model/tf_model_supervisor.py +++ /dev/null @@ -1,282 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import math -import numpy as np -import os -import sys -import tensorflow as tf -import time -import yaml - -from lib import log_helper -from lib import metrics -from lib import tf_utils -from lib import utils -from lib.utils import StandardScaler -from model.tf_model import TFModel - - -class TFModelSupervisor(object): - """ - Base supervisor for tensorflow models for traffic forecasting. - """ - - def __init__(self, config, df_data, **kwargs): - self._config = dict(config) - self._epoch = 0 - - # logging. - self._init_logging() - self._logger.info(config) - - # Data preparation - test_ratio = self._get_config('test_ratio') - validation_ratio = self._get_config('validation_ratio') - self._df_train, self._df_val, self._df_test = utils.train_val_test_split_df(df_data, val_ratio=validation_ratio, - test_ratio=test_ratio) - self._scaler = StandardScaler(mean=self._df_train.values.mean(), std=self._df_train.values.std()) - self._x_train, self._y_train, self._x_val, self._y_val, self._x_test, self._y_test = self._prepare_train_val_test_data() - self._eval_dfs = self._prepare_eval_df() - - # Build models. - self._train_model, self._val_model, self._test_model = self._build_train_val_test_models() - - # Log model statistics. 
- total_trainable_parameter = tf_utils.get_total_trainable_parameter_size() - self._logger.info('Total number of trainable parameters: %d' % total_trainable_parameter) - for var in tf.global_variables(): - self._logger.debug('%s, %s' % (var.name, var.get_shape())) - - def _get_config(self, key, use_default=True): - default_config = { - 'add_day_in_week': False, - 'add_time_in_day': True, - 'dropout': 0., - 'batch_size': 64, - 'horizon': 12, - 'learning_rate': 1e-3, - 'lr_decay': 0.1, - 'lr_decay_epoch': 50, - 'lr_decay_interval': 10, - 'max_to_keep': 100, - 'min_learning_rate': 2e-6, - 'null_val': 0., - 'output_type': 'range', - 'patience': 20, - 'save_model': 1, - 'seq_len': 12, - 'test_batch_size': 1, - 'test_every_n_epochs': 10, - 'test_ratio': 0.2, - 'use_cpu_only': False, - 'validation_ratio': 0.1, - 'verbose': 0, - } - value = self._config.get(key) - if value is None and use_default: - value = default_config.get(key) - return value - - def _init_logging(self): - base_dir = self._get_config('base_dir') - log_dir = self._get_config('log_dir') - if log_dir is None: - run_id = self._generate_run_id(self._config) - log_dir = os.path.join(base_dir, run_id) - if not os.path.exists(log_dir): - os.makedirs(log_dir) - else: - run_id = os.path.basename(os.path.normpath(log_dir)) - self._log_dir = log_dir - self._logger = log_helper.get_logger(self._log_dir, run_id) - self._writer = tf.summary.FileWriter(self._log_dir) - - def train(self, sess, **kwargs): - history = [] - min_val_loss = float('inf') - wait = 0 - - epochs = self._get_config('epochs') - initial_lr = self._get_config('learning_rate') - min_learning_rate = self._get_config('min_learning_rate') - lr_decay_epoch = self._get_config('lr_decay_epoch') - lr_decay = self._get_config('lr_decay') - lr_decay_interval = self._get_config('lr_decay_interval') - patience = self._get_config('patience') - test_every_n_epochs = self._get_config('test_every_n_epochs') - save_model = self._get_config('save_model') - - max_to_keep = self._get_config('max_to_keep') - saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep) - model_filename = self._get_config('model_filename') - if model_filename is not None: - saver.restore(sess, model_filename) - self._train_model.set_lr(sess, self._get_config('learning_rate')) - self._epoch = self._get_config('epoch') + 1 - else: - sess.run(tf.global_variables_initializer()) - - while self._epoch <= epochs: - # Learning rate schedule. - new_lr = self.calculate_scheduled_lr(initial_lr, epoch=self._epoch, - lr_decay=lr_decay, lr_decay_epoch=lr_decay_epoch, - lr_decay_interval=lr_decay_interval, - min_lr=min_learning_rate) - if new_lr != initial_lr: - self._logger.info('Updating learning rate to: %.6f' % new_lr) - self._train_model.set_lr(sess=sess, lr=new_lr) - sys.stdout.flush() - - start_time = time.time() - train_results = TFModel.run_epoch(sess, self._train_model, - inputs=self._x_train, labels=self._y_train, - train_op=self._train_model.train_op, writer=self._writer) - train_loss, train_mae = train_results['loss'], train_results['mae'] - if train_loss > 1e5: - self._logger.warn('Gradient explosion detected. Ending...') - break - - global_step = sess.run(tf.train.get_or_create_global_step()) - # Compute validation error. 
- val_results = TFModel.run_epoch(sess, self._val_model, inputs=self._x_val, labels=self._y_val, - train_op=None) - val_loss, val_mae = val_results['loss'], val_results['mae'] - - tf_utils.add_simple_summary(self._writer, - ['loss/train_loss', 'metric/train_mae', 'loss/val_loss', 'metric/val_mae'], - [train_loss, train_mae, val_loss, val_mae], global_step=global_step) - end_time = time.time() - message = 'Epoch %d (%d) train_loss: %.4f, train_mae: %.4f, val_loss: %.4f, val_mae: %.4f %ds' % ( - self._epoch, global_step, train_loss, train_mae, val_loss, val_mae, (end_time - start_time)) - self._logger.info(message) - if self._epoch % test_every_n_epochs == test_every_n_epochs - 1: - self.test_and_write_result(sess=sess, global_step=global_step, epoch=self._epoch) - - if val_loss <= min_val_loss: - wait = 0 - if save_model > 0: - model_filename = self.save_model(sess, saver, val_loss) - self._logger.info( - 'Val loss decrease from %.4f to %.4f, saving to %s' % (min_val_loss, val_loss, model_filename)) - min_val_loss = val_loss - else: - wait += 1 - if wait > patience: - self._logger.warn('Early stopping at epoch: %d' % self._epoch) - break - - history.append(val_mae) - # Increases epoch. - self._epoch += 1 - - sys.stdout.flush() - return np.min(history) - - @staticmethod - def calculate_scheduled_lr(initial_lr, epoch, lr_decay, lr_decay_epoch, lr_decay_interval, - min_lr=1e-6): - decay_factor = int(math.ceil((epoch - lr_decay_epoch) / float(lr_decay_interval))) - new_lr = initial_lr * lr_decay ** max(0, decay_factor) - new_lr = max(min_lr, new_lr) - return new_lr - - @staticmethod - def _generate_run_id(config): - raise NotImplementedError - - @staticmethod - def _get_config_filename(epoch): - return 'config_%02d.yaml' % epoch - - def restore(self, sess, config): - """ - Restore from saved model. 
- :param sess: - :param config: - :return: - """ - model_filename = config['model_filename'] - max_to_keep = self._get_config('max_to_keep') - saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep) - saver.restore(sess, model_filename) - - def save_model(self, sess, saver, val_loss): - config_filename = TFModelSupervisor._get_config_filename(self._epoch) - config = dict(self._config) - global_step = sess.run(tf.train.get_or_create_global_step()) - config['epoch'] = self._epoch - config['global_step'] = global_step - config['log_dir'] = self._log_dir - config['model_filename'] = saver.save(sess, os.path.join(self._log_dir, 'models-%.4f' % val_loss), - global_step=global_step, write_meta_graph=False) - with open(os.path.join(self._log_dir, config_filename), 'w') as f: - yaml.dump(config, f) - return config['model_filename'] - - def test_and_write_result(self, sess, global_step, **kwargs): - null_val = self._config.get('null_val') - start_time = time.time() - test_results = TFModel.run_epoch(sess, self._test_model, self._x_test, self._y_test, return_output=True, - train_op=None) - - # y_preds: a list of (batch_size, horizon, num_nodes, output_dim) - test_loss, y_preds = test_results['loss'], test_results['outputs'] - tf_utils.add_simple_summary(self._writer, ['loss/test_loss'], [test_loss], global_step=global_step) - - # Reshapes to (batch_size, epoch_size, horizon, num_node) - df_preds = self._convert_model_outputs_to_eval_df(y_preds) - - for horizon_i in df_preds: - df_pred = df_preds[horizon_i] - df_test = self._eval_dfs[horizon_i] - mae, mape, rmse = metrics.calculate_metrics(df_pred, df_test, null_val) - - tf_utils.add_simple_summary(self._writer, - ['%s_%d' % (item, horizon_i + 1) for item in - ['metric/rmse', 'metric/mape', 'metric/mae']], - [rmse, mape, mae], - global_step=global_step) - end_time = time.time() - message = 'Horizon %d, mape:%.4f, rmse:%.4f, mae:%.4f, %ds' % ( - horizon_i + 1, mape, rmse, mae, end_time - start_time) - self._logger.info(message) - start_time = end_time - return df_preds - - def _prepare_train_val_test_data(self): - """ - Prepare data for train, val and test. - :return: - """ - raise NotImplementedError - - def _prepare_eval_df(self): - horizon = self._get_config('horizon') - seq_len = self._get_config('seq_len') - # y_test: (epoch_size, batch_size, ...) - n_test_samples = np.prod(self._y_test.shape[:2]) - eval_dfs = {} - for horizon_i in range(horizon): - eval_dfs[horizon_i] = self._df_test[seq_len + horizon_i: seq_len + horizon_i + n_test_samples] - return eval_dfs - - def _build_train_val_test_models(self): - """ - Buids models for train, val and test. - :return: - """ - raise NotImplementedError - - def _convert_model_outputs_to_eval_df(self, y_preds): - """ - Convert the outputs to a dict, with key: horizon, value: the corresponding dataframe. 
- :param y_preds: - :return: - """ - raise NotImplementedError - - @property - def log_dir(self): - return self._log_dir diff --git a/run_demo.py b/run_demo.py index 9a831ab..5b8fedd 100644 --- a/run_demo.py +++ b/run_demo.py @@ -1,37 +1,28 @@ +import argparse import os import pandas as pd import sys import tensorflow as tf import yaml -from lib.dcrnn_utils import load_graph_data +from lib.utils import load_graph_data from model.dcrnn_supervisor import DCRNNSupervisor -flags = tf.app.flags -FLAGS = flags.FLAGS -flags.DEFINE_bool('use_cpu_only', False, 'Whether to run tensorflow on cpu.') - - -def run_dcrnn(traffic_reading_df): - # run_id = 'dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_1207002222' - run_id = 'dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843' - - log_dir = os.path.join('data/model', run_id) - - config_filename = 'config_75.yaml' +def run_dcrnn(args): graph_pkl_filename = 'data/sensor_graph/adj_mx.pkl' - with open(os.path.join(log_dir, config_filename)) as f: + with open(args.config_filename) as f: config = yaml.load(f) tf_config = tf.ConfigProto() - if FLAGS.use_cpu_only: + if args.use_cpu_only: tf_config = tf.ConfigProto(device_count={'GPU': 0}) tf_config.gpu_options.allow_growth = True _, _, adj_mx = load_graph_data(graph_pkl_filename) with tf.Session(config=tf_config) as sess: - supervisor = DCRNNSupervisor(traffic_reading_df, config=config, adj_mx=adj_mx) + supervisor = DCRNNSupervisor(adj_mx=adj_mx, **config) supervisor.restore(sess, config=config) df_preds = supervisor.test_and_write_result(sess, config['global_step']) + # TODO (yaguang): save this file to the npz file. for horizon_i in df_preds: df_pred = df_preds[horizon_i] filename = os.path.join('data/results/', 'dcrnn_prediction_%d.h5' % (horizon_i + 1)) @@ -41,7 +32,10 @@ def run_dcrnn(traffic_reading_df): if __name__ == '__main__': sys.path.append(os.getcwd()) - traffic_df_filename = 'data/df_highway_2012_4mon_sample.h5' - traffic_reading_df = pd.read_hdf(traffic_df_filename) - run_dcrnn(traffic_reading_df) - # run_fc_lstm(traffic_reading_df) + parser = argparse.ArgumentParser() + parser.add_argument('--traffic_df_filename', default='data/df_highway_2012_4mon_sample.h5', + type=str, help='Traffic data file.') + parser.add_argument('--use_cpu_only', default=False, type=str, help='Whether to run tensorflow on cpu.') + parser.add_argument('--config_filename', default=None, type=str, help='Config file for pretrained model.') + args = parser.parse_args() + run_dcrnn(args) diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gen_adj_mx.py b/scripts/gen_adj_mx.py similarity index 62% rename from gen_adj_mx.py rename to scripts/gen_adj_mx.py index 4e212e8..840f72c 100644 --- a/gen_adj_mx.py +++ b/scripts/gen_adj_mx.py @@ -2,21 +2,10 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import argparse import numpy as np import pandas as pd import pickle -import tensorflow as tf - -flags = tf.app.flags -FLAGS = flags.FLAGS - -flags.DEFINE_string('sensor_ids_filename', 'data/sensor_graph/graph_sensor_ids.txt', - 'File containing sensor ids separated by comma.') -flags.DEFINE_string('distances_filename', 'data/sensor_graph/distances_la_2012.csv', - 'CSV file containing sensor distances with three columns: [from, to, distance].') -flags.DEFINE_float('normalized_k', 0.1, 'Entries that become lower than normalized_k after normalization ' - 'are set to zero for sparsity.') 
-flags.DEFINE_string('output_pkl_filename', 'data/sensor_graph/adj_mat.pkl', 'Path of the output file.') def get_adjacency_matrix(distance_df, sensor_ids, normalized_k=0.1): @@ -54,10 +43,21 @@ def get_adjacency_matrix(distance_df, sensor_ids, normalized_k=0.1): if __name__ == '__main__': - with open(FLAGS.sensor_ids_filename) as f: + parser = argparse.ArgumentParser() + parser.add_argument('--sensor_ids_filename', type=str, default='data/sensor_graph/graph_sensor_ids.txt', + help='File containing sensor ids separated by comma.') + parser.add_argument('--distances_filename', type=str, default='data/sensor_graph/distances_la_2012.csv', + help='CSV file containing sensor distances with three columns: [from, to, distance].') + parser.add_argument('--normalized_k', type=float, default=0.1, + help='Entries that become lower than normalized_k after normalization are set to zero for sparsity.') + parser.add_argument('--output_pkl_filename', type=str, default='data/sensor_graph/adj_mat.pkl', + help='Path of the output file.') + args = parser.parse_args() + + with open(args.sensor_ids_filename) as f: sensor_ids = f.read().strip().split(',') - distance_df = pd.read_csv(FLAGS.distances_filename, dtype={'from': 'str', 'to': 'str'}) + distance_df = pd.read_csv(args.distances_filename, dtype={'from': 'str', 'to': 'str'}) _, sensor_id_to_ind, adj_mx = get_adjacency_matrix(distance_df, sensor_ids) # Save to pickle file. - with open(FLAGS.output_pkl_filename, 'wb') as f: + with open(args.output_pkl_filename, 'wb') as f: pickle.dump([sensor_ids, sensor_id_to_ind, adj_mx], f, protocol=2) diff --git a/scripts/generate_training_data.py b/scripts/generate_training_data.py new file mode 100644 index 0000000..8adeed0 --- /dev/null +++ b/scripts/generate_training_data.py @@ -0,0 +1,125 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import argparse +import numpy as np +import os +import pandas as pd + +from lib.utils import DataLoader, StandardScaler + + +def generate_graph_seq2seq_io_data( + df, x_offsets, y_offsets, add_time_in_day=True, add_day_in_week=False, scaler=None +): + """ + Generate samples from + :param df: + :param x_offsets: + :param y_offsets: + :param add_time_in_day: + :param add_day_in_week: + :param scaler: + :return: + # x: (epoch_size, input_length, num_nodes, input_dim) + # y: (epoch_size, output_length, num_nodes, output_dim) + """ + + num_samples, num_nodes = df.shape + data = np.expand_dims(df.values, axis=-1) + data_list = [data] + if add_time_in_day: + time_ind = (df.index.values - df.index.values.astype("datetime64[D]")) / np.timedelta64(1, "D") + time_in_day = np.tile(time_ind, [1, num_nodes, 1]).transpose((2, 1, 0)) + data_list.append(time_in_day) + if add_day_in_week: + day_in_week = np.zeros(shape=(num_samples, num_nodes, 7)) + day_in_week[np.arange(num_samples), :, df.index.dayofweek] = 1 + data_list.append(day_in_week) + + data = np.concatenate(data_list, axis=-1) + # epoch_len = num_samples + min(x_offsets) - max(y_offsets) + x, y = [], [] + # t is the index of the last observation. + min_t = abs(min(x_offsets)) + max_t = abs(num_samples - abs(max(y_offsets))) # Exclusive + for t in range(min_t, max_t): + x_t = data[t + x_offsets, ...] + y_t = data[t + y_offsets, ...] 
+ x.append(x_t) + y.append(y_t) + x = np.stack(x, axis=0) + y = np.stack(y, axis=0) + return x, y + + +def generate_train_val_test(args): + df = pd.read_hdf(args.traffic_df_filename) + # 0 is the latest observed sample. + x_offsets = np.sort( + # np.concatenate(([-week_size + 1, -day_size + 1], np.arange(-11, 1, 1))) + np.concatenate((np.arange(-11, 1, 1),)) + ) + # Predict the next one hour + y_offsets = np.sort(np.arange(1, 13, 1)) + # x: (num_samples, input_length, num_nodes, input_dim) + # y: (num_samples, output_length, num_nodes, output_dim) + x, y = generate_graph_seq2seq_io_data( + df, + x_offsets=x_offsets, + y_offsets=y_offsets, + add_time_in_day=True, + add_day_in_week=False, + ) + + print("x shape: ", x.shape, ", y shape: ", y.shape) + # Write the data into npz file. + # num_test = 6831, using the last 6831 examples as testing. + # for the rest: 7/8 is used for training, and 1/8 is used for validation. + num_samples = x.shape[0] + num_test = round(num_samples * 0.2) + num_train = round(num_samples * 0.7) + num_val = num_samples - num_test - num_train + + # train + x_train, y_train = x[:num_train], y[:num_train] + # val + x_val, y_val = ( + x[num_train: num_train + num_val], + y[num_train: num_train + num_val], + ) + # test + x_test, y_test = x[-num_test:], y[-num_test:] + + for cat in ["train", "val", "test"]: + _x, _y = locals()["x_" + cat], locals()["y_" + cat] + print(cat, "x: ", _x.shape, "y:", _y.shape) + np.savez_compressed( + os.path.join(args.output_dir, "%s.npz" % cat), + x=_x, + y=_y, + x_offsets=x_offsets.reshape(list(x_offsets.shape) + [1]), + y_offsets=y_offsets.reshape(list(y_offsets.shape) + [1]), + ) + + +def main(args): + print("Generating training data") + generate_train_val_test(args) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--output_dir", type=str, default="data/", help="Output directory." + ) + parser.add_argument( + "--traffic_df_filename", + type=str, + default="data/df_highway_2012_4mon_sample.h5", + help="Raw traffic readings.", + ) + args = parser.parse_args() + main(args)
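
A minimal shape check for the sliding-window generator added in scripts/generate_training_data.py, run on a small synthetic DataFrame rather than data/df_highway_2012_4mon_sample.h5; it assumes the repo root is on the Python path so that scripts/ (with its new __init__.py) and lib/ are importable:

import numpy as np
import pandas as pd

from scripts.generate_training_data import generate_graph_seq2seq_io_data

# Synthetic readings: 100 timestamps x 5 sensors at 5-minute resolution.
num_samples, num_nodes = 100, 5
index = pd.date_range('2012-03-01', periods=num_samples, freq='5min')
df = pd.DataFrame(np.random.rand(num_samples, num_nodes), index=index)

x_offsets = np.arange(-11, 1, 1)  # 12 past steps; 0 is the latest observation
y_offsets = np.arange(1, 13, 1)   # the next 12 steps (one hour ahead)
x, y = generate_graph_seq2seq_io_data(df, x_offsets=x_offsets, y_offsets=y_offsets,
                                      add_time_in_day=True, add_day_in_week=False)

# x and y are (num_windows, 12, num_nodes, 2): the reading plus a time-of-day
# feature, i.e. an input dimension of 2 per node and time step.
print(x.shape, y.shape)  # (77, 12, 5, 2) (77, 12, 5, 2)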
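
A sketch of driving the refactored supervisor with one of the new section-based YAML configs, following the pattern of the updated run_demo.py; the config filename is a placeholder for any config with data/model/train sections, DCRNNSupervisor's constructor is assumed to accept the unpacked sections as in run_demo.py, and the yaml.load / TF1 session usage mirrors the code in this diff:

import tensorflow as tf
import yaml

from lib.utils import load_graph_data
from model.dcrnn_supervisor import DCRNNSupervisor

with open('data/model/dcrnn_config.yaml') as f:  # placeholder: any section-based config
    config = yaml.load(f)

graph_pkl_filename = 'data/sensor_graph/adj_mx.pkl'
_, _, adj_mx = load_graph_data(graph_pkl_filename)

tf_config = tf.ConfigProto()
tf_config.gpu_options.allow_growth = True
with tf.Session(config=tf_config) as sess:
    # The supervisor splits the kwargs into data/model/train groups, builds the
    # Train/Val/Test DCRNNModel instances under a shared variable scope, and
    # train() runs the epoch loop with early stopping and per-epoch checkpoints.
    supervisor = DCRNNSupervisor(adj_mx=adj_mx, **config)
    supervisor.train(sess=sess)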
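
The fixed-interval decay of the removed TFModelSupervisor.calculate_scheduled_lr is replaced in DCRNNSupervisor._train by a milestone schedule: the learning rate is multiplied by lr_decay_ratio once for every entry of `steps` the current epoch has reached, and floored at min_learning_rate. A small worked example; base_lr and the milestone list below are illustrative, while min_learning_rate and lr_decay_ratio use _train's defaults:

import numpy as np

base_lr, lr_decay_ratio, min_learning_rate = 0.01, 0.1, 2e-6
steps = np.array([20, 30, 40, 50])  # illustrative decay milestones (epochs)

def scheduled_lr(epoch):
    # One factor of lr_decay_ratio per milestone already reached, clamped at the floor.
    return max(min_learning_rate, base_lr * lr_decay_ratio ** np.sum(epoch >= steps))

for epoch in (1, 20, 35, 45, 50):
    print(epoch, scheduled_lr(epoch))
# approximately: 1 -> 0.01, 20 -> 0.001, 35 -> 1e-4, 45 -> 1e-5, 50 -> 2e-6 (floor)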