import numpy as np
import torch
import torch.nn as nn

from model.DCRNN.dcrnn_cell import DCGRUCell
from utils.get_adj import get_adj


class Seq2SeqAttrs:
    """Mixin holding the hyper-parameters shared by encoder, decoder and model.

    Every value is read from ``args`` via ``args.get``; ``adj_mx`` is stored
    unchanged. Sub-classes should rely on these attributes instead of
    re-reading ``args`` with different defaults, so that all components agree
    on ``seq_len`` / ``horizon`` / dimensions.
    """

    def __init__(self, args, adj_mx):
        """
        :param args: mapping-like object exposing ``get(key, default)``
        :param adj_mx: adjacency information forwarded to every ``DCGRUCell``
        """
        self.adj_mx = adj_mx
        self.max_diffusion_step = args.get("max_diffusion_step", 2)
        self.cl_decay_steps = args.get("cl_decay_steps", 1000)
        self.filter_type = args.get("filter_type", "laplacian")
        self.num_nodes = args.get("num_nodes", 1)
        self.num_rnn_layers = args.get("num_rnn_layers", 1)
        # "rnn_units" has no default: when the key is missing this is None and
        # the hidden_state_size product below raises TypeError.
        self.rnn_units = args.get("rnn_units")
        self.input_dim = args.get("input_dim", 1)
        self.output_dim = args.get("output_dim", 1)
        self.horizon = args.get("horizon", 12)
        self.seq_len = args.get("seq_len", 12)
        self.hidden_state_size = self.num_nodes * self.rnn_units

    def _zero_hidden_state(self, batch_size, device):
        """Return an all-zero state of shape
        (num_rnn_layers, batch_size, hidden_state_size) on ``device``."""
        return torch.zeros(
            (self.num_rnn_layers, batch_size, self.hidden_state_size),
            device=device,
        )


class EncoderModel(nn.Module, Seq2SeqAttrs):
    """Stack of ``num_rnn_layers`` DCGRU cells applied to one time step."""

    def __init__(self, args, adj_mx):
        nn.Module.__init__(self)
        # input_dim and seq_len come from Seq2SeqAttrs so the encoder uses the
        # same defaults as DCRNNModel (seq_len previously fell back to None
        # here, which broke ``range(seq_len)`` in DCRNNModel.encoder).
        Seq2SeqAttrs.__init__(self, args, adj_mx)
        self.dcgru_layers = nn.ModuleList(
            [
                DCGRUCell(
                    self.rnn_units,
                    adj_mx,
                    self.max_diffusion_step,
                    self.num_nodes,
                    input_dim=self.input_dim,
                    filter_type=self.filter_type,
                )
                for _ in range(self.num_rnn_layers)
            ]
        )

    def forward(self, inputs, hidden_state=None):
        """
        Encoder forward pass for a single time step.

        :param inputs: shape (batch_size, self.num_nodes * self.input_dim)
        :param hidden_state: (num_layers, batch_size, self.hidden_state_size)
               optional, zeros if not provided
        :return: output: # shape (batch_size, self.hidden_state_size)
                 hidden_state # shape (num_layers, batch_size, self.hidden_state_size)
                 (lower indices mean lower layers)
        """
        # Unpacking into two names also rejects inputs that are not 2-D.
        batch_size, _ = inputs.size()
        if hidden_state is None:
            hidden_state = self._zero_hidden_state(batch_size, inputs.device)

        hidden_states = []
        output = inputs
        # Each layer consumes the previous layer's new state as its input.
        for layer_num, dcgru_layer in enumerate(self.dcgru_layers):
            next_hidden_state = dcgru_layer(output, hidden_state[layer_num])
            hidden_states.append(next_hidden_state)
            output = next_hidden_state

        # runs in O(num_layers) so not too slow
        return output, torch.stack(hidden_states)


class DecoderModel(nn.Module, Seq2SeqAttrs):
    """Stack of DCGRU cells followed by a per-node linear projection."""

    def __init__(self, args, adj_mx):
        nn.Module.__init__(self)
        # output_dim and horizon come from Seq2SeqAttrs. The decoder used to
        # re-read "horizon" with a default of 1 while DCRNNModel defaulted to
        # 12, so a missing key produced 1 decoded step that was then reshaped
        # as if it were 12.
        Seq2SeqAttrs.__init__(self, args, adj_mx)
        self.projection_layer = nn.Linear(self.rnn_units, self.output_dim)
        self.dcgru_layers = nn.ModuleList(
            [
                DCGRUCell(
                    self.rnn_units,
                    adj_mx,
                    self.max_diffusion_step,
                    self.num_nodes,
                    input_dim=self.output_dim,
                    filter_type=self.filter_type,
                )
                for _ in range(self.num_rnn_layers)
            ]
        )

    def forward(self, inputs, hidden_state=None):
        """
        Decoder forward pass for a single time step.

        :param inputs: shape (batch_size, self.num_nodes * self.output_dim)
        :param hidden_state: (num_layers, batch_size, self.hidden_state_size)
               optional, zeros if not provided
        :return: output: # shape (batch_size, self.num_nodes * self.output_dim)
                 hidden_state # shape (num_layers, batch_size, self.hidden_state_size)
                 (lower indices mean lower layers)
        """
        if hidden_state is None:
            # Honour the documented default instead of failing on None[...].
            hidden_state = self._zero_hidden_state(inputs.size(0), inputs.device)

        hidden_states = []
        output = inputs
        for layer_num, dcgru_layer in enumerate(self.dcgru_layers):
            next_hidden_state = dcgru_layer(output, hidden_state[layer_num])
            hidden_states.append(next_hidden_state)
            output = next_hidden_state

        # Flatten to rows of rnn_units so the same linear layer is applied to
        # every node, then fold the nodes back into the feature axis.
        projected = self.projection_layer(output.view(-1, self.rnn_units))
        output = projected.view(-1, self.num_nodes * self.output_dim)

        return output, torch.stack(hidden_states)


class DCRNNModel(nn.Module, Seq2SeqAttrs):
    """Encoder/decoder DCRNN with optional scheduled sampling.

    ``batch_seen`` counts forward passes made in training mode and drives the
    sampling threshold used when ``use_curriculum_learning`` is enabled. It is
    a plain Python attribute, so it is not part of ``state_dict()``.
    """

    def __init__(self, args):
        """
        :param args: mapping-like object exposing ``get(key, default)``; it is
               also handed to ``get_adj`` to obtain the adjacency information
        """
        super().__init__()
        adj_mx = get_adj(args)
        Seq2SeqAttrs.__init__(self, args, adj_mx)
        self.encoder_model = EncoderModel(args, adj_mx)
        self.decoder_model = DecoderModel(args, adj_mx)
        self.use_curriculum_learning = args.get("use_curriculum_learning", False)
        self.batch_seen = 0

    def _compute_sampling_threshold(self, batches_seen):
        """Return k / (k + exp(batches_seen / k)) with k = cl_decay_steps.

        This is the probability of feeding the ground truth to the decoder;
        it decreases as ``batches_seen`` grows.
        """
        return self.cl_decay_steps / (
            self.cl_decay_steps + np.exp(batches_seen / self.cl_decay_steps)
        )

    def encoder(self, inputs):
        """
        Encoder forward pass over all input time steps.

        :param inputs: shape (seq_len, batch_size, num_sensor * input_dim)
        :return: encoder_hidden_state: (num_layers, batch_size, self.hidden_state_size)
        """
        encoder_hidden_state = None
        for t in range(self.encoder_model.seq_len):
            _, encoder_hidden_state = self.encoder_model(
                inputs[t], encoder_hidden_state
            )
        return encoder_hidden_state

    def decoder(self, encoder_hidden_state, labels=None, batches_seen=None):
        """
        Decoder forward pass over the whole horizon.

        Scheduled sampling (feeding ``labels[t]`` instead of the model's own
        prediction) is applied only when the module is in training mode,
        ``use_curriculum_learning`` is set, and both ``labels`` and
        ``batches_seen`` are provided; otherwise decoding is autoregressive.

        :param encoder_hidden_state: (num_layers, batch_size, self.hidden_state_size)
        :param labels: (self.horizon, batch_size, self.num_nodes * self.output_dim)
               [optional, not exist for inference]
        :param batches_seen: global step [optional, not exist for inference]
        :return: output: (self.horizon, batch_size, self.num_nodes * self.output_dim)
        """
        batch_size = encoder_hidden_state.size(1)
        # All-zero "GO" input for the first decoding step.
        go_symbol = torch.zeros(
            (batch_size, self.num_nodes * self.decoder_model.output_dim),
            device=encoder_hidden_state.device,
        )

        use_teacher_forcing = (
            self.training
            and self.use_curriculum_learning
            and labels is not None
            and batches_seen is not None
        )
        # The threshold depends only on batches_seen, so compute it once.
        threshold = (
            self._compute_sampling_threshold(batches_seen)
            if use_teacher_forcing
            else None
        )

        decoder_hidden_state = encoder_hidden_state
        decoder_input = go_symbol
        outputs = []
        for t in range(self.decoder_model.horizon):
            decoder_output, decoder_hidden_state = self.decoder_model(
                decoder_input, decoder_hidden_state
            )
            outputs.append(decoder_output)
            # Default: feed the prediction back in; optionally swap in the
            # ground truth with probability ``threshold``.
            decoder_input = decoder_output
            if use_teacher_forcing and np.random.uniform(0, 1) < threshold:
                decoder_input = labels[t]

        return torch.stack(outputs)

    def forward(self, inputs, labels=None):
        """
        seq2seq forward pass.

        :param inputs: [B, T, N, C]; only the first ``input_dim`` channels are used
        :param labels: optional, same layout as ``inputs``; only the first
               ``output_dim`` channels are used (for scheduled sampling)
        :return: predictions of shape (B, horizon, num_nodes, output_dim)
        """
        # [B, T, N, C] -> (T, B, N * C): time-major, nodes folded into features.
        x = inputs[..., : self.input_dim]
        x = (
            x.permute(1, 0, 2, 3)
            .contiguous()
            .view(self.seq_len, -1, self.num_nodes * self.input_dim)
        )

        y = None
        if labels is not None:
            y = labels[..., : self.output_dim]
            y = (
                y.permute(1, 0, 2, 3)
                .contiguous()
                .view(self.horizon, -1, self.num_nodes * self.output_dim)
            )

        encoder_hidden_state = self.encoder(x)
        outputs = self.decoder(encoder_hidden_state, y, batches_seen=self.batch_seen)

        # Only training batches advance the curriculum schedule; evaluation
        # passes must not accelerate the decay of the sampling threshold.
        if self.training:
            self.batch_seen += 1

        # (horizon, B, N * C) -> (B, horizon, N, C)
        outputs = outputs.view(self.horizon, -1, self.num_nodes, self.output_dim)
        outputs = outputs.permute(1, 0, 2, 3).contiguous()
        return outputs