import os

import numpy as np
import torch


def data_provider(args):
    """Load the train/val/test .npz splits and wrap each in a Data_Loader."""
    device = torch.device(args.device)
    data = {}
    for category in ['train', 'val', 'test']:
        cat_data = np.load(
            os.path.join(args.root_path, args.data_path, category + '.npz'),
            allow_pickle=True,
        )
        data['x_' + category] = torch.Tensor(cat_data['x'].astype(float)).to(device)
        data['y_' + category] = torch.Tensor(cat_data['y'].astype(float)).to(device)

    train_loader = Data_Loader(data['x_train'], data['y_train'], args.batch_size)
    val_loader = Data_Loader(data['x_val'], data['y_val'], args.batch_size)
    test_loader = Data_Loader(data['x_test'], data['y_test'], args.batch_size)
    return train_loader, val_loader, test_loader


class Data_Loader(object):
    def __init__(self, xs, ys, batch_size, pad_with_last_sample=False):
        """
        :param xs: input tensor of shape (num_samples, ...)
        :param ys: target tensor of shape (num_samples, ...)
        :param batch_size: number of samples per batch
        :param pad_with_last_sample: pad with the last sample to make the
            number of samples divisible by batch_size.
        """
        self.batch_size = batch_size
        self.current_ind = 0
        if pad_with_last_sample:
            num_padding = (batch_size - (len(xs) % batch_size)) % batch_size
            if num_padding > 0:
                # Repeat the last sample using torch ops: np.repeat/np.concatenate
                # would fail on the CUDA tensors data_provider may produce.
                x_padding = xs[-1:].expand(num_padding, *xs.shape[1:])
                y_padding = ys[-1:].expand(num_padding, *ys.shape[1:])
                xs = torch.cat([xs, x_padding], dim=0)
                ys = torch.cat([ys, y_padding], dim=0)
        self.size = len(xs)
        # Without padding, a trailing partial batch is silently dropped.
        self.num_batch = self.size // self.batch_size
        self.xs = xs
        self.ys = ys

    def shuffle(self):
        # Apply the same random permutation to inputs and targets.
        permutation = np.random.permutation(self.size)
        self.xs = self.xs[permutation]
        self.ys = self.ys[permutation]

    def get_iterator(self):
        self.current_ind = 0

        def _wrapper():
            while self.current_ind < self.num_batch:
                start_ind = self.batch_size * self.current_ind
                end_ind = min(self.size, self.batch_size * (self.current_ind + 1))
                x_i = self.xs[start_ind:end_ind, ...]
                y_i = self.ys[start_ind:end_ind, ...]
                yield (x_i, y_i)
                self.current_ind += 1

        return _wrapper()


class StandardScaler(object):
    """Standardize the input to zero mean and unit variance."""

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def transform(self, data):
        return (data - self.mean) / self.std

    def inverse_transform(self, data):
        return (data * self.std) + self.mean
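
# Usage sketch (illustrative, not part of the original module): it writes a tiny
# synthetic dataset in the layout data_provider expects ('x' and 'y' arrays in
# train/val/test .npz files), then loads it and iterates one epoch. The
# SimpleNamespace fields (root_path, data_path, device, batch_size) mirror the
# attributes this module reads from `args`; the 'demo_data' directory name and
# the tensor shapes are hypothetical.
if __name__ == '__main__':
    import tempfile
    from types import SimpleNamespace

    with tempfile.TemporaryDirectory() as root:
        data_dir = os.path.join(root, 'demo_data')
        os.makedirs(data_dir)
        for split in ['train', 'val', 'test']:
            # 32 samples, 12 time steps, 2 features; y mirrors x's shape.
            x = np.random.rand(32, 12, 2)
            y = np.random.rand(32, 12, 2)
            np.savez(os.path.join(data_dir, split + '.npz'), x=x, y=y)

        args = SimpleNamespace(root_path=root, data_path='demo_data',
                               device='cpu', batch_size=8)
        train_loader, val_loader, test_loader = data_provider(args)

        # StandardScaler round trip: inverse_transform undoes transform.
        scaler = StandardScaler(mean=x.mean(), std=x.std())
        assert np.allclose(scaler.inverse_transform(scaler.transform(x)), x)

        train_loader.shuffle()
        for x_batch, y_batch in train_loader.get_iterator():
            print(x_batch.shape, y_batch.shape)  # torch.Size([8, 12, 2]) each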