Compare commits
19 Commits
7836e2e0c4
...
03a33bf920
| Author | SHA1 | Date |
|---|---|---|
|
|
03a33bf920 | |
|
|
1fb98ec20d | |
|
|
c7eb8cf849 | |
|
|
4a501e6819 | |
|
|
09c5595ca9 | |
|
|
987ad34a17 | |
|
|
87b4ad10d1 | |
|
|
267ee463ce | |
|
|
f43502cc21 | |
|
|
5fdab2b668 | |
|
|
e95c13f0fc | |
|
|
9cfe3f01dc | |
|
|
0b41f04d3c | |
|
|
1b25920188 | |
|
|
fc6ed02806 | |
|
|
50785ad3c1 | |
|
|
b9ca3349c9 | |
|
|
22b229891a | |
|
|
578ff48c71 |
246
README.md
246
README.md
|
|
@ -1,148 +1,260 @@
|
||||||
# FS-TFP
|
# FedDGCN: A Scalable Federated Learning Framework for Traffic Flow Prediction
|
||||||
This is the offical repository of **FedDGCN**: A Scalable Federated Learning
|
|
||||||
Framework for Traffic Flow Prediction.
|
|
||||||
|
|
||||||

|
This is the official repository of **FedDGCN: A Scalable Federated Learning Framework for Traffic Flow Prediction.**
|
||||||
|
|
||||||
It is also a the traffic flow prediction extension based on [FederatedScope](https://github.com/alibaba/FederatedScope).
|

|
||||||
|
|
||||||
NOTE: This is an early version of **FedDGCN**. The full version will be updated after testing is completed.
|
**FedDGCN** extends [FederatedScope](https://github.com/alibaba/FederatedScope) to support federated traffic flow prediction.
|
||||||
|
|
||||||
|
> **Note:** This is an early version of **FedDGCN**. The full version will be released after testing is completed.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
# 1. Environment
|
## Table of Contents
|
||||||
|
- [1. Environment Setup](#1-environment-setup)
|
||||||
|
- [Step 1: Create a Conda Environment](#step-1-create-a-conda-environment)
|
||||||
|
- [Step 2: Install PyTorch](#step-2-install-pytorch)
|
||||||
|
- [Step 3: Install FederatedScope](#step-3-install-federatedscope)
|
||||||
|
- [2. Run the Code](#2-run-the-code)
|
||||||
|
- [Step 1: Prepare the Datasets](#step-1-prepare-the-datasets)
|
||||||
|
- [Step 2: Configure the Settings](#step-2-configure-the-settings)
|
||||||
|
- [Step 3: Run the Experiments](#step-3-run-the-experiments)
|
||||||
|
- [3. Visualize Results](#3-visualize-results)
|
||||||
|
- [4. Citation](#4-citation)
|
||||||
|
- [5. Acknowledgements](#5-acknowledgements)
|
||||||
|
|
||||||
We run the experiment on a **Linux system**, i.e **Ubuntu 22.04**. It has not been tested on other systems yet.
|
---
|
||||||
|
|
||||||
## Step 1. Create a Conda env
|
## 1. Environment Setup
|
||||||
|
|
||||||
|
### Step 1: Create a Conda Environment
|
||||||
We recommend using a **Conda** virtual environment. This project supports **Python 3.9** (recommended) and **Python 3.10**.
|
We recommend using a **Conda** virtual environment. This project supports **Python 3.9** (recommended) and **Python 3.10**.
|
||||||
|
|
||||||
**WARNING: Python 3.11 and later versions are not compatible!**
|
> **Warning:** Python 3.11 and later versions are not compatible.
|
||||||
|
|
||||||
```
|
```bash
|
||||||
conda create -n FedDGCN python=3.9
|
conda create -n FedDGCN python=3.9
|
||||||
conda activate FedDGCN
|
conda activate FedDGCN
|
||||||
```
|
```
|
||||||
|
|
||||||
## Step 2. Install Pytorch
|
### Step 2: Install PyTorch
|
||||||
|
Download the appropriate version of [PyTorch](https://pytorch.org/get-started/locally/) based on your device.
|
||||||
Download the appropriate version of [PyTorch]( https://pytorch.org/get-started/locally/) based on your device.
|
|
||||||
|
|
||||||
This project has been tested with **Torch 2.4.0 (recommended)** and **Torch 2.0.0** with **CUDA 12**. Compatibility with other versions is not guaranteed.
|
This project has been tested with **Torch 2.4.0 (recommended)** and **Torch 2.0.0** with **CUDA 12**. Compatibility with other versions is not guaranteed.
|
||||||
|
|
||||||
## Step 3. Install FederatedScope
|
### Step 3: Install FederatedScope
|
||||||
|
Clone this repository and install it:
|
||||||
|
|
||||||
git clone this repository, and
|
```bash
|
||||||
|
|
||||||
```
|
|
||||||
cd FS-TFP
|
cd FS-TFP
|
||||||
pip install -e .
|
pip install -e .
|
||||||
```
|
```
|
||||||
|
|
||||||
Additionally, you might need to install some extra packages to avoid annoying warnings.
|
Additionally, install the required packages to avoid warnings:
|
||||||
|
|
||||||
```
|
```bash
|
||||||
pip install torch_geometric community rdkit
|
pip install torch_geometric community rdkit
|
||||||
```
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 2. Run the Code
|
||||||
|
|
||||||
# 2. Run the Code
|
### Step 1: Prepare the Datasets
|
||||||
|
Download the PeMS datasets from the **[STSGCN repository](https://github.com/Davidham3/STSGCN)**.
|
||||||
|
After downloading, extract the datasets and place them in the `./data/trafficflow` directory.
|
||||||
|
|
||||||
## Step 1. Prepare the datasets
|
The directory structure should be as follows:
|
||||||
|
|
||||||
You need to download the PeMS dataset from the **[STSGCN](https://github.com/Davidham3/STSGCN)** repository following README. After downloading, extract the dataset and place it in the `./data/trafficflow` directory at the root of the project.
|
|
||||||
|
|
||||||
The directory structure of `./data/trafficflow` should be as follows:
|
|
||||||
|
|
||||||
```
|
```
|
||||||
FS-TFP\DATA\TRAFFICFLOW
|
FS-TFP/data/trafficflow
|
||||||
├─PeMS03
|
├─PeMS03
|
||||||
├─PeMS04
|
├─PeMS04
|
||||||
├─PeMS07
|
├─PeMS07
|
||||||
└─PeMS08
|
└─PeMS08
|
||||||
```
|
```
|
||||||
|
|
||||||
## Step 2. Check your Setting
|
### Step 2: Configure the Settings
|
||||||
|
Run scripts for the four datasets are located in the `./scripts/trafficflow_exp_scripts/` directory.
|
||||||
|
|
||||||
We have placed the run scripts for the four datasets in the `./scripts/trafficflow_exp_scripts/` directory.
|
Each dataset has a YAML configuration file: `{D3, D4, D7, D8}.yaml`.
|
||||||
|
|
||||||
There are YAML files for four datasets: `{D3, D4, D7, D8}.yaml`.
|
To replicate the experimental outcomes, we advise executing the scripts as they are, without altering any parameters.
|
||||||
|
|
||||||
You can customize the parameters or use the presets we provide.
|
You can customize the parameters or use the presets. Key configurable parameters include:
|
||||||
|
|
||||||
Some key parameters include:
|
```yaml
|
||||||
|
# Line 3: GPU device to use (for multi-GPU machines)
|
||||||
```
|
|
||||||
# Line 3: Adjust the GPU device to use (for multi-GPU machines)
|
|
||||||
device: 0
|
device: 0
|
||||||
|
|
||||||
# Line 8: Adjust the total number of training rounds
|
# Line 8: Total number of training rounds
|
||||||
total_round_num: <number_of_rounds>
|
total_round_num: <number_of_rounds>
|
||||||
|
|
||||||
# Line 9: Adjust the number of clients based on your machine configuration
|
# Line 9: Number of clients
|
||||||
client_num: <number_of_clients>
|
client_num: <number_of_clients>
|
||||||
|
|
||||||
# Line 65: Adjust the training loss function
|
# Line 65: Training loss function
|
||||||
# Options: L1Loss, RMSE, MAPE
|
# Options: L1Loss, RMSE, MAPE
|
||||||
criterion:
|
criterion:
|
||||||
type: <loss_function>
|
type: <loss_function>
|
||||||
```
|
```
|
||||||
|
|
||||||
**WARNING:** Processing the **PEMSD7** dataset may require more than **32GB** RAM. If your system lacks sufficient RAM, it is recommended to increase the size of the swap partition.
|
> **Warning:** Processing the **PeMSD7** dataset may require more than **32GB RAM**.
|
||||||
|
> If your system lacks sufficient RAM, increase the size of the swap partition.
|
||||||
|
|
||||||
|
### Step 3: Run the Experiments
|
||||||
|
|
||||||
|
Use the following commands to run **FedDGCN**:
|
||||||
|
|
||||||
## Step 3. Run the experiments
|
```bash
|
||||||
|
# Run experiments on different datasets
|
||||||
You can use the following command to run **FedDGCN** directly. It is recommended to create the corresponding run configuration in your IDE based on the command below:
|
python federatedscope/main.py --cfg scripts/trafficflow_exp_scripts/D3.yaml # PeMSD3
|
||||||
|
python federatedscope/main.py --cfg scripts/trafficflow_exp_scripts/D4.yaml # PeMSD4
|
||||||
```
|
python federatedscope/main.py --cfg scripts/trafficflow_exp_scripts/D7.yaml # PeMSD7
|
||||||
# PEMSD3
|
python federatedscope/main.py --cfg scripts/trafficflow_exp_scripts/D8.yaml # PeMSD8
|
||||||
python federatedscope/main.py --cfg scripts/trafficflow_exp_scripts/D3.yaml
|
|
||||||
# PEMSD4
|
|
||||||
python federatedscope/main.py --cfg scripts/trafficflow_exp_scripts/D4.yaml
|
|
||||||
# PEMSD7
|
|
||||||
python federatedscope/main.py --cfg scripts/trafficflow_exp_scripts/D7.yaml
|
|
||||||
# PEMSD8
|
|
||||||
python federatedscope/main.py --cfg scripts/trafficflow_exp_scripts/D8.yaml
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
If you see output similar to the image below, congratulations! You have successfully run the experiment:
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
If you see the following output in your terminal, congratulations! You have successfully run the experiment:
|
---
|
||||||
|
|
||||||

|
## 3. Visualize Results
|
||||||
|
|
||||||
|
The experiment logs will be saved in the **`exp`** folder.
|
||||||
|
|
||||||
|
We provide a script, **`global.py`**, in the same folder. Replace the old logs with the new logs from the experiments and run the script to visualize the results:
|
||||||
|
|
||||||
# 3. Visualize the result
|
```bash
|
||||||
|
|
||||||
The experiment logs will be placed in the **exp** folder. We have written a script, **global.py**, in the **exp** folder. You need to replace the previous logs with the new ones generated from the experiment. Once replaced, simply run the script to visualize the experiment results.
|
|
||||||
|
|
||||||
```
|
|
||||||
python exp/global.py
|
python exp/global.py
|
||||||
```
|
```
|
||||||
|
|
||||||
The script will generate a **baseline.jpg** file to visualize the logs. You are also free to modify the script to implement additional functionality as needed.
|
This will generate a **baseline.jpg** file to visualize the logs.
|
||||||
|
|
||||||
You may install matplotlib first for drawing:
|
To install the required package for visualization:
|
||||||
|
|
||||||
```
|
```bash
|
||||||
pip install matplotlib
|
pip install matplotlib
|
||||||
```
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 4. Citation
|
||||||
# Citation
|
|
||||||
|
|
||||||
TBD
|
TBD
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 5. Acknowledgements
|
||||||
|
|
||||||
|
Special thanks to the authors of [FederatedScope](https://github.com/alibaba/FederatedScope), upon which this project is built.
|
||||||
|
|
||||||
|
We also express our gratitude to the following projects: [AGCRN](https://github.com/LeiBAI/AGCRN), [STG-NCDE](https://github.com/jeongwhanchoi/STG-NCDE), [RGDAN](https://github.com/wengwenchao123/RGDAN).
|
||||||
|
|
||||||
|
|
||||||
# Acknowledgements
|
|
||||||
|
|
||||||
We would like to extend our gratitude to the authors of the following works: [FederatedScope](https://github.com/alibaba/FederatedScope).
|
|
||||||
|
|
||||||
Our codes are built upon their open-source projects.
|
# How to improve our framework?
|
||||||
|
|
||||||
|
We welcome the community to help expand and improve our framework! This guide outlines how to customize configurations, models, datasets, trainers, and loss functions.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
|
||||||
|
## 📋 How to Add Configurations
|
||||||
|
|
||||||
|
1. **Create a YAML file**
|
||||||
|
Use the `./scripts` folder as a reference. We recommend copying an existing configuration and modifying it.
|
||||||
|
|
||||||
|
2. **Update Core Configurations**
|
||||||
|
Go to `./federatedscope/core/configs/`, locate `cfg_model`, and add your configurations with default values. For example:
|
||||||
|
```python
|
||||||
|
cfg.model.num_nodes = 0
|
||||||
|
cfg.model.rnn_units = 64
|
||||||
|
cfg.model.dropout = 0.1
|
||||||
|
```
|
||||||
|
|
||||||
|
3. **Add Nested Parameters**
|
||||||
|
If needed, create nested parameters using `CN()`:
|
||||||
|
```python
|
||||||
|
cfg.model.next = CN()
|
||||||
|
cfg.model.next.default = 1
|
||||||
|
```
|
||||||
|
|
||||||
|
4. **Sync Parameters Across Configs**
|
||||||
|
Ensure the parameters in `cfg_model` align with those in other configs (e.g., `cfg_trafficflow`) to avoid compatibility issues across systems (Windows, Linux, etc.).
|
||||||
|
|
||||||
|
5. **Customize YAML**
|
||||||
|
After adding parameters to config files (e.g., `cfg_data`, `cfg_training`), customize them in the corresponding YAML file.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🛠️ How to Add a Model
|
||||||
|
|
||||||
|
1. **Create Your Model**
|
||||||
|
Save your model in the `federatedscope/trafficflow/model/` folder (or another location of your choice).
|
||||||
|
Add the model name to `model:type` in the configuration YAML file.
|
||||||
|
|
||||||
|
2. **Register the Model**
|
||||||
|
In `federatedscope/core/auxiliaries/model_builder.py`, add logic for your model (around line 214):
|
||||||
|
```python
|
||||||
|
elif model_config.type.lower() in ['your_model']:
|
||||||
|
from federatedscope.trafficflow.model.your_model import YourModel
|
||||||
|
model = YourModel(model_config)
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 📊 How to Add a Dataset
|
||||||
|
|
||||||
|
1. **Create a DataLoader**
|
||||||
|
Implement a function in `federatedscope/trafficflow/dataloader/` to generate data for clients. The function should return a list of dictionaries (one per client) with `['train']`, `['val']`, and `['test']` datasets. Each dataset should be a `torch.utils.data.TensorDataset` containing `x` and `label` tensors.
|
||||||
|
|
||||||
|
2. **Register the DataLoader**
|
||||||
|
Update `federatedscope/core/data/utils.py` (around line 108) to import your dataloader:
|
||||||
|
```python
|
||||||
|
elif config.data.type.lower() in ['trafficflow']:
|
||||||
|
from federatedscope.trafficflow.dataloader.traffic_dataloader import load_traffic_data
|
||||||
|
dataset, modified_config = load_traffic_data(config, client_cfgs)
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🎓 How to Customize Your Trainer
|
||||||
|
|
||||||
|
1. **Create a Custom Trainer**
|
||||||
|
Implement your trainer in `federatedscope/trafficflow/trainer/`. Inherit from an existing trainer, e.g.:
|
||||||
|
```python
|
||||||
|
from federatedscope.core.trainers.torch_trainer import GeneralTorchTrainer as Trainer
|
||||||
|
|
||||||
|
class TrafficflowTrainer(Trainer):
|
||||||
|
# Overwrite methods or hooks as needed
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **Reference Examples**
|
||||||
|
Review existing trainers for additional guidance.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 🔧 How to Add a Custom Loss Function
|
||||||
|
|
||||||
|
1. **Implement the Loss Function**
|
||||||
|
Create a file in `federatedscope/contrib/loss/` and write your custom loss function.
|
||||||
|
|
||||||
|
2. **Register the Loss Function**
|
||||||
|
Register your loss function using `register_criterion`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from federatedscope.register import register_criterion
|
||||||
|
|
||||||
|
register_criterion('RMSE', call_my_criterion)
|
||||||
|
register_criterion('MAPE', call_my_criterion)
|
||||||
|
```
|
||||||
|
|
||||||
|
3. **Update Configuration**
|
||||||
|
Set the loss function in the configuration YAML file using `criterion:type`.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
By following these steps, you can extend and customize the framework to meet your needs. Happy coding! 🎉
|
||||||
|
|
|
||||||
|
|
@ -205,8 +205,12 @@ def get_model(model_config, local_data=None, backend='torch'):
|
||||||
from federatedscope.nlp.hetero_tasks.model import ATCModel
|
from federatedscope.nlp.hetero_tasks.model import ATCModel
|
||||||
model = ATCModel(model_config)
|
model = ATCModel(model_config)
|
||||||
elif model_config.type.lower() in ['feddgcn']:
|
elif model_config.type.lower() in ['feddgcn']:
|
||||||
from federatedscope.trafficflow.model.FedDGCN import FedDGCN
|
if model_config.use_minigraph is False:
|
||||||
model = FedDGCN(model_config)
|
from federatedscope.trafficflow.model.FedDGCN import FedDGCN
|
||||||
|
model = FedDGCN(model_config)
|
||||||
|
else:
|
||||||
|
from federatedscope.trafficflow.model.FedDGCNv2 import FederatedFedDGCN
|
||||||
|
model = FederatedFedDGCN(model_config)
|
||||||
else:
|
else:
|
||||||
raise ValueError('Model {} is not provided'.format(model_config.type))
|
raise ValueError('Model {} is not provided'.format(model_config.type))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -62,6 +62,8 @@ def extend_model_cfg(cfg):
|
||||||
cfg.model.cheb_order = 1 # A tuple, e.g., (in_channel, h, w)
|
cfg.model.cheb_order = 1 # A tuple, e.g., (in_channel, h, w)
|
||||||
cfg.model.use_day = True
|
cfg.model.use_day = True
|
||||||
cfg.model.use_week = True
|
cfg.model.use_week = True
|
||||||
|
cfg.model.minigraph_size = 5
|
||||||
|
cfg.model.use_minigraph = False
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------- #
|
# ---------------------------------------------------------------------- #
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,8 @@ def extend_trafficflow_cfg(cfg):
|
||||||
cfg.model.cheb_order = 1 # A tuple, e.g., (in_channel, h, w)
|
cfg.model.cheb_order = 1 # A tuple, e.g., (in_channel, h, w)
|
||||||
cfg.model.use_day = True
|
cfg.model.use_day = True
|
||||||
cfg.model.use_week = True
|
cfg.model.use_week = True
|
||||||
|
cfg.model.minigraph_size = 5
|
||||||
|
cfg.model.use_minigraph = False
|
||||||
|
|
||||||
# ---------------------------------------------------------------------- #
|
# ---------------------------------------------------------------------- #
|
||||||
# Criterion related options
|
# Criterion related options
|
||||||
|
|
|
||||||
|
|
@ -107,8 +107,13 @@ def load_dataset(config, client_cfgs=None):
|
||||||
modified_config = config
|
modified_config = config
|
||||||
elif config.data.type.lower() in [
|
elif config.data.type.lower() in [
|
||||||
'trafficflow']:
|
'trafficflow']:
|
||||||
from federatedscope.trafficflow.dataloader.traffic_dataloader import load_traffic_data
|
if config.model.use_minigraph is False:
|
||||||
dataset, modified_config = load_traffic_data(config, client_cfgs)
|
from federatedscope.trafficflow.dataloader.traffic_dataloader import load_traffic_data
|
||||||
|
dataset, modified_config = load_traffic_data(config, client_cfgs)
|
||||||
|
else:
|
||||||
|
from federatedscope.trafficflow.dataloader.traffic_dataloader_v2 import load_traffic_data
|
||||||
|
dataset, modified_config = load_traffic_data(config, client_cfgs)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise ValueError('Dataset {} not found.'.format(config.data.type))
|
raise ValueError('Dataset {} not found.'.format(config.data.type))
|
||||||
return dataset, modified_config
|
return dataset, modified_config
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,227 @@
|
||||||
|
import numpy as np
|
||||||
|
import torch
|
||||||
|
import torch.utils.data
|
||||||
|
from federatedscope.trafficflow.dataset.add_window import add_window_horizon
|
||||||
|
from federatedscope.trafficflow.dataset.normalization import (
|
||||||
|
NScaler, MinMax01Scaler, MinMax11Scaler, StandardScaler, ColumnMinMaxScaler)
|
||||||
|
from federatedscope.trafficflow.dataset.traffic_dataset import load_st_dataset
|
||||||
|
def normalize_dataset(data, normalizer, column_wise=False):
|
||||||
|
if normalizer == 'max01':
|
||||||
|
if column_wise:
|
||||||
|
minimum = data.min(axis=0, keepdims=True)
|
||||||
|
maximum = data.max(axis=0, keepdims=True)
|
||||||
|
else:
|
||||||
|
minimum = data.min()
|
||||||
|
maximum = data.max()
|
||||||
|
scaler = MinMax01Scaler(minimum, maximum)
|
||||||
|
data = scaler.transform(data)
|
||||||
|
print('Normalize the dataset by MinMax01 Normalization')
|
||||||
|
elif normalizer == 'max11':
|
||||||
|
if column_wise:
|
||||||
|
minimum = data.min(axis=0, keepdims=True)
|
||||||
|
maximum = data.max(axis=0, keepdims=True)
|
||||||
|
else:
|
||||||
|
minimum = data.min()
|
||||||
|
maximum = data.max()
|
||||||
|
scaler = MinMax11Scaler(minimum, maximum)
|
||||||
|
data = scaler.transform(data)
|
||||||
|
print('Normalize the dataset by MinMax11 Normalization')
|
||||||
|
elif normalizer == 'std':
|
||||||
|
if column_wise:
|
||||||
|
mean = data.mean(axis=0, keepdims=True)
|
||||||
|
std = data.std(axis=0, keepdims=True)
|
||||||
|
else:
|
||||||
|
mean = data.mean()
|
||||||
|
std = data.std()
|
||||||
|
scaler = StandardScaler(mean, std)
|
||||||
|
# data = scaler.transform(data)
|
||||||
|
print('Normalize the dataset by Standard Normalization')
|
||||||
|
elif normalizer == 'None':
|
||||||
|
scaler = NScaler()
|
||||||
|
data = scaler.transform(data)
|
||||||
|
print('Does not normalize the dataset')
|
||||||
|
elif normalizer == 'cmax':
|
||||||
|
#column min max, to be depressed
|
||||||
|
#note: axis must be the spatial dimension, please check !
|
||||||
|
scaler = ColumnMinMaxScaler(data.min(axis=0), data.max(axis=0))
|
||||||
|
data = scaler.transform(data)
|
||||||
|
print('Normalize the dataset by Column Min-Max Normalization')
|
||||||
|
else:
|
||||||
|
raise ValueError
|
||||||
|
return scaler
|
||||||
|
|
||||||
|
|
||||||
|
def split_data_by_days(data, val_days, test_days, interval=30):
|
||||||
|
"""
|
||||||
|
:param data: [B, *]
|
||||||
|
:param val_days:
|
||||||
|
:param test_days:
|
||||||
|
:param interval: interval (15, 30, 60) minutes
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
t = int((24 * 60) / interval)
|
||||||
|
x = -t * int(test_days)
|
||||||
|
test_data = data[-t * int(test_days):]
|
||||||
|
val_data = data[-t * int(test_days + val_days): -t * int(test_days)]
|
||||||
|
train_data = data[:-t * int(test_days + val_days)]
|
||||||
|
return train_data, val_data, test_data
|
||||||
|
|
||||||
|
|
||||||
|
def split_data_by_ratio(data, val_ratio, test_ratio):
|
||||||
|
data_len = data.shape[0]
|
||||||
|
test_data = data[-int(data_len * test_ratio):]
|
||||||
|
val_data = data[-int(data_len * (test_ratio + val_ratio)):-int(data_len * test_ratio)]
|
||||||
|
train_data = data[:-int(data_len * (test_ratio + val_ratio))]
|
||||||
|
return train_data, val_data, test_data
|
||||||
|
|
||||||
|
|
||||||
|
def data_loader(X, Y, batch_size, shuffle=True, drop_last=True):
|
||||||
|
cuda = True if torch.cuda.is_available() else False
|
||||||
|
TensorFloat = torch.cuda.FloatTensor if cuda else torch.FloatTensor
|
||||||
|
X, Y = TensorFloat(X), TensorFloat(Y)
|
||||||
|
data = torch.utils.data.TensorDataset(X, Y)
|
||||||
|
dataloader = torch.utils.data.DataLoader(data, batch_size=batch_size,
|
||||||
|
shuffle=shuffle, drop_last=drop_last)
|
||||||
|
return dataloader
|
||||||
|
|
||||||
|
|
||||||
|
def load_traffic_data(config, client_cfgs):
|
||||||
|
root = config.data.root
|
||||||
|
dataName = 'PEMSD' + root[-1]
|
||||||
|
raw_data = load_st_dataset(dataName)
|
||||||
|
|
||||||
|
l, n, f = raw_data.shape
|
||||||
|
|
||||||
|
feature_list = [raw_data]
|
||||||
|
|
||||||
|
|
||||||
|
# numerical time_in_day
|
||||||
|
time_ind = [i % config.data.steps_per_day / config.data.steps_per_day for i in range(raw_data.shape[0])]
|
||||||
|
time_ind = np.array(time_ind)
|
||||||
|
time_in_day = np.tile(time_ind, [1, n, 1]).transpose((2, 1, 0))
|
||||||
|
feature_list.append(time_in_day)
|
||||||
|
|
||||||
|
# numerical day_in_week
|
||||||
|
day_in_week = [(i // config.data.steps_per_day) % config.data.days_per_week for i in range(raw_data.shape[0])]
|
||||||
|
day_in_week = np.array(day_in_week)
|
||||||
|
day_in_week = np.tile(day_in_week, [1, n, 1]).transpose((2, 1, 0))
|
||||||
|
feature_list.append(day_in_week)
|
||||||
|
|
||||||
|
# data = np.concatenate(feature_list, axis=-1)
|
||||||
|
single = False
|
||||||
|
x, y = add_window_horizon(raw_data, config.data.lag, config.data.horizon, single)
|
||||||
|
x_day, y_day = add_window_horizon(time_in_day, config.data.lag, config.data.horizon, single)
|
||||||
|
x_week, y_week = add_window_horizon(day_in_week, config.data.lag, config.data.horizon, single)
|
||||||
|
x, y = np.concatenate([x, x_day, x_week], axis=-1), np.concatenate([y, y_day, y_week], axis=-1)
|
||||||
|
|
||||||
|
# split dataset by days or by ratio
|
||||||
|
if config.data.test_ratio > 1:
|
||||||
|
x_train, x_val, x_test = split_data_by_days(x, config.data.val_ratio, config.data.test_ratio)
|
||||||
|
y_train, y_val, y_test = split_data_by_days(y, config.data.val_ratio, config.data.test_ratio)
|
||||||
|
else:
|
||||||
|
x_train, x_val, x_test = split_data_by_ratio(x, config.data.val_ratio, config.data.test_ratio)
|
||||||
|
y_train, y_val, y_test = split_data_by_ratio(y, config.data.val_ratio, config.data.test_ratio)
|
||||||
|
|
||||||
|
# normalize st data
|
||||||
|
normalizer = 'std'
|
||||||
|
scaler = normalize_dataset(x_train[..., :config.model.input_dim], normalizer, config.data.column_wise)
|
||||||
|
config.data.scaler = [float(scaler.mean), float(scaler.std)]
|
||||||
|
|
||||||
|
x_train[..., :config.model.input_dim] = scaler.transform(x_train[..., :config.model.input_dim])
|
||||||
|
x_val[..., :config.model.input_dim] = scaler.transform(x_val[..., :config.model.input_dim])
|
||||||
|
x_test[..., :config.model.input_dim] = scaler.transform(x_test[..., :config.model.input_dim])
|
||||||
|
|
||||||
|
# Client-side dataset splitting
|
||||||
|
node_num = config.data.num_nodes
|
||||||
|
client_num = config.federate.client_num
|
||||||
|
per_samples = node_num // client_num
|
||||||
|
data_list, cur_index = dict(), 0
|
||||||
|
input_dim, output_dim = config.model.input_dim, config.model.output_dim
|
||||||
|
for i in range(client_num):
|
||||||
|
if cur_index + per_samples <= node_num:
|
||||||
|
# Normal slicing
|
||||||
|
sub_array_train = x_train[:, :, cur_index:cur_index + per_samples, :]
|
||||||
|
sub_array_val = x_val[:, :, cur_index:cur_index + per_samples, :]
|
||||||
|
sub_array_test = x_test[:, :, cur_index:cur_index + per_samples, :]
|
||||||
|
|
||||||
|
sub_y_train = y_train[:, :, cur_index:cur_index + per_samples, :output_dim]
|
||||||
|
sub_y_val = y_val[:, :, cur_index:cur_index + per_samples, :output_dim]
|
||||||
|
sub_y_test = y_test[:, :, cur_index:cur_index + per_samples, :output_dim]
|
||||||
|
else:
|
||||||
|
# If there are not enough nodes to fill per_samples, pad with zero columns
|
||||||
|
sub_array_train = x_train[:, :, cur_index:cur_index + per_samples, :]
|
||||||
|
sub_array_val = x_val[:, :, cur_index:cur_index + per_samples, :]
|
||||||
|
sub_array_test = x_test[:, :, cur_index:cur_index + per_samples, :]
|
||||||
|
padding = np.zeros((x_train.shape[0], config.data.lag ,config.data.lag, per_samples - x_train.shape[1], config.model.output_dim))
|
||||||
|
sub_array_train = np.concatenate((sub_array_train, padding), axis=2)
|
||||||
|
sub_array_val = np.concatenate((sub_array_val, padding), axis=2)
|
||||||
|
sub_array_test = np.concatenate((sub_array_test, padding), axis=2)
|
||||||
|
|
||||||
|
sub_y_train = y_train[:, :, cur_index:cur_index + per_samples, :]
|
||||||
|
sub_y_val = y_val[:, :, cur_index:cur_index + per_samples, :]
|
||||||
|
sub_y_test = y_test[:, :, cur_index:cur_index + per_samples, :]
|
||||||
|
sub_y_train = np.concatenate((sub_y_train, padding), axis=2)
|
||||||
|
sub_y_val = np.concatenate((sub_y_val, padding), axis=2)
|
||||||
|
sub_y_test = np.concatenate((sub_y_test, padding), axis=2)
|
||||||
|
|
||||||
|
device = 'cuda' if torch.cuda.is_available() else 'cpu'
|
||||||
|
|
||||||
|
minigraph_size = config.model.minigraph_size
|
||||||
|
|
||||||
|
data_list[i + 1] = {
|
||||||
|
'train': torch.utils.data.TensorDataset(
|
||||||
|
torch.tensor(split_into_mini_graphs(sub_array_train, minigraph_size), dtype=torch.float, device=device),
|
||||||
|
torch.tensor(split_into_mini_graphs(sub_y_train, minigraph_size), dtype=torch.float, device=device)
|
||||||
|
),
|
||||||
|
'val': torch.utils.data.TensorDataset(
|
||||||
|
torch.tensor(split_into_mini_graphs(sub_array_val, minigraph_size), dtype=torch.float, device=device),
|
||||||
|
torch.tensor(split_into_mini_graphs(sub_y_val, minigraph_size), dtype=torch.float, device=device)
|
||||||
|
),
|
||||||
|
'test': torch.utils.data.TensorDataset(
|
||||||
|
torch.tensor(split_into_mini_graphs(sub_array_test, minigraph_size), dtype=torch.float, device=device),
|
||||||
|
torch.tensor(split_into_mini_graphs(sub_y_test, minigraph_size), dtype=torch.float, device=device)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
cur_index += per_samples
|
||||||
|
config.model.num_nodes = per_samples
|
||||||
|
return data_list, config
|
||||||
|
|
||||||
|
|
||||||
|
def split_into_mini_graphs(tensor, graph_size, dummy_value=0):
|
||||||
|
"""
|
||||||
|
Splits a tensor into mini-graphs of specified size. Pads the last mini-graph with dummy nodes if necessary.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tensor (np.ndarray): Input tensor with shape (timestep, horizon, node_num, dim).
|
||||||
|
graph_size (int): The size of each mini-graph.
|
||||||
|
dummy_value (float, optional): The value to use for dummy nodes. Default is 0.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
np.ndarray: Output tensor with shape (timestep, horizon, graph_num, graph_size, dim).
|
||||||
|
"""
|
||||||
|
timestep, horizon, node_num, dim = tensor.shape
|
||||||
|
|
||||||
|
# Calculate the number of mini-graphs
|
||||||
|
graph_num = (node_num + graph_size - 1) // graph_size # Round up division
|
||||||
|
|
||||||
|
# Initialize output tensor with dummy values
|
||||||
|
output = np.full((timestep, horizon, graph_num, graph_size, dim), dummy_value, dtype=tensor.dtype)
|
||||||
|
|
||||||
|
# Fill in the real data
|
||||||
|
for i in range(graph_num):
|
||||||
|
start_idx = i * graph_size
|
||||||
|
end_idx = min(start_idx + graph_size, node_num) # Ensure we don't exceed the node number
|
||||||
|
slice_size = end_idx - start_idx
|
||||||
|
|
||||||
|
# Assign the data to the corresponding mini-graph
|
||||||
|
output[:, :, i, :slice_size, :] = tensor[:, :, start_idx:end_idx, :]
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
a = 'data/trafficflow/PeMS04'
|
||||||
|
name = 'PEMSD' + a[-1]
|
||||||
|
raw_data = load_st_dataset(name)
|
||||||
|
pass
|
||||||
|
|
@ -0,0 +1,169 @@
|
||||||
|
from torch.nn import ModuleList
|
||||||
|
import torch
|
||||||
|
import torch.nn as nn
|
||||||
|
from federatedscope.trafficflow.model.DGCRUCell import DGCRUCell
|
||||||
|
import time
|
||||||
|
|
||||||
|
class DGCRM(nn.Module):
|
||||||
|
def __init__(self, node_num, dim_in, dim_out, cheb_k, embed_dim, num_layers=1):
|
||||||
|
super(DGCRM, self).__init__()
|
||||||
|
assert num_layers >= 1, 'At least one DCRNN layer in the Encoder.'
|
||||||
|
self.node_num = node_num
|
||||||
|
self.input_dim = dim_in
|
||||||
|
self.num_layers = num_layers
|
||||||
|
self.DGCRM_cells = nn.ModuleList()
|
||||||
|
self.DGCRM_cells.append(DGCRUCell(node_num, dim_in, dim_out, cheb_k, embed_dim))
|
||||||
|
for _ in range(1, num_layers):
|
||||||
|
self.DGCRM_cells.append(DGCRUCell(node_num, dim_out, dim_out, cheb_k, embed_dim))
|
||||||
|
|
||||||
|
def forward(self, x, init_state, node_embeddings):
|
||||||
|
assert x.shape[2] == self.node_num and x.shape[3] == self.input_dim
|
||||||
|
seq_length = x.shape[1]
|
||||||
|
current_inputs = x
|
||||||
|
output_hidden = []
|
||||||
|
for i in range(self.num_layers):
|
||||||
|
state = init_state[i]
|
||||||
|
inner_states = []
|
||||||
|
for t in range(seq_length):
|
||||||
|
state = self.DGCRM_cells[i](current_inputs[:, t, :, :], state, [node_embeddings[0][:, t, :, :], node_embeddings[1]])
|
||||||
|
inner_states.append(state)
|
||||||
|
output_hidden.append(state)
|
||||||
|
current_inputs = torch.stack(inner_states, dim=1)
|
||||||
|
return current_inputs, output_hidden
|
||||||
|
|
||||||
|
def init_hidden(self, batch_size):
|
||||||
|
init_states = []
|
||||||
|
for i in range(self.num_layers):
|
||||||
|
init_states.append(self.DGCRM_cells[i].init_hidden_state(batch_size))
|
||||||
|
return torch.stack(init_states, dim=0) #(num_layers, B, N, hidden_dim)
|
||||||
|
|
||||||
|
# Build you torch or tf model class here
|
||||||
|
class FedDGCN(nn.Module):
|
||||||
|
def __init__(self, args):
|
||||||
|
super(FedDGCN, self).__init__()
|
||||||
|
# print("You are in subminigraph")
|
||||||
|
self.num_node = args.minigraph_size
|
||||||
|
self.input_dim = args.input_dim
|
||||||
|
self.hidden_dim = args.rnn_units
|
||||||
|
self.output_dim = args.output_dim
|
||||||
|
self.horizon = args.horizon
|
||||||
|
self.num_layers = args.num_layers
|
||||||
|
self.use_D = args.use_day
|
||||||
|
self.use_W = args.use_week
|
||||||
|
self.dropout1 = nn.Dropout(p=args.dropout) # 0.1
|
||||||
|
self.dropout2 = nn.Dropout(p=args.dropout)
|
||||||
|
self.node_embeddings1 = nn.Parameter(torch.randn(self.num_node, args.embed_dim), requires_grad=True)
|
||||||
|
self.node_embeddings2 = nn.Parameter(torch.randn(self.num_node, args.embed_dim), requires_grad=True)
|
||||||
|
self.T_i_D_emb = nn.Parameter(torch.empty(288, args.embed_dim))
|
||||||
|
self.D_i_W_emb = nn.Parameter(torch.empty(7, args.embed_dim))
|
||||||
|
# Initialize parameters
|
||||||
|
nn.init.xavier_uniform_(self.node_embeddings1)
|
||||||
|
nn.init.xavier_uniform_(self.T_i_D_emb)
|
||||||
|
nn.init.xavier_uniform_(self.D_i_W_emb)
|
||||||
|
|
||||||
|
self.encoder1 = DGCRM(args.minigraph_size, args.input_dim, args.rnn_units, args.cheb_order,
|
||||||
|
args.embed_dim, args.num_layers)
|
||||||
|
self.encoder2 = DGCRM(args.minigraph_size, args.input_dim, args.rnn_units, args.cheb_order,
|
||||||
|
args.embed_dim, args.num_layers)
|
||||||
|
# predictor
|
||||||
|
self.end_conv1 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True)
|
||||||
|
self.end_conv2 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True)
|
||||||
|
self.end_conv3 = nn.Conv2d(1, args.horizon * self.output_dim, kernel_size=(1, self.hidden_dim), bias=True)
|
||||||
|
|
||||||
|
def forward(self, source):
|
||||||
|
node_embedding1 = self.node_embeddings1
|
||||||
|
if self.use_D:
|
||||||
|
t_i_d_data = source[..., 1]
|
||||||
|
T_i_D_emb = self.T_i_D_emb[(t_i_d_data * 288).type(torch.LongTensor)]
|
||||||
|
node_embedding1 = torch.mul(node_embedding1, T_i_D_emb)
|
||||||
|
|
||||||
|
if self.use_W:
|
||||||
|
d_i_w_data = source[..., 2]
|
||||||
|
D_i_W_emb = self.D_i_W_emb[(d_i_w_data).type(torch.LongTensor)]
|
||||||
|
node_embedding1 = torch.mul(node_embedding1, D_i_W_emb)
|
||||||
|
|
||||||
|
node_embeddings=[node_embedding1,self.node_embeddings1]
|
||||||
|
|
||||||
|
source = source[..., 0].unsqueeze(-1)
|
||||||
|
|
||||||
|
init_state1 = self.encoder1.init_hidden(source.shape[0])
|
||||||
|
output, _ = self.encoder1(source, init_state1, node_embeddings)
|
||||||
|
output = self.dropout1(output[:, -1:, :, :])
|
||||||
|
|
||||||
|
output1 = self.end_conv1(output)
|
||||||
|
source1 = self.end_conv2(output)
|
||||||
|
|
||||||
|
source2 = source - source1
|
||||||
|
|
||||||
|
init_state2 = self.encoder2.init_hidden(source2.shape[0])
|
||||||
|
output2, _ = self.encoder2(source2, init_state2, node_embeddings)
|
||||||
|
output2 = self.dropout2(output2[:, -1:, :, :])
|
||||||
|
output2 = self.end_conv3(output2)
|
||||||
|
|
||||||
|
return output1 + output2
|
||||||
|
|
||||||
|
|
||||||
|
class FederatedFedDGCN(nn.Module):
|
||||||
|
def __init__(self, args):
|
||||||
|
super(FederatedFedDGCN, self).__init__()
|
||||||
|
|
||||||
|
# Initializing with None, we will populate model_list during the forward pass
|
||||||
|
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||||
|
self.model_list = None
|
||||||
|
self.graph_num = (args.num_nodes + args.minigraph_size - 1) // args.minigraph_size
|
||||||
|
self.args = args
|
||||||
|
self.model_list = ModuleList(FedDGCN(self.args).to(self.device) for _ in range(self.graph_num))
|
||||||
|
|
||||||
|
def forward(self, source):
|
||||||
|
"""
|
||||||
|
Forward pass for the federated model. Each subgraph processes its portion of the data,
|
||||||
|
and then the results are aggregated.
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
- source: Tensor of shape (batchsize, horizon, subgraph_num, subgraph_size, dims)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
- Aggregated output (batchsize, horizon, subgraph_num, subgraph_size, dims)
|
||||||
|
"""
|
||||||
|
self.subgraph_num = source.shape[2]
|
||||||
|
|
||||||
|
# Initialize a list to store the outputs of each subgraph model
|
||||||
|
subgraph_outputs = []
|
||||||
|
|
||||||
|
# Iterate through the subgraph models
|
||||||
|
# Parallel computation has not been realized yet, so it may slower than normal.
|
||||||
|
for i in range(self.subgraph_num):
|
||||||
|
# Extract the subgraph-specific data
|
||||||
|
subgraph_data = source[:, :, i, :, :] # (batchsize, horizon, subgraph_size, dims)
|
||||||
|
|
||||||
|
# Forward pass for each subgraph model
|
||||||
|
subgraph_output = self.model_list[i](subgraph_data)
|
||||||
|
subgraph_outputs.append(subgraph_output)
|
||||||
|
|
||||||
|
# Reshape the outputs into (batchsize, horizon, subgraph_num, subgraph_size, dims)
|
||||||
|
output_tensor = torch.stack(subgraph_outputs, dim=2) # (batchsize, horizon, subgraph_num, subgraph_size, dims)
|
||||||
|
self.local_aggregate()
|
||||||
|
return output_tensor
|
||||||
|
|
||||||
|
def local_aggregate(self):
|
||||||
|
"""
|
||||||
|
Update the parameters of each model in model_list to the average of all models' parameters.
|
||||||
|
"""
|
||||||
|
with torch.no_grad(): # Ensure no gradients are calculated during the update
|
||||||
|
# Iterate over each model in model_list
|
||||||
|
for i, model in enumerate(self.model_list):
|
||||||
|
# Iterate over each model's parameters
|
||||||
|
for name, param in model.named_parameters():
|
||||||
|
# Initialize a container for the average value
|
||||||
|
avg_param = torch.zeros_like(param)
|
||||||
|
|
||||||
|
# Accumulate the corresponding parameters from all other models
|
||||||
|
for other_model in self.model_list:
|
||||||
|
avg_param += other_model.state_dict()[name]
|
||||||
|
|
||||||
|
# Calculate the average
|
||||||
|
avg_param /= len(self.model_list)
|
||||||
|
|
||||||
|
# Update the current model's parameter
|
||||||
|
param.data.copy_(avg_param)
|
||||||
|
|
||||||
|
|
@ -42,6 +42,8 @@ model:
|
||||||
cheb_order: 2
|
cheb_order: 2
|
||||||
use_day: True
|
use_day: True
|
||||||
use_week: True
|
use_week: True
|
||||||
|
use_minigraph: False
|
||||||
|
minigraph_size: 10
|
||||||
train:
|
train:
|
||||||
batch_or_epoch: 'epoch'
|
batch_or_epoch: 'epoch'
|
||||||
local_update_steps: 1
|
local_update_steps: 1
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,8 @@ model:
|
||||||
cheb_order: 2
|
cheb_order: 2
|
||||||
use_day: True
|
use_day: True
|
||||||
use_week: True
|
use_week: True
|
||||||
|
use_minigraph: False
|
||||||
|
minigraph_size: 10
|
||||||
train:
|
train:
|
||||||
batch_or_epoch: 'epoch'
|
batch_or_epoch: 'epoch'
|
||||||
local_update_steps: 1
|
local_update_steps: 1
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,8 @@ model:
|
||||||
cheb_order: 2
|
cheb_order: 2
|
||||||
use_day: True
|
use_day: True
|
||||||
use_week: True
|
use_week: True
|
||||||
|
use_minigraph: False
|
||||||
|
minigraph_size: 10
|
||||||
train:
|
train:
|
||||||
batch_or_epoch: 'epoch'
|
batch_or_epoch: 'epoch'
|
||||||
local_update_steps: 1
|
local_update_steps: 1
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,8 @@ model:
|
||||||
cheb_order: 2
|
cheb_order: 2
|
||||||
use_day: True
|
use_day: True
|
||||||
use_week: True
|
use_week: True
|
||||||
|
use_minigraph: False
|
||||||
|
minigraph_size: 10
|
||||||
train:
|
train:
|
||||||
batch_or_epoch: 'epoch'
|
batch_or_epoch: 'epoch'
|
||||||
local_update_steps: 1
|
local_update_steps: 1
|
||||||
|
|
@ -60,7 +62,7 @@ train:
|
||||||
grad_norm: True
|
grad_norm: True
|
||||||
real_value: True
|
real_value: True
|
||||||
criterion:
|
criterion:
|
||||||
type: L1loss
|
type: L1Loss
|
||||||
trainer:
|
trainer:
|
||||||
type: trafficflowtrainer
|
type: trafficflowtrainer
|
||||||
log_dir: ./
|
log_dir: ./
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue