# TrafficWheel/dataloader/PeMSDdataloader.py
import numpy as np
import torch
from dataloader.data_selector import load_st_dataset
from utils.normalization import normalize_dataset
# Resolve the compute device once at import time.
_device = "cuda" if torch.cuda.is_available() else "cpu"


def to_tensor(array):
    """Wrap an array-like as a float32 torch tensor on the module device."""
    return torch.as_tensor(array, dtype=torch.float32, device=_device)
# Sliding-window view over the leading (time) axis — stride trick, zero copy.
def window(d, w, h, o=0):
    """Return a zero-copy view of sliding windows over ``d[o:]``.

    Args:
        d: array whose leading axis is time.
        w: window length.
        h: number of future steps reserved after each window (room for the
           forecast horizon, so a length-h target always exists).
        o: offset into the time axis at which the first window starts.

    Returns:
        as_strided view of shape (num_windows, w, *d.shape[1:]).

    The original count formula ``len(d) - w - h + 1`` ignored ``o``; with a
    nonzero offset the strided view ran past the end of the buffer and
    as_strided happily read out-of-bounds memory.  The count is now clamped
    so every window stays inside ``d[o:]``.
    """
    view = d[o:]
    # Reserve h trailing steps, but never exceed what the offset buffer holds.
    num = min(len(d) - w - h + 1, len(view) - w + 1)
    return np.lib.stride_tricks.as_strided(
        view,
        shape=(num, w, *d.shape[1:]),
        strides=(d.strides[0], d.strides[0], *d.strides[1:]),
    )
# Emulates pad_with_last_sample=True: repeat the final (X, Y) pair until the
# sample count divides the batch size evenly.
def pad_last(X, Y, bs):
    """Pad X and Y with copies of their last sample up to a multiple of bs."""
    shortfall = (-len(X)) % bs
    if shortfall == 0:
        return X, Y
    X_padded = np.concatenate([X, np.repeat(X[-1:], shortfall, 0)], 0)
    Y_padded = np.concatenate([Y, np.repeat(Y[-1:], shortfall, 0)], 0)
    return X_padded, Y_padded
# Chronological train / val / test split.
def split_by_ratio(d, vr, tr):
    """Split ``d`` along the time axis into (train, val, test) views.

    Args:
        d: array indexed by time along axis 0.
        vr: fraction of samples for validation (immediately before test).
        tr: fraction of samples for test (taken from the end).

    Returns:
        (train, val, test) views of ``d``, in temporal order.

    Fixes the negative-zero slicing bug of the original: with ``tr == 0``
    the expressions ``d[-vl:-0]`` and ``d[-0:]`` collapsed to an empty val
    set and the *entire* array as test.  For nonzero ratios the result is
    identical (same int() truncation).
    """
    n = len(d)
    test_len = int(n * tr)
    val_len = int(n * (vr + tr)) - test_len  # matches the original rounding
    val_end = n - test_len
    train_end = val_end - val_len
    return d[:train_end], d[train_end:val_end], d[val_end:]
def get_dataloader(config, normalizer="std", single_step=True):
    """Build train/val/test DataLoaders for a PeMS-style spatio-temporal dataset.

    Args:
        config: dict whose ``"data"`` section provides lag, horizon,
            batch_size, input_dim, steps_per_day, days_per_week,
            val_ratio, test_ratio and column_wise.
        normalizer: normalization scheme forwarded to ``normalize_dataset``.
        single_step: if True, Y holds only the single step at ``horizon``;
            otherwise Y holds all ``horizon`` future steps.

    Returns:
        (train_loader, val_loader, test_loader, scalers) — one fitted scaler
        per raw input channel; time-feature channels are not normalized.
    """
    data = load_st_dataset(config)
    cfg = config["data"]
    T, N, _ = data.shape
    lag, horizon, batch_size, input_dim = (
        cfg["lag"], cfg["horizon"], cfg["batch_size"], cfg["input_dim"]
    )
    # Time-of-day / day-of-week features, broadcast to every node.
    t = np.arange(T)
    time_in_day = np.tile((t % cfg["steps_per_day"]) / cfg["steps_per_day"], (N, 1)).T
    day_in_week = np.tile((t // cfg["steps_per_day"]) % cfg["days_per_week"], (N, 1)).T
    # Append the time features BEFORE windowing so X and Y are sliced from the
    # same (T, N, C+2) array and always agree in sample count and channel
    # layout.  (The original windowed Y's time features with X's parameters,
    # which crashed for single_step, read out of bounds for lag != horizon,
    # and stamped Y with the *input* window's timestamps.)
    full = np.concatenate(
        [data, time_in_day[..., None], day_in_week[..., None]], -1
    )
    # X[i] = full[i : i+lag]; Y[i] starts at i+lag (or is its last step only).
    # Offsets are applied by slicing `full`, so window() is always called with
    # o=0 and 0 reserved steps — in-bounds by construction, and both X and Y
    # end up with T - lag - horizon + 1 samples.
    X = window(full, lag, horizon).copy()
    if single_step:
        Y = window(full[lag + horizon - 1:], 1, 0).copy()
    else:
        Y = window(full[lag:], horizon, 0).copy()
    # Chronological split (views into the X / Y copies).
    X_train, X_val, X_test = split_by_ratio(X, cfg["val_ratio"], cfg["test_ratio"])
    Y_train, Y_val, Y_test = split_by_ratio(Y, cfg["val_ratio"], cfg["test_ratio"])
    # Channel-wise normalization, fit on the train split only; the last two
    # channels (time features) are deliberately left untouched.
    scalers = [
        normalize_dataset(X_train[..., i:i + 1], normalizer, cfg["column_wise"])
        for i in range(input_dim)
    ]
    for i, sc in enumerate(scalers):
        for d in (X_train, X_val, X_test, Y_train, Y_val, Y_test):
            d[..., i:i + 1] = sc.transform(d[..., i:i + 1])
    # Pad so every split is a whole number of batches (pad_with_last_sample).
    X_train, Y_train = pad_last(X_train, Y_train, batch_size)
    X_val, Y_val = pad_last(X_val, Y_val, batch_size)
    X_test, Y_test = pad_last(X_test, Y_test, batch_size)
    # Wrap each split as a DataLoader; only the train split is shuffled.
    make_loader = lambda X, Y, shuffle: torch.utils.data.DataLoader(
        torch.utils.data.TensorDataset(to_tensor(X), to_tensor(Y)),
        batch_size=batch_size, shuffle=shuffle, drop_last=False
    )
    return (
        make_loader(X_train, Y_train, True),
        make_loader(X_val, Y_val, False),
        make_loader(X_test, Y_test, False),
        scalers,
    )