diff --git a/dataloader/EXPdataloader.py b/dataloader/EXPdataloader.py
index 237bf71..18ebf61 100755
--- a/dataloader/EXPdataloader.py
+++ b/dataloader/EXPdataloader.py
@@ -1,199 +1,90 @@
 import numpy as np
 import torch
-from utils.normalization import normalize_dataset
 from dataloader.data_selector import load_st_dataset
+from utils.normalization import normalize_dataset
 
 
-def get_dataloader(args, normalizer="std", single=True):
-    # args should now include 'cycle'
-    data = load_st_dataset(args["type"], args["sample"])  # [T, N, F]
-    L, N, F = data.shape
-    # compute cycle index
-    cycle_arr = np.arange(L) % args["cycle"]  # length-L array
+_device = "cuda" if torch.cuda.is_available() else "cpu"
+to_tensor = lambda a: torch.as_tensor(a, dtype=torch.float32, device=_device)
 
-    # Step 1: sliding windows for X and Y
-    x = add_window_x(data, args["lag"], args["horizon"], single)
-    y = add_window_y(data, args["lag"], args["horizon"], single)
-    # window count M = L - lag - horizon + 1
-    M = x.shape[0]
+# Sliding window over the time axis (stride trick, zero copy).
+# w = window length, r = steps reserved after the window, o = start offset,
+# so the window count is always len(d) - o - w - r + 1. (The earlier
+# len(d) - w - h + 1 variant over-counted whenever the offset was nonzero.)
+window = lambda d, w, r=0, o=0: np.lib.stride_tricks.as_strided(
+    d[o:],
+    shape=(len(d) - o - w - r + 1, w, *d.shape[1:]),
+    strides=(d.strides[0], d.strides[0], *d.strides[1:])
+)
 
-    # Step 2: time features
-    time_in_day = np.tile(
-        np.array([i % args["steps_per_day"] / args["steps_per_day"] for i in range(L)]),
-        (N, 1),
-    ).T.reshape(L, N, 1)
-    day_in_week = np.tile(
-        np.array(
-            [(i // args["steps_per_day"]) % args["days_per_week"] for i in range(L)]
-        ),
-        (N, 1),
-    ).T.reshape(L, N, 1)
+# pad_with_last_sample=True: repeat the last sample so len(X) % bs == 0
+pad_last = lambda X, Y, bs: (
+    (lambda r: (
+        (np.concatenate([X, np.repeat(X[-1:], r, 0)], 0),
+         np.concatenate([Y, np.repeat(Y[-1:], r, 0)], 0))
+        if r else (X, Y)
+    ))((-len(X)) % bs)
+)
 
-    x_day = add_window_x(time_in_day, args["lag"], args["horizon"], single)
-    x_week = add_window_x(day_in_week, args["lag"], args["horizon"], single)
-    x = np.concatenate([x, x_day, x_week], axis=-1)
-    # del x_day, x_week
-    # gc.collect()
+# Train / val / test split by ratio
+split_by_ratio = lambda d, vr, tr: (
+    d[:-(vl := int(len(d) * (vr + tr)))],
+    d[-vl:-(tl := int(len(d) * tr))],
+    d[-tl:]
+)
 
-    # Step 3: extract cycle index per window: take value at end of sequence
-    cycle_win = np.array([cycle_arr[i + args["lag"]] for i in range(M)])  # shape [M]
-    # Step 4: split into train/val/test
-    if args["test_ratio"] > 1:
-        x_train, x_val, x_test = split_data_by_days(
-            x, args["val_ratio"], args["test_ratio"]
-        )
-        y_train, y_val, y_test = split_data_by_days(
-            y, args["val_ratio"], args["test_ratio"]
-        )
-        c_train, c_val, c_test = split_data_by_days(
-            cycle_win, args["val_ratio"], args["test_ratio"]
-        )
-    else:
-        x_train, x_val, x_test = split_data_by_ratio(
-            x, args["val_ratio"], args["test_ratio"]
-        )
-        y_train, y_val, y_test = split_data_by_ratio(
-            y, args["val_ratio"], args["test_ratio"]
-        )
-        c_train, c_val, c_test = split_data_by_ratio(
-            cycle_win, args["val_ratio"], args["test_ratio"]
-        )
-    # del x, y, cycle_win
-    # gc.collect()
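Note on the `window` helper above: `as_strided` builds every window as a view into the same buffer, so construction costs no extra memory. A minimal standalone sketch of its behavior and the aliasing caveat (toy data, names illustrative only):

```python
import numpy as np

# Same helper as in the diff: zero-copy sliding windows over the time axis.
window = lambda d, w, r=0, o=0: np.lib.stride_tricks.as_strided(
    d[o:],
    shape=(len(d) - o - w - r + 1, w, *d.shape[1:]),
    strides=(d.strides[0], d.strides[0], *d.strides[1:])
)

T, lag, horizon = 10, 3, 2
data = np.arange(T, dtype=np.float64)        # toy series: 0..9

X = window(data, lag, horizon)               # inputs  cover [i, i+lag)
Y = window(data, horizon, 0, lag)            # targets cover [i+lag, i+lag+horizon)
assert len(X) == len(Y) == T - lag - horizon + 1
assert (X[0] == [0, 1, 2]).all() and (Y[0] == [3, 4]).all()

# Caveat: as_strided returns a view; mutating it, or keeping it alive after
# the base array is freed, is unsafe. Downstream code should copy first,
# which np.concatenate in the pipeline does implicitly.
```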
 
 
+def get_dataloader(config, normalizer="std", single_step=True):
+    data = load_st_dataset(config)
+    cfg = config["data"]
 
-    # Step 5: normalization on X only
-    scaler = normalize_dataset(
-        x_train[..., : args["input_dim"]], normalizer, args["column_wise"]
-    )
-    x_train[..., : args["input_dim"]] = scaler.transform(
-        x_train[..., : args["input_dim"]]
-    )
-    x_val[..., : args["input_dim"]] = scaler.transform(x_val[..., : args["input_dim"]])
-    x_test[..., : args["input_dim"]] = scaler.transform(
-        x_test[..., : args["input_dim"]]
-    )
+    T, N, _ = data.shape
+    lag, horizon, batch_size, input_dim = (
+        cfg["lag"], cfg["horizon"], cfg["batch_size"], cfg["input_dim"]
+    )
 
-    # add time features to Y
-    y_day = add_window_y(time_in_day, args["lag"], args["horizon"], single)
-    y_week = add_window_y(day_in_week, args["lag"], args["horizon"], single)
-    y = np.concatenate([y, y_day, y_week], axis=-1)
-    # del y_day, y_week, time_in_day, day_in_week
-    # gc.collect()
-
-    # split Y time-augmented
-    if args["test_ratio"] > 1:
-        y_train, y_val, y_test = split_data_by_days(
-            y, args["val_ratio"], args["test_ratio"]
-        )
-    else:
-        y_train, y_val, y_test = split_data_by_ratio(
-            y, args["val_ratio"], args["test_ratio"]
-        )
-    # del y
-
-    # Step 6: create dataloaders including cycle index
-    train_loader = data_loader_with_cycle(
-        x_train, y_train, c_train, args["batch_size"], shuffle=True, drop_last=True
-    )
-    val_loader = data_loader_with_cycle(
-        x_val, y_val, c_val, args["batch_size"], shuffle=False, drop_last=True
-    )
-    test_loader = data_loader_with_cycle(
-        x_test, y_test, c_test, args["batch_size"], shuffle=False, drop_last=False
-    )
-    return train_loader, val_loader, test_loader, scaler
+    # X / Y construction: X[i] covers steps [i, i+lag); Y[i] covers
+    # [i+lag, i+lag+horizon), or only its last step when single_step=True.
+    # (The previous call reserved `horizon` steps after an already-offset
+    # window, which mis-counted windows and read past the buffer.)
+    X = window(data, lag, horizon)
+    Y = (window(data, 1, 0, lag + horizon - 1) if single_step
+         else window(data, horizon, 0, lag))
 
+    # Time features: X gets input-window encodings, Y gets target-window ones.
+    t = np.arange(T)
+    time_in_day = np.tile((t % cfg["steps_per_day"]) / cfg["steps_per_day"], (N, 1)).T
+    day_in_week = np.tile((t // cfg["steps_per_day"]) % cfg["days_per_week"], (N, 1)).T
+    tf_x = lambda z: window(z[..., None], lag, horizon)
+    tf_y = lambda z: (window(z[..., None], 1, 0, lag + horizon - 1) if single_step
+                      else window(z[..., None], horizon, 0, lag))
+    X = np.concatenate([X, tf_x(time_in_day), tf_x(day_in_week)], -1)
+    Y = np.concatenate([Y, tf_y(time_in_day), tf_y(day_in_week)], -1)
 
-def data_loader_with_cycle(X, Y, C, batch_size, shuffle=True, drop_last=True):
-    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-    X_t = torch.tensor(X, dtype=torch.float32, device=device)
-    Y_t = torch.tensor(Y, dtype=torch.float32, device=device)
-    C_t = torch.tensor(C, dtype=torch.long, device=device).unsqueeze(-1)  # [B,1]
-    dataset = torch.utils.data.TensorDataset(X_t, Y_t, C_t)
-    loader = torch.utils.data.DataLoader(
-        dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last
-    )
-    return loader
+    # Split
+    X_train, X_val, X_test = split_by_ratio(X, cfg["val_ratio"], cfg["test_ratio"])
+    Y_train, Y_val, Y_test = split_by_ratio(Y, cfg["val_ratio"], cfg["test_ratio"])
 
-
-def split_data_by_days(data, val_days, test_days, interval=30):
-    t = int((24 * 60) / interval)
-    test_data = data[-t * int(test_days) :]
-    val_data = data[-t * int(test_days + val_days) : -t * int(test_days)]
-    train_data = data[: -t * int(test_days + val_days)]
-    return train_data, val_data, test_data
-
-
-def split_data_by_ratio(data, val_ratio, test_ratio):
-    data_len = data.shape[0]
-    test_data = data[-int(data_len * test_ratio) :]
-    val_data = data[
-        -int(data_len * (test_ratio + val_ratio)) : -int(data_len * test_ratio)
+    # Channel-wise normalization (fit on train only)
+    scalers = [
+        normalize_dataset(X_train[..., i:i+1], normalizer, cfg["column_wise"])
+        for i in range(input_dim)
    ]
-    train_data = data[: -int(data_len * (test_ratio + val_ratio))]
-    return train_data, val_data, test_data
+    for i, sc in enumerate(scalers):
+        for d in (X_train, X_val, X_test, Y_train, Y_val, Y_test):
+            d[..., i:i+1] = sc.transform(d[..., i:i+1])
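The block above fits one scaler per input channel on the training split only, then applies it to every split of both X and Y. A self-contained sketch of the pattern, with a hypothetical `StdScaler` standing in for `utils.normalization.normalize_dataset` (whose real interface may differ):

```python
import numpy as np

# Hypothetical stand-in for normalize_dataset(..., normalizer="std"):
# statistics come from the training slice only.
class StdScaler:
    def __init__(self, x):                       # x: one channel of train data
        self.mean, self.std = x.mean(), x.std()
    def transform(self, x):
        return (x - self.mean) / self.std
    def inverse_transform(self, x):
        return x * self.std + self.mean

rng = np.random.default_rng(0)
X_train = rng.normal(5.0, 2.0, size=(32, 12, 4, 3))   # [B, lag, N, C]
X_val = rng.normal(5.0, 2.0, size=(8, 12, 4, 3))

input_dim = 1   # first channel is measured data; the rest are time encodings
scalers = [StdScaler(X_train[..., i:i+1]) for i in range(input_dim)]
for i, sc in enumerate(scalers):
    for d in (X_train, X_val):
        d[..., i:i+1] = sc.transform(d[..., i:i+1])

# Validation is scaled with train-set statistics, so there is no leakage.
print(round(X_train[..., 0].mean(), 3), round(X_train[..., 0].std(), 3))  # ~0, ~1
```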
 
+    # Padding
+    X_train, Y_train = pad_last(X_train, Y_train, batch_size)
+    X_val, Y_val = pad_last(X_val, Y_val, batch_size)
+    X_test, Y_test = pad_last(X_test, Y_test, batch_size)
 
-def data_loader(X, Y, batch_size, shuffle=True, drop_last=True):
-    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-    X = torch.tensor(X, dtype=torch.float32, device=device)
-    Y = torch.tensor(Y, dtype=torch.float32, device=device)
-    data = torch.utils.data.TensorDataset(X, Y)
-    dataloader = torch.utils.data.DataLoader(
-        data, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last
+    # DataLoader
+    make_loader = lambda X, Y, shuffle: torch.utils.data.DataLoader(
+        torch.utils.data.TensorDataset(to_tensor(X), to_tensor(Y)),
+        batch_size=batch_size, shuffle=shuffle, drop_last=False
     )
-    return dataloader
-
-
-def add_window_x(data, window=3, horizon=1, single=False):
-    """
-    Generate windowed X values from the input data.
-
-    :param data: Input data, shape [B, ...]
-    :param window: Size of the sliding window
-    :param horizon: Horizon size
-    :param single: If True, generate single-step windows, else multi-step
-    :return: X with shape [B, W, ...]
-    """
-    length = len(data)
-    end_index = length - horizon - window + 1
-    x = []  # Sliding windows
-    index = 0
-
-    while index < end_index:
-        x.append(data[index : index + window])
-        index += 1
-
-    return np.array(x)
-
-
-def add_window_y(data, window=3, horizon=1, single=False):
-    """
-    Generate windowed Y values from the input data.
-
-    :param data: Input data, shape [B, ...]
-    :param window: Size of the sliding window
-    :param horizon: Horizon size
-    :param single: If True, generate single-step windows, else multi-step
-    :return: Y with shape [B, H, ...]
-    """
-    length = len(data)
-    end_index = length - horizon - window + 1
-    y = []  # Horizon values
-    index = 0
-
-    while index < end_index:
-        if single:
-            y.append(data[index + window + horizon - 1 : index + window + horizon])
-        else:
-            y.append(data[index + window : index + window + horizon])
-        index += 1
-
-    return np.array(y)
-
-
-if __name__ == "__main__":
-    res = load_st_dataset("SD", 1)
-    k = 1
+    return (
+        make_loader(X_train, Y_train, True),
+        make_loader(X_val, Y_val, False),
+        make_loader(X_test, Y_test, False),
+        scalers
+    )
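`pad_last` reproduces the old `pad_with_last_sample=True` behavior: the final sample is repeated until the split length is a multiple of the batch size, so `drop_last=False` never yields a short batch. A quick standalone check of the arithmetic:

```python
import numpy as np

# Same padding helper as in the diff: repeat the last (X, Y) sample
# (-len(X)) % bs more times so every batch comes out full.
pad_last = lambda X, Y, bs: (
    (lambda r: (
        (np.concatenate([X, np.repeat(X[-1:], r, 0)], 0),
         np.concatenate([Y, np.repeat(Y[-1:], r, 0)], 0))
        if r else (X, Y)
    ))((-len(X)) % bs)
)

X = np.arange(10).reshape(10, 1)
Y = X + 100
Xp, Yp = pad_last(X, Y, 4)
assert len(Xp) == 12 and (Xp[-3:] == 9).all()   # 10 -> 12, last sample repeated
assert len(pad_last(X, Y, 5)[0]) == 10          # already a multiple: unchanged
```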
diff --git a/dataloader/PeMSDdataloader.py b/dataloader/PeMSDdataloader.py
index 0e079e1..18ebf61 100755
--- a/dataloader/PeMSDdataloader.py
+++ b/dataloader/PeMSDdataloader.py
@@ -1,158 +1,90 @@
 import numpy as np
 import torch
-
 from dataloader.data_selector import load_st_dataset
 from utils.normalization import normalize_dataset
 
 
-def get_dataloader(args, normalizer="std", single=True):
-    data = load_st_dataset(args)
-    args = args["data"]
-    L, N, F = data.shape
+_device = "cuda" if torch.cuda.is_available() else "cpu"
+to_tensor = lambda a: torch.as_tensor(a, dtype=torch.float32, device=_device)
 
-    # Generate sliding windows for main data and add time features
-    x, y = _prepare_data_with_windows(data, args, single)
-
-    # Split data
-    split_fn = split_data_by_days if args["test_ratio"] > 1 else split_data_by_ratio
-    x_train, x_val, x_test = split_fn(x, args["val_ratio"], args["test_ratio"])
-    y_train, y_val, y_test = split_fn(y, args["val_ratio"], args["test_ratio"])
+# Sliding window over the time axis (stride trick, zero copy).
+# w = window length, r = steps reserved after the window, o = start offset,
+# so the window count is always len(d) - o - w - r + 1.
+window = lambda d, w, r=0, o=0: np.lib.stride_tricks.as_strided(
+    d[o:],
+    shape=(len(d) - o - w - r + 1, w, *d.shape[1:]),
+    strides=(d.strides[0], d.strides[0], *d.strides[1:])
+)
 
-    # Normalize x and y using the same scaler
-    scaler = _normalize_data(x_train, x_val, x_test, args, normalizer)
-    _apply_existing_scaler(y_train, y_val, y_test, scaler, args)
+# pad_with_last_sample=True: repeat the last sample so len(X) % bs == 0
+pad_last = lambda X, Y, bs: (
+    (lambda r: (
+        (np.concatenate([X, np.repeat(X[-1:], r, 0)], 0),
+         np.concatenate([Y, np.repeat(Y[-1:], r, 0)], 0))
+        if r else (X, Y)
+    ))((-len(X)) % bs)
+)
 
-    # Create dataloaders
-    return (
-        _create_dataloader(x_train, y_train, args["batch_size"], True, False),
-        _create_dataloader(x_val, y_val, args["batch_size"], False, False),
-        _create_dataloader(x_test, y_test, args["batch_size"], False, False),
-        scaler
-    )
+# Train / val / test split by ratio
+split_by_ratio = lambda d, vr, tr: (
+    d[:-(vl := int(len(d) * (vr + tr)))],
+    d[-vl:-(tl := int(len(d) * tr))],
+    d[-tl:]
+)
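`split_by_ratio` leans on assignment expressions being evaluated left to right inside the tuple: `vl` (val + test length) is bound in the first slice and reused in the second, where `tl` (test length) is bound and reused in the third. A worked check, plus one edge case worth knowing:

```python
# Same split helper as in the diff, spelled out with comments.
split_by_ratio = lambda d, vr, tr: (
    d[:-(vl := int(len(d) * (vr + tr)))],   # train: everything before val+test
    d[-vl:-(tl := int(len(d) * tr))],       # val:   the next len*vr samples
    d[-tl:]                                 # test:  the last len*tr samples
)

train, val, test = split_by_ratio(list(range(100)), 0.1, 0.2)
assert (len(train), len(val), len(test)) == (70, 10, 20)
assert train[-1] == 69 and val == list(range(70, 80)) and test[0] == 80
# Edge case: tr must be > 0, otherwise d[-0:] would return the whole sequence.
```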
 
 
+def get_dataloader(config, normalizer="std", single_step=True):
+    data = load_st_dataset(config)
+    cfg = config["data"]
+
+    T, N, _ = data.shape
+    lag, horizon, batch_size, input_dim = (
+        cfg["lag"], cfg["horizon"], cfg["batch_size"], cfg["input_dim"]
+    )
+
+    # X / Y construction: X[i] covers steps [i, i+lag); Y[i] covers
+    # [i+lag, i+lag+horizon), or only its last step when single_step=True.
+    X = window(data, lag, horizon)
+    Y = (window(data, 1, 0, lag + horizon - 1) if single_step
+         else window(data, horizon, 0, lag))
 
-def _prepare_data_with_windows(data, args, single):
-    # Generate sliding windows for main data
-    x = add_window_x(data, args["lag"], args["horizon"], single)
-    y = add_window_y(data, args["lag"], args["horizon"], single)
+    # Time features: X gets input-window encodings, Y gets target-window ones.
+    t = np.arange(T)
+    time_in_day = np.tile((t % cfg["steps_per_day"]) / cfg["steps_per_day"], (N, 1)).T
+    day_in_week = np.tile((t // cfg["steps_per_day"]) % cfg["days_per_week"], (N, 1)).T
+    tf_x = lambda z: window(z[..., None], lag, horizon)
+    tf_y = lambda z: (window(z[..., None], 1, 0, lag + horizon - 1) if single_step
+                      else window(z[..., None], horizon, 0, lag))
 
-    # Generate time features
-    time_features = _generate_time_features(data.shape[0], args)
-
-    # Add time features to x and y
-    x = _add_time_features(x, time_features, args["lag"], args["horizon"], single, add_window_x)
-    y = _add_time_features(y, time_features, args["lag"], args["horizon"], single, add_window_y)
-
-    return x, y
+    X = np.concatenate([X, tf_x(time_in_day), tf_x(day_in_week)], -1)
+    Y = np.concatenate([Y, tf_y(time_in_day), tf_y(day_in_week)], -1)
 
+    # Split
+    X_train, X_val, X_test = split_by_ratio(X, cfg["val_ratio"], cfg["test_ratio"])
+    Y_train, Y_val, Y_test = split_by_ratio(Y, cfg["val_ratio"], cfg["test_ratio"])
 
-def _generate_time_features(L, args):
-    N = args["num_nodes"]
-    time_in_day = [i % args["steps_per_day"] / args["steps_per_day"] for i in range(L)]
-    time_in_day = np.tile(np.array(time_in_day), [1, N, 1]).transpose((2, 1, 0))
-
-    day_in_week = [(i // args["steps_per_day"]) % args["days_per_week"] for i in range(L)]
-    day_in_week = np.tile(np.array(day_in_week), [1, N, 1]).transpose((2, 1, 0))
-
-    return time_in_day, day_in_week
-
-
-def _add_time_features(data, time_features, lag, horizon, single, window_fn):
-    time_in_day, day_in_week = time_features
-    time_day = window_fn(time_in_day, lag, horizon, single)
-    time_week = window_fn(day_in_week, lag, horizon, single)
-    return np.concatenate([data, time_day, time_week], axis=-1)
-
-
-def _normalize_data(train_data, val_data, test_data, args, normalizer):
-    scaler = normalize_dataset(train_data[..., : args["input_dim"]], normalizer, args["column_wise"])
-
-    for data in [train_data, val_data, test_data]:
-        data[..., : args["input_dim"]] = scaler.transform(data[..., : args["input_dim"]])
-
-    return scaler
-
-
-def _apply_existing_scaler(train_data, val_data, test_data, scaler, args):
-    for data in [train_data, val_data, test_data]:
-        data[..., : args["input_dim"]] = scaler.transform(data[..., : args["input_dim"]])
-
-
-def _create_dataloader(X_data, Y_data, batch_size, shuffle, drop_last):
-    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-    X_tensor = torch.tensor(X_data, dtype=torch.float32, device=device)
-    Y_tensor = torch.tensor(Y_data, dtype=torch.float32, device=device)
-    dataset = torch.utils.data.TensorDataset(X_tensor, Y_tensor)
-    return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
-
-
-def split_data_by_days(data, val_days, test_days, interval=30):
-    t = int((24 * 60) / interval)
-    test_data = data[-t * int(test_days) :]
-    val_data = data[-t * int(test_days + val_days) : -t * int(test_days)]
-    train_data = data[: -t * int(test_days + val_days)]
-    return train_data, val_data, test_data
-
-
-def split_data_by_ratio(data, val_ratio, test_ratio):
-    data_len = data.shape[0]
-    test_data = data[-int(data_len * test_ratio) :]
-    val_data = data[
-        -int(data_len * (test_ratio + val_ratio)) : -int(data_len * test_ratio)
+    # Channel-wise normalization (fit on train only)
+    scalers = [
+        normalize_dataset(X_train[..., i:i+1], normalizer, cfg["column_wise"])
+        for i in range(input_dim)
     ]
-    train_data = data[: -int(data_len * (test_ratio + val_ratio))]
-    return train_data, val_data, test_data
+    for i, sc in enumerate(scalers):
+        for d in (X_train, X_val, X_test, Y_train, Y_val, Y_test):
+            d[..., i:i+1] = sc.transform(d[..., i:i+1])
 
+    # Padding
+    X_train, Y_train = pad_last(X_train, Y_train, batch_size)
+    X_val, Y_val = pad_last(X_val, Y_val, batch_size)
+    X_test, Y_test = pad_last(X_test, Y_test, batch_size)
 
+    # DataLoader
+    make_loader = lambda X, Y, shuffle: torch.utils.data.DataLoader(
+        torch.utils.data.TensorDataset(to_tensor(X), to_tensor(Y)),
+        batch_size=batch_size, shuffle=shuffle, drop_last=False
+    )
 
-def _generate_windows(data, window=3, horizon=1, offset=0):
-    """
-    Internal helper function to generate sliding windows.
-
-    :param data: Input data
-    :param window: Window size
-    :param horizon: Horizon size
-    :param offset: Offset from window start
-    :return: Windowed data
-    """
-    length = len(data)
-    end_index = length - horizon - window + 1
-    windows = []
-    index = 0
-
-    while index < end_index:
-        windows.append(data[index + offset : index + offset + window])
-        index += 1
-
-    return np.array(windows)
-
-
-def add_window_x(data, window=3, horizon=1, single=False):
-    """
-    Generate windowed X values from the input data.
-
-    :param data: Input data, shape [B, ...]
-    :param window: Size of the sliding window
-    :param horizon: Horizon size
-    :param single: If True, generate single-step windows, else multi-step
-    :return: X with shape [B, W, ...]
-    """
-    return _generate_windows(data, window, horizon, offset=0)
-
-
-def add_window_y(data, window=3, horizon=1, single=False):
-    """
-    Generate windowed Y values from the input data.
-
-    :param data: Input data, shape [B, ...]
-    :param window: Size of the sliding window
-    :param horizon: Horizon size
-    :param single: If True, generate single-step windows, else multi-step
-    :return: Y with shape [B, H, ...]
-    """
-    offset = window if not single else window + horizon - 1
-    return _generate_windows(data, window=1 if single else horizon, horizon=horizon, offset=offset)
-
-
-# if __name__ == "__main__":
-#     from dataloader.data_selector import load_st_dataset
-#     res = load_st_dataset({"dataset": "SD"})
-#     print(f"Dataset shape: {res.shape}")
+    return (
+        make_loader(X_train, Y_train, True),
+        make_loader(X_val, Y_val, False),
+        make_loader(X_test, Y_test, False),
+        scalers
+    )
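Both dataloaders now build `TensorDataset`s whose tensors already live on `_device`, so batches come out of the loader without a per-step `.to(device)` copy; the trade-off is that each full split must fit in GPU memory. A sketch of the pattern and the usual alternative (shapes illustrative only):

```python
import numpy as np
import torch

device = "cuda" if torch.cuda.is_available() else "cpu"
to_tensor = lambda a: torch.as_tensor(a, dtype=torch.float32, device=device)

X, Y = np.random.rand(256, 12, 4, 3), np.random.rand(256, 12, 4, 1)
loader = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(to_tensor(X), to_tensor(Y)),
    batch_size=64, shuffle=True, drop_last=False
)
xb, yb = next(iter(loader))
assert xb.device.type == device and xb.shape == (64, 12, 4, 3)
# Alternative for datasets larger than GPU memory: keep the tensors on CPU
# with pin_memory=True and move each batch inside the training loop.
```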
- """ - offset = window if not single else window + horizon - 1 - return _generate_windows(data, window=1 if single else horizon, horizon=horizon, offset=offset) - -# if __name__ == "__main__": -# from dataloader.data_selector import load_st_dataset -# res = load_st_dataset({"dataset": "SD"}) -# print(f"Dataset shape: {res.shape}") + return ( + make_loader(X_train, Y_train, True), + make_loader(X_val, Y_val, False), + make_loader(X_test, Y_test, False), + scalers + ) diff --git a/train.py b/train.py index 2d3a32f..83d056a 100644 --- a/train.py +++ b/train.py @@ -13,7 +13,7 @@ def read_config(config_path): # 全局配置 device = "cuda:0" # 指定设备为cuda:0 seed = 2023 # 随机种子 - epochs = 1 + epochs = 1 # 训练轮数 # 拷贝项 config["basic"]["device"] = device @@ -90,8 +90,9 @@ def main(model, data, debug=False): if __name__ == "__main__": # 调试用 # model_list = ["iTransformer", "PatchTST", "HI"] - model_list = ["ASTRA_v2", "GWN", "REPST", "STAEFormer", "MTGNN"] + # model_list = ["ASTRA_v2", "GWN", "REPST", "STAEFormer", "MTGNN"] + model_list = ["MTGNN"] # dataset_list = ["AirQuality", "SolarEnergy", "PEMS-BAY", "METR-LA", "BJTaxi-InFlow", "BJTaxi-OutFlow", "NYCBike-InFlow", "NYCBike-OutFlow"] - # dataset_list = ["AirQuality"] - dataset_list = ["AirQuality", "SolarEnergy", "METR-LA", "NYCBike-InFlow", "NYCBike-OutFlow"] - main(model_list, dataset_list, debug = True) \ No newline at end of file + dataset_list = ["AirQuality"] + # dataset_list = ["AirQuality", "SolarEnergy", "METR-LA", "NYCBike-InFlow", "NYCBike-OutFlow"] + main(model_list, dataset_list, debug = False) \ No newline at end of file diff --git a/trainer/TSTrainer.py b/trainer/TSTrainer.py index 932d8b3..5ba71f2 100755 --- a/trainer/TSTrainer.py +++ b/trainer/TSTrainer.py @@ -22,7 +22,6 @@ class Trainer: train_loader, val_loader, test_loader, scaler, args, lr_scheduler=None): - self.config = args self.device = args["basic"]["device"] self.args = args["train"] diff --git a/trainer/Trainer.py b/trainer/Trainer.py index cdd444b..7c9aee0 100755 --- a/trainer/Trainer.py +++ b/trainer/Trainer.py @@ -3,6 +3,7 @@ from tqdm import tqdm from utils.logger import get_logger from utils.loss_function import all_metrics + class Trainer: def __init__(self, model, loss, optimizer, train_loader, val_loader, test_loader, @@ -22,6 +23,16 @@ class Trainer: self.test_loader = test_loader self.scaler = scaler + # ===== 新增:统一反归一化接口(单 scaler / 多 scaler 通吃)===== + self.inv = ( + (lambda x: self.scaler.inverse_transform(x)) + if not isinstance(self.scaler, (list, tuple)) + else (lambda x: torch.cat( + [s.inverse_transform(x[..., i:i+1]) + for i, s in enumerate(self.scaler)], + dim=-1)) + ) + self._init_paths() self._init_logger() @@ -56,16 +67,14 @@ class Trainer: out = self.model(data) if os.environ.get("TRY") == "True": - if out.shape == label.shape: - print(f"shape true, out: {out.shape}, label: {label.shape}") - assert False - else: - print(f"shape false, out: {out.shape}, label: {label.shape}") - assert False + print(f"out: {out.shape}, label: {label.shape}") + assert False loss = self.loss(out, label) - d_out = self.scaler.inverse_transform(out) - d_lbl = self.scaler.inverse_transform(label) + + # ===== 修改点:反归一化 ===== + d_out = self.inv(out) + d_lbl = self.inv(label) d_loss = self.loss(d_out, d_lbl) total_loss += d_loss.item() @@ -120,12 +129,10 @@ class Trainer: if losses["val"] < best: best, patience = losses["val"], 0 best_w = copy.deepcopy(self.model.state_dict()) - self.logger.info("Best validation model saved") else: patience += 1 if self.args["early_stop"] and patience 
== self.args["early_stop_patience"]: - self.logger.info("Early stopping triggered") break if losses["test"] < best_test: @@ -154,14 +161,12 @@ class Trainer: for data, target in self.test_loader: data, target = data.to(self.device), target.to(self.device) label = target[..., :self.args["output_dim"]] - - out = self.model(data) - - y_pred.append(out.cpu()) + y_pred.append(self.model(data).cpu()) y_true.append(label.cpu()) - d_pred = self.scaler.inverse_transform(torch.cat(y_pred)) - d_true = self.scaler.inverse_transform(torch.cat(y_true)) + # ===== 修改点:反归一化 ===== + d_pred = self.inv(torch.cat(y_pred)) + d_true = self.inv(torch.cat(y_true)) for t in range(d_true.shape[1]): mae, rmse, mape = all_metrics( @@ -172,9 +177,12 @@ class Trainer: self.logger.info( f"Horizon {t+1:02d} MAE:{mae:.4f} RMSE:{rmse:.4f} MAPE:{mape:.4f}" ) - - avg_mae, avg_rmse, avg_mape = all_metrics(d_pred, d_true, self.args["mae_thresh"], self.args["mape_thresh"]) + + avg_mae, avg_rmse, avg_mape = all_metrics( + d_pred, d_true, + self.args["mae_thresh"], + self.args["mape_thresh"] + ) self.logger.info( f"AVG MAE:{avg_mae:.4f} AVG RMSE:{avg_rmse:.4f} AVG MAPE:{avg_mape:.4f}" ) -