# FS-TFP/federatedscope/organizer/client.py
#
# 257 lines
# 9.6 KiB
# Python

import cmd2
import time
from celery import Celery
from cmd2 import Bg, Fg, style
from federatedscope.core.configs.config import CN
from federatedscope.organizer.cfg_client import server_ip
from federatedscope.organizer.utils import SSHManager, config2cmdargs, \
flatten_dict
# Module-level Celery application; the shell commands below use it to send
# tasks (by name) to the organizer server.
organizer = Celery()
# Load the Celery settings from the module named `cfg_client`.
# NOTE(review): the import above uses the full package path
# `federatedscope.organizer.cfg_client`, whereas this bare name presumably
# only resolves when that directory is on `sys.path` - confirm.
organizer.config_from_object('cfg_client')
class OrganizerClient(cmd2.Cmd):
    """Interactive cmd2 shell for the FederatedScope organizer client.

    The shell keeps two local registries: ``ecs_dict`` maps an ip to the
    ``SSHManager`` created by ``add_ecs`` and ``room_dict`` maps a room id
    to the room information fetched from the server. Server-side
    operations are sent as Celery tasks through the module-level
    ``organizer`` application.

    The leading string of every ``do_*`` method is its docstring, which is
    the text cmd2 prints for ``help <command>``; developer notes are
    therefore kept in ``#`` comments so they do not leak into the help.
    """
    SERVER_CATEGORY = 'Server Related Commands'
    # Misspelled name kept as an alias for backward compatibility.
    SEVER_CATEGORY = SERVER_CATEGORY
    ECS_CATEGORY = 'ECS Related Commands'
    TASK_CATEGORY = 'Task Related Commands'
    # Class-level defaults; every instance gets its own dicts in __init__.
    ecs_dict, room_dict = {}, {}
    # Maximum number of one-second polls while waiting for a server reply.
    timeout = 10

    def __init__(self):
        """Configure the cmd2 shell (intro, prompt, colours, registries)."""
        super().__init__(
            multiline_commands=['echo'],
            include_ipy=True,
        )
        self.intro = style(
            'Welcome to the FS organizer shell. Type help or ? to list '
            'commands.\n',
            fg=Fg.BLUE,
            bg=Bg.WHITE,
            bold=True)
        self.prompt = 'FederatedScope>> '
        self.self_in_py = True
        self.default_category = 'Built-in Commands'
        self.debug = True
        self.foreground_color = Fg.CYAN.name.lower()
        # Rebind the registries per instance: mutating the class-level
        # dicts in place would share hosts/rooms between all instances.
        self.ecs_dict = {}
        self.room_dict = {}

    def fancy_output(self, out):
        """Print ``out`` in green on white through cmd2's ``poutput``."""
        return self.poutput(style(out, fg=Fg.GREEN, bg=Bg.WHITE))

    def _wait_for_result(self, result):
        """Poll a Celery result once per second, then return its value.

        Polling stops as soon as ``result.ready()`` is true or after
        ``self.timeout`` polls. The final ``result.get(timeout=1)`` raises
        if the result is still unavailable; callers report that error.
        """
        polls = 0
        while (not result.ready()) and polls < self.timeout:
            self.fancy_output('Waiting for response... (will re-try in 1s)')
            time.sleep(1)
            polls += 1
        return result.get(timeout=1)

    # ---------------------------------------------------------------------- #
    # SSH Manager related
    # ---------------------------------------------------------------------- #
    # Registers a new SSHManager under its ip; duplicates are rejected.
    @cmd2.with_category(ECS_CATEGORY)
    def do_add_ecs(self, line):
        'Usage: add_ecs ip user psw\n\n' \
            'Add ECS to client control list\n\n' \
            'required arguments:\n' \
            ' ip, ip address 172.X.X.X\n' \
            ' user, user name of ECS\n' \
            ' psw, password of user\n\n' \
            'Example:\n' \
            ' add_ecs 172.X.X.X root 12345\n'
        try:
            # split() without a separator tolerates repeated or surrounding
            # blanks, which split(' ') turned into empty arguments.
            args = str(line).split()
            if len(args) != 3:
                raise ValueError('Usage: add_ecs ip user psw')
            ip, user, psw = args
            if ip in self.ecs_dict:
                raise ValueError(f"ECS `{ip}` already exists.")
            self.ecs_dict[ip] = SSHManager(ip, user, psw)
            self.fancy_output(f"{self.ecs_dict[ip]} added.")
        except Exception as error:
            self.pexcept(f"Exception: {error}")

    # Removes a host from the registry; unknown hosts are reported clearly.
    @cmd2.with_category(ECS_CATEGORY)
    def do_del_ecs(self, line):
        'Usage: del_ecs ip\n\n' \
            'Delete ECS from client control list\n\n' \
            'required arguments:\n' \
            ' ip, ip address 172.X.X.X\n\n' \
            'Example:\n' \
            ' del_ecs 172.X.X.X\n'
        try:
            # `line` may be a str subclass supplied by cmd2; normalise it to
            # a plain, stripped str so it matches the keys stored by add_ecs.
            key = str(line).strip()
            if key not in self.ecs_dict:
                raise ValueError(f"ECS `{key}` does not exist.")
            self.fancy_output(f"Delete {key}: {self.ecs_dict[key]}.")
            # TODO: Del all task
            del self.ecs_dict[key]
        except Exception as error:
            self.pexcept(f"Exception: {error}")

    # Prints one "ecs: <ip>, info: <manager>" line per registered host.
    @cmd2.with_category(ECS_CATEGORY)
    def do_display_ecs(self, line):
        'Usage: display_ecs\n\n' \
            'Display saved ECS in client control list\n\n' \
            'Example:\n' \
            ' display_ecs\n'
        try:
            info = "".join(f"ecs: {key}, info: {value}\n"
                           for key, value in self.ecs_dict.items())
            self.fancy_output(info)
        except Exception as error:
            self.pexcept(f"Exception: {error}")

    # Builds the client-side launch command from the room's saved config
    # and starts it on the selected host through its SSHManager.
    @cmd2.with_category(ECS_CATEGORY)
    def do_join_room(self, line):
        'Usage: join_room ip room_id other_opts\n\n' \
            'Let an ECS join a specific room\n\n' \
            'required arguments:\n' \
            ' ip, ip address 172.X.X.X\n' \
            ' room_id, room id joining \n' \
            ' other_opts, other operations in FS\n\n' \
            'Example:\n' \
            ' join_room 172.X.X.X 0 device 0 distribute.data_idx 2 ...\n'
        try:
            args = str(line).split()
            if len(args) < 2:
                raise ValueError('Usage: join_room ip room_id other_opts')
            ip, room_id, opts = args[0], args[1], args[2:]
            if ip not in self.ecs_dict:
                raise ValueError(f"ECS `{ip}` does not exist.")
            if room_id not in self.room_dict:
                raise ValueError(f"Room `{room_id}` does not exist.")
            ecs, room = self.ecs_dict[ip], self.room_dict[room_id]
            # A masked config means the room has not been unlocked yet.
            if room['cfg'] == '******':
                raise ValueError('Please view_room before joining.')
            task_cfg = CN(room['cfg'])
            # Convert necessary configurations
            task_cfg['distribute']['server_host'] = server_ip
            task_cfg['distribute']['client_host'] = ip
            task_cfg['distribute']['role'] = 'client'
            # Merge other opts and convert to command string
            task_cfg.merge_from_list(opts)
            cmd_args = config2cmdargs(flatten_dict(task_cfg))
            # Every argument loses its blanks and is wrapped in double
            # quotes; the pieces are joined by single spaces.
            command = ' '.join(
                '"{}"'.format(f'{arg}'.replace(' ', '')) for arg in cmd_args)
            pid = ecs.launch_task(command)
            self.fancy_output(f'{ecs.ip}({pid}) launched,')
        except Exception as error:
            self.pexcept(f"Exception: {error}")

    # ---------------------------------------------------------------------- #
    # Task manager related
    # ---------------------------------------------------------------------- #
    # Prints "<ip>: <tasks>" for every registered host.
    @cmd2.with_category(TASK_CATEGORY)
    def do_display_task(self, line):
        'Usage: display_task\n\n' \
            'Display all running tasks in client task list\n\n' \
            'Example:\n' \
            ' display_task\n'
        # TODO: add abort, check status, etc
        try:
            for ecs in self.ecs_dict.values():
                self.fancy_output(f'{ecs.ip}: {ecs.tasks}')
        except Exception as error:
            self.pexcept(f"Exception: {error}")

    # ---------------------------------------------------------------------- #
    # Server related messages
    # ---------------------------------------------------------------------- #
    # Sends `server.create_room` with (command, psw) and prints the reply.
    @cmd2.with_category(SERVER_CATEGORY)
    def do_create_room(self, line):
        'Usage: create_room command psw\n\n' \
            'Create FS room in server with specific command\n\n' \
            'required arguments:\n' \
            ' command, extra command to launch FS\n' \
            ' psw, password for room \n\n' \
            'Example:\n' \
            ' create_room --cfg ../../scripts/example_configs' \
            '/distributed_femnist_server.yaml 12345\n'
        try:
            # The last blank-separated token is the password; everything
            # before that final blank is the launch command.
            command, _, psw = str(line).rpartition(' ')
            result = organizer.send_task('server.create_room', [command, psw])
            self.fancy_output(self._wait_for_result(result))
        except Exception as error:
            self.pexcept(f"Exception: {error}")

    # Replaces the local room cache with the mapping returned by
    # `server.display_room` and prints one line per room.
    @cmd2.with_category(SERVER_CATEGORY)
    def do_update_room(self, line):
        'Usage: update_room\n\n' \
            'Fetch all FS rooms from Lobby (will forget all saved room)\n\n' \
            'Example:\n' \
            ' update_room\n'
        try:
            self.fancy_output('Forget all saved room due to `update_room`.')
            result = organizer.send_task('server.display_room')
            rooms = self._wait_for_result(result)
            if not isinstance(rooms, dict):
                # Not a room mapping (presumably a message from the server
                # - unverified): show it and keep the cache usable instead
                # of overwriting it with a non-dict value.
                self.fancy_output(rooms)
                return
            self.room_dict = rooms
            info = "".join(f"room_id: {key}, info: {value}\n"
                           for key, value in self.room_dict.items())
            self.fancy_output(info)
        except Exception as error:
            self.pexcept(f"Exception: {error}")

    # Sends `server.view_room`; a dict reply is cached under `room_id`
    # (which makes the room joinable), any other reply is printed as is.
    @cmd2.with_category(SERVER_CATEGORY)
    def do_view_room(self, line):
        'Usage: view_room room_id psw verbose\n\n' \
            'View specific FS room\n\n' \
            'required arguments:\n' \
            ' room_id, id of the room to view\n' \
            ' psw, password for room \n' \
            ' verbose,\n' \
            ' 0: print no information\n' \
            ' 1: print information of a specific room\n' \
            ' 2: print information of all the rooms\n\n' \
            'Example:\n' \
            ' view_room 0 12345 0\n'
        try:
            args = str(line).split()
            if len(args) != 3:
                raise ValueError('Usage: view_room room_id psw verbose')
            room_id, psw, verbose = args
            result = organizer.send_task('server.view_room', [room_id, psw])
            info = self._wait_for_result(result)
            if isinstance(info, dict):
                self.room_dict[room_id] = info
                self.fancy_output(
                    f'Room {room_id} has been updated to joinable.')
                if verbose == '1':
                    self.fancy_output(info)
                elif verbose == '2':
                    self.fancy_output(self.room_dict)
            else:
                self.fancy_output(info)
        except Exception as error:
            self.pexcept(f"Exception: {error}")

    # Sends `server.shut_down`; the shell quits (returns True) only after
    # the server's reply has been received and printed.
    @cmd2.with_category(SERVER_CATEGORY)
    def do_shut_down(self, line):
        'Usage: shut_down\n\n' \
            'Shut down all rooms and quit\n\n' \
            'Example:\n' \
            ' shut_down\n'
        try:
            result = organizer.send_task('server.shut_down')
            self.fancy_output(self._wait_for_result(result))
        except Exception as error:
            # Report like the other commands and keep the shell running so
            # the user can retry; a failure never ended the shell before.
            self.pexcept(f"Exception: {error}")
            return False
        return True
if __name__ == '__main__':
    # Script entry point: build the shell, then hand control to its
    # interactive command loop.
    shell = OrganizerClient()
    shell.cmdloop()