Code refactoring, including data loading, logging, configuration, and removal of redundant code.

This commit is contained in:
Yaguang 2018-09-26 11:19:00 -07:00
parent 80e156c580
commit 9ec161543b
34 changed files with 785 additions and 1326 deletions

View File

@ -1,40 +0,0 @@
base_dir: data/model
batch_size: 64
cl_decay_steps: 2000
data_type: ALL
dropout: 0
epoch: 75
epochs: 100
filter_type: dual_random_walk
global_step: !!python/object/apply:numpy.core.multiarray.scalar
- !!python/object/apply:numpy.dtype
args: [i8, 0, 1]
state: !!python/tuple [3, <, null, null, null, -1, -1, 0]
- !!binary |
NGgAAAAAAAA=
graph_pkl_filename: data/sensor_graph/adj_mx.pkl
horizon: 12
l1_decay: 0
learning_rate: 0.01
log_dir: data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/
loss_func: MAE
lr_decay: 0.1
lr_decay_epoch: 20
lr_decay_interval: 10
max_diffusion_step: 2
max_grad_norm: 5
min_learning_rate: 2.0e-06
model_filename: data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843/models-2.8476-26676
null_val: 0
num_rnn_layers: 2
output_dim: 1
patience: 50
rnn_units: 64
seq_len: 12
test_every_n_epochs: 10
test_ratio: 0.2
use_cpu_only: false
use_curriculum_learning: true
validation_ratio: 0.1
verbose: 0
write_db: false

View File

@ -1,34 +1,36 @@
---
base_dir: data/model
batch_size: 64
cl_decay_steps: 2000
data_type: ALL
dropout: 0
epoch: 0
epochs: 100
filter_type: dual_random_walk
global_step: 0
graph_pkl_filename: data/sensor_graph/adj_mx.pkl
horizon: 12
l1_decay: 0
learning_rate: 0.01
loss_func: MAE
lr_decay: 0.1
lr_decay_epoch: 20
lr_decay_interval: 10
max_diffusion_step: 2
max_grad_norm: 5
min_learning_rate: 2.0e-06
null_val: 0
num_rnn_layers: 2
output_dim: 1
patience: 50
rnn_units: 64
seq_len: 12
test_every_n_epochs: 10
test_ratio: 0.2
use_cpu_only: false
use_curriculum_learning: true
validation_ratio: 0.1
verbose: 0
write_db: false
data:
batch_size: 64
dataset_dir: data/METR-LA
test_batch_size: 64
val_batch_size: 64
graph_pkl_filename: data/sensor_graph/adj_mx.pkl
model:
cl_decay_steps: 2000
filter_type: dual_random_walk
horizon: 12
input_dim: 2
l1_decay: 0
max_diffusion_step: 2
max_grad_norm: 5
num_nodes: 207
num_rnn_layers: 2
output_dim: 1
rnn_units: 64
seq_len: 12
use_curriculum_learning: true
train:
base_lr: 0.01
dropout: 0
epoch: 0
epochs: 100
global_step: 0
lr_decay_ratio: 0.1
steps: [20, 30, 40, 50]
max_to_keep: 100
min_learning_rate: 2.0e-06
patience: 50
test_every_n_epochs: 10
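The flat configuration above is reorganized into three nested sections (data, model, train) that DCRNNSupervisor consumes directly. A minimal loading sketch (the config path is a placeholder, not taken from this diff):

import yaml

with open('dcrnn_config.yaml') as f:  # placeholder path
    supervisor_config = yaml.load(f)

data_kwargs = supervisor_config['data']    # batch sizes, dataset_dir, graph_pkl_filename
model_kwargs = supervisor_config['model']  # DCGRU / seq2seq hyperparameters
train_kwargs = supervisor_config['train']  # learning-rate schedule, epochs, patience
# DCRNNSupervisor(adj_mx=adj_mx, **supervisor_config) unpacks these three sections internally.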

View File

@ -0,0 +1,36 @@
---
base_dir: data/model
data:
batch_size: 64
dataset_dir: data/METR-LA
test_batch_size: 64
val_batch_size: 64
graph_pkl_filename: data/sensor_graph/adj_mx.pkl
model:
cl_decay_steps: 2000
filter_type: laplacian
horizon: 12
input_dim: 2
l1_decay: 0
max_diffusion_step: 2
max_grad_norm: 5
num_nodes: 207
num_rnn_layers: 2
output_dim: 1
rnn_units: 16
seq_len: 12
use_curriculum_learning: true
train:
base_lr: 0.01
dropout: 0
epoch: 0
epochs: 100
global_step: 0
lr_decay_ratio: 0.1
steps: [20, 30, 40, 50]
max_to_keep: 100
min_learning_rate: 2.0e-06
patience: 50
test_every_n_epochs: 10

View File

@ -1,35 +1,36 @@
---
base_dir: data/model
batch_size: 64
cl_decay_steps: 2000
data_type: ALL
dropout: 0
epoch: 0
epochs: 100
filter_type: random_walk
global_step: 0
graph_pkl_filename: data/sensor_graph/adj_mx.pkl
horizon: 3
l1_decay: 0
learning_rate: 0.01
loss_func: MAE
lr_decay: 0.1
lr_decay_epoch: 20
lr_decay_interval: 10
max_diffusion_step: 2
max_grad_norm: 5
method_type: GCRNN
min_learning_rate: 2.0e-06
null_val: 0
num_rnn_layers: 2
output_dim: 1
patience: 50
rnn_units: 16
seq_len: 3
test_every_n_epochs: 10
test_ratio: 0.2
use_cpu_only: false
use_curriculum_learning: true
validation_ratio: 0.1
verbose: 0
write_db: false
data:
batch_size: 64
dataset_dir: data/METR-LA
test_batch_size: 64
val_batch_size: 64
graph_pkl_filename: data/sensor_graph/adj_mx.pkl
model:
cl_decay_steps: 2000
filter_type: dual_random_walk
horizon: 12
input_dim: 2
l1_decay: 0
max_diffusion_step: 2
max_grad_norm: 5
num_nodes: 207
num_rnn_layers: 2
output_dim: 1
rnn_units: 64
seq_len: 12
use_curriculum_learning: true
train:
base_lr: 0.01
dropout: 0
epoch: 0
epochs: 100
global_step: 0
lr_decay_ratio: 0.1
steps: [20, 30, 40, 50]
max_to_keep: 100
min_learning_rate: 2.0e-06
patience: 50
test_every_n_epochs: 10

Binary files not shown (12 files).

View File

@ -2,86 +2,35 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas as pd
import argparse
import tensorflow as tf
import yaml
from lib import log_helper
from lib.dcrnn_utils import load_graph_data
from lib.utils import load_graph_data
from model.dcrnn_supervisor import DCRNNSupervisor
# flags
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('batch_size', -1, 'Batch size')
flags.DEFINE_integer('cl_decay_steps', -1,
'Parameter to control the decay speed of probability of feeding ground truth instead of model output.')
flags.DEFINE_string('config_filename', None, 'Configuration filename for restoring the model.')
flags.DEFINE_integer('epochs', -1, 'Maximum number of epochs to train.')
flags.DEFINE_string('filter_type', None, 'laplacian/random_walk/dual_random_walk.')
flags.DEFINE_string('graph_pkl_filename', 'data/sensor_graph/adj_mx.pkl',
'Pickle file containing: sensor_ids, sensor_id_to_ind_map, dist_matrix')
flags.DEFINE_integer('horizon', -1, 'Maximum number of timestamps to prediction.')
flags.DEFINE_float('l1_decay', -1.0, 'L1 Regularization')
flags.DEFINE_float('lr_decay', -1.0, 'Learning rate decay.')
flags.DEFINE_integer('lr_decay_epoch', -1, 'The epoch that starting decaying the parameter.')
flags.DEFINE_integer('lr_decay_interval', -1, 'Interval between each decay.')
flags.DEFINE_float('learning_rate', -1, 'Learning rate. -1: select by hyperopt tuning.')
flags.DEFINE_string('log_dir', None, 'Log directory for restoring the model from a checkpoint.')
flags.DEFINE_string('loss_func', None, 'MSE/MAPE/RMSE_MAPE: loss function.')
flags.DEFINE_float('min_learning_rate', -1, 'Minimum learning rate')
flags.DEFINE_integer('nb_weeks', 17, 'How many week\'s data should be used for train/test.')
flags.DEFINE_integer('patience', -1,
'Maximum number of epochs allowed for non-improving validation error before early stopping.')
flags.DEFINE_integer('seq_len', -1, 'Sequence length.')
flags.DEFINE_integer('test_every_n_epochs', -1, 'Run model on the testing dataset every n epochs.')
flags.DEFINE_string('traffic_df_filename', 'data/df_highway_2012_4mon_sample.h5',
'Path to hdf5 pandas.DataFrame.')
flags.DEFINE_bool('use_cpu_only', False, 'Set to true to only use cpu.')
flags.DEFINE_bool('use_curriculum_learning', None, 'Set to true to use Curriculum learning in decoding stage.')
flags.DEFINE_integer('verbose', -1, '1: to log individual sensor information.')
def main():
# Reads graph data.
with open(FLAGS.config_filename) as f:
def main(args):
with open(args.config_filename) as f:
supervisor_config = yaml.load(f)
logger = log_helper.get_logger(supervisor_config.get('base_dir'), 'info.log')
logger.info('Loading graph from: ' + FLAGS.graph_pkl_filename)
sensor_ids, sensor_id_to_ind, adj_mx = load_graph_data(FLAGS.graph_pkl_filename)
adj_mx[adj_mx < 0.1] = 0
logger.info('Loading traffic data from: ' + FLAGS.traffic_df_filename)
traffic_df_filename = FLAGS.traffic_df_filename
traffic_reading_df = pd.read_hdf(traffic_df_filename)
traffic_reading_df = traffic_reading_df.ix[:, sensor_ids]
supervisor_config['use_cpu_only'] = FLAGS.use_cpu_only
if FLAGS.log_dir:
supervisor_config['log_dir'] = FLAGS.log_dir
if FLAGS.use_curriculum_learning is not None:
supervisor_config['use_curriculum_learning'] = FLAGS.use_curriculum_learning
if FLAGS.loss_func:
supervisor_config['loss_func'] = FLAGS.loss_func
if FLAGS.filter_type:
supervisor_config['filter_type'] = FLAGS.filter_type
# Overwrites space with specified parameters.
for name in ['batch_size', 'cl_decay_steps', 'epochs', 'horizon', 'learning_rate', 'l1_decay',
'lr_decay', 'lr_decay_epoch', 'lr_decay_interval', 'learning_rate', 'min_learning_rate',
'patience', 'seq_len', 'test_every_n_epochs', 'verbose']:
if getattr(FLAGS, name) >= 0:
supervisor_config[name] = getattr(FLAGS, name)
graph_pkl_filename = supervisor_config['data'].get('graph_pkl_filename')
sensor_ids, sensor_id_to_ind, adj_mx = load_graph_data(graph_pkl_filename)
tf_config = tf.ConfigProto()
if FLAGS.use_cpu_only:
if args.use_cpu_only:
tf_config = tf.ConfigProto(device_count={'GPU': 0})
tf_config.gpu_options.allow_growth = True
with tf.Session(config=tf_config) as sess:
supervisor = DCRNNSupervisor(traffic_reading_df=traffic_reading_df, adj_mx=adj_mx,
config=supervisor_config)
supervisor = DCRNNSupervisor(adj_mx=adj_mx, **supervisor_config)
supervisor.train(sess=sess)
if __name__ == '__main__':
main()
parser = argparse.ArgumentParser()
parser.add_argument('--config_filename', default=None, type=str,
help='Configuration filename for restoring the model.')
parser.add_argument('--use_cpu_only', default=False, type=bool, help='Set to true to only use cpu.')
args = parser.parse_args()
main(args)
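With tf.app.flags replaced by argparse, the entry point is driven entirely by the YAML config, e.g. python <script>.py --config_filename=data/dcrnn_config.yaml (the script and config filenames are not shown in this diff). One caveat: argparse's type=bool turns any non-empty string into True, so --use_cpu_only=False would still enable CPU-only mode. A sketch of the usual workaround, offered as an assumption rather than part of this commit, replacing the type=bool definition above:

def str2bool(v):
    # Interpret common textual booleans instead of relying on bool('False') == True.
    return str(v).lower() in ('yes', 'true', 't', '1')

parser.add_argument('--use_cpu_only', default=False, type=str2bool,
                    help='Set to true to only use cpu.')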

View File

@ -1,56 +0,0 @@
import pickle
import numpy as np
import scipy.sparse as sp
from scipy.sparse import linalg
from lib.tf_utils import sparse_matrix_to_tf_sparse_tensor
from lib.utils import load_pickle
def load_graph_data(pkl_filename):
sensor_ids, sensor_id_to_ind, adj_mx = load_pickle(pkl_filename)
return sensor_ids, sensor_id_to_ind, adj_mx
def calculate_normalized_laplacian(adj):
"""
# L = D^-1/2 (D-A) D^-1/2 = I - D^-1/2 A D^-1/2
# D = diag(A 1)
:param adj:
:return:
"""
adj = sp.coo_matrix(adj)
d = np.array(adj.sum(1))
d_inv_sqrt = np.power(d, -0.5).flatten()
d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
d_mat_inv_sqrt = sp.diags(d_inv_sqrt)
normalized_laplacian = sp.eye(adj.shape[0]) - adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt).tocoo()
return normalized_laplacian
def calculate_random_walk_matrix(adj_mx):
adj_mx = sp.coo_matrix(adj_mx)
d = np.array(adj_mx.sum(1))
d_inv = np.power(d, -1).flatten()
d_inv[np.isinf(d_inv)] = 0.
d_mat_inv = sp.diags(d_inv)
random_walk_mx = d_mat_inv.dot(adj_mx).tocoo()
return random_walk_mx
def calculate_reverse_random_walk_matrix(adj_mx):
return calculate_random_walk_matrix(np.transpose(adj_mx))
def calculate_scaled_laplacian(adj_mx, lambda_max=2, undirected=True):
if undirected:
adj_mx = np.maximum.reduce([adj_mx, adj_mx.T])
L = calculate_normalized_laplacian(adj_mx)
if lambda_max is None:
lambda_max, _ = linalg.eigsh(L, 1, which='LM')
lambda_max = lambda_max[0]
L = sp.csr_matrix(L)
M, _ = L.shape
I = sp.identity(M, format='csr', dtype=L.dtype)
L = (2 / lambda_max * L) - I
return L.astype(np.float32)

View File

@ -24,3 +24,22 @@ def get_logger(log_dir, name):
# Add google cloud log handler
logger.info('Log directory: %s', log_dir)
return logger
def config_logging(log_dir, log_filename='info.log', level=logging.INFO):
# Add file handler and stdout handler
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Create the log directory if necessary.
try:
os.makedirs(log_dir)
except OSError:
pass
file_handler = logging.FileHandler(os.path.join(log_dir, log_filename))
file_handler.setFormatter(formatter)
file_handler.setLevel(level=level)
# Add console handler.
console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(console_formatter)
console_handler.setLevel(level=level)
logging.basicConfig(handlers=[file_handler, console_handler], level=level)
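A minimal usage sketch for the new config_logging helper (the directory name is illustrative):

import logging
from lib import log_helper

log_helper.config_logging(log_dir='data/model/example_run', log_filename='info.log',
                          level=logging.DEBUG)
logging.info('Messages now go to stdout and to data/model/example_run/info.log')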

View File

@ -1,97 +0,0 @@
from __future__ import absolute_import, division, print_function
import numpy as np
import scipy.sparse as sp
import tensorflow as tf
def add_simple_summary(writer, names, values, global_step):
"""
Writes summary for a list of scalars.
:param writer:
:param names:
:param values:
:param global_step:
:return:
"""
for name, value in zip(names, values):
summary = tf.Summary()
summary_value = summary.value.add()
summary_value.simple_value = value
summary_value.tag = name
writer.add_summary(summary, global_step)
def adj_tensor_dot(adj, y):
""" Computes the matrix multiplication for the adjacency matrix and the 3D dense matrix y.
:param adj: square matrix with shape(n_node, n_node)
:param y: 3D tensor, with shape (batch_size, n_node, output_dim)
"""
y_shape = [i.value for i in y.shape]
if len(y_shape) != 3:
raise Exception('Dimension of y must be 3, instead of: %d' % len(y_shape))
y_permute_dim = list(range(len(y_shape)))
y_permute_dim = [y_permute_dim.pop(-2)] + y_permute_dim
yt = tf.reshape(tf.transpose(y, perm=y_permute_dim), [y_shape[-2], -1])
if isinstance(adj, tf.SparseTensor):
res = tf.sparse_tensor_dense_matmul(adj, yt)
else:
res = tf.matmul(adj, yt)
res = tf.reshape(res, [y_shape[-2], -1, y_shape[-1]])
res = tf.transpose(res, perm=[1, 0, 2])
return res
def dot(x, y):
"""
Wrapper for tf.matmul for x with rank >= 2.
:param x: matrix with rank >=2
:param y: matrix with rank==2
:return:
"""
[input_dim, output_dim] = y.get_shape().as_list()
input_shape = tf.shape(x)
batch_rank = input_shape.get_shape()[0].value - 1
batch_shape = input_shape[:batch_rank]
output_shape = tf.concat(0, [batch_shape, [output_dim]])
x = tf.reshape(x, [-1, input_dim])
result_ = tf.matmul(x, y)
result = tf.reshape(result_, output_shape)
return result
def get_total_trainable_parameter_size():
"""
Calculates the total number of trainable parameters in the current graph.
:return:
"""
total_parameters = 0
for variable in tf.trainable_variables():
# shape is an array of tf.Dimension
total_parameters += np.product([x.value for x in variable.get_shape()])
return total_parameters
def sparse_matrix_to_tf_sparse_tensor(sparse_mx):
"""Converts sparse matrix to tuple representation as required by tf.SparseTensor"""
def to_tuple(mx):
if not sp.isspmatrix_coo(mx):
mx = mx.tocoo()
indices = np.vstack((mx.row, mx.col)).transpose()
values = mx.data
shape = mx.shape
return indices, values, shape
if isinstance(sparse_mx, list):
for i in range(len(sparse_mx)):
sparse_mx[i] = to_tuple(sparse_mx[i])
else:
sparse_mx = to_tuple(sparse_mx)
return sparse_mx

View File

@ -1,28 +0,0 @@
import unittest
import numpy as np
import tensorflow as tf
from lib import tf_utils
class TensorDotTest(unittest.TestCase):
def test_adj_tensor_dot(self):
# adj: [[1, 0], [0, 1]]
# SparseTensor(indices=[[0, 0], [1, 2]], values=[1, 2], dense_shape=[3, 4])
adj_indices = [[0, 0], [1, 1]]
adj_values = np.array([1, 1], dtype=np.float32)
adj_shape = [2, 2]
adj = tf.SparseTensor(adj_indices, adj_values, adj_shape)
# y: (2, 2, 2), [[[1, 0], [0, 1]], [[1, 1], [1, 1]]]
y = np.array([[[1, 0], [0, 1]], [[1, 1], [1, 1]]], dtype=np.float32)
y = tf.constant(y)
expected_result = np.array([[[1, 0], [0, 1]], [[1, 1], [1, 1]]], dtype=np.float32)
result = tf_utils.adj_tensor_dot(adj, y)
with tf.Session() as sess:
result_ = sess.run(result)
self.assertTrue(np.array_equal(expected_result, result_))
if __name__ == '__main__':
unittest.main()

View File

@ -1,7 +1,49 @@
import datetime
import numpy as np
import pandas as pd
import pickle
import scipy.sparse as sp
import tensorflow as tf
from scipy.sparse import linalg
class DataLoader(object):
def __init__(self, xs, ys, batch_size, pad_with_last_sample=True, shuffle=False):
"""
:param xs:
:param ys:
:param batch_size:
:param pad_with_last_sample: pad with the last sample to make the number of samples divisible by batch_size.
"""
self.batch_size = batch_size
self.current_ind = 0
if pad_with_last_sample:
num_padding = (batch_size - (len(xs) % batch_size)) % batch_size
x_padding = np.repeat(xs[-1:], num_padding, axis=0)
y_padding = np.repeat(ys[-1:], num_padding, axis=0)
xs = np.concatenate([xs, x_padding], axis=0)
ys = np.concatenate([ys, y_padding], axis=0)
self.size = len(xs)
self.num_batch = int(self.size // self.batch_size)
if shuffle:
permutation = np.random.permutation(self.size)
xs, ys = xs[permutation], ys[permutation]
self.xs = xs
self.ys = ys
def get_iterator(self):
self.current_ind = 0
def _wrapper():
while self.current_ind < self.num_batch:
start_ind = self.batch_size * self.current_ind
end_ind = min(self.size, self.batch_size * (self.current_ind + 1))
x_i = self.xs[start_ind: end_ind, ...]
y_i = self.ys[start_ind: end_ind, ...]
yield (x_i, y_i)
self.current_ind += 1
return _wrapper()
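A small usage sketch for the new DataLoader; the array shapes are illustrative only:

import numpy as np
from lib.utils import DataLoader

xs = np.random.randn(100, 12, 207, 2)  # (num_samples, seq_len, num_nodes, input_dim)
ys = np.random.randn(100, 12, 207, 2)
loader = DataLoader(xs, ys, batch_size=64, shuffle=True)  # pads 100 samples up to 128 with the last one
for x_batch, y_batch in loader.get_iterator():
    print(x_batch.shape, y_batch.shape)  # (64, 12, 207, 2) each iteration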
class StandardScaler:
@ -20,193 +62,82 @@ class StandardScaler:
return (data * self.std) + self.mean
def get_rush_hours_bool_index(df, hours=((7, 10), (17, 20)), weekdays=(0, 5)):
def add_simple_summary(writer, names, values, global_step):
"""
Calculates a predicate for rush hours: 7:00am - 9:59am, 4:00pm - 7:59pm, Mon-Fri.
:param df:
:param hours: a tuple of two, (start_hour, end_hour)
:param weekdays: a tuple of two, (start_weekday, end_weekday)
"""
# Week day.
weekday_predate = (df.index.dayofweek >= weekdays[0]) & (df.index.dayofweek < weekdays[1])
# Hours.
hour_predate = (df.index.time >= datetime.time(hours[0][0], 0)) & (df.index.time < datetime.time(hours[0][1], 0))
hour_predate |= (df.index.time >= datetime.time(hours[1][0], 0)) & (df.index.time < datetime.time(hours[1][1], 0))
return weekday_predate & hour_predate
def generate_io_data(data, seq_len, horizon=1, scaler=None):
"""
Generates input and output data.
Args:
:param data: tensor
:param seq_len: length of the sequence, or timesteps.
:param horizon: the horizon of prediction.
:param strides:
:param scaler:
:return (X, Y) i.e., input, output
"""
xs, ys = [], []
total_seq_len, _ = data.shape
assert np.ndim(data) == 2
if scaler:
data = scaler.transform(data)
for i in range(0, total_seq_len - horizon - seq_len + 1):
x_i = data[i: i + seq_len, :]
y_i = data[i + seq_len + horizon - 1, :]
xs.append(x_i)
ys.append(y_i)
xs = np.stack(xs, axis=0)
ys = np.stack(ys, axis=0)
return xs, ys
def generate_io_data_with_time(df, batch_size, seq_len, horizon, output_type='point', scaler=None,
add_time_in_day=True, add_day_in_week=False):
"""
:param df:
:param batch_size:
:param seq_len:
:param horizon:
:param output_type: point, range, seq2seq
:param scaler:
:param add_time_in_day:
:param add_day_in_week:
Writes summary for a list of scalars.
:param writer:
:param names:
:param values:
:param global_step:
:return:
x, y, both are 3-D tensors with size (epoch_size, batch_size, input_dim).
"""
if scaler:
df = scaler.transform(df)
num_samples, num_nodes = df.shape
data = df.values
batch_len = num_samples // batch_size
data_list = [data]
if add_time_in_day:
time_ind = (df.index.values - df.index.values.astype('datetime64[D]')) / np.timedelta64(1, 'D')
data_list.append(time_ind.reshape(-1, 1))
if add_day_in_week:
day_in_week = np.zeros(shape=(num_samples, 7))
day_in_week[np.arange(num_samples), df.index.dayofweek] = 1
data_list.append(day_in_week)
data = np.concatenate(data_list, axis=-1)
data = data[:batch_size * batch_len, :].reshape((batch_size, batch_len, -1))
xs, ys = [], []
for i in range(seq_len, batch_len - horizon + 1):
x_i, y_i = None, None
if output_type == 'point':
x_i = data[:, i - seq_len: i, :].reshape((batch_size, -1))
y_i = data[:, i + horizon - 1, :num_nodes].reshape((batch_size, -1))
elif output_type == 'range':
x_i = data[:, i - seq_len: i, :].reshape((batch_size, -1))
y_i = data[:, i: i + horizon, :num_nodes].reshape((batch_size, -1))
elif output_type == 'seq2seq':
x_i = data[:, i - seq_len: i, :]
y_i = data[:, i: i + horizon, :]
xs.append(x_i)
ys.append(y_i)
xs = np.stack(xs, axis=0)
ys = np.stack(ys, axis=0)
return xs, ys
for name, value in zip(names, values):
summary = tf.Summary()
summary_value = summary.value.add()
summary_value.simple_value = value
summary_value.tag = name
writer.add_summary(summary, global_step)
def generate_graph_seq2seq_io_data_with_time(df, batch_size, seq_len, horizon, num_nodes, scaler=None,
add_time_in_day=True, add_day_in_week=False):
def calculate_normalized_laplacian(adj):
"""
:param df:
:param batch_size:
:param seq_len:
:param horizon:
:param scaler:
:param add_day_in_week:
:return:
x, y, both are 5-D tensors with size (epoch_size, batch_size, seq_len, num_sensors, input_dim).
Adjacent batches are continuous sequence, i.e., x[i, j, :, :] is before x[i+1, j, :, :]
"""
if scaler:
df = scaler.transform(df)
num_samples, _ = df.shape
data = df.values
batch_len = num_samples // batch_size
data = np.expand_dims(data, axis=-1)
data_list = [data]
if add_time_in_day:
time_ind = (df.index.values - df.index.values.astype('datetime64[D]')) / np.timedelta64(1, 'D')
time_in_day = np.tile(time_ind, [1, num_nodes, 1]).transpose((2, 1, 0))
data_list.append(time_in_day)
if add_day_in_week:
day_in_week = np.zeros(shape=(num_samples, num_nodes, 7))
day_in_week[np.arange(num_samples), :, df.index.dayofweek] = 1
data_list.append(day_in_week)
data = np.concatenate(data_list, axis=-1)
data = data[:batch_size * batch_len, :, :].reshape((batch_size, batch_len, num_nodes, -1))
epoch_size = batch_len - seq_len - horizon + 1
x, y = [], []
for i in range(epoch_size):
x_i = data[:, i: i + seq_len, ...]
y_i = data[:, i + seq_len: i + seq_len + horizon, :, :]
x.append(x_i)
y.append(y_i)
x = np.stack(x, axis=0)
y = np.stack(y, axis=0)
return x, y
def generate_graph_seq2seq_io_data_with_time2(df, batch_size, seq_len, horizon, num_nodes, scaler=None,
add_time_in_day=True, add_day_in_week=False):
"""
:param df:
:param batch_size:
:param seq_len:
:param horizon:
:param scaler:
:param add_time_in_day:
:param add_day_in_week:
# L = D^-1/2 (D-A) D^-1/2 = I - D^-1/2 A D^-1/2
# D = diag(A 1)
:param adj:
:return:
x, y, both are 5-D tensors with size (epoch_size, batch_size, seq_len, num_sensors, input_dim).
Adjacent batches are continuous sequence, i.e., x[i, j, :, :] is before x[i+1, j, :, :]
"""
if scaler:
df = scaler.transform(df)
num_samples, _ = df.shape
assert df.shape[1] == num_nodes
data = df.values
data = np.expand_dims(data, axis=-1)
data_list = [data]
if add_time_in_day:
time_ind = (df.index.values - df.index.values.astype('datetime64[D]')) / np.timedelta64(1, 'D')
time_in_day = np.tile(time_ind, [1, num_nodes, 1]).transpose((2, 1, 0))
data_list.append(time_in_day)
if add_day_in_week:
day_in_week = np.zeros(shape=(num_samples, num_nodes, 7))
day_in_week[np.arange(num_samples), :, df.index.dayofweek] = 1
data_list.append(day_in_week)
adj = sp.coo_matrix(adj)
d = np.array(adj.sum(1))
d_inv_sqrt = np.power(d, -0.5).flatten()
d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
d_mat_inv_sqrt = sp.diags(d_inv_sqrt)
normalized_laplacian = sp.eye(adj.shape[0]) - adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt).tocoo()
return normalized_laplacian
# data: (num_samples, num_nodes, num_features)
data = np.concatenate(data_list, axis=-1)
num_features = data.shape[-1]
# Extract x and y
epoch_size = num_samples - seq_len - horizon + 1
x, y = [], []
for i in range(epoch_size):
x_i = data[i: i + seq_len, ...]
y_i = data[i + seq_len: i + seq_len + horizon, ...]
x.append(x_i)
y.append(y_i)
x = np.stack(x, axis=0)
y = np.stack(y, axis=0)
epoch_size //= batch_size
x = x[:batch_size * epoch_size, ...]
y = y[:batch_size * epoch_size, ...]
x = x.reshape(epoch_size, batch_size, seq_len, num_nodes, num_features)
y = y.reshape(epoch_size, batch_size, horizon, num_nodes, num_features)
return x, y
def calculate_random_walk_matrix(adj_mx):
adj_mx = sp.coo_matrix(adj_mx)
d = np.array(adj_mx.sum(1))
d_inv = np.power(d, -1).flatten()
d_inv[np.isinf(d_inv)] = 0.
d_mat_inv = sp.diags(d_inv)
random_walk_mx = d_mat_inv.dot(adj_mx).tocoo()
return random_walk_mx
def calculate_reverse_random_walk_matrix(adj_mx):
return calculate_random_walk_matrix(np.transpose(adj_mx))
def calculate_scaled_laplacian(adj_mx, lambda_max=2, undirected=True):
if undirected:
adj_mx = np.maximum.reduce([adj_mx, adj_mx.T])
L = calculate_normalized_laplacian(adj_mx)
if lambda_max is None:
lambda_max, _ = linalg.eigsh(L, 1, which='LM')
lambda_max = lambda_max[0]
L = sp.csr_matrix(L)
M, _ = L.shape
I = sp.identity(M, format='csr', dtype=L.dtype)
L = (2 / lambda_max * L) - I
return L.astype(np.float32)
def get_total_trainable_parameter_size():
"""
Calculates the total number of trainable parameters in the current graph.
:return:
"""
total_parameters = 0
for variable in tf.trainable_variables():
# shape is an array of tf.Dimension
total_parameters += np.product([x.value for x in variable.get_shape()])
return total_parameters
def load_graph_data(pkl_filename):
sensor_ids, sensor_id_to_ind, adj_mx = load_pickle(pkl_filename)
return sensor_ids, sensor_id_to_ind, adj_mx
def load_pickle(pickle_file):
@ -220,69 +151,3 @@ def load_pickle(pickle_file):
print('Unable to load data ', pickle_file, ':', e)
raise
return pickle_data
def round_down(num, divisor):
return num - (num % divisor)
def separate_seasonal_trend_and_residual(df, period, test_ratio=0.2, null_val=0., epsilon=1e-4):
"""
:param df:
:param period:
:param test_ratio: only use training part to calculate the average.
:param null_val: indicator of missing values. Assuming null_val
:param epsilon:
:return:
"""
n_sample, n_sensor = df.shape
n_test = int(round(n_sample * test_ratio))
n_train = n_sample - n_test
seasonal_trend = np.zeros((period, n_sensor), dtype=np.float32)
for i in range(period):
inds = [j for j in range(i % period, n_train, period)]
historical = df.iloc[inds, :]
seasonal_trend[i, :] = historical[historical != null_val].mean()
n_repeat = (n_sample + period - 1) // period
data = np.tile(seasonal_trend, [n_repeat, 1])[:n_sample, :]
seasonal_df = pd.DataFrame(data, index=df.index, columns=df.columns)
# Records where null value is happening.
missing_ind = df == null_val
residual_df = df - seasonal_df
residual_df[residual_df == null_val] += epsilon
residual_df[missing_ind] = null_val
return seasonal_df, residual_df
def train_test_split(x, y, test_ratio=0.2, random=False, granularity=1):
"""
This just splits data to training and testing parts. Default 80% train, 20% test
Format : data is in compressed sparse row format
Args:
:param x data
:param y label
:param test_ratio:
:param random: whether to randomize the input data.
:param granularity:
"""
perms = np.arange(0, x.shape[0])
if random:
perms = np.random.permutation(np.arange(0, x.shape[0]))
n_train = round_down(int(round(x.shape[0] * (1 - test_ratio))), granularity)
n_test = round_down(x.shape[0] - n_train, granularity)
x_train, y_train = x.take(perms[:n_train], axis=0), y.take(perms[:n_train], axis=0)
x_test, y_test = x.take(perms[n_train:n_train + n_test], axis=0), y.take(perms[n_train:n_train + n_test], axis=0)
return (x_train, y_train), (x_test, y_test)
def train_val_test_split_df(df, val_ratio=0.1, test_ratio=0.2):
n_sample, _ = df.shape
n_val = int(round(n_sample * val_ratio))
n_test = int(round(n_sample * test_ratio))
n_train = n_sample - n_val - n_test
train_data, val_data, test_data = df.iloc[:n_train, :], df.iloc[n_train: n_train + n_val, :], df.iloc[-n_test:, :]
return train_data, val_data, test_data
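The graph utilities that moved into lib/utils.py can be sanity-checked on a toy adjacency matrix; a short sketch with an illustrative 2-node graph:

import numpy as np
from lib.utils import calculate_random_walk_matrix, calculate_scaled_laplacian

adj = np.array([[0., 1.],
                [1., 1.]])
# Row-normalized transition matrix D^-1 A: each row sums to 1.
print(calculate_random_walk_matrix(adj).todense())  # [[0.  1. ], [0.5 0.5]]
# Rescaled Laplacian (2 / lambda_max) L - I used by the 'laplacian' filter type.
print(calculate_scaled_laplacian(adj, lambda_max=2).todense())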

View File

@ -1,122 +0,0 @@
import unittest
import numpy as np
import pandas as pd
from lib import utils
from lib.utils import StandardScaler
class MyTestCase(unittest.TestCase):
def test_separate_seasonal_trend_and_residual(self):
data = np.array([
[2, 1, 2, 3, 0, 1, 2, 1, 2, 3, 4, 3, 0, 3, 4, 1]
], dtype=np.float32).T
trends = np.array([
[1, 2, 3, 2, 1, 2, 3, 2, 1, 2, 3, 2, 1, 2, 3, 2]
], dtype=np.float32).T
residual = np.array([
[1, -1, -1, 1, -1, -1, -1, -1, 1, 1, 1, 1, -1, 1, 1, -1]
], dtype=np.float32).T
df = pd.DataFrame(data)
df_trend, df_residual = utils.separate_seasonal_trend_and_residual(df, period=4, test_ratio=0, null_val=-1)
self.assertTrue(np.array_equal(df_trend.values, trends))
self.assertTrue(np.array_equal(df_residual.values, residual))
def test_get_rush_hours_bool_index(self):
index = pd.date_range('2017-02-27', '2017-03-06', freq='1min')
data = np.zeros((len(index), 3))
df = pd.DataFrame(data, index=index)
ind = utils.get_rush_hours_bool_index(df)
df = df[ind]
self.assertEqual(6 * 5 * 60, df.shape[0])
class IODataPreparationTest(unittest.TestCase):
from lib import utils
def test_generate_io_data_with_time(self):
data = np.array([
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
], dtype=np.float32).T
df = pd.DataFrame(data, index=pd.date_range('2017-10-18', '2017-10-19 23:59', freq='3h'))
xs, ys = utils.generate_io_data_with_time(df, batch_size=2, seq_len=3, horizon=3, output_type='range', )
self.assertTupleEqual(xs.shape, (3, 2, 9))
self.assertTupleEqual(ys.shape, (3, 2, 6))
def test_generate_graph_seq2seq_io_data_with_time(self):
data = np.array([
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
], dtype=np.float32).T
df = pd.DataFrame(data, index=pd.date_range('2017-10-18', '2017-10-19 23:59', freq='3h'))
xs, ys = utils.generate_graph_seq2seq_io_data_with_time2(df, batch_size=2, seq_len=3, horizon=3, num_nodes=2)
self.assertTupleEqual(xs.shape, (5, 2, 3, 2, 2))
self.assertTupleEqual(ys.shape, (5, 2, 3, 2, 2))
class StandardScalerTest(unittest.TestCase):
def test_transform(self):
data = np.array([
[35., 0.],
[0., 17.5],
[70., 35.]]
)
expected_result = np.array([
[0., -1.],
[-1, -0.5],
[1., 0.]]
)
scaler = StandardScaler(mean=35., std=35.)
result = scaler.transform(data)
self.assertTrue(np.array_equal(expected_result, result))
def test_transform_df(self):
df = pd.DataFrame([
[35., 0.],
[0., 17.5],
[70., 35.]]
)
expected_result = np.array([
[0., -1.],
[-1, -0.5],
[1., 0.]]
)
scaler = StandardScaler(mean=35., std=35.)
result = scaler.transform(df)
self.assertTrue(np.array_equal(expected_result, result.values))
def test_reverse_transform(self):
data = np.array([
[0., -1.],
[-1, -0.5],
[1., 0.]]
)
expected_result = np.array([
[35., 0.],
[0., 17.5],
[70., 35.]]
)
scaler = StandardScaler(mean=35., std=35.)
result = scaler.inverse_transform(data)
self.assertTrue(np.array_equal(expected_result, result))
def test_reverse_transform_df(self):
df = pd.DataFrame([
[0., -1.],
[-1, -0.5],
[1., 0.]]
)
expected_result = np.array([
[35., 0.],
[0., 17.5],
[70., 35.]]
)
scaler = StandardScaler(mean=35., std=35.)
result = scaler.inverse_transform(df)
self.assertTrue(np.array_equal(expected_result, result.values))
if __name__ == '__main__':
unittest.main()

View File

@ -7,9 +7,7 @@ import tensorflow as tf
from tensorflow.contrib.rnn import RNNCell
from tensorflow.python.platform import tf_logging as logging
from lib import dcrnn_utils
from lib import utils
class DCGRUCell(RNNCell):
@ -19,11 +17,11 @@ class DCGRUCell(RNNCell):
def call(self, inputs, **kwargs):
pass
def _compute_output_shape(self, input_shape):
def compute_output_shape(self, input_shape):
pass
def __init__(self, num_units, adj_mx, max_diffusion_step, num_nodes, input_size=None, num_proj=None,
activation=tf.nn.tanh, reuse=None, filter_type="laplacian"):
def __init__(self, num_units, adj_mx, max_diffusion_step, num_nodes, num_proj=None,
activation=tf.nn.tanh, reuse=None, filter_type="laplacian", use_gc_for_ru=True):
"""
:param num_units:
@ -35,26 +33,26 @@ class DCGRUCell(RNNCell):
:param activation:
:param reuse:
:param filter_type: "laplacian", "random_walk", "dual_random_walk".
:param use_gc_for_ru: whether to use Graph convolution to calculate the reset and update gates.
"""
super(DCGRUCell, self).__init__(_reuse=reuse)
if input_size is not None:
logging.warn("%s: The input_size parameter is deprecated.", self)
self._activation = activation
self._num_nodes = num_nodes
self._num_proj = num_proj
self._num_units = num_units
self._max_diffusion_step = max_diffusion_step
self._supports = []
self._use_gc_for_ru = use_gc_for_ru
supports = []
if filter_type == "laplacian":
supports.append(dcrnn_utils.calculate_scaled_laplacian(adj_mx, lambda_max=None))
supports.append(utils.calculate_scaled_laplacian(adj_mx, lambda_max=None))
elif filter_type == "random_walk":
supports.append(dcrnn_utils.calculate_random_walk_matrix(adj_mx).T)
supports.append(utils.calculate_random_walk_matrix(adj_mx).T)
elif filter_type == "dual_random_walk":
supports.append(dcrnn_utils.calculate_random_walk_matrix(adj_mx).T)
supports.append(dcrnn_utils.calculate_random_walk_matrix(adj_mx.T).T)
supports.append(utils.calculate_random_walk_matrix(adj_mx).T)
supports.append(utils.calculate_random_walk_matrix(adj_mx.T).T)
else:
supports.append(dcrnn_utils.calculate_scaled_laplacian(adj_mx))
supports.append(utils.calculate_scaled_laplacian(adj_mx))
for support in supports:
self._supports.append(self._build_sparse_matrix(support))
@ -87,13 +85,19 @@ class DCGRUCell(RNNCell):
"""
with tf.variable_scope(scope or "dcgru_cell"):
with tf.variable_scope("gates"): # Reset gate and update gate.
output_size = 2 * self._num_units
# We start with bias of 1.0 to not reset and not update.
value = tf.nn.sigmoid(
self._gconv(inputs, state, 2 * self._num_units, bias_start=1.0, scope=scope))
r, u = tf.split(value=value, num_or_size_splits=2, axis=1)
# r, u = sigmoid(r), sigmoid(u)
if self._use_gc_for_ru:
fn = self._gconv
else:
fn = self._fc
value = tf.nn.sigmoid(fn(inputs, state, output_size, bias_start=1.0))
value = tf.reshape(value, (-1, self._num_nodes, output_size))
r, u = tf.split(value=value, num_or_size_splits=2, axis=-1)
r = tf.reshape(r, (-1, self._num_nodes * self._num_units))
u = tf.reshape(u, (-1, self._num_nodes * self._num_units))
with tf.variable_scope("candidate"):
c = self._gconv(inputs, r * state, self._num_units, scope=scope)
c = self._gconv(inputs, r * state, self._num_units)
if self._activation is not None:
c = self._activation(c)
output = new_state = u * state + (1 - u) * c
@ -110,7 +114,23 @@ class DCGRUCell(RNNCell):
x_ = tf.expand_dims(x_, 0)
return tf.concat([x, x_], axis=0)
def _gconv(self, inputs, state, output_size, bias_start=0.0, scope=None):
def _fc(self, inputs, state, output_size, bias_start=0.0):
dtype = inputs.dtype
batch_size = inputs.get_shape()[0].value
inputs = tf.reshape(inputs, (batch_size * self._num_nodes, -1))
state = tf.reshape(state, (batch_size * self._num_nodes, -1))
inputs_and_state = tf.concat([inputs, state], axis=-1)
input_size = inputs_and_state.get_shape()[-1].value
weights = tf.get_variable(
'weights', [input_size, output_size], dtype=dtype,
initializer=tf.contrib.layers.xavier_initializer())
value = tf.nn.sigmoid(tf.matmul(inputs_and_state, weights))
biases = tf.get_variable("biases", [output_size], dtype=dtype,
initializer=tf.constant_initializer(bias_start, dtype=dtype))
value = tf.nn.bias_add(value, biases)
return value
def _gconv(self, inputs, state, output_size, bias_start=0.0):
"""Graph convolution between input and the graph matrix.
:param args: a 2D Tensor or a list of 2D, batch x n, Tensors.
@ -157,10 +177,8 @@ class DCGRUCell(RNNCell):
initializer=tf.contrib.layers.xavier_initializer())
x = tf.matmul(x, weights) # (batch_size * self._num_nodes, output_size)
biases = tf.get_variable(
"biases", [output_size],
dtype=dtype,
initializer=tf.constant_initializer(bias_start, dtype=dtype))
biases = tf.get_variable("biases", [output_size], dtype=dtype,
initializer=tf.constant_initializer(bias_start, dtype=dtype))
x = tf.nn.bias_add(x, biases)
# Reshape res back to 2D: (batch_size, num_node, state_dim) -> (batch_size, num_node * state_dim)
return tf.reshape(x, [batch_size, self._num_nodes * output_size])
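A hedged construction sketch for the refactored cell, including the new use_gc_for_ru switch; adj_mx is assumed to come from load_graph_data and the hyperparameters mirror the configs above:

from model.dcrnn_cell import DCGRUCell

cell = DCGRUCell(num_units=64, adj_mx=adj_mx, max_diffusion_step=2, num_nodes=207,
                 filter_type='dual_random_walk',
                 use_gc_for_ru=False)  # use the plain fully-connected _fc path for the reset/update gates
# Stacked and unrolled with tf.contrib.rnn inside DCRNNModel, as before.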

View File

@ -2,34 +2,45 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
from tensorflow.contrib import legacy_seq2seq
from lib.metrics import masked_mse_loss, masked_mae_loss, masked_rmse_loss
from model.dcrnn_cell import DCGRUCell
from model.tf_model import TFModel
class DCRNNModel(TFModel):
def __init__(self, is_training, config, scaler=None, adj_mx=None):
super(DCRNNModel, self).__init__(config, scaler=scaler)
batch_size = int(config.get('batch_size'))
max_diffusion_step = int(config.get('max_diffusion_step', 2))
cl_decay_steps = int(config.get('cl_decay_steps', 1000))
filter_type = config.get('filter_type', 'laplacian')
horizon = int(config.get('horizon', 1))
input_dim = int(config.get('input_dim', 1))
loss_func = config.get('loss_func', 'MSE')
max_grad_norm = float(config.get('max_grad_norm', 5.0))
num_nodes = int(config.get('num_nodes', 1))
num_rnn_layers = int(config.get('num_rnn_layers', 1))
output_dim = int(config.get('output_dim', 1))
rnn_units = int(config.get('rnn_units'))
seq_len = int(config.get('seq_len'))
use_curriculum_learning = bool(config.get('use_curriculum_learning', False))
class DCRNNModel(object):
def __init__(self, is_training, batch_size, scaler, adj_mx, **model_kwargs):
# Scaler for data normalization.
self._scaler = scaler
# Train and loss
self._loss = None
self._mae = None
self._train_op = None
# Learning rate.
self._lr = tf.get_variable('learning_rate', shape=(), initializer=tf.constant_initializer(0.01),
trainable=False)
self._new_lr = tf.placeholder(tf.float32, shape=(), name='new_learning_rate')
self._lr_update = tf.assign(self._lr, self._new_lr, name='lr_update')
max_diffusion_step = int(model_kwargs.get('max_diffusion_step', 2))
cl_decay_steps = int(model_kwargs.get('cl_decay_steps', 1000))
filter_type = model_kwargs.get('filter_type', 'laplacian')
horizon = int(model_kwargs.get('horizon', 1))
max_grad_norm = float(model_kwargs.get('max_grad_norm', 5.0))
num_nodes = int(model_kwargs.get('num_nodes', 1))
num_rnn_layers = int(model_kwargs.get('num_rnn_layers', 1))
rnn_units = int(model_kwargs.get('rnn_units'))
seq_len = int(model_kwargs.get('seq_len'))
use_curriculum_learning = bool(model_kwargs.get('use_curriculum_learning', False))
input_dim = int(model_kwargs.get('input_dim', 1))
output_dim = int(model_kwargs.get('output_dim', 1))
aux_dim = input_dim - output_dim
# assert input_dim == output_dim, 'input_dim: %d != output_dim: %d' % (input_dim, output_dim)
# Input (batch_size, timesteps, num_sensor, input_dim)
self._inputs = tf.placeholder(tf.float32, shape=(batch_size, seq_len, num_nodes, input_dim), name='inputs')
# Labels: (batch_size, timesteps, num_sensor, input_dim), same format as the input except for the temporal dimension.
@ -57,7 +68,7 @@ class DCRNNModel(TFModel):
aux_info.insert(0, None)
labels.insert(0, GO_SYMBOL)
def loop_function(prev, i):
def _loop_function(prev, i):
if is_training:
# Return either the model's prediction or the previous ground truth in training.
if use_curriculum_learning:
@ -77,27 +88,18 @@ class DCRNNModel(TFModel):
_, enc_state = tf.contrib.rnn.static_rnn(encoding_cells, inputs, dtype=tf.float32)
outputs, final_state = legacy_seq2seq.rnn_decoder(labels, enc_state, decoding_cells,
loop_function=loop_function)
loop_function=_loop_function)
# Project the output to output_dim.
outputs = tf.stack(outputs[:-1], axis=1)
self._outputs = tf.reshape(outputs, (batch_size, horizon, num_nodes, output_dim), name='outputs')
# preds = self._outputs[..., 0]
preds = self._outputs
labels = self._labels[..., :output_dim]
null_val = config.get('null_val', 0.)
null_val = 0.
self._mae = masked_mae_loss(self._scaler, null_val)(preds=preds, labels=labels)
if loss_func == 'MSE':
self._loss = masked_mse_loss(self._scaler, null_val)(preds=preds, labels=labels)
elif loss_func == 'MAE':
self._loss = masked_mae_loss(self._scaler, null_val)(preds=preds, labels=labels)
elif loss_func == 'RMSE':
self._loss = masked_rmse_loss(self._scaler, null_val)(preds=preds, labels=labels)
else:
self._loss = masked_mse_loss(self._scaler, null_val)(preds=preds, labels=labels)
self._loss = masked_mae_loss(self._scaler, null_val)(preds=preds, labels=labels)
if is_training:
optimizer = tf.train.AdamOptimizer(self._lr)
tvars = tf.trainable_variables()
@ -116,3 +118,90 @@ class DCRNNModel(TFModel):
:return:
"""
return tf.cast(k / (k + tf.exp(global_step / k)), tf.float32)
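A quick numeric sketch of this inverse-sigmoid sampling schedule with the configured cl_decay_steps k = 2000:

import numpy as np
k = 2000.0
threshold = lambda step: k / (k + np.exp(step / k))
print(threshold(0))              # ~0.9995: almost always feed ground truth early in training
print(threshold(k * np.log(k)))  # 0.5 at roughly global step 15,200
print(threshold(40000))          # ~4e-06: the decoder mostly consumes its own predictions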
@staticmethod
def run_epoch_generator(sess, model, data_generator, return_output=False, train_op=None, writer=None):
losses = []
maes = []
outputs = []
fetches = {
'mae': model.mae,
'loss': model.loss,
'global_step': tf.train.get_or_create_global_step()
}
if train_op:
fetches.update({
'train_op': train_op,
})
merged = model.merged
if merged is not None:
fetches.update({'merged': merged})
if return_output:
fetches.update({
'outputs': model.outputs
})
for _, (x, y) in enumerate(data_generator):
feed_dict = {
model.inputs: x,
model.labels: y,
}
vals = sess.run(fetches, feed_dict=feed_dict)
losses.append(vals['loss'])
maes.append(vals['mae'])
if writer is not None and 'merged' in vals:
writer.add_summary(vals['merged'], global_step=vals['global_step'])
if return_output:
outputs.append(vals['outputs'])
results = {
'loss': np.mean(losses),
'mae': np.mean(maes)
}
if return_output:
results['outputs'] = outputs
return results
def get_lr(self, sess):
return np.asscalar(sess.run(self._lr))
def set_lr(self, sess, lr):
sess.run(self._lr_update, feed_dict={
self._new_lr: lr
})
@property
def inputs(self):
return self._inputs
@property
def labels(self):
return self._labels
@property
def loss(self):
return self._loss
@property
def lr(self):
return self._lr
@property
def mae(self):
return self._mae
@property
def merged(self):
return self._merged
@property
def outputs(self):
return self._outputs
@property
def train_op(self):
return self._train_op
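A hedged sketch of driving one training epoch through the new run_epoch_generator; sess, train_model, data and writer are assumed to exist as they do in DCRNNSupervisor:

results = DCRNNModel.run_epoch_generator(sess, train_model,
                                         data['train_loader'].get_iterator(),
                                         train_op=train_model.train_op,
                                         writer=writer)
print(results['loss'], results['mae'])  # epoch-mean loss and MAE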

View File

@ -2,134 +2,250 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import logging
import numpy as np
import pandas as pd
import os
import sys
import tensorflow as tf
import time
import yaml
from lib import utils, log_helper, metrics
from lib.utils import StandardScaler, DataLoader
from lib.utils import generate_graph_seq2seq_io_data_with_time
from model.dcrnn_model import DCRNNModel
from model.tf_model_supervisor import TFModelSupervisor
class DCRNNSupervisor(TFModelSupervisor):
class DCRNNSupervisor(object):
"""
Do experiments using Graph Random Walk RNN model.
"""
def __init__(self, traffic_reading_df, adj_mx, config):
self._adj_mx = adj_mx
super(DCRNNSupervisor, self).__init__(config, df_data=traffic_reading_df)
def __init__(self, adj_mx, **kwargs):
def _prepare_train_val_test_data(self):
# Parsing model parameters.
batch_size = self._get_config('batch_size')
horizon = self._get_config('horizon')
seq_len = self._get_config('seq_len')
self._kwargs = kwargs
self._data_kwargs = kwargs.get('data')
self._model_kwargs = kwargs.get('model')
self._train_kwargs = kwargs.get('train')
test_batch_size = 1
add_time_in_day = self._get_config('add_time_in_day')
# logging.
self._log_dir = self._get_log_dir(kwargs)
log_helper.config_logging(log_dir=self.log_dir, log_filename='info.log', level=logging.DEBUG)
self._writer = tf.summary.FileWriter(self._log_dir)
logging.info(kwargs)
num_nodes = self._df_train.shape[-1]
x_train, y_train = generate_graph_seq2seq_io_data_with_time(self._df_train,
batch_size=batch_size,
seq_len=seq_len,
horizon=horizon,
num_nodes=num_nodes,
scaler=self._scaler,
add_time_in_day=add_time_in_day,
add_day_in_week=False)
x_val, y_val = generate_graph_seq2seq_io_data_with_time(self._df_val, batch_size=batch_size,
seq_len=seq_len,
horizon=horizon,
num_nodes=num_nodes,
scaler=self._scaler,
add_time_in_day=add_time_in_day,
add_day_in_week=False)
x_test, y_test = generate_graph_seq2seq_io_data_with_time(self._df_test,
batch_size=test_batch_size,
seq_len=seq_len,
horizon=horizon,
num_nodes=num_nodes,
scaler=self._scaler,
add_time_in_day=add_time_in_day,
add_day_in_week=False)
return x_train, y_train, x_val, y_val, x_test, y_test
# Data preparation
self._data = self._prepare_data(**self._data_kwargs)
def _build_train_val_test_models(self):
# Builds the model.
input_dim = self._x_train.shape[-1]
num_nodes = self._df_test.shape[-1]
output_dim = self._get_config('output_dim')
test_batch_size = self._get_config('test_batch_size')
train_config = dict(self._config)
train_config.update({
'input_dim': input_dim,
'num_nodes': num_nodes,
'output_dim': output_dim,
})
test_config = dict(self._config)
test_config.update({
'batch_size': test_batch_size,
'input_dim': input_dim,
'num_nodes': num_nodes,
'output_dim': output_dim,
})
# Build models.
scaler = self._data['scaler']
self._epoch = 0
with tf.name_scope('Train'):
with tf.variable_scope('DCRNN', reuse=False):
train_model = DCRNNModel(is_training=True, config=train_config, scaler=self._scaler,
adj_mx=self._adj_mx)
self._train_model = DCRNNModel(is_training=True, scaler=scaler,
batch_size=self._data_kwargs['batch_size'],
adj_mx=adj_mx, **self._model_kwargs)
with tf.name_scope('Val'):
with tf.variable_scope('DCRNN', reuse=True):
val_model = DCRNNModel(is_training=False, config=train_config, scaler=self._scaler,
adj_mx=self._adj_mx)
self._val_model = DCRNNModel(is_training=False, scaler=scaler,
batch_size=self._data_kwargs['batch_size'],
adj_mx=adj_mx, **self._model_kwargs)
with tf.name_scope('Test'):
with tf.variable_scope('DCRNN', reuse=True):
test_model = DCRNNModel(is_training=False, config=test_config, scaler=self._scaler,
adj_mx=self._adj_mx)
self._test_model = DCRNNModel(is_training=False, scaler=scaler,
batch_size=self._data_kwargs['test_batch_size'],
adj_mx=adj_mx, **self._model_kwargs)
return train_model, val_model, test_model
def _convert_model_outputs_to_eval_df(self, y_preds):
y_preds = np.stack(y_preds, axis=1)
# y_preds: (batch_size, epoch_size, horizon, num_nodes, output_dim)
# horizon = y_preds.shape[2]
horizon = self._get_config('horizon')
num_nodes = self._df_train.shape[-1]
df_preds = {}
for horizon_i in range(horizon):
y_pred = np.reshape(y_preds[:, :, horizon_i, :, 0], self._eval_dfs[horizon_i].shape)
df_pred = pd.DataFrame(self._scaler.inverse_transform(y_pred), index=self._eval_dfs[horizon_i].index,
columns=self._eval_dfs[horizon_i].columns)
df_preds[horizon_i] = df_pred
return df_preds
# Log model statistics.
total_trainable_parameter = utils.get_total_trainable_parameter_size()
logging.info('Total number of trainable parameters: %d' % total_trainable_parameter)
for var in tf.global_variables():
logging.info('%s, %s' % (var.name, var.get_shape()))
@staticmethod
def _generate_run_id(config):
batch_size = config.get('batch_size')
dropout = config.get('dropout')
learning_rate = config.get('learning_rate')
loss_func = config.get('loss_func')
max_diffusion_step = config['max_diffusion_step']
num_rnn_layers = config.get('num_rnn_layers')
rnn_units = config.get('rnn_units')
seq_len = config.get('seq_len')
structure = '-'.join(
['%d' % rnn_units for _ in range(num_rnn_layers)])
horizon = config.get('horizon')
filter_type = config.get('filter_type')
filter_type_abbr = 'L'
if filter_type == 'random_walk':
filter_type_abbr = 'R'
elif filter_type == 'dual_random_walk':
filter_type_abbr = 'DR'
run_id = 'dcrnn_%s_%d_h_%d_%s_lr_%g_bs_%d_d_%.2f_sl_%d_%s_%s/' % (
filter_type_abbr, max_diffusion_step, horizon,
structure, learning_rate, batch_size,
dropout, seq_len, loss_func,
time.strftime('%m%d%H%M%S'))
return run_id
def _get_log_dir(kwargs):
log_dir = kwargs['train'].get('log_dir')
if log_dir is None:
batch_size = kwargs['data'].get('batch_size')
learning_rate = kwargs['train'].get('base_lr')
max_diffusion_step = kwargs['model'].get('max_diffusion_step')
num_rnn_layers = kwargs['model'].get('num_rnn_layers')
rnn_units = kwargs['model'].get('rnn_units')
structure = '-'.join(
['%d' % rnn_units for _ in range(num_rnn_layers)])
horizon = kwargs['model'].get('horizon')
filter_type = kwargs['model'].get('filter_type')
filter_type_abbr = 'L'
if filter_type == 'random_walk':
filter_type_abbr = 'R'
elif filter_type == 'dual_random_walk':
filter_type_abbr = 'DR'
run_id = 'dcrnn_%s_%d_h_%d_%s_lr_%g_bs_%d_%s/' % (
filter_type_abbr, max_diffusion_step, horizon,
structure, learning_rate, batch_size,
time.strftime('%m%d%H%M%S'))
base_dir = kwargs.get('base_dir')
log_dir = os.path.join(base_dir, run_id)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
return log_dir
@staticmethod
def _prepare_data(dataset_dir, **kwargs):
data = {}
for category in ['train', 'val', 'test']:
cat_data = np.load(os.path.join(dataset_dir, category + '.npz'))
data['x_' + category] = cat_data['x']
data['y_' + category] = cat_data['y']
scaler = StandardScaler(mean=data['x_train'][..., 0].mean(), std=data['x_train'][..., 0].std())
# Data format
for category in ['train', 'val', 'test']:
data['x_' + category][..., 0] = scaler.transform(data['x_' + category][..., 0])
data['y_' + category][..., 0] = scaler.transform(data['y_' + category][..., 0])
for k, v in data.items():
logging.info((k, v.shape))
data['train_loader'] = DataLoader(data['x_train'], data['y_train'], kwargs['batch_size'], shuffle=True)
data['val_loader'] = DataLoader(data['x_val'], data['y_val'], kwargs['val_batch_size'], shuffle=False)
data['test_loader'] = DataLoader(data['x_test'], data['y_test'], kwargs['test_batch_size'], shuffle=False)
data['scaler'] = scaler
return data
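_prepare_data expects one .npz archive per split under dataset_dir; a sketch of the assumed layout (shapes are illustrative, matching the METR-LA configs above):

# data/METR-LA/{train,val,test}.npz, each containing:
#   x: (num_samples, seq_len, num_nodes, input_dim), e.g. (*, 12, 207, 2)
#   y: (num_samples, horizon, num_nodes, input_dim), e.g. (*, 12, 207, 2)
import numpy as np
npz = np.load('data/METR-LA/train.npz')
print(npz['x'].shape, npz['y'].shape)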
def train(self, sess, **kwargs):
kwargs.update(self._train_kwargs)
return self._train(sess, **kwargs)
def _train(self, sess, base_lr, epoch, steps, patience=50, epochs=100,
min_learning_rate=2e-6, lr_decay_ratio=0.1, save_model=1,
test_every_n_epochs=10, **train_kwargs):
history = []
min_val_loss = float('inf')
wait = 0
max_to_keep = train_kwargs.get('max_to_keep', 100)
saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep)
model_filename = train_kwargs.get('model_filename')
if model_filename is not None:
saver.restore(sess, model_filename)
self._epoch = epoch + 1
else:
sess.run(tf.global_variables_initializer())
while self._epoch <= epochs:
# Learning rate schedule.
new_lr = max(min_learning_rate, base_lr * (lr_decay_ratio ** np.sum(self._epoch >= np.array(steps))))
self._train_model.set_lr(sess=sess, lr=new_lr)
sys.stdout.flush()
start_time = time.time()
train_results = self._train_model.run_epoch_generator(sess, self._train_model,
self._data['train_loader'].get_iterator(),
train_op=self._train_model.train_op,
writer=self._writer)
train_loss, train_mae = train_results['loss'], train_results['mae']
if train_loss > 1e5:
logging.warning('Gradient explosion detected. Ending...')
break
global_step = sess.run(tf.train.get_or_create_global_step())
# Compute validation error.
val_results = self._val_model.run_epoch_generator(sess, self._val_model,
self._data['val_loader'].get_iterator(),
train_op=None)
val_loss, val_mae = val_results['loss'], val_results['mae']
utils.add_simple_summary(self._writer,
['loss/train_loss', 'metric/train_mae', 'loss/val_loss', 'metric/val_mae'],
[train_loss, train_mae, val_loss, val_mae], global_step=global_step)
end_time = time.time()
message = 'Epoch [{}] ({}) train_mae: {:.4f}, val_mae: {:.4f} lr:{:.6f} {:.1f}s'.format(
self._epoch, global_step, train_mae, val_mae, new_lr, (end_time - start_time))
logging.info(message)
if self._epoch % test_every_n_epochs == test_every_n_epochs - 1:
self.test_and_write_result(sess, global_step)
if val_loss <= min_val_loss:
wait = 0
if save_model > 0:
model_filename = self.save_model(sess, saver, val_loss)
logging.info(
'Val loss decrease from %.4f to %.4f, saving to %s' % (min_val_loss, val_loss, model_filename))
min_val_loss = val_loss
else:
wait += 1
if wait > patience:
logging.warning('Early stopping at epoch: %d' % self._epoch)
break
history.append(val_mae)
# Increases epoch.
self._epoch += 1
sys.stdout.flush()
return np.min(history)
def test_and_write_result(self, sess, global_step, **kwargs):
test_results = self._test_model.run_epoch_generator(sess, self._test_model,
self._data['test_loader'].get_iterator(),
return_output=True, train_op=None)
# y_preds: a list of (batch_size, horizon, num_nodes, output_dim)
test_loss, y_preds = test_results['loss'], test_results['outputs']
utils.add_simple_summary(self._writer, ['loss/test_loss'], [test_loss], global_step=global_step)
y_preds = np.concatenate(y_preds, axis=0)
scaler = self._data['scaler']
outputs = []
for horizon_i in range(self._data['y_test'].shape[1]):
y_truth = np.concatenate(self._data['y_test'][:, horizon_i, :, 0], axis=0)
y_truth = scaler.inverse_transform(y_truth)
y_pred = np.concatenate(y_preds[:, horizon_i, :, 0], axis=0)
y_pred = y_pred[:y_truth.shape[0], ...] # Only take the batch number
y_pred = scaler.inverse_transform(y_pred)
outputs.append(y_pred)
mae = metrics.masked_mae_np(y_pred, y_truth, null_val=0)
mape = metrics.masked_mape_np(y_pred, y_truth, null_val=0)
rmse = metrics.masked_rmse_np(y_pred, y_truth, null_val=0)
logging.info(
"Horizon {:02d}, MAE: {:.2f}, MAPE: {:.4f}, RMSE: {:.2f}".format(
horizon_i + 1, mae, mape, rmse
)
)
utils.add_simple_summary(self._writer,
['%s_%d' % (item, horizon_i + 1) for item in
['metric/rmse', 'metric/mape', 'metric/mae']],
[rmse, mape, mae],
global_step=global_step)
return y_preds
@staticmethod
def restore(sess, config):
"""
Restore from saved model.
:param sess:
:param config:
:return:
"""
model_filename = config['train'].get('model_filename')
max_to_keep = config['train'].get('max_to_keep', 100)
saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep)
saver.restore(sess, model_filename)
def save_model(self, sess, saver, val_loss):
config_filename = 'config_{}.yaml'.format(self._epoch)
config = dict(self._kwargs)
global_step = np.asscalar(sess.run(tf.train.get_or_create_global_step()))
config['train']['epoch'] = self._epoch
config['train']['global_step'] = global_step
config['train']['log_dir'] = self._log_dir
config['train']['model_filename'] = saver.save(sess,
os.path.join(self._log_dir, 'models-{:.4f}'.format(val_loss)),
global_step=global_step, write_meta_graph=False)
with open(os.path.join(self._log_dir, config_filename), 'w') as f:
yaml.dump(config, f, default_flow_style=False)
return config['train']['model_filename']
@property
def log_dir(self):
return self._log_dir
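The step-based learning-rate schedule in _train replaces the old epoch/interval decay; a worked example with the configured base_lr = 0.01, steps = [20, 30, 40, 50], lr_decay_ratio = 0.1 and min_learning_rate = 2e-6:

import numpy as np
base_lr, lr_decay_ratio, min_lr = 0.01, 0.1, 2e-6
steps = np.array([20, 30, 40, 50])
for epoch in (1, 20, 35, 60):
    lr = max(min_lr, base_lr * lr_decay_ratio ** np.sum(epoch >= steps))
    print(epoch, lr)  # 1 -> 0.01, 20 -> 0.001, 35 -> 0.0001, 60 -> 2e-06 (clipped to min_learning_rate)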

View File

@ -1,130 +0,0 @@
"""
Base class for tensorflow models for traffic forecasting.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
class TFModel(object):
def __init__(self, config, scaler=None, **kwargs):
"""
Initialization including placeholders, learning rate,
:param config:
:param scaler: data z-norm normalizer
:param kwargs:
"""
self._config = dict(config)
# Placeholders for input and output.
self._inputs = None
self._labels = None
self._outputs = None
# Scaler for data normalization.
self._scaler = scaler
# Train and loss
self._loss = None
self._mae = None
self._train_op = None
# Learning rate.
learning_rate = config.get('learning_rate', 0.001)
self._lr = tf.get_variable('learning_rate', shape=(), initializer=tf.constant_initializer(learning_rate),
trainable=False)
self._new_lr = tf.placeholder(tf.float32, shape=(), name='new_learning_rate')
self._lr_update = tf.assign(self._lr, self._new_lr, name='lr_update')
# Log merged summary
self._merged = None
@staticmethod
def run_epoch(sess, model, inputs, labels, return_output=False, train_op=None, writer=None):
losses = []
maes = []
outputs = []
fetches = {
'mae': model.mae,
'loss': model.loss,
'global_step': tf.train.get_or_create_global_step()
}
if train_op:
fetches.update({
'train_op': train_op,
})
merged = model.merged
if merged is not None:
fetches.update({'merged': merged})
if return_output:
fetches.update({
'outputs': model.outputs
})
for _, (x, y) in enumerate(zip(inputs, labels)):
feed_dict = {
model.inputs: x,
model.labels: y,
}
vals = sess.run(fetches, feed_dict=feed_dict)
losses.append(vals['loss'])
maes.append(vals['mae'])
if writer is not None and 'merged' in vals:
writer.add_summary(vals['merged'], global_step=vals['global_step'])
if return_output:
outputs.append(vals['outputs'])
results = {
'loss': np.mean(losses),
'mae': np.mean(maes)
}
if return_output:
results['outputs'] = outputs
return results
def get_lr(self, sess):
return np.asscalar(sess.run(self._lr))
def set_lr(self, sess, lr):
sess.run(self._lr_update, feed_dict={
self._new_lr: lr
})
@property
def inputs(self):
return self._inputs
@property
def labels(self):
return self._labels
@property
def loss(self):
return self._loss
@property
def lr(self):
return self._lr
@property
def mae(self):
return self._mae
@property
def merged(self):
return self._merged
@property
def outputs(self):
return self._outputs
@property
def train_op(self):
return self._train_op

View File

@ -1,282 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import numpy as np
import os
import sys
import tensorflow as tf
import time
import yaml
from lib import log_helper
from lib import metrics
from lib import tf_utils
from lib import utils
from lib.utils import StandardScaler
from model.tf_model import TFModel
class TFModelSupervisor(object):
"""
Base supervisor for tensorflow models for traffic forecasting.
"""
def __init__(self, config, df_data, **kwargs):
self._config = dict(config)
self._epoch = 0
# logging.
self._init_logging()
self._logger.info(config)
# Data preparation
test_ratio = self._get_config('test_ratio')
validation_ratio = self._get_config('validation_ratio')
self._df_train, self._df_val, self._df_test = utils.train_val_test_split_df(df_data, val_ratio=validation_ratio,
test_ratio=test_ratio)
self._scaler = StandardScaler(mean=self._df_train.values.mean(), std=self._df_train.values.std())
self._x_train, self._y_train, self._x_val, self._y_val, self._x_test, self._y_test = self._prepare_train_val_test_data()
self._eval_dfs = self._prepare_eval_df()
# Build models.
self._train_model, self._val_model, self._test_model = self._build_train_val_test_models()
# Log model statistics.
total_trainable_parameter = tf_utils.get_total_trainable_parameter_size()
self._logger.info('Total number of trainable parameters: %d' % total_trainable_parameter)
for var in tf.global_variables():
self._logger.debug('%s, %s' % (var.name, var.get_shape()))
def _get_config(self, key, use_default=True):
default_config = {
'add_day_in_week': False,
'add_time_in_day': True,
'dropout': 0.,
'batch_size': 64,
'horizon': 12,
'learning_rate': 1e-3,
'lr_decay': 0.1,
'lr_decay_epoch': 50,
'lr_decay_interval': 10,
'max_to_keep': 100,
'min_learning_rate': 2e-6,
'null_val': 0.,
'output_type': 'range',
'patience': 20,
'save_model': 1,
'seq_len': 12,
'test_batch_size': 1,
'test_every_n_epochs': 10,
'test_ratio': 0.2,
'use_cpu_only': False,
'validation_ratio': 0.1,
'verbose': 0,
}
value = self._config.get(key)
if value is None and use_default:
value = default_config.get(key)
return value
def _init_logging(self):
base_dir = self._get_config('base_dir')
log_dir = self._get_config('log_dir')
if log_dir is None:
run_id = self._generate_run_id(self._config)
log_dir = os.path.join(base_dir, run_id)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
else:
run_id = os.path.basename(os.path.normpath(log_dir))
self._log_dir = log_dir
self._logger = log_helper.get_logger(self._log_dir, run_id)
self._writer = tf.summary.FileWriter(self._log_dir)
def train(self, sess, **kwargs):
history = []
min_val_loss = float('inf')
wait = 0
epochs = self._get_config('epochs')
initial_lr = self._get_config('learning_rate')
min_learning_rate = self._get_config('min_learning_rate')
lr_decay_epoch = self._get_config('lr_decay_epoch')
lr_decay = self._get_config('lr_decay')
lr_decay_interval = self._get_config('lr_decay_interval')
patience = self._get_config('patience')
test_every_n_epochs = self._get_config('test_every_n_epochs')
save_model = self._get_config('save_model')
max_to_keep = self._get_config('max_to_keep')
saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep)
model_filename = self._get_config('model_filename')
if model_filename is not None:
saver.restore(sess, model_filename)
self._train_model.set_lr(sess, self._get_config('learning_rate'))
self._epoch = self._get_config('epoch') + 1
else:
sess.run(tf.global_variables_initializer())
while self._epoch <= epochs:
# Learning rate schedule.
new_lr = self.calculate_scheduled_lr(initial_lr, epoch=self._epoch,
lr_decay=lr_decay, lr_decay_epoch=lr_decay_epoch,
lr_decay_interval=lr_decay_interval,
min_lr=min_learning_rate)
if new_lr != initial_lr:
self._logger.info('Updating learning rate to: %.6f' % new_lr)
self._train_model.set_lr(sess=sess, lr=new_lr)
sys.stdout.flush()
start_time = time.time()
train_results = TFModel.run_epoch(sess, self._train_model,
inputs=self._x_train, labels=self._y_train,
train_op=self._train_model.train_op, writer=self._writer)
train_loss, train_mae = train_results['loss'], train_results['mae']
if train_loss > 1e5:
self._logger.warn('Gradient explosion detected. Ending...')
break
global_step = sess.run(tf.train.get_or_create_global_step())
# Compute validation error.
val_results = TFModel.run_epoch(sess, self._val_model, inputs=self._x_val, labels=self._y_val,
train_op=None)
val_loss, val_mae = val_results['loss'], val_results['mae']
tf_utils.add_simple_summary(self._writer,
['loss/train_loss', 'metric/train_mae', 'loss/val_loss', 'metric/val_mae'],
[train_loss, train_mae, val_loss, val_mae], global_step=global_step)
end_time = time.time()
message = 'Epoch %d (%d) train_loss: %.4f, train_mae: %.4f, val_loss: %.4f, val_mae: %.4f %ds' % (
self._epoch, global_step, train_loss, train_mae, val_loss, val_mae, (end_time - start_time))
self._logger.info(message)
if self._epoch % test_every_n_epochs == test_every_n_epochs - 1:
self.test_and_write_result(sess=sess, global_step=global_step, epoch=self._epoch)
if val_loss <= min_val_loss:
wait = 0
if save_model > 0:
model_filename = self.save_model(sess, saver, val_loss)
self._logger.info(
'Val loss decrease from %.4f to %.4f, saving to %s' % (min_val_loss, val_loss, model_filename))
min_val_loss = val_loss
else:
wait += 1
if wait > patience:
self._logger.warn('Early stopping at epoch: %d' % self._epoch)
break
history.append(val_mae)
# Increases epoch.
self._epoch += 1
sys.stdout.flush()
return np.min(history)
@staticmethod
def calculate_scheduled_lr(initial_lr, epoch, lr_decay, lr_decay_epoch, lr_decay_interval,
min_lr=1e-6):
decay_factor = int(math.ceil((epoch - lr_decay_epoch) / float(lr_decay_interval)))
new_lr = initial_lr * lr_decay ** max(0, decay_factor)
new_lr = max(min_lr, new_lr)
return new_lr
@staticmethod
def _generate_run_id(config):
raise NotImplementedError
@staticmethod
def _get_config_filename(epoch):
return 'config_%02d.yaml' % epoch
def restore(self, sess, config):
"""
Restore from saved model.
:param sess:
:param config:
:return:
"""
model_filename = config['model_filename']
max_to_keep = self._get_config('max_to_keep')
saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep)
saver.restore(sess, model_filename)
def save_model(self, sess, saver, val_loss):
config_filename = TFModelSupervisor._get_config_filename(self._epoch)
config = dict(self._config)
global_step = sess.run(tf.train.get_or_create_global_step())
config['epoch'] = self._epoch
config['global_step'] = global_step
config['log_dir'] = self._log_dir
config['model_filename'] = saver.save(sess, os.path.join(self._log_dir, 'models-%.4f' % val_loss),
global_step=global_step, write_meta_graph=False)
with open(os.path.join(self._log_dir, config_filename), 'w') as f:
yaml.dump(config, f)
return config['model_filename']
def test_and_write_result(self, sess, global_step, **kwargs):
null_val = self._config.get('null_val')
start_time = time.time()
test_results = TFModel.run_epoch(sess, self._test_model, self._x_test, self._y_test, return_output=True,
train_op=None)
# y_preds: a list of (batch_size, horizon, num_nodes, output_dim)
test_loss, y_preds = test_results['loss'], test_results['outputs']
tf_utils.add_simple_summary(self._writer, ['loss/test_loss'], [test_loss], global_step=global_step)
# Convert the predictions into per-horizon dataframes for evaluation.
df_preds = self._convert_model_outputs_to_eval_df(y_preds)
for horizon_i in df_preds:
df_pred = df_preds[horizon_i]
df_test = self._eval_dfs[horizon_i]
mae, mape, rmse = metrics.calculate_metrics(df_pred, df_test, null_val)
tf_utils.add_simple_summary(self._writer,
['%s_%d' % (item, horizon_i + 1) for item in
['metric/rmse', 'metric/mape', 'metric/mae']],
[rmse, mape, mae],
global_step=global_step)
end_time = time.time()
message = 'Horizon %d, mape:%.4f, rmse:%.4f, mae:%.4f, %ds' % (
horizon_i + 1, mape, rmse, mae, end_time - start_time)
self._logger.info(message)
start_time = end_time
return df_preds
def _prepare_train_val_test_data(self):
"""
Prepare data for train, val and test.
:return:
"""
raise NotImplementedError
def _prepare_eval_df(self):
horizon = self._get_config('horizon')
seq_len = self._get_config('seq_len')
# y_test: (epoch_size, batch_size, ...)
n_test_samples = np.prod(self._y_test.shape[:2])
eval_dfs = {}
for horizon_i in range(horizon):
eval_dfs[horizon_i] = self._df_test[seq_len + horizon_i: seq_len + horizon_i + n_test_samples]
return eval_dfs
def _build_train_val_test_models(self):
"""
Builds models for train, val and test.
:return:
"""
raise NotImplementedError
def _convert_model_outputs_to_eval_df(self, y_preds):
"""
Convert the outputs to a dict, with key: horizon, value: the corresponding dataframe.
:param y_preds:
:return:
"""
raise NotImplementedError
@property
def log_dir(self):
return self._log_dir
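As a worked check of calculate_scheduled_lr with illustrative values (initial_lr=0.01, lr_decay=0.1, lr_decay_epoch=20, lr_decay_interval=10): the rate stays flat until the decay epoch, then drops by 10x per interval and is clamped at min_lr.
for epoch in (10, 20, 25, 35, 60):
    lr = TFModelSupervisor.calculate_scheduled_lr(
        initial_lr=0.01, epoch=epoch, lr_decay=0.1, lr_decay_epoch=20,
        lr_decay_interval=10, min_lr=2e-6)
    print('%d -> %.6f' % (epoch, lr))
# 10 -> 0.010000, 20 -> 0.010000, 25 -> 0.001000, 35 -> 0.000100, 60 -> 0.000002 (clamped at min_lr)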

View File

@ -1,37 +1,28 @@
import argparse
import os
import pandas as pd
import sys
import tensorflow as tf
import yaml
from lib.dcrnn_utils import load_graph_data
from lib.utils import load_graph_data
from model.dcrnn_supervisor import DCRNNSupervisor
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_bool('use_cpu_only', False, 'Whether to run tensorflow on cpu.')
def run_dcrnn(traffic_reading_df):
# run_id = 'dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_1207002222'
run_id = 'dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_0606021843'
log_dir = os.path.join('data/model', run_id)
config_filename = 'config_75.yaml'
def run_dcrnn(args):
graph_pkl_filename = 'data/sensor_graph/adj_mx.pkl'
with open(os.path.join(log_dir, config_filename)) as f:
with open(args.config_filename) as f:
config = yaml.load(f)
tf_config = tf.ConfigProto()
if FLAGS.use_cpu_only:
if args.use_cpu_only:
tf_config = tf.ConfigProto(device_count={'GPU': 0})
tf_config.gpu_options.allow_growth = True
_, _, adj_mx = load_graph_data(graph_pkl_filename)
with tf.Session(config=tf_config) as sess:
supervisor = DCRNNSupervisor(traffic_reading_df, config=config, adj_mx=adj_mx)
supervisor = DCRNNSupervisor(adj_mx=adj_mx, **config)
supervisor.restore(sess, config=config)
df_preds = supervisor.test_and_write_result(sess, config['global_step'])
# TODO (yaguang): save this file to the npz file.
for horizon_i in df_preds:
df_pred = df_preds[horizon_i]
filename = os.path.join('data/results/', 'dcrnn_prediction_%d.h5' % (horizon_i + 1))
@ -41,7 +32,10 @@ def run_dcrnn(traffic_reading_df):
if __name__ == '__main__':
sys.path.append(os.getcwd())
traffic_df_filename = 'data/df_highway_2012_4mon_sample.h5'
traffic_reading_df = pd.read_hdf(traffic_df_filename)
run_dcrnn(traffic_reading_df)
# run_fc_lstm(traffic_reading_df)
parser = argparse.ArgumentParser()
parser.add_argument('--traffic_df_filename', default='data/df_highway_2012_4mon_sample.h5',
type=str, help='Traffic data file.')
parser.add_argument('--use_cpu_only', default=False, action='store_true', help='Whether to run tensorflow on cpu.')
parser.add_argument('--config_filename', default=None, type=str, help='Config file for pretrained model.')
args = parser.parse_args()
run_dcrnn(args)

scripts/__init__.py Normal file
View File

View File

@ -2,21 +2,10 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import numpy as np
import pandas as pd
import pickle
import tensorflow as tf
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string('sensor_ids_filename', 'data/sensor_graph/graph_sensor_ids.txt',
'File containing sensor ids separated by comma.')
flags.DEFINE_string('distances_filename', 'data/sensor_graph/distances_la_2012.csv',
'CSV file containing sensor distances with three columns: [from, to, distance].')
flags.DEFINE_float('normalized_k', 0.1, 'Entries that become lower than normalized_k after normalization '
'are set to zero for sparsity.')
flags.DEFINE_string('output_pkl_filename', 'data/sensor_graph/adj_mat.pkl', 'Path of the output file.')
def get_adjacency_matrix(distance_df, sensor_ids, normalized_k=0.1):
@ -54,10 +43,21 @@ def get_adjacency_matrix(distance_df, sensor_ids, normalized_k=0.1):
if __name__ == '__main__':
with open(FLAGS.sensor_ids_filename) as f:
parser = argparse.ArgumentParser()
parser.add_argument('--sensor_ids_filename', type=str, default='data/sensor_graph/graph_sensor_ids.txt',
help='File containing sensor ids separated by comma.')
parser.add_argument('--distances_filename', type=str, default='data/sensor_graph/distances_la_2012.csv',
help='CSV file containing sensor distances with three columns: [from, to, distance].')
parser.add_argument('--normalized_k', type=float, default=0.1,
help='Entries that become lower than normalized_k after normalization are set to zero for sparsity.')
parser.add_argument('--output_pkl_filename', type=str, default='data/sensor_graph/adj_mat.pkl',
help='Path of the output file.')
args = parser.parse_args()
with open(args.sensor_ids_filename) as f:
sensor_ids = f.read().strip().split(',')
distance_df = pd.read_csv(FLAGS.distances_filename, dtype={'from': 'str', 'to': 'str'})
distance_df = pd.read_csv(args.distances_filename, dtype={'from': 'str', 'to': 'str'})
_, sensor_id_to_ind, adj_mx = get_adjacency_matrix(distance_df, sensor_ids)
# Save to pickle file.
with open(FLAGS.output_pkl_filename, 'wb') as f:
with open(args.output_pkl_filename, 'wb') as f:
pickle.dump([sensor_ids, sensor_id_to_ind, adj_mx], f, protocol=2)
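The body of get_adjacency_matrix is elided by this hunk. For context, a sketch of the thresholded Gaussian-kernel construction it is assumed to implement: entries are exp(-d_ij^2 / sigma^2), with sigma the standard deviation of the observed distances, and weights below normalized_k are zeroed for sparsity.
import numpy as np

def get_adjacency_matrix(distance_df, sensor_ids, normalized_k=0.1):
    # Sketch only; the committed implementation may differ in details.
    num_sensors = len(sensor_ids)
    dist_mx = np.full((num_sensors, num_sensors), np.inf, dtype=np.float32)
    sensor_id_to_ind = {sensor_id: i for i, sensor_id in enumerate(sensor_ids)}
    # Fill in the pairwise road-network distances present in the CSV.
    for row in distance_df.values:
        if row[0] not in sensor_id_to_ind or row[1] not in sensor_id_to_ind:
            continue
        dist_mx[sensor_id_to_ind[row[0]], sensor_id_to_ind[row[1]]] = row[2]
    # Gaussian kernel using the std of the finite distances, then sparsify.
    distances = dist_mx[~np.isinf(dist_mx)]
    std = distances.std()
    adj_mx = np.exp(-np.square(dist_mx / std))
    adj_mx[adj_mx < normalized_k] = 0
    return sensor_ids, sensor_id_to_ind, adj_mx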

View File

@ -0,0 +1,125 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import argparse
import numpy as np
import os
import pandas as pd
from lib.utils import DataLoader, StandardScaler
def generate_graph_seq2seq_io_data(
df, x_offsets, y_offsets, add_time_in_day=True, add_day_in_week=False, scaler=None
):
"""
Generate seq2seq samples (input/output windows) from the traffic dataframe.
:param df:
:param x_offsets:
:param y_offsets:
:param add_time_in_day:
:param add_day_in_week:
:param scaler:
:return:
# x: (epoch_size, input_length, num_nodes, input_dim)
# y: (epoch_size, output_length, num_nodes, output_dim)
"""
num_samples, num_nodes = df.shape
data = np.expand_dims(df.values, axis=-1)
data_list = [data]
if add_time_in_day:
time_ind = (df.index.values - df.index.values.astype("datetime64[D]")) / np.timedelta64(1, "D")
time_in_day = np.tile(time_ind, [1, num_nodes, 1]).transpose((2, 1, 0))
data_list.append(time_in_day)
if add_day_in_week:
day_in_week = np.zeros(shape=(num_samples, num_nodes, 7))
day_in_week[np.arange(num_samples), :, df.index.dayofweek] = 1
data_list.append(day_in_week)
data = np.concatenate(data_list, axis=-1)
# epoch_len = num_samples + min(x_offsets) - max(y_offsets)
x, y = [], []
# t is the index of the last observation.
min_t = abs(min(x_offsets))
max_t = abs(num_samples - abs(max(y_offsets))) # Exclusive
for t in range(min_t, max_t):
x_t = data[t + x_offsets, ...]
y_t = data[t + y_offsets, ...]
x.append(x_t)
y.append(y_t)
x = np.stack(x, axis=0)
y = np.stack(y, axis=0)
return x, y
def generate_train_val_test(args):
df = pd.read_hdf(args.traffic_df_filename)
# 0 is the latest observed sample.
x_offsets = np.sort(
# np.concatenate(([-week_size + 1, -day_size + 1], np.arange(-11, 1, 1)))
np.concatenate((np.arange(-11, 1, 1),))
)
# Predict the next one hour
y_offsets = np.sort(np.arange(1, 13, 1))
# x: (num_samples, input_length, num_nodes, input_dim)
# y: (num_samples, output_length, num_nodes, output_dim)
x, y = generate_graph_seq2seq_io_data(
df,
x_offsets=x_offsets,
y_offsets=y_offsets,
add_time_in_day=True,
add_day_in_week=False,
)
print("x shape: ", x.shape, ", y shape: ", y.shape)
# Write the data into npz file.
# num_test = 6831, using the last 6831 examples as testing.
# for the rest: 7/8 is used for training, and 1/8 is used for validation.
num_samples = x.shape[0]
num_test = round(num_samples * 0.2)
num_train = round(num_samples * 0.7)
num_val = num_samples - num_test - num_train
# train
x_train, y_train = x[:num_train], y[:num_train]
# val
x_val, y_val = (
x[num_train: num_train + num_val],
y[num_train: num_train + num_val],
)
# test
x_test, y_test = x[-num_test:], y[-num_test:]
for cat in ["train", "val", "test"]:
_x, _y = locals()["x_" + cat], locals()["y_" + cat]
print(cat, "x: ", _x.shape, "y:", _y.shape)
np.savez_compressed(
os.path.join(args.output_dir, "%s.npz" % cat),
x=_x,
y=_y,
x_offsets=x_offsets.reshape(list(x_offsets.shape) + [1]),
y_offsets=y_offsets.reshape(list(y_offsets.shape) + [1]),
)
def main(args):
print("Generating training data")
generate_train_val_test(args)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--output_dir", type=str, default="data/", help="Output directory."
)
parser.add_argument(
"--traffic_df_filename",
type=str,
default="data/df_highway_2012_4mon_sample.h5",
help="Raw traffic readings.",
)
args = parser.parse_args()
main(args)
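A small sanity check of generate_graph_seq2seq_io_data on fabricated data (the synthetic dataframe and sizes below are illustrative only): with 12 input offsets and 12 output offsets, 23 timesteps are lost at the window boundaries.
import numpy as np
import pandas as pd
# Assumes generate_graph_seq2seq_io_data from the script above is importable.

num_samples, num_nodes = 100, 5
index = pd.date_range('2012-03-01', periods=num_samples, freq='5min')
df = pd.DataFrame(np.random.rand(num_samples, num_nodes), index=index)

x_offsets = np.arange(-11, 1)   # 12 past steps; 0 is the latest observation
y_offsets = np.arange(1, 13)    # 12 future steps
x, y = generate_graph_seq2seq_io_data(df, x_offsets, y_offsets,
                                      add_time_in_day=True, add_day_in_week=False)
print(x.shape, y.shape)  # (77, 12, 5, 2) for both: 77 = 100 - 23, last dim = value + time-of-day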