from utils.normalization import normalize_dataset from dataloader.data_selector import load_st_dataset import numpy as np import torch def get_dataloader(args, normalizer="std", single=True): data = load_st_dataset(args) args = args["data"] L, N, F = data.shape # Generate sliding windows for main data and add time features x, y = _prepare_data_with_windows(data, args, single) # Split data split_fn = split_data_by_days if args["test_ratio"] > 1 else split_data_by_ratio x_train, x_val, x_test = split_fn(x, args["val_ratio"], args["test_ratio"]) y_train, y_val, y_test = split_fn(y, args["val_ratio"], args["test_ratio"]) # Normalize x and y using the same scaler scaler = _normalize_data(x_train, x_val, x_test, args, normalizer) _apply_existing_scaler(y_train, y_val, y_test, scaler, args) # Create dataloaders return ( _create_dataloader(x_train, y_train, args["batch_size"], True, False), _create_dataloader(x_val, y_val, args["batch_size"], False, False), _create_dataloader(x_test, y_test, args["batch_size"], False, False), scaler ) def _prepare_data_with_windows(data, args, single): # Generate sliding windows for main data x = add_window_x(data, args["lag"], args["horizon"], single) y = add_window_y(data, args["lag"], args["horizon"], single) # Generate time features time_features = _generate_time_features(data.shape[0], args) # Add time features to x and y x = _add_time_features(x, time_features, args["lag"], args["horizon"], single, add_window_x) y = _add_time_features(y, time_features, args["lag"], args["horizon"], single, add_window_y) return x, y def _generate_time_features(L, args): N = args["num_nodes"] time_in_day = [i % args["steps_per_day"] / args["steps_per_day"] for i in range(L)] time_in_day = np.tile(np.array(time_in_day), [1, N, 1]).transpose((2, 1, 0)) day_in_week = [(i // args["steps_per_day"]) % args["days_per_week"] for i in range(L)] day_in_week = np.tile(np.array(day_in_week), [1, N, 1]).transpose((2, 1, 0)) return time_in_day, day_in_week def _add_time_features(data, time_features, lag, horizon, single, window_fn): time_in_day, day_in_week = time_features time_day = window_fn(time_in_day, lag, horizon, single) time_week = window_fn(day_in_week, lag, horizon, single) return np.concatenate([data, time_day, time_week], axis=-1) def _normalize_data(train_data, val_data, test_data, args, normalizer): scaler = normalize_dataset(train_data[..., : args["input_dim"]], normalizer, args["column_wise"]) for data in [train_data, val_data, test_data]: data[..., : args["input_dim"]] = scaler.transform(data[..., : args["input_dim"]]) return scaler def _apply_existing_scaler(train_data, val_data, test_data, scaler, args): for data in [train_data, val_data, test_data]: data[..., : args["input_dim"]] = scaler.transform(data[..., : args["input_dim"]]) def _create_dataloader(X_data, Y_data, batch_size, shuffle, drop_last): device = torch.device("cuda" if torch.cuda.is_available() else "cpu") X_tensor = torch.tensor(X_data, dtype=torch.float32, device=device) Y_tensor = torch.tensor(Y_data, dtype=torch.float32, device=device) dataset = torch.utils.data.TensorDataset(X_tensor, Y_tensor) return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last) def split_data_by_days(data, val_days, test_days, interval=30): t = int((24 * 60) / interval) test_data = data[-t * int(test_days) :] val_data = data[-t * int(test_days + val_days) : -t * int(test_days)] train_data = data[: -t * int(test_days + val_days)] return train_data, val_data, test_data def split_data_by_ratio(data, val_ratio, test_ratio): data_len = data.shape[0] test_data = data[-int(data_len * test_ratio) :] val_data = data[ -int(data_len * (test_ratio + val_ratio)) : -int(data_len * test_ratio) ] train_data = data[: -int(data_len * (test_ratio + val_ratio))] return train_data, val_data, test_data def _generate_windows(data, window=3, horizon=1, offset=0): """ Internal helper function to generate sliding windows. :param data: Input data :param window: Window size :param horizon: Horizon size :param offset: Offset from window start :return: Windowed data """ length = len(data) end_index = length - horizon - window + 1 windows = [] index = 0 while index < end_index: windows.append(data[index + offset : index + offset + window]) index += 1 return np.array(windows) def add_window_x(data, window=3, horizon=1, single=False): """ Generate windowed X values from the input data. :param data: Input data, shape [B, ...] :param window: Size of the sliding window :param horizon: Horizon size :param single: If True, generate single-step windows, else multi-step :return: X with shape [B, W, ...] """ return _generate_windows(data, window, horizon, offset=0) def add_window_y(data, window=3, horizon=1, single=False): """ Generate windowed Y values from the input data. :param data: Input data, shape [B, ...] :param window: Size of the sliding window :param horizon: Horizon size :param single: If True, generate single-step windows, else multi-step :return: Y with shape [B, H, ...] """ offset = window if not single else window + horizon - 1 return _generate_windows(data, window=1 if single else horizon, horizon=horizon, offset=offset) if __name__ == "__main__": from dataloader.data_selector import load_st_dataset res = load_st_dataset({"dataset": "SD"}) print(f"Dataset shape: {res.shape}")