# TrafficWheel/dataloader/Informer_loader.py
import numpy as np
import torch
from dataloader.data_selector import load_st_dataset
from utils.normalization import normalize_dataset
# ==============================================================
# MAIN ENTRY
# ==============================================================
def get_dataloader(args, normalizer="std", single=True):
    """Build train/val/test dataloaders yielding (x, y, x_mark, y_mark).

    Follows the Informer/ETSformer official dataloader behavior: windows
    are sliced Informer-style and timestamp marks are attached.

    Args:
        args: configuration mapping; only the "data" sub-mapping is used here.
        normalizer: normalization scheme forwarded to ``normalize_dataset``.
        single: unused; kept for interface compatibility with sibling loaders.

    Returns:
        (train_loader, val_loader, test_loader, scaler)
    """
    data = load_st_dataset(args)
    args = args["data"]  # only the data section of the config is needed below

    x, y, x_mark, y_mark = _prepare_data_with_windows(data, args)

    # --- split: a test_ratio > 1 is interpreted as a number of days ---
    split_fn = split_data_by_days if args["test_ratio"] > 1 else split_data_by_ratio
    x_train, x_val, x_test = split_fn(x, args["val_ratio"], args["test_ratio"])
    y_train, y_val, y_test = split_fn(y, args["val_ratio"], args["test_ratio"])
    x_mark_train, x_mark_val, x_mark_test = split_fn(x_mark, args["val_ratio"], args["test_ratio"])
    y_mark_train, y_mark_val, y_mark_test = split_fn(y_mark, args["val_ratio"], args["test_ratio"])

    # --- normalization: fit on x_train only, then reuse the same scaler for y ---
    scaler = _normalize_data(x_train, x_val, x_test, args, normalizer)
    _apply_existing_scaler(y_train, y_val, y_test, scaler, args)

    # reshape [b, t, n, c] -> [b*n, t, c]
    (x_train, x_val, x_test,
     y_train, y_val, y_test,
     x_mark_train, x_mark_val, x_mark_test,
     y_mark_train, y_mark_val, y_mark_test) = _reshape_tensor(
        x_train, x_val, x_test,
        y_train, y_val, y_test,
        x_mark_train, x_mark_val, x_mark_test,
        y_mark_train, y_mark_val, y_mark_test,
    )

    # --- dataloaders: shuffle only the training split ---
    return (
        _create_dataloader(x_train, y_train, x_mark_train, y_mark_train,
                           args["batch_size"], True, False),
        _create_dataloader(x_val, y_val, x_mark_val, y_mark_val,
                           args["batch_size"], False, False),
        _create_dataloader(x_test, y_test, x_mark_test, y_mark_test,
                           args["batch_size"], False, False),
        scaler,
    )
# ==============================================================
# Informer-style WINDOW GENERATION
# ==============================================================
def _prepare_data_with_windows(data, args):
    """Slice the series into (x, y, x_mark, y_mark) windows, Informer-style.

    x covers ``seq_len`` steps; y covers ``label_len + pred_len`` steps and
    overlaps the tail of x by ``label_len`` (the decoder warm-up tokens).

    Fix: the window loop now runs to ``L - seq_len - pred_len + 1`` so the
    last valid windows are no longer dropped (the old ``- 1`` bound lost two).

    Args:
        data: array of shape [L, N, C] — L steps, N nodes, C channels.
        args: data config with "lag", "label_len", "horizon" and the
            time-feature settings consumed by ``_generate_time_features``.

    Returns:
        Tuple of stacked arrays (xs, ys, x_marks, y_marks).
    """
    seq_len = args["lag"]
    label_len = args["label_len"]
    pred_len = args["horizon"]
    L, N, C = data.shape

    # ---------- construct timestamp features ----------
    time_in_day, day_in_week = _generate_time_features(L, args)
    data_mark = np.concatenate([time_in_day, day_in_week], axis=-1)

    xs, ys, x_marks, y_marks = [], [], [], []
    # Last valid start index is L - seq_len - pred_len (inclusive), so the
    # final target window ends exactly at the series' last step.
    for s_begin in range(L - seq_len - pred_len + 1):
        s_end = s_begin + seq_len
        r_begin = s_end - label_len
        r_end = r_begin + label_len + pred_len
        xs.append(data[s_begin:s_end])
        ys.append(data[r_begin:r_end])
        x_marks.append(data_mark[s_begin:s_end])
        y_marks.append(data_mark[r_begin:r_end])
    return np.array(xs), np.array(ys), np.array(x_marks), np.array(y_marks)
# ==============================================================
# TIME FEATURE
# ==============================================================
def _generate_time_features(L, args):
N = args["num_nodes"]
# Time in day
tid = np.array([i % args["steps_per_day"] / args["steps_per_day"] for i in range(L)])
tid = np.tile(tid[:, None], (1, N))
# Day in week
diw = np.array([(i // args["steps_per_day"]) % args["days_per_week"] for i in range(L)])
diw = np.tile(diw[:, None], (1, N))
return tid[..., None], diw[..., None]
# ==============================================================
# NORMALIZATION
# ==============================================================
def _normalize_data(train_data, val_data, test_data, args, normalizer):
    """Fit a scaler on the training inputs and transform all splits in place.

    Only the first ``input_dim`` channels are normalized; any extra channels
    (e.g. appended time features) are left untouched. The fitted scaler is
    returned so it can be reused on the targets and for inverse transforms.
    """
    dim = args["input_dim"]
    # Fit on the training portion only to avoid leaking val/test statistics.
    scaler = normalize_dataset(train_data[..., :dim], normalizer, args["column_wise"])
    for split in (train_data, val_data, test_data):
        split[..., :dim] = scaler.transform(split[..., :dim])
    return scaler
def _apply_existing_scaler(train_data, val_data, test_data, scaler, args):
for data in [train_data, val_data, test_data]:
data[..., :args["input_dim"]] = scaler.transform(
data[..., :args["input_dim"]]
)
# ==============================================================
# DATALOADER
# ==============================================================
def _create_dataloader(x, y, x_mark, y_mark, batch_size, shuffle, drop_last):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dataset = torch.utils.data.TensorDataset(
torch.tensor(x, dtype=torch.float32, device=device),
torch.tensor(y, dtype=torch.float32, device=device),
torch.tensor(x_mark, dtype=torch.float32, device=device),
torch.tensor(y_mark, dtype=torch.float32, device=device),
)
return torch.utils.data.DataLoader(dataset, batch_size=batch_size,
shuffle=shuffle, drop_last=drop_last)
# ==============================================================
# SPLIT
# ==============================================================
def split_data_by_days(data, val_days, test_days, interval=30):
    """Split along axis 0 by whole days, assuming ``interval`` minutes per step.

    The last ``test_days`` days form the test split, the ``val_days`` days
    before them the validation split, and everything earlier the train split.
    """
    steps_per_day = int((24 * 60) / interval)
    n_test = steps_per_day * int(test_days)
    # Combined boundary uses int(test + val) to keep legacy rounding intact.
    n_tail = steps_per_day * int(test_days + val_days)
    test_data = data[-n_test:]
    val_data = data[-n_tail:-n_test]
    train_data = data[:-n_tail]
    return train_data, val_data, test_data
def split_data_by_ratio(data, val_ratio, test_ratio):
    """Split along axis 0 by fractions: the last ``test_ratio`` of the data
    is the test split, the ``val_ratio`` before it validation, the rest train.

    Fix: the original used negative-index slices like ``data[-int(L*r):]``,
    which return the ENTIRE array when a ratio is 0 (``data[-0:]``). Lengths
    are now computed explicitly; boundaries for nonzero ratios are identical
    to the old behavior (same ``int(L*(test+val))`` rounding).
    """
    n = len(data)
    n_test = int(n * test_ratio)
    n_tail = int(n * (test_ratio + val_ratio))  # test + val, legacy rounding
    test_data = data[n - n_test:]
    val_data = data[n - n_tail: n - n_test]
    train_data = data[: n - n_tail]
    return train_data, val_data, test_data
# ==============================================================
# RESHAPE [B,T,N,C] -> [B*N,T,C]
# ==============================================================
def _reshape_tensor(*tensors):
reshaped = []
for x in tensors:
b, t, n, c = x.shape
x_new = x.transpose(0, 2, 1, 3).reshape(b * n, t, c)
reshaped.append(x_new)
return reshaped