# TrafficWheel/lib/LargeST.py
import pickle
import torch
import numpy as np
import os
import gc
# ! X shape: (B, T, N, C)
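
# NOTE: vrange, StandardScaler, and print_log are used below but are neither
# defined nor imported in this file; they presumably live in the project's
# utils module. Minimal stand-ins are sketched here so the module runs on its
# own; treat them as assumptions and swap in the real imports if present.


def vrange(starts: np.ndarray, stops: np.ndarray) -> np.ndarray:
    """Vectorized ranges: row i is np.arange(starts[i], stops[i]).

    Assumes every (stop - start) length is equal, which holds for the
    fixed-length windows indexed in this file.
    """
    length = int(stops[0] - starts[0])
    return starts[:, None] + np.arange(length)


class StandardScaler:
    """Z-score normalization with statistics fitted on the training split."""

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def transform(self, data):
        return (data - self.mean) / self.std

    def inverse_transform(self, data):
        return data * self.std + self.mean


def print_log(*values, log=None, end="\n"):
    """Print to stdout and, if log is an open file object, mirror to it."""
    print(*values, end=end)
    if log is not None:
        print(*values, file=log, end=end)
        log.flush()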


def load_pkl(pickle_file: str) -> object:
    """
    Load data from a pickle file.

    Args:
        pickle_file (str): Path to the pickle file.

    Returns:
        object: Loaded object from the pickle file.
    """
    try:
        with open(pickle_file, "rb") as f:
            pickle_data = pickle.load(f)
    except UnicodeDecodeError:
        # Pickles written under Python 2 need an explicit encoding to unpickle.
        with open(pickle_file, "rb") as f:
            pickle_data = pickle.load(f, encoding="latin1")
    except Exception as e:
        print(f"Unable to load data from {pickle_file}: {e}")
        raise
    return pickle_data


def get_dataloaders_from_index_data(
    data_dir, tod=False, dow=False, batch_size=64, log=None, train_size=0.6
):
    data = np.load(os.path.join(data_dir, "data.npz"))["data"].astype(np.float32)

    # Channel 0 is the traffic signal; channels 1 and 2 are the optional
    # time-of-day and day-of-week encodings.
    features = [0]
    if tod:
        features.append(1)
    if dow:
        features.append(2)
    # if dom:
    #     features.append(3)
    data = data[..., features]

    index = np.load(os.path.join(data_dir, "index.npz"))
    train_index = index["train"]  # (num_samples, 3)
    val_index = index["val"]
    test_index = index["test"]

    # Each index row is (x_start, y_start, y_end): x covers [x_start, y_start)
    # and y covers [y_start, y_end).
    x_train_index = vrange(train_index[:, 0], train_index[:, 1])
    y_train_index = vrange(train_index[:, 1], train_index[:, 2])
    x_val_index = vrange(val_index[:, 0], val_index[:, 1])
    y_val_index = vrange(val_index[:, 1], val_index[:, 2])
    x_test_index = vrange(test_index[:, 0], test_index[:, 1])
    y_test_index = vrange(test_index[:, 1], test_index[:, 2])

    x_train = data[x_train_index]
    y_train = data[y_train_index][..., :1]  # targets keep only the traffic channel
    x_val = data[x_val_index]
    y_val = data[y_val_index][..., :1]
    x_test = data[x_test_index]
    y_test = data[y_test_index][..., :1]

    # Normalize the traffic channel with statistics fitted on the training set only.
    scaler = StandardScaler(mean=x_train[..., 0].mean(), std=x_train[..., 0].std())
    x_train[..., 0] = scaler.transform(x_train[..., 0])
    x_val[..., 0] = scaler.transform(x_val[..., 0])
    x_test[..., 0] = scaler.transform(x_test[..., 0])

    print_log(f"Trainset:\tx-{x_train.shape}\ty-{y_train.shape}", log=log)
    print_log(f"Valset: \tx-{x_val.shape} \ty-{y_val.shape}", log=log)
    print_log(f"Testset:\tx-{x_test.shape}\ty-{y_test.shape}", log=log)

    trainset = torch.utils.data.TensorDataset(
        torch.FloatTensor(x_train), torch.FloatTensor(y_train)
    )
    valset = torch.utils.data.TensorDataset(
        torch.FloatTensor(x_val), torch.FloatTensor(y_val)
    )
    testset = torch.utils.data.TensorDataset(
        torch.FloatTensor(x_test), torch.FloatTensor(y_test)
    )

    # drop_last is enabled whenever a non-default train split is used.
    drop_last = train_size != 0.6

    trainset_loader = torch.utils.data.DataLoader(
        trainset, batch_size=batch_size, shuffle=True, drop_last=drop_last
    )
    valset_loader = torch.utils.data.DataLoader(
        valset, batch_size=batch_size, shuffle=False, drop_last=drop_last
    )
    testset_loader = torch.utils.data.DataLoader(
        testset, batch_size=batch_size, shuffle=False, drop_last=drop_last
    )

    return trainset_loader, valset_loader, testset_loader, scaler


def get_dataloaders_from_index_data_MTS(
    data_dir,
    in_steps=12,
    out_steps=12,
    tod=False,
    dow=False,
    y_tod=False,
    y_dow=False,
    batch_size=64,
    log=None,
):
    data = np.load(os.path.join(data_dir, "data.npz"))["data"].astype(np.float32)
    index = np.load(os.path.join(data_dir, f"index_{in_steps}_{out_steps}.npz"))

    # Input and target feature channels are selected independently, so the
    # targets can optionally carry the time encodings as well.
    x_features = [0]
    if tod:
        x_features.append(1)
    if dow:
        x_features.append(2)
    y_features = [0]
    if y_tod:
        y_features.append(1)
    if y_dow:
        y_features.append(2)

    train_index = index["train"]  # (num_samples, 3)
    val_index = index["val"]
    test_index = index["test"]

    # Parallel (vectorized) slicing, kept for reference; the iterative version
    # below is used instead.
    # x_train_index = vrange(train_index[:, 0], train_index[:, 1])
    # y_train_index = vrange(train_index[:, 1], train_index[:, 2])
    # x_val_index = vrange(val_index[:, 0], val_index[:, 1])
    # y_val_index = vrange(val_index[:, 1], val_index[:, 2])
    # x_test_index = vrange(test_index[:, 0], test_index[:, 1])
    # y_test_index = vrange(test_index[:, 1], test_index[:, 2])
    # x_train = data[x_train_index][..., x_features]
    # y_train = data[y_train_index][..., y_features]
    # x_val = data[x_val_index][..., x_features]
    # y_val = data[y_val_index][..., y_features]
    # x_test = data[x_test_index][..., x_features]
    # y_test = data[y_test_index][..., y_features]

    # Iterative slicing: one window per index row.
    x_train = np.stack([data[idx[0] : idx[1]] for idx in train_index])[..., x_features]
    y_train = np.stack([data[idx[1] : idx[2]] for idx in train_index])[..., y_features]
    x_val = np.stack([data[idx[0] : idx[1]] for idx in val_index])[..., x_features]
    y_val = np.stack([data[idx[1] : idx[2]] for idx in val_index])[..., y_features]
    x_test = np.stack([data[idx[0] : idx[1]] for idx in test_index])[..., x_features]
    y_test = np.stack([data[idx[1] : idx[2]] for idx in test_index])[..., y_features]

    # Normalize the traffic channel with statistics fitted on the training set only.
    scaler = StandardScaler(mean=x_train[..., 0].mean(), std=x_train[..., 0].std())
    x_train[..., 0] = scaler.transform(x_train[..., 0])
    x_val[..., 0] = scaler.transform(x_val[..., 0])
    x_test[..., 0] = scaler.transform(x_test[..., 0])

    print_log(f"Trainset:\tx-{x_train.shape}\ty-{y_train.shape}", log=log)
    print_log(f"Valset: \tx-{x_val.shape} \ty-{y_val.shape}", log=log)
    print_log(f"Testset:\tx-{x_test.shape}\ty-{y_test.shape}", log=log)

    trainset = torch.utils.data.TensorDataset(
        torch.FloatTensor(x_train), torch.FloatTensor(y_train)
    )
    valset = torch.utils.data.TensorDataset(
        torch.FloatTensor(x_val), torch.FloatTensor(y_val)
    )
    testset = torch.utils.data.TensorDataset(
        torch.FloatTensor(x_test), torch.FloatTensor(y_test)
    )

    trainset_loader = torch.utils.data.DataLoader(
        trainset, batch_size=batch_size, shuffle=True
    )
    valset_loader = torch.utils.data.DataLoader(
        valset, batch_size=batch_size, shuffle=False
    )
    testset_loader = torch.utils.data.DataLoader(
        testset, batch_size=batch_size, shuffle=False
    )

    return trainset_loader, valset_loader, testset_loader, scaler


def get_dataloaders_from_index_data_Test(
    data_dir,
    in_steps=12,
    out_steps=12,
    tod=False,
    dow=False,
    y_tod=False,
    y_dow=False,
    batch_size=64,
    log=None,
):
    # Test-only variant of get_dataloaders_from_index_data_MTS: the training
    # windows are sliced solely to fit the scaler, and only the test loader is
    # built. (The disabled train/val code duplicated the MTS function above
    # verbatim and is omitted here.)
    data = np.load(os.path.join(data_dir, "data.npz"))["data"].astype(np.float32)
    index = np.load(os.path.join(data_dir, f"index_{in_steps}_{out_steps}.npz"))

    x_features = [0]
    if tod:
        x_features.append(1)
    if dow:
        x_features.append(2)
    y_features = [0]
    if y_tod:
        y_features.append(1)
    if y_dow:
        y_features.append(2)

    train_index = index["train"]  # (num_samples, 3)
    test_index = index["test"]

    x_train = np.stack([data[idx[0] : idx[1]] for idx in train_index])[..., x_features]
    x_test = np.stack([data[idx[0] : idx[1]] for idx in test_index])[..., x_features]
    y_test = np.stack([data[idx[1] : idx[2]] for idx in test_index])[..., y_features]

    # The scaler is still fitted on the training windows so test-time
    # normalization matches training.
    scaler = StandardScaler(mean=x_train[..., 0].mean(), std=x_train[..., 0].std())
    x_test[..., 0] = scaler.transform(x_test[..., 0])

    print_log(f"Testset:\tx-{x_test.shape}\ty-{y_test.shape}", log=log)

    testset = torch.utils.data.TensorDataset(
        torch.FloatTensor(x_test), torch.FloatTensor(y_test)
    )
    testset_loader = torch.utils.data.DataLoader(
        testset, batch_size=batch_size, shuffle=False
    )

    return testset_loader, scaler
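

# Hypothetical usage sketch (the path and flags are illustrative, not from the
# original file): assumes data_dir contains data.npz with key "data" and an
# index.npz with keys "train"/"val"/"test", per the loaders above.
if __name__ == "__main__":
    train_loader, val_loader, test_loader, scaler = get_dataloaders_from_index_data(
        "data/SD", tod=True, dow=True, batch_size=64
    )
    x, y = next(iter(train_loader))
    print(x.shape, y.shape)  # x: (B, T, N, C), y: (B, T, N, 1)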