From 7ba7fa320d2ce0b41db06afa322461f6610d5a39 Mon Sep 17 00:00:00 2001
From: Chintan Shah
Date: Sat, 7 Sep 2019 17:53:46 -0400
Subject: [PATCH 1/4] Use PyTorch Docker image

---
 DCRNN_CPU                          | 2 +-
 dcrnn_train.py                     | 2 +-
 model/pytorch/__init__.py          | 0
 model/pytorch/dcrnn_cell.py        | 1 +
 model/tf/__init__.py               | 0
 model/{ => tf}/dcrnn_cell.py       | 0
 model/{ => tf}/dcrnn_model.py      | 3 +--
 model/{ => tf}/dcrnn_supervisor.py | 2 +-
 requirements.txt                   | 3 ++-
 run_demo.py                        | 2 +-
 10 files changed, 8 insertions(+), 7 deletions(-)
 create mode 100644 model/pytorch/__init__.py
 create mode 100644 model/pytorch/dcrnn_cell.py
 create mode 100644 model/tf/__init__.py
 rename model/{ => tf}/dcrnn_cell.py (100%)
 rename model/{ => tf}/dcrnn_model.py (98%)
 rename model/{ => tf}/dcrnn_supervisor.py (99%)

diff --git a/DCRNN_CPU b/DCRNN_CPU
index 2a7e78a..de4e4e8 100644
--- a/DCRNN_CPU
+++ b/DCRNN_CPU
@@ -1,3 +1,3 @@
-FROM tensorflow/tensorflow:latest-py3
+FROM ufoym/deepo:cpu
 COPY requirements.txt .
 RUN pip install -r requirements.txt

diff --git a/dcrnn_train.py b/dcrnn_train.py
index de75465..0c28dfb 100644
--- a/dcrnn_train.py
+++ b/dcrnn_train.py
@@ -7,7 +7,7 @@ import tensorflow as tf
 import yaml
 
 from lib.utils import load_graph_data
-from model.dcrnn_supervisor import DCRNNSupervisor
+from model.tf.dcrnn_supervisor import DCRNNSupervisor
 
 
 def main(args):

diff --git a/model/pytorch/__init__.py b/model/pytorch/__init__.py
new file mode 100644
index 0000000..e69de29

diff --git a/model/pytorch/dcrnn_cell.py b/model/pytorch/dcrnn_cell.py
new file mode 100644
index 0000000..aaa3285
--- /dev/null
+++ b/model/pytorch/dcrnn_cell.py
@@ -0,0 +1 @@
+import torch

diff --git a/model/tf/__init__.py b/model/tf/__init__.py
new file mode 100644
index 0000000..e69de29

diff --git a/model/dcrnn_cell.py b/model/tf/dcrnn_cell.py
similarity index 100%
rename from model/dcrnn_cell.py
rename to model/tf/dcrnn_cell.py

diff --git a/model/dcrnn_model.py b/model/tf/dcrnn_model.py
similarity index 98%
rename from model/dcrnn_model.py
rename to model/tf/dcrnn_model.py
index 43797fa..16fa379 100644
--- a/model/dcrnn_model.py
+++ b/model/tf/dcrnn_model.py
@@ -6,8 +6,7 @@ import tensorflow as tf
 from tensorflow.contrib import legacy_seq2seq
 
-from lib.metrics import masked_mae_loss
-from model.dcrnn_cell import DCGRUCell
+from model.tf.dcrnn_cell import DCGRUCell
 
 
 class DCRNNModel(object):

diff --git a/model/dcrnn_supervisor.py b/model/tf/dcrnn_supervisor.py
similarity index 99%
rename from model/dcrnn_supervisor.py
rename to model/tf/dcrnn_supervisor.py
index 18a399e..8024628 100644
--- a/model/dcrnn_supervisor.py
+++ b/model/tf/dcrnn_supervisor.py
@@ -13,7 +13,7 @@ from lib import utils, metrics
 from lib.AMSGrad import AMSGrad
 from lib.metrics import masked_mae_loss
-from model.dcrnn_model import DCRNNModel
+from model.tf.dcrnn_model import DCRNNModel
 
 
 class DCRNNSupervisor(object):

diff --git a/requirements.txt b/requirements.txt
index f577b89..989b6e5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,4 +3,5 @@ numpy>=1.12.1
 pandas>=0.19.2
 pyyaml
 statsmodels
-tensorflow>=1.3.0
\ No newline at end of file
+tensorflow>=1.3.0
+torch
\ No newline at end of file

diff --git a/run_demo.py b/run_demo.py
index ecbbe86..05b617f 100644
--- a/run_demo.py
+++ b/run_demo.py
@@ -6,7 +6,7 @@ import tensorflow as tf
 import yaml
 
 from lib.utils import load_graph_data
-from model.dcrnn_supervisor import DCRNNSupervisor
+from model.tf.dcrnn_supervisor import DCRNNSupervisor
 
 
 def run_dcrnn(args):

From 00c70b3a2738938d2995d88bc9449fa653f2328b Mon Sep 17 00:00:00 2001
From:
Chintan Shah
Date: Sun, 8 Sep 2019 18:47:19 -0400
Subject: [PATCH 2/4] Implement FC layer and change Docker image to use PyTorch

---
 model/pytorch/dcrnn_cell.py | 135 ++++++++++++++++++++++++++++++++++++
 model/tf/dcrnn_cell.py      |   2 +-
 requirements.txt            |   3 +-
 3 files changed, 138 insertions(+), 2 deletions(-)

diff --git a/model/pytorch/dcrnn_cell.py b/model/pytorch/dcrnn_cell.py
index aaa3285..ac7850f 100644
--- a/model/pytorch/dcrnn_cell.py
+++ b/model/pytorch/dcrnn_cell.py
@@ -1 +1,136 @@
+from typing import Optional
+
 import torch
+from torch import Tensor
+
+from lib import utils
+
+
+class FCLayerParams:
+    def __init__(self, rnn_network: torch.nn.RNN):
+        self._rnn_network = rnn_network
+        self._params_dict = {}
+        self._biases_dict = {}
+
+    def get_weights(self, shape):
+        if shape not in self._params_dict:
+            nn_param = torch.nn.Parameter(torch.nn.init.xavier_normal_(torch.empty(*shape)))
+            self._params_dict[shape] = nn_param
+            self._rnn_network.register_parameter('fc_weight_{}'.format(str(shape)), nn_param)
+        return self._params_dict[shape]
+
+    def get_biases(self, length, bias_start=0.0):
+        if length not in self._biases_dict:
+            biases = torch.nn.Parameter(torch.nn.init.constant_(torch.empty(length), bias_start))
+            self._biases_dict[length] = biases
+            self._rnn_network.register_parameter('fc_biases_{}'.format(str(length)), biases)
+
+        return self._biases_dict[length]
+
+
+class DCGRUCell(torch.nn.RNN):
+    def __init__(self, num_units, adj_mx, max_diffusion_step, num_nodes, input_size: int,
+                 hidden_size: int,
+                 num_layers: int = 1,
+                 num_proj=None,
+                 nonlinearity='tanh', filter_type="laplacian", use_gc_for_ru=True):
+        """
+
+        :param num_units: number of hidden units per node.
+        :param adj_mx: adjacency matrix of the sensor graph.
+        :param max_diffusion_step: maximum diffusion step K of the graph convolution.
+        :param num_nodes: number of nodes in the graph.
+        :param input_size: input feature size, forwarded to torch.nn.RNN.
+        :param num_proj: size of the per-node output projection; None disables it.
+        :param nonlinearity: 'tanh' or 'relu'.
+        :param filter_type: "laplacian", "random_walk", "dual_random_walk".
+        :param use_gc_for_ru: whether to use graph convolution to calculate the reset and update gates.
+        """
+        super(DCGRUCell, self).__init__(input_size, hidden_size, bias=True,
+                                        # bias param does not exist in tf code?
+                                        num_layers=num_layers,
+                                        nonlinearity=nonlinearity)
+        self._activation = torch.tanh if nonlinearity == 'tanh' else torch.relu
+        # support other nonlinearities up here?
+        self._num_nodes = num_nodes
+        self._num_proj = num_proj
+        self._num_units = num_units
+        self._max_diffusion_step = max_diffusion_step
+        self._supports = []
+        self._use_gc_for_ru = use_gc_for_ru
+        supports = []
+        if filter_type == "laplacian":
+            supports.append(utils.calculate_scaled_laplacian(adj_mx, lambda_max=None))
+        elif filter_type == "random_walk":
+            supports.append(utils.calculate_random_walk_matrix(adj_mx).T)
+        elif filter_type == "dual_random_walk":
+            supports.append(utils.calculate_random_walk_matrix(adj_mx).T)
+            supports.append(utils.calculate_random_walk_matrix(adj_mx.T).T)
+        else:
+            supports.append(utils.calculate_scaled_laplacian(adj_mx))
+        for support in supports:
+            self._supports.append(self._build_sparse_matrix(support))
+
+        self._proj_weights = torch.nn.Parameter(torch.randn(self._num_units, self._num_proj)) if self._num_proj is not None else None
+        self._fc_params = FCLayerParams(self)
+
+    @property
+    def state_size(self):
+        return self._num_nodes * self._num_units
+
+    @property
+    def output_size(self):
+        output_size = self._num_nodes * self._num_units
+        if self._num_proj is not None:
+            output_size = self._num_nodes * self._num_proj
+        return output_size
+
+    def forward(self, input: Tensor, hx: Optional[Tensor] = None):
+        """Gated recurrent unit (GRU) with Graph Convolution.
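+
+        Computes, with `fn` being either the graph convolution (`_gconv`) or the
+        dense layer (`_fc`) below:
+            r, u = split(sigmoid(fn([x, h], bias_start=1.0)))  # reset / update gates
+            c = act(gconv([x, r * h]))                         # candidate, act = tanh by default
+            h' = u * h + (1 - u) * c                           # new hidden state
+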
+        :param input: (B, num_nodes * input_dim)
+        :param hx: (B, num_nodes * rnn_units), hidden state from the previous step.
+        :return:
+        - Output: A `2-D` tensor with shape `[batch_size, self.output_size]`.
+        - New state: A `2-D` tensor with shape
+            `[batch_size, num_nodes * rnn_units]`.
+        """
+        output_size = 2 * self._num_units
+        # We start with bias of 1.0 to not reset and not update.
+        if self._use_gc_for_ru:
+            fn = self._gconv
+        else:
+            fn = self._fc
+        value = torch.sigmoid(fn(input, hx, output_size, bias_start=1.0))
+        value = torch.reshape(value, (-1, self._num_nodes, output_size))
+        r, u = torch.split(tensor=value, split_size_or_sections=self._num_units, dim=-1)  # two chunks of size num_units each
+        r = torch.reshape(r, (-1, self._num_nodes * self._num_units))
+        u = torch.reshape(u, (-1, self._num_nodes * self._num_units))
+
+        c = self._gconv(input, r * hx, self._num_units)
+        if self._activation is not None:
+            c = self._activation(c)
+
+        output = new_state = u * hx + (1 - u) * c
+        if self._num_proj is not None:
+            batch_size = input.shape[0]
+            output = torch.reshape(new_state, shape=(-1, self._num_units))
+            output = torch.reshape(torch.matmul(output, self._proj_weights),
+                                   shape=(batch_size, self.output_size))
+        return output, new_state
+
+    @staticmethod
+    def _concat(x, x_):
+        x_ = x_.unsqueeze(0)
+        return torch.cat([x, x_], dim=0)
+
+    def _fc(self, inputs, state, output_size, bias_start=0.0):
+        batch_size = inputs.shape[0]
+        inputs = torch.reshape(inputs, (batch_size * self._num_nodes, -1))
+        state = torch.reshape(state, (batch_size * self._num_nodes, -1))
+        inputs_and_state = torch.cat([inputs, state], dim=-1)
+        input_size = inputs_and_state.shape[-1]
+        weights = self._fc_params.get_weights((input_size, output_size))
+        value = torch.sigmoid(torch.matmul(inputs_and_state, weights))
+        biases = self._fc_params.get_biases(output_size, bias_start)
+        value += biases
+        return value
\ No newline at end of file
diff --git a/model/tf/dcrnn_cell.py b/model/tf/dcrnn_cell.py
index 49208d0..4383eb6 100644
--- a/model/tf/dcrnn_cell.py
+++ b/model/tf/dcrnn_cell.py
@@ -4,7 +4,6 @@ from __future__ import print_function
 
 import numpy as np
 import tensorflow as tf
-
 from tensorflow.contrib.rnn import RNNCell
 
 from lib import utils
@@ -85,6 +84,7 @@ class DCGRUCell(RNNCell):
         """
         with tf.variable_scope(scope or "dcgru_cell"):
             with tf.variable_scope("gates"):  # Reset gate and update gate.
+                print(inputs.get_shape(), self.output_size)
                 output_size = 2 * self._num_units
                 # We start with bias of 1.0 to not reset and not update.
                 if self._use_gc_for_ru:

diff --git a/requirements.txt b/requirements.txt
index 989b6e5..ee81736 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,4 +4,5 @@ pandas>=0.19.2
 pyyaml
 statsmodels
 tensorflow>=1.3.0
-torch
\ No newline at end of file
+torch
+tables
\ No newline at end of file

From adbfa1914640523bc7ef64b5048ed51ceea1b317 Mon Sep 17 00:00:00 2001
From: Chintan Shah
Date: Sun, 8 Sep 2019 19:28:20 -0400
Subject: [PATCH 3/4] Implement DCGRUCell in PyTorch (untested)

---
 model/pytorch/dcrnn_cell.py | 64 +++++++++++++++++++++++++++++++++-----
 1 file changed, 58 insertions(+), 6 deletions(-)

diff --git a/model/pytorch/dcrnn_cell.py b/model/pytorch/dcrnn_cell.py
index ac7850f..d8f045c 100644
--- a/model/pytorch/dcrnn_cell.py
+++ b/model/pytorch/dcrnn_cell.py
@@ -6,24 +6,27 @@ from torch import Tensor
 from lib import utils
 
 
-class FCLayerParams:
-    def __init__(self, rnn_network: torch.nn.RNN):
+class LayerParams:
+    def __init__(self, rnn_network: torch.nn.RNN, type: str):
         self._rnn_network = rnn_network
         self._params_dict = {}
         self._biases_dict = {}
+        self._type = type
 
     def get_weights(self, shape):
         if shape not in self._params_dict:
             nn_param = torch.nn.Parameter(torch.nn.init.xavier_normal_(torch.empty(*shape)))
             self._params_dict[shape] = nn_param
-            self._rnn_network.register_parameter('fc_weight_{}'.format(str(shape)), nn_param)
+            self._rnn_network.register_parameter('{}_weight_{}'.format(self._type, str(shape)),
+                                                 nn_param)
         return self._params_dict[shape]
 
     def get_biases(self, length, bias_start=0.0):
         if length not in self._biases_dict:
             biases = torch.nn.Parameter(torch.nn.init.constant_(torch.empty(length), bias_start))
             self._biases_dict[length] = biases
-            self._rnn_network.register_parameter('fc_biases_{}'.format(str(length)), biases)
+            self._rnn_network.register_parameter('{}_biases_{}'.format(self._type, str(length)),
+                                                 biases)
 
         return self._biases_dict[length]
 
@@ -72,7 +75,8 @@ class DCGRUCell(torch.nn.RNN):
             self._supports.append(self._build_sparse_matrix(support))
 
         self._proj_weights = torch.nn.Parameter(torch.randn(self._num_units, self._num_proj)) if self._num_proj is not None else None
-        self._fc_params = FCLayerParams(self)
+        self._fc_params = LayerParams(self, 'fc')
+        self._gconv_params = LayerParams(self, 'gconv')
 
     @property
     def state_size(self):
@@ -133,4 +137,52 @@ class DCGRUCell(torch.nn.RNN):
         value = torch.sigmoid(torch.matmul(inputs_and_state, weights))
         biases = self._fc_params.get_biases(output_size, bias_start)
         value += biases
-        return value
\ No newline at end of file
+        return value
+
+    def _gconv(self, inputs, state, output_size, bias_start=0.0):
+        """Graph convolution between the input and the graph supports.
+
+        :param inputs: (batch, num_nodes * input_dim), input at the current step.
+        :param state: (batch, num_nodes * state_dim), hidden state.
+        :param output_size: size of the per-node output.
+        :param bias_start: initial value for the biases.
+        :return: (batch, num_nodes * output_size)
+        """
+        # Reshape input and state to (batch_size, num_nodes, input_dim/state_dim)
+        batch_size = inputs.shape[0]
+        inputs = torch.reshape(inputs, (batch_size, self._num_nodes, -1))
+        state = torch.reshape(state, (batch_size, self._num_nodes, -1))
+        inputs_and_state = torch.cat([inputs, state], dim=2)
+        input_size = inputs_and_state.shape[2]
+        dtype = inputs.dtype
+
+        x = inputs_and_state
+        x0 = x.permute(1, 2, 0)  # (num_nodes, total_arg_size, batch_size)
+        x0 = torch.reshape(x0, shape=[self._num_nodes, input_size * batch_size])
+        x = torch.unsqueeze(x0, 0)
+
+        if self._max_diffusion_step == 0:
+            pass
+        else:
+            for support in self._supports:
+                # https://discuss.pytorch.org/t/sparse-x-dense-dense-matrix-multiplication/6116/7
+                x1 = torch.mm(support, x0)
+                x = self._concat(x, x1)
+
+                for k in range(2, self._max_diffusion_step + 1):
+                    x2 = 2 * torch.mm(support, x1) - x0  # Chebyshev recurrence: T_k = 2 L T_{k-1} - T_{k-2}
+                    x = self._concat(x, x2)
+                    x1, x0 = x2, x1
+
+        num_matrices = len(self._supports) * self._max_diffusion_step + 1  # Adds for x itself.
+        x = torch.reshape(x, shape=[num_matrices, self._num_nodes, input_size, batch_size])
+        x = x.permute(3, 1, 2, 0)  # (batch_size, num_nodes, input_size, order)
+        x = torch.reshape(x, shape=[batch_size * self._num_nodes, input_size * num_matrices])
+
+        weights = self._gconv_params.get_weights((input_size * num_matrices, output_size))
+        x = torch.matmul(x, weights)  # (batch_size * self._num_nodes, output_size)
+
+        biases = self._gconv_params.get_biases(output_size, bias_start)
+        x += biases
+        # Reshape back: (batch_size * num_nodes, output_size) -> (batch_size, num_nodes * output_size)
+        return torch.reshape(x, [batch_size, self._num_nodes * output_size])

From 7a5e3c02168057efea91944aeecdb7ae67103760 Mon Sep 17 00:00:00 2001
From: Chintan Shah
Date: Sun, 29 Sep 2019 11:13:08 -0400
Subject: [PATCH 4/4] Partially implement model (does not work yet)

---
 model/pytorch/dcrnn_model.py | 122 +++++++++++++++++++++++++++++++++++
 1 file changed, 122 insertions(+)
 create mode 100644 model/pytorch/dcrnn_model.py

diff --git a/model/pytorch/dcrnn_model.py b/model/pytorch/dcrnn_model.py
new file mode 100644
index 0000000..eab2af4
--- /dev/null
+++ b/model/pytorch/dcrnn_model.py
@@ -0,0 +1,122 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import torch
+
+from model.pytorch.dcrnn_cell import DCGRUCell
+
+
+# import tensorflow as tf
+# from tensorflow.contrib import legacy_seq2seq
+
+
+class DCRNNModel(object):
+    def __init__(self, is_training, batch_size, scaler, adj_mx, **model_kwargs):
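+        """Sequence-to-sequence DCRNN skeleton: DCGRUCell encoder layers feed DCGRUCell
+        decoder layers, with optional scheduled sampling (curriculum learning) in training.
+        The seq2seq wiring below is still TensorFlow code (tf imports are commented out)
+        and remains to be ported.
+        """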
+        # Scaler for data normalization.
+        self._scaler = scaler
+
+        # Train and loss
+        self._loss = None
+        self._mae = None
+        self._train_op = None
+
+        max_diffusion_step = int(model_kwargs.get('max_diffusion_step', 2))
+        cl_decay_steps = int(model_kwargs.get('cl_decay_steps', 1000))
+        filter_type = model_kwargs.get('filter_type', 'laplacian')
+        horizon = int(model_kwargs.get('horizon', 1))
+        max_grad_norm = float(model_kwargs.get('max_grad_norm', 5.0))
+        num_nodes = int(model_kwargs.get('num_nodes', 1))
+        num_rnn_layers = int(model_kwargs.get('num_rnn_layers', 1))
+        rnn_units = int(model_kwargs.get('rnn_units'))
+        seq_len = int(model_kwargs.get('seq_len'))
+        use_curriculum_learning = bool(model_kwargs.get('use_curriculum_learning', False))
+        input_dim = int(model_kwargs.get('input_dim', 1))
+        output_dim = int(model_kwargs.get('output_dim', 1))
+
+        # Input (batch_size, timesteps, num_sensor, input_dim)
+        # self._inputs = tf.placeholder(tf.float32, shape=(batch_size, seq_len, num_nodes, input_dim), name='inputs')
+        # Labels: (batch_size, timesteps, num_sensor, input_dim), same format as input except for the temporal dimension.
+        # self._labels = tf.placeholder(tf.float32, shape=(batch_size, horizon, num_nodes, input_dim), name='labels')
+
+        # GO_SYMBOL = tf.zeros(shape=(batch_size, num_nodes * input_dim))
+        GO_SYMBOL = torch.zeros((batch_size, num_nodes * output_dim))
+
+        cell = DCGRUCell(rnn_units, adj_mx, max_diffusion_step=max_diffusion_step,
+                         num_nodes=num_nodes,
+                         filter_type=filter_type)  # TODO: DCGRUCell also expects input_size and hidden_size
+        cell_with_projection = DCGRUCell(rnn_units, adj_mx, max_diffusion_step=max_diffusion_step,
+                                         num_nodes=num_nodes,
+                                         num_proj=output_dim, filter_type=filter_type)
+        encoding_cells = [cell] * num_rnn_layers
+        decoding_cells = [cell] * (num_rnn_layers - 1) + [cell_with_projection]
+        encoding_cells = tf.contrib.rnn.MultiRNNCell(encoding_cells, state_is_tuple=True)  # TODO: port to PyTorch
+        decoding_cells = tf.contrib.rnn.MultiRNNCell(decoding_cells, state_is_tuple=True)  # TODO: port to PyTorch
+
+        global_step = tf.train.get_or_create_global_step()
+        # Outputs: (batch_size, timesteps, num_nodes, output_dim)
+        with tf.variable_scope('DCRNN_SEQ'):
+            inputs = tf.unstack(
+                tf.reshape(self._inputs, (batch_size, seq_len, num_nodes * input_dim)), axis=1)
+            labels = tf.unstack(
+                tf.reshape(self._labels[..., :output_dim],
+                           (batch_size, horizon, num_nodes * output_dim)), axis=1)
+            labels.insert(0, GO_SYMBOL)
+
+            def _loop_function(prev, i):
+                if is_training:
+                    # During training, feed either the ground truth or the model's previous prediction.
+                    if use_curriculum_learning:
+                        c = tf.random_uniform((), minval=0, maxval=1.)
+                        threshold = self._compute_sampling_threshold(global_step, cl_decay_steps)
+                        result = tf.cond(tf.less(c, threshold), lambda: labels[i], lambda: prev)
+                    else:
+                        result = labels[i]
+                else:
+                    # During testing, always feed the model's previous prediction.
+                    result = prev
+                return result
+
+            _, enc_state = tf.contrib.rnn.static_rnn(encoding_cells, inputs, dtype=tf.float32)
+            outputs, final_state = legacy_seq2seq.rnn_decoder(labels, enc_state, decoding_cells,
+                                                              loop_function=_loop_function)
+
+        # Stack the decoder outputs along the time dimension (the last step is dropped).
+        outputs = tf.stack(outputs[:-1], axis=1)
+        self._outputs = tf.reshape(outputs, (batch_size, horizon, num_nodes, output_dim),
+                                   name='outputs')
+        self._merged = tf.summary.merge_all()
+
+    @staticmethod
+    def _compute_sampling_threshold(global_step, k):
+        """
+        Computes the sampling probability for scheduled sampling using inverse sigmoid decay.
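+        The probability of feeding the ground truth decays as k / (k + exp(global_step / k)),
+        the inverse sigmoid schedule of Bengio et al. (2015), "Scheduled Sampling for
+        Sequence Prediction with Recurrent Neural Networks".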
+        :param global_step: current training step.
+        :param k: decay constant (cl_decay_steps); larger k decays more slowly.
+        :return: the probability of sampling the ground truth.
+        """
+        return tf.cast(k / (k + tf.exp(global_step / k)), tf.float32)  # TODO: port to PyTorch
+
+    @property
+    def inputs(self):
+        return self._inputs
+
+    @property
+    def labels(self):
+        return self._labels
+
+    @property
+    def loss(self):
+        return self._loss
+
+    @property
+    def mae(self):
+        return self._mae
+
+    @property
+    def merged(self):
+        return self._merged
+
+    @property
+    def outputs(self):
+        return self._outputs
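
Notes:

`_build_sparse_matrix` is called in `DCGRUCell.__init__` but is not defined in any of the
patches above. A minimal sketch of what it could look like, assuming the supports returned
by `lib.utils` are scipy sparse matrices (the body below is an assumption, not part of the
patches):

    @staticmethod
    def _build_sparse_matrix(L):
        # Convert a scipy sparse matrix to a torch sparse COO tensor
        # so that torch.mm(support, x0) works inside _gconv (assumed helper).
        L = L.tocoo()
        indices = torch.tensor([L.row, L.col], dtype=torch.long)
        values = torch.tensor(L.data, dtype=torch.float32)
        return torch.sparse_coo_tensor(indices, values, torch.Size(L.shape))

With such a helper in place, a rough smoke test for the cell (all sizes and the random
adjacency matrix are made up for illustration; shapes follow the forward() docstring):

    import numpy as np
    import torch

    from model.pytorch.dcrnn_cell import DCGRUCell

    num_nodes, input_dim, num_units, batch = 5, 2, 8, 4
    adj_mx = np.random.rand(num_nodes, num_nodes).astype(np.float32)
    cell = DCGRUCell(num_units, adj_mx, max_diffusion_step=2, num_nodes=num_nodes,
                     input_size=num_nodes * input_dim, hidden_size=num_units,
                     filter_type='random_walk')
    x = torch.randn(batch, num_nodes * input_dim)
    h = torch.zeros(batch, num_nodes * num_units)  # zero state for the first step
    output, new_state = cell(x, h)
    assert output.shape == (batch, num_nodes * num_units)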