TrafficWheel/model/DCRNN/dcrnn_model.py

163 lines
7.1 KiB
Python

import numpy as np
import torch
import torch.nn as nn
from model.DCRNN.dcrnn_cell import DCGRUCell
from data.get_adj import get_adj
class Seq2SeqAttrs:
def __init__(self, args, adj_mx):
self.adj_mx = adj_mx
self.max_diffusion_step = args.get('max_diffusion_step', 2)
self.cl_decay_steps = args.get('cl_decay_steps', 1000)
self.filter_type = args.get('filter_type', 'laplacian')
self.num_nodes = args.get('num_nodes', 1)
self.num_rnn_layers = args.get('num_rnn_layers', 1)
self.rnn_units = args.get('rnn_units')
self.hidden_state_size = self.num_nodes * self.rnn_units
class EncoderModel(nn.Module, Seq2SeqAttrs):
def __init__(self, args, adj_mx):
nn.Module.__init__(self)
Seq2SeqAttrs.__init__(self, args, adj_mx)
self.input_dim = args.get('input_dim', 1)
self.seq_len = args.get('seq_len') # for the encoder
self.dcgru_layers = nn.ModuleList(
[DCGRUCell(self.rnn_units, adj_mx, self.max_diffusion_step, self.num_nodes,
filter_type=self.filter_type) for _ in range(self.num_rnn_layers)])
def forward(self, inputs, hidden_state=None):
"""
Encoder forward pass.
:param inputs: shape (batch_size, self.num_nodes * self.input_dim)
:param hidden_state: (num_layers, batch_size, self.hidden_state_size)
optional, zeros if not provided
:return: output: # shape (batch_size, self.hidden_state_size)
hidden_state # shape (num_layers, batch_size, self.hidden_state_size)
(lower indices mean lower layers)
"""
batch_size, _ = inputs.size()
if hidden_state is None:
hidden_state = torch.zeros((self.num_rnn_layers, batch_size, self.hidden_state_size),
device=inputs.device)
hidden_states = []
output = inputs
for layer_num, dcgru_layer in enumerate(self.dcgru_layers):
next_hidden_state = dcgru_layer(output, hidden_state[layer_num])
hidden_states.append(next_hidden_state)
output = next_hidden_state
return output, torch.stack(hidden_states) # runs in O(num_layers) so not too slow
class DecoderModel(nn.Module, Seq2SeqAttrs):
def __init__(self, args, adj_mx):
# super().__init__(is_training, adj_mx, **model_kwargs)
nn.Module.__init__(self)
Seq2SeqAttrs.__init__(self, args, adj_mx)
self.output_dim = args.get('output_dim', 1)
self.horizon = args.get('horizon', 1) # for the decoder
self.projection_layer = nn.Linear(self.rnn_units, self.output_dim)
self.dcgru_layers = nn.ModuleList(
[DCGRUCell(self.rnn_units, adj_mx, self.max_diffusion_step, self.num_nodes,
filter_type=self.filter_type) for _ in range(self.num_rnn_layers)])
def forward(self, inputs, hidden_state=None):
"""
Decoder forward pass.
:param inputs: shape (batch_size, self.num_nodes * self.output_dim)
:param hidden_state: (num_layers, batch_size, self.hidden_state_size)
optional, zeros if not provided
:return: output: # shape (batch_size, self.num_nodes * self.output_dim)
hidden_state # shape (num_layers, batch_size, self.hidden_state_size)
(lower indices mean lower layers)
"""
hidden_states = []
output = inputs
for layer_num, dcgru_layer in enumerate(self.dcgru_layers):
next_hidden_state = dcgru_layer(output, hidden_state[layer_num])
hidden_states.append(next_hidden_state)
output = next_hidden_state
projected = self.projection_layer(output.view(-1, self.rnn_units))
output = projected.view(-1, self.num_nodes * self.output_dim)
return output, torch.stack(hidden_states)
class DCRNNModel(nn.Module, Seq2SeqAttrs):
def __init__(self, args):
super().__init__()
adj_mx = get_adj(args)
Seq2SeqAttrs.__init__(self, args, adj_mx)
self.encoder_model = EncoderModel(args, adj_mx)
self.decoder_model = DecoderModel(args, adj_mx)
self.cl_decay_steps = args.get('cl_decay_steps', 1000)
self.use_curriculum_learning = args.get('use_curriculum_learning', False)
self.batch_seen = 0
def _compute_sampling_threshold(self, batches_seen):
return self.cl_decay_steps / (
self.cl_decay_steps + np.exp(batches_seen / self.cl_decay_steps))
def encoder(self, inputs):
"""
encoder forward pass on t time steps
:param inputs: shape (seq_len, batch_size, num_sensor * input_dim)
:return: encoder_hidden_state: (num_layers, batch_size, self.hidden_state_size)
"""
encoder_hidden_state = None
for t in range(self.encoder_model.seq_len):
_, encoder_hidden_state = self.encoder_model(inputs[t], encoder_hidden_state)
return encoder_hidden_state
def decoder(self, encoder_hidden_state, labels=None, batches_seen=None):
"""
Decoder forward pass
:param encoder_hidden_state: (num_layers, batch_size, self.hidden_state_size)
:param labels: (self.horizon, batch_size, self.num_nodes * self.output_dim) [optional, not exist for inference]
:param batches_seen: global step [optional, not exist for inference]
:return: output: (self.horizon, batch_size, self.num_nodes * self.output_dim)
"""
batch_size = encoder_hidden_state.size(1)
go_symbol = torch.zeros((batch_size, self.num_nodes * self.decoder_model.output_dim),
device=encoder_hidden_state.device)
decoder_hidden_state = encoder_hidden_state
decoder_input = go_symbol
outputs = []
for t in range(self.decoder_model.horizon):
decoder_output, decoder_hidden_state = self.decoder_model(decoder_input,
decoder_hidden_state)
decoder_input = decoder_output
outputs.append(decoder_output)
if self.training and self.use_curriculum_learning:
c = np.random.uniform(0, 1)
if c < self._compute_sampling_threshold(batches_seen):
decoder_input = labels[t]
outputs = torch.stack(outputs)
return outputs
def forward(self, inputs, labels=None):
"""
seq2seq forward pass 64 12 307 3
:param inputs: shape (seq_len, batch_size, num_sensor * input_dim) 12 64 307 * 1
:param labels: shape (horizon, batch_size, num_sensor * output) 12 64 307 1
:param batches_seen: batches seen till now
:return: output: (self.horizon, batch_size, self.num_nodes * self.output_dim)
"""
inputs = inputs[..., 0].permute(1, 0, 2)
labels = labels[..., 0].permute(1, 0, 2)
encoder_hidden_state = self.encoder(inputs)
outputs = self.decoder(encoder_hidden_state, labels, batches_seen=self.batch_seen)
self.batch_seen += 1
outputs = outputs.unsqueeze(dim=-1) # [12,64,307,1]
outputs = outputs.permute(1, 0, 2, 3) # [64,12,307,1]
return outputs