from typing import Optional

import torch
from torch import Tensor

from lib import utils


class LayerParams:
    """Lazily create, register and cache the parameters of one layer family.

    A weight (or bias) is created the first time a given shape (or length) is
    requested, registered on the owning module so that it shows up in
    ``parameters()`` / ``state_dict()``, and served from a cache afterwards.

    NOTE(review): because creation is lazy, parameters do not exist until the
    first forward pass. An optimizer built before that pass will not see them.
    """

    def __init__(self, rnn_network: torch.nn.RNN, type: str):
        """
        :param rnn_network: module on which the parameters are registered.
        :param type: prefix used in the registered parameter names
            (``DCGRUCell`` passes ``'fc'`` and ``'gconv'``).
        """
        self._rnn_network = rnn_network
        self._params_dict = {}
        self._biases_dict = {}
        self._type = type

    def get_weights(self, shape):
        """Return the weight of the given ``shape``, creating it on first use.

        :param shape: sequence of ints; new weights are Xavier-normal
            initialised.
        :return: the cached ``torch.nn.Parameter`` for ``shape``.
        """
        shape = tuple(shape)  # hashable cache key, even if a list is passed
        if shape not in self._params_dict:
            # register_parameter() only accepts torch.nn.Parameter instances,
            # so wrap the tensor before initialising it in place.
            nn_param = torch.nn.Parameter(torch.empty(*shape))
            torch.nn.init.xavier_normal_(nn_param)
            self._params_dict[shape] = nn_param
            self._rnn_network.register_parameter(
                '{}_weight_{}'.format(self._type, str(shape)), nn_param)
        return self._params_dict[shape]

    def get_biases(self, length, bias_start=0.0):
        """Return the bias vector of the given ``length``, creating it on first use.

        :param length: number of elements of the bias vector.
        :param bias_start: constant used to initialise a newly created bias.
            It is ignored when a bias of this length is already cached.
        :return: the cached ``torch.nn.Parameter`` for ``length``.
        """
        if length not in self._biases_dict:
            biases = torch.nn.Parameter(torch.empty(length))
            torch.nn.init.constant_(biases, bias_start)
            self._biases_dict[length] = biases
            self._rnn_network.register_parameter(
                '{}_biases_{}'.format(self._type, str(length)), biases)
        return self._biases_dict[length]


class DCGRUCell(torch.nn.RNN):
    """GRU cell whose dense transforms are replaced by graph (diffusion) convolutions."""

    def __init__(self, num_units, adj_mx, max_diffusion_step, num_nodes,
                 input_size: int, hidden_size: int, num_layers: int = 1,
                 num_proj=None, nonlinearity='tanh', filter_type="laplacian",
                 use_gc_for_ru=True):
        """
        :param num_units: size of the hidden state of a single node.
        :param adj_mx: adjacency matrix handed to the ``lib.utils`` helpers
            that build the support matrices.
        :param max_diffusion_step: number of diffusion steps per support;
            ``0`` disables the graph diffusion terms.
        :param num_nodes: number of graph nodes.
        :param input_size: forwarded to ``torch.nn.RNN.__init__``.
        :param hidden_size: forwarded to ``torch.nn.RNN.__init__``.
        :param num_layers: forwarded to ``torch.nn.RNN.__init__``.
        :param num_proj: optional per-node output size. When given, the output
            of ``forward`` is projected from ``num_units`` to ``num_proj``.
        :param nonlinearity: ``'tanh'`` selects ``torch.tanh``; any other value
            accepted by ``torch.nn.RNN`` selects ``torch.relu``.
        :param filter_type: "laplacian", "random_walk", "dual_random_walk".
            Any other value falls back to the scaled Laplacian.
        :param use_gc_for_ru: whether to use Graph convolution to calculate the
            reset and update gates.
        """
        # NOTE(review): the base class allocates its own weights/biases
        # (bias=True). They are not used by this cell's forward().
        super(DCGRUCell, self).__init__(input_size, hidden_size, bias=True,
                                        num_layers=num_layers,
                                        nonlinearity=nonlinearity)
        self._activation = torch.tanh if nonlinearity == 'tanh' else torch.relu
        self._num_nodes = num_nodes
        self._num_proj = num_proj
        self._num_units = num_units
        self._max_diffusion_step = max_diffusion_step
        self._supports = []
        self._use_gc_for_ru = use_gc_for_ru

        supports = []
        if filter_type == "laplacian":
            supports.append(utils.calculate_scaled_laplacian(adj_mx, lambda_max=None))
        elif filter_type == "random_walk":
            supports.append(utils.calculate_random_walk_matrix(adj_mx).T)
        elif filter_type == "dual_random_walk":
            supports.append(utils.calculate_random_walk_matrix(adj_mx).T)
            supports.append(utils.calculate_random_walk_matrix(adj_mx.T).T)
        else:
            supports.append(utils.calculate_scaled_laplacian(adj_mx))
        for support in supports:
            self._supports.append(self._build_sparse_matrix(support))

        # The projection matrix only exists when a projection was requested;
        # torch.randn(num_units, None) would raise for the default num_proj.
        if self._num_proj is not None:
            self._proj_weights = torch.nn.Parameter(
                torch.randn(self._num_units, self._num_proj))
        else:
            self._proj_weights = None
        self._fc_params = LayerParams(self, 'fc')
        self._gconv_params = LayerParams(self, 'gconv')

    @staticmethod
    def _build_sparse_matrix(L):
        """Convert a support matrix into a tensor usable as the left operand of ``torch.mm``.

        NOTE(review): ``__init__`` calls this helper but no definition was
        visible in this module, so this implementation is a best-effort
        reconstruction. It assumes the ``lib.utils`` helpers return SciPy
        sparse matrices (objects exposing ``tocoo()``) -- TODO confirm.
        Tensors are passed through and anything else is converted densely.

        :param L: support matrix (tensor, ``tocoo()``-capable sparse matrix,
            or dense array-like).
        :return: a float32 sparse COO tensor for sparse inputs, otherwise a
            dense tensor.
        """
        if isinstance(L, Tensor):
            return L
        if hasattr(L, 'tocoo'):
            coo = L.tocoo()
            indices = torch.stack([
                torch.as_tensor(coo.row, dtype=torch.long),
                torch.as_tensor(coo.col, dtype=torch.long),
            ])
            values = torch.as_tensor(coo.data, dtype=torch.float32)
            return torch.sparse_coo_tensor(indices, values, tuple(coo.shape)).coalesce()
        return torch.as_tensor(L, dtype=torch.float32)

    @property
    def state_size(self):
        """Flattened size of the hidden state: ``num_nodes * num_units``."""
        return self._num_nodes * self._num_units

    @property
    def output_size(self):
        """Flattened size of the output; uses ``num_proj`` per node when it is set."""
        if self._num_proj is not None:
            return self._num_nodes * self._num_proj
        return self._num_nodes * self._num_units

    def forward(self, input: Tensor, hx: Optional[Tensor] = None):
        """Gated recurrent unit (GRU) with Graph Convolution.

        :param input: (B, num_nodes * input_dim)
        :param hx: previous state, (B, num_nodes * num_units). When omitted
            (``None`` or the legacy ``...`` placeholder) a zero state is used.
        :return
        - Output: A `2-D` tensor with shape `[batch_size x self.output_size]`.
        - New state: A `2-D` tensor with shape `[batch_size x self.state_size]`.
        """
        if hx is None or hx is ...:
            hx = input.new_zeros(input.shape[0], self.state_size)

        gate_size = 2 * self._num_units
        fn = self._gconv if self._use_gc_for_ru else self._fc
        # We start with bias of 1.0 to not reset and not update.
        value = torch.sigmoid(fn(input, hx, gate_size, bias_start=1.0))
        value = torch.reshape(value, (-1, self._num_nodes, gate_size))
        # Split the last axis into two halves of num_units each (reset, update).
        r, u = torch.split(tensor=value, split_size_or_sections=self._num_units, dim=-1)
        r = torch.reshape(r, (-1, self._num_nodes * self._num_units))
        u = torch.reshape(u, (-1, self._num_nodes * self._num_units))

        c = self._activation(self._gconv(input, r * hx, self._num_units))
        output = new_state = u * hx + (1 - u) * c
        if self._num_proj is not None:
            batch_size = input.shape[0]
            output = torch.reshape(new_state, shape=(-1, self._num_units))
            output = torch.reshape(torch.matmul(output, self._proj_weights),
                                   shape=(batch_size, self.output_size))
        return output, new_state

    @staticmethod
    def _concat(x, x_):
        """Append ``x_`` to ``x`` as a new slice along dimension 0."""
        x_ = x_.unsqueeze(0)
        return torch.cat([x, x_], dim=0)

    def _fc(self, inputs, state, output_size, bias_start=0.0):
        """Per-node fully connected transform of the concatenated input and state.

        :param inputs: (batch_size, num_nodes * input_dim)
        :param state: (batch_size, num_nodes * state_dim)
        :param output_size: number of output features per node.
        :param bias_start: initial value of the bias when it is first created.
        :return: tensor of shape (batch_size * num_nodes, output_size).
        """
        batch_size = inputs.shape[0]
        inputs = torch.reshape(inputs, (batch_size * self._num_nodes, -1))
        state = torch.reshape(state, (batch_size * self._num_nodes, -1))
        inputs_and_state = torch.cat([inputs, state], dim=-1)
        input_size = inputs_and_state.shape[-1]
        weights = self._fc_params.get_weights((input_size, output_size))
        # NOTE(review): the sigmoid is applied before the bias is added, and
        # forward() applies another sigmoid to the result. Kept as-is to
        # preserve numerical behaviour -- confirm this is intended.
        value = torch.sigmoid(torch.matmul(inputs_and_state, weights))
        biases = self._fc_params.get_biases(output_size, bias_start)
        # Out-of-place add: sigmoid's backward pass needs its unmodified output.
        value = value + biases
        return value

    def _gconv(self, inputs, state, output_size, bias_start=0.0):
        """Graph convolution between input and the graph matrix.

        :param inputs: (batch_size, num_nodes * input_dim)
        :param state: (batch_size, num_nodes * state_dim)
        :param output_size: number of output features per node.
        :param bias_start: initial value of the bias when it is first created.
        :return: tensor of shape (batch_size, num_nodes * output_size).
        """
        # Reshape input and state to (batch_size, num_nodes, input_dim/state_dim)
        batch_size = inputs.shape[0]
        inputs = torch.reshape(inputs, (batch_size, self._num_nodes, -1))
        state = torch.reshape(state, (batch_size, self._num_nodes, -1))
        inputs_and_state = torch.cat([inputs, state], dim=2)
        input_size = inputs_and_state.size(2)

        x0 = inputs_and_state.permute(1, 2, 0)  # (num_nodes, total_arg_size, batch_size)
        x0 = torch.reshape(x0, shape=[self._num_nodes, input_size * batch_size])

        # Diffusion terms: x itself, then max_diffusion_step terms per support.
        terms = [x0]
        if self._max_diffusion_step > 0:
            for support in self._supports:
                # https://discuss.pytorch.org/t/sparse-x-dense-dense-matrix-multiplication/6116/7
                # Every support restarts the recurrence from the untouched x0.
                # Sharing the running terms across supports would feed a stale
                # term of the previous support into the next one.
                t_prev, t_curr = x0, torch.mm(support, x0)
                terms.append(t_curr)
                for _ in range(2, self._max_diffusion_step + 1):
                    t_next = 2 * torch.mm(support, t_curr) - t_prev
                    terms.append(t_next)
                    t_prev, t_curr = t_curr, t_next

        # len(supports) * max_diffusion_step + 1 (the extra one is x itself).
        num_matrices = len(terms)
        x = torch.stack(terms, dim=0)
        x = torch.reshape(x, shape=[num_matrices, self._num_nodes, input_size, batch_size])
        x = x.permute(3, 1, 2, 0)  # (batch_size, num_nodes, input_size, order)
        x = torch.reshape(x, shape=[batch_size * self._num_nodes, input_size * num_matrices])

        weights = self._gconv_params.get_weights((input_size * num_matrices, output_size))
        x = torch.matmul(x, weights)  # (batch_size * self._num_nodes, output_size)

        biases = self._gconv_params.get_biases(output_size, bias_start)
        x = x + biases
        # Reshape res back to 2D: (batch_size, num_node, state_dim) -> (batch_size, num_node * state_dim)
        return torch.reshape(x, [batch_size, self._num_nodes * output_size])