refactor(dataloader): 重构数据加载器代码,优化滑动窗口生成和归一化处理

重构PeMSD和EXP数据加载器,使用numpy的stride_tricks实现高效滑动窗口
统一数据预处理流程,简化代码结构并提高可维护性
优化归一化处理,支持多scaler通道独立处理
This commit is contained in:
czzhangheng 2025-12-15 20:54:20 +08:00
parent 56b09ea8ac
commit b6d4f5daf5
5 changed files with 171 additions and 340 deletions

View File

@ -1,199 +1,90 @@
import numpy as np
import torch
from utils.normalization import normalize_dataset
from dataloader.data_selector import load_st_dataset
from utils.normalization import normalize_dataset
def get_dataloader(args, normalizer="std", single=True):
# args should now include 'cycle'
data = load_st_dataset(args["type"], args["sample"]) # [T, N, F]
L, N, F = data.shape
# compute cycle index
cycle_arr = np.arange(L) % args["cycle"] # length-L array
_device = "cuda" if torch.cuda.is_available() else "cpu"
to_tensor = lambda a: torch.as_tensor(a, dtype=torch.float32, device=_device)
# Step 1: sliding windows for X and Y
x = add_window_x(data, args["lag"], args["horizon"], single)
y = add_window_y(data, args["lag"], args["horizon"], single)
# window count = M = L - lag - horizon + 1
M = x.shape[0]
# Sliding window (stride trick, zero copy)
window = lambda d, w, h, o=0: np.lib.stride_tricks.as_strided(
d[o:],
shape=(len(d) - w - h + 1, w, *d.shape[1:]),
strides=(d.strides[0], d.strides[0], *d.strides[1:])
)
# Step 2: time features
time_in_day = np.tile(
np.array([i % args["steps_per_day"] / args["steps_per_day"] for i in range(L)]),
(N, 1),
).T.reshape(L, N, 1)
day_in_week = np.tile(
np.array(
[(i // args["steps_per_day"]) % args["days_per_week"] for i in range(L)]
),
(N, 1),
).T.reshape(L, N, 1)
# pad_with_last_sample=True
pad_last = lambda X, Y, bs: (
(lambda r: (
(np.concatenate([X, np.repeat(X[-1:], r, 0)], 0),
np.concatenate([Y, np.repeat(Y[-1:], r, 0)], 0))
if r else (X, Y)
))((-len(X)) % bs)
)
x_day = add_window_x(time_in_day, args["lag"], args["horizon"], single)
x_week = add_window_x(day_in_week, args["lag"], args["horizon"], single)
x = np.concatenate([x, x_day, x_week], axis=-1)
# del x_day, x_week
# gc.collect()
# Train / Val / Test split
split_by_ratio = lambda d, vr, tr: (
d[:-(vl := int(len(d) * (vr + tr)))],
d[-vl:-(tl := int(len(d) * tr))],
d[-tl:]
)
# Step 3: extract cycle index per window: take value at end of sequence
cycle_win = np.array([cycle_arr[i + args["lag"]] for i in range(M)]) # shape [M]
# Step 4: split into train/val/test
if args["test_ratio"] > 1:
x_train, x_val, x_test = split_data_by_days(
x, args["val_ratio"], args["test_ratio"]
)
y_train, y_val, y_test = split_data_by_days(
y, args["val_ratio"], args["test_ratio"]
)
c_train, c_val, c_test = split_data_by_days(
cycle_win, args["val_ratio"], args["test_ratio"]
)
else:
x_train, x_val, x_test = split_data_by_ratio(
x, args["val_ratio"], args["test_ratio"]
)
y_train, y_val, y_test = split_data_by_ratio(
y, args["val_ratio"], args["test_ratio"]
)
c_train, c_val, c_test = split_data_by_ratio(
cycle_win, args["val_ratio"], args["test_ratio"]
)
# del x, y, cycle_win
# gc.collect()
def get_dataloader(config, normalizer="std", single_step=True):
data = load_st_dataset(config)
cfg = config["data"]
# Step 5: normalization on X only
scaler = normalize_dataset(
x_train[..., : args["input_dim"]], normalizer, args["column_wise"]
)
x_train[..., : args["input_dim"]] = scaler.transform(
x_train[..., : args["input_dim"]]
)
x_val[..., : args["input_dim"]] = scaler.transform(x_val[..., : args["input_dim"]])
x_test[..., : args["input_dim"]] = scaler.transform(
x_test[..., : args["input_dim"]]
T, N, _ = data.shape
lag, horizon, batch_size, input_dim = (
cfg["lag"], cfg["horizon"], cfg["batch_size"], cfg["input_dim"]
)
# add time features to Y
y_day = add_window_y(time_in_day, args["lag"], args["horizon"], single)
y_week = add_window_y(day_in_week, args["lag"], args["horizon"], single)
y = np.concatenate([y, y_day, y_week], axis=-1)
# del y_day, y_week, time_in_day, day_in_week
# gc.collect()
# split Y time-augmented
if args["test_ratio"] > 1:
y_train, y_val, y_test = split_data_by_days(
y, args["val_ratio"], args["test_ratio"]
)
else:
y_train, y_val, y_test = split_data_by_ratio(
y, args["val_ratio"], args["test_ratio"]
)
# del y
# Step 6: create dataloaders including cycle index
train_loader = data_loader_with_cycle(
x_train, y_train, c_train, args["batch_size"], shuffle=True, drop_last=True
)
val_loader = data_loader_with_cycle(
x_val, y_val, c_val, args["batch_size"], shuffle=False, drop_last=True
)
test_loader = data_loader_with_cycle(
x_test, y_test, c_test, args["batch_size"], shuffle=False, drop_last=False
# X / Y construction
X = window(data, lag, horizon)
Y = window(
data,
1 if single_step else horizon,
horizon,
lag if not single_step else lag + horizon - 1
)
return train_loader, val_loader, test_loader, scaler
# Time features
t = np.arange(T)
time_in_day = np.tile((t % cfg["steps_per_day"]) / cfg["steps_per_day"], (N, 1)).T
day_in_week = np.tile((t // cfg["steps_per_day"]) % cfg["days_per_week"], (N, 1)).T
tf = lambda z: window(z[..., None], lag, horizon)
X = np.concatenate([X, tf(time_in_day), tf(day_in_week)], -1)
Y = np.concatenate([Y, tf(time_in_day), tf(day_in_week)], -1)
def data_loader_with_cycle(X, Y, C, batch_size, shuffle=True, drop_last=True):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
X_t = torch.tensor(X, dtype=torch.float32, device=device)
Y_t = torch.tensor(Y, dtype=torch.float32, device=device)
C_t = torch.tensor(C, dtype=torch.long, device=device).unsqueeze(-1) # [B,1]
dataset = torch.utils.data.TensorDataset(X_t, Y_t, C_t)
loader = torch.utils.data.DataLoader(
dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last
)
return loader
# Split
X_train, X_val, X_test = split_by_ratio(X, cfg["val_ratio"], cfg["test_ratio"])
Y_train, Y_val, Y_test = split_by_ratio(Y, cfg["val_ratio"], cfg["test_ratio"])
def split_data_by_days(data, val_days, test_days, interval=30):
t = int((24 * 60) / interval)
test_data = data[-t * int(test_days) :]
val_data = data[-t * int(test_days + val_days) : -t * int(test_days)]
train_data = data[: -t * int(test_days + val_days)]
return train_data, val_data, test_data
def split_data_by_ratio(data, val_ratio, test_ratio):
data_len = data.shape[0]
test_data = data[-int(data_len * test_ratio) :]
val_data = data[
-int(data_len * (test_ratio + val_ratio)) : -int(data_len * test_ratio)
# Channel-wise normalization (fit on train only)
scalers = [
normalize_dataset(X_train[..., i:i+1], normalizer, cfg["column_wise"])
for i in range(input_dim)
]
train_data = data[: -int(data_len * (test_ratio + val_ratio))]
return train_data, val_data, test_data
for i, sc in enumerate(scalers):
for d in (X_train, X_val, X_test, Y_train, Y_val, Y_test):
d[..., i:i+1] = sc.transform(d[..., i:i+1])
# Padding
X_train, Y_train = pad_last(X_train, Y_train, batch_size)
X_val, Y_val = pad_last(X_val, Y_val, batch_size)
X_test, Y_test = pad_last(X_test, Y_test, batch_size)
def data_loader(X, Y, batch_size, shuffle=True, drop_last=True):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
X = torch.tensor(X, dtype=torch.float32, device=device)
Y = torch.tensor(Y, dtype=torch.float32, device=device)
data = torch.utils.data.TensorDataset(X, Y)
dataloader = torch.utils.data.DataLoader(
data, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last
# DataLoader
make_loader = lambda X, Y, shuffle: torch.utils.data.DataLoader(
torch.utils.data.TensorDataset(to_tensor(X), to_tensor(Y)),
batch_size=batch_size, shuffle=shuffle, drop_last=False
)
return dataloader
def add_window_x(data, window=3, horizon=1, single=False):
"""
Generate windowed X values from the input data.
:param data: Input data, shape [B, ...]
:param window: Size of the sliding window
:param horizon: Horizon size
:param single: If True, generate single-step windows, else multi-step
:return: X with shape [B, W, ...]
"""
length = len(data)
end_index = length - horizon - window + 1
x = [] # Sliding windows
index = 0
while index < end_index:
x.append(data[index : index + window])
index += 1
return np.array(x)
def add_window_y(data, window=3, horizon=1, single=False):
"""
Generate windowed Y values from the input data.
:param data: Input data, shape [B, ...]
:param window: Size of the sliding window
:param horizon: Horizon size
:param single: If True, generate single-step windows, else multi-step
:return: Y with shape [B, H, ...]
"""
length = len(data)
end_index = length - horizon - window + 1
y = [] # Horizon values
index = 0
while index < end_index:
if single:
y.append(data[index + window + horizon - 1 : index + window + horizon])
else:
y.append(data[index + window : index + window + horizon])
index += 1
return np.array(y)
if __name__ == "__main__":
res = load_st_dataset("SD", 1)
k = 1
return (
make_loader(X_train, Y_train, True),
make_loader(X_val, Y_val, False),
make_loader(X_test, Y_test, False),
scalers
)

View File

@ -1,158 +1,90 @@
import numpy as np
import torch
from dataloader.data_selector import load_st_dataset
from utils.normalization import normalize_dataset
def get_dataloader(args, normalizer="std", single=True):
data = load_st_dataset(args)
args = args["data"]
L, N, F = data.shape
_device = "cuda" if torch.cuda.is_available() else "cpu"
to_tensor = lambda a: torch.as_tensor(a, dtype=torch.float32, device=_device)
# Generate sliding windows for main data and add time features
x, y = _prepare_data_with_windows(data, args, single)
# Sliding window (stride trick, zero copy)
window = lambda d, w, h, o=0: np.lib.stride_tricks.as_strided(
d[o:],
shape=(len(d) - w - h + 1, w, *d.shape[1:]),
strides=(d.strides[0], d.strides[0], *d.strides[1:])
)
# Split data
split_fn = split_data_by_days if args["test_ratio"] > 1 else split_data_by_ratio
x_train, x_val, x_test = split_fn(x, args["val_ratio"], args["test_ratio"])
y_train, y_val, y_test = split_fn(y, args["val_ratio"], args["test_ratio"])
# pad_with_last_sample=True
pad_last = lambda X, Y, bs: (
(lambda r: (
(np.concatenate([X, np.repeat(X[-1:], r, 0)], 0),
np.concatenate([Y, np.repeat(Y[-1:], r, 0)], 0))
if r else (X, Y)
))((-len(X)) % bs)
)
# Normalize x and y using the same scaler
scaler = _normalize_data(x_train, x_val, x_test, args, normalizer)
_apply_existing_scaler(y_train, y_val, y_test, scaler, args)
# Train / Val / Test split
split_by_ratio = lambda d, vr, tr: (
d[:-(vl := int(len(d) * (vr + tr)))],
d[-vl:-(tl := int(len(d) * tr))],
d[-tl:]
)
# Create dataloaders
return (
_create_dataloader(x_train, y_train, args["batch_size"], True, False),
_create_dataloader(x_val, y_val, args["batch_size"], False, False),
_create_dataloader(x_test, y_test, args["batch_size"], False, False),
scaler
def get_dataloader(config, normalizer="std", single_step=True):
data = load_st_dataset(config)
cfg = config["data"]
T, N, _ = data.shape
lag, horizon, batch_size, input_dim = (
cfg["lag"], cfg["horizon"], cfg["batch_size"], cfg["input_dim"]
)
# X / Y construction
X = window(data, lag, horizon)
Y = window(
data,
1 if single_step else horizon,
horizon,
lag if not single_step else lag + horizon - 1
)
def _prepare_data_with_windows(data, args, single):
# Generate sliding windows for main data
x = add_window_x(data, args["lag"], args["horizon"], single)
y = add_window_y(data, args["lag"], args["horizon"], single)
# Time features
t = np.arange(T)
time_in_day = np.tile((t % cfg["steps_per_day"]) / cfg["steps_per_day"], (N, 1)).T
day_in_week = np.tile((t // cfg["steps_per_day"]) % cfg["days_per_week"], (N, 1)).T
tf = lambda z: window(z[..., None], lag, horizon)
# Generate time features
time_features = _generate_time_features(data.shape[0], args)
X = np.concatenate([X, tf(time_in_day), tf(day_in_week)], -1)
Y = np.concatenate([Y, tf(time_in_day), tf(day_in_week)], -1)
# Add time features to x and y
x = _add_time_features(x, time_features, args["lag"], args["horizon"], single, add_window_x)
y = _add_time_features(y, time_features, args["lag"], args["horizon"], single, add_window_y)
# Split
X_train, X_val, X_test = split_by_ratio(X, cfg["val_ratio"], cfg["test_ratio"])
Y_train, Y_val, Y_test = split_by_ratio(Y, cfg["val_ratio"], cfg["test_ratio"])
return x, y
def _generate_time_features(L, args):
N = args["num_nodes"]
time_in_day = [i % args["steps_per_day"] / args["steps_per_day"] for i in range(L)]
time_in_day = np.tile(np.array(time_in_day), [1, N, 1]).transpose((2, 1, 0))
day_in_week = [(i // args["steps_per_day"]) % args["days_per_week"] for i in range(L)]
day_in_week = np.tile(np.array(day_in_week), [1, N, 1]).transpose((2, 1, 0))
return time_in_day, day_in_week
def _add_time_features(data, time_features, lag, horizon, single, window_fn):
time_in_day, day_in_week = time_features
time_day = window_fn(time_in_day, lag, horizon, single)
time_week = window_fn(day_in_week, lag, horizon, single)
return np.concatenate([data, time_day, time_week], axis=-1)
def _normalize_data(train_data, val_data, test_data, args, normalizer):
scaler = normalize_dataset(train_data[..., : args["input_dim"]], normalizer, args["column_wise"])
for data in [train_data, val_data, test_data]:
data[..., : args["input_dim"]] = scaler.transform(data[..., : args["input_dim"]])
return scaler
def _apply_existing_scaler(train_data, val_data, test_data, scaler, args):
for data in [train_data, val_data, test_data]:
data[..., : args["input_dim"]] = scaler.transform(data[..., : args["input_dim"]])
def _create_dataloader(X_data, Y_data, batch_size, shuffle, drop_last):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
X_tensor = torch.tensor(X_data, dtype=torch.float32, device=device)
Y_tensor = torch.tensor(Y_data, dtype=torch.float32, device=device)
dataset = torch.utils.data.TensorDataset(X_tensor, Y_tensor)
return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
def split_data_by_days(data, val_days, test_days, interval=30):
t = int((24 * 60) / interval)
test_data = data[-t * int(test_days) :]
val_data = data[-t * int(test_days + val_days) : -t * int(test_days)]
train_data = data[: -t * int(test_days + val_days)]
return train_data, val_data, test_data
def split_data_by_ratio(data, val_ratio, test_ratio):
data_len = data.shape[0]
test_data = data[-int(data_len * test_ratio) :]
val_data = data[
-int(data_len * (test_ratio + val_ratio)) : -int(data_len * test_ratio)
# Channel-wise normalization (fit on train only)
scalers = [
normalize_dataset(X_train[..., i:i+1], normalizer, cfg["column_wise"])
for i in range(input_dim)
]
train_data = data[: -int(data_len * (test_ratio + val_ratio))]
return train_data, val_data, test_data
for i, sc in enumerate(scalers):
for d in (X_train, X_val, X_test, Y_train, Y_val, Y_test):
d[..., i:i+1] = sc.transform(d[..., i:i+1])
# Padding
X_train, Y_train = pad_last(X_train, Y_train, batch_size)
X_val, Y_val = pad_last(X_val, Y_val, batch_size)
X_test, Y_test = pad_last(X_test, Y_test, batch_size)
# DataLoader
make_loader = lambda X, Y, shuffle: torch.utils.data.DataLoader(
torch.utils.data.TensorDataset(to_tensor(X), to_tensor(Y)),
batch_size=batch_size, shuffle=shuffle, drop_last=False
)
def _generate_windows(data, window=3, horizon=1, offset=0):
"""
Internal helper function to generate sliding windows.
:param data: Input data
:param window: Window size
:param horizon: Horizon size
:param offset: Offset from window start
:return: Windowed data
"""
length = len(data)
end_index = length - horizon - window + 1
windows = []
index = 0
while index < end_index:
windows.append(data[index + offset : index + offset + window])
index += 1
return np.array(windows)
def add_window_x(data, window=3, horizon=1, single=False):
"""
Generate windowed X values from the input data.
:param data: Input data, shape [B, ...]
:param window: Size of the sliding window
:param horizon: Horizon size
:param single: If True, generate single-step windows, else multi-step
:return: X with shape [B, W, ...]
"""
return _generate_windows(data, window, horizon, offset=0)
def add_window_y(data, window=3, horizon=1, single=False):
"""
Generate windowed Y values from the input data.
:param data: Input data, shape [B, ...]
:param window: Size of the sliding window
:param horizon: Horizon size
:param single: If True, generate single-step windows, else multi-step
:return: Y with shape [B, H, ...]
"""
offset = window if not single else window + horizon - 1
return _generate_windows(data, window=1 if single else horizon, horizon=horizon, offset=offset)
# if __name__ == "__main__":
# from dataloader.data_selector import load_st_dataset
# res = load_st_dataset({"dataset": "SD"})
# print(f"Dataset shape: {res.shape}")
return (
make_loader(X_train, Y_train, True),
make_loader(X_val, Y_val, False),
make_loader(X_test, Y_test, False),
scalers
)

View File

@ -13,7 +13,7 @@ def read_config(config_path):
# 全局配置
device = "cuda:0" # 指定设备为cuda:0
seed = 2023 # 随机种子
epochs = 1
epochs = 1 # 训练轮数
# 拷贝项
config["basic"]["device"] = device
@ -90,8 +90,9 @@ def main(model, data, debug=False):
if __name__ == "__main__":
# 调试用
# model_list = ["iTransformer", "PatchTST", "HI"]
model_list = ["ASTRA_v2", "GWN", "REPST", "STAEFormer", "MTGNN"]
# model_list = ["ASTRA_v2", "GWN", "REPST", "STAEFormer", "MTGNN"]
model_list = ["MTGNN"]
# dataset_list = ["AirQuality", "SolarEnergy", "PEMS-BAY", "METR-LA", "BJTaxi-InFlow", "BJTaxi-OutFlow", "NYCBike-InFlow", "NYCBike-OutFlow"]
# dataset_list = ["AirQuality"]
dataset_list = ["AirQuality", "SolarEnergy", "METR-LA", "NYCBike-InFlow", "NYCBike-OutFlow"]
main(model_list, dataset_list, debug = True)
dataset_list = ["AirQuality"]
# dataset_list = ["AirQuality", "SolarEnergy", "METR-LA", "NYCBike-InFlow", "NYCBike-OutFlow"]
main(model_list, dataset_list, debug = False)

View File

@ -22,7 +22,6 @@ class Trainer:
train_loader, val_loader, test_loader,
scaler, args, lr_scheduler=None):
self.config = args
self.device = args["basic"]["device"]
self.args = args["train"]

View File

@ -3,6 +3,7 @@ from tqdm import tqdm
from utils.logger import get_logger
from utils.loss_function import all_metrics
class Trainer:
def __init__(self, model, loss, optimizer,
train_loader, val_loader, test_loader,
@ -22,6 +23,16 @@ class Trainer:
self.test_loader = test_loader
self.scaler = scaler
# ===== 新增:统一反归一化接口(单 scaler / 多 scaler 通吃)=====
self.inv = (
(lambda x: self.scaler.inverse_transform(x))
if not isinstance(self.scaler, (list, tuple))
else (lambda x: torch.cat(
[s.inverse_transform(x[..., i:i+1])
for i, s in enumerate(self.scaler)],
dim=-1))
)
self._init_paths()
self._init_logger()
@ -56,16 +67,14 @@ class Trainer:
out = self.model(data)
if os.environ.get("TRY") == "True":
if out.shape == label.shape:
print(f"shape true, out: {out.shape}, label: {label.shape}")
assert False
else:
print(f"shape false, out: {out.shape}, label: {label.shape}")
assert False
print(f"out: {out.shape}, label: {label.shape}")
assert False
loss = self.loss(out, label)
d_out = self.scaler.inverse_transform(out)
d_lbl = self.scaler.inverse_transform(label)
# ===== 修改点:反归一化 =====
d_out = self.inv(out)
d_lbl = self.inv(label)
d_loss = self.loss(d_out, d_lbl)
total_loss += d_loss.item()
@ -120,12 +129,10 @@ class Trainer:
if losses["val"] < best:
best, patience = losses["val"], 0
best_w = copy.deepcopy(self.model.state_dict())
self.logger.info("Best validation model saved")
else:
patience += 1
if self.args["early_stop"] and patience == self.args["early_stop_patience"]:
self.logger.info("Early stopping triggered")
break
if losses["test"] < best_test:
@ -154,14 +161,12 @@ class Trainer:
for data, target in self.test_loader:
data, target = data.to(self.device), target.to(self.device)
label = target[..., :self.args["output_dim"]]
out = self.model(data)
y_pred.append(out.cpu())
y_pred.append(self.model(data).cpu())
y_true.append(label.cpu())
d_pred = self.scaler.inverse_transform(torch.cat(y_pred))
d_true = self.scaler.inverse_transform(torch.cat(y_true))
# ===== 修改点:反归一化 =====
d_pred = self.inv(torch.cat(y_pred))
d_true = self.inv(torch.cat(y_true))
for t in range(d_true.shape[1]):
mae, rmse, mape = all_metrics(
@ -173,8 +178,11 @@ class Trainer:
f"Horizon {t+1:02d} MAE:{mae:.4f} RMSE:{rmse:.4f} MAPE:{mape:.4f}"
)
avg_mae, avg_rmse, avg_mape = all_metrics(d_pred, d_true, self.args["mae_thresh"], self.args["mape_thresh"])
avg_mae, avg_rmse, avg_mape = all_metrics(
d_pred, d_true,
self.args["mae_thresh"],
self.args["mape_thresh"]
)
self.logger.info(
f"AVG MAE:{avg_mae:.4f} AVG RMSE:{avg_rmse:.4f} AVG MAPE:{avg_mape:.4f}"
)