From 578ff48c719caeedb95c8c9bced6cbda493d25b7 Mon Sep 17 00:00:00 2001
From: HengZhang
Date: Wed, 27 Nov 2024 20:41:08 +0800
Subject: [PATCH 01/11] add minigraph dataloader

---
 .../core/configs/cfg_trafficflow.py           |   2 +
 federatedscope/core/data/utils.py             |   9 +-
 .../dataloader/traffic_dataloader_v2.py       | 197 ++++++++++++++++++
 scripts/trafficflow_exp_scripts/D4.yaml       |   2 +
 scripts/trafficflow_exp_scripts/D8.yaml       |   4 +-
 5 files changed, 211 insertions(+), 3 deletions(-)
 create mode 100644 federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py

diff --git a/federatedscope/core/configs/cfg_trafficflow.py b/federatedscope/core/configs/cfg_trafficflow.py
index 73ade59..b065361 100644
--- a/federatedscope/core/configs/cfg_trafficflow.py
+++ b/federatedscope/core/configs/cfg_trafficflow.py
@@ -30,6 +30,8 @@ def extend_trafficflow_cfg(cfg):
     cfg.model.cheb_order = 1  # A tuple, e.g., (in_channel, h, w)
     cfg.model.use_day = True
     cfg.model.use_week = True
+    cfg.model.minigraph_size = 5
+    cfg.model.use_minigraph = False
 
     # ---------------------------------------------------------------------- #
     # Criterion related options
diff --git a/federatedscope/core/data/utils.py b/federatedscope/core/data/utils.py
index b1efa69..a9296e0 100644
--- a/federatedscope/core/data/utils.py
+++ b/federatedscope/core/data/utils.py
@@ -107,8 +107,13 @@ def load_dataset(config, client_cfgs=None):
         modified_config = config
     elif config.data.type.lower() in [
             'trafficflow']:
-        from federatedscope.trafficflow.dataloader.traffic_dataloader import load_traffic_data
-        dataset, modified_config = load_traffic_data(config, client_cfgs)
+        if config.model.use_minigraph is False:
+            from federatedscope.trafficflow.dataloader.traffic_dataloader import load_traffic_data
+            dataset, modified_config = load_traffic_data(config, client_cfgs)
+        else:
+            from federatedscope.trafficflow.dataloader.traffic_dataloader_v2 import load_traffic_data
+            dataset, modified_config = load_traffic_data(config, client_cfgs)
+
     else:
         raise ValueError('Dataset {} not found.'.format(config.data.type))
     return dataset, modified_config
diff --git a/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py b/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py
new file mode 100644
index 0000000..0a48590
--- /dev/null
+++ b/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py
@@ -0,0 +1,197 @@
+import numpy as np
+import torch
+import torch.utils.data
+from federatedscope.trafficflow.dataset.add_window import add_window_horizon
+from federatedscope.trafficflow.dataset.normalization import (
+    NScaler, MinMax01Scaler, MinMax11Scaler, StandardScaler, ColumnMinMaxScaler)
+from federatedscope.trafficflow.dataset.traffic_dataset import load_st_dataset
+
+
+def normalize_dataset(data, normalizer, column_wise=False):
+    if normalizer == 'max01':
+        if column_wise:
+            minimum = data.min(axis=0, keepdims=True)
+            maximum = data.max(axis=0, keepdims=True)
+        else:
+            minimum = data.min()
+            maximum = data.max()
+        scaler = MinMax01Scaler(minimum, maximum)
+        data = scaler.transform(data)
+        print('Normalize the dataset by MinMax01 Normalization')
+    elif normalizer == 'max11':
+        if column_wise:
+            minimum = data.min(axis=0, keepdims=True)
+            maximum = data.max(axis=0, keepdims=True)
+        else:
+            minimum = data.min()
+            maximum = data.max()
+        scaler = MinMax11Scaler(minimum, maximum)
+        data = scaler.transform(data)
+        print('Normalize the dataset by MinMax11 Normalization')
+    elif normalizer == 'std':
+        if column_wise:
+            mean = data.mean(axis=0, keepdims=True)
+            std = data.std(axis=0, keepdims=True)
+        else:
+            mean = data.mean()
+            std = data.std()
+        scaler = StandardScaler(mean, std)
+        # data = scaler.transform(data)
+        print('Normalize the dataset by Standard Normalization')
+    elif normalizer == 'None':
+        scaler = NScaler()
+        data = scaler.transform(data)
+        print('Does not normalize the dataset')
+    elif normalizer == 'cmax':
+        # column min max, to be deprecated
+        # note: axis must be the spatial dimension, please check!
+        scaler = ColumnMinMaxScaler(data.min(axis=0), data.max(axis=0))
+        data = scaler.transform(data)
+        print('Normalize the dataset by Column Min-Max Normalization')
+    else:
+        raise ValueError('Normalizer {} is not supported.'.format(normalizer))
+    return scaler
+
+
+def split_data_by_days(data, val_days, test_days, interval=30):
+    """
+    :param data: [B, *]
+    :param val_days:
+    :param test_days:
+    :param interval: interval (15, 30, 60) minutes
+    :return:
+    """
+    t = int((24 * 60) / interval)
+    # slice the trailing test/val days' worth of time steps off the end
+    test_data = data[-t * int(test_days):]
+    val_data = data[-t * int(test_days + val_days): -t * int(test_days)]
+    train_data = data[:-t * int(test_days + val_days)]
+    return train_data, val_data, test_data
+
+
+def split_data_by_ratio(data, val_ratio, test_ratio):
+    data_len = data.shape[0]
+    test_data = data[-int(data_len * test_ratio):]
+    val_data = data[-int(data_len * (test_ratio + val_ratio)):-int(data_len * test_ratio)]
+    train_data = data[:-int(data_len * (test_ratio + val_ratio))]
+    return train_data, val_data, test_data
+
+
+def data_loader(X, Y, batch_size, shuffle=True, drop_last=True):
+    cuda = True if torch.cuda.is_available() else False
+    TensorFloat = torch.cuda.FloatTensor if cuda else torch.FloatTensor
+    X, Y = TensorFloat(X), TensorFloat(Y)
+    data = torch.utils.data.TensorDataset(X, Y)
+    dataloader = torch.utils.data.DataLoader(data, batch_size=batch_size,
+                                             shuffle=shuffle, drop_last=drop_last)
+    return dataloader
+
+
+def load_traffic_data(config, client_cfgs):
+    print("Use Mini graph")
+    root = config.data.root
+    dataName = 'PEMSD' + root[-1]
+    raw_data = load_st_dataset(dataName)
+
+
+    l, n, f = raw_data.shape
+
+    feature_list = [raw_data]
+
+
+    # numerical time_in_day
+    time_ind = [i % config.data.steps_per_day / config.data.steps_per_day for i in range(raw_data.shape[0])]
+    time_ind = np.array(time_ind)
+    time_in_day = np.tile(time_ind, [1, n, 1]).transpose((2, 1, 0))
+    feature_list.append(time_in_day)
+
+    # numerical day_in_week
+    day_in_week = [(i // config.data.steps_per_day) % config.data.days_per_week for i in range(raw_data.shape[0])]
+    day_in_week = np.array(day_in_week)
+    day_in_week = np.tile(day_in_week, [1, n, 1]).transpose((2, 1, 0))
+    feature_list.append(day_in_week)
+
+    # data = np.concatenate(feature_list, axis=-1)
+    single = False
+    x, y = add_window_horizon(raw_data, config.data.lag, config.data.horizon, single)
+    x_day, y_day = add_window_horizon(time_in_day, config.data.lag, config.data.horizon, single)
+    x_week, y_week = add_window_horizon(day_in_week, config.data.lag, config.data.horizon, single)
+    x, y = np.concatenate([x, x_day, x_week], axis=-1), np.concatenate([y, y_day, y_week], axis=-1)
+
+    # split dataset by days or by ratio
+    if config.data.test_ratio > 1:
+        x_train, x_val, x_test = split_data_by_days(x, config.data.val_ratio, config.data.test_ratio)
+        y_train, y_val, y_test = split_data_by_days(y, config.data.val_ratio, config.data.test_ratio)
+    else:
+        x_train, x_val, x_test = split_data_by_ratio(x, config.data.val_ratio, config.data.test_ratio)
+        y_train, y_val, y_test = split_data_by_ratio(y, config.data.val_ratio, config.data.test_ratio)
+
+    # normalize st data
+    normalizer = 'std'
+    scaler = normalize_dataset(x_train[..., :config.model.input_dim], normalizer, config.data.column_wise)
+    config.data.scaler = [float(scaler.mean), float(scaler.std)]
+
+    x_train[..., :config.model.input_dim] = scaler.transform(x_train[..., :config.model.input_dim])
+    x_val[..., :config.model.input_dim] = scaler.transform(x_val[..., :config.model.input_dim])
+    x_test[..., :config.model.input_dim] = scaler.transform(x_test[..., :config.model.input_dim])
+    # y_train[..., :config.model.output_dim] = scaler.transform(y_train[..., :config.model.output_dim])
+    # y_val[..., :config.model.output_dim] = scaler.transform(y_val[..., :config.model.output_dim])
+    # y_test[..., :config.model.output_dim] = scaler.transform(y_test[..., :config.model.output_dim])
+
+    # Client-side dataset splitting
+    node_num = config.data.num_nodes
+    client_num = config.federate.client_num
+    per_samples = node_num // client_num
+    data_list, cur_index = dict(), 0
+    input_dim, output_dim = config.model.input_dim, config.model.output_dim
+    for i in range(client_num):
+        if cur_index + per_samples <= node_num:
+            # Normal slicing
+            sub_array_train = x_train[:, :, cur_index:cur_index + per_samples, :]
+            sub_array_val = x_val[:, :, cur_index:cur_index + per_samples, :]
+            sub_array_test = x_test[:, :, cur_index:cur_index + per_samples, :]
+
+            sub_y_train = y_train[:, :, cur_index:cur_index + per_samples, :output_dim]
+            sub_y_val = y_val[:, :, cur_index:cur_index + per_samples, :output_dim]
+            sub_y_test = y_test[:, :, cur_index:cur_index + per_samples, :output_dim]
+        else:
+            # If there are not enough nodes to fill per_samples, pad with zero columns
+            sub_array_train = x_train[:, :, cur_index:cur_index + per_samples, :]
+            sub_array_val = x_val[:, :, cur_index:cur_index + per_samples, :]
+            sub_array_test = x_test[:, :, cur_index:cur_index + per_samples, :]
+            pad = ((0, 0), (0, 0), (0, cur_index + per_samples - node_num), (0, 0))
+            sub_array_train = np.pad(sub_array_train, pad)
+            sub_array_val = np.pad(sub_array_val, pad)
+            sub_array_test = np.pad(sub_array_test, pad)
+
+            sub_y_train = y_train[:, :, cur_index:cur_index + per_samples, :output_dim]
+            sub_y_val = y_val[:, :, cur_index:cur_index + per_samples, :output_dim]
+            sub_y_test = y_test[:, :, cur_index:cur_index + per_samples, :output_dim]
+            sub_y_train = np.pad(sub_y_train, pad)
+            sub_y_val = np.pad(sub_y_val, pad)
+            sub_y_test = np.pad(sub_y_test, pad)
+
+        device = 'cuda' if torch.cuda.is_available() else 'cpu'
+
+        data_list[i + 1] = {
+            'train': torch.utils.data.TensorDataset(
+                torch.tensor(sub_array_train, dtype=torch.float, device=device),
+                torch.tensor(sub_y_train, dtype=torch.float, device=device)
+            ),
+            'val': torch.utils.data.TensorDataset(
+                torch.tensor(sub_array_val, dtype=torch.float, device=device),
+                torch.tensor(sub_y_val, dtype=torch.float, device=device)
+            ),
+            'test': torch.utils.data.TensorDataset(
+                torch.tensor(sub_array_test, dtype=torch.float, device=device),
+                torch.tensor(sub_y_test, dtype=torch.float, device=device)
+            )
+        }
+        cur_index += per_samples
+    config.model.num_nodes = per_samples
+    return data_list, config
+
+
+if __name__ == '__main__':
+    a = 'data/trafficflow/PeMS04'
+    name = 'PEMSD' + a[-1]
+    raw_data = load_st_dataset(name)
+    pass
diff --git a/scripts/trafficflow_exp_scripts/D4.yaml b/scripts/trafficflow_exp_scripts/D4.yaml
index e8da4c3..fe0e8f9 100644
--- a/scripts/trafficflow_exp_scripts/D4.yaml
+++ b/scripts/trafficflow_exp_scripts/D4.yaml
@@ -44,6 +44,8 @@ model:
   cheb_order: 2
   use_day: True
   use_week: True
+  use_minigraph: True
+  minigraph_size: 5
 train:
   batch_or_epoch: 'epoch'
   local_update_steps: 1
diff --git a/scripts/trafficflow_exp_scripts/D8.yaml b/scripts/trafficflow_exp_scripts/D8.yaml
index 602ede5..97614aa 100644
--- a/scripts/trafficflow_exp_scripts/D8.yaml
+++ b/scripts/trafficflow_exp_scripts/D8.yaml
@@ -42,6 +42,8 @@ model:
   cheb_order: 2
   use_day: True
   use_week: True
+  use_minigraph: True
+  minigraph_size: 5
 train:
   batch_or_epoch: 'epoch'
   local_update_steps: 1
@@ -60,7 +62,7 @@ train:
     grad_norm: True
     real_value: True
 criterion:
-  type: L1loss
+  type: L1Loss
 trainer:
   type: trafficflowtrainer
 log_dir: ./

From 22b229891a2193f72957ee2faf7541d51e0c5bea Mon Sep 17 00:00:00 2001
From: HengZhang
Date: Wed, 27 Nov 2024 21:14:01 +0800
Subject: [PATCH 02/11] Update traffic_dataloader_v2.py

---
 .../dataloader/traffic_dataloader_v2.py       | 54 +++++++++++++++----
 1 file changed, 43 insertions(+), 11 deletions(-)

diff --git a/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py b/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py
index 0a48590..bba354c 100644
--- a/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py
+++ b/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py
@@ -86,11 +86,10 @@ def data_loader(X, Y, batch_size, shuffle=True, drop_last=True):
 
 
 def load_traffic_data(config, client_cfgs):
-    print("Use Mini graph")
     root = config.data.root
     dataName = 'PEMSD' + root[-1]
    raw_data = load_st_dataset(dataName)
-
+    sub_graph_size = config.model.minigraph_size
 
     l, n, f = raw_data.shape
@@ -132,9 +131,6 @@ def load_traffic_data(config, client_cfgs):
     x_train[..., :config.model.input_dim] = scaler.transform(x_train[..., :config.model.input_dim])
     x_val[..., :config.model.input_dim] = scaler.transform(x_val[..., :config.model.input_dim])
     x_test[..., :config.model.input_dim] = scaler.transform(x_test[..., :config.model.input_dim])
-    # y_train[..., :config.model.output_dim] = scaler.transform(y_train[..., :config.model.output_dim])
-    # y_val[..., :config.model.output_dim] = scaler.transform(y_val[..., :config.model.output_dim])
-    # y_test[..., :config.model.output_dim] = scaler.transform(y_test[..., :config.model.output_dim])
 
     # Client-side dataset splitting
     node_num = config.data.num_nodes
@@ -171,18 +167,20 @@ def load_traffic_data(config, client_cfgs):
 
         device = 'cuda' if torch.cuda.is_available() else 'cpu'
 
+        minigraph_size = config.model.minigraph_size
+
         data_list[i + 1] = {
             'train': torch.utils.data.TensorDataset(
-                torch.tensor(sub_array_train, dtype=torch.float, device=device),
-                torch.tensor(sub_y_train, dtype=torch.float, device=device)
+                torch.tensor(split_into_mini_graphs(sub_array_train, minigraph_size), dtype=torch.float, device=device),
+                torch.tensor(split_into_mini_graphs(sub_y_train, minigraph_size), dtype=torch.float, device=device)
             ),
             'val': torch.utils.data.TensorDataset(
-                torch.tensor(sub_array_val, dtype=torch.float, device=device),
-                torch.tensor(sub_y_val, dtype=torch.float, device=device)
+                torch.tensor(split_into_mini_graphs(sub_array_val, minigraph_size), dtype=torch.float, device=device),
+                torch.tensor(split_into_mini_graphs(sub_y_val, minigraph_size), dtype=torch.float, device=device)
             ),
             'test': torch.utils.data.TensorDataset(
-                torch.tensor(sub_array_test, dtype=torch.float, device=device),
-                torch.tensor(sub_y_test, dtype=torch.float, device=device)
+                torch.tensor(split_into_mini_graphs(sub_array_test, minigraph_size), dtype=torch.float, device=device),
+                torch.tensor(split_into_mini_graphs(sub_y_test, minigraph_size), dtype=torch.float, device=device)
             )
         }
         cur_index += per_samples
@@ -190,6 +188,40 @@ def load_traffic_data(config, client_cfgs):
     return data_list, config
 
 
+def split_into_mini_graphs(tensor, minigraph_size):
+    """
+    Splits a tensor into mini-graphs of specified size. Pads the last mini-graph with dummy nodes if necessary.
+
+    Args:
+        tensor (np.ndarray): Input tensor with shape (timestep, horizon, node_num, dim).
+        minigraph_size (int): The size of each mini-graph.
+
+    Returns:
+        np.ndarray: Output tensor with shape (timestep, horizon, minigraph_num, minigraph_size, dim).
+    """
+    timestep, horizon, node_num, dim = tensor.shape
+
+    # Calculate the number of mini-graphs
+    minigraph_num = (node_num + minigraph_size - 1) // minigraph_size  # Round up division
+
+    # Initialize output tensor with zeros (dummy nodes)
+    output = np.zeros((timestep, horizon, minigraph_num, minigraph_size, dim), dtype=tensor.dtype)
+
+    # Fill in the real data
+    for i in range(minigraph_num):
+        start_idx = i * minigraph_size
+        end_idx = min(start_idx + minigraph_size, node_num)  # Ensure we don't exceed the node number
+        slice_size = end_idx - start_idx
+
+        # Assign the data to the corresponding mini-graph
+        output[:, :, i, :slice_size, :] = tensor[:, :, start_idx:end_idx, :]
+
+        # For the remaining part in the mini-graph, it remains as dummy nodes (zeros)
+
+    return output
+
+
+
 if __name__ == '__main__':
     a = 'data/trafficflow/PeMS04'
     name = 'PEMSD' + a[-1]
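Reviewer note (illustrative, not part of the patches): as of PATCH 02, `split_into_mini_graphs` regroups a client's node axis into ceil(node_num / minigraph_size) mini-graphs and zero-pads the last one. A minimal sanity check of the intended shapes — the sizes below are made up for the example:

    import numpy as np

    x = np.random.rand(8, 12, 17, 3)      # 8 windows, horizon 12, 17 nodes, 3 features
    out = split_into_mini_graphs(x, 5)    # mini-graphs of size 5

    assert out.shape == (8, 12, 4, 5, 3)                 # ceil(17 / 5) = 4 mini-graphs
    assert np.allclose(out[:, :, 0], x[:, :, 0:5, :])    # first group is a plain slice
    assert np.allclose(out[:, :, 3, 2:, :], 0)           # last group ends in 2 dummy nodes

Since the same split is applied to the targets, the dummy nodes also appear in `y`; they contribute to the training loss unless the trainer masks them out.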
""" timestep, horizon, node_num, dim = tensor.shape # Calculate the number of mini-graphs - minigraph_num = (node_num + minigraph_size - 1) // minigraph_size # Round up division + graph_num = (node_num + graph_size - 1) // graph_size # Round up division - # Initialize output tensor with zeros (dummy nodes) - output = np.zeros((timestep, horizon, minigraph_num, minigraph_size, dim), dtype=tensor.dtype) + # Initialize output tensor with dummy values + output = np.full((timestep, horizon, graph_size, graph_num, dim), dummy_value, dtype=tensor.dtype) # Fill in the real data - for i in range(minigraph_num): - start_idx = i * minigraph_size - end_idx = min(start_idx + minigraph_size, node_num) # Ensure we don't exceed the node number + for i in range(graph_num): + start_idx = i * graph_size + end_idx = min(start_idx + graph_size, node_num) # Ensure we don't exceed the node number slice_size = end_idx - start_idx - # Assign the data to the corresponding mini-graph - output[:, :, i, :slice_size, :] = tensor[:, :, start_idx:end_idx, :] - - # For the remaining part in the mini-graph, it remains as dummy nodes (zeros) + # Assign the data to the corresponding mini-graph (adjusted indexing) + output[:, :, :slice_size, i, :] = tensor[:, :, start_idx:end_idx, :] return output diff --git a/scripts/trafficflow_exp_scripts/D4.yaml b/scripts/trafficflow_exp_scripts/D4.yaml index fe0e8f9..73d9f70 100644 --- a/scripts/trafficflow_exp_scripts/D4.yaml +++ b/scripts/trafficflow_exp_scripts/D4.yaml @@ -45,7 +45,7 @@ model: use_day: True use_week: True use_minigraph: True - minigraph_size: 5 + minigraph_size: 3 train: batch_or_epoch: 'epoch' local_update_steps: 1 From 50785ad3c1466e3cf025de0b8cee514eccd1d392 Mon Sep 17 00:00:00 2001 From: HengZhang Date: Thu, 28 Nov 2024 10:13:15 +0800 Subject: [PATCH 04/11] Update traffic_dataloader_v2.py --- federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py | 1 - 1 file changed, 1 deletion(-) diff --git a/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py b/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py index 84f6df0..98c5f56 100644 --- a/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py +++ b/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py @@ -89,7 +89,6 @@ def load_traffic_data(config, client_cfgs): root = config.data.root dataName = 'PEMSD' + root[-1] raw_data = load_st_dataset(dataName) - sub_graph_size = config.model.minigraph_size l, n, f = raw_data.shape From fc6ed02806e8423512da1e0c28c9b84f1d92a5ba Mon Sep 17 00:00:00 2001 From: HengZhang Date: Thu, 28 Nov 2024 10:17:52 +0800 Subject: [PATCH 05/11] add v2 model in builder --- .../core/auxiliaries/model_builder.py | 8 +- federatedscope/trafficflow/model/FedDGCNv2.py | 102 ++++++++++++++++++ 2 files changed, 108 insertions(+), 2 deletions(-) create mode 100644 federatedscope/trafficflow/model/FedDGCNv2.py diff --git a/federatedscope/core/auxiliaries/model_builder.py b/federatedscope/core/auxiliaries/model_builder.py index 25cc31d..22bfa39 100644 --- a/federatedscope/core/auxiliaries/model_builder.py +++ b/federatedscope/core/auxiliaries/model_builder.py @@ -205,8 +205,12 @@ def get_model(model_config, local_data=None, backend='torch'): from federatedscope.nlp.hetero_tasks.model import ATCModel model = ATCModel(model_config) elif model_config.type.lower() in ['feddgcn']: - from federatedscope.trafficflow.model.FedDGCN import FedDGCN - model = FedDGCN(model_config) + if model_config.use_minigraph is False: + from federatedscope.trafficflow.model.FedDGCN 
import FedDGCN + model = FedDGCN(model_config) + else: + from federatedscope.trafficflow.model.FedDGCNv2 import FedDGCN + model = FedDGCN(model_config) else: raise ValueError('Model {} is not provided'.format(model_config.type)) diff --git a/federatedscope/trafficflow/model/FedDGCNv2.py b/federatedscope/trafficflow/model/FedDGCNv2.py new file mode 100644 index 0000000..927a509 --- /dev/null +++ b/federatedscope/trafficflow/model/FedDGCNv2.py @@ -0,0 +1,102 @@ +from federatedscope.register import register_model +import torch +import torch.nn as nn +from federatedscope.trafficflow.model.DGCRUCell import DGCRUCell + +class DGCRM(nn.Module): + def __init__(self, node_num, dim_in, dim_out, cheb_k, embed_dim, num_layers=1): + super(DGCRM, self).__init__() + assert num_layers >= 1, 'At least one DCRNN layer in the Encoder.' + self.node_num = node_num + self.input_dim = dim_in + self.num_layers = num_layers + self.DGCRM_cells = nn.ModuleList() + self.DGCRM_cells.append(DGCRUCell(node_num, dim_in, dim_out, cheb_k, embed_dim)) + for _ in range(1, num_layers): + self.DGCRM_cells.append(DGCRUCell(node_num, dim_out, dim_out, cheb_k, embed_dim)) + + def forward(self, x, init_state, node_embeddings): + assert x.shape[2] == self.node_num and x.shape[3] == self.input_dim + seq_length = x.shape[1] + current_inputs = x + output_hidden = [] + for i in range(self.num_layers): + state = init_state[i] + inner_states = [] + for t in range(seq_length): + state = self.DGCRM_cells[i](current_inputs[:, t, :, :], state, [node_embeddings[0][:, t, :, :], node_embeddings[1]]) + inner_states.append(state) + output_hidden.append(state) + current_inputs = torch.stack(inner_states, dim=1) + return current_inputs, output_hidden + + def init_hidden(self, batch_size): + init_states = [] + for i in range(self.num_layers): + init_states.append(self.DGCRM_cells[i].init_hidden_state(batch_size)) + return torch.stack(init_states, dim=0) #(num_layers, B, N, hidden_dim) + +# Build you torch or tf model class here +class FedDGCN(nn.Module): + def __init__(self, args): + super(FedDGCN, self).__init__() + # print("You are in subminigraph") + self.num_node = args.num_nodes + self.input_dim = args.input_dim + self.hidden_dim = args.rnn_units + self.output_dim = args.output_dim + self.horizon = args.horizon + self.num_layers = args.num_layers + self.use_D = args.use_day + self.use_W = args.use_week + self.dropout1 = nn.Dropout(p=args.dropout) # 0.1 + self.dropout2 = nn.Dropout(p=args.dropout) + self.node_embeddings1 = nn.Parameter(torch.randn(self.num_node, args.embed_dim), requires_grad=True) + self.node_embeddings2 = nn.Parameter(torch.randn(self.num_node, args.embed_dim), requires_grad=True) + self.T_i_D_emb = nn.Parameter(torch.empty(288, args.embed_dim)) + self.D_i_W_emb = nn.Parameter(torch.empty(7, args.embed_dim)) + # Initialize parameters + nn.init.xavier_uniform_(self.node_embeddings1) + nn.init.xavier_uniform_(self.T_i_D_emb) + nn.init.xavier_uniform_(self.D_i_W_emb) + + self.encoder1 = DGCRM(args.num_nodes, args.input_dim, args.rnn_units, args.cheb_order, + args.embed_dim, args.num_layers) + self.encoder2 = DGCRM(args.num_nodes, args.input_dim, args.rnn_units, args.cheb_order, + args.embed_dim, args.num_layers) + # predictor + self.end_conv1 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True) + self.end_conv2 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True) + self.end_conv3 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, 
self.hidden_dim), bias=True) + + def forward(self, source, i=2): + node_embedding1 = self.node_embeddings1 + if self.use_D: + t_i_d_data = source[..., 1] + T_i_D_emb = self.T_i_D_emb[(t_i_d_data * 288).type(torch.LongTensor)] + node_embedding1 = torch.mul(node_embedding1, T_i_D_emb) + + if self.use_W: + d_i_w_data = source[..., 2] + D_i_W_emb = self.D_i_W_emb[(d_i_w_data).type(torch.LongTensor)] + node_embedding1 = torch.mul(node_embedding1, D_i_W_emb) + + node_embeddings=[node_embedding1,self.node_embeddings1] + + source = source[..., 0].unsqueeze(-1) + + init_state1 = self.encoder1.init_hidden(source.shape[0]) + output, _ = self.encoder1(source, init_state1, node_embeddings) + output = self.dropout1(output[:, -1:, :, :]) + + output1 = self.end_conv1(output) + source1 = self.end_conv2(output) + + source2 = source - source1 + + init_state2 = self.encoder2.init_hidden(source2.shape[0]) + output2, _ = self.encoder2(source2, init_state2, node_embeddings) + output2 = self.dropout2(output2[:, -1:, :, :]) + output2 = self.end_conv3(output2) + + return output1 + output2 From 1b259201885833ed0bec93760d1accded94de7c8 Mon Sep 17 00:00:00 2001 From: HengZhang Date: Thu, 28 Nov 2024 10:23:44 +0800 Subject: [PATCH 06/11] fix bug in linux -- add config in configs/cfg_model --- federatedscope/core/configs/cfg_model.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/federatedscope/core/configs/cfg_model.py b/federatedscope/core/configs/cfg_model.py index 9737f4f..cdc954e 100644 --- a/federatedscope/core/configs/cfg_model.py +++ b/federatedscope/core/configs/cfg_model.py @@ -62,6 +62,8 @@ def extend_model_cfg(cfg): cfg.model.cheb_order = 1 # A tuple, e.g., (in_channel, h, w) cfg.model.use_day = True cfg.model.use_week = True + cfg.model.minigraph_size = 5 + cfg.model.use_minigraph = False # ---------------------------------------------------------------------- # From 0b41f04d3c5a3573b160862f2f0da33c40975fcb Mon Sep 17 00:00:00 2001 From: HengZhang Date: Thu, 28 Nov 2024 10:35:31 +0800 Subject: [PATCH 07/11] finish v2 model output --- .../core/auxiliaries/model_builder.py | 4 +- federatedscope/trafficflow/model/FedDGCNv2.py | 59 ++++++++++++++++++- 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/federatedscope/core/auxiliaries/model_builder.py b/federatedscope/core/auxiliaries/model_builder.py index 22bfa39..5d94516 100644 --- a/federatedscope/core/auxiliaries/model_builder.py +++ b/federatedscope/core/auxiliaries/model_builder.py @@ -209,8 +209,8 @@ def get_model(model_config, local_data=None, backend='torch'): from federatedscope.trafficflow.model.FedDGCN import FedDGCN model = FedDGCN(model_config) else: - from federatedscope.trafficflow.model.FedDGCNv2 import FedDGCN - model = FedDGCN(model_config) + from federatedscope.trafficflow.model.FedDGCNv2 import FederatedFedDGCN + model = FederatedFedDGCN(model_config) else: raise ValueError('Model {} is not provided'.format(model_config.type)) diff --git a/federatedscope/trafficflow/model/FedDGCNv2.py b/federatedscope/trafficflow/model/FedDGCNv2.py index 927a509..ddd2f92 100644 --- a/federatedscope/trafficflow/model/FedDGCNv2.py +++ b/federatedscope/trafficflow/model/FedDGCNv2.py @@ -1,3 +1,5 @@ +from torch.nn import ModuleList + from federatedscope.register import register_model import torch import torch.nn as nn @@ -41,7 +43,7 @@ class FedDGCN(nn.Module): def __init__(self, args): super(FedDGCN, self).__init__() # print("You are in subminigraph") - self.num_node = args.num_nodes + self.num_node = args.minigraph_size self.input_dim = 
args.input_dim self.hidden_dim = args.rnn_units self.output_dim = args.output_dim @@ -60,9 +62,9 @@ class FedDGCN(nn.Module): nn.init.xavier_uniform_(self.T_i_D_emb) nn.init.xavier_uniform_(self.D_i_W_emb) - self.encoder1 = DGCRM(args.num_nodes, args.input_dim, args.rnn_units, args.cheb_order, + self.encoder1 = DGCRM(args.minigraph_size, args.input_dim, args.rnn_units, args.cheb_order, args.embed_dim, args.num_layers) - self.encoder2 = DGCRM(args.num_nodes, args.input_dim, args.rnn_units, args.cheb_order, + self.encoder2 = DGCRM(args.minigraph_size, args.input_dim, args.rnn_units, args.cheb_order, args.embed_dim, args.num_layers) # predictor self.end_conv1 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True) @@ -100,3 +102,54 @@ class FedDGCN(nn.Module): output2 = self.end_conv3(output2) return output1 + output2 + + +class FederatedFedDGCN(nn.Module): + def __init__(self, args): + super(FederatedFedDGCN, self).__init__() + + # Initializing with None, we will populate model_list during the forward pass + self.model_list = None + self.main_model = FedDGCN(args) # Initialize a single FedDGCN model (for aggregation) + self.args = args + self.subgraph_num = 0 + self.num_node = args.minigraph_size + self.input_dim = args.input_dim + self.hidden_dim = args.rnn_units + self.output_dim = args.output_dim + self.horizon = args.horizon + + def forward(self, source): + """ + Forward pass for the federated model. Each subgraph processes its portion of the data, + and then the results are aggregated. + + Arguments: + - source: Tensor of shape (batchsize, horizon, subgraph_num, subgraph_size, dims) + + Returns: + - Aggregated output (batchsize, horizon, subgraph_num, subgraph_size, dims) + """ + self.subgraph_num = source.shape[2] + + # Initialize model_list if it hasn't been initialized yet + if self.model_list is None: + # Initialize model_list with FedDGCN models, one for each subgraph + self.model_list = ModuleList([self.main_model] + [FedDGCN(self.args) for _ in range(self.subgraph_num - 1)]) + + # Initialize a list to store the outputs of each subgraph model + subgraph_outputs = [] + + # Iterate through the subgraph models + for i in range(self.subgraph_num): + # Extract the subgraph-specific data + subgraph_data = source[:, :, i, :, :] # (batchsize, horizon, subgraph_size, dims) + + # Forward pass for each subgraph model + subgraph_output = self.model_list[i](subgraph_data) + subgraph_outputs.append(subgraph_output) + + # Reshape the outputs into (batchsize, horizon, subgraph_num, subgraph_size, dims) + output_tensor = torch.stack(subgraph_outputs, dim=2) # (batchsize, horizon, subgraph_num, subgraph_size, dims) + + return output_tensor From 9cfe3f01dc96f641d251c70f1d4f549e06690300 Mon Sep 17 00:00:00 2001 From: HengZhang Date: Thu, 28 Nov 2024 10:43:14 +0800 Subject: [PATCH 08/11] fix dataloader --- .../dataloader/traffic_dataloader_v2.py | 6 ++--- federatedscope/trafficflow/model/FedDGCNv2.py | 27 ++++++++++++++++--- scripts/trafficflow_exp_scripts/D4.yaml | 2 +- 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py b/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py index 98c5f56..f8d5229 100644 --- a/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py +++ b/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py @@ -205,7 +205,7 @@ def split_into_mini_graphs(tensor, graph_size, dummy_value=0): graph_num = (node_num + graph_size - 1) // graph_size # Round 
up division # Initialize output tensor with dummy values - output = np.full((timestep, horizon, graph_size, graph_num, dim), dummy_value, dtype=tensor.dtype) + output = np.full((timestep, horizon, graph_num, graph_size, dim), dummy_value, dtype=tensor.dtype) # Fill in the real data for i in range(graph_num): @@ -213,8 +213,8 @@ def split_into_mini_graphs(tensor, graph_size, dummy_value=0): end_idx = min(start_idx + graph_size, node_num) # Ensure we don't exceed the node number slice_size = end_idx - start_idx - # Assign the data to the corresponding mini-graph (adjusted indexing) - output[:, :, :slice_size, i, :] = tensor[:, :, start_idx:end_idx, :] + # Assign the data to the corresponding mini-graph + output[:, :, i, :slice_size, :] = tensor[:, :, start_idx:end_idx, :] return output diff --git a/federatedscope/trafficflow/model/FedDGCNv2.py b/federatedscope/trafficflow/model/FedDGCNv2.py index ddd2f92..f565131 100644 --- a/federatedscope/trafficflow/model/FedDGCNv2.py +++ b/federatedscope/trafficflow/model/FedDGCNv2.py @@ -1,6 +1,4 @@ from torch.nn import ModuleList - -from federatedscope.register import register_model import torch import torch.nn as nn from federatedscope.trafficflow.model.DGCRUCell import DGCRUCell @@ -71,7 +69,7 @@ class FedDGCN(nn.Module): self.end_conv2 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True) self.end_conv3 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True) - def forward(self, source, i=2): + def forward(self, source): node_embedding1 = self.node_embeddings1 if self.use_D: t_i_d_data = source[..., 1] @@ -152,4 +150,27 @@ class FederatedFedDGCN(nn.Module): # Reshape the outputs into (batchsize, horizon, subgraph_num, subgraph_size, dims) output_tensor = torch.stack(subgraph_outputs, dim=2) # (batchsize, horizon, subgraph_num, subgraph_size, dims) + self.update_main_model() + return output_tensor + + def update_main_model(self): + """ + 更新 main_model 的参数为 model_list 中所有模型参数的平均值。 + """ + # 遍历 main_model 的参数 + with torch.no_grad(): # 确保更新时不会计算梯度 + for name, main_param in self.main_model.named_parameters(): + # 初始化平均值的容器 + avg_param = torch.zeros_like(main_param) + + # 遍历 model_list 中的所有模型 + for model in self.model_list: + # 加上当前模型的对应参数 + avg_param += model.state_dict()[name] + + # 计算平均值 + avg_param /= len(self.model_list) + + # 更新 main_model 的参数 + main_param.copy_(avg_param) diff --git a/scripts/trafficflow_exp_scripts/D4.yaml b/scripts/trafficflow_exp_scripts/D4.yaml index 73d9f70..1dc3aa6 100644 --- a/scripts/trafficflow_exp_scripts/D4.yaml +++ b/scripts/trafficflow_exp_scripts/D4.yaml @@ -45,7 +45,7 @@ model: use_day: True use_week: True use_minigraph: True - minigraph_size: 3 + minigraph_size: 10 train: batch_or_epoch: 'epoch' local_update_steps: 1 From e95c13f0fc61a7b4cd71665366a90265b2069936 Mon Sep 17 00:00:00 2001 From: HengZhang Date: Thu, 28 Nov 2024 11:15:47 +0800 Subject: [PATCH 09/11] train success --- federatedscope/trafficflow/model/FedDGCNv2.py | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/federatedscope/trafficflow/model/FedDGCNv2.py b/federatedscope/trafficflow/model/FedDGCNv2.py index f565131..1b92e5d 100644 --- a/federatedscope/trafficflow/model/FedDGCNv2.py +++ b/federatedscope/trafficflow/model/FedDGCNv2.py @@ -3,6 +3,7 @@ import torch import torch.nn as nn from federatedscope.trafficflow.model.DGCRUCell import DGCRUCell + class DGCRM(nn.Module): def __init__(self, node_num, dim_in, dim_out, cheb_k, embed_dim, 
num_layers=1): super(DGCRM, self).__init__() @@ -24,7 +25,8 @@ class DGCRM(nn.Module): state = init_state[i] inner_states = [] for t in range(seq_length): - state = self.DGCRM_cells[i](current_inputs[:, t, :, :], state, [node_embeddings[0][:, t, :, :], node_embeddings[1]]) + state = self.DGCRM_cells[i](current_inputs[:, t, :, :], state, + [node_embeddings[0][:, t, :, :], node_embeddings[1]]) inner_states.append(state) output_hidden.append(state) current_inputs = torch.stack(inner_states, dim=1) @@ -34,7 +36,8 @@ class DGCRM(nn.Module): init_states = [] for i in range(self.num_layers): init_states.append(self.DGCRM_cells[i].init_hidden_state(batch_size)) - return torch.stack(init_states, dim=0) #(num_layers, B, N, hidden_dim) + return torch.stack(init_states, dim=0) # (num_layers, B, N, hidden_dim) + # Build you torch or tf model class here class FedDGCN(nn.Module): @@ -49,7 +52,7 @@ class FedDGCN(nn.Module): self.num_layers = args.num_layers self.use_D = args.use_day self.use_W = args.use_week - self.dropout1 = nn.Dropout(p=args.dropout) # 0.1 + self.dropout1 = nn.Dropout(p=args.dropout) # 0.1 self.dropout2 = nn.Dropout(p=args.dropout) self.node_embeddings1 = nn.Parameter(torch.randn(self.num_node, args.embed_dim), requires_grad=True) self.node_embeddings2 = nn.Parameter(torch.randn(self.num_node, args.embed_dim), requires_grad=True) @@ -72,16 +75,16 @@ class FedDGCN(nn.Module): def forward(self, source): node_embedding1 = self.node_embeddings1 if self.use_D: - t_i_d_data = source[..., 1] + t_i_d_data = source[..., 1] T_i_D_emb = self.T_i_D_emb[(t_i_d_data * 288).type(torch.LongTensor)] node_embedding1 = torch.mul(node_embedding1, T_i_D_emb) if self.use_W: - d_i_w_data = source[..., 2] + d_i_w_data = source[..., 2] D_i_W_emb = self.D_i_W_emb[(d_i_w_data).type(torch.LongTensor)] node_embedding1 = torch.mul(node_embedding1, D_i_W_emb) - node_embeddings=[node_embedding1,self.node_embeddings1] + node_embeddings = [node_embedding1, self.node_embeddings1] source = source[..., 0].unsqueeze(-1) @@ -107,15 +110,11 @@ class FederatedFedDGCN(nn.Module): super(FederatedFedDGCN, self).__init__() # Initializing with None, we will populate model_list during the forward pass + self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.model_list = None - self.main_model = FedDGCN(args) # Initialize a single FedDGCN model (for aggregation) + self.graph_num = (args.num_nodes + args.minigraph_size - 1) // args.minigraph_size self.args = args - self.subgraph_num = 0 - self.num_node = args.minigraph_size - self.input_dim = args.input_dim - self.hidden_dim = args.rnn_units - self.output_dim = args.output_dim - self.horizon = args.horizon + self.model_list = ModuleList(FedDGCN(self.args).to(self.device) for _ in range(self.graph_num)) def forward(self, source): """ @@ -130,11 +129,6 @@ class FederatedFedDGCN(nn.Module): """ self.subgraph_num = source.shape[2] - # Initialize model_list if it hasn't been initialized yet - if self.model_list is None: - # Initialize model_list with FedDGCN models, one for each subgraph - self.model_list = ModuleList([self.main_model] + [FedDGCN(self.args) for _ in range(self.subgraph_num - 1)]) - # Initialize a list to store the outputs of each subgraph model subgraph_outputs = [] @@ -150,7 +144,7 @@ class FederatedFedDGCN(nn.Module): # Reshape the outputs into (batchsize, horizon, subgraph_num, subgraph_size, dims) output_tensor = torch.stack(subgraph_outputs, dim=2) # (batchsize, horizon, subgraph_num, subgraph_size, dims) - self.update_main_model() + # 
self.update_main_model() return output_tensor From 5fdab2b66895426171808cf2ab1f2cf30bca3578 Mon Sep 17 00:00:00 2001 From: HengZhang Date: Thu, 28 Nov 2024 11:46:32 +0800 Subject: [PATCH 10/11] model subgraph --- .../cl/lr_scheduler/LR_Scheduler.py | 68 +++++++++++++++++++ federatedscope/trafficflow/model/FedDGCNv2.py | 53 +++++++-------- scripts/trafficflow_exp_scripts/D3.yaml | 2 + scripts/trafficflow_exp_scripts/D4.yaml | 2 +- scripts/trafficflow_exp_scripts/D7.yaml | 2 + scripts/trafficflow_exp_scripts/D8.yaml | 4 +- 6 files changed, 101 insertions(+), 30 deletions(-) create mode 100644 federatedscope/cl/lr_scheduler/LR_Scheduler.py diff --git a/federatedscope/cl/lr_scheduler/LR_Scheduler.py b/federatedscope/cl/lr_scheduler/LR_Scheduler.py new file mode 100644 index 0000000..90b82a5 --- /dev/null +++ b/federatedscope/cl/lr_scheduler/LR_Scheduler.py @@ -0,0 +1,68 @@ +import numpy as np +from federatedscope.register import register_scheduler + + +# LR Scheduler +class LR_Scheduler(object): + def __init__(self, + optimizer, + warmup_epochs, + warmup_lr, + num_epochs, + base_lr, + final_lr, + iter_per_epoch, + constant_predictor_lr=False): + self.base_lr = base_lr + self.constant_predictor_lr = constant_predictor_lr + warmup_iter = iter_per_epoch * warmup_epochs + warmup_lr_schedule = np.linspace(warmup_lr, base_lr, warmup_iter) + decay_iter = iter_per_epoch * (num_epochs - warmup_epochs) + cosine_lr_schedule = final_lr + 0.5 * (base_lr - final_lr) * ( + 1 + np.cos(np.pi * np.arange(decay_iter) / decay_iter)) + + self.lr_schedule = np.concatenate( + (warmup_lr_schedule, cosine_lr_schedule)) + self.optimizer = optimizer + self.iter = 0 + self.current_lr = 0 + + def step(self): + for param_group in self.optimizer.param_groups: + + if self.constant_predictor_lr and param_group[ + 'name'] == 'predictor': + param_group['lr'] = self.base_lr + else: + lr = param_group['lr'] = self.lr_schedule[self.iter] + + self.iter += 1 + self.current_lr = lr + return lr + + def get_lr(self): + return self.current_lr + + +def get_scheduler(optimizer, type): + try: + import torch.optim as optim + except ImportError: + optim = None + scheduler = None + + if type == 'cos_lr_scheduler': + if optim is not None: + lr_lambda = [lambda epoch: epoch // 30] + scheduler = optim.lr_scheduler.LambdaLR(optimizer, + warmup_epochs=0, + warmup_lr=0, + num_epochs=50, + base_lr=30, + final_lr=0, + iter_per_epoch=int(50000 / + 512)) + return scheduler + + +register_scheduler('cos_lr_scheduler', get_scheduler) diff --git a/federatedscope/trafficflow/model/FedDGCNv2.py b/federatedscope/trafficflow/model/FedDGCNv2.py index 1b92e5d..542cb9a 100644 --- a/federatedscope/trafficflow/model/FedDGCNv2.py +++ b/federatedscope/trafficflow/model/FedDGCNv2.py @@ -2,7 +2,7 @@ from torch.nn import ModuleList import torch import torch.nn as nn from federatedscope.trafficflow.model.DGCRUCell import DGCRUCell - +import time class DGCRM(nn.Module): def __init__(self, node_num, dim_in, dim_out, cheb_k, embed_dim, num_layers=1): @@ -25,8 +25,7 @@ class DGCRM(nn.Module): state = init_state[i] inner_states = [] for t in range(seq_length): - state = self.DGCRM_cells[i](current_inputs[:, t, :, :], state, - [node_embeddings[0][:, t, :, :], node_embeddings[1]]) + state = self.DGCRM_cells[i](current_inputs[:, t, :, :], state, [node_embeddings[0][:, t, :, :], node_embeddings[1]]) inner_states.append(state) output_hidden.append(state) current_inputs = torch.stack(inner_states, dim=1) @@ -36,8 +35,7 @@ class DGCRM(nn.Module): init_states = [] for i in 
range(self.num_layers): init_states.append(self.DGCRM_cells[i].init_hidden_state(batch_size)) - return torch.stack(init_states, dim=0) # (num_layers, B, N, hidden_dim) - + return torch.stack(init_states, dim=0) #(num_layers, B, N, hidden_dim) # Build you torch or tf model class here class FedDGCN(nn.Module): @@ -52,7 +50,7 @@ class FedDGCN(nn.Module): self.num_layers = args.num_layers self.use_D = args.use_day self.use_W = args.use_week - self.dropout1 = nn.Dropout(p=args.dropout) # 0.1 + self.dropout1 = nn.Dropout(p=args.dropout) # 0.1 self.dropout2 = nn.Dropout(p=args.dropout) self.node_embeddings1 = nn.Parameter(torch.randn(self.num_node, args.embed_dim), requires_grad=True) self.node_embeddings2 = nn.Parameter(torch.randn(self.num_node, args.embed_dim), requires_grad=True) @@ -75,16 +73,16 @@ class FedDGCN(nn.Module): def forward(self, source): node_embedding1 = self.node_embeddings1 if self.use_D: - t_i_d_data = source[..., 1] + t_i_d_data = source[..., 1] T_i_D_emb = self.T_i_D_emb[(t_i_d_data * 288).type(torch.LongTensor)] node_embedding1 = torch.mul(node_embedding1, T_i_D_emb) if self.use_W: - d_i_w_data = source[..., 2] + d_i_w_data = source[..., 2] D_i_W_emb = self.D_i_W_emb[(d_i_w_data).type(torch.LongTensor)] node_embedding1 = torch.mul(node_embedding1, D_i_W_emb) - node_embeddings = [node_embedding1, self.node_embeddings1] + node_embeddings=[node_embedding1,self.node_embeddings1] source = source[..., 0].unsqueeze(-1) @@ -133,6 +131,7 @@ class FederatedFedDGCN(nn.Module): subgraph_outputs = [] # Iterate through the subgraph models + # Parallel computation has not been realized yet, so it may slower than normal. for i in range(self.subgraph_num): # Extract the subgraph-specific data subgraph_data = source[:, :, i, :, :] # (batchsize, horizon, subgraph_size, dims) @@ -143,28 +142,28 @@ class FederatedFedDGCN(nn.Module): # Reshape the outputs into (batchsize, horizon, subgraph_num, subgraph_size, dims) output_tensor = torch.stack(subgraph_outputs, dim=2) # (batchsize, horizon, subgraph_num, subgraph_size, dims) - - # self.update_main_model() - + self.local_aggregate() return output_tensor - def update_main_model(self): + def local_aggregate(self): """ - 更新 main_model 的参数为 model_list 中所有模型参数的平均值。 + Update the parameters of each model in model_list to the average of all models' parameters. 
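Reviewer note (illustrative, not part of the patches): after PATCH 10, `local_aggregate` runs on every forward pass and sets each sub-model's parameters to the element-wise mean over `model_list`, so all mini-graph models leave the step with identical weights — a FedAvg-style aggregation inside a single client. The nested loops recompute the same sums once per model; a sketch of an equivalent, cheaper formulation, assuming `model_list` as defined in the patch:

    import torch

    def local_aggregate(model_list):
        # Average every entry of the state dicts once, then broadcast the
        # averaged weights back to all sub-models.
        with torch.no_grad():
            states = [m.state_dict() for m in model_list]
            avg = {name: torch.stack([s[name] for s in states]).mean(dim=0)
                   for name in states[0]}
            for m in model_list:
                m.load_state_dict(avg)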
""" - # 遍历 main_model 的参数 - with torch.no_grad(): # 确保更新时不会计算梯度 - for name, main_param in self.main_model.named_parameters(): - # 初始化平均值的容器 - avg_param = torch.zeros_like(main_param) + with torch.no_grad(): # Ensure no gradients are calculated during the update + # Iterate over each model in model_list + for i, model in enumerate(self.model_list): + # Iterate over each model's parameters + for name, param in model.named_parameters(): + # Initialize a container for the average value + avg_param = torch.zeros_like(param) - # 遍历 model_list 中的所有模型 - for model in self.model_list: - # 加上当前模型的对应参数 - avg_param += model.state_dict()[name] + # Accumulate the corresponding parameters from all other models + for other_model in self.model_list: + avg_param += other_model.state_dict()[name] - # 计算平均值 - avg_param /= len(self.model_list) + # Calculate the average + avg_param /= len(self.model_list) + + # Update the current model's parameter + param.data.copy_(avg_param) - # 更新 main_model 的参数 - main_param.copy_(avg_param) diff --git a/scripts/trafficflow_exp_scripts/D3.yaml b/scripts/trafficflow_exp_scripts/D3.yaml index 2f19f56..1f6d0d8 100644 --- a/scripts/trafficflow_exp_scripts/D3.yaml +++ b/scripts/trafficflow_exp_scripts/D3.yaml @@ -42,6 +42,8 @@ model: cheb_order: 2 use_day: True use_week: True + use_minigraph: False + minigraph_size: 10 train: batch_or_epoch: 'epoch' local_update_steps: 1 diff --git a/scripts/trafficflow_exp_scripts/D4.yaml b/scripts/trafficflow_exp_scripts/D4.yaml index 1dc3aa6..97d85b7 100644 --- a/scripts/trafficflow_exp_scripts/D4.yaml +++ b/scripts/trafficflow_exp_scripts/D4.yaml @@ -44,7 +44,7 @@ model: cheb_order: 2 use_day: True use_week: True - use_minigraph: True + use_minigraph: False minigraph_size: 10 train: batch_or_epoch: 'epoch' diff --git a/scripts/trafficflow_exp_scripts/D7.yaml b/scripts/trafficflow_exp_scripts/D7.yaml index 518d52d..7dd6070 100644 --- a/scripts/trafficflow_exp_scripts/D7.yaml +++ b/scripts/trafficflow_exp_scripts/D7.yaml @@ -42,6 +42,8 @@ model: cheb_order: 2 use_day: True use_week: True + use_minigraph: False + minigraph_size: 10 train: batch_or_epoch: 'epoch' local_update_steps: 1 diff --git a/scripts/trafficflow_exp_scripts/D8.yaml b/scripts/trafficflow_exp_scripts/D8.yaml index 97614aa..498b71e 100644 --- a/scripts/trafficflow_exp_scripts/D8.yaml +++ b/scripts/trafficflow_exp_scripts/D8.yaml @@ -42,8 +42,8 @@ model: cheb_order: 2 use_day: True use_week: True - use_minigraph: True - minigraph_size: 5 + use_minigraph: False + minigraph_size: 10 train: batch_or_epoch: 'epoch' local_update_steps: 1 From f43502cc214e318ac8ed332572235258443199c7 Mon Sep 17 00:00:00 2001 From: HengZhang Date: Thu, 28 Nov 2024 11:51:45 +0800 Subject: [PATCH 11/11] Delete LR_Scheduler.py --- .../cl/lr_scheduler/ LR_Scheduler.py | 68 ------------------- 1 file changed, 68 deletions(-) delete mode 100644 federatedscope/cl/lr_scheduler/ LR_Scheduler.py diff --git a/federatedscope/cl/lr_scheduler/ LR_Scheduler.py b/federatedscope/cl/lr_scheduler/ LR_Scheduler.py deleted file mode 100644 index 90b82a5..0000000 --- a/federatedscope/cl/lr_scheduler/ LR_Scheduler.py +++ /dev/null @@ -1,68 +0,0 @@ -import numpy as np -from federatedscope.register import register_scheduler - - -# LR Scheduler -class LR_Scheduler(object): - def __init__(self, - optimizer, - warmup_epochs, - warmup_lr, - num_epochs, - base_lr, - final_lr, - iter_per_epoch, - constant_predictor_lr=False): - self.base_lr = base_lr - self.constant_predictor_lr = constant_predictor_lr - warmup_iter = iter_per_epoch 
* warmup_epochs - warmup_lr_schedule = np.linspace(warmup_lr, base_lr, warmup_iter) - decay_iter = iter_per_epoch * (num_epochs - warmup_epochs) - cosine_lr_schedule = final_lr + 0.5 * (base_lr - final_lr) * ( - 1 + np.cos(np.pi * np.arange(decay_iter) / decay_iter)) - - self.lr_schedule = np.concatenate( - (warmup_lr_schedule, cosine_lr_schedule)) - self.optimizer = optimizer - self.iter = 0 - self.current_lr = 0 - - def step(self): - for param_group in self.optimizer.param_groups: - - if self.constant_predictor_lr and param_group[ - 'name'] == 'predictor': - param_group['lr'] = self.base_lr - else: - lr = param_group['lr'] = self.lr_schedule[self.iter] - - self.iter += 1 - self.current_lr = lr - return lr - - def get_lr(self): - return self.current_lr - - -def get_scheduler(optimizer, type): - try: - import torch.optim as optim - except ImportError: - optim = None - scheduler = None - - if type == 'cos_lr_scheduler': - if optim is not None: - lr_lambda = [lambda epoch: epoch // 30] - scheduler = optim.lr_scheduler.LambdaLR(optimizer, - warmup_epochs=0, - warmup_lr=0, - num_epochs=50, - base_lr=30, - final_lr=0, - iter_per_epoch=int(50000 / - 512)) - return scheduler - - -register_scheduler('cos_lr_scheduler', get_scheduler)
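Reviewer note (illustrative, not part of the patches): after PATCH 08 the mini-graph layout is (timestep, horizon, graph_num, graph_size, dim). For evaluation against the original node ordering, the split can be inverted by flattening the two mini-graph axes and dropping the padded tail; `merge_mini_graphs` below is a hypothetical helper, not introduced by the patch series (`node_num` is the pre-padding node count):

    import numpy as np

    def merge_mini_graphs(output, node_num):
        # Inverse of split_into_mini_graphs: (t, h, g, s, d) -> (t, h, g * s, d),
        # then trim the trailing dummy nodes.
        t, h, g, s, d = output.shape
        return output.reshape(t, h, g * s, d)[:, :, :node_num, :]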