Update the EXP model: add cyclic data handling and time features, refine the data loader and trainer, and support the new EXP32 model structure

This commit is contained in:
czzhangheng 2025-05-08 22:43:33 +08:00
parent 1be0b59344
commit 4f7fb52707
7 changed files with 230 additions and 54 deletions

View File

@@ -12,12 +12,14 @@ data:
   add_day_in_week: True
   steps_per_day: 288
   days_per_week: 7
+  cycle: 288
 model:
   batch_size: 64
   input_dim: 1
   output_dim: 1
   in_len: 12
+  cycle_len: 288
 train:
@@ -36,6 +38,7 @@ train:
   max_grad_norm: 5
   real_value: True
 test:
   mae_thresh: null
   mape_thresh: 0.0
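The new cycle key under data and cycle_len under model describe the same period (288 five-minute steps, i.e. one day): the dataloader uses cycle to stamp each sample with its position inside the period, and the model sizes its learnable cycle buffer with cycle_len, so the two values must match. A minimal sketch of the indexing, assuming a raw series of length L:

import numpy as np

L, cycle = 1000, 288               # series length; steps per day (data.cycle)
cycle_arr = np.arange(L) % cycle   # position of each timestep within the day

# model.cycle_len must equal data.cycle so that every stamped index
# is a valid row of the model's (cycle_len, num_nodes) cycle buffer
assert cycle_arr.max() < 288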

View File

@@ -1,94 +1,95 @@
+from lib.normalization import normalize_dataset
 import numpy as np
 import gc
 import os
 import torch
 import h5py
-from lib.normalization import normalize_dataset


 def get_dataloader(args, normalizer='std', single=True):
-    data = load_st_dataset(args['type'], args['sample'])  # load the data
-    L, N, F = data.shape  # data shape
-
-    # Step 1: data -> x, y
+    # args should now include 'cycle'
+    data = load_st_dataset(args['type'], args['sample'])  # [T, N, F]
+    L, N, F = data.shape
+
+    # compute the cycle index
+    cycle_arr = np.arange(L) % args['cycle']  # length-L array
+
+    # Step 1: sliding windows for X and Y
     x = add_window_x(data, args['lag'], args['horizon'], single)
     y = add_window_y(data, args['lag'], args['horizon'], single)
+    # window count M = L - lag - horizon + 1
+    M = x.shape[0]

-    del data
-    gc.collect()
-
-    # Step 2: time_in_day, day_in_week -> day, week
-    time_in_day = [i % args['steps_per_day'] / args['steps_per_day'] for i in range(L)]
-    time_in_day = np.tile(np.array(time_in_day), [1, N, 1]).transpose((2, 1, 0))
-    day_in_week = [(i // args['steps_per_day']) % args['days_per_week'] for i in range(L)]
-    day_in_week = np.tile(np.array(day_in_week), [1, N, 1]).transpose((2, 1, 0))
+    # Step 2: time features
+    time_in_day = np.tile(
+        np.array([i % args['steps_per_day'] / args['steps_per_day'] for i in range(L)]),
+        (N, 1)
+    ).T.reshape(L, N, 1)
+    day_in_week = np.tile(
+        np.array([(i // args['steps_per_day']) % args['days_per_week'] for i in range(L)]),
+        (N, 1)
+    ).T.reshape(L, N, 1)

     x_day = add_window_x(time_in_day, args['lag'], args['horizon'], single)
     x_week = add_window_x(day_in_week, args['lag'], args['horizon'], single)
-    # Step 3: day, week, x, y --> x, y
     x = np.concatenate([x, x_day, x_week], axis=-1)
-    del x_day, x_week
-    gc.collect()
+    # del x_day, x_week
+    # gc.collect()

-    # Step 4: x, y --> x_train, x_val, x_test, y_train, y_val, y_test
+    # Step 3: extract the cycle index per window: take the value at the end of the input sequence
+    cycle_win = np.array([cycle_arr[i + args['lag']] for i in range(M)])  # shape [M]
+
+    # Step 4: split into train/val/test
     if args['test_ratio'] > 1:
         x_train, x_val, x_test = split_data_by_days(x, args['val_ratio'], args['test_ratio'])
+        y_train, y_val, y_test = split_data_by_days(y, args['val_ratio'], args['test_ratio'])
+        c_train, c_val, c_test = split_data_by_days(cycle_win, args['val_ratio'], args['test_ratio'])
     else:
         x_train, x_val, x_test = split_data_by_ratio(x, args['val_ratio'], args['test_ratio'])
+        y_train, y_val, y_test = split_data_by_ratio(y, args['val_ratio'], args['test_ratio'])
+        c_train, c_val, c_test = split_data_by_ratio(cycle_win, args['val_ratio'], args['test_ratio'])
+    # del x, y, cycle_win
+    # gc.collect()

-    del x
-    gc.collect()
-
-    # Normalization
+    # Step 5: normalization on X only
     scaler = normalize_dataset(x_train[..., :args['input_dim']], normalizer, args['column_wise'])
     x_train[..., :args['input_dim']] = scaler.transform(x_train[..., :args['input_dim']])
     x_val[..., :args['input_dim']] = scaler.transform(x_val[..., :args['input_dim']])
     x_test[..., :args['input_dim']] = scaler.transform(x_test[..., :args['input_dim']])

+    # add time features to Y
     y_day = add_window_y(time_in_day, args['lag'], args['horizon'], single)
     y_week = add_window_y(day_in_week, args['lag'], args['horizon'], single)
-    del time_in_day, day_in_week
-    gc.collect()
     y = np.concatenate([y, y_day, y_week], axis=-1)
-    del y_day, y_week
-    gc.collect()
+    # del y_day, y_week, time_in_day, day_in_week
+    # gc.collect()

-    # Split Y
+    # split the time-augmented Y
     if args['test_ratio'] > 1:
         y_train, y_val, y_test = split_data_by_days(y, args['val_ratio'], args['test_ratio'])
     else:
         y_train, y_val, y_test = split_data_by_ratio(y, args['val_ratio'], args['test_ratio'])
-    del y
-    gc.collect()
+    # del y

-    # Step 5: x_train y_train x_val y_val x_test y_test --> train val test
-    # train_dataloader = data_loader(x_train[..., :args['input_dim']], y_train[..., :args['input_dim']], args['batch_size'], shuffle=True, drop_last=True)
-    train_dataloader = data_loader(x_train, y_train, args['batch_size'], shuffle=True, drop_last=True)
-    del x_train, y_train
-    gc.collect()
-    # val_dataloader = data_loader(x_val[..., :args['input_dim']], y_val[..., :args['input_dim']], args['batch_size'], shuffle=False, drop_last=True)
-    val_dataloader = data_loader(x_val, y_val, args['batch_size'], shuffle=False, drop_last=True)
-    del x_val, y_val
-    gc.collect()
-    # test_dataloader = data_loader(x_test[..., :args['input_dim']], y_test[..., :args['input_dim']], args['batch_size'], shuffle=False, drop_last=False)
-    test_dataloader = data_loader(x_test, y_test, args['batch_size'], shuffle=False, drop_last=False)
-    del x_test, y_test
-    gc.collect()
-    return train_dataloader, val_dataloader, test_dataloader, scaler
+    # Step 6: create dataloaders that also yield the cycle index
+    train_loader = data_loader_with_cycle(x_train, y_train, c_train, args['batch_size'], shuffle=True, drop_last=True)
+    val_loader = data_loader_with_cycle(x_val, y_val, c_val, args['batch_size'], shuffle=False, drop_last=True)
+    test_loader = data_loader_with_cycle(x_test, y_test, c_test, args['batch_size'], shuffle=False, drop_last=False)
+    return train_loader, val_loader, test_loader, scaler
+
+
+def data_loader_with_cycle(X, Y, C, batch_size, shuffle=True, drop_last=True):
+    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
+    X_t = torch.tensor(X, dtype=torch.float32, device=device)
+    Y_t = torch.tensor(Y, dtype=torch.float32, device=device)
+    C_t = torch.tensor(C, dtype=torch.long, device=device).unsqueeze(-1)  # [B, 1]
+    dataset = torch.utils.data.TensorDataset(X_t, Y_t, C_t)
+    loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)
+    return loader
+
+# The remaining helpers (load_st_dataset, split_data_by_days/ratio, add_window_x/y) are unchanged.


 def load_st_dataset(dataset, sample):
     # output B, N, D
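Each loader returned above now yields (x, y, cycle_index) triples instead of (x, y) pairs. A hedged usage sketch (the args dict mirrors the data section of the config; shapes assume one raw feature plus the two time features):

train_loader, val_loader, test_loader, scaler = get_dataloader(args)
for x, y, c in train_loader:
    # x: [B, lag, N, 3]      flow (normalized) + time_in_day + day_in_week
    # y: [B, horizon, N, 3]  same layout; flow stays in real units (real_value: True)
    # c: [B, 1]              long tensor: the cycle position stamped on each window
    ...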

View File

@@ -1,11 +1,13 @@
 from dataloader.cde_loader.cdeDataloader import get_dataloader as cde_loader
 from dataloader.PeMSDdataloader import get_dataloader as normal_loader
 from dataloader.DCRNNdataloader import get_dataloader as DCRNN_loader
+from dataloader.EXPdataloader import get_dataloader as EXP_loader


 def get_dataloader(config, normalizer, single):
     match config['model']['type']:
         case 'STGNCDE': return cde_loader(config['data'], normalizer, single)
         case 'DCRNN': return DCRNN_loader(config['data'], normalizer, single)
+        case 'EXP': return EXP_loader(config['data'], normalizer, single)
         case _: return normal_loader(config['data'], normalizer, single)

View File

@@ -0,0 +1,168 @@
import math
import torch
import torch.nn as nn
import torch.nn.functional as F


# ------------------------- CycleNet Component -------------------------
class RecurrentCycle(nn.Module):
    """Efficient cyclic data removal/addition."""
    def __init__(self, cycle_len, channel_size):
        super().__init__()
        self.cycle_len = cycle_len
        self.channel_size = channel_size
        # learnable cycle buffer, shape (cycle_len, channel_size)
        self.data = nn.Parameter(torch.zeros(cycle_len, channel_size))

    def forward(self, index, length):
        # index: (B,), length: seq_len or pred_len
        B = index.size(0)
        # offsets [0, 1, ..., length-1], shape (1, length)
        arange = torch.arange(length, device=index.device).unsqueeze(0)
        # add the offsets to each sample's start index, modulo cycle_len
        idx = (index.unsqueeze(1) + arange) % self.cycle_len  # (B, length)
        # look up the corresponding cycle values, (B, length, channel_size)
        return self.data[idx]


# ------------------------- Core Blocks -------------------------
class DynamicGraphConstructor(nn.Module):
    def __init__(self, node_num, embed_dim):
        super().__init__()
        self.nodevec1 = nn.Parameter(torch.randn(node_num, embed_dim))
        self.nodevec2 = nn.Parameter(torch.randn(node_num, embed_dim))

    def forward(self):
        adj = F.relu(torch.matmul(self.nodevec1, self.nodevec2.T))
        return F.softmax(adj, dim=-1)


class GraphConvBlock(nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.theta = nn.Linear(input_dim, output_dim)
        self.residual = (input_dim == output_dim)
        if not self.residual:
            self.res_proj = nn.Linear(input_dim, output_dim)

    def forward(self, x, adj):
        res = x
        x = torch.matmul(adj, x)
        x = self.theta(x)
        if not self.residual:
            res = self.res_proj(res)
        return F.relu(x + res)


class MANBA_Block(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim=input_dim, num_heads=4, batch_first=True)
        self.ffn = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim)
        )
        self.norm1 = nn.LayerNorm(input_dim)
        self.norm2 = nn.LayerNorm(input_dim)

    def forward(self, x):
        res = x
        x_attn, _ = self.attn(x, x, x)
        x = self.norm1(res + x_attn)
        res2 = x
        x_ffn = self.ffn(x)
        return self.norm2(res2 + x_ffn)


class SandwichBlock(nn.Module):
    def __init__(self, num_nodes, embed_dim, hidden_dim):
        super().__init__()
        self.manba1 = MANBA_Block(hidden_dim, hidden_dim * 2)
        self.graph_constructor = DynamicGraphConstructor(num_nodes, embed_dim)
        self.gc = GraphConvBlock(hidden_dim, hidden_dim)
        self.manba2 = MANBA_Block(hidden_dim, hidden_dim * 2)

    def forward(self, h):
        h1 = self.manba1(h)
        adj = self.graph_constructor()
        h2 = self.gc(h1, adj)
        return self.manba2(h2)


class MLP(nn.Module):
    def __init__(self, in_dim, hidden_dims, out_dim, activation=nn.ReLU):
        super().__init__()
        dims = [in_dim] + hidden_dims + [out_dim]
        layers = []
        for i in range(len(dims) - 2):
            layers += [nn.Linear(dims[i], dims[i + 1]), activation()]
        layers.append(nn.Linear(dims[-2], dims[-1]))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)


# ------------------------- EXP with CycleNet -------------------------
class EXP(nn.Module):
    def __init__(self, args):
        super().__init__()
        self.horizon = args['horizon']              # prediction horizon
        self.output_dim = args['output_dim']        # output dimension (usually 1)
        self.seq_len = args.get('in_len', 12)       # input sequence length
        self.hidden_dim = args.get('hidden_dim', 64)
        self.num_nodes = args['num_nodes']
        self.embed_dim = args.get('embed_dim', 16)
        # time embeddings
        self.time_slots = args.get('time_slots', 288)
        self.time_embedding = nn.Embedding(self.time_slots, self.hidden_dim)
        self.day_embedding = nn.Embedding(7, self.hidden_dim)
        # CycleNet
        self.cycleQueue = RecurrentCycle(cycle_len=args['cycle_len'], channel_size=self.num_nodes)
        # input projection (sequence length -> hidden dim)
        self.input_proj = MLP(self.seq_len, [self.hidden_dim], self.hidden_dim)
        # two Sandwich blocks
        self.sandwich1 = SandwichBlock(self.num_nodes, self.embed_dim, self.hidden_dim)
        self.sandwich2 = SandwichBlock(self.num_nodes, self.embed_dim, self.hidden_dim)
        # output projection
        self.out_proj = MLP(self.hidden_dim, [2 * self.hidden_dim], self.horizon * self.output_dim)

    def forward(self, x, cycle_index):
        # x: (B, T, N, D >= 3)
        # 1) split flow and time features, dropping the channel dim
        x_flow = x[..., 0]  # -> (B, T, N); would be (B, T, N, 1) if sliced with 0:1 by mistake
        x_time = x[..., 1]
        x_day = x[..., 2]
        B, T, N = x_flow.shape
        # DEBUG print (safe to delete)
        # print("DEBUG x_flow.dim(), shape:", x_flow.dim(), x_flow.shape)
        # 2) remove the cyclic component
        cyc = self.cycleQueue(cycle_index, T).squeeze(1)  # (B, T, N)
        x_flow = x_flow - cyc
        # 3) project the sequence
        h0 = x_flow.permute(0, 2, 1).reshape(B * N, T)  # -> (B*N, T)
        h0 = self.input_proj(h0).view(B, N, self.hidden_dim)
        # 4) add time embeddings
        t_idx = (x_time[:, -1] * (self.time_slots - 1)).long()  # (B, N)
        d_idx = x_day[:, -1].long()                             # (B, N)
        h0 = h0 + self.time_embedding(t_idx) + self.day_embedding(d_idx)
        # 5) Sandwich blocks
        h1 = self.sandwich1(h0) + h0
        h2 = self.sandwich2(h1)
        # 6) output projection and reshape
        out = self.out_proj(h2)                              # (B, N, H*O)
        out = out.view(B, N, self.horizon, self.output_dim)  # (B, N, H, O)
        out = out.permute(0, 2, 1, 3)                        # (B, H, N, O)
        # add the cycle back
        idx_out = (cycle_index + self.seq_len) % self.cycleQueue.cycle_len
        cyc_out = self.cycleQueue(idx_out, self.horizon)  # (B, 1, H, N)
        # squeeze dim 1, unsqueeze the last dim
        cyc_out = cyc_out.squeeze(1).unsqueeze(-1)  # (B, H, N, 1)
        # add the cyclic component back
        return out + cyc_out
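A quick shape check of the new forward signature with placeholder hyperparameters (the values below are illustrative, not the project's config):

import torch

args = {'horizon': 12, 'output_dim': 1, 'in_len': 12, 'hidden_dim': 64,
        'num_nodes': 170, 'embed_dim': 16, 'time_slots': 288, 'cycle_len': 288}
model = EXP(args)

B, T, N = 8, args['in_len'], args['num_nodes']
x = torch.randn(B, T, N, 3)                         # channel 0: flow
x[..., 1] = torch.rand(B, T, N)                     # channel 1: time_in_day in [0, 1)
x[..., 2] = torch.randint(0, 7, (B, T, N)).float()  # channel 2: day_in_week
cycle_index = torch.randint(0, args['cycle_len'], (B, 1))

out = model(x, cycle_index)
print(out.shape)  # torch.Size([8, 12, 170, 1]) = (B, horizon, N, output_dim)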

View File

@@ -15,7 +15,7 @@ from model.STGODE.STGODE import ODEGCN
 from model.PDG2SEQ.PDG2Seqb import PDG2Seq
 from model.STID.STID import STID
 from model.STAEFormer.STAEFormer import STAEformer
-from model.EXP.EXP31 import EXP as EXP
+from model.EXP.EXP32 import EXP as EXP


 def model_selector(model):
     match model['type']:

View File

@@ -39,6 +39,7 @@ class Trainer:
         is_train = (mode == 'train')
         self.model.train() if is_train else self.model.eval()
         total_loss = 0.0
+        epoch_time = time.time()

         with torch.set_grad_enabled(is_train), \
                 tqdm(total=len(dataloader), desc=f'{mode.capitalize()} Epoch {epoch}') as pbar:
@@ -83,7 +84,8 @@ class Trainer:
                 pbar.set_postfix(loss=loss.item())

         avg_loss = total_loss / len(dataloader)
-        self.logger.info(f'{mode.capitalize()} Epoch {epoch}: avg Loss: {avg_loss:.6f}')
+        self.logger.info(
+            f'{mode.capitalize()} Epoch {epoch}: average Loss: {avg_loss:.6f}, time: {time.time() - epoch_time:.2f} s')
         return avg_loss

     def train_epoch(self, epoch):
@@ -154,9 +156,9 @@ class Trainer:
         y_pred, y_true = [], []
         with torch.no_grad():
-            for data, target in data_loader:
+            for data, target, cycle_index in data_loader:
                 label = target[..., :args['output_dim']]
-                output = model(data)
+                output = model(data, cycle_index)
                 y_pred.append(output)
                 y_true.append(label)
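Together with the new dataloader, the evaluation path now assumes every batch is an (x, y, cycle_index) triple and that the model takes the index as a second argument. A hedged wiring sketch (config and loss_fn are illustrative):

train_loader, val_loader, test_loader, scaler = get_dataloader(config['data'])
model = EXP(config['model'])  # assumes the model args also carry num_nodes
for data, target, cycle_index in train_loader:
    output = model(data, cycle_index)
    loss = loss_fn(output, target[..., :config['model']['output_dim']])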

View File

@@ -2,7 +2,7 @@ from trainer.Trainer import Trainer
 from trainer.cdeTrainer.cdetrainer import Trainer as cdeTrainer
 from trainer.DCRNN_Trainer import Trainer as DCRNN_Trainer
 from trainer.PDG2SEQ_Trainer import Trainer as PDG2SEQ_Trainer
-from trainer.EXP_trainer import Trainer as EXP_Trainer
+from trainer.E32Trainer import Trainer as EXP_Trainer


 def select_trainer(model, loss, optimizer, train_loader, val_loader, test_loader, scaler, args,