FS-TFP/federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py


import numpy as np
import torch
import torch.utils.data
from federatedscope.trafficflow.dataset.add_window import add_window_horizon
from federatedscope.trafficflow.dataset.normalization import (
    NScaler, MinMax01Scaler, MinMax11Scaler, StandardScaler, ColumnMinMaxScaler)
from federatedscope.trafficflow.dataset.traffic_dataset import load_st_dataset


def normalize_dataset(data, normalizer, column_wise=False):
    if normalizer == 'max01':
        if column_wise:
            minimum = data.min(axis=0, keepdims=True)
            maximum = data.max(axis=0, keepdims=True)
        else:
            minimum = data.min()
            maximum = data.max()
        scaler = MinMax01Scaler(minimum, maximum)
        data = scaler.transform(data)
        print('Normalize the dataset by MinMax01 Normalization')
    elif normalizer == 'max11':
        if column_wise:
            minimum = data.min(axis=0, keepdims=True)
            maximum = data.max(axis=0, keepdims=True)
        else:
            minimum = data.min()
            maximum = data.max()
        scaler = MinMax11Scaler(minimum, maximum)
        data = scaler.transform(data)
        print('Normalize the dataset by MinMax11 Normalization')
    elif normalizer == 'std':
        if column_wise:
            mean = data.mean(axis=0, keepdims=True)
            std = data.std(axis=0, keepdims=True)
        else:
            mean = data.mean()
            std = data.std()
        scaler = StandardScaler(mean, std)
        # The caller applies scaler.transform itself (see load_traffic_data)
        # data = scaler.transform(data)
        print('Normalize the dataset by Standard Normalization')
    elif normalizer == 'None':
        scaler = NScaler()
        data = scaler.transform(data)
        print('Does not normalize the dataset')
    elif normalizer == 'cmax':
        # column-wise min-max, to be deprecated
        # note: axis must be the spatial dimension, please check!
        scaler = ColumnMinMaxScaler(data.min(axis=0), data.max(axis=0))
        data = scaler.transform(data)
        print('Normalize the dataset by Column Min-Max Normalization')
    else:
        raise ValueError('Unsupported normalizer: {}'.format(normalizer))
    return scaler
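

# Illustrative sketch only: normalize_dataset returns the fitted scaler and
# the caller is expected to apply scaler.transform itself, as
# load_traffic_data does below. The array shape here is hypothetical.
def _demo_normalize_dataset():
    data = np.random.rand(100, 307, 1)  # (timesteps, nodes, features)
    scaler = normalize_dataset(data, 'std', column_wise=False)
    return scaler.transform(data)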


def split_data_by_days(data, val_days, test_days, interval=30):
    """
    :param data: [B, *]
    :param val_days: number of days used for validation
    :param test_days: number of days used for testing
    :param interval: sampling interval in minutes (15, 30, 60)
    :return: train_data, val_data, test_data
    """
    t = int((24 * 60) / interval)  # number of samples per day
    test_data = data[-t * int(test_days):]
    val_data = data[-t * int(test_days + val_days):-t * int(test_days)]
    train_data = data[:-t * int(test_days + val_days)]
    return train_data, val_data, test_data


def split_data_by_ratio(data, val_ratio, test_ratio):
    data_len = data.shape[0]
    test_data = data[-int(data_len * test_ratio):]
    val_data = data[-int(data_len * (test_ratio + val_ratio)):-int(data_len * test_ratio)]
    train_data = data[:-int(data_len * (test_ratio + val_ratio))]
    return train_data, val_data, test_data
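

# Illustrative sketch of the two split helpers with hypothetical sizes:
# with 1000 samples, val_ratio=0.2 and test_ratio=0.2 give a 600/200/200
# split; split_data_by_days slices the same way but counts whole days
# according to the sampling interval.
def _demo_split():
    data = np.arange(1000).reshape(1000, 1)
    train, val, test = split_data_by_ratio(data, 0.2, 0.2)
    assert (train.shape[0], val.shape[0], test.shape[0]) == (600, 200, 200)
    return train, val, test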


def data_loader(X, Y, batch_size, shuffle=True, drop_last=True):
    cuda = True if torch.cuda.is_available() else False
    TensorFloat = torch.cuda.FloatTensor if cuda else torch.FloatTensor
    X, Y = TensorFloat(X), TensorFloat(Y)
    data = torch.utils.data.TensorDataset(X, Y)
    dataloader = torch.utils.data.DataLoader(data, batch_size=batch_size,
                                             shuffle=shuffle, drop_last=drop_last)
    return dataloader
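

# Illustrative usage sketch for data_loader with hypothetical shapes: X holds
# input windows and Y the prediction horizons; the loader yields float
# tensors on GPU when CUDA is available, otherwise on CPU.
def _demo_data_loader():
    X = np.random.rand(64, 12, 10, 3)  # (samples, lag, nodes, features)
    Y = np.random.rand(64, 12, 10, 1)  # (samples, horizon, nodes, output_dim)
    loader = data_loader(X, Y, batch_size=16, shuffle=False, drop_last=False)
    x_batch, y_batch = next(iter(loader))
    return x_batch.shape, y_batch.shape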


def load_traffic_data(config, client_cfgs):
    root = config.data.root
    dataName = 'PEMSD' + root[-1]
    raw_data = load_st_dataset(dataName)  # shape: (T, N, F)
    l, n, f = raw_data.shape
    feature_list = [raw_data]
    # numerical time_in_day
    time_ind = [
        i % config.data.steps_per_day / config.data.steps_per_day
        for i in range(raw_data.shape[0])
    ]
    time_ind = np.array(time_ind)
    time_in_day = np.tile(time_ind, [1, n, 1]).transpose((2, 1, 0))
    feature_list.append(time_in_day)
    # numerical day_in_week
    day_in_week = [
        (i // config.data.steps_per_day) % config.data.days_per_week
        for i in range(raw_data.shape[0])
    ]
    day_in_week = np.array(day_in_week)
    day_in_week = np.tile(day_in_week, [1, n, 1]).transpose((2, 1, 0))
    feature_list.append(day_in_week)
    # data = np.concatenate(feature_list, axis=-1)
    single = False
    x, y = add_window_horizon(raw_data, config.data.lag, config.data.horizon, single)
    x_day, y_day = add_window_horizon(time_in_day, config.data.lag, config.data.horizon, single)
    x_week, y_week = add_window_horizon(day_in_week, config.data.lag, config.data.horizon, single)
    x = np.concatenate([x, x_day, x_week], axis=-1)
    y = np.concatenate([y, y_day, y_week], axis=-1)
    # split dataset by days or by ratio
    if config.data.test_ratio > 1:
        x_train, x_val, x_test = split_data_by_days(x, config.data.val_ratio, config.data.test_ratio)
        y_train, y_val, y_test = split_data_by_days(y, config.data.val_ratio, config.data.test_ratio)
    else:
        x_train, x_val, x_test = split_data_by_ratio(x, config.data.val_ratio, config.data.test_ratio)
        y_train, y_val, y_test = split_data_by_ratio(y, config.data.val_ratio, config.data.test_ratio)
    # normalize the traffic features (the first input_dim channels)
    normalizer = 'std'
    scaler = normalize_dataset(x_train[..., :config.model.input_dim], normalizer, config.data.column_wise)
    config.data.scaler = [float(scaler.mean), float(scaler.std)]
    x_train[..., :config.model.input_dim] = scaler.transform(x_train[..., :config.model.input_dim])
    x_val[..., :config.model.input_dim] = scaler.transform(x_val[..., :config.model.input_dim])
    x_test[..., :config.model.input_dim] = scaler.transform(x_test[..., :config.model.input_dim])
    # Client-side dataset splitting: each client gets a contiguous slice of nodes
    node_num = config.data.num_nodes
    client_num = config.federate.client_num
    per_samples = node_num // client_num
    data_list, cur_index = dict(), 0
    input_dim, output_dim = config.model.input_dim, config.model.output_dim
    for i in range(client_num):
        if cur_index + per_samples <= node_num:
            # Normal slicing
            sub_array_train = x_train[:, :, cur_index:cur_index + per_samples, :]
            sub_array_val = x_val[:, :, cur_index:cur_index + per_samples, :]
            sub_array_test = x_test[:, :, cur_index:cur_index + per_samples, :]
            sub_y_train = y_train[:, :, cur_index:cur_index + per_samples, :output_dim]
            sub_y_val = y_val[:, :, cur_index:cur_index + per_samples, :output_dim]
            sub_y_test = y_test[:, :, cur_index:cur_index + per_samples, :output_dim]
        else:
            # Not enough nodes remain to fill per_samples, so pad the node
            # dimension with zero columns until every client has the same width
            pad_width = cur_index + per_samples - node_num

            def pad_nodes(arr):
                padding = np.zeros(
                    (arr.shape[0], arr.shape[1], pad_width, arr.shape[3]),
                    dtype=arr.dtype)
                return np.concatenate((arr, padding), axis=2)

            sub_array_train = pad_nodes(x_train[:, :, cur_index:, :])
            sub_array_val = pad_nodes(x_val[:, :, cur_index:, :])
            sub_array_test = pad_nodes(x_test[:, :, cur_index:, :])
            sub_y_train = pad_nodes(y_train[:, :, cur_index:, :output_dim])
            sub_y_val = pad_nodes(y_val[:, :, cur_index:, :output_dim])
            sub_y_test = pad_nodes(y_test[:, :, cur_index:, :output_dim])
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        minigraph_size = config.model.minigraph_size
        data_list[i + 1] = {
            'train': torch.utils.data.TensorDataset(
                torch.tensor(split_into_mini_graphs(sub_array_train, minigraph_size), dtype=torch.float, device=device),
                torch.tensor(split_into_mini_graphs(sub_y_train, minigraph_size), dtype=torch.float, device=device)
            ),
            'val': torch.utils.data.TensorDataset(
                torch.tensor(split_into_mini_graphs(sub_array_val, minigraph_size), dtype=torch.float, device=device),
                torch.tensor(split_into_mini_graphs(sub_y_val, minigraph_size), dtype=torch.float, device=device)
            ),
            'test': torch.utils.data.TensorDataset(
                torch.tensor(split_into_mini_graphs(sub_array_test, minigraph_size), dtype=torch.float, device=device),
                torch.tensor(split_into_mini_graphs(sub_y_test, minigraph_size), dtype=torch.float, device=device)
            )
        }
        cur_index += per_samples
    config.model.num_nodes = per_samples
    return data_list, config


def split_into_mini_graphs(tensor, graph_size, dummy_value=0):
    """
    Splits a tensor into mini-graphs of the specified size. Pads the last
    mini-graph with dummy nodes if necessary.
    Args:
        tensor (np.ndarray): Input tensor with shape (timestep, horizon, node_num, dim).
        graph_size (int): The size of each mini-graph.
        dummy_value (float, optional): The value to use for dummy nodes. Default is 0.
    Returns:
        np.ndarray: Output tensor with shape (timestep, horizon, graph_num, graph_size, dim).
    """
    timestep, horizon, node_num, dim = tensor.shape
    # Calculate the number of mini-graphs (round-up division)
    graph_num = (node_num + graph_size - 1) // graph_size
    # Initialize output tensor with dummy values
    output = np.full((timestep, horizon, graph_num, graph_size, dim), dummy_value, dtype=tensor.dtype)
    # Fill in the real data
    for i in range(graph_num):
        start_idx = i * graph_size
        end_idx = min(start_idx + graph_size, node_num)  # Do not exceed the node number
        slice_size = end_idx - start_idx
        # Assign the data to the corresponding mini-graph
        output[:, :, i, :slice_size, :] = tensor[:, :, start_idx:end_idx, :]
    return output
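

# Illustrative sketch with hypothetical sizes: 10 nodes split into
# mini-graphs of 4 give 3 mini-graphs, the last one padded with 2 dummy
# (all-zero) nodes.
def _demo_split_into_mini_graphs():
    tensor = np.random.rand(8, 12, 10, 3)  # (timestep, horizon, node_num, dim)
    out = split_into_mini_graphs(tensor, graph_size=4)
    assert out.shape == (8, 12, 3, 4, 3)
    return out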


if __name__ == '__main__':
    a = 'data/trafficflow/PeMS04'
    name = 'PEMSD' + a[-1]
    raw_data = load_st_dataset(name)