import datetime

import numpy as np
import pandas as pd


class StandardScaler:
    """
    Standardizes the input by removing the mean and scaling to unit std.
    """

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def transform(self, data):
        return (data - self.mean) / self.std

    def inverse_transform(self, data):
        return (data * self.std) + self.mean


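# Illustrative usage sketch (not part of the original module); the sample shape
# and values below are made up for demonstration.
def _example_standard_scaler():
    values = np.random.randn(100, 3) * 5.0 + 10.0
    scaler = StandardScaler(mean=values.mean(), std=values.std())
    scaled = scaler.transform(values)
    # inverse_transform recovers the original data up to floating-point error.
    assert np.allclose(scaler.inverse_transform(scaled), values)

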
def get_rush_hours_bool_index(df, hours=((7, 10), (17, 20)), weekdays=(0, 5)):
    """
    Calculates a boolean predicate for rush hours: 7:00am-9:59am and 5:00pm-7:59pm, Mon-Fri.

    :param df: DataFrame with a DatetimeIndex.
    :param hours: a tuple of two (start_hour, end_hour) ranges.
    :param weekdays: a tuple of two, (start_weekday, end_weekday), where Monday is 0.
    """
    # Weekday.
    weekday_predicate = (df.index.dayofweek >= weekdays[0]) & (df.index.dayofweek < weekdays[1])
    # Hours.
    hour_predicate = (df.index.time >= datetime.time(hours[0][0], 0)) & (df.index.time < datetime.time(hours[0][1], 0))
    hour_predicate |= (df.index.time >= datetime.time(hours[1][0], 0)) & (df.index.time < datetime.time(hours[1][1], 0))

    return weekday_predicate & hour_predicate


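# Illustrative usage sketch (not part of the original module): selects rush-hour
# rows from a toy DataFrame indexed at 5-minute intervals. Sizes are made up.
def _example_rush_hours():
    index = pd.date_range('2012-03-01', periods=7 * 24 * 12, freq='5min')
    df = pd.DataFrame(np.random.randn(len(index), 2), index=index)
    rush_hours_df = df[get_rush_hours_bool_index(df)]
    return rush_hours_df

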
def generate_io_data(data, seq_len, horizon=1, scaler=None):
    """
    Generates input/output pairs by sliding a window over the data.

    :param data: 2-D tensor with shape (total_seq_len, input_dim).
    :param seq_len: length of the input sequence, i.e., number of timesteps.
    :param horizon: the horizon of prediction.
    :param scaler: optional scaler applied to the data before windowing.
    :return: (x, y), i.e., input and output.
    """
    assert np.ndim(data) == 2
    xs, ys = [], []
    total_seq_len, _ = data.shape
    if scaler:
        data = scaler.transform(data)
    for i in range(0, total_seq_len - horizon - seq_len + 1):
        x_i = data[i: i + seq_len, :]
        y_i = data[i + seq_len + horizon - 1, :]
        xs.append(x_i)
        ys.append(y_i)
    xs = np.stack(xs, axis=0)
    ys = np.stack(ys, axis=0)
    return xs, ys


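# Illustrative usage sketch (not part of the original module): windows a toy
# series of 50 timesteps x 4 features into (x, y) pairs. Sizes are made up.
def _example_generate_io_data():
    data = np.random.randn(50, 4)
    x, y = generate_io_data(data, seq_len=12, horizon=3)
    # 50 - 12 - 3 + 1 = 36 windows; each x is 12 timesteps of 4 features.
    assert x.shape == (36, 12, 4) and y.shape == (36, 4)

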
def generate_io_data_with_time(df, batch_size, seq_len, horizon, output_type='point', scaler=None,
                               add_time_in_day=True, add_day_in_week=False):
    """
    Generates input/output data with optional time-of-day and day-of-week features.

    :param df: DataFrame with a DatetimeIndex, shape (num_samples, num_nodes).
    :param batch_size: number of parallel sequences per batch.
    :param seq_len: length of the input sequence.
    :param horizon: the horizon of prediction.
    :param output_type: one of 'point', 'range', 'seq2seq'.
    :param scaler: optional scaler applied to df before feature construction.
    :param add_time_in_day: whether to append time of day as an extra feature.
    :param add_day_in_week: whether to append a one-hot day-of-week feature.
    :return:
    x, y. For 'point' and 'range', both are 3-D tensors with size
    (epoch_size, batch_size, input_dim); for 'seq2seq', both are 4-D with an
    extra time dimension.
    """
    if scaler:
        df = scaler.transform(df)
    num_samples, num_nodes = df.shape
    data = df.values
    batch_len = num_samples // batch_size
    data_list = [data]
    if add_time_in_day:
        time_ind = (df.index.values - df.index.values.astype('datetime64[D]')) / np.timedelta64(1, 'D')
        data_list.append(time_ind.reshape(-1, 1))
    if add_day_in_week:
        day_in_week = np.zeros(shape=(num_samples, 7))
        day_in_week[np.arange(num_samples), df.index.dayofweek] = 1
        data_list.append(day_in_week)

    data = np.concatenate(data_list, axis=-1)
    data = data[:batch_size * batch_len, :].reshape((batch_size, batch_len, -1))
    xs, ys = [], []
    for i in range(seq_len, batch_len - horizon + 1):
        if output_type == 'point':
            x_i = data[:, i - seq_len: i, :].reshape((batch_size, -1))
            y_i = data[:, i + horizon - 1, :num_nodes].reshape((batch_size, -1))
        elif output_type == 'range':
            x_i = data[:, i - seq_len: i, :].reshape((batch_size, -1))
            y_i = data[:, i: i + horizon, :num_nodes].reshape((batch_size, -1))
        elif output_type == 'seq2seq':
            x_i = data[:, i - seq_len: i, :]
            y_i = data[:, i: i + horizon, :]
        else:
            raise ValueError('Unsupported output_type: %s' % output_type)
        xs.append(x_i)
        ys.append(y_i)
    xs = np.stack(xs, axis=0)
    ys = np.stack(ys, axis=0)
    return xs, ys


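# Illustrative usage sketch (not part of the original module): batches a toy
# DataFrame of 2 sensors sampled every 5 minutes. All sizes are made up.
def _example_generate_io_data_with_time():
    index = pd.date_range('2012-03-01', periods=400, freq='5min')
    df = pd.DataFrame(np.random.randn(400, 2), index=index)
    x, y = generate_io_data_with_time(df, batch_size=4, seq_len=12, horizon=3,
                                      output_type='point')
    # batch_len = 400 // 4 = 100; epoch_size = 100 - 3 + 1 - 12 = 86.
    # input_dim = 2 sensors + 1 time-of-day feature = 3, flattened over 12 steps.
    assert x.shape == (86, 4, 12 * 3) and y.shape == (86, 4, 2)

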
def generate_graph_seq2seq_io_data_with_time(df, batch_size, seq_len, horizon, num_nodes, scaler=None,
                                             add_time_in_day=True, add_day_in_week=False):
    """
    Generates seq2seq input/output data where each timestep keeps the per-node structure.

    :param df: DataFrame with a DatetimeIndex, shape (num_samples, num_nodes).
    :param batch_size: number of parallel sequences per batch.
    :param seq_len: length of the input sequence.
    :param horizon: the horizon of prediction.
    :param num_nodes: number of nodes (sensors) in the graph.
    :param scaler: optional scaler applied to df before feature construction.
    :param add_time_in_day: whether to append time of day as an extra feature.
    :param add_day_in_week: whether to append a one-hot day-of-week feature.
    :return:
    x, y, both 5-D tensors with size (epoch_size, batch_size, seq_len, num_nodes, input_dim);
    for y, the time dimension is horizon instead of seq_len.
    Adjacent batches form a continuous sequence, i.e., x[i, j, ...] comes before x[i + 1, j, ...].
    """
    if scaler:
        df = scaler.transform(df)
    num_samples, _ = df.shape
    data = df.values
    batch_len = num_samples // batch_size
    data = np.expand_dims(data, axis=-1)
    data_list = [data]
    if add_time_in_day:
        time_ind = (df.index.values - df.index.values.astype('datetime64[D]')) / np.timedelta64(1, 'D')
        time_in_day = np.tile(time_ind, [1, num_nodes, 1]).transpose((2, 1, 0))
        data_list.append(time_in_day)
    if add_day_in_week:
        day_in_week = np.zeros(shape=(num_samples, num_nodes, 7))
        day_in_week[np.arange(num_samples), :, df.index.dayofweek] = 1
        data_list.append(day_in_week)

    data = np.concatenate(data_list, axis=-1)
    data = data[:batch_size * batch_len, :, :].reshape((batch_size, batch_len, num_nodes, -1))
    epoch_size = batch_len - seq_len - horizon + 1
    x, y = [], []
    for i in range(epoch_size):
        x_i = data[:, i: i + seq_len, ...]
        y_i = data[:, i + seq_len: i + seq_len + horizon, :, :]
        x.append(x_i)
        y.append(y_i)
    x = np.stack(x, axis=0)
    y = np.stack(y, axis=0)
    return x, y


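# Illustrative usage sketch (not part of the original module): toy graph data
# with 2 nodes; all sizes are made up.
def _example_generate_graph_seq2seq_io_data_with_time():
    index = pd.date_range('2012-03-01', periods=400, freq='5min')
    df = pd.DataFrame(np.random.randn(400, 2), index=index)
    x, y = generate_graph_seq2seq_io_data_with_time(df, batch_size=4, seq_len=12,
                                                    horizon=3, num_nodes=2)
    # batch_len = 400 // 4 = 100; epoch_size = 100 - 12 - 3 + 1 = 86.
    # input_dim = 1 reading + 1 time-of-day feature = 2 per node per timestep.
    assert x.shape == (86, 4, 12, 2, 2) and y.shape == (86, 4, 3, 2, 2)

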
def round_down(num, divisor):
    """
    Rounds num down to the nearest multiple of divisor.
    """
    return num - (num % divisor)


def separate_seasonal_trend_and_residual(df, period, test_ratio=0.2, null_val=0., epsilon=1e-4):
    """
    Separates df into a periodic (seasonal) trend and a residual.

    :param df: DataFrame with shape (n_sample, n_sensor).
    :param period: length of the seasonal period in timesteps.
    :param test_ratio: ratio of data held out for testing; only the training part
        is used to calculate the seasonal average.
    :param null_val: marker of missing values. Entries equal to null_val are
        excluded from the average and stay null_val in the residual.
    :param epsilon: small offset added to residuals that happen to equal null_val,
        so they are not mistaken for missing values.
    :return: (seasonal_df, residual_df)
    """
    n_sample, n_sensor = df.shape
    n_test = int(round(n_sample * test_ratio))
    n_train = n_sample - n_test
    seasonal_trend = np.zeros((period, n_sensor), dtype=np.float32)
    for i in range(period):
        # Average over all training samples at the same position within the period.
        inds = [j for j in range(i, n_train, period)]
        historical = df.iloc[inds, :]
        seasonal_trend[i, :] = historical[historical != null_val].mean()
    n_repeat = (n_sample + period - 1) // period
    data = np.tile(seasonal_trend, [n_repeat, 1])[:n_sample, :]
    seasonal_df = pd.DataFrame(data, index=df.index, columns=df.columns)
    # Record where missing values occur.
    missing_ind = df == null_val
    residual_df = df - seasonal_df
    residual_df[residual_df == null_val] += epsilon
    residual_df[missing_ind] = null_val
    return seasonal_df, residual_df


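# Illustrative usage sketch (not part of the original module): removes a daily
# pattern from a toy hourly series, so period=24. Sizes and values are made up.
def _example_separate_seasonal_trend_and_residual():
    index = pd.date_range('2012-03-01', periods=24 * 14, freq='h')
    daily = np.sin(np.arange(len(index)) * 2 * np.pi / 24)
    df = pd.DataFrame({'sensor': daily + 0.1 * np.random.randn(len(index))}, index=index)
    seasonal_df, residual_df = separate_seasonal_trend_and_residual(df, period=24)
    # seasonal_df repeats the per-hour training average; residual_df is what remains.
    return seasonal_df, residual_df

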
def train_test_split(x, y, test_ratio=0.2, random=False, granularity=1):
    """
    Splits data into training and testing parts. Default: 80% train, 20% test.

    :param x: data.
    :param y: labels.
    :param test_ratio: ratio of data used for testing.
    :param random: whether to shuffle the data before splitting.
    :param granularity: round the train/test sizes down to a multiple of this value.
    """
    perms = np.arange(0, x.shape[0])
    if random:
        perms = np.random.permutation(np.arange(0, x.shape[0]))
    n_train = round_down(int(round(x.shape[0] * (1 - test_ratio))), granularity)
    n_test = round_down(x.shape[0] - n_train, granularity)
    x_train, y_train = x.take(perms[:n_train], axis=0), y.take(perms[:n_train], axis=0)
    x_test, y_test = x.take(perms[n_train:n_train + n_test], axis=0), y.take(perms[n_train:n_train + n_test], axis=0)
    return (x_train, y_train), (x_test, y_test)


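# Illustrative usage sketch (not part of the original module); sizes are made up.
def _example_train_test_split():
    x, y = np.random.randn(100, 12, 4), np.random.randn(100, 4)
    (x_train, y_train), (x_test, y_test) = train_test_split(x, y, test_ratio=0.2)
    assert x_train.shape[0] == 80 and x_test.shape[0] == 20

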
def train_val_test_split_df(df, val_ratio=0.1, test_ratio=0.2):
    """
    Splits df chronologically into train, validation and test parts.

    :param df: DataFrame to split along the first axis.
    :param val_ratio: ratio of data used for validation.
    :param test_ratio: ratio of data used for testing.
    :return: (train_data, val_data, test_data)
    """
    n_sample, _ = df.shape
    n_val = int(round(n_sample * val_ratio))
    n_test = int(round(n_sample * test_ratio))
    n_train = n_sample - n_val - n_test
    train_data, val_data, test_data = df.iloc[:n_train, :], df.iloc[n_train: n_train + n_val, :], df.iloc[-n_test:, :]
    return train_data, val_data, test_data


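# Illustrative usage sketch (not part of the original module); sizes are made up.
def _example_train_val_test_split_df():
    df = pd.DataFrame(np.random.randn(100, 2))
    train_data, val_data, test_data = train_val_test_split_df(df)
    # Default ratios: 70% train, 10% validation, 20% test, in chronological order.
    assert (len(train_data), len(val_data), len(test_data)) == (70, 10, 20)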