TrafficWheel/lib/normalization.py

175 lines
5.8 KiB
Python
Executable File

import numpy as np
import torch
class NScaler:
    """Identity scaler used when no normalization is wanted.

    It exposes the same ``transform`` / ``inverse_transform`` pair as the
    other scalers in this module, but both simply hand back their argument.
    """

    def transform(self, data):
        """Return ``data`` itself, unchanged and uncopied."""
        return data

    def inverse_transform(self, data):
        """Return ``data`` itself, unchanged and uncopied."""
        return data
class StandardScaler:
    """Standardize data as ``(data - mean) / std`` and map it back again.

    ``mean`` and ``std`` are stored exactly as passed in (scalars or numpy
    arrays) and are never modified by the scaler afterwards.
    """

    def __init__(self, mean, std):
        """Store the statistics used by both directions.

        Args:
            mean: Value(s) subtracted in ``transform``.
            std: Value(s) divided by in ``transform``.
        """
        self.mean = mean
        self.std = std

    @staticmethod
    def _as_like(stat, data):
        """Return ``stat`` in a form that can be combined with ``data``.

        A numpy-array statistic is converted to a tensor on ``data``'s device
        and dtype when ``data`` is a torch tensor; anything else is returned
        unchanged.
        """
        if isinstance(data, torch.Tensor) and isinstance(stat, np.ndarray):
            return torch.from_numpy(stat).to(data.device).type(data.dtype)
        return stat

    def transform(self, data):
        """Return ``(data - mean) / std``."""
        return (data - self.mean) / self.std

    def inverse_transform(self, data):
        """Return ``data * std + mean``.

        The conversion to torch happens on local copies. Overwriting
        ``self.mean`` / ``self.std`` would pin the scaler to the device and
        dtype of the first tensor it ever saw, and would break later calls
        made with numpy data or with tensors on another device.
        """
        std = self._as_like(self.std, data)
        mean = self._as_like(self.mean, data)
        return (data * std) + mean
class MinMax01Scaler:
    """Scale data to ``[0, 1]`` via ``(data - min) / (max - min)``.

    ``min`` and ``max`` are stored exactly as passed in (scalars or numpy
    arrays) and are never modified by the scaler afterwards.
    """

    # The parameter names shadow the builtins, but they are part of the
    # public signature and are kept for keyword-argument callers.
    def __init__(self, min, max):
        """Store the bounds used by both directions.

        Args:
            min: Value(s) mapped to 0.
            max: Value(s) mapped to 1.
        """
        self.min = min
        self.max = max

    @staticmethod
    def _as_like(stat, data):
        """Return ``stat`` in a form that can be combined with ``data``.

        A numpy-array statistic is converted to a tensor on ``data``'s device
        and dtype when ``data`` is a torch tensor; anything else is returned
        unchanged.
        """
        if isinstance(data, torch.Tensor) and isinstance(stat, np.ndarray):
            return torch.from_numpy(stat).to(data.device).type(data.dtype)
        return stat

    def transform(self, data):
        """Return ``(data - min) / (max - min)``."""
        return (data - self.min) / (self.max - self.min)

    def inverse_transform(self, data):
        """Return ``data * (max - min) + min``.

        The conversion to torch happens on local copies. Overwriting
        ``self.min`` / ``self.max`` would pin the scaler to the device and
        dtype of the first tensor it ever saw, and would break later calls
        made with numpy data or with tensors on another device.
        """
        low = self._as_like(self.min, data)
        high = self._as_like(self.max, data)
        return (data * (high - low)) + low
class MinMax11Scaler:
    """Scale data to ``[-1, 1]`` via ``2 * (data - min) / (max - min) - 1``.

    ``min`` and ``max`` are stored exactly as passed in (scalars or numpy
    arrays) and are never modified by the scaler afterwards.
    """

    # The parameter names shadow the builtins, but they are part of the
    # public signature and are kept for keyword-argument callers.
    def __init__(self, min, max):
        """Store the bounds used by both directions.

        Args:
            min: Value(s) mapped to -1.
            max: Value(s) mapped to 1.
        """
        self.min = min
        self.max = max

    @staticmethod
    def _as_like(stat, data):
        """Return ``stat`` in a form that can be combined with ``data``.

        A numpy-array statistic is converted to a tensor on ``data``'s device
        and dtype when ``data`` is a torch tensor; anything else is returned
        unchanged.
        """
        if isinstance(data, torch.Tensor) and isinstance(stat, np.ndarray):
            return torch.from_numpy(stat).to(data.device).type(data.dtype)
        return stat

    def transform(self, data):
        """Return ``((data - min) / (max - min)) * 2 - 1``."""
        return ((data - self.min) / (self.max - self.min)) * 2.0 - 1.0

    def inverse_transform(self, data):
        """Return ``((data + 1) / 2) * (max - min) + min``.

        The conversion to torch happens on local copies. Overwriting
        ``self.min`` / ``self.max`` would pin the scaler to the device and
        dtype of the first tensor it ever saw, and would break later calls
        made with numpy data or with tensors on another device.
        """
        low = self._as_like(self.min, data)
        high = self._as_like(self.max, data)
        return ((data + 1.0) / 2.0) * (high - low) + low
class ColumnMinMaxScaler:
    """Scale data with per-column minima and ranges.

    The range ``max - min`` is computed once at construction; columns whose
    range is zero use a range of 1 so that ``transform`` never divides by
    zero. The stored statistics are never modified afterwards.
    """

    # The parameter names shadow the builtins, but they are part of the
    # public signature and are kept for keyword-argument callers.
    def __init__(self, min, max):
        """Store the minima and the (zero-guarded) ranges.

        Args:
            min: Array of per-column minima.
            max: Array of per-column maxima.
        """
        self.min = min
        # ``max - self.min`` allocates a new array, so the in-place guard
        # below does not touch the caller's ``max``.
        self.min_max = max - self.min
        self.min_max[self.min_max == 0] = 1

    @staticmethod
    def _as_like(stat, data):
        """Return ``stat`` in a form that can be combined with ``data``.

        A numpy-array statistic is converted to a float32 tensor on ``data``'s
        device when ``data`` is a torch tensor; anything else is returned
        unchanged.
        """
        if isinstance(data, torch.Tensor) and isinstance(stat, np.ndarray):
            return torch.from_numpy(stat).to(data.device).type(torch.float32)
        return stat

    def transform(self, data):
        """Return ``(data - min) / range``."""
        return (data - self.min) / self.min_max

    def inverse_transform(self, data):
        """Return ``data * range + min``.

        The conversion to torch happens on local copies. Overwriting
        ``self.min`` / ``self.min_max`` would pin the scaler to the device of
        the first tensor it ever saw, and would break later calls made with
        numpy data or with tensors on another device.
        """
        span = self._as_like(self.min_max, data)
        low = self._as_like(self.min, data)
        return (data * span) + low
def one_hot_by_column(data):
    """One-hot encode every column of a 2D array and join the results.

    Each column is encoded independently: its values are shifted so the
    column minimum lands on index 0, and the block for that column is
    ``max - min + 1`` wide. The per-column blocks are stacked horizontally.

    The values are used both as an array width and as indices, so the input
    has to be integer-typed.
    """
    n_rows = data.shape[0]
    row_ids = np.arange(n_rows)

    def _encode(values):
        # One block per column: a single 1 in each row, at the shifted value.
        lowest = values.min()
        block = np.zeros((n_rows, values.max() - lowest + 1))
        block[row_ids, values - lowest] = 1
        return block

    blocks = [_encode(data[:, col]) for col in range(data.shape[1])]
    return np.hstack(blocks)
def minmax_by_column(data):
    """Min-max scale each column of a 2D numpy array independently.

    Every column is mapped through ``(column - min) / (max - min)``. A
    constant column has a range of zero; its range is treated as 1 so the
    column comes out as all zeros instead of NaN from a 0/0 division.

    Args:
        data: 2D numpy array with one feature per column.

    Returns:
        Array of the same shape holding the scaled columns.
    """
    scaled = []
    for i in range(data.shape[1]):
        column = data[:, i]
        low = column.min()
        span = column.max() - low
        if span == 0:
            # Constant column: avoid dividing by zero.
            span = 1
        scaled.append(((column - low) / span)[:, np.newaxis])
    return np.hstack(scaled)
def normalize_dataset(data, normalizer, column_wise=False):
    """Build the scaler selected by ``normalizer`` from statistics of ``data``.

    Only the scaler is returned; ``data`` itself is not transformed here.

    Args:
        data: Array whose min/max or mean/std parameterize the scaler.
        normalizer: One of ``'max01'``, ``'max11'``, ``'std'``, ``'None'``
            (the string) or ``'cmax'``.
        column_wise: When true, the ``'max01'``, ``'max11'`` and ``'std'``
            statistics are taken along axis 0 with ``keepdims=True`` instead
            of over the whole array. ``'cmax'`` always reduces along axis 0,
            and ``'None'`` uses no statistics.

    Returns:
        The scaler instance matching ``normalizer``.

    Raises:
        ValueError: If ``normalizer`` is not one of the supported names.
    """
    # Keyword arguments shared by every reduction that honors ``column_wise``.
    reduce_kwargs = {"axis": 0, "keepdims": True} if column_wise else {}

    if normalizer == 'max01':
        lowest = data.min(**reduce_kwargs)
        highest = data.max(**reduce_kwargs)
        return MinMax01Scaler(lowest, highest)

    if normalizer == 'max11':
        lowest = data.min(**reduce_kwargs)
        highest = data.max(**reduce_kwargs)
        return MinMax11Scaler(lowest, highest)

    if normalizer == 'std':
        center = data.mean(**reduce_kwargs)
        spread = data.std(**reduce_kwargs)
        return StandardScaler(center, spread)

    if normalizer == 'None':
        return NScaler()

    if normalizer == 'cmax':
        return ColumnMinMaxScaler(data.min(axis=0), data.max(axis=0))

    raise ValueError(f"Unsupported normalizer type: {normalizer}")
if __name__ == '__main__':
    # Small integer matrix used by all of the demonstrations below.
    test_data = np.array([[0, 0, 0, 1], [0, 1, 3, 2], [0, 2, 1, 3]])
    print("Original data:\n", test_data)

    # Manual row-wise min-max scaling (reductions along axis 1).
    row_min = test_data.min(axis=1)
    row_max = test_data.max(axis=1)
    print("Min values per row:\n", row_min)
    print("Max values per row:\n", row_max)
    row_span = (row_max - row_min)[:, np.newaxis]
    shifted = test_data - row_min[:, np.newaxis]
    print("Normalized data:\n", shifted / row_span)

    # One-hot encoding of each column.
    one_hot = one_hot_by_column(test_data)
    print("One-hot encoded data:\n", one_hot)

    # Min-max scaling of each column.
    column_scaled = minmax_by_column(test_data)
    print("MinMax scaled data by column:\n", column_scaled)