diff --git a/federatedscope/core/configs/cfg_trafficflow.py b/federatedscope/core/configs/cfg_trafficflow.py
index 73ade59..b065361 100644
--- a/federatedscope/core/configs/cfg_trafficflow.py
+++ b/federatedscope/core/configs/cfg_trafficflow.py
@@ -30,6 +30,8 @@ def extend_trafficflow_cfg(cfg):
     cfg.model.cheb_order = 1  # A tuple, e.g., (in_channel, h, w)
     cfg.model.use_day = True
     cfg.model.use_week = True
+    cfg.model.minigraph_size = 5
+    cfg.model.use_minigraph = False
 
     # ---------------------------------------------------------------------- #
     # Criterion related options
diff --git a/federatedscope/core/data/utils.py b/federatedscope/core/data/utils.py
index b1efa69..a9296e0 100644
--- a/federatedscope/core/data/utils.py
+++ b/federatedscope/core/data/utils.py
@@ -107,8 +107,11 @@ def load_dataset(config, client_cfgs=None):
         modified_config = config
     elif config.data.type.lower() in [
             'trafficflow']:
-        from federatedscope.trafficflow.dataloader.traffic_dataloader import load_traffic_data
-        dataset, modified_config = load_traffic_data(config, client_cfgs)
+        if not config.model.use_minigraph:
+            from federatedscope.trafficflow.dataloader.traffic_dataloader import load_traffic_data
+        else:
+            from federatedscope.trafficflow.dataloader.traffic_dataloader_v2 import load_traffic_data
+        dataset, modified_config = load_traffic_data(config, client_cfgs)
     else:
         raise ValueError('Dataset {} not found.'.format(config.data.type))
     return dataset, modified_config
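The new `traffic_dataloader_v2.py` below stores `[float(scaler.mean), float(scaler.std)]` in `config.data.scaler` and calls `scaler.transform(...)`. The `StandardScaler` it uses lives in `federatedscope/trafficflow/dataset/normalization.py`, which is not part of this patch; the following is only a minimal sketch of the interface implied by those call sites, assuming plain z-score scaling:

    class StandardScaler:
        """Z-score scaler; mean/std are scalars when column_wise=False,
        or per-column arrays when column_wise=True (assumed interface)."""

        def __init__(self, mean, std):
            self.mean = mean
            self.std = std

        def transform(self, data):
            # normalize to zero mean, unit variance
            return (data - self.mean) / self.std

        def inverse_transform(self, data):
            # undo the normalization, e.g. before computing real-value metrics
            return data * self.std + self.mean

Note that `float(scaler.mean)` only works for scalar statistics, so persisting the scaler into `config.data.scaler` implicitly assumes `column_wise=False`.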
diff --git a/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py b/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py
new file mode 100644
index 0000000..0a48590
--- /dev/null
+++ b/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py
@@ -0,0 +1,197 @@
+import numpy as np
+import torch
+import torch.utils.data
+
+from federatedscope.trafficflow.dataset.add_window import add_window_horizon
+from federatedscope.trafficflow.dataset.normalization import (
+    NScaler, MinMax01Scaler, MinMax11Scaler, StandardScaler, ColumnMinMaxScaler)
+from federatedscope.trafficflow.dataset.traffic_dataset import load_st_dataset
+
+
+def normalize_dataset(data, normalizer, column_wise=False):
+    if normalizer == 'max01':
+        if column_wise:
+            minimum = data.min(axis=0, keepdims=True)
+            maximum = data.max(axis=0, keepdims=True)
+        else:
+            minimum = data.min()
+            maximum = data.max()
+        scaler = MinMax01Scaler(minimum, maximum)
+        data = scaler.transform(data)
+        print('Normalize the dataset by MinMax01 Normalization')
+    elif normalizer == 'max11':
+        if column_wise:
+            minimum = data.min(axis=0, keepdims=True)
+            maximum = data.max(axis=0, keepdims=True)
+        else:
+            minimum = data.min()
+            maximum = data.max()
+        scaler = MinMax11Scaler(minimum, maximum)
+        data = scaler.transform(data)
+        print('Normalize the dataset by MinMax11 Normalization')
+    elif normalizer == 'std':
+        if column_wise:
+            mean = data.mean(axis=0, keepdims=True)
+            std = data.std(axis=0, keepdims=True)
+        else:
+            mean = data.mean()
+            std = data.std()
+        scaler = StandardScaler(mean, std)
+        # data = scaler.transform(data)
+        print('Normalize the dataset by Standard Normalization')
+    elif normalizer == 'None':
+        scaler = NScaler()
+        data = scaler.transform(data)
+        print('Does not normalize the dataset')
+    elif normalizer == 'cmax':
+        # column min-max, to be deprecated
+        # note: axis must be the spatial dimension, please check!
+        scaler = ColumnMinMaxScaler(data.min(axis=0), data.max(axis=0))
+        data = scaler.transform(data)
+        print('Normalize the dataset by Column Min-Max Normalization')
+    else:
+        raise ValueError
+    return scaler
+
+
+def split_data_by_days(data, val_days, test_days, interval=30):
+    """
+    :param data: [B, *]
+    :param val_days: number of days used for validation
+    :param test_days: number of days used for testing
+    :param interval: sampling interval (15, 30, 60) in minutes
+    :return: train, validation and test splits
+    """
+    t = int((24 * 60) / interval)  # samples per day
+    test_data = data[-t * int(test_days):]
+    val_data = data[-t * int(test_days + val_days):-t * int(test_days)]
+    train_data = data[:-t * int(test_days + val_days)]
+    return train_data, val_data, test_data
+
+
+def split_data_by_ratio(data, val_ratio, test_ratio):
+    data_len = data.shape[0]
+    test_data = data[-int(data_len * test_ratio):]
+    val_data = data[-int(data_len * (test_ratio + val_ratio)):-int(data_len * test_ratio)]
+    train_data = data[:-int(data_len * (test_ratio + val_ratio))]
+    return train_data, val_data, test_data
+
+
+def data_loader(X, Y, batch_size, shuffle=True, drop_last=True):
+    cuda = True if torch.cuda.is_available() else False
+    TensorFloat = torch.cuda.FloatTensor if cuda else torch.FloatTensor
+    X, Y = TensorFloat(X), TensorFloat(Y)
+    data = torch.utils.data.TensorDataset(X, Y)
+    dataloader = torch.utils.data.DataLoader(data, batch_size=batch_size,
+                                             shuffle=shuffle, drop_last=drop_last)
+    return dataloader
+
+
+def load_traffic_data(config, client_cfgs):
+    print("Use Mini graph")
+    root = config.data.root
+    dataName = 'PEMSD' + root[-1]
+    raw_data = load_st_dataset(dataName)
+
+    l, n, f = raw_data.shape
+    feature_list = [raw_data]
+
+    # numerical time_in_day
+    time_ind = [i % config.data.steps_per_day / config.data.steps_per_day for i in range(raw_data.shape[0])]
+    time_ind = np.array(time_ind)
+    time_in_day = np.tile(time_ind, [1, n, 1]).transpose((2, 1, 0))
+    feature_list.append(time_in_day)
+
+    # numerical day_in_week
+    day_in_week = [(i // config.data.steps_per_day) % config.data.days_per_week for i in range(raw_data.shape[0])]
+    day_in_week = np.array(day_in_week)
+    day_in_week = np.tile(day_in_week, [1, n, 1]).transpose((2, 1, 0))
+    feature_list.append(day_in_week)
+
+    # data = np.concatenate(feature_list, axis=-1)
+    single = False
+    x, y = add_window_horizon(raw_data, config.data.lag, config.data.horizon, single)
+    x_day, y_day = add_window_horizon(time_in_day, config.data.lag, config.data.horizon, single)
+    x_week, y_week = add_window_horizon(day_in_week, config.data.lag, config.data.horizon, single)
+    x = np.concatenate([x, x_day, x_week], axis=-1)
+    y = np.concatenate([y, y_day, y_week], axis=-1)
+
+    # split dataset by days or by ratio
+    if config.data.test_ratio > 1:
+        x_train, x_val, x_test = split_data_by_days(x, config.data.val_ratio, config.data.test_ratio)
+        y_train, y_val, y_test = split_data_by_days(y, config.data.val_ratio, config.data.test_ratio)
+    else:
+        x_train, x_val, x_test = split_data_by_ratio(x, config.data.val_ratio, config.data.test_ratio)
+        y_train, y_val, y_test = split_data_by_ratio(y, config.data.val_ratio, config.data.test_ratio)
+
+    # normalize the traffic features (inputs only)
+    normalizer = 'std'
+    scaler = normalize_dataset(x_train[..., :config.model.input_dim], normalizer, config.data.column_wise)
+    config.data.scaler = [float(scaler.mean), float(scaler.std)]
+
+    x_train[..., :config.model.input_dim] = scaler.transform(x_train[..., :config.model.input_dim])
+    x_val[..., :config.model.input_dim] = scaler.transform(x_val[..., :config.model.input_dim])
+    x_test[..., :config.model.input_dim] = scaler.transform(x_test[..., :config.model.input_dim])
+    # y_train[..., :config.model.output_dim] = scaler.transform(y_train[..., :config.model.output_dim])
+    # y_val[..., :config.model.output_dim] = scaler.transform(y_val[..., :config.model.output_dim])
+    # y_test[..., :config.model.output_dim] = scaler.transform(y_test[..., :config.model.output_dim])
+
+    # Client-side dataset splitting: assign each client a contiguous
+    # block of nodes along the spatial dimension (axis=2)
+    node_num = config.data.num_nodes
+    client_num = config.federate.client_num
+    per_samples = node_num // client_num
+    data_list, cur_index = dict(), 0
+    input_dim, output_dim = config.model.input_dim, config.model.output_dim
+    for i in range(client_num):
+        if cur_index + per_samples <= node_num:
+            # Normal slicing
+            sub_array_train = x_train[:, :, cur_index:cur_index + per_samples, :]
+            sub_array_val = x_val[:, :, cur_index:cur_index + per_samples, :]
+            sub_array_test = x_test[:, :, cur_index:cur_index + per_samples, :]
+
+            sub_y_train = y_train[:, :, cur_index:cur_index + per_samples, :output_dim]
+            sub_y_val = y_val[:, :, cur_index:cur_index + per_samples, :output_dim]
+            sub_y_test = y_test[:, :, cur_index:cur_index + per_samples, :output_dim]
+        else:
+            # Not enough nodes left to fill per_samples: take the remaining
+            # nodes and zero-pad the node dimension up to per_samples
+            sub_array_train = x_train[:, :, cur_index:, :]
+            sub_array_val = x_val[:, :, cur_index:, :]
+            sub_array_test = x_test[:, :, cur_index:, :]
+
+            sub_y_train = y_train[:, :, cur_index:, :output_dim]
+            sub_y_val = y_val[:, :, cur_index:, :output_dim]
+            sub_y_test = y_test[:, :, cur_index:, :output_dim]
+
+            pad_nodes = cur_index + per_samples - node_num
+
+            def pad(arr):
+                padding = np.zeros(arr.shape[:2] + (pad_nodes,) + arr.shape[3:])
+                return np.concatenate((arr, padding), axis=2)
+
+            sub_array_train, sub_array_val, sub_array_test = (
+                pad(sub_array_train), pad(sub_array_val), pad(sub_array_test))
+            sub_y_train, sub_y_val, sub_y_test = (
+                pad(sub_y_train), pad(sub_y_val), pad(sub_y_test))
+
+        device = 'cuda' if torch.cuda.is_available() else 'cpu'
+
+        data_list[i + 1] = {
+            'train': torch.utils.data.TensorDataset(
+                torch.tensor(sub_array_train, dtype=torch.float, device=device),
+                torch.tensor(sub_y_train, dtype=torch.float, device=device)
+            ),
+            'val': torch.utils.data.TensorDataset(
+                torch.tensor(sub_array_val, dtype=torch.float, device=device),
+                torch.tensor(sub_y_val, dtype=torch.float, device=device)
+            ),
+            'test': torch.utils.data.TensorDataset(
+                torch.tensor(sub_array_test, dtype=torch.float, device=device),
+                torch.tensor(sub_y_test, dtype=torch.float, device=device)
+            )
+        }
+        cur_index += per_samples
+    config.model.num_nodes = per_samples
+    return data_list, config
+
+
+if __name__ == '__main__':
+    a = 'data/trafficflow/PeMS04'
+    name = 'PEMSD' + a[-1]
+    raw_data = load_st_dataset(name)
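For orientation, here is how the per-client splits returned by the new loader could be consumed; the client id, batch size, and the `DataLoader` wrapping are illustrative assumptions, not part of this patch:

    import torch

    data_list, config = load_traffic_data(config, client_cfgs=None)
    train_set = data_list[1]['train']  # client ids start at 1
    loader = torch.utils.data.DataLoader(train_set, batch_size=64,
                                         shuffle=True, drop_last=True)
    for x, y in loader:
        # x: [batch, lag, per_samples, f + 2] (raw features + two time encodings)
        # y: [batch, horizon, per_samples, output_dim]
        break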
diff --git a/scripts/trafficflow_exp_scripts/D4.yaml b/scripts/trafficflow_exp_scripts/D4.yaml
index e8da4c3..fe0e8f9 100644
--- a/scripts/trafficflow_exp_scripts/D4.yaml
+++ b/scripts/trafficflow_exp_scripts/D4.yaml
@@ -44,6 +44,8 @@ model:
   cheb_order: 2
   use_day: True
   use_week: True
+  use_minigraph: True
+  minigraph_size: 5
 train:
   batch_or_epoch: 'epoch'
   local_update_steps: 1
diff --git a/scripts/trafficflow_exp_scripts/D8.yaml b/scripts/trafficflow_exp_scripts/D8.yaml
index 602ede5..97614aa 100644
--- a/scripts/trafficflow_exp_scripts/D8.yaml
+++ b/scripts/trafficflow_exp_scripts/D8.yaml
@@ -42,6 +42,8 @@ model:
   cheb_order: 2
   use_day: True
   use_week: True
+  use_minigraph: True
+  minigraph_size: 5
 train:
   batch_or_epoch: 'epoch'
   local_update_steps: 1
@@ -60,7 +62,7 @@ train:
   grad_norm: True
   real_value: True
 criterion:
-  type: L1loss
+  type: L1Loss
 trainer:
   type: trafficflowtrainer
 log_dir: ./
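For intuition on the node partitioning in `traffic_dataloader_v2.py`: each client receives a contiguous block of `node_num // client_num` nodes. A quick worked example with assumed values (170 nodes, as in PEMSD8, and 4 clients):

    node_num, client_num = 170, 4         # assumed for illustration
    per_samples = node_num // client_num  # 42 nodes per client
    covered = per_samples * client_num    # 168
    leftover = node_num - covered         # 2 trailing nodes are dropped

Because `per_samples` comes from floor division, `cur_index + per_samples <= node_num` holds on every iteration, so as written the zero-padding branch is never taken; trailing nodes are dropped rather than padded.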