# Dataloader construction for spatio-temporal forecasting datasets.
from utils.normalization import normalize_dataset
|
|
from dataloader.data_selector import load_st_dataset
|
|
|
|
import numpy as np
|
|
import torch
|
|
|
|
|
|
def get_dataloader(args, normalizer="std", single=True):
    """Build train/val/test DataLoaders plus the fitted scaler.

    :param args: Config mapping; ``args["data"]`` must provide lag, horizon,
        val_ratio, test_ratio, batch_size, input_dim, column_wise, num_nodes,
        steps_per_day and days_per_week.
    :param normalizer: Normalization scheme name passed to ``normalize_dataset``.
    :param single: If True, targets are single-step; else multi-step horizons.
    :return: ``(train_loader, val_loader, test_loader, scaler)``.
    """
    data = load_st_dataset(args)  # assumed shape (T, num_nodes, features) — per windowing below
    args = args["data"]  # only the "data" section of the config is used from here on

    # Sliding windows plus appended time-of-day / day-of-week features
    x, y = _prepare_data_with_windows(data, args, single)

    # test_ratio > 1 is interpreted as a number of days, otherwise as a fraction
    split_fn = split_data_by_days if args["test_ratio"] > 1 else split_data_by_ratio
    x_train, x_val, x_test = split_fn(x, args["val_ratio"], args["test_ratio"])
    y_train, y_val, y_test = split_fn(y, args["val_ratio"], args["test_ratio"])

    # Fit the scaler on x_train only, then apply the same scaler to x and y splits
    scaler = _normalize_data(x_train, x_val, x_test, args, normalizer)
    _apply_existing_scaler(y_train, y_val, y_test, scaler, args)

    return (
        _create_dataloader(x_train, y_train, args["batch_size"], True, False),
        _create_dataloader(x_val, y_val, args["batch_size"], False, False),
        _create_dataloader(x_test, y_test, args["batch_size"], False, False),
        scaler,
    )
|
def _prepare_data_with_windows(data, args, single):
    """Window the raw series into (x, y) pairs and append time features.

    :param data: Raw series of shape (T, N, F).
    :param args: Data config providing lag, horizon, num_nodes,
        steps_per_day and days_per_week.
    :param single: Single-step vs multi-step targets.
    :return: Tuple ``(x, y)`` of windowed arrays with the two time-feature
        channels concatenated on the last axis.
    """
    lag, horizon = args["lag"], args["horizon"]

    # Time-of-day / day-of-week signals shaped like the raw series
    time_features = _generate_time_features(data.shape[0], args)

    # Window the raw data, then window + append the time features identically
    windowed_x = add_window_x(data, lag, horizon, single)
    windowed_y = add_window_y(data, lag, horizon, single)
    windowed_x = _add_time_features(windowed_x, time_features, lag, horizon, single, add_window_x)
    windowed_y = _add_time_features(windowed_y, time_features, lag, horizon, single, add_window_y)

    return windowed_x, windowed_y
|
def _generate_time_features(L, args):
|
|
N = args["num_nodes"]
|
|
time_in_day = [i % args["steps_per_day"] / args["steps_per_day"] for i in range(L)]
|
|
time_in_day = np.tile(np.array(time_in_day), [1, N, 1]).transpose((2, 1, 0))
|
|
|
|
day_in_week = [(i // args["steps_per_day"]) % args["days_per_week"] for i in range(L)]
|
|
day_in_week = np.tile(np.array(day_in_week), [1, N, 1]).transpose((2, 1, 0))
|
|
|
|
return time_in_day, day_in_week
|
|
|
|
|
|
def _add_time_features(data, time_features, lag, horizon, single, window_fn):
|
|
time_in_day, day_in_week = time_features
|
|
time_day = window_fn(time_in_day, lag, horizon, single)
|
|
time_week = window_fn(day_in_week, lag, horizon, single)
|
|
return np.concatenate([data, time_day, time_week], axis=-1)
|
|
|
|
|
|
def _normalize_data(train_data, val_data, test_data, args, normalizer):
    """Fit a scaler on the training split's input channels, then transform
    all three splits in place.

    Only the first ``args["input_dim"]`` feature channels are normalized; the
    appended time-feature channels are left untouched.

    :return: The fitted scaler.
    """
    dim = args["input_dim"]
    scaler = normalize_dataset(train_data[..., :dim], normalizer, args["column_wise"])

    for split in (train_data, val_data, test_data):
        split[..., :dim] = scaler.transform(split[..., :dim])

    return scaler
|
def _apply_existing_scaler(train_data, val_data, test_data, scaler, args):
|
|
for data in [train_data, val_data, test_data]:
|
|
data[..., : args["input_dim"]] = scaler.transform(data[..., : args["input_dim"]])
|
|
|
|
|
|
def _create_dataloader(X_data, Y_data, batch_size, shuffle, drop_last):
|
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
X_tensor = torch.tensor(X_data, dtype=torch.float32, device=device)
|
|
Y_tensor = torch.tensor(Y_data, dtype=torch.float32, device=device)
|
|
dataset = torch.utils.data.TensorDataset(X_tensor, Y_tensor)
|
|
return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
|
|
|
|
|
|
def split_data_by_days(data, val_days, test_days, interval=30):
    """Split along the first axis into train/val/test by day counts taken
    from the end of the series.

    :param data: Array of shape (T, ...), ordered in time.
    :param val_days: Number of validation days (floored after being summed
        with test_days, matching the original int() behavior).
    :param test_days: Number of test days taken from the very end.
    :param interval: Minutes per time step; 24*60/interval steps form a day.
    :return: ``(train_data, val_data, test_data)``.
    """
    steps_per_day = int((24 * 60) / interval)
    n = data.shape[0]
    test_len = steps_per_day * int(test_days)
    tail_len = steps_per_day * int(test_days + val_days)
    # Explicit end-relative indices: unlike negative slicing, this stays
    # correct when a day count is 0 (data[-0:] would return the WHOLE array).
    test_data = data[n - test_len:]
    val_data = data[n - tail_len : n - test_len]
    train_data = data[: n - tail_len]
    return train_data, val_data, test_data
|
def split_data_by_ratio(data, val_ratio, test_ratio):
    """Split along the first axis into train/val/test by fractional ratios,
    with val and test taken from the end of the series.

    :param data: Array of shape (T, ...), ordered in time.
    :param val_ratio: Fraction of samples for validation.
    :param test_ratio: Fraction of samples for testing (taken from the end).
    :return: ``(train_data, val_data, test_data)``.
    """
    n = data.shape[0]
    test_len = int(n * test_ratio)
    tail_len = int(n * (test_ratio + val_ratio))
    # Explicit end-relative indices avoid the empty/whole-array slicing bug
    # that negative indexing hits when a ratio rounds down to 0 (data[-0:]
    # is the whole array, data[a:-0] is empty).
    test_data = data[n - test_len:]
    val_data = data[n - tail_len : n - test_len]
    train_data = data[: n - tail_len]
    return train_data, val_data, test_data
|
def _generate_windows(data, window=3, horizon=1, offset=0):
|
|
"""
|
|
Internal helper function to generate sliding windows.
|
|
|
|
:param data: Input data
|
|
:param window: Window size
|
|
:param horizon: Horizon size
|
|
:param offset: Offset from window start
|
|
:return: Windowed data
|
|
"""
|
|
length = len(data)
|
|
end_index = length - horizon - window + 1
|
|
windows = []
|
|
index = 0
|
|
|
|
while index < end_index:
|
|
windows.append(data[index + offset : index + offset + window])
|
|
index += 1
|
|
|
|
return np.array(windows)
|
|
|
|
def add_window_x(data, window=3, horizon=1, single=False):
    """
    Generate windowed X values from the input data.

    :param data: Input data, shape [B, ...]
    :param window: Size of the sliding window
    :param horizon: Horizon size (limits how late a window may start)
    :param single: Unused for X; kept for signature symmetry with add_window_y
    :return: X with shape [B, W, ...]
    """
    num_samples = len(data) - horizon - window + 1
    return np.array([data[start : start + window] for start in range(num_samples)])
|
def add_window_y(data, window=3, horizon=1, single=False):
    """
    Generate windowed Y values from the input data.

    :param data: Input data, shape [B, ...]
    :param window: Size of the X sliding window the targets follow
    :param horizon: Horizon size
    :param single: If True, each Y is only the final (horizon-th) step after
        the X window; otherwise Y covers all `horizon` steps after it
    :return: Y with shape [B, 1, ...] if single else [B, H, ...]

    The number of windows is len(data) - horizon - window + 1 — the same
    count add_window_x produces — so X and Y stay aligned sample-for-sample.
    (The previous delegation to _generate_windows derived the count from the
    Y width instead of `window`, misaligning X and Y whenever
    horizon != window and over-running the array in the single case.)
    """
    if single:
        offset, size = window + horizon - 1, 1
    else:
        offset, size = window, horizon
    num_samples = len(data) - horizon - window + 1
    return np.array([data[i + offset : i + offset + size] for i in range(num_samples)])
|
if __name__ == "__main__":
|
|
from dataloader.data_selector import load_st_dataset
|
|
res = load_st_dataset({"dataset": "SD"})
|
|
print(f"Dataset shape: {res.shape}")
|