From 4f7fb527074dd6f3bfa9614bb72efbcd7ce9911e Mon Sep 17 00:00:00 2001
From: czzhangheng
Date: Thu, 8 May 2025 22:43:33 +0800
Subject: [PATCH] Update the EXP model: add periodic data processing and time
 features, optimize the dataloader and trainer, and support the new EXP32
 model structure
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 config/EXP/PEMSD4.yaml        |   3 +
 dataloader/EXPdataloader.py   |  99 ++++++++++----------
 dataloader/loader_selector.py |   2 +
 model/EXP/EXP32.py            | 168 ++++++++++++++++++++++++++++++++++
 model/model_selector.py       |   2 +-
 trainer/E32Trainer.py         |   8 +-
 trainer/trainer_selector.py   |   2 +-
 7 files changed, 230 insertions(+), 54 deletions(-)

diff --git a/config/EXP/PEMSD4.yaml b/config/EXP/PEMSD4.yaml
index 163d2ca..8c15601 100755
--- a/config/EXP/PEMSD4.yaml
+++ b/config/EXP/PEMSD4.yaml
@@ -12,12 +12,14 @@ data:
   add_day_in_week: True
   steps_per_day: 288
   days_per_week: 7
+  cycle: 288

 model:
   batch_size: 64
   input_dim: 1
   output_dim: 1
   in_len: 12
+  cycle_len: 288


 train:
@@ -36,6 +38,7 @@ train:
   max_grad_norm: 5
   real_value: True

+
 test:
   mae_thresh: null
   mape_thresh: 0.0

diff --git a/dataloader/EXPdataloader.py b/dataloader/EXPdataloader.py
index ad6d8e5..5c5e97c 100755
--- a/dataloader/EXPdataloader.py
+++ b/dataloader/EXPdataloader.py
@@ -1,94 +1,95 @@
-from lib.normalization import normalize_dataset
-
 import numpy as np
 import gc
 import os
 import torch
 import h5py
+from lib.normalization import normalize_dataset


 def get_dataloader(args, normalizer='std', single=True):
-    data = load_st_dataset(args['type'], args['sample'])  # load the data
-    L, N, F = data.shape  # data shape
+    # args is now expected to include 'cycle'
+    data = load_st_dataset(args['type'], args['sample'])  # [T, N, F]
+    L, N, F = data.shape

-    # Step 1: data -> x,y
+    # cycle phase of every time step
+    cycle_arr = np.arange(L) % args['cycle']  # length-L array
+
+    # Step 1: sliding windows for X and Y
     x = add_window_x(data, args['lag'], args['horizon'], single)
     y = add_window_y(data, args['lag'], args['horizon'], single)
+    # window count M = L - lag - horizon + 1
+    M = x.shape[0]

-    del data
-    gc.collect()
-
-    # Step 2: time_in_day, day_in_week -> day, week
-    time_in_day = [i % args['steps_per_day'] / args['steps_per_day'] for i in range(L)]
-    time_in_day = np.tile(np.array(time_in_day), [1, N, 1]).transpose((2, 1, 0))
-    day_in_week = [(i // args['steps_per_day']) % args['days_per_week'] for i in range(L)]
-    day_in_week = np.tile(np.array(day_in_week), [1, N, 1]).transpose((2, 1, 0))
+    # Step 2: time features
+    time_in_day = np.tile(
+        np.array([i % args['steps_per_day'] / args['steps_per_day'] for i in range(L)]),
+        (N, 1)
+    ).T.reshape(L, N, 1)
+    day_in_week = np.tile(
+        np.array([(i // args['steps_per_day']) % args['days_per_week'] for i in range(L)]),
+        (N, 1)
+    ).T.reshape(L, N, 1)

     x_day = add_window_x(time_in_day, args['lag'], args['horizon'], single)
     x_week = add_window_x(day_in_week, args['lag'], args['horizon'], single)
-
-    # Step 3 day, week, x, y --> x, y
     x = np.concatenate([x, x_day, x_week], axis=-1)
-    del x_day, x_week
-    gc.collect()
+    # del x_day, x_week
+    # gc.collect()

+    # Step 3: cycle index per window, taken at the first forecast step
+    cycle_win = np.array([cycle_arr[i + args['lag']] for i in range(M)])  # shape [M]
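+    # NOTE (illustrative, not part of the original logic): with lag=12,
+    # horizon=12 and cycle=288 there are M = L - 23 windows; window i spans
+    # input steps i..i+11, so cycle_win[i] = (i + 12) % 288 is the phase of
+    # its first forecast step. EXP32 offsets this index by seq_len again for
+    # the horizon, so both input and output lookups carry the same constant
+    # phase rotation, which the learnable RecurrentCycle buffer absorbs.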

-    # Step 4 x,y --> x_train, x_val, x_test, y_train, y_val, y_test
+    # Step 4: split into train/val/test
     if args['test_ratio'] > 1:
         x_train, x_val, x_test = split_data_by_days(x, args['val_ratio'], args['test_ratio'])
+        y_train, y_val, y_test = split_data_by_days(y, args['val_ratio'], args['test_ratio'])
+        c_train, c_val, c_test = split_data_by_days(cycle_win, args['val_ratio'], args['test_ratio'])
     else:
         x_train, x_val, x_test = split_data_by_ratio(x, args['val_ratio'], args['test_ratio'])
+        y_train, y_val, y_test = split_data_by_ratio(y, args['val_ratio'], args['test_ratio'])
+        c_train, c_val, c_test = split_data_by_ratio(cycle_win, args['val_ratio'], args['test_ratio'])
+    # del x, y, cycle_win
+    # gc.collect()

-    del x
-    gc.collect()
-
-    # Normalization
+    # Step 5: normalize X only
     scaler = normalize_dataset(x_train[..., :args['input_dim']], normalizer, args['column_wise'])
     x_train[..., :args['input_dim']] = scaler.transform(x_train[..., :args['input_dim']])
     x_val[..., :args['input_dim']] = scaler.transform(x_val[..., :args['input_dim']])
     x_test[..., :args['input_dim']] = scaler.transform(x_test[..., :args['input_dim']])
-
+    # attach time features to Y
     y_day = add_window_y(time_in_day, args['lag'], args['horizon'], single)
     y_week = add_window_y(day_in_week, args['lag'], args['horizon'], single)
-
-    del time_in_day, day_in_week
-    gc.collect()
-
     y = np.concatenate([y, y_day, y_week], axis=-1)
+    # del y_day, y_week, time_in_day, day_in_week
+    # gc.collect()

-    del y_day, y_week
-    gc.collect()
-
-    # Split Y
+    # split the time-augmented Y (overwrites the plain Y split from Step 4)
     if args['test_ratio'] > 1:
         y_train, y_val, y_test = split_data_by_days(y, args['val_ratio'], args['test_ratio'])
     else:
         y_train, y_val, y_test = split_data_by_ratio(y, args['val_ratio'], args['test_ratio'])
+    # del y

-    del y
-    gc.collect()
-
-    # Step 5: x_train y_train x_val y_val x_test y_test --> train val test
-    # train_dataloader = data_loader(x_train[..., :args['input_dim']], y_train[..., :args['input_dim']], args['batch_size'], shuffle=True, drop_last=True)
-    train_dataloader = data_loader(x_train, y_train, args['batch_size'], shuffle=True, drop_last=True)
-
-    del x_train, y_train
-    gc.collect()
-
-    # val_dataloader = data_loader(x_val[..., :args['input_dim']], y_val[..., :args['input_dim']], args['batch_size'], shuffle=False, drop_last=True)
-    val_dataloader = data_loader(x_val, y_val, args['batch_size'], shuffle=False, drop_last=True)
-
-    del x_val, y_val
-    gc.collect()
-
-    # test_dataloader = data_loader(x_test[..., :args['input_dim']], y_test[..., :args['input_dim']], args['batch_size'], shuffle=False, drop_last=False)
-    test_dataloader = data_loader(x_test, y_test, args['batch_size'], shuffle=False, drop_last=False)
-
-    del x_test, y_test
-    gc.collect()
-
-    return train_dataloader, val_dataloader, test_dataloader, scaler
+    # Step 6: create dataloaders that also yield the cycle index
+    train_loader = data_loader_with_cycle(x_train, y_train, c_train, args['batch_size'], shuffle=True, drop_last=True)
+    val_loader = data_loader_with_cycle(x_val, y_val, c_val, args['batch_size'], shuffle=False, drop_last=True)
+    test_loader = data_loader_with_cycle(x_test, y_test, c_test, args['batch_size'], shuffle=False, drop_last=False)
+
+    return train_loader, val_loader, test_loader, scaler
+
+
+def data_loader_with_cycle(X, Y, C, batch_size, shuffle=True, drop_last=True):
+    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
+    X_t = torch.tensor(X, dtype=torch.float32, device=device)
+    Y_t = torch.tensor(Y, dtype=torch.float32, device=device)
+    C_t = torch.tensor(C, dtype=torch.long, device=device).unsqueeze(-1)  # (M, 1), batched to (B, 1)
+    dataset = torch.utils.data.TensorDataset(X_t, Y_t, C_t)
+    loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
+    return loader
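+
+
+# Usage sketch (illustrative only; the real consumption happens in
+# trainer/E32Trainer.py): each batch now yields a triple, e.g.
+#     for x, y, cycle_idx in train_loader:   # x, y: float32; cycle_idx: (B, 1) int64
+#         pred = model(x, cycle_idx)
+#         loss = criterion(pred, y[..., :output_dim])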
+
+
+# Rest of the helper functions (load_st_dataset, split_data..., add_window_x/y) unchanged

 def load_st_dataset(dataset, sample):
     # output B, N, D

diff --git a/dataloader/loader_selector.py b/dataloader/loader_selector.py
index 030fcee..eea8d60 100755
--- a/dataloader/loader_selector.py
+++ b/dataloader/loader_selector.py
@@ -1,11 +1,13 @@
 from dataloader.cde_loader.cdeDataloader import get_dataloader as cde_loader
 from dataloader.PeMSDdataloader import get_dataloader as normal_loader
 from dataloader.DCRNNdataloader import get_dataloader as DCRNN_loader
+from dataloader.EXPdataloader import get_dataloader as EXP_loader


 def get_dataloader(config, normalizer, single):
     match config['model']['type']:
         case 'STGNCDE': return cde_loader(config['data'], normalizer, single)
         case 'DCRNN': return DCRNN_loader(config['data'], normalizer, single)
+        case 'EXP': return EXP_loader(config['data'], normalizer, single)
         case _: return normal_loader(config['data'], normalizer, single)

diff --git a/model/EXP/EXP32.py b/model/EXP/EXP32.py
index e69de29..d9ef5b0 100644
--- a/model/EXP/EXP32.py
+++ b/model/EXP/EXP32.py
@@ -0,0 +1,168 @@
+import math
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+
+
+# ------------------------- CycleNet Component -------------------------
+class RecurrentCycle(nn.Module):
+    """Efficient cyclic data removal/addition."""
+    def __init__(self, cycle_len, channel_size):
+        super().__init__()
+        self.cycle_len = cycle_len
+        self.channel_size = channel_size
+        # learnable cycle buffer: shape (cycle_len, channel_size)
+        self.data = nn.Parameter(torch.zeros(cycle_len, channel_size))
+
+    def forward(self, index, length):
+        # index: (B,) or (B, 1); length: seq_len or pred_len
+        # offsets [0, 1, ..., length - 1], shape (1, length)
+        arange = torch.arange(length, device=index.device).unsqueeze(0)
+        # add the offsets to each sample's start index, modulo cycle_len
+        idx = (index.unsqueeze(1) + arange) % self.cycle_len  # (B, length) or (B, 1, length)
+        # gather the corresponding cyclic values: (..., length, channel_size)
+        return self.data[idx]
+
+
+# ------------------------- Core Blocks -------------------------
+class DynamicGraphConstructor(nn.Module):
+    def __init__(self, node_num, embed_dim):
+        super().__init__()
+        self.nodevec1 = nn.Parameter(torch.randn(node_num, embed_dim))
+        self.nodevec2 = nn.Parameter(torch.randn(node_num, embed_dim))
+
+    def forward(self):
+        # learned adjacency (N, N), row-normalized via softmax
+        adj = F.relu(torch.matmul(self.nodevec1, self.nodevec2.T))
+        return F.softmax(adj, dim=-1)
+
+
+class GraphConvBlock(nn.Module):
+    def __init__(self, input_dim, output_dim):
+        super().__init__()
+        self.theta = nn.Linear(input_dim, output_dim)
+        self.residual = (input_dim == output_dim)
+        if not self.residual:
+            self.res_proj = nn.Linear(input_dim, output_dim)
+
+    def forward(self, x, adj):
+        res = x
+        x = torch.matmul(adj, x)
+        x = self.theta(x)
+        if not self.residual:
+            res = self.res_proj(res)
+        return F.relu(x + res)
+
+
+class MANBA_Block(nn.Module):
+    def __init__(self, input_dim, hidden_dim):
+        super().__init__()
+        self.attn = nn.MultiheadAttention(embed_dim=input_dim, num_heads=4, batch_first=True)
+        self.ffn = nn.Sequential(
+            nn.Linear(input_dim, hidden_dim),
+            nn.ReLU(),
+            nn.Linear(hidden_dim, input_dim)
+        )
+        self.norm1 = nn.LayerNorm(input_dim)
+        self.norm2 = nn.LayerNorm(input_dim)
+
+    def forward(self, x):
+        res = x
+        x_attn, _ = self.attn(x, x, x)
+        x = self.norm1(res + x_attn)
+        res2 = x
+        x_ffn = self.ffn(x)
+        return self.norm2(res2 + x_ffn)
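+
+
+# NOTE (illustrative): EXP below feeds MANBA_Block tensors of shape
+# (B, N, hidden_dim), so with batch_first=True the self-attention mixes
+# information across the N nodes rather than across time steps; temporal
+# information is folded in beforehand by input_proj and the time/day embeddings.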
+
+
+class SandwichBlock(nn.Module):
+    def __init__(self, num_nodes, embed_dim, hidden_dim):
+        super().__init__()
+        self.manba1 = MANBA_Block(hidden_dim, hidden_dim * 2)
+        self.graph_constructor = DynamicGraphConstructor(num_nodes, embed_dim)
+        self.gc = GraphConvBlock(hidden_dim, hidden_dim)
+        self.manba2 = MANBA_Block(hidden_dim, hidden_dim * 2)
+
+    def forward(self, h):
+        h1 = self.manba1(h)
+        adj = self.graph_constructor()
+        h2 = self.gc(h1, adj)
+        return self.manba2(h2)
+
+
+class MLP(nn.Module):
+    def __init__(self, in_dim, hidden_dims, out_dim, activation=nn.ReLU):
+        super().__init__()
+        dims = [in_dim] + hidden_dims + [out_dim]
+        layers = []
+        for i in range(len(dims) - 2):
+            layers += [nn.Linear(dims[i], dims[i + 1]), activation()]
+        layers.append(nn.Linear(dims[-2], dims[-1]))
+        self.net = nn.Sequential(*layers)
+
+    def forward(self, x):
+        return self.net(x)
+
+
+# ------------------------- EXP with CycleNet -------------------------
+class EXP(nn.Module):
+    def __init__(self, args):
+        super().__init__()
+        self.horizon = args['horizon']            # prediction horizon
+        self.output_dim = args['output_dim']      # output dimension (usually 1)
+        self.seq_len = args.get('in_len', 12)     # input sequence length
+        self.hidden_dim = args.get('hidden_dim', 64)
+        self.num_nodes = args['num_nodes']
+        self.embed_dim = args.get('embed_dim', 16)
+
+        # time embeddings
+        self.time_slots = args.get('time_slots', 288)
+        self.time_embedding = nn.Embedding(self.time_slots, self.hidden_dim)
+        self.day_embedding = nn.Embedding(7, self.hidden_dim)
+
+        # CycleNet
+        self.cycleQueue = RecurrentCycle(cycle_len=args['cycle_len'], channel_size=self.num_nodes)
+
+        # input projection (sequence length -> hidden dim)
+        self.input_proj = MLP(self.seq_len, [self.hidden_dim], self.hidden_dim)
+
+        # two Sandwich blocks
+        self.sandwich1 = SandwichBlock(self.num_nodes, self.embed_dim, self.hidden_dim)
+        self.sandwich2 = SandwichBlock(self.num_nodes, self.embed_dim, self.hidden_dim)
+
+        # output projection
+        self.out_proj = MLP(self.hidden_dim, [2 * self.hidden_dim], self.horizon * self.output_dim)
+
+    def forward(self, x, cycle_index):
+        # x: (B, T, N, D >= 3)
+        # 1) split flow and time features; indexing with [..., k] drops the
+        #    channel dim (slicing with [..., 0:1] would keep it)
+        x_flow = x[..., 0]  # (B, T, N)
+        x_time = x[..., 1]  # (B, T, N)
+        x_day = x[..., 2]   # (B, T, N)
+
+        B, T, N = x_flow.shape
+
+        # 2) remove the cyclic component
+        cyc = self.cycleQueue(cycle_index, T).squeeze(1)  # (B, T, N)
+        x_flow = x_flow - cyc
+
+        # 3) project each node's sequence
+        h0 = x_flow.permute(0, 2, 1).reshape(B * N, T)  # (B*N, T)
+        h0 = self.input_proj(h0).view(B, N, self.hidden_dim)
+
+        # 4) add time embeddings of the last input step
+        #    (rounding recovers the integer slot from time_in_day = slot / time_slots)
+        t_idx = (x_time[:, -1] * self.time_slots).round().long() % self.time_slots  # (B, N)
+        d_idx = x_day[:, -1].long()  # (B, N)
+        h0 = h0 + self.time_embedding(t_idx) + self.day_embedding(d_idx)
+
+        # 5) Sandwich blocks
+        h1 = self.sandwich1(h0) + h0
+        h2 = self.sandwich2(h1)
+
+        # 6) output projection and reshape
+        out = self.out_proj(h2)                              # (B, N, H*O)
+        out = out.view(B, N, self.horizon, self.output_dim)  # (B, N, H, O)
+        out = out.permute(0, 2, 1, 3)                        # (B, H, N, O)
+
+        # add the cyclic component back, shifted to the forecast window
+        idx_out = (cycle_index + self.seq_len) % self.cycleQueue.cycle_len
+        cyc_out = self.cycleQueue(idx_out, self.horizon)  # (B, 1, H, N)
+        cyc_out = cyc_out.squeeze(1).unsqueeze(-1)        # (B, H, N, 1)
+        return out + cyc_out
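+
+
+# NOTE (illustrative shape walk-through, assuming B=64, T=in_len=12,
+# hidden_dim=64, horizon=12, output_dim=1, N=num_nodes):
+#   x        (64, 12, N, 3)  -> x_flow, x_time, x_day each (64, 12, N)
+#   cyc      (64, 12, N)     de-cycled input: x_flow - cyc
+#   h0       (64, N, 64)     after input_proj plus time/day embeddings
+#   out      (64, 12, N, 1)  after out_proj, view and permute
+#   cyc_out  (64, 12, N, 1)  cycle values shifted by seq_len, re-added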

diff --git a/model/model_selector.py b/model/model_selector.py
index ffe79b3..05e7ee3 100755
--- a/model/model_selector.py
+++ b/model/model_selector.py
@@ -15,7 +15,7 @@ from model.STGODE.STGODE import ODEGCN
 from model.PDG2SEQ.PDG2Seqb import PDG2Seq
 from model.STID.STID import STID
 from model.STAEFormer.STAEFormer import STAEformer
-from model.EXP.EXP31 import EXP as EXP
+from model.EXP.EXP32 import EXP as EXP

 def model_selector(model):
     match model['type']:

diff --git a/trainer/E32Trainer.py b/trainer/E32Trainer.py
index 8aa2bfe..1a8d062 100644
--- a/trainer/E32Trainer.py
+++ b/trainer/E32Trainer.py
@@ -39,6 +39,7 @@ class Trainer:
         is_train = (mode == 'train')
         self.model.train() if is_train else self.model.eval()
         total_loss = 0.0
+        epoch_time = time.time()

         with torch.set_grad_enabled(is_train), \
              tqdm(total=len(dataloader), desc=f'{mode.capitalize()} Epoch {epoch}') as pbar:
@@ -83,7 +84,8 @@ class Trainer:
                 pbar.set_postfix(loss=loss.item())

         avg_loss = total_loss / len(dataloader)
-        self.logger.info(f'{mode.capitalize()} Epoch {epoch}: avg Loss: {avg_loss:.6f}')
+        self.logger.info(
+            f'{mode.capitalize()} Epoch {epoch}: average Loss: {avg_loss:.6f}, time: {time.time() - epoch_time:.2f} s')
         return avg_loss

     def train_epoch(self, epoch):
@@ -154,9 +156,9 @@
         y_pred, y_true = [], []

         with torch.no_grad():
-            for data, target in data_loader:
+            for data, target, cycle_index in data_loader:
                 label = target[..., :args['output_dim']]
-                output = model(data)
+                output = model(data, cycle_index)
                 y_pred.append(output)
                 y_true.append(label)

diff --git a/trainer/trainer_selector.py b/trainer/trainer_selector.py
index a0f8e87..f7b48b6 100755
--- a/trainer/trainer_selector.py
+++ b/trainer/trainer_selector.py
@@ -2,7 +2,7 @@ from trainer.Trainer import Trainer
 from trainer.cdeTrainer.cdetrainer import Trainer as cdeTrainer
 from trainer.DCRNN_Trainer import Trainer as DCRNN_Trainer
 from trainer.PDG2SEQ_Trainer import Trainer as PDG2SEQ_Trainer
-from trainer.EXP_trainer import Trainer as EXP_Trainer
+from trainer.E32Trainer import Trainer as EXP_Trainer

 def select_trainer(model, loss, optimizer, train_loader, val_loader, test_loader, scaler, args,