add minigraph dataloader
commit 578ff48c71
parent 7836e2e0c4
@@ -30,6 +30,8 @@ def extend_trafficflow_cfg(cfg):
     cfg.model.cheb_order = 1  # A tuple, e.g., (in_channel, h, w)
     cfg.model.use_day = True
     cfg.model.use_week = True
+    cfg.model.minigraph_size = 5
+    cfg.model.use_minigraph = False

     # ---------------------------------------------------------------------- #
     # Criterion related options
@@ -107,8 +107,13 @@ def load_dataset(config, client_cfgs=None):
         modified_config = config
     elif config.data.type.lower() in [
             'trafficflow']:
-        from federatedscope.trafficflow.dataloader.traffic_dataloader import load_traffic_data
-        dataset, modified_config = load_traffic_data(config, client_cfgs)
+        if not config.model.use_minigraph:
+            from federatedscope.trafficflow.dataloader.traffic_dataloader import load_traffic_data
+            dataset, modified_config = load_traffic_data(config, client_cfgs)
+        else:
+            from federatedscope.trafficflow.dataloader.traffic_dataloader_v2 import load_traffic_data
+            dataset, modified_config = load_traffic_data(config, client_cfgs)
+
     else:
         raise ValueError('Dataset {} not found.'.format(config.data.type))
     return dataset, modified_config
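For reference, a minimal sketch of how this dispatch could be driven end to end. This snippet is not part of the commit; the `global_cfg` import path follows the usual FederatedScope layout, and the remaining data fields (lag, horizon, ratios) are assumed to be set elsewhere:

    # Hypothetical driver snippet (not in this commit).
    from federatedscope.core.configs.config import global_cfg

    cfg = global_cfg.clone()
    cfg.data.type = 'trafficflow'
    cfg.data.root = 'data/trafficflow/PeMS04'
    cfg.model.use_minigraph = True  # False -> traffic_dataloader (v1)

    data, cfg = load_dataset(cfg, client_cfgs=None)
    print(len(data))  # one entry per client, keyed 1..client_num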
federatedscope/trafficflow/dataloader/traffic_dataloader_v2.py (new file)
@@ -0,0 +1,197 @@
+import numpy as np
+import torch
+import torch.utils.data
+
+from federatedscope.trafficflow.dataset.add_window import add_window_horizon
+from federatedscope.trafficflow.dataset.normalization import (
+    NScaler, MinMax01Scaler, MinMax11Scaler, StandardScaler, ColumnMinMaxScaler)
+from federatedscope.trafficflow.dataset.traffic_dataset import load_st_dataset
+
+
+def normalize_dataset(data, normalizer, column_wise=False):
+    if normalizer == 'max01':
+        if column_wise:
+            minimum = data.min(axis=0, keepdims=True)
+            maximum = data.max(axis=0, keepdims=True)
+        else:
+            minimum = data.min()
+            maximum = data.max()
+        scaler = MinMax01Scaler(minimum, maximum)
+        data = scaler.transform(data)
+        print('Normalize the dataset by MinMax01 Normalization')
+    elif normalizer == 'max11':
+        if column_wise:
+            minimum = data.min(axis=0, keepdims=True)
+            maximum = data.max(axis=0, keepdims=True)
+        else:
+            minimum = data.min()
+            maximum = data.max()
+        scaler = MinMax11Scaler(minimum, maximum)
+        data = scaler.transform(data)
+        print('Normalize the dataset by MinMax11 Normalization')
+    elif normalizer == 'std':
+        if column_wise:
+            mean = data.mean(axis=0, keepdims=True)
+            std = data.std(axis=0, keepdims=True)
+        else:
+            mean = data.mean()
+            std = data.std()
+        scaler = StandardScaler(mean, std)
+        # Intentionally no in-place transform here: the caller applies
+        # scaler.transform to exactly the slices it needs.
+        # data = scaler.transform(data)
+        print('Normalize the dataset by Standard Normalization')
+    elif normalizer == 'None':
+        scaler = NScaler()
+        data = scaler.transform(data)
+        print('Do not normalize the dataset')
+    elif normalizer == 'cmax':
+        # Column min-max, to be deprecated.
+        # Note: axis must be the spatial dimension, please check!
+        scaler = ColumnMinMaxScaler(data.min(axis=0), data.max(axis=0))
+        data = scaler.transform(data)
+        print('Normalize the dataset by Column Min-Max Normalization')
+    else:
+        raise ValueError('Unknown normalizer: {}'.format(normalizer))
+    return scaler
+
+
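A note on the contract above: for 'std' the data is intentionally not transformed in place; the function only fits the scaler on the slice it is given and returns it. A minimal sketch, assuming the scalers in the normalization module expose the usual transform/inverse_transform pair:

    x_train = np.random.rand(64, 12, 307, 1)  # [B, T, N, C], made-up shapes
    scaler = normalize_dataset(x_train, normalizer='std', column_wise=False)
    x_norm = scaler.transform(x_train)         # applied by the caller
    x_back = scaler.inverse_transform(x_norm)  # recovers the raw scale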
+def split_data_by_days(data, val_days, test_days, interval=30):
+    """
+    Split along the first (time) axis by whole days.
+    :param data: [B, *]
+    :param val_days: number of validation days
+    :param test_days: number of test days
+    :param interval: sampling interval in minutes (15, 30 or 60)
+    :return: train, validation and test splits
+    """
+    t = int((24 * 60) / interval)  # samples per day
+    test_data = data[-t * int(test_days):]
+    val_data = data[-t * int(test_days + val_days):-t * int(test_days)]
+    train_data = data[:-t * int(test_days + val_days)]
+    return train_data, val_data, test_data
+
+
+def split_data_by_ratio(data, val_ratio, test_ratio):
+    # Split along the first (time) axis by fractions of the total length.
+    data_len = data.shape[0]
+    test_data = data[-int(data_len * test_ratio):]
+    val_data = data[-int(data_len * (test_ratio + val_ratio)):
+                    -int(data_len * test_ratio)]
+    train_data = data[:-int(data_len * (test_ratio + val_ratio))]
+    return train_data, val_data, test_data
+
+
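To make the split arithmetic concrete, a worked example with made-up numbers (10,000 steps, 10% validation, 20% test):

    data = np.arange(10000)
    train, val, test = split_data_by_ratio(data, val_ratio=0.1, test_ratio=0.2)
    print(len(train), len(val), len(test))  # 7000 1000 2000

split_data_by_days works the same way, except that the boundaries are counted in whole days of (24 * 60) / interval samples each.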
+def data_loader(X, Y, batch_size, shuffle=True, drop_last=True):
+    # Wrap arrays as float tensors (on GPU when available) and return a
+    # standard PyTorch DataLoader.
+    cuda = True if torch.cuda.is_available() else False
+    TensorFloat = torch.cuda.FloatTensor if cuda else torch.FloatTensor
+    X, Y = TensorFloat(X), TensorFloat(Y)
+    data = torch.utils.data.TensorDataset(X, Y)
+    dataloader = torch.utils.data.DataLoader(data, batch_size=batch_size,
+                                             shuffle=shuffle, drop_last=drop_last)
+    return dataloader
+
+
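Usage of the helper is straightforward; a sketch with made-up shapes:

    X = np.random.rand(256, 12, 307, 3).astype(np.float32)
    Y = np.random.rand(256, 12, 307, 1).astype(np.float32)
    loader = data_loader(X, Y, batch_size=64)
    xb, yb = next(iter(loader))  # xb: [64, 12, 307, 3], yb: [64, 12, 307, 1]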
+def load_traffic_data(config, client_cfgs):
+    print("Use mini-graph dataloader")
+    root = config.data.root
+    # Derive the dataset name from the last character of the root path,
+    # e.g. 'data/trafficflow/PeMS04' -> 'PEMSD4'.
+    dataName = 'PEMSD' + root[-1]
+    raw_data = load_st_dataset(dataName)  # [T, N, C]
+
+    l, n, f = raw_data.shape
+
+    feature_list = [raw_data]
+
+    # numerical time_in_day, shape [T, N, 1]
+    time_ind = [i % config.data.steps_per_day / config.data.steps_per_day
+                for i in range(raw_data.shape[0])]
+    time_ind = np.array(time_ind)
+    time_in_day = np.tile(time_ind, [1, n, 1]).transpose((2, 1, 0))
+    feature_list.append(time_in_day)
+
+    # numerical day_in_week, shape [T, N, 1]
+    day_in_week = [(i // config.data.steps_per_day) % config.data.days_per_week
+                   for i in range(raw_data.shape[0])]
+    day_in_week = np.array(day_in_week)
+    day_in_week = np.tile(day_in_week, [1, n, 1]).transpose((2, 1, 0))
+    feature_list.append(day_in_week)
+
+    # data = np.concatenate(feature_list, axis=-1)
+    # Window the raw signal and the two time encodings separately, then
+    # stack them along the feature dimension.
+    single = False
+    x, y = add_window_horizon(raw_data, config.data.lag, config.data.horizon, single)
+    x_day, y_day = add_window_horizon(time_in_day, config.data.lag, config.data.horizon, single)
+    x_week, y_week = add_window_horizon(day_in_week, config.data.lag, config.data.horizon, single)
+    x = np.concatenate([x, x_day, x_week], axis=-1)
+    y = np.concatenate([y, y_day, y_week], axis=-1)
+
+    # split dataset by days (test_ratio > 1 is read as a day count) or by ratio
+    if config.data.test_ratio > 1:
+        x_train, x_val, x_test = split_data_by_days(x, config.data.val_ratio, config.data.test_ratio)
+        y_train, y_val, y_test = split_data_by_days(y, config.data.val_ratio, config.data.test_ratio)
+    else:
+        x_train, x_val, x_test = split_data_by_ratio(x, config.data.val_ratio, config.data.test_ratio)
+        y_train, y_val, y_test = split_data_by_ratio(y, config.data.val_ratio, config.data.test_ratio)
+
+    # Normalize the traffic channels. The scaler is fitted on the training
+    # split only; its statistics are stored in the config so trainers can
+    # de-normalize predictions.
+    normalizer = 'std'
+    scaler = normalize_dataset(x_train[..., :config.model.input_dim], normalizer, config.data.column_wise)
+    config.data.scaler = [float(scaler.mean), float(scaler.std)]
+
+    x_train[..., :config.model.input_dim] = scaler.transform(x_train[..., :config.model.input_dim])
+    x_val[..., :config.model.input_dim] = scaler.transform(x_val[..., :config.model.input_dim])
+    x_test[..., :config.model.input_dim] = scaler.transform(x_test[..., :config.model.input_dim])
+    # Targets stay in the original scale:
+    # y_train[..., :config.model.output_dim] = scaler.transform(y_train[..., :config.model.output_dim])
+    # y_val[..., :config.model.output_dim] = scaler.transform(y_val[..., :config.model.output_dim])
+    # y_test[..., :config.model.output_dim] = scaler.transform(y_test[..., :config.model.output_dim])
+
+    # Client-side dataset splitting: each client gets a contiguous block of
+    # per_samples nodes (its mini graph). Ceiling division so that no node
+    # is dropped; the last client is zero-padded up to per_samples nodes.
+    node_num = config.data.num_nodes
+    client_num = config.federate.client_num
+    per_samples = (node_num + client_num - 1) // client_num
+    data_list, cur_index = dict(), 0
+    input_dim, output_dim = config.model.input_dim, config.model.output_dim
+    for i in range(client_num):
+        if cur_index + per_samples <= node_num:
+            # Normal slicing
+            sub_array_train = x_train[:, :, cur_index:cur_index + per_samples, :]
+            sub_array_val = x_val[:, :, cur_index:cur_index + per_samples, :]
+            sub_array_test = x_test[:, :, cur_index:cur_index + per_samples, :]
+
+            sub_y_train = y_train[:, :, cur_index:cur_index + per_samples, :output_dim]
+            sub_y_val = y_val[:, :, cur_index:cur_index + per_samples, :output_dim]
+            sub_y_test = y_test[:, :, cur_index:cur_index + per_samples, :output_dim]
+        else:
+            # Not enough nodes left to fill per_samples: take what remains
+            # and pad the node dimension (axis 2) with zero columns.
+            n_missing = cur_index + per_samples - node_num
+
+            def pad_nodes(arr):
+                padding = np.zeros(arr.shape[:2] + (n_missing,) + arr.shape[3:])
+                return np.concatenate((arr, padding), axis=2)
+
+            sub_array_train = pad_nodes(x_train[:, :, cur_index:, :])
+            sub_array_val = pad_nodes(x_val[:, :, cur_index:, :])
+            sub_array_test = pad_nodes(x_test[:, :, cur_index:, :])
+
+            sub_y_train = pad_nodes(y_train[:, :, cur_index:, :output_dim])
+            sub_y_val = pad_nodes(y_val[:, :, cur_index:, :output_dim])
+            sub_y_test = pad_nodes(y_test[:, :, cur_index:, :output_dim])
+
+        device = 'cuda' if torch.cuda.is_available() else 'cpu'
+
+        data_list[i + 1] = {
+            'train': torch.utils.data.TensorDataset(
+                torch.tensor(sub_array_train, dtype=torch.float, device=device),
+                torch.tensor(sub_y_train, dtype=torch.float, device=device)
+            ),
+            'val': torch.utils.data.TensorDataset(
+                torch.tensor(sub_array_val, dtype=torch.float, device=device),
+                torch.tensor(sub_y_val, dtype=torch.float, device=device)
+            ),
+            'test': torch.utils.data.TensorDataset(
+                torch.tensor(sub_array_test, dtype=torch.float, device=device),
+                torch.tensor(sub_y_test, dtype=torch.float, device=device)
+            )
+        }
+        cur_index += per_samples
+    config.model.num_nodes = per_samples
+    return data_list, config
+
+
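add_window_horizon is imported from the dataset package and not shown in this diff. For readers, a sketch of the sliding-window behaviour it is assumed to have (the common AGCRN-style convention; with single=False, y covers the whole horizon):

    def add_window_horizon_sketch(data, window=12, horizon=12, single=False):
        # data: [T, N, C] -> x: [B, window, N, C], y: [B, horizon, N, C]
        length = len(data)
        x, y = [], []
        for index in range(length - window - horizon + 1):
            x.append(data[index:index + window])
            if single:
                y.append(data[index + window + horizon - 1:
                              index + window + horizon])
            else:
                y.append(data[index + window:index + window + horizon])
        return np.array(x), np.array(y)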
+if __name__ == '__main__':
+    a = 'data/trafficflow/PeMS04'
+    name = 'PEMSD' + a[-1]
+    raw_data = load_st_dataset(name)
+    pass
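Downstream, each client's TensorDataset can be wrapped with the data_loader helper above or a plain DataLoader; a sketch under assumed config values:

    data_list, config = load_traffic_data(config, client_cfgs=None)
    train_loader = torch.utils.data.DataLoader(
        data_list[1]['train'], batch_size=64, shuffle=True, drop_last=True)
    xb, yb = next(iter(train_loader))
    # xb: [64, lag, per_samples, raw channels + 2 time encodings]
    # yb: [64, horizon, per_samples, output_dim]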
@@ -44,6 +44,8 @@ model:
   cheb_order: 2
   use_day: True
   use_week: True
+  use_minigraph: True
+  minigraph_size: 5
 train:
   batch_or_epoch: 'epoch'
   local_update_steps: 1
@@ -42,6 +42,8 @@ model:
   cheb_order: 2
   use_day: True
   use_week: True
+  use_minigraph: True
+  minigraph_size: 5
 train:
   batch_or_epoch: 'epoch'
   local_update_steps: 1
@@ -60,7 +62,7 @@ train:
   grad_norm: True
   real_value: True
 criterion:
-  type: L1loss
+  type: L1Loss
 trainer:
   type: trafficflowtrainer
   log_dir: ./