import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset

from dataloader.data_selector import load_st_dataset
from utils.normalization import normalize_dataset

_device = "cuda" if torch.cuda.is_available() else "cpu"


def to_tensor(a):
    """Convert a numpy array to a float32 tensor on the default device."""
    return torch.as_tensor(a, dtype=torch.float32, device=_device)


def window(data, width, margin, offset=0):
    """Sliding windows over the time axis via stride tricks (zero copy).

    Returns a read-only view of shape (len(data) - width - margin + 1, width, ...)
    whose i-th entry is data[offset + i : offset + i + width]. `margin` is the
    number of steps reserved for the counterpart window (horizon for X, lag for Y),
    so X and Y built from the same series share the same sample count; `offset`
    must not exceed `margin`, otherwise the view would run past the buffer.
    """
    assert offset <= margin, "offset beyond the reserved margin reads out of bounds"
    return np.lib.stride_tricks.as_strided(
        data[offset:],
        shape=(len(data) - width - margin + 1, width, *data.shape[1:]),
        strides=(data.strides[0], data.strides[0], *data.strides[1:]),
        writeable=False,  # overlapping windows share memory; never write through them
    )


def pad_last(X, Y, batch_size):
    """Repeat the last (X, Y) sample so len(X) is a multiple of batch_size
    (pad_with_last_sample=True)."""
    r = (-len(X)) % batch_size
    if r:
        X = np.concatenate([X, np.repeat(X[-1:], r, 0)], 0)
        Y = np.concatenate([Y, np.repeat(Y[-1:], r, 0)], 0)
    return X, Y


def split_by_ratio(data, val_ratio, test_ratio):
    """Chronological train / val / test split: the last test_ratio of the
    samples form the test set, the val_ratio block before that the val set."""
    n_val_test = int(len(data) * (val_ratio + test_ratio))
    n_test = int(len(data) * test_ratio)
    return (
        data[: len(data) - n_val_test],
        data[len(data) - n_val_test : len(data) - n_test],
        data[len(data) - n_test :],
    )


def get_dataloader(config, normalizer="std", single_step=True):
    data = load_st_dataset(config)  # (T, N, C): time steps, nodes, channels
    cfg = config["data"]
    T, N, _ = data.shape
    lag, horizon, batch_size, input_dim = (
        cfg["lag"], cfg["horizon"], cfg["batch_size"], cfg["input_dim"]
    )

    # X / Y construction: X holds the `lag` past steps, Y the next `horizon`
    # steps (or only the last of them when single_step). Both share the
    # sample count T - lag - horizon + 1.
    y_len = 1 if single_step else horizon
    y_offset = lag + horizon - y_len  # where each target window starts
    X = window(data, lag, horizon)
    Y = window(data, y_len, y_offset, y_offset)

    # Time features, broadcast to every node
    t = np.arange(T)
    time_in_day = np.tile((t % cfg["steps_per_day"]) / cfg["steps_per_day"], (N, 1)).T
    day_in_week = np.tile((t // cfg["steps_per_day"]) % cfg["days_per_week"], (N, 1)).T

    tf_x = lambda z: window(z[..., None], lag, horizon)
    tf_y = lambda z: window(z[..., None], y_len, y_offset, y_offset)
    X = np.concatenate([X, tf_x(time_in_day), tf_x(day_in_week)], -1)
    Y = np.concatenate([Y, tf_y(time_in_day), tf_y(day_in_week)], -1)

    # Chronological split
    X_train, X_val, X_test = split_by_ratio(X, cfg["val_ratio"], cfg["test_ratio"])
    Y_train, Y_val, Y_test = split_by_ratio(Y, cfg["val_ratio"], cfg["test_ratio"])

    # Channel-wise normalization: fit on the training inputs only, then apply
    # the same scalers to every split (appended time features stay unscaled).
    scalers = [
        normalize_dataset(X_train[..., i:i + 1], normalizer, cfg["column_wise"])
        for i in range(input_dim)
    ]
    for i, sc in enumerate(scalers):
        for d in (X_train, X_val, X_test, Y_train, Y_val, Y_test):
            d[..., i:i + 1] = sc.transform(d[..., i:i + 1])

    # Pad each split with its last sample so every batch is full
    X_train, Y_train = pad_last(X_train, Y_train, batch_size)
    X_val, Y_val = pad_last(X_val, Y_val, batch_size)
    X_test, Y_test = pad_last(X_test, Y_test, batch_size)

    # DataLoaders: only the training loader is shuffled
    make_loader = lambda X, Y, shuffle: DataLoader(
        TensorDataset(to_tensor(X), to_tensor(Y)),
        batch_size=batch_size, shuffle=shuffle, drop_last=False
    )
    return (
        make_loader(X_train, Y_train, True),
        make_loader(X_val, Y_val, False),
        make_loader(X_test, Y_test, False),
        scalers,
    )
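

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). The nested "data" keys below are the ones
# this module reads; the top-level "dataset" entry and its value are
# hypothetical placeholders for whatever load_st_dataset actually expects
# from `config`.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    example_config = {
        "dataset": "PEMSD4",       # hypothetical: consumed by load_st_dataset
        "data": {
            "lag": 12,             # length of the input window
            "horizon": 12,         # number of future steps to predict
            "batch_size": 64,
            "input_dim": 1,        # raw channels to normalize (time features excluded)
            "steps_per_day": 288,  # e.g. 5-minute sampling
            "days_per_week": 7,
            "val_ratio": 0.2,
            "test_ratio": 0.2,
            "column_wise": False,  # per-node scalers if True
        },
    }
    train_loader, val_loader, test_loader, scalers = get_dataloader(
        example_config, normalizer="std", single_step=False
    )
    xb, yb = next(iter(train_loader))
    print(xb.shape, yb.shape)      # (B, lag, N, C + 2) and (B, horizon, N, C + 2)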