Initial commit.

Yaguang 2017-12-07 17:35:50 -08:00
commit c7535e5443
44 changed files with 299833 additions and 0 deletions

103
.gitignore vendored Normal file

@ -0,0 +1,103 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
# pycharm
.idea/

21
LICENSE Normal file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2017 Yaguang Li
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

63
README.md Normal file

@ -0,0 +1,63 @@
# Diffusion Convolutional Recurrent Neural Network: Data-Driven Traffic Forecasting
![Diffusion Convolutional Recurrent Neural Network](figures/model_architecture.jpg "Model Architecture")
This is a TensorFlow implementation of the Diffusion Convolutional Recurrent Neural Network (DCRNN) described in the following paper: \
Yaguang Li, Rose Yu, Cyrus Shahabi, Yan Liu, [Diffusion Convolutional Recurrent Neural Network: Data-Driven Traffic Forecasting](https://arxiv.org/abs/1707.01926).
## Requirements
- hyperopt>=0.1
- scipy>=0.19.0
- numpy>=1.12.1
- pandas==0.19.2
- tensorflow>=1.3.0
- peewee>=2.8.8
- python 2.7
Dependencies can be installed using the following command:
```bash
pip install -r requirements.txt
```
## Traffic Data
The traffic data file for Los Angeles is available [here](https://drive.google.com/open?id=1tjf5aXCgUoimvADyxKqb-YUlxP8O46pb), and should be
put into the `data/` folder.
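The file is an HDF5 store read with `pandas.read_hdf` in `dcrnn_train.py`; as a quick sanity check (a minimal sketch, assuming the default filename used by the training script and that PyTables is installed for pandas HDF5 I/O):
```python
import pandas as pd

# Rows are timestamps, columns are sensor ids.
df = pd.read_hdf('data/df_highway_2012_4mon_sample.h5')
print(df.shape)
print(df.head())
```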
## Graph Construction
As the current implementation is based on pre-calculated road network distances between sensors, it only
supports sensor ids in Los Angeles (see `data/sensor_graph/sensor_info_201206.csv`).
```bash
python gen_adj_mx.py --sensor_ids_filename=data/sensor_graph/graph_sensor_ids.txt --normalized_k=0.1\
--output_pkl_filename=data/sensor_graph/adj_mx.pkl
```
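The generated pickle contains `(sensor_ids, sensor_id_to_ind, adj_mx)` and can be loaded back the same way `lib/dcrnn_utils.load_graph_data` does (a minimal sketch using the Python 2 pickle calls from this repository):
```python
import pickle

with open('data/sensor_graph/adj_mx.pkl') as f:
    sensor_ids, sensor_id_to_ind, adj_mx = pickle.load(f)
print(len(sensor_ids), adj_mx.shape)
```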
## Train the Model
```bash
python dcrnn_train.py --config_filename=data/model/dcrnn_config.json
```
## Run the Pre-trained Model
```bash
python run_demo.py
```
The generated predictions of DCRNN are in `data/results/dcrnn_predictions_[1-12].h5`.
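Each prediction file holds a DataFrame indexed by timestamp with one column per sensor; assuming it was written with pandas (as the rest of the pipeline uses pandas HDF5 I/O), it can be inspected with:
```python
import pandas as pd

# Hypothetical example for the 1-step-ahead predictions.
df_pred = pd.read_hdf('data/results/dcrnn_predictions_1.h5')
print(df_pred.shape)
```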
More details are being added ...
## Citation
If you find this repository useful in your research, please cite the following paper:
```
@article{li2017dcrnn_traffic,
title={Diffusion Convolutional Recurrent Neural Network: Data-Driven Traffic Forecasting},
author={Li, Yaguang and Yu, Rose and Shahabi, Cyrus and Liu, Yan},
journal={arXiv preprint arXiv:1707.01926},
year={2017}
}
```


@ -0,0 +1,66 @@
model_checkpoint_path: "models-1.6253-35451"
all_model_checkpoint_paths: "models-2.9323-351"
all_model_checkpoint_paths: "models-2.2916-702"
all_model_checkpoint_paths: "models-2.1618-1404"
all_model_checkpoint_paths: "models-2.1094-1755"
all_model_checkpoint_paths: "models-2.0356-2106"
all_model_checkpoint_paths: "models-2.0139-2808"
all_model_checkpoint_paths: "models-1.9127-3159"
all_model_checkpoint_paths: "models-1.8968-4914"
all_model_checkpoint_paths: "models-1.8671-5265"
all_model_checkpoint_paths: "models-1.8386-7371"
all_model_checkpoint_paths: "models-1.7334-7722"
all_model_checkpoint_paths: "models-1.7301-8073"
all_model_checkpoint_paths: "models-1.7291-8424"
all_model_checkpoint_paths: "models-1.7214-8775"
all_model_checkpoint_paths: "models-1.7164-9477"
all_model_checkpoint_paths: "models-1.7163-10530"
all_model_checkpoint_paths: "models-1.6611-11232"
all_model_checkpoint_paths: "models-1.6586-11583"
all_model_checkpoint_paths: "models-1.6576-11934"
all_model_checkpoint_paths: "models-1.6554-12636"
all_model_checkpoint_paths: "models-1.6552-13338"
all_model_checkpoint_paths: "models-1.6540-13689"
all_model_checkpoint_paths: "models-1.6526-14391"
all_model_checkpoint_paths: "models-1.6425-14742"
all_model_checkpoint_paths: "models-1.6415-15444"
all_model_checkpoint_paths: "models-1.6385-15795"
all_model_checkpoint_paths: "models-1.6377-16497"
all_model_checkpoint_paths: "models-1.6358-16848"
all_model_checkpoint_paths: "models-1.6358-17901"
all_model_checkpoint_paths: "models-1.6284-18252"
all_model_checkpoint_paths: "models-1.6284-18603"
all_model_checkpoint_paths: "models-1.6282-18954"
all_model_checkpoint_paths: "models-1.6281-19305"
all_model_checkpoint_paths: "models-1.6275-19656"
all_model_checkpoint_paths: "models-1.6273-20007"
all_model_checkpoint_paths: "models-1.6273-20709"
all_model_checkpoint_paths: "models-1.6271-21060"
all_model_checkpoint_paths: "models-1.6266-21411"
all_model_checkpoint_paths: "models-1.6265-21762"
all_model_checkpoint_paths: "models-1.6263-23166"
all_model_checkpoint_paths: "models-1.6262-24570"
all_model_checkpoint_paths: "models-1.6261-24921"
all_model_checkpoint_paths: "models-1.6259-25974"
all_model_checkpoint_paths: "models-1.6259-27378"
all_model_checkpoint_paths: "models-1.6259-27729"
all_model_checkpoint_paths: "models-1.6259-28080"
all_model_checkpoint_paths: "models-1.6259-28431"
all_model_checkpoint_paths: "models-1.6258-28782"
all_model_checkpoint_paths: "models-1.6258-29484"
all_model_checkpoint_paths: "models-1.6258-29835"
all_model_checkpoint_paths: "models-1.6257-30186"
all_model_checkpoint_paths: "models-1.6257-30537"
all_model_checkpoint_paths: "models-1.6257-30888"
all_model_checkpoint_paths: "models-1.6256-31239"
all_model_checkpoint_paths: "models-1.6256-31941"
all_model_checkpoint_paths: "models-1.6255-32292"
all_model_checkpoint_paths: "models-1.6255-32643"
all_model_checkpoint_paths: "models-1.6255-32994"
all_model_checkpoint_paths: "models-1.6254-33345"
all_model_checkpoint_paths: "models-1.6254-33696"
all_model_checkpoint_paths: "models-1.6254-34047"
all_model_checkpoint_paths: "models-1.6253-34398"
all_model_checkpoint_paths: "models-1.6253-34749"
all_model_checkpoint_paths: "models-1.6253-35100"
all_model_checkpoint_paths: "models-1.6253-35451"


@ -0,0 +1 @@
{"verbose": 0, "num_rnn_layers": 2, "min_learning_rate": 2e-06, "epochs": 100, "patience": 50, "test_ratio": 0.2, "cl_decay_steps": 2000, "write_db": false, "epoch": 100, "max_diffusion_step": 2, "lr_decay_epoch": 20, "dropout": 0.0, "log_dir": "data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_1207002222/", "validation_ratio": 0.1, "data_type": "ALL", "learning_rate": 0.01, "batch_size": 64, "filter_type": "dual_random_walk", "graph_pkl_filename": "data/sensor_graph/adj_mx.pkl", "max_grad_norm": 5.0, "model_filename": "data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_1207002222/models-1.6253-35451", "global_step": 35451, "use_cpu_only": false, "l1_decay": 0.0, "loss_func": "MAE", "lr_decay": 0.1, "lr_decay_interval": 10, "test_every_n_epochs": 10, "horizon": 12, "null_val": 0.0, "use_curriculum_learning": true, "seq_len": 12, "base_dir": "data/model", "rnn_units": 64}


@ -0,0 +1,34 @@
{
"verbose": 0,
"num_rnn_layers": 2,
"epochs": 100,
"patience": 50,
"test_ratio": 0.2,
"cl_decay_steps": 2000,
"graph_pkl_filename": "data/sensor_graph/adj_mx.pkl",
"global_step": 0,
"max_diffusion_step": 2,
"epoch": 0,
"lr_decay_epoch": 20,
"learning_rate": 0.01,
"validation_ratio": 0.1,
"data_type": "ALL",
"dropout": 0.0,
"batch_size": 64,
"max_grad_norm": 5.0,
"min_learning_rate": 2e-06,
"use_cpu_only": false,
"l1_decay": 0.0,
"loss_func": "MAE",
"write_db": false,
"lr_decay": 0.1,
"lr_decay_interval": 10,
"test_every_n_epochs": 10,
"horizon": 12,
"null_val": 0.0,
"use_curriculum_learning": true,
"seq_len": 12,
"rnn_units": 64,
"base_dir": "data/model",
"filter_type": "dual_random_walk"
}


@ -0,0 +1,35 @@
{
"verbose": 0,
"num_rnn_layers": 2,
"epochs": 100,
"patience": 50,
"test_ratio": 0.2,
"method_type": "GCRNN",
"cl_decay_steps": 2000,
"graph_pkl_filename": "data/sensor_graph/adj_mx.pkl",
"global_step": 0,
"max_diffusion_step": 2,
"epoch": 0,
"lr_decay_epoch": 20,
"learning_rate": 0.01,
"validation_ratio": 0.1,
"data_type": "ALL",
"dropout": 0.0,
"batch_size": 64,
"max_grad_norm": 5.0,
"min_learning_rate": 2e-06,
"use_cpu_only": false,
"l1_decay": 0.0,
"loss_func": "MAE",
"write_db": false,
"lr_decay": 0.1,
"lr_decay_interval": 10,
"test_every_n_epochs": 10,
"horizon": 3,
"null_val": 0.0,
"use_curriculum_learning": true,
"seq_len": 3,
"rnn_units": 16,
"base_dir": "data/model",
"filter_type": "random_walk"
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

87
dcrnn_train.py Normal file

@ -0,0 +1,87 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import pandas as pd
import tensorflow as tf
from lib import log_helper
from lib.dcrnn_utils import load_graph_data
from model.dcrnn_supervisor import DCRNNSupervisor
# flags
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('batch_size', -1, 'Batch size')
flags.DEFINE_integer('cl_decay_steps', -1,
'Parameter to control the decay speed of the probability of feeding the ground truth instead of the model output.')
flags.DEFINE_string('config_filename', None, 'Configuration filename for restoring the model.')
flags.DEFINE_integer('epochs', -1, 'Maximum number of epochs to train.')
flags.DEFINE_string('filter_type', None, 'laplacian/random_walk/dual_random_walk.')
flags.DEFINE_string('graph_pkl_filename', 'data/sensor_graph/adj_mx.pkl',
'Pickle file containing: sensor_ids, sensor_id_to_ind_map, dist_matrix')
flags.DEFINE_integer('horizon', -1, 'Maximum number of steps ahead to predict (prediction horizon).')
flags.DEFINE_float('l1_decay', -1.0, 'L1 Regularization')
flags.DEFINE_float('lr_decay', -1.0, 'Learning rate decay.')
flags.DEFINE_integer('lr_decay_epoch', -1, 'The epoch at which learning rate decay starts.')
flags.DEFINE_integer('lr_decay_interval', -1, 'Interval (in epochs) between learning rate decays.')
flags.DEFINE_float('learning_rate', -1, 'Learning rate. -1: select by hyperopt tuning.')
flags.DEFINE_string('log_dir', None, 'Log directory for restoring the model from a checkpoint.')
flags.DEFINE_string('loss_func', None, 'MSE/MAE/RMSE: loss function.')
flags.DEFINE_float('min_learning_rate', -1, 'Minimum learning rate')
flags.DEFINE_integer('nb_weeks', 17, 'How many weeks of data should be used for train/test.')
flags.DEFINE_integer('patience', -1,
'Maximum number of epochs allowed for non-improving validation error before early stopping.')
flags.DEFINE_integer('seq_len', -1, 'Sequence length.')
flags.DEFINE_integer('test_every_n_epochs', -1, 'Run model on the testing dataset every n epochs.')
flags.DEFINE_string('traffic_df_filename', 'data/df_highway_2012_4mon_sample.h5',
'Path to hdf5 pandas.DataFrame.')
flags.DEFINE_bool('use_cpu_only', False, 'Set to true to only use cpu.')
flags.DEFINE_bool('use_curriculum_learning', None, 'Set to true to use Curriculum learning in decoding stage.')
flags.DEFINE_integer('verbose', -1, '1: to log individual sensor information.')
def main():
# Reads graph data.
with open(FLAGS.config_filename) as f:
supervisor_config = json.load(f)
logger = log_helper.get_logger(supervisor_config.get('base_dir'), 'info.log')
logger.info('Loading graph from: ' + FLAGS.graph_pkl_filename)
sensor_ids, sensor_id_to_ind, adj_mx = load_graph_data(FLAGS.graph_pkl_filename)
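# Enforce sparsity: drop edges whose weight is below 0.1 (the same value as the default normalized_k in gen_adj_mx.py).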
adj_mx[adj_mx < 0.1] = 0
logger.info('Loading traffic data from: ' + FLAGS.traffic_df_filename)
traffic_df_filename = FLAGS.traffic_df_filename
traffic_reading_df = pd.read_hdf(traffic_df_filename)
traffic_reading_df = traffic_reading_df.ix[:, sensor_ids]
supervisor_config['use_cpu_only'] = FLAGS.use_cpu_only
if FLAGS.log_dir:
supervisor_config['log_dir'] = FLAGS.log_dir
if FLAGS.use_curriculum_learning is not None:
supervisor_config['use_curriculum_learning'] = FLAGS.use_curriculum_learning
if FLAGS.loss_func:
supervisor_config['loss_func'] = FLAGS.loss_func
if FLAGS.filter_type:
supervisor_config['filter_type'] = FLAGS.filter_type
# Overwrites space with specified parameters.
for name in ['batch_size', 'cl_decay_steps', 'epochs', 'horizon', 'learning_rate', 'l1_decay',
'lr_decay', 'lr_decay_epoch', 'lr_decay_interval', 'min_learning_rate',
'patience', 'seq_len', 'test_every_n_epochs', 'verbose']:
if getattr(FLAGS, name) >= 0:
supervisor_config[name] = getattr(FLAGS, name)
tf_config = tf.ConfigProto()
if FLAGS.use_cpu_only:
tf_config = tf.ConfigProto(device_count={'GPU': 0})
tf_config.gpu_options.allow_growth = True
with tf.Session(config=tf_config) as sess:
supervisor = DCRNNSupervisor(traffic_reading_df=traffic_reading_df, adj_mx=adj_mx,
config=supervisor_config)
supervisor.train(sess=sess)
if __name__ == '__main__':
main()

Binary file not shown.


63
gen_adj_mx.py Normal file

@ -0,0 +1,63 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import pandas as pd
import pickle
import tensorflow as tf
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string('sensor_ids_filename', 'data/sensor_graph/graph_sensor_ids.txt',
'File containing sensor ids separated by comma.')
flags.DEFINE_string('distances_filename', 'data/sensor_graph/distances_la_2012.csv',
'CSV file containing sensor distances with three columns: [from, to, distance].')
flags.DEFINE_float('normalized_k', 0.1, 'Entries that become lower than normalized_k after normalization '
'are set to zero for sparsity.')
flags.DEFINE_string('output_pkl_filename', 'data/sensor_graph/adj_mat.pkl', 'Path of the output file.')
def get_adjacency_matrix(distance_df, sensor_ids, normalized_k=0.1):
"""
:param distance_df: data frame with three columns: [from, to, distance].
:param sensor_ids: list of sensor ids.
:param normalized_k: entries that become lower than normalized_k after normalization are set to zero for sparsity.
:return:
"""
num_sensors = len(sensor_ids)
dist_mx = np.zeros((num_sensors, num_sensors), dtype=np.float32)
dist_mx[:] = np.inf
# Builds sensor id to index map.
sensor_id_to_ind = {}
for i, sensor_id in enumerate(sensor_ids):
sensor_id_to_ind[sensor_id] = i
# Fills cells in the matrix with distances.
for row in distance_df.values:
if row[0] not in sensor_id_to_ind or row[1] not in sensor_id_to_ind:
continue
dist_mx[sensor_id_to_ind[row[0]], sensor_id_to_ind[row[1]]] = row[2]
# Calculates the standard deviation as theta.
distances = dist_mx[~np.isinf(dist_mx)].flatten()
std = distances.std()
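# Thresholded Gaussian kernel: w_ij = exp(-(d_ij / sigma)^2), where sigma is the std of the finite distances.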
adj_mx = np.exp(-np.square(dist_mx / std))
# Make the adjacency matrix symmetric by taking the max.
# adj_mx = np.maximum.reduce([adj_mx, adj_mx.T])
# Sets entries that are lower than the threshold (normalized_k) to zero for sparsity.
adj_mx[adj_mx < normalized_k] = 0
return sensor_ids, sensor_id_to_ind, adj_mx
if __name__ == '__main__':
with open(FLAGS.sensor_ids_filename) as f:
sensor_ids = f.read().strip().split(',')
distance_df = pd.read_csv(FLAGS.distances_filename, dtype={'from': 'str', 'to': 'str'})
_, sensor_id_to_ind, adj_mx = get_adjacency_matrix(distance_df, sensor_ids, normalized_k=FLAGS.normalized_k)
# Save to pickle file.
with open(FLAGS.output_pkl_filename, 'w') as f:
pickle.dump([sensor_ids, sensor_id_to_ind, adj_mx], f)

0
lib/__init__.py Normal file

56
lib/dcrnn_utils.py Normal file

@ -0,0 +1,56 @@
import pickle
import numpy as np
import scipy.sparse as sp
from scipy.sparse import linalg
from lib.tf_utils import sparse_matrix_to_tf_sparse_tensor
def load_graph_data(pkl_filename):
with open(pkl_filename) as f:
sensor_ids, sensor_id_to_ind, adj_mx = pickle.load(f)
return sensor_ids, sensor_id_to_ind, adj_mx
def calculate_normalized_laplacian(adj):
"""
# L = D^-1/2 (D-A) D^-1/2 = I - D^-1/2 A D^-1/2
# D = diag(A 1)
:param adj:
:return:
"""
adj = sp.coo_matrix(adj)
d = np.array(adj.sum(1))
d_inv_sqrt = np.power(d, -0.5).flatten()
d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
d_mat_inv_sqrt = sp.diags(d_inv_sqrt)
normalized_laplacian = sp.eye(adj.shape[0]) - adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt).tocoo()
return normalized_laplacian
def calculate_random_walk_matrix(adj_mx):
adj_mx = sp.coo_matrix(adj_mx)
d = np.array(adj_mx.sum(1))
d_inv = np.power(d, -1).flatten()
d_inv[np.isinf(d_inv)] = 0.
d_mat_inv = sp.diags(d_inv)
random_walk_mx = d_mat_inv.dot(adj_mx).tocoo()
return random_walk_mx
def calculate_reverse_random_walk_matrix(adj_mx):
return calculate_random_walk_matrix(np.transpose(adj_mx))
def calculate_scaled_laplacian(adj_mx, lambda_max=2, undirected=True):
if undirected:
adj_mx = np.maximum.reduce([adj_mx, adj_mx.T])
L = calculate_normalized_laplacian(adj_mx)
if lambda_max is None:
lambda_max, _ = linalg.eigsh(L, 1, which='LM')
lambda_max = lambda_max[0]
L = sp.csr_matrix(L)
M, _ = L.shape
I = sp.identity(M, format='csr', dtype=L.dtype)
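# Rescale to [-1, 1]: L_tilde = (2 / lambda_max) * L - I, as expected by the Chebyshev-style diffusion steps in DCGRUCell.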
L = (2 / lambda_max * L) - I
return L.astype(np.float32)

26
lib/log_helper.py Normal file

@ -0,0 +1,26 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import logging
import os
def get_logger(log_dir, name):
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
# Add file handler and stdout handler
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler = logging.FileHandler(os.path.join(log_dir, 'info.log'))
file_handler.setFormatter(formatter)
# Add console handler.
console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(console_formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# Add google cloud log handler
logger.info('Log directory: %s', log_dir)
return logger

144
lib/metrics.py Normal file

@ -0,0 +1,144 @@
import numpy as np
import tensorflow as tf
def masked_mse_tf(preds, labels, null_val=np.nan):
"""
Mean squared error with masking of missing values.
:param preds:
:param labels:
:param null_val:
:return:
"""
if np.isnan(null_val):
mask = ~tf.is_nan(labels)
else:
mask = tf.not_equal(labels, null_val)
mask = tf.cast(mask, tf.float32)
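# Rescale the mask so that averaging the masked loss over all entries equals averaging over the valid entries only.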
mask /= tf.reduce_mean(mask)
mask = tf.where(tf.is_nan(mask), tf.zeros_like(mask), mask)
loss = tf.square(tf.subtract(preds, labels))
loss = loss * mask
loss = tf.where(tf.is_nan(loss), tf.zeros_like(loss), loss)
return tf.reduce_mean(loss)
def masked_mae_tf(preds, labels, null_val=np.nan):
"""
Mean absolute error with masking of missing values.
:param preds:
:param labels:
:param null_val:
:return:
"""
if np.isnan(null_val):
mask = ~tf.is_nan(labels)
else:
mask = tf.not_equal(labels, null_val)
mask = tf.cast(mask, tf.float32)
mask /= tf.reduce_mean(mask)
mask = tf.where(tf.is_nan(mask), tf.zeros_like(mask), mask)
loss = tf.abs(tf.subtract(preds, labels))
loss = loss * mask
loss = tf.where(tf.is_nan(loss), tf.zeros_like(loss), loss)
return tf.reduce_mean(loss)
def masked_rmse_tf(preds, labels, null_val=np.nan):
"""
Root mean squared error with masking of missing values.
:param preds:
:param labels:
:param null_val:
:return:
"""
return tf.sqrt(masked_mse_tf(preds=preds, labels=labels, null_val=null_val))
def masked_rmse_np(preds, labels, null_val=np.nan):
return np.sqrt(masked_mse_np(preds=preds, labels=labels, null_val=null_val))
def masked_mse_np(preds, labels, null_val=np.nan):
with np.errstate(divide='ignore', invalid='ignore'):
if np.isnan(null_val):
mask = ~np.isnan(labels)
else:
mask = np.not_equal(labels, null_val)
mask = mask.astype('float32')
mask /= np.mean(mask)
rmse = np.square(np.subtract(preds, labels)).astype('float32')
rmse = np.nan_to_num(rmse * mask)
return np.mean(rmse)
def masked_mae_np(preds, labels, null_val=np.nan):
with np.errstate(divide='ignore', invalid='ignore'):
if np.isnan(null_val):
mask = ~np.isnan(labels)
else:
mask = np.not_equal(labels, null_val)
mask = mask.astype('float32')
mask /= np.mean(mask)
mae = np.abs(np.subtract(preds, labels)).astype('float32')
mae = np.nan_to_num(mae * mask)
return np.mean(mae)
def masked_mape_np(preds, labels, null_val=np.nan):
with np.errstate(divide='ignore', invalid='ignore'):
if np.isnan(null_val):
mask = ~np.isnan(labels)
else:
mask = np.not_equal(labels, null_val)
mask = mask.astype('float32')
mask /= np.mean(mask)
mape = np.abs(np.divide(np.subtract(preds, labels).astype('float32'), labels))
mape = np.nan_to_num(mask * mape)
return np.mean(mape)
# Builds loss function.
def masked_mse_loss(scaler, null_val):
def loss(preds, labels):
if scaler:
preds = scaler.inverse_transform(preds)
labels = scaler.inverse_transform(labels)
return masked_mse_tf(preds=preds, labels=labels, null_val=null_val)
return loss
def masked_rmse_loss(scaler, null_val):
def loss(preds, labels):
if scaler:
preds = scaler.inverse_transform(preds)
labels = scaler.inverse_transform(labels)
return masked_rmse_tf(preds=preds, labels=labels, null_val=null_val)
return loss
def masked_mae_loss(scaler, null_val):
def loss(preds, labels):
if scaler:
preds = scaler.inverse_transform(preds)
labels = scaler.inverse_transform(labels)
mae = masked_mae_tf(preds=preds, labels=labels, null_val=null_val)
return mae
return loss
def calculate_metrics(df_pred, df_test, null_val):
"""
Calculate the MAE, MAPE, RMSE
:param df_pred:
:param df_test:
:param null_val:
:return:
"""
mape = masked_mape_np(preds=df_pred.as_matrix(), labels=df_test.as_matrix(), null_val=null_val)
mae = masked_mae_np(preds=df_pred.as_matrix(), labels=df_test.as_matrix(), null_val=null_val)
rmse = masked_rmse_np(preds=df_pred.as_matrix(), labels=df_test.as_matrix(), null_val=null_val)
return mae, mape, rmse

198
lib/metrics_test.py Normal file

@ -0,0 +1,198 @@
import unittest
import numpy as np
import tensorflow as tf
from lib import metrics
class MyTestCase(unittest.TestCase):
def test_masked_mape_np(self):
preds = np.array([
[1, 2, 2],
[3, 4, 5],
], dtype=np.float32)
labels = np.array([
[1, 2, 2],
[3, 4, 4]
], dtype=np.float32)
mape = metrics.masked_mape_np(preds=preds, labels=labels)
self.assertAlmostEqual(1 / 24.0, mape, delta=1e-5)
def test_masked_mape_np2(self):
preds = np.array([
[1, 2, 2],
[3, 4, 5],
], dtype=np.float32)
labels = np.array([
[1, 2, 2],
[3, 4, 4]
], dtype=np.float32)
mape = metrics.masked_mape_np(preds=preds, labels=labels, null_val=4)
self.assertEqual(0., mape)
def test_masked_mape_np_all_zero(self):
preds = np.array([
[1, 2],
[3, 4],
], dtype=np.float32)
labels = np.array([
[0, 0],
[0, 0]
], dtype=np.float32)
mape = metrics.masked_mape_np(preds=preds, labels=labels, null_val=0)
self.assertEqual(0., mape)
def test_masked_mape_np_all_nan(self):
preds = np.array([
[1, 2],
[3, 4],
], dtype=np.float32)
labels = np.array([
[np.nan, np.nan],
[np.nan, np.nan]
], dtype=np.float32)
mape = metrics.masked_mape_np(preds=preds, labels=labels)
self.assertEqual(0., mape)
def test_masked_mape_np_nan(self):
preds = np.array([
[1, 2],
[3, 4],
], dtype=np.float32)
labels = np.array([
[np.nan, np.nan],
[np.nan, 3]
], dtype=np.float32)
mape = metrics.masked_mape_np(preds=preds, labels=labels)
self.assertAlmostEqual(1 / 3., mape, delta=1e-5)
def test_masked_rmse_np_vanilla(self):
preds = np.array([
[1, 2],
[3, 4],
], dtype=np.float32)
labels = np.array([
[1, 4],
[3, 4]
], dtype=np.float32)
mape = metrics.masked_rmse_np(preds=preds, labels=labels, null_val=0)
self.assertEqual(1., mape)
def test_masked_rmse_np_nan(self):
preds = np.array([
[1, 2],
[3, 4],
], dtype=np.float32)
labels = np.array([
[1, np.nan],
[3, 4]
], dtype=np.float32)
rmse = metrics.masked_rmse_np(preds=preds, labels=labels)
self.assertEqual(0., rmse)
def test_masked_rmse_np_all_zero(self):
preds = np.array([
[1, 2],
[3, 4],
], dtype=np.float32)
labels = np.array([
[0, 0],
[0, 0]
], dtype=np.float32)
mape = metrics.masked_rmse_np(preds=preds, labels=labels, null_val=0)
self.assertEqual(0., mape)
def test_masked_rmse_np_missing(self):
preds = np.array([
[1, 2],
[3, 4],
], dtype=np.float32)
labels = np.array([
[1, 0],
[3, 4]
], dtype=np.float32)
mape = metrics.masked_rmse_np(preds=preds, labels=labels, null_val=0)
self.assertEqual(0., mape)
def test_masked_rmse_np2(self):
preds = np.array([
[1, 2],
[3, 4],
], dtype=np.float32)
labels = np.array([
[1, 0],
[3, 3]
], dtype=np.float32)
rmse = metrics.masked_rmse_np(preds=preds, labels=labels, null_val=0)
self.assertAlmostEqual(np.sqrt(1 / 3.), rmse, delta=1e-5)
class TFRMSETestCase(unittest.TestCase):
def test_masked_mse_null(self):
with tf.Session() as sess:
preds = tf.constant(np.array([
[1, 2],
[3, 4],
], dtype=np.float32))
labels = tf.constant(np.array([
[1, 0],
[3, 3]
], dtype=np.float32))
rmse = metrics.masked_mse_tf(preds=preds, labels=labels, null_val=0)
self.assertAlmostEqual(1 / 3.0, sess.run(rmse), delta=1e-5)
def test_masked_mse_vanilla(self):
with tf.Session() as sess:
preds = tf.constant(np.array([
[1, 2],
[3, 4],
], dtype=np.float32))
labels = tf.constant(np.array([
[1, 0],
[3, 3]
], dtype=np.float32))
rmse = metrics.masked_mse_tf(preds=preds, labels=labels)
self.assertAlmostEqual(1.25, sess.run(rmse), delta=1e-5)
def test_masked_mse_all_zero(self):
with tf.Session() as sess:
preds = tf.constant(np.array([
[1, 2],
[3, 4],
], dtype=np.float32))
labels = tf.constant(np.array([
[0, 0],
[0, 0]
], dtype=np.float32))
rmse = metrics.masked_mse_tf(preds=preds, labels=labels, null_val=0)
self.assertAlmostEqual(0., sess.run(rmse), delta=1e-5)
def test_masked_mse_nan(self):
with tf.Session() as sess:
preds = tf.constant(np.array([
[1, 2],
[3, 4],
], dtype=np.float32))
labels = tf.constant(np.array([
[1, 2],
[3, np.nan]
], dtype=np.float32))
rmse = metrics.masked_mse_tf(preds=preds, labels=labels)
self.assertAlmostEqual(0., sess.run(rmse), delta=1e-5)
def test_masked_mse_all_nan(self):
with tf.Session() as sess:
preds = tf.constant(np.array([
[1, 2],
[3, 4],
], dtype=np.float32))
labels = tf.constant(np.array([
[np.nan, np.nan],
[np.nan, np.nan]
], dtype=np.float32))
rmse = metrics.masked_mse_tf(preds=preds, labels=labels, null_val=0)
self.assertAlmostEqual(0., sess.run(rmse), delta=1e-5)
if __name__ == '__main__':
unittest.main()

97
lib/tf_utils.py Normal file

@ -0,0 +1,97 @@
from __future__ import absolute_import, division, print_function
import numpy as np
import scipy.sparse as sp
import tensorflow as tf
def add_simple_summary(writer, names, values, global_step):
"""
Writes summary for a list of scalars.
:param writer:
:param names:
:param values:
:param global_step:
:return:
"""
for name, value in zip(names, values):
summary = tf.Summary()
summary_value = summary.value.add()
summary_value.simple_value = value
summary_value.tag = name
writer.add_summary(summary, global_step)
def adj_tensor_dot(adj, y):
""" Computes the matrix multiplication for the adjacency matrix and the 3D dense matrix y.
:param adj: square matrix with shape(n_node, n_node)
:param y: 3D tensor, with shape (batch_size, n_node, output_dim)
"""
y_shape = [i.value for i in y.shape]
if len(y_shape) != 3:
raise Exception('Dimension of y must be 3, instead of: %d' % len(y_shape))
y_permute_dim = list(range(len(y_shape)))
y_permute_dim = [y_permute_dim.pop(-2)] + y_permute_dim
yt = tf.reshape(tf.transpose(y, perm=y_permute_dim), [y_shape[-2], -1])
if isinstance(adj, tf.SparseTensor):
res = tf.sparse_tensor_dense_matmul(adj, yt)
else:
res = tf.matmul(adj, yt)
res = tf.reshape(res, [y_shape[-2], -1, y_shape[-1]])
res = tf.transpose(res, perm=[1, 0, 2])
return res
def dot(x, y):
"""
Wrapper for tf.matmul for x with rank >= 2.
:param x: matrix with rank >=2
:param y: matrix with rank==2
:return:
"""
[input_dim, output_dim] = y.get_shape().as_list()
input_shape = tf.shape(x)
batch_rank = input_shape.get_shape()[0].value - 1
batch_shape = input_shape[:batch_rank]
output_shape = tf.concat([batch_shape, [output_dim]], axis=0)  # Output shape: batch dims of x followed by output_dim.
x = tf.reshape(x, [-1, input_dim])
result_ = tf.matmul(x, y)
result = tf.reshape(result_, output_shape)
return result
def get_total_trainable_parameter_size():
"""
Calculates the total number of trainable parameters in the current graph.
:return:
"""
total_parameters = 0
for variable in tf.trainable_variables():
# shape is an array of tf.Dimension
total_parameters += np.product([x.value for x in variable.get_shape()])
return total_parameters
def sparse_matrix_to_tf_sparse_tensor(sparse_mx):
"""Converts sparse matrix to tuple representation as required by tf.SparseTensor"""
def to_tuple(mx):
if not sp.isspmatrix_coo(mx):
mx = mx.tocoo()
indices = np.vstack((mx.row, mx.col)).transpose()
values = mx.data
shape = mx.shape
return indices, values, shape
if isinstance(sparse_mx, list):
for i in range(len(sparse_mx)):
sparse_mx[i] = to_tuple(sparse_mx[i])
else:
sparse_mx = to_tuple(sparse_mx)
return sparse_mx

28
lib/tf_utils_test.py Normal file

@ -0,0 +1,28 @@
import unittest
import numpy as np
import tensorflow as tf
from lib import tf_utils
class TensorDotTest(unittest.TestCase):
def test_adj_tensor_dot(self):
# adj: [[1, 0], [0, 1]]
# i.e. SparseTensor(indices=[[0, 0], [1, 1]], values=[1, 1], dense_shape=[2, 2])
adj_indices = [[0, 0], [1, 1]]
adj_values = np.array([1, 1], dtype=np.float32)
adj_shape = [2, 2]
adj = tf.SparseTensor(adj_indices, adj_values, adj_shape)
# y: (2, 2, 2), [[[1, 0], [0, 1]], [[1, 1], [1, 1]]]
y = np.array([[[1, 0], [0, 1]], [[1, 1], [1, 1]]], dtype=np.float32)
y = tf.constant(y)
expected_result = np.array([[[1, 0], [0, 1]], [[1, 1], [1, 1]]], dtype=np.float32)
result = tf_utils.adj_tensor_dot(adj, y)
with tf.Session() as sess:
result_ = sess.run(result)
self.assertTrue(np.array_equal(expected_result, result_))
if __name__ == '__main__':
unittest.main()

221
lib/utils.py Normal file

@ -0,0 +1,221 @@
import datetime
import numpy as np
import pandas as pd
class StandardScaler:
"""
Standardize the input (z-score normalization).
"""
def __init__(self, mean, std):
self.mean = mean
self.std = std
def transform(self, data):
return (data - self.mean) / self.std
def inverse_transform(self, data):
return (data * self.std) + self.mean
def get_rush_hours_bool_index(df, hours=((7, 10), (17, 20)), weekdays=(0, 5)):
"""
Calculates a boolean predicate for rush hours: 7:00am-9:59am and 5:00pm-7:59pm, Mon-Fri (with the default arguments).
:param df:
:param hours: a tuple of two, (start_hour, end_hour)
:param weekdays: a tuple of two, (start_weekday, end_weekday)
"""
# Week day.
weekday_predicate = (df.index.dayofweek >= weekdays[0]) & (df.index.dayofweek < weekdays[1])
# Hours.
hour_predicate = (df.index.time >= datetime.time(hours[0][0], 0)) & (df.index.time < datetime.time(hours[0][1], 0))
hour_predicate |= (df.index.time >= datetime.time(hours[1][0], 0)) & (df.index.time < datetime.time(hours[1][1], 0))
return weekday_predicate & hour_predicate
def generate_io_data(data, seq_len, horizon=1, scaler=None):
"""
Generates (input, output) pairs from a 2-D data array.
Args:
:param data: tensor
:param seq_len: length of the sequence, or timesteps.
:param horizon: the horizon of prediction.
:param scaler:
:return (X, Y) i.e., input, output
"""
xs, ys = [], []
total_seq_len, _ = data.shape
assert np.ndim(data) == 2
if scaler:
data = scaler.transform(data)
for i in range(0, total_seq_len - horizon - seq_len + 1):
x_i = data[i: i + seq_len, :]
y_i = data[i + seq_len + horizon - 1, :]
xs.append(x_i)
ys.append(y_i)
xs = np.stack(xs, axis=0)
ys = np.stack(ys, axis=0)
return xs, ys
def generate_io_data_with_time(df, batch_size, seq_len, horizon, output_type='point', scaler=None,
add_time_in_day=True, add_day_in_week=False):
"""
:param df:
:param batch_size:
:param seq_len:
:param horizon:
:param output_type: point, range, seq2seq
:param scaler:
:param add_time_in_day:
:param add_day_in_week:
:return:
x, y, both are 3-D tensors with size (epoch_size, batch_size, input_dim).
"""
if scaler:
df = scaler.transform(df)
num_samples, num_nodes = df.shape
data = df.values
batch_len = num_samples // batch_size
data_list = [data]
if add_time_in_day:
time_ind = (df.index.values - df.index.values.astype('datetime64[D]')) / np.timedelta64(1, 'D')
data_list.append(time_ind.reshape(-1, 1))
if add_day_in_week:
day_in_week = np.zeros(shape=(num_samples, 7))
day_in_week[np.arange(num_samples), df.index.dayofweek] = 1
data_list.append(day_in_week)
data = np.concatenate(data_list, axis=-1)
data = data[:batch_size * batch_len, :].reshape((batch_size, batch_len, -1))
xs, ys = [], []
for i in range(seq_len, batch_len - horizon + 1):
x_i, y_i = None, None
if output_type == 'point':
x_i = data[:, i - seq_len: i, :].reshape((batch_size, -1))
y_i = data[:, i + horizon - 1, :num_nodes].reshape((batch_size, -1))
elif output_type == 'range':
x_i = data[:, i - seq_len: i, :].reshape((batch_size, -1))
y_i = data[:, i: i + horizon, :num_nodes].reshape((batch_size, -1))
elif output_type == 'seq2seq':
x_i = data[:, i - seq_len: i, :]
y_i = data[:, i: i + horizon, :]
xs.append(x_i)
ys.append(y_i)
xs = np.stack(xs, axis=0)
ys = np.stack(ys, axis=0)
return xs, ys
def generate_graph_seq2seq_io_data_with_time(df, batch_size, seq_len, horizon, num_nodes, scaler=None,
add_time_in_day=True, add_day_in_week=False):
"""
:param df:
:param batch_size:
:param seq_len:
:param horizon:
:param scaler:
:param add_day_in_week:
:return:
x, y, both are 5-D tensors with size (epoch_size, batch_size, seq_len, num_sensors, input_dim).
Adjacent entries along the first axis are consecutive in time, i.e., x[i, j, :, :] comes right before x[i+1, j, :, :].
"""
if scaler:
df = scaler.transform(df)
num_samples, _ = df.shape
data = df.values
batch_len = num_samples // batch_size
data = np.expand_dims(data, axis=-1)
data_list = [data]
if add_time_in_day:
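# Time of day as a fraction in [0, 1): (timestamp - midnight) / 24h, added as an extra input feature for every node.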
time_ind = (df.index.values - df.index.values.astype('datetime64[D]')) / np.timedelta64(1, 'D')
time_in_day = np.tile(time_ind, [1, num_nodes, 1]).transpose((2, 1, 0))
data_list.append(time_in_day)
if add_day_in_week:
day_in_week = np.zeros(shape=(num_samples, num_nodes, 7))
day_in_week[np.arange(num_samples), :, df.index.dayofweek] = 1
data_list.append(day_in_week)
data = np.concatenate(data_list, axis=-1)
data = data[:batch_size * batch_len, :, :].reshape((batch_size, batch_len, num_nodes, -1))
epoch_size = batch_len - seq_len - horizon + 1
x, y = [], []
for i in range(epoch_size):
x_i = data[:, i: i + seq_len, ...]
y_i = data[:, i + seq_len: i + seq_len + horizon, :, :]
x.append(x_i)
y.append(y_i)
x = np.stack(x, axis=0)
y = np.stack(y, axis=0)
return x, y
def round_down(num, divisor):
return num - (num % divisor)
def separate_seasonal_trend_and_residual(df, period, test_ratio=0.2, null_val=0., epsilon=1e-4):
"""
:param df:
:param period:
:param test_ratio: only use training part to calculate the average.
:param null_val: indicator of missing values; entries equal to null_val are excluded from the seasonal average.
:param epsilon:
:return:
"""
n_sample, n_sensor = df.shape
n_test = int(round(n_sample * test_ratio))
n_train = n_sample - n_test
seasonal_trend = np.zeros((period, n_sensor), dtype=np.float32)
for i in range(period):
inds = [j for j in range(i % period, n_train, period)]
historical = df.iloc[inds, :]
seasonal_trend[i, :] = historical[historical != null_val].mean()
n_repeat = (n_sample + period - 1) // period
data = np.tile(seasonal_trend, [n_repeat, 1])[:n_sample, :]
seasonal_df = pd.DataFrame(data, index=df.index, columns=df.columns)
# Records where null value is happening.
missing_ind = df == null_val
residual_df = df - seasonal_df
residual_df[residual_df == null_val] += epsilon
residual_df[missing_ind] = null_val
return seasonal_df, residual_df
def train_test_split(x, y, test_ratio=0.2, random=False, granularity=1):
"""
Splits the data into training and testing parts; default 80% train, 20% test.
Args:
:param x: data
:param y: label
:param test_ratio:
:param random: whether to randomize the input data.
:param granularity:
"""
perms = np.arange(0, x.shape[0])
if random:
perms = np.random.permutation(np.arange(0, x.shape[0]))
n_train = round_down(int(round(x.shape[0] * (1 - test_ratio))), granularity)
n_test = round_down(x.shape[0] - n_train, granularity)
x_train, y_train = x.take(perms[:n_train], axis=0), y.take(perms[:n_train], axis=0)
x_test, y_test = x.take(perms[n_train:n_train + n_test], axis=0), y.take(perms[n_train:n_train + n_test], axis=0)
return (x_train, y_train), (x_test, y_test)
def train_val_test_split_df(df, val_ratio=0.1, test_ratio=0.2):
n_sample, _ = df.shape
n_val = int(round(n_sample * val_ratio))
n_test = int(round(n_sample * test_ratio))
n_train = n_sample - n_val - n_test
train_data, val_data, test_data = df.iloc[:n_train, :], df.iloc[n_train: n_train + n_val, :], df.iloc[-n_test:, :]
return train_data, val_data, test_data

112
lib/utils_test.py Normal file

@ -0,0 +1,112 @@
import unittest
import numpy as np
import pandas as pd
from lib import utils
from lib.utils import StandardScaler
class MyTestCase(unittest.TestCase):
def test_separate_seasonal_trend_and_residual(self):
data = np.array([
[2, 1, 2, 3, 0, 1, 2, 1, 2, 3, 4, 3, 0, 3, 4, 1]
], dtype=np.float32).T
trends = np.array([
[1, 2, 3, 2, 1, 2, 3, 2, 1, 2, 3, 2, 1, 2, 3, 2]
], dtype=np.float32).T
residual = np.array([
[1, -1, -1, 1, -1, -1, -1, -1, 1, 1, 1, 1, -1, 1, 1, -1]
], dtype=np.float32).T
df = pd.DataFrame(data)
df_trend, df_residual = utils.separate_seasonal_trend_and_residual(df, period=4, test_ratio=0, null_val=-1)
self.assertTrue(np.array_equal(df_trend.values, trends))
self.assertTrue(np.array_equal(df_residual.values, residual))
def test_get_rush_hours_bool_index(self):
index = pd.date_range('2017-02-27', '2017-03-06', freq='1min')
data = np.zeros((len(index), 3))
df = pd.DataFrame(data, index=index)
ind = utils.get_rush_hours_bool_index(df)
df = df[ind]
self.assertEqual(6 * 5 * 60, df.shape[0])
class IODataPreparationTest(unittest.TestCase):
from lib import utils
def test_generate_io_data_with_time(self):
data = np.array([
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
], dtype=np.float32).T
df = pd.DataFrame(data, index=pd.date_range('2017-10-18', '2017-10-19 23:59', freq='3h'))
xs, ys = utils.generate_io_data_with_time(df, batch_size=2, seq_len=3, horizon=3, output_type='range', )
self.assertTupleEqual(xs.shape, (3, 2, 9))
self.assertTupleEqual(ys.shape, (3, 2, 6))
class StandardScalerTest(unittest.TestCase):
def test_transform(self):
data = np.array([
[35., 0.],
[0., 17.5],
[70., 35.]]
)
expected_result = np.array([
[0., -1.],
[-1, -0.5],
[1., 0.]]
)
scaler = StandardScaler(mean=35., std=35.)
result = scaler.transform(data)
self.assertTrue(np.array_equal(expected_result, result))
def test_transform_df(self):
df = pd.DataFrame([
[35., 0.],
[0., 17.5],
[70., 35.]]
)
expected_result = np.array([
[0., -1.],
[-1, -0.5],
[1., 0.]]
)
scaler = StandardScaler(mean=35., std=35.)
result = scaler.transform(df)
self.assertTrue(np.array_equal(expected_result, result.values))
def test_reverse_transform(self):
data = np.array([
[0., -1.],
[-1, -0.5],
[1., 0.]]
)
expected_result = np.array([
[35., 0.],
[0., 17.5],
[70., 35.]]
)
scaler = StandardScaler(mean=35., std=35.)
result = scaler.inverse_transform(data)
self.assertTrue(np.array_equal(expected_result, result))
def test_reverse_transform_df(self):
df = pd.DataFrame([
[0., -1.],
[-1, -0.5],
[1., 0.]]
)
expected_result = np.array([
[35., 0.],
[0., 17.5],
[70., 35.]]
)
scaler = StandardScaler(mean=35., std=35.)
result = scaler.inverse_transform(df)
self.assertTrue(np.array_equal(expected_result, result.values))
if __name__ == '__main__':
unittest.main()

0
model/__init__.py Normal file

166
model/dcrnn_cell.py Normal file

@ -0,0 +1,166 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
from tensorflow.contrib.rnn import RNNCell
from tensorflow.python.platform import tf_logging as logging
from lib import dcrnn_utils
class DCGRUCell(RNNCell):
"""Graph Convolution Gated Recurrent Unit cell.
"""
def call(self, inputs, **kwargs):
pass
def _compute_output_shape(self, input_shape):
pass
def __init__(self, num_units, adj_mx, max_diffusion_step, num_nodes, input_size=None, num_proj=None,
activation=tf.nn.tanh, reuse=None, filter_type="laplacian"):
"""
:param num_units:
:param adj_mx:
:param max_diffusion_step:
:param num_nodes:
:param input_size:
:param num_proj:
:param activation:
:param reuse:
:param filter_type: "laplacian", "random_walk", "dual_random_walk".
"""
super(DCGRUCell, self).__init__(_reuse=reuse)
if input_size is not None:
logging.warn("%s: The input_size parameter is deprecated.", self)
self._activation = activation
self._num_nodes = num_nodes
self._num_proj = num_proj
self._num_units = num_units
self._max_diffusion_step = max_diffusion_step
self._supports = []
supports = []
if filter_type == "laplacian":
supports.append(dcrnn_utils.calculate_scaled_laplacian(adj_mx, lambda_max=None))
elif filter_type == "random_walk":
supports.append(dcrnn_utils.calculate_random_walk_matrix(adj_mx).T)
elif filter_type == "dual_random_walk":
supports.append(dcrnn_utils.calculate_random_walk_matrix(adj_mx).T)
supports.append(dcrnn_utils.calculate_random_walk_matrix(adj_mx.T).T)
else:
supports.append(dcrnn_utils.calculate_scaled_laplacian(adj_mx))
for support in supports:
self._supports.append(self._build_sparse_matrix(support))
@staticmethod
def _build_sparse_matrix(L):
L = L.tocoo()
indices = np.column_stack((L.row, L.col))
L = tf.SparseTensor(indices, L.data, L.shape)
return tf.sparse_reorder(L)
@property
def state_size(self):
return self._num_nodes * self._num_units
@property
def output_size(self):
output_size = self._num_nodes * self._num_units
if self._num_proj is not None:
output_size = self._num_nodes * self._num_proj
return output_size
def __call__(self, inputs, state, scope=None):
"""Gated recurrent unit (GRU) with Graph Convolution.
:param inputs: (B, num_nodes * input_dim)
:return
- Output: A `2-D` tensor with shape `[batch_size x self.output_size]`.
- New state: Either a single `2-D` tensor, or a tuple of tensors matching
the arity and shapes of `state`
"""
with tf.variable_scope(scope or "dcgru_cell"):
with tf.variable_scope("gates"): # Reset gate and update gate.
# We start with bias of 1.0 to not reset and not update.
value = tf.nn.sigmoid(
self._gconv(inputs, state, 2 * self._num_units, bias_start=1.0, scope=scope))
r, u = tf.split(value=value, num_or_size_splits=2, axis=1)
# r, u = sigmoid(r), sigmoid(u)
with tf.variable_scope("candidate"):
c = self._gconv(inputs, r * state, self._num_units, scope=scope)
if self._activation is not None:
c = self._activation(c)
output = new_state = u * state + (1 - u) * c
if self._num_proj is not None:
with tf.variable_scope("projection"):
w = tf.get_variable('w', shape=(self._num_units, self._num_proj))
batch_size = inputs.get_shape()[0].value
output = tf.reshape(new_state, shape=(-1, self._num_units))
output = tf.reshape(tf.matmul(output, w), shape=(batch_size, self.output_size))
return output, new_state
@staticmethod
def _concat(x, x_):
x_ = tf.expand_dims(x_, 0)
return tf.concat([x, x_], axis=0)
def _gconv(self, inputs, state, output_size, bias_start=0.0, scope=None):
"""Graph convolution between input and the graph matrix.
:param args: a 2D Tensor or a list of 2D, batch x n, Tensors.
:param output_size:
:param bias:
:param bias_start:
:param scope:
:return:
"""
# Reshape input and state to (batch_size, num_nodes, input_dim/state_dim)
batch_size = inputs.get_shape()[0].value
inputs = tf.reshape(inputs, (batch_size, self._num_nodes, -1))
state = tf.reshape(state, (batch_size, self._num_nodes, -1))
inputs_and_state = tf.concat([inputs, state], axis=2)
input_size = inputs_and_state.get_shape()[2].value
dtype = inputs.dtype
x = inputs_and_state
x0 = tf.transpose(x, perm=[1, 2, 0]) # (num_nodes, total_arg_size, batch_size)
x0 = tf.reshape(x0, shape=[self._num_nodes, input_size * batch_size])
x = tf.expand_dims(x0, axis=0)
scope = tf.get_variable_scope()
with tf.variable_scope(scope):
if self._max_diffusion_step == 0:
pass
else:
for support in self._supports:
x1 = tf.sparse_tensor_dense_matmul(support, x0)
x = self._concat(x, x1)
for k in range(2, self._max_diffusion_step + 1):
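# Chebyshev-style recurrence for the k-th diffusion step: x_k = 2 * support * x_{k-1} - x_{k-2}.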
x2 = 2 * tf.sparse_tensor_dense_matmul(support, x1) - x0
x = self._concat(x, x2)
x1, x0 = x2, x1
num_matrices = len(self._supports) * self._max_diffusion_step + 1  # +1 accounts for x itself (the k=0 term).
x = tf.reshape(x, shape=[num_matrices, self._num_nodes, input_size, batch_size])
x = tf.transpose(x, perm=[3, 1, 2, 0]) # (batch_size, num_nodes, input_size, order)
x = tf.reshape(x, shape=[batch_size * self._num_nodes, input_size * num_matrices])
weights = tf.get_variable(
'weights', [input_size * num_matrices, output_size], dtype=dtype,
initializer=tf.contrib.layers.xavier_initializer())
x = tf.matmul(x, weights) # (batch_size * self._num_nodes, output_size)
biases = tf.get_variable(
"biases", [output_size],
dtype=dtype,
initializer=tf.constant_initializer(bias_start, dtype=dtype))
x = tf.nn.bias_add(x, biases)
# Reshape res back to 2D: (batch_size, num_node, state_dim) -> (batch_size, num_node * state_dim)
return tf.reshape(x, [batch_size, self._num_nodes * output_size])

106
model/dcrnn_model.py Normal file

@ -0,0 +1,106 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
from tensorflow.contrib import legacy_seq2seq
from lib.metrics import masked_mse_loss, masked_mae_loss, masked_rmse_loss
from model.dcrnn_cell import DCGRUCell
from model.tf_model import TFModel
class DCRNNModel(TFModel):
def __init__(self, is_training, config, scaler=None, adj_mx=None):
super(DCRNNModel, self).__init__(config, scaler=scaler)
batch_size = int(config.get('batch_size'))
max_diffusion_step = int(config.get('max_diffusion_step', 2))
cl_decay_steps = int(config.get('cl_decay_steps', 1000))
filter_type = config.get('filter_type', 'laplacian')
horizon = int(config.get('horizon', 1))
input_dim = int(config.get('input_dim', 1))
loss_func = config.get('loss_func', 'MSE')
max_grad_norm = float(config.get('max_grad_norm', 5.0))
num_nodes = int(config.get('num_nodes', 1))
num_rnn_layers = int(config.get('num_rnn_layers', 1))
output_dim = int(config.get('output_dim', 1))
rnn_units = int(config.get('rnn_units'))
seq_len = int(config.get('seq_len'))
use_curriculum_learning = bool(config.get('use_curriculum_learning', False))
assert input_dim == output_dim, 'input_dim: %d != output_dim: %d' % (input_dim, output_dim)
# Input (batch_size, timesteps, num_sensor, input_dim)
self._inputs = tf.placeholder(tf.float32, shape=(batch_size, seq_len, num_nodes, input_dim), name='inputs')
# Labels: (batch_size, timesteps, num_sensor, output_dim)
self._labels = tf.placeholder(tf.float32, shape=(batch_size, horizon, num_nodes, output_dim), name='labels')
GO_SYMBOL = tf.zeros(shape=(batch_size, num_nodes * input_dim))
cell = DCGRUCell(rnn_units, adj_mx, max_diffusion_step=max_diffusion_step, num_nodes=num_nodes,
filter_type=filter_type)
cell_with_projection = DCGRUCell(rnn_units, adj_mx, max_diffusion_step=max_diffusion_step, num_nodes=num_nodes,
num_proj=output_dim, filter_type=filter_type)
encoding_cells = [cell] * num_rnn_layers
decoding_cells = [cell] * (num_rnn_layers - 1) + [cell_with_projection]
encoding_cells = tf.contrib.rnn.MultiRNNCell(encoding_cells, state_is_tuple=True)
decoding_cells = tf.contrib.rnn.MultiRNNCell(decoding_cells, state_is_tuple=True)
global_step = tf.train.get_or_create_global_step()
# Outputs: (batch_size, timesteps, num_nodes, output_dim)
with tf.variable_scope('DCRNN_SEQ'):
inputs = tf.unstack(tf.reshape(self._inputs, (batch_size, seq_len, num_nodes * input_dim)), axis=1)
labels = tf.unstack(tf.reshape(self._labels, (batch_size, horizon, num_nodes * output_dim)), axis=1)
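# Prepend the all-zeros GO symbol as the first decoder input; the extra output is dropped below via outputs[:-1].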
labels.insert(0, GO_SYMBOL)
loop_function = None
if is_training:
if use_curriculum_learning:
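# Scheduled sampling: feed the ground truth with probability `threshold` (which decays with global_step), otherwise feed the previous model output.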
def loop_function(prev, i):
c = tf.random_uniform((), minval=0, maxval=1.)
threshold = self._compute_sampling_threshold(global_step, cl_decay_steps)
result = tf.cond(tf.less(c, threshold), lambda: labels[i], lambda: prev)
return result
else:
# Return the output of the model.
def loop_function(prev, _):
return prev
_, enc_state = tf.contrib.rnn.static_rnn(encoding_cells, inputs, dtype=tf.float32)
outputs, final_state = legacy_seq2seq.rnn_decoder(labels, enc_state, decoding_cells,
loop_function=loop_function)
# Project the output to output_dim.
outputs = tf.stack(outputs[:-1], axis=1)
self._outputs = tf.reshape(outputs, (batch_size, horizon, num_nodes, output_dim), name='outputs')
preds = self._outputs[..., 0]
labels = self._labels[..., 0]
null_val = config.get('null_val', 0.)
self._mae = masked_mae_loss(self._scaler, null_val)(preds=preds, labels=labels)
if loss_func == 'MSE':
self._loss = masked_mse_loss(self._scaler, null_val)(preds=self._outputs, labels=self._labels)
elif loss_func == 'MAE':
self._loss = masked_mae_loss(self._scaler, null_val)(preds=self._outputs, labels=self._labels)
elif loss_func == 'RMSE':
self._loss = masked_rmse_loss(self._scaler, null_val)(preds=self._outputs, labels=self._labels)
else:
self._loss = masked_mse_loss(self._scaler, null_val)(preds=self._outputs, labels=self._labels)
if is_training:
optimizer = tf.train.AdamOptimizer(self._lr)
tvars = tf.trainable_variables()
grads = tf.gradients(self._loss, tvars)
grads, _ = tf.clip_by_global_norm(grads, max_grad_norm)
self._train_op = optimizer.apply_gradients(zip(grads, tvars), global_step=global_step, name='train_op')
self._merged = tf.summary.merge_all()
@staticmethod
def _compute_sampling_threshold(global_step, k):
"""
Computes the sampling probability for scheduled sampling using inverse sigmoid.
:param global_step:
:param k:
:return:
"""
return tf.cast(k / (k + tf.exp(global_step / k)), tf.float32)

135
model/dcrnn_supervisor.py Normal file

@ -0,0 +1,135 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import numpy as np
import pandas as pd
import tensorflow as tf
from lib.utils import generate_graph_seq2seq_io_data_with_time
from model.dcrnn_model import DCRNNModel
from model.tf_model_supervisor import TFModelSupervisor
class DCRNNSupervisor(TFModelSupervisor):
"""
Supervisor that trains and evaluates the DCRNN (Diffusion Convolutional Recurrent Neural Network) model.
"""
def __init__(self, traffic_reading_df, adj_mx, config):
self._adj_mx = adj_mx
super(DCRNNSupervisor, self).__init__(config, df_data=traffic_reading_df)
def _prepare_train_val_test_data(self):
# Parsing model parameters.
batch_size = self._get_config('batch_size')
horizon = self._get_config('horizon')
seq_len = self._get_config('seq_len')
test_batch_size = 1
add_time_in_day = self._get_config('add_time_in_day')
num_nodes = self._df_train.shape[-1]
x_train, y_train = generate_graph_seq2seq_io_data_with_time(self._df_train,
batch_size=batch_size,
seq_len=seq_len,
horizon=horizon,
num_nodes=num_nodes,
scaler=self._scaler,
add_time_in_day=add_time_in_day,
add_day_in_week=False)
x_val, y_val = generate_graph_seq2seq_io_data_with_time(self._df_val, batch_size=batch_size,
seq_len=seq_len,
horizon=horizon,
num_nodes=num_nodes,
scaler=self._scaler,
add_time_in_day=add_time_in_day,
add_day_in_week=False)
x_test, y_test = generate_graph_seq2seq_io_data_with_time(self._df_test,
batch_size=test_batch_size,
seq_len=seq_len,
horizon=horizon,
num_nodes=num_nodes,
scaler=self._scaler,
add_time_in_day=add_time_in_day,
add_day_in_week=False)
return x_train, y_train, x_val, y_val, x_test, y_test
def _build_train_val_test_models(self):
# Builds the model.
input_dim = self._x_train.shape[-1]
num_nodes = self._df_test.shape[-1]
output_dim = self._y_train.shape[-1]
test_batch_size = self._get_config('test_batch_size')
train_config = dict(self._config)
train_config.update({
'input_dim': input_dim,
'num_nodes': num_nodes,
'output_dim': output_dim,
})
test_config = dict(self._config)
test_config.update({
'batch_size': test_batch_size,
'input_dim': input_dim,
'num_nodes': num_nodes,
'output_dim': output_dim,
})
with tf.name_scope('Train'):
with tf.variable_scope('DCRNN', reuse=False):
train_model = DCRNNModel(is_training=True, config=train_config, scaler=self._scaler,
adj_mx=self._adj_mx)
with tf.name_scope('Val'):
with tf.variable_scope('DCRNN', reuse=True):
val_model = DCRNNModel(is_training=False, config=train_config, scaler=self._scaler,
adj_mx=self._adj_mx)
with tf.name_scope('Test'):
with tf.variable_scope('DCRNN', reuse=True):
test_model = DCRNNModel(is_training=False, config=test_config, scaler=self._scaler,
adj_mx=self._adj_mx)
return train_model, val_model, test_model
def _convert_model_outputs_to_eval_df(self, y_preds):
y_preds = np.stack(y_preds, axis=1)
# y_preds: (batch_size, epoch_size, horizon, num_nodes, output_dim)
# horizon = y_preds.shape[2]
horizon = self._get_config('horizon')
num_nodes = self._df_train.shape[-1]
df_preds = {}
for horizon_i in range(horizon):
y_pred = np.reshape(y_preds[:, :, horizon_i, :, 0], self._eval_dfs[horizon_i].shape)
df_pred = pd.DataFrame(self._scaler.inverse_transform(y_pred), index=self._eval_dfs[horizon_i].index,
columns=self._eval_dfs[horizon_i].columns)
df_preds[horizon_i] = df_pred
return df_preds
@staticmethod
def _generate_run_id(config):
batch_size = config.get('batch_size')
dropout = config.get('dropout')
learning_rate = config.get('learning_rate')
loss_func = config.get('loss_func')
max_diffusion_step = config['max_diffusion_step']
num_rnn_layers = config.get('num_rnn_layers')
rnn_units = config.get('rnn_units')
seq_len = config.get('seq_len')
structure = '-'.join(
['%d' % rnn_units for _ in range(num_rnn_layers)])
horizon = config.get('horizon')
filter_type = config.get('filter_type')
filter_type_abbr = 'L'
if filter_type == 'random_walk':
filter_type_abbr = 'R'
elif filter_type == 'dual_random_walk':
filter_type_abbr = 'DR'
run_id = 'dcrnn_%s_%d_h_%d_%s_lr_%g_bs_%d_d_%.2f_sl_%d_%s_%s/' % (
filter_type_abbr, max_diffusion_step, horizon,
structure, learning_rate, batch_size,
dropout, seq_len, loss_func,
time.strftime('%m%d%H%M%S'))
return run_id

130
model/tf_model.py Normal file

@ -0,0 +1,130 @@
"""
Base class for tensorflow models for traffic forecasting.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
class TFModel(object):
def __init__(self, config, scaler=None, **kwargs):
"""
Initialization: placeholders for inputs/labels, the data scaler, loss/train handles, and the learning rate variable.
:param config:
:param scaler: data z-norm normalizer
:param kwargs:
"""
self._config = dict(config)
# Placeholders for input and output.
self._inputs = None
self._labels = None
self._outputs = None
# Scaler for data normalization.
self._scaler = scaler
# Train and loss
self._loss = None
self._mae = None
self._train_op = None
# Learning rate.
learning_rate = config.get('learning_rate', 0.001)
self._lr = tf.get_variable('learning_rate', shape=(), initializer=tf.constant_initializer(learning_rate),
trainable=False)
self._new_lr = tf.placeholder(tf.float32, shape=(), name='new_learning_rate')
self._lr_update = tf.assign(self._lr, self._new_lr, name='lr_update')
# Log merged summary
self._merged = None
@staticmethod
def run_epoch(sess, model, inputs, labels, return_output=False, train_op=None, writer=None):
losses = []
maes = []
outputs = []
fetches = {
'mae': model.mae,
'loss': model.loss,
'global_step': tf.train.get_or_create_global_step()
}
if train_op:
fetches.update({
'train_op': train_op,
})
merged = model.merged
if merged is not None:
fetches.update({'merged': merged})
if return_output:
fetches.update({
'outputs': model.outputs
})
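        # `inputs` and `labels` are sequences of per-batch arrays; the graph is run
        # once per (x, y) pair and the per-batch losses/MAEs are averaged below.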
for _, (x, y) in enumerate(zip(inputs, labels)):
feed_dict = {
model.inputs: x,
model.labels: y,
}
vals = sess.run(fetches, feed_dict=feed_dict)
losses.append(vals['loss'])
maes.append(vals['mae'])
if writer is not None and 'merged' in vals:
writer.add_summary(vals['merged'], global_step=vals['global_step'])
if return_output:
outputs.append(vals['outputs'])
results = {
'loss': np.mean(losses),
'mae': np.mean(maes)
}
if return_output:
results['outputs'] = outputs
return results
def get_lr(self, sess):
return np.asscalar(sess.run(self._lr))
def set_lr(self, sess, lr):
sess.run(self._lr_update, feed_dict={
self._new_lr: lr
})
@property
def inputs(self):
return self._inputs
@property
def labels(self):
return self._labels
@property
def loss(self):
return self._loss
@property
def lr(self):
return self._lr
@property
def mae(self):
return self._mae
@property
def merged(self):
return self._merged
@property
def outputs(self):
return self._outputs
@property
def train_op(self):
return self._train_op


@ -0,0 +1,282 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import math
import numpy as np
import os
import sys
import tensorflow as tf
import time
from lib import log_helper
from lib import metrics
from lib import tf_utils
from lib import utils
from lib.utils import StandardScaler
from model.tf_model import TFModel
class TFModelSupervisor(object):
"""
Base supervisor for tensorflow models for traffic forecasting.
"""
def __init__(self, config, df_data, **kwargs):
self._config = dict(config)
self._epoch = 0
# logging.
self._init_logging()
self._logger.info(config)
# Data preparation
test_ratio = self._get_config('test_ratio')
validation_ratio = self._get_config('validation_ratio')
self._df_train, self._df_val, self._df_test = utils.train_val_test_split_df(df_data, val_ratio=validation_ratio,
test_ratio=test_ratio)
self._scaler = StandardScaler(mean=self._df_train.values.mean(), std=self._df_train.values.std())
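        # Normalization statistics (mean/std) come from the training split only.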
self._x_train, self._y_train, self._x_val, self._y_val, self._x_test, self._y_test = self._prepare_train_val_test_data()
self._eval_dfs = self._prepare_eval_df()
# Build models.
self._train_model, self._val_model, self._test_model = self._build_train_val_test_models()
# Log model statistics.
total_trainable_parameter = tf_utils.get_total_trainable_parameter_size()
self._logger.info('Total number of trainable parameters: %d' % total_trainable_parameter)
for var in tf.global_variables():
self._logger.debug('%s, %s' % (var.name, var.get_shape()))
def _get_config(self, key, use_default=True):
default_config = {
'add_day_in_week': False,
'add_time_in_day': True,
'dropout': 0.,
'batch_size': 64,
'horizon': 12,
'learning_rate': 1e-3,
'lr_decay': 0.1,
'lr_decay_epoch': 50,
'lr_decay_interval': 10,
'max_to_keep': 100,
'min_learning_rate': 2e-6,
'null_val': 0.,
'output_type': 'range',
'patience': 20,
'save_model': 1,
'seq_len': 12,
'test_batch_size': 1,
'test_every_n_epochs': 10,
'test_ratio': 0.2,
'use_cpu_only': False,
'validation_ratio': 0.1,
'verbose': 0,
}
value = self._config.get(key)
if value is None and use_default:
value = default_config.get(key)
return value
def _init_logging(self):
base_dir = self._get_config('base_dir')
log_dir = self._get_config('log_dir')
if log_dir is None:
run_id = self._generate_run_id(self._config)
log_dir = os.path.join(base_dir, run_id)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
else:
run_id = os.path.basename(os.path.normpath(log_dir))
self._log_dir = log_dir
self._logger = log_helper.get_logger(self._log_dir, run_id)
self._writer = tf.summary.FileWriter(self._log_dir)
def train(self, sess, **kwargs):
history = []
min_val_loss = float('inf')
wait = 0
epochs = self._get_config('epochs')
initial_lr = self._get_config('learning_rate')
min_learning_rate = self._get_config('min_learning_rate')
lr_decay_epoch = self._get_config('lr_decay_epoch')
lr_decay = self._get_config('lr_decay')
lr_decay_interval = self._get_config('lr_decay_interval')
patience = self._get_config('patience')
test_every_n_epochs = self._get_config('test_every_n_epochs')
save_model = self._get_config('save_model')
max_to_keep = self._get_config('max_to_keep')
saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep)
model_filename = self._get_config('model_filename')
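        # Resume from a saved checkpoint when one is specified in the config;
        # otherwise initialize all variables from scratch.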
if model_filename is not None:
saver.restore(sess, model_filename)
self._train_model.set_lr(sess, self._get_config('learning_rate'))
self._epoch = self._get_config('epoch') + 1
else:
sess.run(tf.global_variables_initializer())
while self._epoch <= epochs:
# Learning rate schedule.
new_lr = self.calculate_scheduled_lr(initial_lr, epoch=self._epoch,
lr_decay=lr_decay, lr_decay_epoch=lr_decay_epoch,
lr_decay_interval=lr_decay_interval,
min_lr=min_learning_rate)
if new_lr != initial_lr:
self._logger.info('Updating learning rate to: %.6f' % new_lr)
self._train_model.set_lr(sess=sess, lr=new_lr)
sys.stdout.flush()
start_time = time.time()
train_results = TFModel.run_epoch(sess, self._train_model,
inputs=self._x_train, labels=self._y_train,
train_op=self._train_model.train_op, writer=self._writer)
train_loss, train_mae = train_results['loss'], train_results['mae']
if train_loss > 1e5:
self._logger.warn('Gradient explosion detected. Ending...')
break
global_step = sess.run(tf.train.get_or_create_global_step())
# Compute validation error.
val_results = TFModel.run_epoch(sess, self._val_model, inputs=self._x_val, labels=self._y_val,
train_op=None)
val_loss, val_mae = val_results['loss'], val_results['mae']
tf_utils.add_simple_summary(self._writer,
['loss/train_loss', 'metric/train_mae', 'loss/val_loss', 'metric/val_mae'],
[train_loss, train_mae, val_loss, val_mae], global_step=global_step)
end_time = time.time()
message = 'Epoch %d (%d) train_loss: %.4f, train_mae: %.4f, val_loss: %.4f, val_mae: %.4f %ds' % (
self._epoch, global_step, train_loss, train_mae, val_loss, val_mae, (end_time - start_time))
self._logger.info(message)
if self._epoch % test_every_n_epochs == test_every_n_epochs - 1:
self.test_and_write_result(sess=sess, global_step=global_step, epoch=self._epoch)
if val_loss <= min_val_loss:
wait = 0
if save_model > 0:
model_filename = self.save_model(sess, saver, val_loss)
self._logger.info(
                    'Val loss decreased from %.4f to %.4f, saving to %s' % (min_val_loss, val_loss, model_filename))
min_val_loss = val_loss
else:
wait += 1
if wait > patience:
self._logger.warn('Early stopping at epoch: %d' % self._epoch)
break
history.append(val_mae)
# Increases epoch.
self._epoch += 1
sys.stdout.flush()
return np.min(history)
@staticmethod
def calculate_scheduled_lr(initial_lr, epoch, lr_decay, lr_decay_epoch, lr_decay_interval,
min_lr=1e-6):
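        # Staircase decay: no decay until lr_decay_epoch, then one extra factor of
        # lr_decay every lr_decay_interval epochs, floored at min_lr. With the
        # supervisor defaults (initial_lr=1e-3, lr_decay=0.1, lr_decay_epoch=50,
        # lr_decay_interval=10, min_lr=2e-6): epochs 1-50 -> 1e-3, 51-60 -> 1e-4,
        # 61-70 -> 1e-5, 71 onwards -> 2e-6 (the floor).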
decay_factor = int(math.ceil((epoch - lr_decay_epoch) / float(lr_decay_interval)))
new_lr = initial_lr * lr_decay ** max(0, decay_factor)
new_lr = max(min_lr, new_lr)
return new_lr
@staticmethod
def _generate_run_id(config):
raise NotImplementedError
@staticmethod
def _get_config_filename(epoch):
return 'config_%02d.json' % epoch
def restore(self, sess, config):
"""
Restore from saved model.
:param sess:
:param config:
:return:
"""
model_filename = config['model_filename']
max_to_keep = self._get_config('max_to_keep')
saver = tf.train.Saver(tf.global_variables(), max_to_keep=max_to_keep)
saver.restore(sess, model_filename)
def save_model(self, sess, saver, val_loss):
config_filename = TFModelSupervisor._get_config_filename(self._epoch)
config = dict(self._config)
global_step = sess.run(tf.train.get_or_create_global_step())
config['epoch'] = self._epoch
config['global_step'] = global_step
config['log_dir'] = self._log_dir
config['model_filename'] = saver.save(sess, os.path.join(self._log_dir, 'models-%.4f' % val_loss),
global_step=global_step, write_meta_graph=False)
with open(os.path.join(self._log_dir, config_filename), 'w') as f:
json.dump(config, f)
return config['model_filename']
def test_and_write_result(self, sess, global_step, **kwargs):
null_val = self._config.get('null_val')
start_time = time.time()
test_results = TFModel.run_epoch(sess, self._test_model, self._x_test, self._y_test, return_output=True,
train_op=None)
# y_preds: a list of (batch_size, horizon, num_nodes, output_dim)
test_loss, y_preds = test_results['loss'], test_results['outputs']
tf_utils.add_simple_summary(self._writer, ['loss/test_loss'], [test_loss], global_step=global_step)
        # Converts the outputs, stacked to (batch_size, epoch_size, horizon, num_nodes, output_dim),
        # into per-horizon prediction DataFrames.
df_preds = self._convert_model_outputs_to_eval_df(y_preds)
for horizon_i in df_preds:
df_pred = df_preds[horizon_i]
df_test = self._eval_dfs[horizon_i]
mae, mape, rmse = metrics.calculate_metrics(df_pred, df_test, null_val)
tf_utils.add_simple_summary(self._writer,
['%s_%d' % (item, horizon_i + 1) for item in
['metric/rmse', 'metric/mape', 'metric/mae']],
[rmse, mape, mae],
global_step=global_step)
end_time = time.time()
message = 'Horizon %d, mape:%.4f, rmse:%.4f, mae:%.4f, %ds' % (
horizon_i + 1, mape, rmse, mae, end_time - start_time)
self._logger.info(message)
start_time = end_time
return df_preds
def _prepare_train_val_test_data(self):
"""
Prepare data for train, val and test.
:return:
"""
raise NotImplementedError
def _prepare_eval_df(self):
horizon = self._get_config('horizon')
seq_len = self._get_config('seq_len')
# y_test: (epoch_size, batch_size, ...)
n_test_samples = np.prod(self._y_test.shape[:2])
eval_dfs = {}
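        # The first input window occupies the first seq_len rows of the test split, so
        # the ground truth for horizon step i starts at row seq_len + i.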
for horizon_i in range(horizon):
eval_dfs[horizon_i] = self._df_test[seq_len + horizon_i: seq_len + horizon_i + n_test_samples]
return eval_dfs
def _build_train_val_test_models(self):
"""
        Builds models for train, val, and test.
:return:
"""
raise NotImplementedError
def _convert_model_outputs_to_eval_df(self, y_preds):
"""
        Convert the model outputs to a dict mapping each horizon step to its prediction DataFrame.
:param y_preds:
:return:
"""
raise NotImplementedError
@property
def log_dir(self):
return self._log_dir

5
requirements.txt Normal file

@ -0,0 +1,5 @@
hyperopt>=0.1
scipy>=0.19.0
numpy>=1.12.1
pandas>=0.19.2
tensorflow>=1.3.0

71
run_demo.py Normal file

@ -0,0 +1,71 @@
import json
import os
import pandas as pd
import sys
import tensorflow as tf
from lib.dcrnn_utils import load_graph_data
from model.dcrnn_supervisor import DCRNNSupervisor
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_bool('use_cpu_only', False, 'Whether to run tensorflow on cpu.')
def run_dcrnn(traffic_reading_df):
run_id = 'dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_1207002222'
log_dir = os.path.join('data/model', run_id)
config_filename = 'config_100.json'
graph_pkl_filename = 'data/sensor_graph/adj_mx.pkl'
with open(os.path.join(log_dir, config_filename)) as f:
config = json.load(f)
tf_config = tf.ConfigProto()
if FLAGS.use_cpu_only:
tf_config = tf.ConfigProto(device_count={'GPU': 0})
tf_config.gpu_options.allow_growth = True
_, _, adj_mx = load_graph_data(graph_pkl_filename)
with tf.Session(config=tf_config) as sess:
supervisor = DCRNNSupervisor(traffic_reading_df, config=config, adj_mx=adj_mx)
supervisor.restore(sess, config=config)
df_preds = supervisor.test_and_write_result(sess, config['global_step'])
for horizon_i in df_preds:
df_pred = df_preds[horizon_i]
filename = os.path.join('data/results/', 'dcrnn_prediction_%d.h5' % (horizon_i + 1))
df_pred.to_hdf(filename, 'results')
    print('Predictions saved as data/results/dcrnn_prediction_[1-12].h5...')
# def run_fc_lstm(traffic_reading_df):
# run_id = 'fclstm_h_12_256-256_lr_0.0001_bs_64_d_0.00_sl_12_MAE_1026175218'
# log_dir = os.path.join('data/model', run_id)
#
# config_filename = 'config_59.json'
# # graph_pkl_filename = 'data/sensor_graph/sensor_graph_exp.pkl'
# with open(os.path.join(log_dir, config_filename)) as f:
# config = json.load(f)
# tf_config = tf.ConfigProto()
# if FLAGS.use_cpu_only:
# tf_config = tf.ConfigProto(device_count={'GPU': 0})
# tf_config.gpu_options.allow_growth = True
# # Set small entries to zero for sparseness.
# with tf.Session(config=tf_config) as sess:
# supervisor = LSTMSeq2SeqSupervisor(traffic_reading_df, config=config)
# supervisor.restore(sess, config=config)
# df_preds = supervisor.test_and_write_result(sess, config['global_step'])
# for horizon_i in df_preds:
# df_pred = df_preds[horizon_i]
# # filename = os.path.join('data/results/', 'gcrnn_seq2seq_prediction_%d.h5' % (horizon_i + 1))
# filename = os.path.join('data/results/', 'fc_lstm_prediction_%d.h5' % (horizon_i + 1))
# df_pred.to_hdf(filename, 'results')
# print('Predictions saved as data/results/fc_lstm_prediction_[1-12].h5...')
if __name__ == '__main__':
sys.path.append(os.getcwd())
traffic_df_filename = 'data/df_highway_2012_4mon_sample.h5'
traffic_reading_df = pd.read_hdf(traffic_df_filename)
run_dcrnn(traffic_reading_df)
# run_fc_lstm(traffic_reading_df)
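
Assuming the pretrained model directory `data/model/dcrnn_DR_2_h_12_64-64_lr_0.01_bs_64_d_0.00_sl_12_MAE_1207002222/` (containing `config_100.json` and its checkpoint), the graph pickle `data/sensor_graph/adj_mx.pkl`, the traffic file `data/df_highway_2012_4mon_sample.h5`, and an existing `data/results/` output directory are all in place, the demo can presumably be launched from the repository root as follows:

```bash
# Run the pretrained DCRNN demo (writes dcrnn_prediction_[1-12].h5 to data/results/).
python run_demo.py

# Force CPU-only execution via the flag defined at the top of the script.
python run_demo.py --use_cpu_only=True
```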