trainer添加效率统计

This commit is contained in:
czzhangheng 2025-08-17 14:03:26 +08:00
parent 61565cd33a
commit bacdce7c72
7 changed files with 10113 additions and 1750 deletions

11398
baseline.ipynb Executable file → Normal file

File diff suppressed because it is too large Load Diff

31
lib/dl_other_dataset.py Normal file
View File

@ -0,0 +1,31 @@
if __name__ == '__main__':
import os
import subprocess
# Kaggle 数据集列表
datasets = {
"hangzhou_taxi": "changyuheng/hz-taxi",
"nyc_taxi": "new-york-city/nyc-taxi-trip-duration",
"hangzhou_bike": "changyuheng/hz-bike"
}
# 下载保存目录
save_dir = "./datasets"
os.makedirs(save_dir, exist_ok=True)
# 检查 Kaggle API 配置
kaggle_json = os.path.expanduser("~/.kaggle/kaggle.json")
if not os.path.exists(kaggle_json):
raise FileNotFoundError(f"未找到 {kaggle_json},请先在 Kaggle 设置中下载并放置 API Key。")
# 循环下载
for name, kaggle_id in datasets.items():
print(f"📥 正在下载 {name} ({kaggle_id}) ...")
cmd = [
"kaggle", "datasets", "download", "-d", kaggle_id,
"-p", os.path.join(save_dir, name), "--unzip"
]
subprocess.run(cmd, check=True)
print(f"{name} 下载完成\n")
print("🎉 所有数据集已下载到", save_dir)

View File

@ -4,43 +4,47 @@ from datetime import datetime
def get_logger(root, name=None, debug=True):
# when debug is true, show DEBUG and INFO in screen
# when debug is false, show DEBUG in file and info in both screen&file
# INFO will always be in screen
# create a logger
"""
创建带时间戳的日志记录器
:param root: 日志文件目录
:param name: 日志名称
:param debug: 是否开启调试模式
"""
logger = logging.getLogger(name)
# critical > error > warning > info > debug > notset
logger.setLevel(logging.DEBUG)
# define the formate
formatter = logging.Formatter('%(asctime)s: %(message)s', "%m/%d %H:%M")
# create another handler for output log to console
# 避免重复添加 Handler
if logger.hasHandlers():
logger.handlers.clear()
# 时间格式改为 年/月/日 时:分:秒
formatter = logging.Formatter('%(asctime)s - %(message)s', "%Y/%m/%d %H:%M:%S")
# 控制台输出
console_handler = logging.StreamHandler()
if debug:
console_handler.setLevel(logging.DEBUG)
else:
console_handler.setLevel(logging.INFO)
# create a handler for write log to file
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.DEBUG if debug else logging.INFO)
logger.addHandler(console_handler)
# 文件输出(仅非 debug 模式)
if not debug:
os.makedirs(root, exist_ok=True)
logfile = os.path.join(root, 'run.log')
print('Creat Log File in: ', logfile)
file_handler = logging.FileHandler(logfile, mode='w')
print(f"Create Log File in: {logfile}")
file_handler = logging.FileHandler(logfile, mode='w', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# add Handler to logger
logger.addHandler(console_handler)
if not debug:
logger.addHandler(file_handler)
return logger
if __name__ == '__main__':
time = datetime.now().strftime('%Y%m%d%H%M%S')
print(time)
logger = get_logger('./log.txt', debug=True)
time_str = datetime.now().strftime('%Y%m%d%H%M%S')
print(time_str)
logger = get_logger('./logs', debug=False) # 改成 False 测试文件输出
logger.debug('this is a {} debug message'.format(1))
logger.info('this is an info message')
logger.debug('this is a debug message')
logger.info('this is an info message')
logger.debug('this is a debug message')
logger.info('this is an info message')

72
lib/training_stats.py Normal file
View File

@ -0,0 +1,72 @@
# 新建 lib/training_stats.py
import time
import psutil
import torch
import os
class TrainingStats:
def __init__(self, device):
self.device = device
self.reset()
def reset(self):
self.gpu_mem_usage_list = []
self.cpu_mem_usage_list = []
self.train_time_list = []
self.infer_time_list = []
self.total_iters = 0
self.start_time = None
self.end_time = None
def start_training(self):
self.start_time = time.time()
def end_training(self):
self.end_time = time.time()
def record_step_time(self, duration, mode):
"""记录单步耗时和总迭代次数"""
if mode == 'train':
self.train_time_list.append(duration)
else:
self.infer_time_list.append(duration)
self.total_iters += 1
def record_memory_usage(self):
"""记录当前 GPU 和 CPU 内存占用"""
process = psutil.Process(os.getpid())
cpu_mem = process.memory_info().rss / (1024 ** 2)
if torch.cuda.is_available():
gpu_mem = torch.cuda.max_memory_allocated(device=self.device) / (1024 ** 2)
torch.cuda.reset_peak_memory_stats(device=self.device)
else:
gpu_mem = 0.0
self.cpu_mem_usage_list.append(cpu_mem)
self.gpu_mem_usage_list.append(gpu_mem)
def report(self, logger):
"""在训练结束时输出汇总统计"""
if not self.start_time or not self.end_time:
logger.warning("TrainingStats: start/end time not recorded properly.")
return
total_time = self.end_time - self.start_time
avg_gpu_mem = sum(self.gpu_mem_usage_list) / len(self.gpu_mem_usage_list) if self.gpu_mem_usage_list else 0
avg_cpu_mem = sum(self.cpu_mem_usage_list) / len(self.cpu_mem_usage_list) if self.cpu_mem_usage_list else 0
avg_train_time = sum(self.train_time_list) / len(self.train_time_list) if self.train_time_list else 0
avg_infer_time = sum(self.infer_time_list) / len(self.infer_time_list) if self.infer_time_list else 0
iters_per_sec = self.total_iters / total_time if total_time > 0 else 0
logger.info("===== Training Summary =====")
logger.info(f"Total training time: {total_time:.2f} s")
logger.info(f"Total iterations: {self.total_iters}")
logger.info(f"Average iterations per second: {iters_per_sec:.2f}")
logger.info(f"Average GPU Memory Usage: {avg_gpu_mem:.2f} MB")
logger.info(f"Average CPU Memory Usage: {avg_cpu_mem:.2f} MB")
if avg_train_time:
logger.info(f"Average training step time: {avg_train_time*1000:.2f} ms")
if avg_infer_time:
logger.info(f"Average inference step time: {avg_infer_time*1000:.2f} ms")

44
run.py
View File

@ -18,8 +18,6 @@ from trainer.trainer_selector import select_trainer
import yaml
def main():
args = parse_args()
@ -34,28 +32,26 @@ def main():
# Initialize model
model = init_model(args['model'], device=args['device'])
if args['mode'] == "benchmark":
# 支持计算消耗分析,设置 mode为 benchmark
import torch.profiler as profiler
dummy_input = torch.randn((64, 12, args['model']['num_nodes'], 3), device=args['device'])
min_val = dummy_input.min(dim=-1, keepdim=True)[0]
max_val = dummy_input.max(dim=-1, keepdim=True)[0]
dummy_input = (dummy_input - min_val) / (max_val - min_val + 1e-6)
with profiler.profile(
activities=[
profiler.ProfilerActivity.CPU,
profiler.ProfilerActivity.CUDA
],
with_stack=True,
profile_memory=True,
record_shapes=True
) as prof:
out = model(dummy_input)
print(prof.key_averages().table(sort_by="cuda_time_total", row_limit=10))
return 0
# if args['mode'] == "benchmark":
# # 支持计算消耗分析,设置 mode为 benchmark
# import torch.profiler as profiler
# dummy_input = torch.randn((64, 12, args['model']['num_nodes'], 3), device=args['device'])
# min_val = dummy_input.min(dim=-1, keepdim=True)[0]
# max_val = dummy_input.max(dim=-1, keepdim=True)[0]
#
# dummy_input = (dummy_input - min_val) / (max_val - min_val + 1e-6)
# with profiler.profile(
# activities=[
# profiler.ProfilerActivity.CPU,
# profiler.ProfilerActivity.CUDA
# ],
# with_stack=True,
# profile_memory=True,
# record_shapes=True
# ) as prof:
# out = model(dummy_input)
# print(prof.key_averages().table(sort_by="cuda_time_total", row_limit=10))
# return 0
# Load dataset
train_loader, val_loader, test_loader, scaler, *extra_data = get_dataloader(

View File

@ -2,6 +2,7 @@ import math
import os
import time
import copy
import psutil
from tqdm import tqdm
import torch
@ -9,6 +10,73 @@ from lib.logger import get_logger
from lib.loss_function import all_metrics
class TrainingStats:
def __init__(self, device):
self.device = device
self.reset()
def reset(self):
self.gpu_mem_usage_list = []
self.cpu_mem_usage_list = []
self.train_time_list = []
self.infer_time_list = []
self.total_iters = 0
self.start_time = None
self.end_time = None
def start_training(self):
self.start_time = time.time()
def end_training(self):
self.end_time = time.time()
def record_step_time(self, duration, mode):
"""记录单步耗时和总迭代次数"""
if mode == 'train':
self.train_time_list.append(duration)
else:
self.infer_time_list.append(duration)
self.total_iters += 1
def record_memory_usage(self):
"""记录当前 GPU 和 CPU 内存占用"""
process = psutil.Process(os.getpid())
cpu_mem = process.memory_info().rss / (1024 ** 2)
if torch.cuda.is_available():
gpu_mem = torch.cuda.max_memory_allocated(device=self.device) / (1024 ** 2)
torch.cuda.reset_peak_memory_stats(device=self.device)
else:
gpu_mem = 0.0
self.cpu_mem_usage_list.append(cpu_mem)
self.gpu_mem_usage_list.append(gpu_mem)
def report(self, logger):
"""在训练结束时输出汇总统计"""
if not self.start_time or not self.end_time:
logger.warning("TrainingStats: start/end time not recorded properly.")
return
total_time = self.end_time - self.start_time
avg_gpu_mem = sum(self.gpu_mem_usage_list) / len(self.gpu_mem_usage_list) if self.gpu_mem_usage_list else 0
avg_cpu_mem = sum(self.cpu_mem_usage_list) / len(self.cpu_mem_usage_list) if self.cpu_mem_usage_list else 0
avg_train_time = sum(self.train_time_list) / len(self.train_time_list) if self.train_time_list else 0
avg_infer_time = sum(self.infer_time_list) / len(self.infer_time_list) if self.infer_time_list else 0
iters_per_sec = self.total_iters / total_time if total_time > 0 else 0
logger.info("===== Training Summary =====")
logger.info(f"Total training time: {total_time:.2f} s")
logger.info(f"Total iterations: {self.total_iters}")
logger.info(f"Average iterations per second: {iters_per_sec:.2f}")
logger.info(f"Average GPU Memory Usage: {avg_gpu_mem:.2f} MB")
logger.info(f"Average CPU Memory Usage: {avg_cpu_mem:.2f} MB")
if avg_train_time:
logger.info(f"Average training step time: {avg_train_time*1000:.2f} ms")
if avg_infer_time:
logger.info(f"Average inference step time: {avg_infer_time*1000:.2f} ms")
class Trainer:
def __init__(self, model, loss, optimizer, train_loader, val_loader, test_loader,
scaler, args, lr_scheduler=None):
@ -35,6 +103,9 @@ class Trainer:
self.logger = get_logger(args['log_dir'], name=self.model.__class__.__name__, debug=args['debug'])
self.logger.info(f"Experiment log path in: {args['log_dir']}")
# Stats tracker
self.stats = TrainingStats(device=args['device'])
def _run_epoch(self, epoch, dataloader, mode):
if mode == 'train':
self.model.train()
@ -49,6 +120,8 @@ class Trainer:
with torch.set_grad_enabled(optimizer_step):
with tqdm(total=len(dataloader), desc=f'{mode.capitalize()} Epoch {epoch}') as pbar:
for batch_idx, (data, target) in enumerate(dataloader):
start_time = time.time()
label = target[..., :self.args['output_dim']]
output = self.model(data).to(self.args['device'])
@ -64,19 +137,25 @@ class Trainer:
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.args['max_grad_norm'])
self.optimizer.step()
step_time = time.time() - start_time
self.stats.record_step_time(step_time, mode)
total_loss += loss.item()
if mode == 'train' and (batch_idx + 1) % self.args['log_step'] == 0:
self.logger.info(
f'Train Epoch {epoch}: {batch_idx + 1}/{len(dataloader)} Loss: {loss.item():.6f}')
# 更新 tqdm 的进度
pbar.update(1)
pbar.set_postfix(loss=loss.item())
avg_loss = total_loss / len(dataloader)
self.logger.info(
f'{mode.capitalize()} Epoch {epoch}: average Loss: {avg_loss:.6f}, time: {time.time() - epoch_time:.2f} s')
# 记录内存
self.stats.record_memory_usage()
return avg_loss
def train_epoch(self, epoch):
@ -93,7 +172,9 @@ class Trainer:
best_loss, best_test_loss = float('inf'), float('inf')
not_improved_count = 0
self.stats.start_training()
self.logger.info("Training process started")
for epoch in range(1, self.args['epochs'] + 1):
train_epoch_loss = self.train_epoch(epoch)
val_epoch_loss = self.val_epoch(epoch)
@ -125,6 +206,9 @@ class Trainer:
torch.save(best_test_model, self.best_test_path)
self.logger.info(f"Best models saved at {self.best_path} and {self.best_test_path}")
self.stats.end_training()
self.stats.report(self.logger)
self._finalize_training(best_model, best_test_model)
def _finalize_training(self, best_model, best_test_model):
@ -159,10 +243,6 @@ class Trainer:
y_pred = torch.cat(y_pred, dim=0)
y_true = torch.cat(y_true, dim=0)
# 你在这里需要把y_pred和y_true保存下来
# torch.save(y_pred, "./test/PEMS07/y_pred_D.pt") # [3566,12,170,1]
# torch.save(y_true, "./test/PEMS08/y_true.pt") # [3566,12,170,1]
for t in range(y_true.shape[1]):
mae, rmse, mape = all_metrics(y_pred[:, t, ...], y_true[:, t, ...],
args['mae_thresh'], args['mape_thresh'])

176
trainer/Trainer_old.py Executable file
View File

@ -0,0 +1,176 @@
import math
import os
import time
import copy
from tqdm import tqdm
import torch
from lib.logger import get_logger
from lib.loss_function import all_metrics
class Trainer:
def __init__(self, model, loss, optimizer, train_loader, val_loader, test_loader,
scaler, args, lr_scheduler=None):
self.model = model
self.loss = loss
self.optimizer = optimizer
self.train_loader = train_loader
self.val_loader = val_loader
self.test_loader = test_loader
self.scaler = scaler
self.args = args
self.lr_scheduler = lr_scheduler
self.train_per_epoch = len(train_loader)
self.val_per_epoch = len(val_loader) if val_loader else 0
# Paths for saving models and logs
self.best_path = os.path.join(args['log_dir'], 'best_model.pth')
self.best_test_path = os.path.join(args['log_dir'], 'best_test_model.pth')
self.loss_figure_path = os.path.join(args['log_dir'], 'loss.png')
# Initialize logger
if not os.path.isdir(args['log_dir']) and not args['debug']:
os.makedirs(args['log_dir'], exist_ok=True)
self.logger = get_logger(args['log_dir'], name=self.model.__class__.__name__, debug=args['debug'])
self.logger.info(f"Experiment log path in: {args['log_dir']}")
def _run_epoch(self, epoch, dataloader, mode):
if mode == 'train':
self.model.train()
optimizer_step = True
else:
self.model.eval()
optimizer_step = False
total_loss = 0
epoch_time = time.time()
with torch.set_grad_enabled(optimizer_step):
with tqdm(total=len(dataloader), desc=f'{mode.capitalize()} Epoch {epoch}') as pbar:
for batch_idx, (data, target) in enumerate(dataloader):
label = target[..., :self.args['output_dim']]
output = self.model(data).to(self.args['device'])
if self.args['real_value']:
output = self.scaler.inverse_transform(output)
loss = self.loss(output, label)
if optimizer_step and self.optimizer is not None:
self.optimizer.zero_grad()
loss.backward()
if self.args['grad_norm']:
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.args['max_grad_norm'])
self.optimizer.step()
total_loss += loss.item()
if mode == 'train' and (batch_idx + 1) % self.args['log_step'] == 0:
self.logger.info(
f'Train Epoch {epoch}: {batch_idx + 1}/{len(dataloader)} Loss: {loss.item():.6f}')
# 更新 tqdm 的进度
pbar.update(1)
pbar.set_postfix(loss=loss.item())
avg_loss = total_loss / len(dataloader)
self.logger.info(
f'{mode.capitalize()} Epoch {epoch}: average Loss: {avg_loss:.6f}, time: {time.time() - epoch_time:.2f} s')
return avg_loss
def train_epoch(self, epoch):
return self._run_epoch(epoch, self.train_loader, 'train')
def val_epoch(self, epoch):
return self._run_epoch(epoch, self.val_loader or self.test_loader, 'val')
def test_epoch(self, epoch):
return self._run_epoch(epoch, self.test_loader, 'test')
def train(self):
best_model, best_test_model = None, None
best_loss, best_test_loss = float('inf'), float('inf')
not_improved_count = 0
self.logger.info("Training process started")
for epoch in range(1, self.args['epochs'] + 1):
train_epoch_loss = self.train_epoch(epoch)
val_epoch_loss = self.val_epoch(epoch)
test_epoch_loss = self.test_epoch(epoch)
if train_epoch_loss > 1e6:
self.logger.warning('Gradient explosion detected. Ending...')
break
if val_epoch_loss < best_loss:
best_loss = val_epoch_loss
not_improved_count = 0
best_model = copy.deepcopy(self.model.state_dict())
self.logger.info('Best validation model saved!')
else:
not_improved_count += 1
if self.args['early_stop'] and not_improved_count == self.args['early_stop_patience']:
self.logger.info(
f"Validation performance didn't improve for {self.args['early_stop_patience']} epochs. Training stops.")
break
if test_epoch_loss < best_test_loss:
best_test_loss = test_epoch_loss
best_test_model = copy.deepcopy(self.model.state_dict())
if not self.args['debug']:
torch.save(best_model, self.best_path)
torch.save(best_test_model, self.best_test_path)
self.logger.info(f"Best models saved at {self.best_path} and {self.best_test_path}")
self._finalize_training(best_model, best_test_model)
def _finalize_training(self, best_model, best_test_model):
self.model.load_state_dict(best_model)
self.logger.info("Testing on best validation model")
self.test(self.model, self.args, self.test_loader, self.scaler, self.logger)
self.model.load_state_dict(best_test_model)
self.logger.info("Testing on best test model")
self.test(self.model, self.args, self.test_loader, self.scaler, self.logger)
@staticmethod
def test(model, args, data_loader, scaler, logger, path=None):
if path:
checkpoint = torch.load(path)
model.load_state_dict(checkpoint['state_dict'])
model.to(args['device'])
model.eval()
y_pred, y_true = [], []
with torch.no_grad():
for data, target in data_loader:
label = target[..., :args['output_dim']]
output = model(data)
y_pred.append(output)
y_true.append(label)
if args['real_value']:
y_pred = scaler.inverse_transform(torch.cat(y_pred, dim=0))
else:
y_pred = torch.cat(y_pred, dim=0)
y_true = torch.cat(y_true, dim=0)
# 你在这里需要把y_pred和y_true保存下来
# torch.save(y_pred, "./test/PEMS07/y_pred_D.pt") # [3566,12,170,1]
# torch.save(y_true, "./test/PEMS08/y_true.pt") # [3566,12,170,1]
for t in range(y_true.shape[1]):
mae, rmse, mape = all_metrics(y_pred[:, t, ...], y_true[:, t, ...],
args['mae_thresh'], args['mape_thresh'])
logger.info(f"Horizon {t + 1:02d}, MAE: {mae:.4f}, RMSE: {rmse:.4f}, MAPE: {mape:.4f}")
mae, rmse, mape = all_metrics(y_pred, y_true, args['mae_thresh'], args['mape_thresh'])
logger.info(f"Average Horizon, MAE: {mae:.4f}, RMSE: {rmse:.4f}, MAPE: {mape:.4f}")
@staticmethod
def _compute_sampling_threshold(global_step, k):
return k / (k + math.exp(global_step / k))