# FS-TFP/federatedscope/core/configs/cfg_fl_setting.py
import logging

from federatedscope.core.configs.config import CN
from federatedscope.register import register_config

import torch

logger = logging.getLogger(__name__)

def extend_fl_setting_cfg(cfg):
    # ---------------------------------------------------------------------- #
    # Federated learning related options
    # ---------------------------------------------------------------------- #
    cfg.federate = CN()

    cfg.federate.client_num = 0
    cfg.federate.sample_client_num = -1
    cfg.federate.sample_client_rate = -1.0
    cfg.federate.unseen_clients_rate = 0.0
    cfg.federate.total_round_num = 50
    cfg.federate.mode = 'standalone'
    cfg.federate.share_local_model = False
    cfg.federate.data_weighted_aggr = False  # If True, the weight of aggr is
    # the number of training samples in the dataset.
    cfg.federate.online_aggr = False
    cfg.federate.make_global_eval = False
    cfg.federate.use_diff = False
    cfg.federate.merge_test_data = False  # For efficient simulation, users
    # can choose to merge the test data and perform global evaluation,
    # instead of performing the test at each client
    cfg.federate.merge_val_data = False  # Enabled only when
    # `merge_test_data` is True, also for efficient simulation

    # the method name is used to internally determine the composition of
    # different aggregators, messages, handlers, etc.
    cfg.federate.method = "FedAvg"
    cfg.federate.ignore_weight = False
    cfg.federate.use_ss = False  # Whether to apply Secret Sharing
    cfg.federate.restore_from = ''
    cfg.federate.save_to = ''
    cfg.federate.join_in_info = [
    ]  # The information requirements (from server) for join_in
    cfg.federate.sampler = 'uniform'  # the strategy for sampling clients
    # in each training round, ['uniform', 'group']
    cfg.federate.resource_info_file = ""  # the device information file to
    # record computation and communication ability

    # The configurations for parallel training in standalone mode
    cfg.federate.process_num = 1
    cfg.federate.master_addr = '127.0.0.1'  # parameter of torch.distributed
    cfg.federate.master_port = 29500  # parameter of torch.distributed

    # atc (TODO: merge later)
    cfg.federate.atc_vanilla = False
    cfg.federate.atc_load_from = ''
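    # Illustrative only (hypothetical values, not shipped defaults): a minimal
    # standalone snippet touching just the options defined above could read
    #   federate:
    #     mode: 'standalone'
    #     client_num: 10
    #     total_round_num: 50
    #     sample_client_rate: 0.2
    # The full set of required keys depends on the rest of the config.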
    # ---------------------------------------------------------------------- #
    # Distributed training related options
    # ---------------------------------------------------------------------- #
    cfg.distribute = CN()

    cfg.distribute.use = False
    cfg.distribute.server_host = '0.0.0.0'
    cfg.distribute.server_port = 50050
    cfg.distribute.client_host = '0.0.0.0'
    cfg.distribute.client_port = 50050
    cfg.distribute.role = 'client'
    cfg.distribute.data_file = 'data'
    cfg.distribute.data_idx = -1  # data_idx is used to specify the data
    # index in distributed mode when adopting a centralized dataset for
    # simulation (formatted as {data_idx: data/dataloader}).
    # data_idx = -1 means that the whole dataset is owned by the participant.
    # when data_idx takes any other invalid value except -1, we randomly
    # sample a data_idx for the simulation
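    # Illustrative example of the mapping described above (hypothetical
    # split): if the centralized dataset is organized as
    # {1: data_1, 2: data_2, 3: data_3}, setting distribute.data_idx = 2 makes
    # this participant use data_2 only.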
    cfg.distribute.grpc_max_send_message_length = 300 * 1024 * 1024  # 300M
    cfg.distribute.grpc_max_receive_message_length = 300 * 1024 * 1024  # 300M
    cfg.distribute.grpc_enable_http_proxy = False
    cfg.distribute.grpc_compression = 'nocompression'  # [deflate, gzip]
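    # Illustrative only (hypothetical host/port values): a server process in
    # distributed mode could be configured as
    #   federate: {mode: 'distributed', client_num: 2}
    #   distribute:
    #     use: True
    #     role: 'server'            # 'client' on the client side
    #     server_host: '127.0.0.1'
    #     server_port: 50051
    # with client_host/client_port additionally set on each client process.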
    # ---------------------------------------------------------------------- #
    # Vertical FL related options (for demo)
    # ---------------------------------------------------------------------- #
    cfg.vertical = CN()
    cfg.vertical.use = False
    cfg.vertical.mode = 'feature_gathering'
    # ['feature_gathering', 'label_scattering']
    cfg.vertical.dims = [5, 10]  # Client 1 has the first 5 features,
    # and Client 2 has the last 5 features
    cfg.vertical.encryption = 'paillier'
    cfg.vertical.key_size = 3072
    cfg.vertical.algo = 'lr'  # ['lr', 'xgb', 'gbdt', 'rf']
    cfg.vertical.feature_subsample_ratio = 1.0
    cfg.vertical.protect_object = ''  # [feature_order, grad_and_hess]
    cfg.vertical.protect_method = ''
    # [dp, op_boost] for protect_object = feature_order
    # [he] for protect_object = grad_and_hess
    cfg.vertical.protect_args = []
    # Default values for 'dp': {'bucket_num': 100, 'epsilon': None}
    # Default values for 'op_boost': {'algo': 'global', 'lower_bound': 1,
    # 'upper_bound': 100, 'epsilon': 2}
    cfg.vertical.eval_protection = ''  # ['', 'he']
    cfg.vertical.data_size_for_debug = 0  # use a subset for debug in vfl,
    # 0 indicates using the entire dataset (disable debug mode)
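    # Illustrative vertical FL snippet built only from the keys above
    # (example values, not recommendations):
    #   vertical:
    #     use: True
    #     algo: 'xgb'
    #     dims: [5, 10]
    #     protect_object: 'feature_order'
    #     protect_method: 'dp'
    # protect_args can be supplied to override the 'dp'/'op_boost' defaults
    # listed in the comments above.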
    # --------------- register corresponding check function ----------
    cfg.register_cfg_check_fun(assert_fl_setting_cfg)

def assert_fl_setting_cfg(cfg):
    assert cfg.federate.mode in ["standalone", "distributed"], \
        f"Please specify the cfg.federate.mode as the string standalone or " \
        f"distributed. But got {cfg.federate.mode}."

    # =============  client num related  ==============
    assert not (cfg.federate.client_num == 0
                and cfg.federate.mode == 'distributed'
                ), "Please configure cfg.federate.client_num in " \
                   "distributed mode."
    assert 0 <= cfg.federate.unseen_clients_rate < 1, \
        "You specified an invalid cfg.federate.unseen_clients_rate"
    if 0 < cfg.federate.unseen_clients_rate < 1 and cfg.federate.method in [
            "local", "global"
    ]:
        logger.warning(
            "In local/global training mode, the unseen_clients_rate is "
            "invalid, please check your config")
        unseen_clients_rate = 0.0
        cfg.federate.unseen_clients_rate = unseen_clients_rate
    else:
        unseen_clients_rate = cfg.federate.unseen_clients_rate
    participated_client_num = max(
        1, int((1 - unseen_clients_rate) * cfg.federate.client_num))
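    # Worked example (hypothetical numbers): with client_num = 100 and
    # unseen_clients_rate = 0.2, participated_client_num =
    # max(1, int(0.8 * 100)) = 80; the remaining 20 clients are held out as
    # unseen clients.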
    # sample client num pre-process
    sample_client_num_valid = (
        0 < cfg.federate.sample_client_num <=
        cfg.federate.client_num) and cfg.federate.client_num != 0
    sample_client_rate_valid = (0 < cfg.federate.sample_client_rate <= 1)

    sample_cfg_valid = sample_client_rate_valid or sample_client_num_valid
    non_sample_case = cfg.federate.method in ["local", "global"]

    if non_sample_case and sample_cfg_valid:
        logger.warning("In local/global training mode, "
                       "the sampling related configs are invalid, "
                       "we will use all clients. ")

    if cfg.federate.method == "global":
        logger.info(
            "In global training mode, we will put all data in a proxy client. "
        )
        if cfg.federate.make_global_eval:
            cfg.federate.make_global_eval = False
            logger.warning(
                "In global training mode, we will conduct global evaluation "
                "in a proxy client rather than the server. The configuration "
                "cfg.federate.make_global_eval will be False.")

    if non_sample_case or not sample_cfg_valid:
        # (a) use all clients
        # in standalone mode, federate.client_num may be modified from 0 to
        # num_of_all_clients after loading the data
        if cfg.federate.client_num != 0:
            cfg.federate.sample_client_num = participated_client_num
    else:
        # (b) sampling case
        if sample_client_rate_valid:
            # (b.1) use sample_client_rate
            old_sample_client_num = cfg.federate.sample_client_num
            cfg.federate.sample_client_num = max(
                1,
                int(cfg.federate.sample_client_rate *
                    participated_client_num))
            if sample_client_num_valid:
                logger.warning(
                    f"Users specify both valid sample_client_rate as"
                    f" {cfg.federate.sample_client_rate} "
                    f"and sample_client_num as {old_sample_client_num}.\n"
                    f"\t\tWe will use the sample_client_rate value to "
                    f"calculate "
                    f"the actual number of participated clients as"
                    f" {cfg.federate.sample_client_num}.")
        # (b.2) use sample_client_num, commented since the below two
        # lines do not change anything
        # elif sample_client_num_valid:
        #     cfg.federate.sample_client_num = \
        #         cfg.federate.sample_client_num
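    # Worked example (hypothetical numbers): continuing the example above,
    # with participated_client_num = 80 and sample_client_rate = 0.1,
    # sample_client_num becomes max(1, int(0.1 * 80)) = 8 clients per round.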
    if cfg.federate.use_ss:
        assert cfg.federate.client_num == cfg.federate.sample_client_num, \
            "Currently, we support secret sharing only in the " \
            "all-client-participation case"

        assert cfg.federate.method != "local", \
            "Secret sharing is not supported in local training mode"

    # =============  aggregator related  ================
    assert (not cfg.federate.online_aggr) or (
        not cfg.federate.use_ss
    ), "Have not supported using the online aggregator and secret sharing " \
       "at the same time"

    assert not cfg.federate.merge_test_data or (
        cfg.federate.merge_test_data and cfg.federate.mode == 'standalone'
    ), "The operation of merging test data can only be used in standalone " \
       "mode for efficient simulation, please change " \
       "'federate.merge_test_data' to False or change 'federate.mode' to " \
       "'distributed'."

    if cfg.federate.merge_test_data and not cfg.federate.make_global_eval:
        cfg.federate.make_global_eval = True
        logger.warning('Set cfg.federate.make_global_eval=True since '
                       'cfg.federate.merge_test_data=True')

    if cfg.federate.process_num > 1 and cfg.federate.mode != 'standalone':
        cfg.federate.process_num = 1
        logger.warning('Parallel training can only be used in standalone mode'
                       ', thus cfg.federate.process_num is modified to 1')
    if cfg.federate.process_num > 1 and not torch.cuda.is_available():
        cfg.federate.process_num = 1
        logger.warning(
            'No GPU found for your device, set cfg.federate.process_num=1')
    if torch.cuda.device_count() < cfg.federate.process_num:
        cfg.federate.process_num = torch.cuda.device_count()
        logger.warning(
            'We found that the number of GPUs is insufficient, '
            f'thus cfg.federate.process_num={cfg.federate.process_num}')
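    # Example (hypothetical numbers): requesting process_num = 4 on a machine
    # with 2 visible GPUs is capped to process_num = 2 by the check above;
    # on a machine without CUDA, multi-process training is disabled entirely.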
    # TODO
    if cfg.vertical.use:
        if cfg.vertical.algo == 'lr' and hasattr(cfg, "trainer") and \
                cfg.trainer.type != 'none':
            logger.warning(f"When given cfg.vertical.algo = 'lr', the value "
                           f"of cfg.trainer.type is expected to be 'none' "
                           f"but got {cfg.trainer.type}. Therefore "
                           f"cfg.trainer.type is changed to 'none' here")
            cfg.trainer.type = 'none'
        if cfg.vertical.algo == 'lr' and hasattr(cfg, "model") and \
                cfg.model.type != 'lr':
            logger.warning(f"When given cfg.vertical.algo = 'lr', the value "
                           f"of cfg.model.type is expected to be 'lr' "
                           f"but got {cfg.model.type}. Therefore "
                           f"cfg.model.type is changed to 'lr' here")
            cfg.model.type = 'lr'
        if cfg.vertical.algo in ['xgb', 'gbdt'] and hasattr(cfg, "trainer") \
                and cfg.trainer.type.lower() != 'verticaltrainer':
            logger.warning(
                f"When given cfg.vertical.algo = 'xgb' or 'gbdt', the value "
                f"of cfg.trainer.type is expected to be "
                f"'verticaltrainer' but got {cfg.trainer.type}. "
                f"Therefore cfg.trainer.type is changed to "
                f"'verticaltrainer' here")
            cfg.trainer.type = 'verticaltrainer'
        if cfg.vertical.algo == 'xgb' and hasattr(cfg, "model") and \
                cfg.model.type != 'xgb_tree':
            logger.warning(f"When given cfg.vertical.algo = 'xgb', the value "
                           f"of cfg.model.type is expected to be 'xgb_tree' "
                           f"but got {cfg.model.type}. Therefore "
                           f"cfg.model.type is changed to 'xgb_tree' here")
            cfg.model.type = 'xgb_tree'
        elif cfg.vertical.algo == 'gbdt' and hasattr(cfg, "model") and \
                cfg.model.type != 'gbdt_tree':
            logger.warning(f"When given cfg.vertical.algo = 'gbdt', the value "
                           f"of cfg.model.type is expected to be 'gbdt_tree' "
                           f"but got {cfg.model.type}. Therefore "
                           f"cfg.model.type is changed to 'gbdt_tree' here")
            cfg.model.type = 'gbdt_tree'

        if not (cfg.vertical.feature_subsample_ratio > 0
                and cfg.vertical.feature_subsample_ratio <= 1.0):
            raise ValueError(f'The value of vertical.feature_subsample_ratio '
                             f'must be in (0, 1.0], but got '
                             f'{cfg.vertical.feature_subsample_ratio}')
    if cfg.distribute.use and cfg.distribute.grpc_compression.lower() not in [
            'nocompression', 'deflate', 'gzip'
    ]:
        raise ValueError(f'The type of grpc compression is expected to be '
                         f'one of ["nocompression", "deflate", "gzip"], but '
                         f'got {cfg.distribute.grpc_compression}.')

register_config("fl_setting", extend_fl_setting_cfg)