# File viewer metadata: 179 lines, 5.8 KiB, Python, executable file.
import numpy as np
|
|
import torch
|
|
|
|
|
|
class NScaler:
    """Identity scaler: both directions hand the input back untouched.

    Offers the same ``transform`` / ``inverse_transform`` pair as the other
    scalers in this module, so it can be used where no normalization is
    wanted.
    """

    def transform(self, data):
        """Return ``data`` itself (the same object, no copy)."""
        return data

    def inverse_transform(self, data):
        """Return ``data`` itself (the same object, no copy)."""
        return data
|
|
|
|
|
|
class StandardScaler:
    """Standardizes the input data by removing the mean and scaling to unit variance.

    Args:
        mean: Value subtracted from the data in ``transform`` (a scalar or
            an array that broadcasts against the data).
        std: Value the centered data is divided by in ``transform``.
            No guard against zero is applied.
    """

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def transform(self, data):
        """Return ``(data - mean) / std``."""
        return (data - self.mean) / self.std

    def inverse_transform(self, data):
        """Undo ``transform``: return ``data * std + mean``.

        When ``data`` is a ``torch.Tensor`` and a stored statistic is a numpy
        array, that statistic is converted to a tensor on ``data``'s device
        with ``data``'s dtype for this call only.

        The stored ``mean`` / ``std`` are never overwritten. Replacing them
        with tensors would pin the scaler to the device/dtype of the first
        tensor it saw and would break later ``transform`` calls on numpy
        input.
        """
        mean, std = self.mean, self.std
        if isinstance(data, torch.Tensor):
            # Convert each statistic independently, into locals only.
            if isinstance(mean, np.ndarray):
                mean = torch.from_numpy(mean).to(data.device).type(data.dtype)
            if isinstance(std, np.ndarray):
                std = torch.from_numpy(std).to(data.device).type(data.dtype)
        return (data * std) + mean
|
|
|
|
|
|
class MinMax01Scaler:
    """Scales data to the range [0, 1].

    Args:
        min: Lower bound subtracted from the data (a scalar or an array that
            broadcasts against the data).
        max: Upper bound; ``max - min`` is the divisor in ``transform``.
            No guard against ``max == min`` is applied.
    """

    def __init__(self, min, max):
        self.min = min
        self.max = max

    def transform(self, data):
        """Return ``(data - min) / (max - min)``."""
        return (data - self.min) / (self.max - self.min)

    def inverse_transform(self, data):
        """Undo ``transform``: return ``data * (max - min) + min``.

        When ``data`` is a ``torch.Tensor`` and a stored bound is a numpy
        array, that bound is converted to a tensor on ``data``'s device with
        ``data``'s dtype for this call only.

        The stored ``min`` / ``max`` are never overwritten, so the scaler
        keeps working for numpy input and for tensors on other devices or
        with other dtypes.
        """
        lower, upper = self.min, self.max
        if isinstance(data, torch.Tensor):
            # Convert each bound independently, into locals only.
            if isinstance(lower, np.ndarray):
                lower = torch.from_numpy(lower).to(data.device).type(data.dtype)
            if isinstance(upper, np.ndarray):
                upper = torch.from_numpy(upper).to(data.device).type(data.dtype)
        return (data * (upper - lower)) + lower
|
|
|
|
|
|
class MinMax11Scaler:
    """Scales data to the range [-1, 1].

    Args:
        min: Lower bound subtracted from the data (a scalar or an array that
            broadcasts against the data).
        max: Upper bound; ``max - min`` is the divisor in ``transform``.
            No guard against ``max == min`` is applied.
    """

    def __init__(self, min, max):
        self.min = min
        self.max = max

    def transform(self, data):
        """Return ``((data - min) / (max - min)) * 2 - 1``."""
        return ((data - self.min) / (self.max - self.min)) * 2.0 - 1.0

    def inverse_transform(self, data):
        """Undo ``transform``: return ``((data + 1) / 2) * (max - min) + min``.

        When ``data`` is a ``torch.Tensor`` and a stored bound is a numpy
        array, that bound is converted to a tensor on ``data``'s device with
        ``data``'s dtype for this call only.

        The stored ``min`` / ``max`` are never overwritten, so the scaler
        keeps working for numpy input and for tensors on other devices or
        with other dtypes.
        """
        lower, upper = self.min, self.max
        if isinstance(data, torch.Tensor):
            # Convert each bound independently, into locals only.
            if isinstance(lower, np.ndarray):
                lower = torch.from_numpy(lower).to(data.device).type(data.dtype)
            if isinstance(upper, np.ndarray):
                upper = torch.from_numpy(upper).to(data.device).type(data.dtype)
        return ((data + 1.0) / 2.0) * (upper - lower) + lower
|
|
|
|
|
|
class ColumnMinMaxScaler:
    """Scales data using column-specific min and max values.

    Args:
        min: Per-column minima.
        max: Per-column maxima. ``max - min`` is stored as ``min_max``;
            entries equal to zero are replaced by 1 so that ``transform``
            never divides by zero.
    """

    def __init__(self, min, max):
        self.min = min
        # ``max - min`` builds a new object, so the in-place patch below does
        # not touch the caller's ``max``.
        self.min_max = max - self.min
        self.min_max[self.min_max == 0] = 1

    def transform(self, data):
        """Return ``(data - min) / min_max``."""
        return (data - self.min) / self.min_max

    def inverse_transform(self, data):
        """Undo ``transform``: return ``data * min_max + min``.

        When ``data`` is a ``torch.Tensor`` and a stored statistic is a numpy
        array, that statistic is converted to a float32 tensor on ``data``'s
        device for this call only.

        The stored ``min`` / ``min_max`` are never overwritten, so the scaler
        keeps working for numpy input and for tensors on other devices.
        """
        offset, span = self.min, self.min_max
        if isinstance(data, torch.Tensor):
            # Convert each statistic independently, into locals only.
            if isinstance(span, np.ndarray):
                span = torch.from_numpy(span).to(data.device).type(torch.float32)
            if isinstance(offset, np.ndarray):
                offset = torch.from_numpy(offset).to(data.device).type(torch.float32)
        return (data * span) + offset
|
|
|
|
|
|
def one_hot_by_column(data):
    """One-hot encode every column of ``data`` and concatenate the results.

    Each column is shifted by its own minimum, so a column with values in
    ``[lo, hi]`` produces ``hi - lo + 1`` indicator columns. The values are
    used as array indices, so they must be integers.

    Args:
        data: Array indexed as ``data[:, i]`` for ``i < data.shape[1]``.

    Returns:
        The per-column indicator matrices (built with ``np.zeros``) stacked
        horizontally.
    """
    n_rows = data.shape[0]

    def _encode(values):
        # Indicator matrix for one column: row r gets a 1 at the position
        # ``values[r] - min(values)``.
        lowest = values.min()
        width = values.max() - lowest + 1
        indicator = np.zeros((n_rows, width))
        indicator[np.arange(n_rows), values - lowest] = 1
        return indicator

    blocks = [_encode(data[:, col]) for col in range(data.shape[1])]
    return np.hstack(blocks)
|
|
|
|
|
|
def minmax_by_column(data):
    """Applies MinMax scaling to each column of a 2D numpy array.

    Every column is mapped to ``(column - min) / (max - min)`` using that
    column's own minimum and maximum. A constant column has ``max == min``;
    its range is treated as 1, so it maps to zeros rather than to NaN from a
    0/0 division.

    Args:
        data: Array indexed as ``data[:, i]`` for ``i < data.shape[1]``.

    Returns:
        The scaled columns stacked horizontally, in the original column order.
    """
    normalized = []

    for i in range(data.shape[1]):
        column = data[:, i]
        min_val = column.min()
        value_range = column.max() - min_val
        if value_range == 0:
            # Constant column: avoid dividing by zero.
            value_range = 1
        scaled = (column - min_val) / value_range
        normalized.append(scaled[:, np.newaxis])

    return np.hstack(normalized)
|
|
|
|
|
|
def normalize_dataset(data, normalizer, column_wise=False):
    """Build the scaler selected by ``normalizer`` from statistics of ``data``.

    The data itself is not transformed here; only a fitted scaler object is
    returned.

    Args:
        data: Object providing ``min`` / ``max`` / ``mean`` / ``std`` methods
            that accept ``axis`` and ``keepdims`` keyword arguments.
        normalizer: One of
            ``"max01"`` -> ``MinMax01Scaler``,
            ``"max11"`` -> ``MinMax11Scaler``,
            ``"std"``   -> ``StandardScaler``,
            ``"None"``  -> ``NScaler`` (the string, not the ``None`` object),
            ``"cmax"``  -> ``ColumnMinMaxScaler``.
        column_wise: For ``"max01"``, ``"max11"`` and ``"std"``: when true,
            statistics are reduced along axis 0 with ``keepdims=True``;
            otherwise over the whole of ``data``. Has no effect for
            ``"None"``, and ``"cmax"`` always reduces along axis 0.

    Returns:
        The constructed scaler instance.

    Raises:
        ValueError: If ``normalizer`` is not one of the supported names.
    """
    # Keyword arguments shared by every reduction that honours column_wise.
    reduce_kwargs = {"axis": 0, "keepdims": True} if column_wise else {}

    if normalizer == "max01":
        lowest = data.min(**reduce_kwargs)
        highest = data.max(**reduce_kwargs)
        return MinMax01Scaler(lowest, highest)

    if normalizer == "max11":
        lowest = data.min(**reduce_kwargs)
        highest = data.max(**reduce_kwargs)
        return MinMax11Scaler(lowest, highest)

    if normalizer == "std":
        center = data.mean(**reduce_kwargs)
        spread = data.std(**reduce_kwargs)
        return StandardScaler(center, spread)

    if normalizer == "None":
        return NScaler()

    if normalizer == "cmax":
        return ColumnMinMaxScaler(data.min(axis=0), data.max(axis=0))

    raise ValueError(f"Unsupported normalizer type: {normalizer}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Small integer matrix used to demonstrate the helpers in this module.
    sample = np.array([[0, 0, 0, 1], [0, 1, 3, 2], [0, 2, 1, 3]])
    print("Original data:\n", sample)

    # Row-wise min-max scaling done by hand with broadcasting.
    row_min = sample.min(axis=1)
    row_max = sample.max(axis=1)
    print("Min values per row:\n", row_min)
    print("Max values per row:\n", row_max)

    # Reshape the per-row statistics to column vectors so they broadcast
    # across each row of ``sample``.
    row_min_col = row_min[:, np.newaxis]
    row_max_col = row_max[:, np.newaxis]
    row_scaled = (sample - row_min_col) / (row_max_col - row_min_col)
    print("Normalized data:\n", row_scaled)

    # Column-wise one-hot encoding.
    print("One-hot encoded data:\n", one_hot_by_column(sample))

    # Column-wise min-max scaling.
    print("MinMax scaled data by column:\n", minmax_by_column(sample))
|