Merge pull request #2 from czzhangheng/dev

Add subgraph function
HengZhang 2024-11-28 11:53:50 +08:00 committed by GitHub
commit 267ee463ce
11 changed files with 422 additions and 5 deletions


@@ -205,8 +205,12 @@ def get_model(model_config, local_data=None, backend='torch'):
         from federatedscope.nlp.hetero_tasks.model import ATCModel
         model = ATCModel(model_config)
     elif model_config.type.lower() in ['feddgcn']:
-        from federatedscope.trafficflow.model.FedDGCN import FedDGCN
-        model = FedDGCN(model_config)
+        if model_config.use_minigraph is False:
+            from federatedscope.trafficflow.model.FedDGCN import FedDGCN
+            model = FedDGCN(model_config)
+        else:
+            from federatedscope.trafficflow.model.FedDGCNv2 import FederatedFedDGCN
+            model = FederatedFedDGCN(model_config)
     else:
         raise ValueError('Model {} is not provided'.format(model_config.type))
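
A minimal sketch of what the new switch does at dispatch time, assuming a bare namespace config in place of FederatedScope's config object (the SimpleNamespace below is purely illustrative):

from types import SimpleNamespace

def pick_model_class(model_config):
    # Mirrors the branch above: use_minigraph selects the plain per-client
    # FedDGCN or the subgraph-partitioned FederatedFedDGCN wrapper.
    if model_config.type.lower() in ['feddgcn']:
        return 'FedDGCN' if model_config.use_minigraph is False else 'FederatedFedDGCN'
    raise ValueError('Model {} is not provided'.format(model_config.type))

cfg = SimpleNamespace(type='FedDGCN', use_minigraph=True)
print(pick_model_class(cfg))  # FederatedFedDGCN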


@@ -62,6 +62,8 @@ def extend_model_cfg(cfg):
     cfg.model.cheb_order = 1  # A tuple, e.g., (in_channel, h, w)
     cfg.model.use_day = True
     cfg.model.use_week = True
+    cfg.model.minigraph_size = 5
+    cfg.model.use_minigraph = False
     # ---------------------------------------------------------------------- #


@@ -30,6 +30,8 @@ def extend_trafficflow_cfg(cfg):
     cfg.model.cheb_order = 1  # A tuple, e.g., (in_channel, h, w)
     cfg.model.use_day = True
     cfg.model.use_week = True
+    cfg.model.minigraph_size = 5
+    cfg.model.use_minigraph = False
     # ---------------------------------------------------------------------- #
     # Criterion related options


@@ -107,8 +107,13 @@ def load_dataset(config, client_cfgs=None):
         modified_config = config
     elif config.data.type.lower() in [
             'trafficflow']:
-        from federatedscope.trafficflow.dataloader.traffic_dataloader import load_traffic_data
-        dataset, modified_config = load_traffic_data(config, client_cfgs)
+        if config.model.use_minigraph is False:
+            from federatedscope.trafficflow.dataloader.traffic_dataloader import load_traffic_data
+            dataset, modified_config = load_traffic_data(config, client_cfgs)
+        else:
+            from federatedscope.trafficflow.dataloader.traffic_dataloader_v2 import load_traffic_data
+            dataset, modified_config = load_traffic_data(config, client_cfgs)
     else:
         raise ValueError('Dataset {} not found.'.format(config.data.type))
     return dataset, modified_config


@@ -0,0 +1,227 @@
import numpy as np
import torch
import torch.utils.data

from federatedscope.trafficflow.dataset.add_window import add_window_horizon
from federatedscope.trafficflow.dataset.normalization import (
    NScaler, MinMax01Scaler, MinMax11Scaler, StandardScaler, ColumnMinMaxScaler)
from federatedscope.trafficflow.dataset.traffic_dataset import load_st_dataset

def normalize_dataset(data, normalizer, column_wise=False):
    if normalizer == 'max01':
        if column_wise:
            minimum = data.min(axis=0, keepdims=True)
            maximum = data.max(axis=0, keepdims=True)
        else:
            minimum = data.min()
            maximum = data.max()
        scaler = MinMax01Scaler(minimum, maximum)
        data = scaler.transform(data)
        print('Normalize the dataset by MinMax01 Normalization')
    elif normalizer == 'max11':
        if column_wise:
            minimum = data.min(axis=0, keepdims=True)
            maximum = data.max(axis=0, keepdims=True)
        else:
            minimum = data.min()
            maximum = data.max()
        scaler = MinMax11Scaler(minimum, maximum)
        data = scaler.transform(data)
        print('Normalize the dataset by MinMax11 Normalization')
    elif normalizer == 'std':
        if column_wise:
            mean = data.mean(axis=0, keepdims=True)
            std = data.std(axis=0, keepdims=True)
        else:
            mean = data.mean()
            std = data.std()
        scaler = StandardScaler(mean, std)
        # the transform is left to the caller here; load_traffic_data
        # applies it slice-by-slice below
        # data = scaler.transform(data)
        print('Normalize the dataset by Standard Normalization')
    elif normalizer == 'None':
        scaler = NScaler()
        data = scaler.transform(data)
        print('Does not normalize the dataset')
    elif normalizer == 'cmax':
        # column min-max, to be deprecated
        # note: axis must be the spatial dimension, please check!
        scaler = ColumnMinMaxScaler(data.min(axis=0), data.max(axis=0))
        data = scaler.transform(data)
        print('Normalize the dataset by Column Min-Max Normalization')
    else:
        raise ValueError('Unsupported normalizer: {}'.format(normalizer))
    return scaler

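
# --- Illustrative sketch (not part of the committed file): the 'std' branch
# returns the scaler without transforming; the caller applies it, as
# load_traffic_data does below. _Std is a stand-in mirroring StandardScaler's
# interface.
class _Std:
    def __init__(self, mean, std):
        self.mean, self.std = mean, std

    def transform(self, data):
        return (data - self.mean) / self.std


def _std_sketch():
    x_train = np.random.rand(100, 12, 30, 1)  # (windows, lag, nodes, input_dim)
    scaler = _Std(x_train.mean(), x_train.std())
    x_train = scaler.transform(x_train)  # applied by the caller
    print(round(float(x_train.mean()), 4), round(float(x_train.std()), 4))  # ~0.0 1.0
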
def split_data_by_days(data, val_days, test_days, interval=30):
    """
    :param data: [B, *]
    :param val_days: number of validation days
    :param test_days: number of test days
    :param interval: sampling interval in minutes (15, 30, 60)
    :return: train, val and test splits
    """
    t = int((24 * 60) / interval)  # samples per day
    test_data = data[-t * int(test_days):]
    val_data = data[-t * int(test_days + val_days): -t * int(test_days)]
    train_data = data[:-t * int(test_days + val_days)]
    return train_data, val_data, test_data

def split_data_by_ratio(data, val_ratio, test_ratio):
    data_len = data.shape[0]
    test_data = data[-int(data_len * test_ratio):]
    val_data = data[-int(data_len * (test_ratio + val_ratio)):-int(data_len * test_ratio)]
    train_data = data[:-int(data_len * (test_ratio + val_ratio))]
    return train_data, val_data, test_data

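
# --- Illustrative sketch (not part of the committed file): a numeric check
# of the ratio split with val_ratio = test_ratio = 0.2 over ten samples.
def _ratio_split_sketch():
    data = np.arange(10)
    train, val, test = split_data_by_ratio(data, val_ratio=0.2, test_ratio=0.2)
    print(train, val, test)  # [0 1 2 3 4 5] [6 7] [8 9]
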
def data_loader(X, Y, batch_size, shuffle=True, drop_last=True):
    # Legacy idiom: the torch.cuda.FloatTensor / torch.FloatTensor constructors
    # are deprecated in recent PyTorch; torch.tensor(..., device=...) is the
    # preferred replacement.
    cuda = True if torch.cuda.is_available() else False
    TensorFloat = torch.cuda.FloatTensor if cuda else torch.FloatTensor
    X, Y = TensorFloat(X), TensorFloat(Y)
    data = torch.utils.data.TensorDataset(X, Y)
    dataloader = torch.utils.data.DataLoader(data, batch_size=batch_size,
                                             shuffle=shuffle, drop_last=drop_last)
    return dataloader

def load_traffic_data(config, client_cfgs):
    root = config.data.root
    dataName = 'PEMSD' + root[-1]
    raw_data = load_st_dataset(dataName)  # (T, N, F)
    l, n, f = raw_data.shape
    feature_list = [raw_data]
    # numerical time_in_day
    time_ind = [i % config.data.steps_per_day / config.data.steps_per_day
                for i in range(raw_data.shape[0])]
    time_ind = np.array(time_ind)
    time_in_day = np.tile(time_ind, [1, n, 1]).transpose((2, 1, 0))
    feature_list.append(time_in_day)
    # numerical day_in_week
    day_in_week = [(i // config.data.steps_per_day) % config.data.days_per_week
                   for i in range(raw_data.shape[0])]
    day_in_week = np.array(day_in_week)
    day_in_week = np.tile(day_in_week, [1, n, 1]).transpose((2, 1, 0))
    feature_list.append(day_in_week)
    # data = np.concatenate(feature_list, axis=-1)
    single = False
    x, y = add_window_horizon(raw_data, config.data.lag, config.data.horizon, single)
    x_day, y_day = add_window_horizon(time_in_day, config.data.lag, config.data.horizon, single)
    x_week, y_week = add_window_horizon(day_in_week, config.data.lag, config.data.horizon, single)
    x = np.concatenate([x, x_day, x_week], axis=-1)
    y = np.concatenate([y, y_day, y_week], axis=-1)
    # split dataset by days or by ratio
    if config.data.test_ratio > 1:
        x_train, x_val, x_test = split_data_by_days(x, config.data.val_ratio, config.data.test_ratio)
        y_train, y_val, y_test = split_data_by_days(y, config.data.val_ratio, config.data.test_ratio)
    else:
        x_train, x_val, x_test = split_data_by_ratio(x, config.data.val_ratio, config.data.test_ratio)
        y_train, y_val, y_test = split_data_by_ratio(y, config.data.val_ratio, config.data.test_ratio)
    # normalize st data (only the traffic channel, not the time encodings)
    normalizer = 'std'
    scaler = normalize_dataset(x_train[..., :config.model.input_dim], normalizer, config.data.column_wise)
    config.data.scaler = [float(scaler.mean), float(scaler.std)]
    x_train[..., :config.model.input_dim] = scaler.transform(x_train[..., :config.model.input_dim])
    x_val[..., :config.model.input_dim] = scaler.transform(x_val[..., :config.model.input_dim])
    x_test[..., :config.model.input_dim] = scaler.transform(x_test[..., :config.model.input_dim])
    # Client-side dataset splitting
    node_num = config.data.num_nodes
    client_num = config.federate.client_num
    per_samples = node_num // client_num
    data_list, cur_index = dict(), 0
    input_dim, output_dim = config.model.input_dim, config.model.output_dim
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    minigraph_size = config.model.minigraph_size
    for i in range(client_num):
        if cur_index + per_samples <= node_num:
            # Normal slicing
            sub_array_train = x_train[:, :, cur_index:cur_index + per_samples, :]
            sub_array_val = x_val[:, :, cur_index:cur_index + per_samples, :]
            sub_array_test = x_test[:, :, cur_index:cur_index + per_samples, :]
            sub_y_train = y_train[:, :, cur_index:cur_index + per_samples, :output_dim]
            sub_y_val = y_val[:, :, cur_index:cur_index + per_samples, :output_dim]
            sub_y_test = y_test[:, :, cur_index:cur_index + per_samples, :output_dim]
        else:
            # Not enough nodes left to fill per_samples: pad the node axis
            # (axis=2) with zero columns so every client tensor has
            # per_samples nodes
            missing = cur_index + per_samples - node_num

            def _pad_nodes(arr, width):
                pad = np.zeros(arr.shape[:2] + (width,) + arr.shape[3:], dtype=arr.dtype)
                return np.concatenate((arr, pad), axis=2)

            sub_array_train = _pad_nodes(x_train[:, :, cur_index:, :], missing)
            sub_array_val = _pad_nodes(x_val[:, :, cur_index:, :], missing)
            sub_array_test = _pad_nodes(x_test[:, :, cur_index:, :], missing)
            sub_y_train = _pad_nodes(y_train[:, :, cur_index:, :output_dim], missing)
            sub_y_val = _pad_nodes(y_val[:, :, cur_index:, :output_dim], missing)
            sub_y_test = _pad_nodes(y_test[:, :, cur_index:, :output_dim], missing)
        data_list[i + 1] = {
            'train': torch.utils.data.TensorDataset(
                torch.tensor(split_into_mini_graphs(sub_array_train, minigraph_size), dtype=torch.float, device=device),
                torch.tensor(split_into_mini_graphs(sub_y_train, minigraph_size), dtype=torch.float, device=device)),
            'val': torch.utils.data.TensorDataset(
                torch.tensor(split_into_mini_graphs(sub_array_val, minigraph_size), dtype=torch.float, device=device),
                torch.tensor(split_into_mini_graphs(sub_y_val, minigraph_size), dtype=torch.float, device=device)),
            'test': torch.utils.data.TensorDataset(
                torch.tensor(split_into_mini_graphs(sub_array_test, minigraph_size), dtype=torch.float, device=device),
                torch.tensor(split_into_mini_graphs(sub_y_test, minigraph_size), dtype=torch.float, device=device))}
        cur_index += per_samples
    config.model.num_nodes = per_samples
    return data_list, config

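
# --- Illustrative sketch (not part of the committed file): with
# per_samples = node_num // client_num and cur_index advancing by per_samples,
# the slice never overruns node_num, so the zero-padding branch is unreachable
# under this partition and any remainder nodes are simply dropped.
def _partition_sketch():
    node_num, client_num = 307, 4  # PEMSD4 has 307 sensors
    per_samples = node_num // client_num  # 76
    for i in range(client_num):
        assert (i + 1) * per_samples <= node_num  # padding branch never taken
    print('nodes assigned:', client_num * per_samples, 'of', node_num)  # 304 of 307
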
def split_into_mini_graphs(tensor, graph_size, dummy_value=0):
    """
    Splits a tensor into mini-graphs of a given size, padding the last
    mini-graph with dummy nodes if necessary.
    Args:
        tensor (np.ndarray): Input tensor with shape (timestep, horizon, node_num, dim).
        graph_size (int): The size of each mini-graph.
        dummy_value (float, optional): The value to use for dummy nodes. Default is 0.
    Returns:
        np.ndarray: Output tensor with shape (timestep, horizon, graph_num, graph_size, dim).
    """
    timestep, horizon, node_num, dim = tensor.shape
    # Number of mini-graphs (round-up division)
    graph_num = (node_num + graph_size - 1) // graph_size
    # Initialize the output tensor with dummy values
    output = np.full((timestep, horizon, graph_num, graph_size, dim), dummy_value, dtype=tensor.dtype)
    # Fill in the real data
    for i in range(graph_num):
        start_idx = i * graph_size
        end_idx = min(start_idx + graph_size, node_num)  # don't exceed node_num
        slice_size = end_idx - start_idx
        output[:, :, i, :slice_size, :] = tensor[:, :, start_idx:end_idx, :]
    return output

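
# --- Illustrative sketch (not part of the committed file): padding behavior
# for 7 nodes with graph_size=3 -> 3 mini-graphs, the last one holding one
# real node and two dummies.
def _minigraph_sketch():
    x = np.arange(7, dtype=float).reshape(1, 1, 7, 1)  # (timestep, horizon, node_num, dim)
    out = split_into_mini_graphs(x, graph_size=3)
    print(out.shape)           # (1, 1, 3, 3, 1)
    print(out[0, 0, 2, :, 0])  # [6. 0. 0.]
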
if __name__ == '__main__':
    a = 'data/trafficflow/PeMS04'
    name = 'PEMSD' + a[-1]
    raw_data = load_st_dataset(name)


@@ -0,0 +1,169 @@
import time

import torch
import torch.nn as nn
from torch.nn import ModuleList

from federatedscope.trafficflow.model.DGCRUCell import DGCRUCell

class DGCRM(nn.Module):
    def __init__(self, node_num, dim_in, dim_out, cheb_k, embed_dim, num_layers=1):
        super(DGCRM, self).__init__()
        assert num_layers >= 1, 'At least one DCRNN layer in the Encoder.'
        self.node_num = node_num
        self.input_dim = dim_in
        self.num_layers = num_layers
        self.DGCRM_cells = nn.ModuleList()
        self.DGCRM_cells.append(DGCRUCell(node_num, dim_in, dim_out, cheb_k, embed_dim))
        for _ in range(1, num_layers):
            self.DGCRM_cells.append(DGCRUCell(node_num, dim_out, dim_out, cheb_k, embed_dim))

    def forward(self, x, init_state, node_embeddings):
        # x: (B, T, N, dim_in); init_state: (num_layers, B, N, dim_out)
        assert x.shape[2] == self.node_num and x.shape[3] == self.input_dim
        seq_length = x.shape[1]
        current_inputs = x
        output_hidden = []
        for i in range(self.num_layers):
            state = init_state[i]
            inner_states = []
            for t in range(seq_length):
                state = self.DGCRM_cells[i](current_inputs[:, t, :, :], state,
                                            [node_embeddings[0][:, t, :, :], node_embeddings[1]])
                inner_states.append(state)
            output_hidden.append(state)
            current_inputs = torch.stack(inner_states, dim=1)
        return current_inputs, output_hidden

    def init_hidden(self, batch_size):
        init_states = []
        for i in range(self.num_layers):
            init_states.append(self.DGCRM_cells[i].init_hidden_state(batch_size))
        return torch.stack(init_states, dim=0)  # (num_layers, B, N, hidden_dim)

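
# --- Illustrative sketch (not part of the committed file): DGCRM's shape
# contract, checked with a stub cell that only mirrors DGCRUCell's calling
# convention. x: (B, T, N, dim_in); init_hidden(B): (num_layers, B, N, dim_out);
# forward returns ((B, T, N, dim_out), list of per-layer final states).
class _StubCell(nn.Module):
    def __init__(self, node_num, dim_in, dim_out):
        super().__init__()
        self.node_num, self.hidden_dim = node_num, dim_out
        self.lin = nn.Linear(dim_in + dim_out, dim_out)

    def forward(self, x, state, node_embeddings):
        # x: (B, N, dim_in), state: (B, N, dim_out) -> next state (B, N, dim_out)
        return torch.tanh(self.lin(torch.cat([x, state], dim=-1)))

    def init_hidden_state(self, batch_size):
        return torch.zeros(batch_size, self.node_num, self.hidden_dim)


def _dgcrm_shape_sketch():
    B, T, N, D_in, D_out = 8, 12, 5, 1, 64
    cell = _StubCell(N, D_in, D_out)
    x = torch.randn(B, T, N, D_in)
    state = cell.init_hidden_state(B)
    states = []
    for t in range(T):  # same per-timestep recurrence as DGCRM.forward
        state = cell(x[:, t], state, None)
        states.append(state)
    print(torch.stack(states, dim=1).shape)  # (8, 12, 5, 64) == (B, T, N, hidden)
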
# Build your torch or tf model class here
class FedDGCN(nn.Module):
    def __init__(self, args):
        super(FedDGCN, self).__init__()
        self.num_node = args.minigraph_size
        self.input_dim = args.input_dim
        self.hidden_dim = args.rnn_units
        self.output_dim = args.output_dim
        self.horizon = args.horizon
        self.num_layers = args.num_layers
        self.use_D = args.use_day
        self.use_W = args.use_week
        self.dropout1 = nn.Dropout(p=args.dropout)
        self.dropout2 = nn.Dropout(p=args.dropout)
        self.node_embeddings1 = nn.Parameter(torch.randn(self.num_node, args.embed_dim), requires_grad=True)
        self.node_embeddings2 = nn.Parameter(torch.randn(self.num_node, args.embed_dim), requires_grad=True)
        self.T_i_D_emb = nn.Parameter(torch.empty(288, args.embed_dim))
        self.D_i_W_emb = nn.Parameter(torch.empty(7, args.embed_dim))
        # Initialize parameters
        nn.init.xavier_uniform_(self.node_embeddings1)
        nn.init.xavier_uniform_(self.T_i_D_emb)
        nn.init.xavier_uniform_(self.D_i_W_emb)
        self.encoder1 = DGCRM(args.minigraph_size, args.input_dim, args.rnn_units, args.cheb_order,
                              args.embed_dim, args.num_layers)
        self.encoder2 = DGCRM(args.minigraph_size, args.input_dim, args.rnn_units, args.cheb_order,
                              args.embed_dim, args.num_layers)
        # predictor
        self.end_conv1 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True)
        self.end_conv2 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True)
        self.end_conv3 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True)

    def forward(self, source):
        # source: (B, T, N, dims); channel 0 is traffic, 1 is time-in-day, 2 is day-in-week
        node_embedding1 = self.node_embeddings1
        if self.use_D:
            t_i_d_data = source[..., 1]
            T_i_D_emb = self.T_i_D_emb[(t_i_d_data * 288).type(torch.LongTensor)]
            node_embedding1 = torch.mul(node_embedding1, T_i_D_emb)
        if self.use_W:
            d_i_w_data = source[..., 2]
            D_i_W_emb = self.D_i_W_emb[d_i_w_data.type(torch.LongTensor)]
            node_embedding1 = torch.mul(node_embedding1, D_i_W_emb)
        node_embeddings = [node_embedding1, self.node_embeddings1]
        source = source[..., 0].unsqueeze(-1)
        # branch 1: encode the raw signal
        init_state1 = self.encoder1.init_hidden(source.shape[0])
        output, _ = self.encoder1(source, init_state1, node_embeddings)
        output = self.dropout1(output[:, -1:, :, :])
        output1 = self.end_conv1(output)
        # branch 2: encode the residual between the input and its reconstruction
        source1 = self.end_conv2(output)
        source2 = source - source1
        init_state2 = self.encoder2.init_hidden(source2.shape[0])
        output2, _ = self.encoder2(source2, init_state2, node_embeddings)
        output2 = self.dropout2(output2[:, -1:, :, :])
        output2 = self.end_conv3(output2)
        return output1 + output2

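
# --- Illustrative sketch (not part of the committed file): the predictor
# heads use a Conv2d shape trick -- treating the last hidden state
# (B, 1, N, hidden) as an image, a (1, hidden) kernel collapses the feature
# axis and emits horizon * output_dim channels. Note that source - source1
# in forward implicitly assumes horizon * output_dim equals the input window
# length.
def _end_conv_sketch():
    B, N, hidden, horizon, out_dim = 8, 5, 64, 12, 1
    end_conv = nn.Conv2d(1, horizon * out_dim, kernel_size=(1, hidden), bias=True)
    h_last = torch.randn(B, 1, N, hidden)  # output[:, -1:, :, :] from the encoder
    print(end_conv(h_last).shape)          # torch.Size([8, 12, 5, 1])
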
class FederatedFedDGCN(nn.Module):
    def __init__(self, args):
        super(FederatedFedDGCN, self).__init__()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.graph_num = (args.num_nodes + args.minigraph_size - 1) // args.minigraph_size
        self.args = args
        # One FedDGCN instance per mini-graph
        self.model_list = ModuleList(FedDGCN(self.args).to(self.device) for _ in range(self.graph_num))

    def forward(self, source):
        """
        Forward pass for the federated model. Each subgraph processes its
        portion of the data, and the results are stacked back together.
        Arguments:
        - source: Tensor of shape (batchsize, horizon, subgraph_num, subgraph_size, dims)
        Returns:
        - Stacked output of shape (batchsize, horizon, subgraph_num, subgraph_size, dims)
        """
        self.subgraph_num = source.shape[2]
        subgraph_outputs = []
        # Iterate through the subgraph models.
        # Parallel computation has not been implemented yet, so this may be
        # slower than the non-partitioned path.
        for i in range(self.subgraph_num):
            # Extract the subgraph-specific data
            subgraph_data = source[:, :, i, :, :]  # (batchsize, horizon, subgraph_size, dims)
            subgraph_output = self.model_list[i](subgraph_data)
            subgraph_outputs.append(subgraph_output)
        output_tensor = torch.stack(subgraph_outputs, dim=2)
        # Synchronize the subgraph models after every forward pass
        self.local_aggregate()
        return output_tensor

    def local_aggregate(self):
        """
        Update the parameters of each model in model_list to the average of
        all models' parameters.
        """
        with torch.no_grad():  # no gradients during the update
            for i, model in enumerate(self.model_list):
                for name, param in model.named_parameters():
                    # Average the corresponding parameter across all models
                    avg_param = torch.zeros_like(param)
                    for other_model in self.model_list:
                        avg_param += other_model.state_dict()[name]
                    avg_param /= len(self.model_list)
                    # Update the current model's parameter
                    param.data.copy_(avg_param)
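
local_aggregate above recomputes the same mean once per model, which is quadratic in the number of subgraph models; a behaviorally equivalent sketch (toy nn.Linear modules standing in for FedDGCN) averages each parameter once and broadcasts it:

import torch
import torch.nn as nn

models = nn.ModuleList(nn.Linear(4, 4) for _ in range(3))
with torch.no_grad():
    names = [name for name, _ in models[0].named_parameters()]
    avg = {name: torch.stack([m.state_dict()[name] for m in models]).mean(dim=0)
           for name in names}
    for m in models:
        for name, param in m.named_parameters():
            param.data.copy_(avg[name])
print(torch.equal(models[0].weight, models[2].weight))  # True

Also worth noting: local_aggregate is invoked at the end of every forward pass, so the per-subgraph models are re-synchronized to their average after each batch.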


@@ -42,6 +42,8 @@ model:
   cheb_order: 2
   use_day: True
   use_week: True
+  use_minigraph: False
+  minigraph_size: 10
 train:
   batch_or_epoch: 'epoch'
   local_update_steps: 1


@@ -44,6 +44,8 @@ model:
   cheb_order: 2
   use_day: True
   use_week: True
+  use_minigraph: False
+  minigraph_size: 10
 train:
   batch_or_epoch: 'epoch'
   local_update_steps: 1


@@ -42,6 +42,8 @@ model:
   cheb_order: 2
   use_day: True
   use_week: True
+  use_minigraph: False
+  minigraph_size: 10
 train:
   batch_or_epoch: 'epoch'
   local_update_steps: 1


@@ -42,6 +42,8 @@ model:
   cheb_order: 2
   use_day: True
   use_week: True
+  use_minigraph: False
+  minigraph_size: 10
 train:
   batch_or_epoch: 'epoch'
   local_update_steps: 1
@@ -60,7 +62,7 @@ train:
   grad_norm: True
   real_value: True
 criterion:
-  type: L1loss
+  type: L1Loss
 trainer:
   type: trafficflowtrainer
 log_dir: ./