import numpy as np
import torch
import torch.utils.data

from federatedscope.trafficflow.dataset.add_window import add_window_horizon
from federatedscope.trafficflow.dataset.normalization import (
    NScaler, MinMax01Scaler, MinMax11Scaler, StandardScaler,
    ColumnMinMaxScaler)
from federatedscope.trafficflow.dataset.traffic_dataset import load_st_dataset


def normalize_dataset(data, normalizer, column_wise=False):
    """Fit a scaler on ``data`` and return it.

    ``column_wise=True`` computes per-column statistics along axis 0;
    otherwise global statistics are used.
    """
    if normalizer == 'max01':
        if column_wise:
            minimum = data.min(axis=0, keepdims=True)
            maximum = data.max(axis=0, keepdims=True)
        else:
            minimum = data.min()
            maximum = data.max()
        scaler = MinMax01Scaler(minimum, maximum)
        data = scaler.transform(data)
        print('Normalize the dataset by MinMax01 Normalization')
    elif normalizer == 'max11':
        if column_wise:
            minimum = data.min(axis=0, keepdims=True)
            maximum = data.max(axis=0, keepdims=True)
        else:
            minimum = data.min()
            maximum = data.max()
        scaler = MinMax11Scaler(minimum, maximum)
        data = scaler.transform(data)
        print('Normalize the dataset by MinMax11 Normalization')
    elif normalizer == 'std':
        if column_wise:
            mean = data.mean(axis=0, keepdims=True)
            std = data.std(axis=0, keepdims=True)
        else:
            mean = data.mean()
            std = data.std()
        scaler = StandardScaler(mean, std)
        # the caller applies scaler.transform itself
        print('Normalize the dataset by Standard Normalization')
    elif normalizer == 'None':
        scaler = NScaler()
        data = scaler.transform(data)
        print('Does not normalize the dataset')
    elif normalizer == 'cmax':
        # column min-max, to be deprecated
        # note: axis must be the spatial dimension, please check!
        scaler = ColumnMinMaxScaler(data.min(axis=0), data.max(axis=0))
        data = scaler.transform(data)
        print('Normalize the dataset by Column Min-Max Normalization')
    else:
        raise ValueError(f'Unsupported normalizer: {normalizer}')
    return scaler


def split_data_by_days(data, val_days, test_days, interval=30):
    """Split along the time axis by whole days.

    :param data: array of shape [B, *], ordered in time
    :param val_days: number of days used for validation
    :param test_days: number of days used for testing
    :param interval: sampling interval in minutes (15, 30 or 60)
    :return: train, val and test splits
    """
    t = int((24 * 60) / interval)  # samples per day
    test_data = data[-t * int(test_days):]
    val_data = data[-t * int(test_days + val_days):-t * int(test_days)]
    train_data = data[:-t * int(test_days + val_days)]
    return train_data, val_data, test_data


def split_data_by_ratio(data, val_ratio, test_ratio):
    """Split along the time axis by ratio: the last ``test_ratio`` fraction
    is the test set, the ``val_ratio`` fraction before it is validation."""
    data_len = data.shape[0]
    test_data = data[-int(data_len * test_ratio):]
    val_data = data[-int(data_len *
                         (test_ratio + val_ratio)):-int(data_len * test_ratio)]
    train_data = data[:-int(data_len * (test_ratio + val_ratio))]
    return train_data, val_data, test_data


def data_loader(X, Y, batch_size, shuffle=True, drop_last=True):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    X = torch.as_tensor(X, dtype=torch.float32, device=device)
    Y = torch.as_tensor(Y, dtype=torch.float32, device=device)
    data = torch.utils.data.TensorDataset(X, Y)
    dataloader = torch.utils.data.DataLoader(data,
                                             batch_size=batch_size,
                                             shuffle=shuffle,
                                             drop_last=drop_last)
    return dataloader
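

# A minimal, self-contained sketch (not part of the FederatedScope pipeline)
# showing how the helpers above compose: split a random [T, N, F] series
# 60/20/20 with `split_data_by_ratio`, then batch it via `data_loader`. The
# shapes, ratios and the helper name `_demo_split_and_load` are illustrative
# assumptions, not project API.
def _demo_split_and_load():
    series = np.random.rand(100, 4, 1).astype(np.float32)  # [T=100, N=4, F=1]
    train, val, test = split_data_by_ratio(series,
                                           val_ratio=0.2,
                                           test_ratio=0.2)
    assert train.shape[0] == 60 and val.shape[0] == 20 and test.shape[0] == 20
    # use adjacent time steps as (input, target) pairs just to exercise the API
    loader = data_loader(train[:-1], train[1:], batch_size=8, shuffle=False)
    x_batch, y_batch = next(iter(loader))
    return x_batch.shape, y_batch.shape  # torch.Size([8, 4, 1]) for both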


def load_traffic_data(config, client_cfgs):
    root = config.data.root
    dataName = 'PEMSD' + root[-1]  # e.g. 'data/trafficflow/PeMS04' -> 'PEMSD4'
    raw_data = load_st_dataset(dataName)  # [T, N, F]
    _, n, _ = raw_data.shape
    feature_list = [raw_data]
    # numerical time-of-day feature in [0, 1)
    time_ind = np.array([
        i % config.data.steps_per_day / config.data.steps_per_day
        for i in range(raw_data.shape[0])
    ])
    time_in_day = np.tile(time_ind, [1, n, 1]).transpose((2, 1, 0))  # [T,N,1]
    feature_list.append(time_in_day)
    # numerical day-of-week feature in {0, ..., days_per_week - 1}
    day_in_week = np.array([
        (i // config.data.steps_per_day) % config.data.days_per_week
        for i in range(raw_data.shape[0])
    ])
    day_in_week = np.tile(day_in_week, [1, n, 1]).transpose((2, 1, 0))
    feature_list.append(day_in_week)
    # data = np.concatenate(feature_list, axis=-1)

    # build sliding windows: x covers the past `lag` steps, y the next
    # `horizon` steps
    single = False
    x, y = add_window_horizon(raw_data, config.data.lag, config.data.horizon,
                              single)
    x_day, y_day = add_window_horizon(time_in_day, config.data.lag,
                                      config.data.horizon, single)
    x_week, y_week = add_window_horizon(day_in_week, config.data.lag,
                                        config.data.horizon, single)
    x = np.concatenate([x, x_day, x_week], axis=-1)
    y = np.concatenate([y, y_day, y_week], axis=-1)

    # split dataset by days (test_ratio > 1 is read as a day count) or ratio
    if config.data.test_ratio > 1:
        x_train, x_val, x_test = split_data_by_days(x, config.data.val_ratio,
                                                    config.data.test_ratio)
        y_train, y_val, y_test = split_data_by_days(y, config.data.val_ratio,
                                                    config.data.test_ratio)
    else:
        x_train, x_val, x_test = split_data_by_ratio(x, config.data.val_ratio,
                                                     config.data.test_ratio)
        y_train, y_val, y_test = split_data_by_ratio(y, config.data.val_ratio,
                                                     config.data.test_ratio)

    # normalize the traffic readings only; the time features stay untouched
    normalizer = 'std'
    scaler = normalize_dataset(x_train[..., :config.model.input_dim],
                               normalizer, config.data.column_wise)
    # note: storing scalar statistics assumes column_wise=False
    config.data.scaler = [float(scaler.mean), float(scaler.std)]
    x_train[..., :config.model.input_dim] = scaler.transform(
        x_train[..., :config.model.input_dim])
    x_val[..., :config.model.input_dim] = scaler.transform(
        x_val[..., :config.model.input_dim])
    x_test[..., :config.model.input_dim] = scaler.transform(
        x_test[..., :config.model.input_dim])
    # the targets y are deliberately left in the original scale

    # Client-side dataset splitting: partition the node axis evenly
    node_num = config.data.num_nodes
    client_num = config.federate.client_num
    per_samples = node_num // client_num  # nodes per client
    data_list, cur_index = dict(), 0
    output_dim = config.model.output_dim
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    for i in range(client_num):
        if cur_index + per_samples <= node_num:
            # normal slicing
            sub_array_train = x_train[:, :,
                                      cur_index:cur_index + per_samples, :]
            sub_array_val = x_val[:, :, cur_index:cur_index + per_samples, :]
            sub_array_test = x_test[:, :, cur_index:cur_index + per_samples, :]
            sub_y_train = y_train[:, :, cur_index:cur_index + per_samples,
                                  :output_dim]
            sub_y_val = y_val[:, :, cur_index:cur_index + per_samples,
                              :output_dim]
            sub_y_test = y_test[:, :, cur_index:cur_index + per_samples,
                                :output_dim]
        else:
            # not enough nodes left to fill per_samples: take the remainder
            # and zero-pad the node axis so every client has the same width
            n_pad = cur_index + per_samples - node_num

            def _pad_nodes(arr):
                pad = np.zeros(arr.shape[:2] + (n_pad, ) + arr.shape[3:],
                               dtype=arr.dtype)
                return np.concatenate((arr, pad), axis=2)

            sub_array_train = _pad_nodes(x_train[:, :, cur_index:, :])
            sub_array_val = _pad_nodes(x_val[:, :, cur_index:, :])
            sub_array_test = _pad_nodes(x_test[:, :, cur_index:, :])
            sub_y_train = _pad_nodes(y_train[:, :, cur_index:, :output_dim])
            sub_y_val = _pad_nodes(y_val[:, :, cur_index:, :output_dim])
            sub_y_test = _pad_nodes(y_test[:, :, cur_index:, :output_dim])
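        # each client (keyed 1..client_num) gets train/val/test TensorDatasets
        # covering only its own slice of nodes; the tensors are materialized
        # on `device` up front instead of being moved batch by batch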
        data_list[i + 1] = {
            'train': torch.utils.data.TensorDataset(
                torch.tensor(sub_array_train, dtype=torch.float,
                             device=device),
                torch.tensor(sub_y_train, dtype=torch.float, device=device)),
            'val': torch.utils.data.TensorDataset(
                torch.tensor(sub_array_val, dtype=torch.float, device=device),
                torch.tensor(sub_y_val, dtype=torch.float, device=device)),
            'test': torch.utils.data.TensorDataset(
                torch.tensor(sub_array_test, dtype=torch.float, device=device),
                torch.tensor(sub_y_test, dtype=torch.float, device=device)),
        }
        cur_index += per_samples
    # every client-side model now operates on per_samples nodes
    config.model.num_nodes = per_samples
    return data_list, config


if __name__ == '__main__':
    # quick smoke test: load the raw PeMS04 tensor and report its shape
    a = 'data/trafficflow/PeMS04'
    name = 'PEMSD' + a[-1]
    raw_data = load_st_dataset(name)
    print(raw_data.shape)
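

# Illustrative sketch only: in FederatedScope, `load_traffic_data` is driven
# by the framework's config object. The stand-in below mimics just the fields
# the function reads above; every value here is an assumption for the demo,
# not a project default, and the PeMS04 files must already be available under
# `data/trafficflow/PeMS04`.
def _example_load_traffic_data():
    from types import SimpleNamespace as NS
    cfg = NS(
        data=NS(root='data/trafficflow/PeMS04', steps_per_day=288,
                days_per_week=7, lag=12, horizon=12, val_ratio=0.2,
                test_ratio=0.2, column_wise=False, num_nodes=307,
                scaler=None),
        model=NS(input_dim=1, output_dim=1, num_nodes=None),
        federate=NS(client_num=7))
    data_list, cfg = load_traffic_data(cfg, client_cfgs=None)
    # with 307 nodes and 7 clients, each client holds 307 // 7 = 43 nodes
    return sorted(data_list.keys()), cfg.model.num_nodes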