FS-TFP/federatedscope/trafficflow/dataloader/traffic_dataloader.py

import numpy as np
import torch
import torch.utils.data

from federatedscope.trafficflow.dataset.add_window import add_window_horizon
from federatedscope.trafficflow.dataset.normalization import (
    NScaler, MinMax01Scaler, MinMax11Scaler, StandardScaler,
    ColumnMinMaxScaler)
from federatedscope.trafficflow.dataset.traffic_dataset import load_st_dataset


def normalize_dataset(data, normalizer, column_wise=False):
    """Fit a scaler of the requested type on ``data`` and return it.

    Note that the ``data = scaler.transform(data)`` statements below only
    rebind the local name; callers must apply ``scaler.transform``
    themselves.
    """
    if normalizer == 'max01':
        if column_wise:
            minimum = data.min(axis=0, keepdims=True)
            maximum = data.max(axis=0, keepdims=True)
        else:
            minimum = data.min()
            maximum = data.max()
        scaler = MinMax01Scaler(minimum, maximum)
        data = scaler.transform(data)
        print('Normalize the dataset by MinMax01 Normalization')
    elif normalizer == 'max11':
        if column_wise:
            minimum = data.min(axis=0, keepdims=True)
            maximum = data.max(axis=0, keepdims=True)
        else:
            minimum = data.min()
            maximum = data.max()
        scaler = MinMax11Scaler(minimum, maximum)
        data = scaler.transform(data)
        print('Normalize the dataset by MinMax11 Normalization')
    elif normalizer == 'std':
        if column_wise:
            mean = data.mean(axis=0, keepdims=True)
            std = data.std(axis=0, keepdims=True)
        else:
            mean = data.mean()
            std = data.std()
        scaler = StandardScaler(mean, std)
        # data = scaler.transform(data)
        print('Normalize the dataset by Standard Normalization')
    elif normalizer == 'None':
        scaler = NScaler()
        data = scaler.transform(data)
        print('Do not normalize the dataset')
    elif normalizer == 'cmax':
        # Column-wise min-max, to be deprecated.
        # Note: the reduction axis must be the spatial dimension,
        # please check!
        scaler = ColumnMinMaxScaler(data.min(axis=0), data.max(axis=0))
        data = scaler.transform(data)
        print('Normalize the dataset by Column Min-Max Normalization')
    else:
        raise ValueError(f'Unsupported normalizer: {normalizer}')
    return scaler
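

# Illustrative sketch (not part of the pipeline): how a fitted scaler is
# typically used. ``inverse_transform`` is an assumption here, mirroring
# ``transform`` on the scaler classes imported above.
def _normalize_example():
    rng = np.random.default_rng(0)
    x = rng.normal(loc=5.0, scale=2.0, size=(100, 3))
    scaler = normalize_dataset(x, 'std', column_wise=True)
    x_norm = scaler.transform(x)  # zero mean, unit std per column
    assert np.allclose(x, scaler.inverse_transform(x_norm))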


def split_data_by_days(data, val_days, test_days, interval=30):
    """
    :param data: [B, *]
    :param val_days: number of trailing days reserved for validation
    :param test_days: number of trailing days reserved for testing
    :param interval: sampling interval in minutes (15, 30, 60)
    :return: train_data, val_data, test_data
    """
    t = int((24 * 60) / interval)  # time steps per day
    test_data = data[-t * int(test_days):]
    val_data = data[-t * int(test_days + val_days):-t * int(test_days)]
    train_data = data[:-t * int(test_days + val_days)]
    return train_data, val_data, test_data
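

# Sketch of the day-based split arithmetic (illustrative numbers): at a
# 30-minute interval there are 48 steps per day, so test_days=2 takes the
# last 96 steps and val_days=1 the 48 steps before them.
def _split_by_days_example():
    data = np.arange(48 * 10)  # 10 days at 30-minute resolution
    train, val, test = split_data_by_days(data, val_days=1, test_days=2,
                                          interval=30)
    assert (len(train), len(val), len(test)) == (48 * 7, 48, 96)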


def split_data_by_ratio(data, val_ratio, test_ratio):
    data_len = data.shape[0]
    test_data = data[-int(data_len * test_ratio):]
    val_data = data[-int(data_len * (test_ratio + val_ratio)):
                    -int(data_len * test_ratio)]
    train_data = data[:-int(data_len * (test_ratio + val_ratio))]
    return train_data, val_data, test_data
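

# Sketch of the ratio-based split (illustrative numbers): 100 samples with
# val_ratio=0.2 and test_ratio=0.2 yield a 60/20/20 split.
def _split_by_ratio_example():
    data = np.arange(100)
    train, val, test = split_data_by_ratio(data, val_ratio=0.2,
                                           test_ratio=0.2)
    assert (len(train), len(val), len(test)) == (60, 20, 20)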


def data_loader(X, Y, batch_size, shuffle=True, drop_last=True):
    # Place tensors on GPU when available (legacy tensor-type style).
    cuda = torch.cuda.is_available()
    TensorFloat = torch.cuda.FloatTensor if cuda else torch.FloatTensor
    X, Y = TensorFloat(X), TensorFloat(Y)
    data = torch.utils.data.TensorDataset(X, Y)
    dataloader = torch.utils.data.DataLoader(data,
                                             batch_size=batch_size,
                                             shuffle=shuffle,
                                             drop_last=drop_last)
    return dataloader
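

# Minimal usage sketch with random windows; the shapes follow the
# [num_samples, window, num_nodes, features] layout produced below
# (all numbers here are illustrative).
def _data_loader_example():
    x = np.random.rand(32, 12, 10, 3).astype(np.float32)
    y = np.random.rand(32, 12, 10, 1).astype(np.float32)
    loader = data_loader(x, y, batch_size=8, shuffle=False)
    xb, yb = next(iter(loader))
    assert xb.shape == (8, 12, 10, 3) and yb.shape == (8, 12, 10, 1)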


def load_traffic_data(config, client_cfgs):
    root = config.data.root
    dataName = 'PEMSD' + root[-1]
    raw_data = load_st_dataset(dataName)  # [T, N, F]
    l, n, f = raw_data.shape
    feature_list = [raw_data]
    # Numerical time_in_day feature in [0, 1).
    time_ind = [
        i % config.data.steps_per_day / config.data.steps_per_day
        for i in range(raw_data.shape[0])
    ]
    time_ind = np.array(time_ind)
    time_in_day = np.tile(time_ind, [1, n, 1]).transpose((2, 1, 0))
    feature_list.append(time_in_day)
    # Numerical day_in_week feature in {0, ..., days_per_week - 1}.
    day_in_week = [
        (i // config.data.steps_per_day) % config.data.days_per_week
        for i in range(raw_data.shape[0])
    ]
    day_in_week = np.array(day_in_week)
    day_in_week = np.tile(day_in_week, [1, n, 1]).transpose((2, 1, 0))
    feature_list.append(day_in_week)
    # data = np.concatenate(feature_list, axis=-1)
    single = False
    x, y = add_window_horizon(raw_data, config.data.lag,
                              config.data.horizon, single)
    x_day, y_day = add_window_horizon(time_in_day, config.data.lag,
                                      config.data.horizon, single)
    x_week, y_week = add_window_horizon(day_in_week, config.data.lag,
                                        config.data.horizon, single)
    x = np.concatenate([x, x_day, x_week], axis=-1)
    y = np.concatenate([y, y_day, y_week], axis=-1)
    # Split the dataset by days (test_ratio > 1 is read as a day count)
    # or by ratio.
    if config.data.test_ratio > 1:
        x_train, x_val, x_test = split_data_by_days(
            x, config.data.val_ratio, config.data.test_ratio)
        y_train, y_val, y_test = split_data_by_days(
            y, config.data.val_ratio, config.data.test_ratio)
    else:
        x_train, x_val, x_test = split_data_by_ratio(
            x, config.data.val_ratio, config.data.test_ratio)
        y_train, y_val, y_test = split_data_by_ratio(
            y, config.data.val_ratio, config.data.test_ratio)
    # Normalize the input channels; the scaler is fitted on the training
    # inputs only and applied to all splits.
    normalizer = 'std'
    input_dim, output_dim = config.model.input_dim, config.model.output_dim
    scaler = normalize_dataset(x_train[..., :input_dim], normalizer,
                               config.data.column_wise)
    # Storing scalar statistics assumes column_wise=False.
    config.data.scaler = [float(scaler.mean), float(scaler.std)]
    x_train[..., :input_dim] = scaler.transform(x_train[..., :input_dim])
    x_val[..., :input_dim] = scaler.transform(x_val[..., :input_dim])
    x_test[..., :input_dim] = scaler.transform(x_test[..., :input_dim])
    # y_train[..., :output_dim] = scaler.transform(y_train[..., :output_dim])
    # y_val[..., :output_dim] = scaler.transform(y_val[..., :output_dim])
    # y_test[..., :output_dim] = scaler.transform(y_test[..., :output_dim])
    # Partition the nodes across clients.
    node_num = config.data.num_nodes
    client_num = config.federate.client_num
    per_samples = node_num // client_num
    data_list, cur_index = dict(), 0
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    for i in range(client_num):
        if cur_index + per_samples <= node_num:
            # Regular case: slice a full block of nodes.
            sub_array_train = x_train[:, :, cur_index:cur_index + per_samples, :]
            sub_array_val = x_val[:, :, cur_index:cur_index + per_samples, :]
            sub_array_test = x_test[:, :, cur_index:cur_index + per_samples, :]
            sub_y_train = y_train[:, :, cur_index:cur_index + per_samples, :output_dim]
            sub_y_val = y_val[:, :, cur_index:cur_index + per_samples, :output_dim]
            sub_y_test = y_test[:, :, cur_index:cur_index + per_samples, :output_dim]
        else:
            # Fewer than per_samples nodes remain: zero-pad the node axis
            # so every client sees the same number of nodes.
            missing = cur_index + per_samples - node_num

            def _pad_nodes(arr):
                pad = np.zeros(arr.shape[:2] + (missing, ) + arr.shape[3:])
                return np.concatenate((arr, pad), axis=2)

            sub_array_train = _pad_nodes(x_train[:, :, cur_index:, :])
            sub_array_val = _pad_nodes(x_val[:, :, cur_index:, :])
            sub_array_test = _pad_nodes(x_test[:, :, cur_index:, :])
            sub_y_train = _pad_nodes(y_train[:, :, cur_index:, :output_dim])
            sub_y_val = _pad_nodes(y_val[:, :, cur_index:, :output_dim])
            sub_y_test = _pad_nodes(y_test[:, :, cur_index:, :output_dim])
        data_list[i + 1] = {
            'train': torch.utils.data.TensorDataset(
                torch.tensor(sub_array_train, dtype=torch.float, device=device),
                torch.tensor(sub_y_train, dtype=torch.float, device=device)),
            'val': torch.utils.data.TensorDataset(
                torch.tensor(sub_array_val, dtype=torch.float, device=device),
                torch.tensor(sub_y_val, dtype=torch.float, device=device)),
            'test': torch.utils.data.TensorDataset(
                torch.tensor(sub_array_test, dtype=torch.float, device=device),
                torch.tensor(sub_y_test, dtype=torch.float, device=device))
        }
        cur_index += per_samples
    config.model.num_nodes = per_samples
    return data_list, config
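

# Sketch of the per-client node partition performed above (illustrative
# numbers): with 10 nodes and 3 clients, each client receives
# 10 // 3 = 3 consecutive nodes, and the leftover node is never assigned.
def _client_split_example():
    x = np.random.rand(5, 12, 10, 3)  # [B, lag, num_nodes, features]
    per_samples = 10 // 3
    parts = [
        x[:, :, i * per_samples:(i + 1) * per_samples, :] for i in range(3)
    ]
    assert all(p.shape == (5, 12, 3, 3) for p in parts)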


if __name__ == '__main__':
    a = 'data/trafficflow/PeMS04'
    name = 'PEMSD' + a[-1]
    raw_data = load_st_dataset(name)