import os
import pickle
import argparse
import torch
import numpy as np
import os.path as osp

from sklearn.utils import shuffle
from torch.utils.data import Dataset

from federatedscope.core.data.utils import save_local_data
from federatedscope.cv.dataset.leaf import LEAF


def sigmoid(x):
    return 1 / (1 + np.exp(-x))


def softmax(x):
    # Subtract the max for numerical stability and reuse the computed
    # exponentials instead of evaluating np.exp(x) twice.
    ex = np.exp(x - np.max(x))
    sum_ex = np.sum(ex)
    return ex / sum_ex

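# Example (illustrative, not from the source): rounding sigmoid outputs is how
# `generate_data` below turns real-valued scores into binary labels, e.g.
#     np.round(sigmoid(np.array([-2.0, 0.5])))  ->  array([0., 1.])
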
class LEAF_SYNTHETIC(LEAF):
    """SYNTHETIC dataset from "Federated Multi-Task Learning
    under a Mixture of Distributions"

    Source: https://github.com/omarfoq/FedEM/tree/main/data/synthetic

    Arguments:
        root (str): root path.
        name (str): name of dataset, `SYNTHETIC`.
        n_components (int): number of mixture components, default=3.
        n_tasks (int): number of tasks/clients, default=300.
        n_test (int): size of test set, default=5,000.
        n_val (int): size of validation set, default=5,000.
        dim (int): dimension of the data, default=150.
        noise_level (float): proportion of noise, default=0.1.
        alpha (float): concentration parameter of the Dirichlet prior over
            mixture weights, default=0.4.
        box (tuple): box of `x`, default=(-1.0, 1.0).
        uniform_marginal (bool): whether `x` is drawn uniformly from `box`,
            default=True.
    """
    def __init__(self,
                 root,
                 name='synthetic',
                 n_components=3,
                 n_tasks=300,
                 n_test=5000,
                 n_val=5000,
                 dim=150,
                 noise_level=0.1,
                 alpha=0.4,
                 box=(-1.0, 1.0),
                 uniform_marginal=True):

        self.root = root
        self.n_components = n_components
        self.n_tasks = n_tasks
        self.n_test = n_test
        self.n_val = n_val
        self.dim = dim
        self.noise_level = noise_level
        # Symmetric Dirichlet concentration, one entry per mixture component.
        self.alpha = alpha * np.ones(n_components)
        self.box = box
        self.uniform_marginal = uniform_marginal
        # Heterogeneous per-client training-set sizes.
        self.num_samples = self.get_num_samples(self.n_tasks)

        self.theta = np.zeros((self.n_components, self.dim))
        self.mixture_weights = np.zeros((self.n_tasks, self.n_components))

        self.generate_mixture_weights()
        self.generate_components()

        super(LEAF_SYNTHETIC, self).__init__(root, name, None, None)
        files = os.listdir(self.processed_dir)
        files = [f for f in files if f.startswith('task_')]
        if len(files):
            # Sort by the task index parsed from the folder name,
            # e.g. 'task_12' -> 12.
            files.sort(key=lambda k: int(k[5:]))

            for file in files:
                train_data, train_targets = torch.load(
                    osp.join(self.processed_dir, file, 'train.pt'))
                test_data, test_targets = torch.load(
                    osp.join(self.processed_dir, file, 'test.pt'))
                self.data_dict[int(file[5:])] = {
                    'train': (train_data, train_targets),
                    'test': (test_data, test_targets)
                }
                if osp.exists(osp.join(self.processed_dir, file, 'val.pt')):
                    val_data, val_targets = torch.load(
                        osp.join(self.processed_dir, file, 'val.pt'))
                    self.data_dict[int(file[5:])]['val'] = (val_data,
                                                            val_targets)
        else:
            raise RuntimeError(
                "Please delete 'processed' folder and try again!")

    def download(self):
        # Data is generated locally; nothing to download.
        pass

    def extract(self):
        # Data is generated locally; nothing to extract.
        pass

    def __getitem__(self, index):
        """
        Arguments:
            index (int): Index

        :returns:
            dict: {'train': [(x, target)],
                   'test': [(x, target)],
                   'val': [(x, target)]}
            where target is the target class.
        """
        sample_dict = {}
        data = self.data_dict[index]
        for key in data:
            sample_dict[key] = []
            xs, targets = data[key]
            for idx in range(targets.shape[0]):
                sample_dict[key].append((xs[idx], targets[idx]))

        return sample_dict

    def generate_mixture_weights(self):
        # Each client draws its own component weights from a Dirichlet prior.
        for task_id in range(self.n_tasks):
            self.mixture_weights[task_id] = np.random.dirichlet(
                alpha=self.alpha)

    def generate_components(self):
        # Each component is a linear model whose weights are sampled
        # uniformly from `box`.
        self.theta = np.random.uniform(self.box[0],
                                       self.box[1],
                                       size=(self.n_components, self.dim))

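    # Note (illustrative): with the default alpha = 0.4 * ones(3), Dirichlet
    # draws tend to concentrate mass on one component, making clients
    # heterogeneous; larger alpha values yield more balanced mixtures.
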
    def generate_data(self, task_id, n_samples=10000):
        # Split the samples across components according to the
        # client-specific mixture weights.
        latent_variable_count = np.random.multinomial(
            n_samples, self.mixture_weights[task_id])
        y = np.zeros(n_samples)

        if self.uniform_marginal:
            x = np.random.uniform(self.box[0],
                                  self.box[1],
                                  size=(n_samples, self.dim))
        else:
            raise NotImplementedError(
                "Only uniform marginal is available for the moment")

        current_index = 0
        for component_id in range(self.n_components):
            y_hat = x[current_index:current_index +
                      latent_variable_count[component_id]] @ self.theta[
                          component_id]
            noise = np.random.normal(size=latent_variable_count[component_id],
                                     scale=self.noise_level)
            y[current_index:current_index +
              latent_variable_count[component_id]] = np.round(
                  sigmoid(y_hat + noise)).astype(int)
            # Advance to the next block of samples; without this step every
            # component would overwrite the labels of the first block.
            current_index += latent_variable_count[component_id]

        return shuffle(x.astype(np.float32), y.astype(np.int64))

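    # Summary of the generative model above (for reference): for client t,
    # per-component counts follow Multinomial(n, mixture_weights[t]), and a
    # sample in component c is labeled y = round(sigmoid(x @ theta[c] + eps))
    # with x ~ Uniform(box) and eps ~ N(0, noise_level^2).
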
    def save_metadata(self, path_):
        # Persist the ground-truth mixture weights and component parameters
        # so generated datasets can be reproduced and inspected.
        metadata = dict()
        metadata["mixture_weights"] = self.mixture_weights
        metadata["theta"] = self.theta

        with open(path_, 'wb') as f:
            pickle.dump(metadata, f)

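    # Example (illustrative): restore the saved metadata with
    #     with open(path_, 'rb') as f:
    #         metadata = pickle.load(f)
    # which returns the `mixture_weights` and `theta` arrays.
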
    def get_num_samples(self,
                        num_tasks,
                        min_num_samples=50,
                        max_num_samples=1000):
        # Draw heterogeneous client sizes from a log-normal distribution,
        # shift them up by `min_num_samples`, and cap at `max_num_samples`.
        num_samples = np.random.lognormal(4, 2, num_tasks).astype(int)
        num_samples = [
            min(s + min_num_samples, max_num_samples) for s in num_samples
        ]
        return num_samples

    def process(self):
        # Generate and save the train/test(/val) splits for every client.
        for task_id in range(self.n_tasks):
            save_path = os.path.join(self.processed_dir, f"task_{task_id}")
            os.makedirs(save_path, exist_ok=True)

            train_data, train_targets = self.generate_data(
                task_id, self.num_samples[task_id])
            test_data, test_targets = self.generate_data(task_id, self.n_test)

            if self.n_val > 0:
                val_data, val_targets = self.generate_data(task_id, self.n_val)
            else:
                val_data, val_targets = None, None
            save_local_data(dir_path=save_path,
                            train_data=train_data,
                            train_targets=train_targets,
                            test_data=test_data,
                            test_targets=test_targets,
                            val_data=val_data,
                            val_targets=val_targets)
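

# Minimal usage sketch (illustrative, not part of the original module): build
# a small synthetic federation and inspect one client's splits. This assumes
# the LEAF base class invokes `process()` on first construction when no
# processed files exist, and that `./data` is a writable directory.
if __name__ == '__main__':
    dataset = LEAF_SYNTHETIC(root='data', n_tasks=10, n_test=100, n_val=100)
    client_0 = dataset[0]
    x, target = client_0['train'][0]
    print(f"client 0: {len(client_0['train'])} train samples, "
          f"x shape: {tuple(x.shape)}, first target: {target}")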