170 lines
4.7 KiB
Python
170 lines
4.7 KiB
Python
import redis
|
|
import pickle
|
|
import subprocess
|
|
from celery import Celery
|
|
|
|
from federatedscope.organizer.utils import anonymize, args2yaml
|
|
|
|
|
|
# ---------------------------------------------------------------------- #
|
|
# Lobby related (global variable stored in Redis)
|
|
# ---------------------------------------------------------------------- #
|
|
class Lobby(object):
    """
    Registry of FS rooms whose meta info is kept in Redis.

    Two keys are stored (both pickled): ``'blacklist'`` (a list) and
    ``'room'`` (a dict mapping room_id to the room's meta info). The FS
    processes launched by this instance are tracked in ``self.pool``.
    """

    # Seconds to wait for a terminated FS process before killing it.
    _TERMINATE_TIMEOUT = 5

    def __init__(self, host='localhost', port=6379, db=0):
        """
        Create the Redis client and reset the stored lobby state.

        Args:
            host: Redis host name.
            port: Redis port.
            db: Redis database index.
        """
        self.r = redis.StrictRedis(host=host, port=port, db=db)
        # Handles of the FS subprocesses started by `create_room`
        self.pool = []
        self._set_up()

    def _save(self, key, value):
        """
        Save object to Redis via pickle.

        Args:
            key: Redis key to write.
            value: any picklable object.
        """
        self.r.set(key, pickle.dumps(value))

    def _load(self, key):
        """
        Load object from Redis via pickle.

        Args:
            key: Redis key to read.

        Returns:
            The unpickled object, or ``None`` when the key does not exist.
        """
        raw = self.r.get(key)
        if raw is None:
            return None
        # NOTE(security): pickle.loads can execute arbitrary code; this is
        # only safe while the Redis instance is trusted and not writable by
        # untrusted parties.
        return pickle.loads(raw)

    def _set_up(self):
        """
        Store all meta info in Redis.

        Overwrites both keys, so any previously stored state is discarded.
        """
        self._save('blacklist', [])
        # key: room_id, value: configs of FS
        self._save('room', {})

    def _check_room(self, room, room_id):
        """
        Check the validity of the room.

        Args:
            room: dict mapping room_id to meta info.
            room_id: id to look up.

        Returns:
            ``True`` if ``room_id`` is a known room, ``False`` otherwise.
        """
        # TODO: check whether the room is full
        return room_id in room

    def _check_user(self):
        """
        Check the validity of the user (whether in black list, etc).

        Placeholder: currently performs no check.
        """
        pass

    def create_room(self, args, psw=None):
        """
        Create FS server session and store args in Redis.

        Args:
            args: command-line arguments for FS as one whitespace-separated
                string.
            psw: optional password required to view the room.

        Returns:
            The id (a string) of the newly created room.

        Raises:
            ValueError: if the generated room id is already in use.
        """
        self._check_user()
        # Update room args in Redis; tolerate a missing 'room' key
        room = self._load('room') or {}
        room_id = str(len(room))
        # TODO: we must convert arg line to yaml dict to avoid conflicts
        #  with port
        meta_info = {
            'command': args,
            'cfg': args2yaml(args),
            'psw': psw,
        }
        if room_id in room:
            raise ValueError(f'Room id {room_id} already exists.')
        room[room_id] = meta_info
        self._save('room', room)

        # Launch FS. `split()` without a separator drops the empty strings
        # that `split(' ')` would produce for repeated/leading/trailing
        # spaces, which would otherwise reach FS as empty arguments.
        cmd = ['python', '../../federatedscope/main.py'] + args.split()
        p = subprocess.Popen(cmd,
                             stdout=subprocess.DEVNULL,
                             stderr=subprocess.DEVNULL)
        self.pool.append(p)

        return room_id

    def display_room(self):
        """
        Display all the joinable FS tasks.

        Returns:
            The room dict after `anonymize` has been applied for the
            ``'psw'`` and ``'cfg'`` keys.
        """
        self._check_user()
        mask_key = ['psw', 'cfg']
        room = self._load('room') or {}
        for mask in mask_key:
            room = anonymize(room, mask)
        return room

    def view_room(self, room_id, psw=None):
        """
        View one specific FS task.

        Args:
            room_id: id of the room to view.
            psw: password supplied by the caller.

        Returns:
            The room's meta info dict when the room exists and ``psw``
            matches; otherwise a string describing the failure.
        """
        self._check_user()
        # Work on a single snapshot so the check and the lookup agree
        room = self._load('room') or {}
        if not self._check_room(room, room_id):
            return 'Target Room is full or invalid, please use ' \
                   '`update_room` to show all available rooms.'
        target_room = room[room_id]
        if psw != target_room['psw']:
            return 'Wrong Password!'
        return target_room

    def shut_down(self):
        """
        Shut down all rooms and kill all subprocesses.

        Every tracked process is terminated and then reaped (killed if it
        does not exit within ``_TERMINATE_TIMEOUT`` seconds), the pool is
        emptied and the stored room dict is reset.

        Returns:
            ``True``.
        """
        # Signal everything first so the processes exit in parallel
        for p in self.pool:
            p.terminate()
        # Reap the children so they do not linger as zombies
        for p in self.pool:
            try:
                p.wait(timeout=self._TERMINATE_TIMEOUT)
            except subprocess.TimeoutExpired:
                p.kill()
                p.wait()
        self.pool.clear()
        self._save('room', {})
        return True
|
|
|
|
|
|
# ---------------------------------------------------------------------- #
|
|
# Message related
|
|
# ---------------------------------------------------------------------- #
|
|
# Celery application named 'server'; additional settings are read from the
# `cfg_server` module.
organizer = Celery(
    'server',
    broker='redis://localhost:6379/0',
    backend='redis://localhost',
)
organizer.config_from_object('cfg_server')

# Single module-level lobby used by every task defined below.
lobby = Lobby()
|
|
|
|
|
|
# ---------------------------------------------------------------------- #
|
|
# Room related tasks
|
|
# ---------------------------------------------------------------------- #
|
|
@organizer.task
def create_room(args, psw):
    """
    Create a room in the lobby and report its id.

    Args:
        args: FS command-line arguments, forwarded to `lobby.create_room`.
        psw: password for the room, forwarded to `lobby.create_room`.

    Returns:
        A message string containing the id of the new room.
    """
    print('Creating room...')
    new_id = lobby.create_room(args, psw)
    message = f"The room was created successfully and the id is {new_id}."
    print(message)
    return message
|
|
|
|
|
|
@organizer.task
def display_room():
    """
    Print one line per room and return the room dict.

    Returns:
        The dict returned by `lobby.display_room` (room_id -> info).
    """
    rooms = lobby.display_room()
    # One "room_id: ..., info: ..." line per room, each newline-terminated
    listing = "".join(f"room_id: {rid}, info: {info}\n"
                      for rid, info in rooms.items())
    print(listing)
    return rooms
|
|
|
|
|
|
@organizer.task
def view_room(room_id, psw=None):
    """
    Return whatever `lobby.view_room` yields for the given room.

    Args:
        room_id: id of the room to view.
        psw: password supplied by the caller.
    """
    return lobby.view_room(room_id, psw)
|
|
|
|
|
|
@organizer.task
def shut_down():
    """
    Shut the lobby down and return a confirmation message.
    """
    done_msg = 'Shut down all rooms successfully.'
    lobby.shut_down()
    return done_msg
|