TrafficWheel/dataloader/PeMSDdataloader.py

159 lines
5.7 KiB
Python
Executable File

from utils.normalization import normalize_dataset
from dataloader.data_selector import load_st_dataset
import numpy as np
import torch
def get_dataloader(args, normalizer="std", single=True):
    """
    Build the train/validation/test dataloaders together with the fitted scaler.
    :param args: Config mapping; passed whole to ``load_st_dataset``, while its
        ``"data"`` entry supplies the settings read here (``val_ratio``,
        ``test_ratio``, ``batch_size``) and by the helper functions
    :param normalizer: Normalizer name forwarded to ``normalize_dataset``
    :param single: Forwarded to the windowing helpers (single-step vs multi-step targets)
    :return: Tuple ``(train_loader, val_loader, test_loader, scaler)``
    """
    data = load_st_dataset(args)
    data_cfg = args["data"]
    # The three-way unpack is kept on purpose: it fails for anything that is not rank-3.
    _steps, _nodes, _channels = data.shape

    # Sliding windows with the time-of-day / day-of-week channels appended.
    x, y = _prepare_data_with_windows(data, data_cfg, single)

    # A test_ratio above 1 is interpreted as a number of days instead of a fraction.
    # NOTE(review): the day-based splitter is called with its default ``interval``;
    # confirm that default matches the dataset's sampling rate.
    test_ratio = data_cfg["test_ratio"]
    val_ratio = data_cfg["val_ratio"]
    if test_ratio > 1:
        split_fn = split_data_by_days
    else:
        split_fn = split_data_by_ratio
    x_train, x_val, x_test = split_fn(x, val_ratio, test_ratio)
    y_train, y_val, y_test = split_fn(y, val_ratio, test_ratio)

    # The scaler is created from the x training split and then reused for y (in place).
    scaler = _normalize_data(x_train, x_val, x_test, data_cfg, normalizer)
    _apply_existing_scaler(y_train, y_val, y_test, scaler, data_cfg)

    # Only the training loader shuffles; no loader drops the last partial batch.
    batch_size = data_cfg["batch_size"]
    train_loader = _create_dataloader(x_train, y_train, batch_size, True, False)
    val_loader = _create_dataloader(x_val, y_val, batch_size, False, False)
    test_loader = _create_dataloader(x_test, y_test, batch_size, False, False)
    return train_loader, val_loader, test_loader, scaler
def _prepare_data_with_windows(data, args, single):
    """
    Window the raw series into model inputs and targets, each with time channels appended.
    :param data: Raw series; its first axis is the time axis
    :param args: Data config providing ``lag`` and ``horizon`` (plus the keys
        read by ``_generate_time_features``)
    :param single: Forwarded to the windowing functions
    :return: Tuple ``(x, y)`` of windowed arrays
    """
    lag, horizon = args["lag"], args["horizon"]
    time_features = _generate_time_features(data.shape[0], args)

    # Inputs and targets go through the same two steps, differing only in the
    # windowing function: window the values, then append the windowed time channels.
    windowed = []
    for window_fn in (add_window_x, add_window_y):
        values = window_fn(data, lag, horizon, single)
        windowed.append(
            _add_time_features(values, time_features, lag, horizon, single, window_fn)
        )
    x, y = windowed
    return x, y
def _generate_time_features(L, args):
N = args["num_nodes"]
time_in_day = [i % args["steps_per_day"] / args["steps_per_day"] for i in range(L)]
time_in_day = np.tile(np.array(time_in_day), [1, N, 1]).transpose((2, 1, 0))
day_in_week = [(i // args["steps_per_day"]) % args["days_per_week"] for i in range(L)]
day_in_week = np.tile(np.array(day_in_week), [1, N, 1]).transpose((2, 1, 0))
return time_in_day, day_in_week
def _add_time_features(data, time_features, lag, horizon, single, window_fn):
time_in_day, day_in_week = time_features
time_day = window_fn(time_in_day, lag, horizon, single)
time_week = window_fn(day_in_week, lag, horizon, single)
return np.concatenate([data, time_day, time_week], axis=-1)
def _normalize_data(train_data, val_data, test_data, args, normalizer):
    """
    Create a scaler from the training split and normalize all three splits in place.
    :param train_data: Training split; its leading ``input_dim`` channels are handed
        to ``normalize_dataset`` to build the scaler
    :param val_data: Validation split
    :param test_data: Test split
    :param args: Data config providing ``input_dim`` and ``column_wise``
    :param normalizer: Normalizer name forwarded to ``normalize_dataset``
    :return: The scaler returned by ``normalize_dataset``
    """
    input_dim = args["input_dim"]
    scaler = normalize_dataset(
        train_data[..., :input_dim], normalizer, args["column_wise"]
    )
    # Only the leading ``input_dim`` channels are rescaled; any trailing channels
    # are left exactly as they were.
    for split in (train_data, val_data, test_data):
        split[..., :input_dim] = scaler.transform(split[..., :input_dim])
    return scaler
def _apply_existing_scaler(train_data, val_data, test_data, scaler, args):
for data in [train_data, val_data, test_data]:
data[..., : args["input_dim"]] = scaler.transform(data[..., : args["input_dim"]])
def _create_dataloader(X_data, Y_data, batch_size, shuffle, drop_last):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
X_tensor = torch.tensor(X_data, dtype=torch.float32, device=device)
Y_tensor = torch.tensor(Y_data, dtype=torch.float32, device=device)
dataset = torch.utils.data.TensorDataset(X_tensor, Y_tensor)
return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
def split_data_by_days(data, val_days, test_days, interval=30):
    """
    Split data chronologically, reserving whole days at the end for validation and test.
    :param data: Sequence or array ordered by time along the first axis
    :param val_days: Number of days for the validation split
    :param test_days: Number of days for the test split
    :param interval: Sampling interval in minutes, used to derive steps per day
    :return: Tuple ``(train_data, val_data, test_data)``
    """
    steps_per_day = int((24 * 60) / interval)
    test_len = steps_per_day * int(test_days)
    holdout_len = steps_per_day * int(test_days + val_days)

    # Boundaries are computed as explicit non-negative positions. Negative slicing
    # breaks when a length is zero: ``data[-0:]`` is the whole series and
    # ``data[:-0]`` is empty, so a zero-day test split would swallow everything.
    total = len(data)
    train_end = max(total - holdout_len, 0)
    val_end = max(total - test_len, 0)

    train_data = data[:train_end]
    val_data = data[train_end:val_end]
    test_data = data[val_end:]
    return train_data, val_data, test_data
def split_data_by_ratio(data, val_ratio, test_ratio):
    """
    Split data chronologically, reserving trailing fractions for validation and test.
    :param data: Array ordered by time along the first axis
    :param val_ratio: Fraction of samples for the validation split
    :param test_ratio: Fraction of samples for the test split
    :return: Tuple ``(train_data, val_data, test_data)``
    """
    data_len = data.shape[0]
    test_len = int(data_len * test_ratio)
    holdout_len = int(data_len * (test_ratio + val_ratio))

    # Boundaries are computed as explicit non-negative positions. Negative slicing
    # breaks when a length truncates to zero: ``data[-0:]`` is the whole array and
    # ``data[:-0]`` is empty, which would hand all samples to the test split.
    train_end = max(data_len - holdout_len, 0)
    val_end = max(data_len - test_len, 0)

    train_data = data[:train_end]
    val_data = data[train_end:val_end]
    test_data = data[val_end:]
    return train_data, val_data, test_data
def _generate_windows(data, window=3, horizon=1, offset=0):
"""
Internal helper function to generate sliding windows.
:param data: Input data
:param window: Window size
:param horizon: Horizon size
:param offset: Offset from window start
:return: Windowed data
"""
length = len(data)
end_index = length - horizon - window + 1
windows = []
index = 0
while index < end_index:
windows.append(data[index + offset : index + offset + window])
index += 1
return np.array(windows)
def add_window_x(data, window=3, horizon=1, single=False):
    """
    Produce the model-input windows for a series.
    :param data: Input data, shape [B, ...]
    :param window: Number of steps in each input window
    :param horizon: Horizon size; limits how many windows are produced
    :param single: Accepted for signature parity with ``add_window_y``; the input
        windows do not depend on it
    :return: X with shape [B, W, ...]
    """
    # Input windows always begin at the sample index itself, so no offset is applied.
    return _generate_windows(data, window=window, horizon=horizon, offset=0)
def add_window_y(data, window=3, horizon=1, single=False):
    """
    Generate windowed Y values from the input data.
    :param data: Input data, shape [B, ...]
    :param window: Size of the input (X) sliding window that precedes each target
    :param horizon: Horizon size
    :param single: If True, each target is only the step ``horizon`` steps after the
        input window ends; otherwise it is all ``horizon`` steps following the window
    :return: Y with shape [B, 1, ...] when ``single`` else [B, H, ...]; empty when
        the series is too short for a single sample
    """
    # The sample count must come from the *input* window and the horizon so that
    # there is exactly one target per input window. Deriving it from the target
    # span instead yields a different count whenever window != horizon (or
    # window != 1 in single mode) and lets the last slices run past the data.
    num_samples = len(data) - horizon - window + 1

    if single:
        # Only the final step of the horizon.
        start, span = window + horizon - 1, 1
    else:
        # Every step of the horizon, beginning right after the input window.
        start, span = window, horizon

    return np.array(
        [data[index + start : index + start + span] for index in range(num_samples)]
    )
if __name__ == "__main__":
    # Manual smoke check: load the "SD" dataset and print the shape of what comes back.
    # ``load_st_dataset`` is already imported at module level, so it is not re-imported here.
    loaded = load_st_dataset({"dataset": "SD"})
    print(f"Dataset shape: {loaded.shape}")