import numpy as np
import torch

from dataloader.data_selector import load_st_dataset
from utils.normalization import normalize_dataset


# ==============================================================
# MAIN ENTRY
# ==============================================================
def get_dataloader(args, normalizer="std", single=True):
    """
    Return train/val/test dataloaders yielding (x, y, x_mark, y_mark) batches,
    plus the scaler fitted on the training split.

    This version follows the official Informer/ETSformer dataloader behavior.
    ``single`` is currently unused and kept for API compatibility.
    """
    data = load_st_dataset(args)
    args = args["data"]  # only the data sub-config is needed from here on

    x, y, x_mark, y_mark = _prepare_data_with_windows(data, args)

    # --- split: a test_ratio > 1 is interpreted as a number of days ---
    split_fn = split_data_by_days if args["test_ratio"] > 1 else split_data_by_ratio
    x_train, x_val, x_test = split_fn(x, args["val_ratio"], args["test_ratio"])
    y_train, y_val, y_test = split_fn(y, args["val_ratio"], args["test_ratio"])
    x_mark_train, x_mark_val, x_mark_test = split_fn(x_mark, args["val_ratio"], args["test_ratio"])
    y_mark_train, y_mark_val, y_mark_test = split_fn(y_mark, args["val_ratio"], args["test_ratio"])

    # --- normalization: fit on the training split only, transform all splits in place ---
    scaler = _normalize_data(x_train, x_val, x_test, args, normalizer)
    _apply_existing_scaler(y_train, y_val, y_test, scaler, args)

    # --- reshape [b, t, n, c] -> [b*n, t, c] ---
    (x_train, x_val, x_test,
     y_train, y_val, y_test,
     x_mark_train, x_mark_val, x_mark_test,
     y_mark_train, y_mark_val, y_mark_test) = _reshape_tensor(
        x_train, x_val, x_test,
        y_train, y_val, y_test,
        x_mark_train, x_mark_val, x_mark_test,
        y_mark_train, y_mark_val, y_mark_test,
    )

    # --- dataloaders: shuffle only the training split ---
    return (
        _create_dataloader(x_train, y_train, x_mark_train, y_mark_train, args["batch_size"], True, False),
        _create_dataloader(x_val, y_val, x_mark_val, y_mark_val, args["batch_size"], False, False),
        _create_dataloader(x_test, y_test, x_mark_test, y_mark_test, args["batch_size"], False, False),
        scaler,
    )


# ==============================================================
# Informer-style WINDOW GENERATION
# ==============================================================
def _prepare_data_with_windows(data, args):
    """
    Generate x, y, x_mark, y_mark using the Informer slicing rule:
        x: [seq_len]
        y: [label_len + pred_len]  (the last label_len steps of x, then the horizon)
    """
    seq_len = args["lag"]
    label_len = args["label_len"]
    pred_len = args["horizon"]
    L, N, C = data.shape

    # ---------- construct timestamp features ----------
    time_in_day, day_in_week = _generate_time_features(L, args)
    data_mark = np.concatenate([time_in_day, day_in_week], axis=-1)

    xs, ys, x_marks, y_marks = [], [], [], []
    # There are L - seq_len - pred_len + 1 valid windows; the last one ends
    # exactly at index L, matching the official Informer __len__.
    for s_begin in range(L - seq_len - pred_len + 1):
        s_end = s_begin + seq_len
        r_begin = s_end - label_len
        r_end = r_begin + label_len + pred_len
        xs.append(data[s_begin:s_end])
        ys.append(data[r_begin:r_end])
        x_marks.append(data_mark[s_begin:s_end])
        y_marks.append(data_mark[r_begin:r_end])
    return np.array(xs), np.array(ys), np.array(x_marks), np.array(y_marks)
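# Illustrative sketch only (assumed toy values, not used by the pipeline):
# with seq_len=96, label_len=48, pred_len=24, the first window is
# x = data[0:96] and y = data[48:120], i.e. y repeats the last label_len
# steps of x before the pred_len future steps, as in the official Informer
# dataloader.
def _demo_window_indices(seq_len=96, label_len=48, pred_len=24):
    s_begin = 0
    s_end = s_begin + seq_len                # x covers [0, 96)
    r_begin = s_end - label_len              # y starts label_len steps before x ends
    r_end = r_begin + label_len + pred_len   # y covers [48, 120)
    assert (s_begin, s_end, r_begin, r_end) == (0, 96, 48, 120)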
# ==============================================================
# TIME FEATURES
# ==============================================================
def _generate_time_features(L, args):
    N = args["num_nodes"]

    # Time of day, normalized to [0, 1)
    tid = np.arange(L) % args["steps_per_day"] / args["steps_per_day"]
    tid = np.tile(tid[:, None], (1, N))

    # Day of week as an integer index in [0, days_per_week)
    diw = (np.arange(L) // args["steps_per_day"]) % args["days_per_week"]
    diw = np.tile(diw[:, None], (1, N))

    # Each feature has shape [L, N, 1]
    return tid[..., None], diw[..., None]


# ==============================================================
# NORMALIZATION
# ==============================================================
def _normalize_data(train_data, val_data, test_data, args, normalizer):
    # Fit the scaler on the training split only (no leakage), then transform
    # all three splits in place with the same statistics.
    scaler = normalize_dataset(
        train_data[..., :args["input_dim"]], normalizer, args["column_wise"]
    )
    _apply_existing_scaler(train_data, val_data, test_data, scaler, args)
    return scaler


def _apply_existing_scaler(train_data, val_data, test_data, scaler, args):
    # Transform the first input_dim channels in place with a pre-fitted scaler.
    for data in [train_data, val_data, test_data]:
        data[..., :args["input_dim"]] = scaler.transform(
            data[..., :args["input_dim"]]
        )


# ==============================================================
# DATALOADER
# ==============================================================
def _create_dataloader(x, y, x_mark, y_mark, batch_size, shuffle, drop_last):
    # All tensors are moved to the device up front; this assumes the full
    # dataset fits in (GPU) memory and requires num_workers=0 (the default).
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    dataset = torch.utils.data.TensorDataset(
        torch.tensor(x, dtype=torch.float32, device=device),
        torch.tensor(y, dtype=torch.float32, device=device),
        torch.tensor(x_mark, dtype=torch.float32, device=device),
        torch.tensor(y_mark, dtype=torch.float32, device=device),
    )
    return torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last
    )


# ==============================================================
# SPLIT
# ==============================================================
def split_data_by_days(data, val_days, test_days, interval=30):
    # t = number of samples per day at the given sampling interval (minutes)
    t = int((24 * 60) / interval)
    test_data = data[-t * int(test_days):]
    val_data = data[-t * int(test_days + val_days):-t * int(test_days)]
    train_data = data[:-t * int(test_days + val_days)]
    return train_data, val_data, test_data


def split_data_by_ratio(data, val_ratio, test_ratio):
    # Test comes from the end of the series, validation just before it, and
    # the remainder is training data, preserving temporal order.
    L = len(data)
    test_data = data[-int(L * test_ratio):]
    val_data = data[-int(L * (test_ratio + val_ratio)):-int(L * test_ratio)]
    train_data = data[:-int(L * (test_ratio + val_ratio))]
    return train_data, val_data, test_data
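# Quick sketch of the ratio split (toy array and assumed 20%/20% ratios,
# illustration only):
def _demo_ratio_split():
    train, val, test = split_data_by_ratio(np.arange(10), val_ratio=0.2, test_ratio=0.2)
    assert (len(train), len(val), len(test)) == (6, 2, 2)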
# ==============================================================
# RESHAPE [B, T, N, C] -> [B*N, T, C]
# ==============================================================
def _reshape_tensor(*tensors):
    # Fold the node axis into the batch axis so that each node becomes an
    # independent sample: [B, T, N, C] -> [B*N, T, C].
    reshaped = []
    for x in tensors:
        b, t, n, c = x.shape
        x_new = x.transpose(0, 2, 1, 3).reshape(b * n, t, c)
        reshaped.append(x_new)
    return reshaped
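# Minimal usage sketch, not part of the module proper. The keys under
# args["data"] mirror what get_dataloader reads ("lag", "label_len",
# "horizon", ...), but the concrete values and the top-level layout of
# `args` (what load_st_dataset expects) are assumptions -- adapt them to
# your own config.
if __name__ == "__main__":
    args = {
        "dataset": "PEMS08",           # assumed key, consumed by load_st_dataset
        "data": {
            "lag": 96,
            "label_len": 48,
            "horizon": 24,
            "num_nodes": 170,
            "steps_per_day": 288,      # 5-minute sampling
            "days_per_week": 7,
            "input_dim": 1,
            "column_wise": False,
            "val_ratio": 0.2,
            "test_ratio": 0.2,
            "batch_size": 64,
        },
    }
    train_loader, val_loader, test_loader, scaler = get_dataloader(args, normalizer="std")
    x, y, x_mark, y_mark = next(iter(train_loader))
    print(x.shape, y.shape, x_mark.shape, y_mark.shape)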