import os import numpy as np import pandas as pd import torch def get_adj(args): dataset_path = './data' match args['num_nodes']: case 358: dataset_name = 'PEMS03' adj_path = os.path.join(dataset_path, dataset_name, 'PEMS03.csv') id = os.path.join(dataset_path, dataset_name, 'PEMS03.txt') A = get_adjacency_matrix(adj_path, args['num_nodes'], args['construct_type'], id_filename=id) case 307: dataset_name = 'PEMS04' adj_path = os.path.join(dataset_path, dataset_name, 'PEMS04.csv') A = get_adjacency_matrix(adj_path, args['num_nodes'], args['construct_type']) case 883: dataset_name = 'PEMS07' adj_path = os.path.join(dataset_path, dataset_name, 'PEMS07.csv') A = get_adjacency_matrix(adj_path, args['num_nodes'], args['construct_type']) case 170: dataset_name = 'PEMS08' adj_path = os.path.join(dataset_path, dataset_name, 'PEMS08.csv') A = get_adjacency_matrix(adj_path, args['num_nodes'], args['construct_type']) local_adj = construct_adj(A, args['strides']) local_adj = torch.FloatTensor(local_adj) return local_adj def get_adjacency_matrix(distance_df_filename, num_of_vertices, type_='connectivity', id_filename=None): """ :param distance_df_filename: str, csv边信息文件路径 :param num_of_vertices:int, 节点数量 :param type_:str, {connectivity, distance} :param id_filename:str 节点信息文件, 有的话需要构建字典 """ A = np.zeros((int(num_of_vertices), int(num_of_vertices)), dtype=np.float32) if id_filename: with open(id_filename, 'r') as f: id_dict = {int(i): idx for idx, i in enumerate(f.read().strip().split('\n'))} # 建立映射列表 df = pd.read_csv(distance_df_filename) for row in df.values: if len(row) != 3: continue i, j = int(row[0]), int(row[1]) A[id_dict[i], id_dict[j]] = 1 A[id_dict[j], id_dict[i]] = 1 return A df = pd.read_csv(distance_df_filename) for row in df.values: if len(row) != 3: continue i, j, distance = int(row[0]), int(row[1]), float(row[2]) if type_ == 'connectivity': A[i, j] = 1 A[j, i] = 1 elif type == 'distance': A[i, j] = 1 / distance A[j, i] = 1 / distance else: raise ValueError("type_ error, must be " "connectivity or distance!") return A def construct_adj(A, steps): """ 构建local 时空图 :param A: np.ndarray, adjacency matrix, shape is (N, N) :param steps: 选择几个时间步来构建图 :return: new adjacency matrix: csr_matrix, shape is (N * steps, N * steps) """ N = len(A) # 获得行数 adj = np.zeros((N * steps, N * steps)) for i in range(steps): """对角线代表各个时间步自己的空间图,也就是A""" adj[i * N: (i + 1) * N, i * N: (i + 1) * N] = A for i in range(N): for k in range(steps - 1): """每个节点只会连接相邻时间步的自己""" adj[k * N + i, (k + 1) * N + i] = 1 adj[(k + 1) * N + i, k * N + i] = 1 for i in range(len(adj)): """加入自回""" adj[i, i] = 1 return adj