207 lines
7.8 KiB
Python
Executable File
207 lines
7.8 KiB
Python
Executable File
import numpy as np
|
|
import torch
|
|
import torch.nn as nn
|
|
|
|
from model.DCRNN.dcrnn_cell import DCGRUCell
|
|
from data.get_adj import get_adj
|
|
|
|
|
|
class Seq2SeqAttrs:
    """Shared hyper-parameter holder for the encoder, decoder and full model.

    Every setting is read from ``args`` through its dict-style ``get`` method
    and stored as an instance attribute.
    """

    def __init__(self, args, adj_mx):
        """Copy the model settings out of ``args``.

        :param args: object exposing ``get(key, default)``. ``rnn_units`` is
            required; every other key falls back to the default shown below.
        :param adj_mx: graph structure stored as-is (presumably an adjacency
            matrix -- confirm against ``get_adj``).
        :raises TypeError: if ``rnn_units`` is missing or ``None``.
        """
        rnn_units = args.get("rnn_units")
        if rnn_units is None:
            # Without this check the failure surfaces as an opaque
            # "unsupported operand type(s) for *: 'int' and 'NoneType'"
            # when hidden_state_size is computed below.
            raise TypeError(
                "Seq2SeqAttrs requires 'rnn_units' in args; it has no default"
            )

        self.adj_mx = adj_mx
        self.max_diffusion_step = args.get("max_diffusion_step", 2)
        self.cl_decay_steps = args.get("cl_decay_steps", 1000)
        self.filter_type = args.get("filter_type", "laplacian")
        self.num_nodes = args.get("num_nodes", 1)
        self.num_rnn_layers = args.get("num_rnn_layers", 1)
        self.rnn_units = rnn_units
        self.input_dim = args.get("input_dim", 1)
        self.output_dim = args.get("output_dim", 1)
        self.horizon = args.get("horizon", 12)
        self.seq_len = args.get("seq_len", 12)
        # Size of one layer's hidden state once all nodes are flattened
        # into a single vector.
        self.hidden_state_size = self.num_nodes * self.rnn_units
|
|
|
|
|
|
class EncoderModel(nn.Module, Seq2SeqAttrs):
    """Stack of DCGRU cells that encodes one time step per call."""

    def __init__(self, args, adj_mx):
        """Build ``num_rnn_layers`` DCGRU cells.

        :param args: configuration object with a dict-style ``get``; the keys
            read are the ones consumed by ``Seq2SeqAttrs``.
        :param adj_mx: graph structure forwarded unchanged to every
            ``DCGRUCell``.
        """
        nn.Module.__init__(self)
        # Seq2SeqAttrs already sets input_dim and seq_len. The encoder used to
        # re-read seq_len *without* a default, replacing the shared default
        # (12) with None and breaking any loop over ``encoder.seq_len`` when
        # the key was absent from args.
        Seq2SeqAttrs.__init__(self, args, adj_mx)

        # NOTE(review): every layer is built with input_dim=self.input_dim,
        # yet forward() feeds layers above the first with the previous
        # layer's hidden state -- confirm DCGRUCell copes with that when
        # num_rnn_layers > 1.
        self.dcgru_layers = nn.ModuleList(
            [
                DCGRUCell(
                    self.rnn_units,
                    adj_mx,
                    self.max_diffusion_step,
                    self.num_nodes,
                    input_dim=self.input_dim,
                    filter_type=self.filter_type,
                )
                for _ in range(self.num_rnn_layers)
            ]
        )

    def forward(self, inputs, hidden_state=None):
        """
        Encoder forward pass for a single time step.

        :param inputs: shape (batch_size, self.num_nodes * self.input_dim)
        :param hidden_state: (num_layers, batch_size, self.hidden_state_size)
               optional, zeros if not provided
        :return: output: # shape (batch_size, self.hidden_state_size)
                 hidden_state # shape (num_layers, batch_size, self.hidden_state_size)
                 (lower indices mean lower layers)
        """
        # The unpacking also enforces that ``inputs`` is two-dimensional.
        batch_size, _ = inputs.size()
        if hidden_state is None:
            hidden_state = torch.zeros(
                (self.num_rnn_layers, batch_size, self.hidden_state_size),
                device=inputs.device,
            )

        hidden_states = []
        output = inputs
        # Each layer consumes the output of the layer below it together with
        # its own slice of the previous hidden state.
        for layer_num, dcgru_layer in enumerate(self.dcgru_layers):
            output = dcgru_layer(output, hidden_state[layer_num])
            hidden_states.append(output)

        # runs in O(num_layers) so not too slow
        return output, torch.stack(hidden_states)
|
|
|
|
|
|
class DecoderModel(nn.Module, Seq2SeqAttrs):
    """Stack of DCGRU cells plus a linear projection; decodes one step per call."""

    def __init__(self, args, adj_mx):
        """Build ``num_rnn_layers`` DCGRU cells and the output projection.

        :param args: configuration object with a dict-style ``get``; the keys
            read are the ones consumed by ``Seq2SeqAttrs``.
        :param adj_mx: graph structure forwarded unchanged to every
            ``DCGRUCell``.
        """
        nn.Module.__init__(self)
        # Seq2SeqAttrs already sets output_dim and horizon. The decoder used
        # to re-read horizon with a default of 1, which disagreed with the
        # shared default (12) that DCRNNModel uses to reshape the decoder
        # output whenever "horizon" was absent from args.
        Seq2SeqAttrs.__init__(self, args, adj_mx)

        # Maps each node's rnn_units-sized hidden vector to output_dim values.
        self.projection_layer = nn.Linear(self.rnn_units, self.output_dim)
        # NOTE(review): every layer is built with input_dim=self.output_dim,
        # yet forward() feeds layers above the first with the previous
        # layer's hidden state -- confirm DCGRUCell copes with that when
        # num_rnn_layers > 1.
        self.dcgru_layers = nn.ModuleList(
            [
                DCGRUCell(
                    self.rnn_units,
                    adj_mx,
                    self.max_diffusion_step,
                    self.num_nodes,
                    input_dim=self.output_dim,
                    filter_type=self.filter_type,
                )
                for _ in range(self.num_rnn_layers)
            ]
        )

    def forward(self, inputs, hidden_state=None):
        """
        Decoder forward pass for a single time step.

        :param inputs: shape (batch_size, self.num_nodes * self.output_dim)
        :param hidden_state: (num_layers, batch_size, self.hidden_state_size)
               optional, zeros if not provided
        :return: output: # shape (batch_size, self.num_nodes * self.output_dim)
                 hidden_state # shape (num_layers, batch_size, self.hidden_state_size)
                 (lower indices mean lower layers)
        """
        if hidden_state is None:
            # Honour the documented "zeros if not provided" contract; the
            # previous code indexed None and raised a TypeError.
            hidden_state = torch.zeros(
                (self.num_rnn_layers, inputs.size(0), self.hidden_state_size),
                device=inputs.device,
            )

        hidden_states = []
        output = inputs
        for layer_num, dcgru_layer in enumerate(self.dcgru_layers):
            output = dcgru_layer(output, hidden_state[layer_num])
            hidden_states.append(output)

        # Project per node: flatten to (batch * num_nodes, rnn_units), apply
        # the linear layer, then fold the nodes back into the feature axis.
        projected = self.projection_layer(output.view(-1, self.rnn_units))
        output = projected.view(-1, self.num_nodes * self.output_dim)

        return output, torch.stack(hidden_states)
|
|
|
|
|
|
class DCRNNModel(nn.Module, Seq2SeqAttrs):
    """Encoder-decoder model built from ``EncoderModel`` and ``DecoderModel``."""

    def __init__(self, args):
        """Create the encoder and decoder from a single configuration object.

        :param args: configuration object with a dict-style ``get``; it is
            also passed to ``get_adj`` to obtain the graph structure.
        """
        super().__init__()
        adj_mx = get_adj(args)
        Seq2SeqAttrs.__init__(self, args, adj_mx)
        self.encoder_model = EncoderModel(args, adj_mx)
        self.decoder_model = DecoderModel(args, adj_mx)
        self.cl_decay_steps = args.get("cl_decay_steps", 1000)
        self.use_curriculum_learning = args.get("use_curriculum_learning", False)
        # Number of training-mode forward passes so far; drives the decay of
        # the teacher-forcing probability.
        self.batch_seen = 0

    def _compute_sampling_threshold(self, batches_seen):
        """Return the probability of feeding the label as next decoder input.

        Computes ``k / (k + exp(batches_seen / k))`` with
        ``k = self.cl_decay_steps``, which shrinks as ``batches_seen`` grows.
        """
        return self.cl_decay_steps / (
            self.cl_decay_steps + np.exp(batches_seen / self.cl_decay_steps)
        )

    def encoder(self, inputs):
        """
        Encoder forward pass over all input time steps.

        :param inputs: shape (seq_len, batch_size, num_sensor * input_dim)
        :return: encoder_hidden_state: (num_layers, batch_size, self.hidden_state_size)
        """
        encoder_hidden_state = None
        for t in range(self.encoder_model.seq_len):
            # Only the hidden state is carried forward; the per-step output
            # of the encoder is discarded.
            _, encoder_hidden_state = self.encoder_model(
                inputs[t], encoder_hidden_state
            )

        return encoder_hidden_state

    def decoder(self, encoder_hidden_state, labels=None, batches_seen=None):
        """
        Decoder forward pass over the whole horizon.

        :param encoder_hidden_state: (num_layers, batch_size, self.hidden_state_size)
        :param labels: (self.horizon, batch_size, self.num_nodes * self.output_dim) [optional, not exist for inference]
        :param batches_seen: global step [optional, not exist for inference]
        :return: output: (self.horizon, batch_size, self.num_nodes * self.output_dim)
        """
        batch_size = encoder_hidden_state.size(1)
        # All-zero "GO" input for the first decoding step.
        decoder_input = torch.zeros(
            (batch_size, self.num_nodes * self.decoder_model.output_dim),
            device=encoder_hidden_state.device,
        )
        decoder_hidden_state = encoder_hidden_state

        # Teacher forcing needs both the labels and a step counter. Without
        # this guard a training-mode call with labels=None crashed on
        # ``labels[t]``.
        teacher_forcing = (
            self.training
            and self.use_curriculum_learning
            and labels is not None
            and batches_seen is not None
        )
        # The threshold depends only on batches_seen, so compute it once
        # instead of once per decoding step.
        threshold = (
            self._compute_sampling_threshold(batches_seen)
            if teacher_forcing
            else None
        )

        outputs = []
        for t in range(self.decoder_model.horizon):
            decoder_output, decoder_hidden_state = self.decoder_model(
                decoder_input, decoder_hidden_state
            )
            outputs.append(decoder_output)
            # By default the model's own prediction is fed back in; with
            # probability ``threshold`` the label is used instead.
            decoder_input = decoder_output
            if teacher_forcing and np.random.uniform(0, 1) < threshold:
                decoder_input = labels[t]

        return torch.stack(outputs)

    def forward(self, inputs, labels=None):
        """
        seq2seq forward pass. inputs: [B, T, N, C]

        :param inputs: tensor laid out as [B, T, N, C]; only the first
            ``self.input_dim`` channels are used.
        :param labels: optional tensor with the same layout; only the first
            ``self.output_dim`` channels are used (for curriculum learning).
        :return: tensor laid out as [B, horizon, N, self.output_dim]
        """
        # Reorder to time-major and flatten nodes and channels together.
        x = inputs[..., : self.input_dim]
        x = (
            x.permute(1, 0, 2, 3)
            .contiguous()
            .view(self.seq_len, -1, self.num_nodes * self.input_dim)
        )

        y = None
        if labels is not None:
            y = labels[..., : self.output_dim]
            y = (
                y.permute(1, 0, 2, 3)
                .contiguous()
                .view(self.horizon, -1, self.num_nodes * self.output_dim)
            )

        encoder_hidden_state = self.encoder(x)
        outputs = self.decoder(encoder_hidden_state, y, batches_seen=self.batch_seen)
        if self.training:
            # Count training batches only, so that validation/inference
            # passes do not speed up the teacher-forcing decay.
            self.batch_seen += 1

        # Use the number of steps the decoder actually produced, so the
        # reshape can never silently fold time steps into the batch axis.
        outputs = outputs.view(outputs.size(0), -1, self.num_nodes, self.output_dim)
        return outputs.permute(1, 0, 2, 3).contiguous()
|