diff --git a/.vscode/settings.json b/.vscode/settings.json
index a8c2003..2c67ab7 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,5 +1,5 @@
 {
-    "python-envs.defaultEnvManager": "ms-python.python:conda",
-    "python-envs.defaultPackageManager": "ms-python.python:conda",
+    "python-envs.defaultEnvManager": "ms-python.python:system",
+    "python-envs.defaultPackageManager": "ms-python.python:pip",
     "python-envs.pythonProjects": []
 }
\ No newline at end of file
diff --git a/config/REPST/PEMS-BAY.yaml b/config/REPST/PEMS-BAY.yaml
index 33e582e..3209bd0 100755
--- a/config/REPST/PEMS-BAY.yaml
+++ b/config/REPST/PEMS-BAY.yaml
@@ -3,6 +3,7 @@ basic:
   mode : "train"
   device : "cuda:1"
   model: "REPST"
+  seed: 2023
 
 data:
   add_day_in_week: true
@@ -49,7 +50,6 @@ train:
   lr_init: 0.003
   max_grad_norm: 5
   real_value: true
-  seed: 12
   weight_decay: 0
   debug: false
   output_dim: 1
args["horizon"], single) - # Step 3 day, week, x, y --> x, y - x = np.concatenate([x, x_day, x_week], axis=-1) +def _add_time_features(data, time_features, lag, horizon, single, window_fn): + time_in_day, day_in_week = time_features + time_day = window_fn(time_in_day, lag, horizon, single) + time_week = window_fn(day_in_week, lag, horizon, single) + return np.concatenate([data, time_day, time_week], axis=-1) - del x_day, x_week - gc.collect() - # Step 4 x,y --> x_train, x_val, x_test, y_train, y_val, y_test - if args["test_ratio"] > 1: - x_train, x_val, x_test = split_data_by_days( - x, args["val_ratio"], args["test_ratio"] - ) - else: - x_train, x_val, x_test = split_data_by_ratio( - x, args["val_ratio"], args["test_ratio"] - ) +def _normalize_data(train_data, val_data, test_data, args, normalizer): + scaler = normalize_dataset(train_data[..., : args["input_dim"]], normalizer, args["column_wise"]) + + for data in [train_data, val_data, test_data]: + data[..., : args["input_dim"]] = scaler.transform(data[..., : args["input_dim"]]) + + return scaler - del x - gc.collect() - # Normalization - scaler = normalize_dataset( - x_train[..., : args["input_dim"]], normalizer, args["column_wise"] - ) - x_train[..., : args["input_dim"]] = scaler.transform( - x_train[..., : args["input_dim"]] - ) - x_val[..., : args["input_dim"]] = scaler.transform(x_val[..., : args["input_dim"]]) - x_test[..., : args["input_dim"]] = scaler.transform( - x_test[..., : args["input_dim"]] - ) +def _apply_existing_scaler(train_data, val_data, test_data, scaler, args): + for data in [train_data, val_data, test_data]: + data[..., : args["input_dim"]] = scaler.transform(data[..., : args["input_dim"]]) - y_day = add_window_y(time_in_day, args["lag"], args["horizon"], single) - y_week = add_window_y(day_in_week, args["lag"], args["horizon"], single) - del time_in_day, day_in_week - gc.collect() - - y = np.concatenate([y, y_day, y_week], axis=-1) - - del y_day, y_week - gc.collect() - - # Split Y - if args["test_ratio"] > 1: - y_train, y_val, y_test = split_data_by_days( - y, args["val_ratio"], args["test_ratio"] - ) - else: - y_train, y_val, y_test = split_data_by_ratio( - y, args["val_ratio"], args["test_ratio"] - ) - - del y - gc.collect() - - # Step 5: x_train y_train x_val y_val x_test y_test --> train val test - train_dataloader = data_loader( - x_train, y_train, args["batch_size"], shuffle=True, drop_last=True - ) - - del x_train, y_train - gc.collect() - - val_dataloader = data_loader( - x_val, y_val, args["batch_size"], shuffle=False, drop_last=True - ) - - del x_val, y_val - gc.collect() - - test_dataloader = data_loader( - x_test, y_test, args["batch_size"], shuffle=False, drop_last=False - ) - - del x_test, y_test - gc.collect() - - return train_dataloader, val_dataloader, test_dataloader, scaler +def _create_dataloader(X_data, Y_data, batch_size, shuffle, drop_last): + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + X_tensor = torch.tensor(X_data, dtype=torch.float32, device=device) + Y_tensor = torch.tensor(Y_data, dtype=torch.float32, device=device) + dataset = torch.utils.data.TensorDataset(X_tensor, Y_tensor) + return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last) def split_data_by_days(data, val_days, test_days, interval=30): @@ -128,17 +104,29 @@ def split_data_by_ratio(data, val_ratio, test_ratio): return train_data, val_data, test_data -def data_loader(X, Y, batch_size, shuffle=True, drop_last=True): - device = torch.device("cuda" if 
torch.cuda.is_available() else "cpu") - X = torch.tensor(X, dtype=torch.float32, device=device) - Y = torch.tensor(Y, dtype=torch.float32, device=device) - data = torch.utils.data.TensorDataset(X, Y) - dataloader = torch.utils.data.DataLoader( - data, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last - ) - return dataloader +def _generate_windows(data, window=3, horizon=1, offset=0): + """ + Internal helper function to generate sliding windows. + + :param data: Input data + :param window: Window size + :param horizon: Horizon size + :param offset: Offset from window start + :return: Windowed data + """ + length = len(data) + end_index = length - horizon - window + 1 + windows = [] + index = 0 + + while index < end_index: + windows.append(data[index + offset : index + offset + window]) + index += 1 + + return np.array(windows) + def add_window_x(data, window=3, horizon=1, single=False): """ Generate windowed X values from the input data. @@ -149,17 +137,7 @@ def add_window_x(data, window=3, horizon=1, single=False): :param single: If True, generate single-step windows, else multi-step :return: X with shape [B, W, ...] """ - length = len(data) - end_index = length - horizon - window + 1 - x = [] # Sliding windows - index = 0 - - while index < end_index: - x.append(data[index : index + window]) - index += 1 - - return np.array(x) - + return _generate_windows(data, window, horizon, offset=0) def add_window_y(data, window=3, horizon=1, single=False): """ @@ -171,21 +149,10 @@ def add_window_y(data, window=3, horizon=1, single=False): :param single: If True, generate single-step windows, else multi-step :return: Y with shape [B, H, ...] """ - length = len(data) - end_index = length - horizon - window + 1 - y = [] # Horizon values - index = 0 - - while index < end_index: - if single: - y.append(data[index + window + horizon - 1 : index + window + horizon]) - else: - y.append(data[index + window : index + window + horizon]) - index += 1 - - return np.array(y) - + offset = window if not single else window + horizon - 1 + return _generate_windows(data, window=1 if single else horizon, horizon=horizon, offset=offset) if __name__ == "__main__": - res = load_st_dataset("SD", 1) - k = 1 + from dataloader.data_selector import load_st_dataset + res = load_st_dataset({"dataset": "SD"}) + print(f"Dataset shape: {res.shape}") diff --git a/model/AEPSA/repst.py b/model/AEPSA/repst.py index 09ceb29..53a6046 100644 --- a/model/AEPSA/repst.py +++ b/model/AEPSA/repst.py @@ -1,4 +1,3 @@ -from tkinter import Y import torch import torch.nn as nn from transformers.models.gpt2.modeling_gpt2 import GPT2Model diff --git a/model/REPST/reprogramming.py b/model/REPST/reprogramming.py index 3b95ca6..3806989 100644 --- a/model/REPST/reprogramming.py +++ b/model/REPST/reprogramming.py @@ -30,7 +30,6 @@ class TokenEmbedding(nn.Module): def forward(self, x): b, n, m, pn, pl = x.shape # batch, node, feature, patch_num, patch_len - # 为什么没permute后reshape? 
diff --git a/run.py b/run.py
index 0a4c49a..175367f 100755
--- a/run.py
+++ b/run.py
@@ -13,7 +13,7 @@ from trainer.trainer_selector import select_trainer
 def main():
     args = parse_args()
     args = init.init_device(args)
-    init.init_seed(args["train"]["seed"])
+    init.init_seed(args["basic"]["seed"])
     model = init.init_model(args)
 
     # Load dataset
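
The `init.init_seed` implementation is not part of this diff; for context, a typical seeding helper it is assumed to resemble looks like the following (hypothetical sketch, not the project's actual code):

import random
import numpy as np
import torch

def init_seed(seed: int) -> None:
    # Seed every RNG source the training run touches
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Optionally trade cuDNN autotuning speed for determinism
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
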
diff --git a/trainer/Trainer.py b/trainer/Trainer.py
index 3048d1b..3c0e2b1 100755
--- a/trainer/Trainer.py
+++ b/trainer/Trainer.py
@@ -10,11 +10,14 @@ from tqdm import tqdm
 
 
 class TrainingStats:
+    """Tracks statistics collected during training."""
+
     def __init__(self, device):
         self.device = device
         self.reset()
 
     def reset(self):
+        """Reset all recorded statistics."""
         self.gpu_mem_usage_list = []
         self.cpu_mem_usage_list = []
         self.train_time_list = []
@@ -24,9 +27,11 @@
         self.end_time = None
 
     def start_training(self):
+        """Record the training start time."""
         self.start_time = time.time()
 
     def end_training(self):
+        """Record the training end time."""
         self.end_time = time.time()
 
     def record_step_time(self, duration, mode):
@@ -51,6 +56,10 @@
         self.cpu_mem_usage_list.append(cpu_mem)
         self.gpu_mem_usage_list.append(gpu_mem)
 
+    def _calculate_average(self, values_list):
+        """Safely compute an average, avoiding division by zero."""
+        return sum(values_list) / len(values_list) if values_list else 0
+
     def report(self, logger):
         """Log summary statistics at the end of training."""
         if not self.start_time or not self.end_time:
@@ -58,26 +67,10 @@
             return
 
         total_time = self.end_time - self.start_time
-        avg_gpu_mem = (
-            sum(self.gpu_mem_usage_list) / len(self.gpu_mem_usage_list)
-            if self.gpu_mem_usage_list
-            else 0
-        )
-        avg_cpu_mem = (
-            sum(self.cpu_mem_usage_list) / len(self.cpu_mem_usage_list)
-            if self.cpu_mem_usage_list
-            else 0
-        )
-        avg_train_time = (
-            sum(self.train_time_list) / len(self.train_time_list)
-            if self.train_time_list
-            else 0
-        )
-        avg_infer_time = (
-            sum(self.infer_time_list) / len(self.infer_time_list)
-            if self.infer_time_list
-            else 0
-        )
+        avg_gpu_mem = self._calculate_average(self.gpu_mem_usage_list)
+        avg_cpu_mem = self._calculate_average(self.cpu_mem_usage_list)
+        avg_train_time = self._calculate_average(self.train_time_list)
+        avg_infer_time = self._calculate_average(self.infer_time_list)
         iters_per_sec = self.total_iters / total_time if total_time > 0 else 0
 
         logger.info("===== Training Summary =====")
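
Minimal usage sketch for the `TrainingStats` API above, before the `Trainer` changes that consume it (the logger is assumed to be a standard `logging.Logger`, and the class is assumed importable from `trainer.Trainer`):

import logging
import time
from trainer.Trainer import TrainingStats

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("stats_demo")

stats = TrainingStats(device="cuda:0")
stats.start_training()

step_start = time.time()
# ... one training step would run here ...
stats.record_step_time(time.time() - step_start, mode="train")
stats.record_memory_usage()

stats.end_training()
stats.report(logger)  # emits the "===== Training Summary =====" block
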
Ending...") break + # 更新最佳验证模型 if val_epoch_loss < best_loss: best_loss = val_epoch_loss not_improved_count = 0 @@ -231,32 +268,51 @@ class Trainer: else: not_improved_count += 1 - if ( - self.args["early_stop"] - and not_improved_count == self.args["early_stop_patience"] - ): - self.logger.info( - f"Validation performance didn't improve for {self.args['early_stop_patience']} epochs. Training stops." - ) + # 检查早停条件 + if self._should_early_stop(not_improved_count): break + # 更新最佳测试模型 if test_epoch_loss < best_test_loss: best_test_loss = test_epoch_loss best_test_model = copy.deepcopy(self.model.state_dict()) + # 保存最佳模型 if not self.args["debug"]: - torch.save(best_model, self.best_path) - torch.save(best_test_model, self.best_test_path) - self.logger.info( - f"Best models saved at {self.best_path} and {self.best_test_path}" - ) + self._save_best_models(best_model, best_test_model) + # 结束训练并输出统计信息 self.stats.end_training() self.stats.report(self.logger) + # 最终评估 self._finalize_training(best_model, best_test_model) - # 输出参数量 + # 输出模型参数量 + self._log_model_params() + + def _should_early_stop(self, not_improved_count): + """检查是否满足早停条件""" + if ( + self.args["early_stop"] + and not_improved_count == self.args["early_stop_patience"] + ): + self.logger.info( + f"Validation performance didn't improve for {self.args['early_stop_patience']} epochs. Training stops." + ) + return True + return False + + def _save_best_models(self, best_model, best_test_model): + """保存最佳模型到文件""" + torch.save(best_model, self.best_path) + torch.save(best_test_model, self.best_test_path) + self.logger.info( + f"Best models saved at {self.best_path} and {self.best_test_path}" + ) + + def _log_model_params(self): + """输出模型可训练参数数量""" try: total_params = sum( p.numel() for p in self.model.parameters() if p.requires_grad @@ -276,14 +332,20 @@ class Trainer: @staticmethod def test(model, args, data_loader, scaler, logger, path=None): + """对模型进行评估并输出性能指标""" + # 加载模型检查点(如果提供了路径) if path: checkpoint = torch.load(path) model.load_state_dict(checkpoint["state_dict"]) model.to(args["basic"]["device"]) + # 设置为评估模式 model.eval() + + # 收集预测和真实标签 y_pred, y_true = [], [] + # 不计算梯度的情况下进行预测 with torch.no_grad(): for data, target in data_loader: label = target[..., : args["output_dim"]] @@ -291,12 +353,14 @@ class Trainer: y_pred.append(output) y_true.append(label) + # 合并所有批次的预测结果 if args["real_value"]: y_pred = scaler.inverse_transform(torch.cat(y_pred, dim=0)) else: y_pred = torch.cat(y_pred, dim=0) y_true = torch.cat(y_true, dim=0) + # 计算并记录每个时间步的指标 for t in range(y_true.shape[1]): mae, rmse, mape = all_metrics( y_pred[:, t, ...], @@ -308,6 +372,7 @@ class Trainer: f"Horizon {t + 1:02d}, MAE: {mae:.4f}, RMSE: {rmse:.4f}, MAPE: {mape:.4f}" ) + # 计算并记录平均指标 mae, rmse, mape = all_metrics( y_pred, y_true, args["mae_thresh"], args["mape_thresh"] )