diff --git a/doc/en/p2p-store.md b/doc/en/p2p-store.md index 934a70e..e29ef56 100644 --- a/doc/en/p2p-store.md +++ b/doc/en/p2p-store.md @@ -17,7 +17,6 @@ After compiling P2P Store successfully by following the compilation guide with ` ./p2p-store-example --cmd=trainer \ --metadata_server=10.0.0.1:2379 \ --local_server_name=10.0.0.2:12345 \ - --device_name=erdma_0 ``` 3. **Start the simulated inference node.** This node will pull data from the simulated training node or other simulated inference nodes. @@ -27,7 +26,6 @@ After compiling P2P Store successfully by following the compilation guide with ` ./p2p-store-example --cmd=inferencer \ --metadata_server=10.0.0.1:2379 \ --local_server_name=10.0.0.3:12346 \ - --device_name=erdma_1 ``` The test is completed with the display of "ALL DONE". @@ -37,12 +35,11 @@ In the above process, the simulated inference nodes search for data sources, whi Mooncake P2P Store currently implements the following interfaces in Golang: ```go -func NewP2PStore(metadataUri string, localSegmentName string, nicPriorityMatrix string) (*P2PStore, error) +func NewP2PStore(metadataUri string, localSegmentName string) (*P2PStore, error) ``` Creates an instance of `P2PStore`, which internally starts a Transfer Engine service. - `metadataUri`: The hostname or IP address of the metadata server/etcd service. - `localSegmentName`: The local server name (hostname/IP address:port), ensuring uniqueness within the cluster. -- `nicPriorityMatrix`: The network interface card priority order matrix, see the related description in the Transfer Engine API documentation (`TransferEngine::installTransport`). - Return value: If successful, returns a pointer to the `P2PStore` instance, otherwise returns `error`. ```go @@ -63,7 +60,7 @@ Registers a local file to the cluster, making it downloadable by other peers. En - `name`: The file registration name, ensuring uniqueness within the cluster. 
- `addrList` and `sizeList`: These two arrays represent the memory range of the file, with `addrList` indicating the starting address and `sizeList` indicating the corresponding length. The file content corresponds logically to the order in the arrays. - `maxShardSize`: The internal data sharding granularity, with a recommended value of 64MB. -- `location`: The device name corresponding to this memory segment, matching with `nicPriorityMatrix`. +- `location`: The device name corresponding to this memory segment. ```go func (store *P2PStore) Unregister(ctx context.Context, name string) error diff --git a/doc/en/transfer-engine.md b/doc/en/transfer-engine.md index 7b0f025..d3696af 100644 --- a/doc/en/transfer-engine.md +++ b/doc/en/transfer-engine.md @@ -61,8 +61,6 @@ For instance, as illustrated in figure above, to transfer data from buffer 0 (as To further maximize bandwidth utilization, if a single request's transfer is internally divided into multiple slices if its length exeeds 16KB. Each slice might use a different path, enabling collaborative work among all RDMA NICs. -If you do not want to manually configure the topology matrix, we also provide a function (`mooncake::discoverTopologyMatrix` in `topology.h`) to automatically discover the toplogy between CPU/CUDA and RDMA devices. Supports for more device types are working in progress. The automatic discovery mechanism might not always be accurate, and we welcome your feedbacks and improvement ideas! - ### Endpoint Management Mooncake Store employs a pair of end- points to represent the connection between a local RDMA @@ -140,6 +138,14 @@ After successfully compiling Transfer Engine, the test program `transfer_engine_ > Tip: Advanced users can also pass in a JSON file of the network card priority matrix through `--nic_priority_matrix`, for details, refer to the developer manual of Transfer Engine. 
- In network environments that only support TCP, the `--protocol=tcp` parameter can be used; in this case, there is no need to specify the `--device_name` parameter. + You can also specify `--auto_discovery` to enable automatic topology discovery, which generates a network card priority matrix based on the operating system configuration. In that case, the `--device_name` parameter is not required. + ``` + ./transfer_engine_bench --mode=target \ + --metadata_server=10.0.0.1:2379 \ + --local_server_name=10.0.0.2:12345 \ + --auto_discovery + ``` + 1. **Start the initiator node.** ```bash # This is 10.0.0.3 @@ -257,63 +263,11 @@ Recycles `BatchID`, and subsequent operations on `submitTransfer` and `getTransf - Return value: If successful, returns 0; otherwise, returns a negative value. ### Multi-Transport Management -The `TransferEngine` class internally manages multiple backend `Transport` classes, and users can load or unload `Transport` for different backends in `TransferEngine`. - -#### TransferEngine::installTransport -```cpp -Transport* installTransport(const std::string& proto, void** args); -``` - -Registers `Transport` in `TransferEngine`. If a `Transport` for a certain protocol already exists, it returns that `Transport`. -- `proto`: The name of the transport protocol used by `Transport`, currently supporting `tcp`, `rdma`, `nvmeof`. -- `args`: Additional parameters required for `Transport` initialization, presented as a variable-length array, with the last member being `nullptr`. -- Return value: If `proto` is within the determined range, returns the `Transport` corresponding to `proto`; otherwise, returns a null pointer. - -##### TCP Transfer Mode -For TCP transfer mode, there is no need to pass `args` objects when registering the `Transport` object. -```cpp -engine->installTransport("tcp", nullptr); -``` - -##### RDMA Transfer Mode -For RDMA transfer mode, the network card priority marrix must be specified through `args` during the registration of `Transport`. 
-```cpp -void** args = (void**) malloc(2 * sizeof(void*)); -args[0] = /* topology matrix */; -args[1] = nullptr; -engine->installTransport("rdma", args); -``` -The network card priority marrix is a JSON string indicating the storage medium name and the list of network cards to be used preferentially, as shown in the example below: -```json -{ - "cpu:0": [["mlx0", "mlx1"], ["mlx2", "mlx3"]], - "cuda:0": [["mlx1", "mlx0"]], - ... -} -``` -Each `key` represents the device name corresponding to a CPU socket or a GPU device. -Each `value` is a tuple of (`preferred_nic_list`, `accessable_nic_list`), each of which is a list of NIC names. -- `preferred_nic_list` indicates the preferred NICs, such as NICs directly connected to the CPU rather than across NUMA, or NICs under the same PCIe Switch for GPUs. -- `accessable_nic_list` indicates NICs that are not preferred but can theoretically connect, used for fault retry scenarios. - -##### NVMeOF Transfer Mode -For NVMeOF transfer mode, the file path must be specified through `args` during the registration of `Transport`. -```cpp -void** args = (void**) malloc(2 * sizeof(void*)); -args[0] = /* topology matrix */; -args[1] = nullptr; -engine->installTransport("nvmeof", args); -``` - -#### TransferEngine::uninstallTransport -```cpp -int uninstallTransport(const std::string& proto); -``` - -Unloads `Transport` from `TransferEngine`. -- `proto`: The name of the transport protocol used by `Transport`, currently supporting `rdma`, `nvmeof`. -- Return value: If successful, returns 0; otherwise, returns a negative value. +The `TransferEngine` class internally manages multiple backend `Transport` classes. +It automatically discovers the topology between CPU/CUDA and RDMA devices +(support for more device types is in progress; feedback is welcome if the automatic discovery is inaccurate), +and it installs the appropriate `Transport` automatically based on that topology. 
### Space Registration diff --git a/doc/en/vllm-integration-v0.2.md b/doc/en/vllm-integration-v0.2.md index 5e99e64..27cd966 100644 --- a/doc/en/vllm-integration-v0.2.md +++ b/doc/en/vllm-integration-v0.2.md @@ -25,7 +25,6 @@ pip3 install -e . - If you encounter any problems that you cannot solve, please refer to the [vLLM official compilation guide](https://docs.vllm.ai/en/v0.6.4.post1/getting_started/installation.html#install-the-latest-code). ## Configuration -### Prepare configuration file to Run Example over RDMA - Prepare a _**mooncake.json**_ file for both Prefill and Decode instances - **You don't need to change the `prefill_url` and `decode_url` of the config file in the decode side, please use the identical config file.** @@ -35,11 +34,10 @@ pip3 install -e . "prefill_url": "192.168.0.137:13003", "decode_url": "192.168.0.139:13003", "metadata_server": "192.168.0.139:2379", - "metadata_backend": "etcd", - "protocol": "rdma", - "device_name": "erdma_0" + "metadata_backend": "etcd" } ``` + - "prefill_url": The IP address and port of the Prefill node. - The port in the URL is used to communicate with etcd server for metadata. - "decode_url": The IP address and port of the Decode node. @@ -50,23 +48,6 @@ pip3 install -e . - Use `redis` as backend: `"redis://192.168.0.137:6379"` - Use `http` as backend: `"http://192.168.0.137:8080/metadata"` - "metadata_backend": Currently we support "etcd", "redis", and "http" backends. If this parameter is absent and the `metadata_server` is not a complete URL with the backend prefix, the mooncake transfer engine will use "etcd" automatically. Please note that this parameter will be deprecated in the next version, we recommend you provide a complete URL for `metadata_server` with the prefix of the target backend. -- "protocol": The protocol to be used for data transmission. ("rdma/tcp") -- "device_name": The device to be used for data transmission, it is required when "protocol" is set to "rdma". 
If multiple NIC devices are used, they can be separated by commas such as "erdma_0,erdma_1". Please note that there are no spaces between them. - -### Prepare configuration file to Run Example over TCP - -- Prepare a _**mooncake.json**_ file for both Prefill and Decode instances -```json -{ - "prefill_url": "192.168.0.137:13003", - "decode_url": "192.168.0.139:13003", - "metadata_server": "192.168.0.139:2379", - "metadata_backend": "etcd", - "protocol": "tcp", - "device_name": "" -} -``` - ## Run Example - Please change the IP addresses and ports in the following guide according to your env. diff --git a/doc/en/vllm-integration.md b/doc/en/vllm-integration.md index 0e76e20..a88c320 100644 --- a/doc/en/vllm-integration.md +++ b/doc/en/vllm-integration.md @@ -37,7 +37,6 @@ pip3 install -e . - If you encounter any problems that you cannot solve, please refer to the [vLLM official compilation guide](https://docs.vllm.ai/en/v0.6.4.post1/getting_started/installation.html#install-the-latest-code). ## Configuration -### Prepare configuration file to Run Example over RDMA - Prepare a _**mooncake.json**_ file for both Prefill and Decode instances - **You don't need to change the `prefill_url` and `decode_url` of the config file in the decode side, please use the identical config file.** @@ -46,9 +45,7 @@ pip3 install -e . { "prefill_url": "192.168.0.137:13003", "decode_url": "192.168.0.139:13003", - "metadata_server": "192.168.0.139:2379", - "protocol": "rdma", - "device_name": "erdma_0" + "metadata_server": "192.168.0.139:2379" } ``` - "prefill_url": The IP address and port of the Prefill node. @@ -56,23 +53,6 @@ pip3 install -e . - "decode_url": The IP address and port of the Decode node. - The port in the URL is used to communicate with etcd server for metadata. - "metadata_server": The etcd server of mooncake transfer engine. -- "protocol": The protocol to be used for data transmission. 
("rdma/tcp") -- "device_name": The device to be used for data transmission, required when "protocol" is set to "rdma". If multiple NIC devices are used, they can be separated by commas such as "erdma_0,erdma_1". Please note that there are no spaces between them. - - -### Prepare configuration file to Run Example over TCP - -- Prepare a _**mooncake.json**_ file for both Prefill and Decode instances -```json -{ - "prefill_url": "192.168.0.137:13003", - "decode_url": "192.168.0.139:13003", - "metadata_server": "192.168.0.139:2379", - "protocol": "tcp", - "device_name": "" -} -``` - ## Run Example - Please change the IP addresses and ports in the following guide according to your env. diff --git a/doc/zh/p2p-store.md b/doc/zh/p2p-store.md index ab0029d..1c91ca4 100644 --- a/doc/zh/p2p-store.md +++ b/doc/zh/p2p-store.md @@ -17,7 +17,6 @@ P2P Store 提供的是类似 Register 和 GetReplica 的接口。Register 相当 ./p2p-store-example --cmd=trainer \ --metadata_server=10.0.0.1:2379 \ --local_server_name=10.0.0.2:12345 \ - --device_name=erdma_0 ``` 3. 
**启动模拟推理节点。** 该节点会从模拟训练节点或其他模拟推理节点拉取数据。 @@ -27,7 +26,6 @@ P2P Store 提供的是类似 Register 和 GetReplica 的接口。Register 相当 ./p2p-store-example --cmd=inferencer \ --metadata_server=10.0.0.1:2379 \ --local_server_name=10.0.0.3:12346 \ - --device_name=erdma_1 ``` 测试完毕显示“ALL DONE”。 @@ -48,12 +46,11 @@ P2P Store 基于 [Transfer Engine](transfer-engine.md) 构建,支持在集群 Mooncake P2P Store 目前基于 Golang 实现了下列接口: ```go -func NewP2PStore(metadataUri string, localSegmentName string, nicPriorityMatrix string) (*P2PStore, error) +func NewP2PStore(metadataUri string, localSegmentName string) (*P2PStore, error) ``` 创建 P2PStore 实例,该实例内部会启动一个 Transfer Engine 服务。 - `metadataUri`:元数据服务器/etcd服务所在主机名或 IP 地址。 - `localSegmentName`:本地的服务器名称(主机名/IP地址:端口号),保证在集群内唯一。 -- `nicPriorityMatrix`:网卡优先级顺序表,参见位于 Transfer Engine API 文档的相关描述(`TransferEngine::installTransport`)。 - 返回值:若成功则返回 `P2PStore` 实例指针,否则返回 `error`。 ```go diff --git a/doc/zh/run-examples.md b/doc/zh/run-examples.md index 639df39..21f3190 100644 --- a/doc/zh/run-examples.md +++ b/doc/zh/run-examples.md @@ -106,6 +106,14 @@ Mooncake 支持在执行 `cmake` 命令期间添加下列高级编译选项: > 提示:高级用户还可通过 `--nic_priority_matrix` 传入网卡优先级矩阵 JSON 文件,详细参考 Transfer Engine 的开发者手册。 - 在仅支持 TCP 的网络环境中,可使用 `--protocol=tcp` 参数,此时不需要指定 `--device_name` 参数。 + 也可通过拓扑自动发现功能基于操作系统配置自动生成网卡优先级矩阵,此时不需要指定传输过程使用的 RDMA 网卡名称。 + ``` + ./transfer_engine_bench --mode=target \ + --metadata_server=10.0.0.1:2379 \ + --local_server_name=10.0.0.2:12345 \ + --auto_discovery + ``` + 1. **启动发起节点。** ```bash # This is 10.0.0.3 diff --git a/doc/zh/transfer-engine.md b/doc/zh/transfer-engine.md index 19e404b..54edc74 100644 --- a/doc/zh/transfer-engine.md +++ b/doc/zh/transfer-engine.md @@ -52,8 +52,6 @@ BatchTransfer API 使用请求(Request)对象数组传入用户请求,需 为了进一步最大化带宽利用率,如果单个请求的传输长度超过16KB,则其内部被划分为多个切片。每个切片可能使用不同的路径,使所有RDMA NIC能够协同工作。 -如果不想手动配置拓扑矩阵,也可以直接调用`mooncake::discoverTopologyMatrix`(位于`topology.h`)来自动生成拓扑矩阵。该函数能够自动探查CPU/CUDA和RDMA网卡之间的拓扑关系。对于更多设备种类的支持正在开发中。目前,拓扑自动发现机制可能无法给出准确的硬件拓扑,我们欢迎您的反馈和改进建议! 
- ### 端点管理 Transfer Engine 使用一对端点来表示本地RDMA NIC和远程RDMA NIC之间的连接。实际上,每个端点包括一个或多个RDMA QP对象。 Transfer Engine 中的连接是按需建立的;端点在第一次请求之前保持未配对状态。 @@ -113,6 +111,14 @@ Transfer Engine 使用SIEVE算法来管理端点的逐出。如果由于链路 > 提示:高级用户还可通过 `--nic_priority_matrix` 传入网卡优先级矩阵 JSON 文件,详细参考 [Transfer Engine 的开发者手册](#transferengineinstallorgettransport)。 - 在仅支持 TCP 的网络环境中,可使用 `--protocol=tcp` 参数,此时不需要指定 `--device_name` 参数。 + 也可通过拓扑自动发现功能基于操作系统配置自动生成网卡优先级矩阵,此时不需要指定传输过程使用的 RDMA 网卡名称。 + ``` + ./transfer_engine_bench --mode=target \ + --metadata_server=10.0.0.1:2379 \ + --local_server_name=10.0.0.2:12345 \ + --auto_discovery + ``` + 1. **启动发起节点。** ```bash # This is 10.0.0.3 @@ -230,60 +236,8 @@ int freeBatchID(BatchID batch_id); - 返回值:若成功,返回 0;否则返回负数值。 ### 多 Transport 管理 -`TransferEngine` 类内部管理多后端的 `Transport` 类,用户可向 `TransferEngine` 中装载或卸载对不同后端进行传输的 `Transport`。 - -#### TransferEngine::installTransport -```cpp -Transport* installTransport(const std::string& proto, void** args); -``` -在 `TransferEngine` 中注册 `Transport`。如果某个协议对应的 `Transport` 已存在,则返回该 `Transport`。 - -- `proto`: `Transport` 使用的传输协议名称,目前支持 `tcp`, `rdma`, `nvmeof`。 -- `args`:以变长数组形式呈现的 `Transport` 初始化需要的其他参数,数组内最后一个成员应当是 `nullptr`。 -- 返回值:若 `proto` 在确定范围内,返回对应 `proto` 的 `Transport`;否则返回空指针。 -**TCP 传输模式:** -对于 TCP 传输模式,注册 `Transport` 期间不需要传入 `args` 对象。 -```cpp -engine->installTransport("tcp", nullptr); -``` - -**RDMA 传输模式:** -对于 RDMA 传输模式,注册 `Transport` 期间需通过 `args` 指定网卡优先级顺序。 -```cpp -void** args = (void**) malloc(2 * sizeof(void*)); -args[0] = /* topology matrix */; -args[1] = nullptr; -engine->installTransport("rdma", args); -``` -网卡优先级顺序是一个 JSON 字符串,表示使用的存储介质名称及优先使用的网卡列表,样例如下: -```json -{ - "cpu:0": [["mlx0", "mlx1"], ["mlx2", "mlx3"]], - "cuda:0": [["mlx1", "mlx0"]], - ... 
-} -``` -其中每个 `key` 代表一个 CPU socket 或者一个 GPU device 对应的设备名称 -每个 `value` 为一个 (`preferred_nic_list`, `accessable_nic_list`) 的二元组,每一项都是一个 NIC 名称的列表(list)。 -- `preferred_nic_list` 表示优先选择的 NIC,比如对于 CPU 可以是当前直连而非跨 NUMA 的 NIC,对于 GPU 可以是挂在同一个 PCIe Switch 下的 NIC; -- `accessable_nic_list` 表示虽然不优选但是理论上可以连接上的 NIC,用于故障重试场景。 - -**NVMeOF 传输模式:** 对于 NVMeOF 传输模式,注册 `Transport` 期间需通过 `args` 指定文件路径。 -```cpp -void** args = (void**) malloc(2 * sizeof(void*)); -args[0] = /* topology matrix */; -args[1] = nullptr; -engine->installTransport("nvmeof", args); -``` - -#### TransferEngine::uinstallTransport -```cpp -int uninstallTransport(const std::string& proto); -``` -从 `TransferEngine` 中卸载 `Transport`。 -- `proto`: `Transport` 使用的传输协议名称,目前支持 `rdma`, `nvmeof`。 -- 返回值:若成功,返回 0;否则返回负数值。 +`TransferEngine` 类内部管理多后端的 `Transport` 类,并且会自动探查 CPU/CUDA 和 RDMA 网卡之间的拓扑关系(更多设备种类的支持正在开发中,如无法给出准确的硬件拓扑,欢迎您的反馈和改进建议),以及自动安装合适的 `Transport`。 ### 空间注册 diff --git a/doc/zh/vllm-integration.md b/doc/zh/vllm-integration.md index 5cf91c4..0cd973f 100644 --- a/doc/zh/vllm-integration.md +++ b/doc/zh/vllm-integration.md @@ -37,7 +37,6 @@ pip3 install -e . - 如果遇到任何无法解决的问题,请参照[vLLM官方的编译指南](https://docs.vllm.ai/en/v0.6.4.post1/getting_started/installation.html#install-the-latest-code)。 ## 配置 -### 使用 RDMA 运行示例所需配置文件 - 为预填充和解码实例准备一个 mooncake.json 文件 - **在解码实例侧,你无须更改配置文件里的`prefill_url` 与 `decode_url`,使用完同相同的配置文件即可。** @@ -46,9 +45,7 @@ pip3 install -e . { "prefill_url": "192.168.0.137:13003", "decode_url": "192.168.0.139:13003", - "metadata_server": "192.168.0.139:2379", - "protocol": "rdma", - "device_name": "erdma_0" + "metadata_server": "192.168.0.139:2379" } ``` - "prefill_url": 预填充节点的 IP 地址和端口。 @@ -56,23 +53,6 @@ pip3 install -e . 
- "decode_url": 解码节点的 IP 地址和端口。 - URL 中的端口用于与 etcd 服务器通信以获取元数据。 - "metadata_server": mooncake 传输引擎的 etcd 服务器。 -- "protocol": 数据传输协议("rdma/tcp")。 -- "device_name": 用于数据传输的设备,当 "protocol" 设置为 "rdma" 时必填。如果使用多个 NIC 设备,它们可以用逗号分隔,如 "erdma_0,erdma_1"。请注意它们之间没有空格。 - - -### 使用 TCP 运行示例所需配置文件 - -- 为预填充和解码实例准备一个 mooncake.json 文件 -```json -{ - "prefill_url": "192.168.0.137:13003", - "decode_url": "192.168.0.139:13003", - "metadata_server": "192.168.0.139:2379", - "protocol": "tcp", - "device_name": "" -} -``` - ## 运行示例 - 请根据您的环境更改以下指南中的 IP 地址和端口。 diff --git a/mooncake-integration/vllm/vllm_adaptor.cpp b/mooncake-integration/vllm/vllm_adaptor.cpp index 37a5bf2..8ceacb7 100644 --- a/mooncake-integration/vllm/vllm_adaptor.cpp +++ b/mooncake-integration/vllm/vllm_adaptor.cpp @@ -28,24 +28,6 @@ VLLMAdaptor::~VLLMAdaptor() { large_buffer_list_.clear(); } -std::string formatDeviceNames(const std::string &device_names) { - std::stringstream ss(device_names); - std::string item; - std::vector tokens; - while (getline(ss, item, ',')) { - tokens.push_back(item); - } - - std::string formatted; - for (size_t i = 0; i < tokens.size(); ++i) { - formatted += "\"" + tokens[i] + "\""; - if (i < tokens.size() - 1) { - formatted += ","; - } - } - return formatted; -} - std::pair parseConnectionString( const std::string &conn_string) { std::pair result; @@ -66,16 +48,14 @@ std::pair parseConnectionString( } int VLLMAdaptor::initialize(const char *local_hostname, - const char *metadata_server, const char *protocol, - const char *device_name) { + const char *metadata_server) { auto conn_string = parseConnectionString(metadata_server); - return initializeExt(local_hostname, conn_string.second.c_str(), protocol, - device_name, conn_string.first.c_str()); + return initializeExt(local_hostname, conn_string.second.c_str(), + conn_string.first.c_str()); } int VLLMAdaptor::initializeExt(const char *local_hostname, const char *metadata_server, - const char *protocol, const char *device_name, const char 
*metadata_type) { std::string conn_string = metadata_server; if (conn_string.find("://") == std::string::npos) @@ -88,23 +68,6 @@ int VLLMAdaptor::initializeExt(const char *local_hostname, hostname_port.first.c_str(), hostname_port.second); if (ret) return -1; - xport_ = nullptr; - if (strcmp(protocol, "rdma") == 0) { - auto device_names = formatDeviceNames(device_name); - std::string nic_priority_matrix = - "{\"cpu:0\": [[" + device_names + "], []]}"; - void **args = (void **)malloc(2 * sizeof(void *)); - args[0] = (void *)nic_priority_matrix.c_str(); - args[1] = nullptr; - xport_ = engine_->installTransport("rdma", args); - } else if (strcmp(protocol, "tcp") == 0) { - xport_ = engine_->installTransport("tcp", nullptr); - } else { - LOG(ERROR) << "Unsupported protocol"; - return -1; - } - - if (!xport_) return -1; free_list_.resize(kSlabSizeKBTabLen); doBuddyAllocate(kMaxClassId); return 0; diff --git a/mooncake-integration/vllm/vllm_adaptor.h b/mooncake-integration/vllm/vllm_adaptor.h index 119659d..4d6b796 100644 --- a/mooncake-integration/vllm/vllm_adaptor.h +++ b/mooncake-integration/vllm/vllm_adaptor.h @@ -16,13 +16,13 @@ #include #include #include -#include -#include #include #include #include #include +#include +#include #include "transfer_engine.h" #include "transport/rdma_transport/rdma_transport.h" @@ -44,11 +44,9 @@ class VLLMAdaptor { ~VLLMAdaptor(); - int initialize(const char *local_hostname, const char *metadata_server, - const char *protocol, const char *device_name); - + int initialize(const char *local_hostname, const char *metadata_server); + int initializeExt(const char *local_hostname, const char *metadata_server, - const char *protocol, const char *device_name, const char *metadata_type); uintptr_t allocateManagedBuffer(size_t length); @@ -74,7 +72,8 @@ class VLLMAdaptor { // FOR EXPERIMENT ONLY int expRegisterMemory(uintptr_t buffer_addr, size_t capacity); - int expUnregisterMemory(uintptr_t buffer_addr); // must be called before 
VLLMAdaptor::~VLLMAdaptor() + // must be called before VLLMAdaptor::~VLLMAdaptor() + int expUnregisterMemory(uintptr_t buffer_addr); private: char *allocateRawBuffer(size_t capacity); @@ -85,7 +84,6 @@ class VLLMAdaptor { private: std::shared_ptr engine_; - Transport *xport_; std::mutex mutex_; std::vector> free_list_; diff --git a/mooncake-p2p-store/src/example/p2p-store-example.go b/mooncake-p2p-store/src/example/p2p-store-example.go index eeb5cdb..d7953ff 100644 --- a/mooncake-p2p-store/src/example/p2p-store-example.go +++ b/mooncake-p2p-store/src/example/p2p-store-example.go @@ -18,7 +18,6 @@ import ( "context" "flag" "fmt" - "io/ioutil" "os" "syscall" "time" @@ -28,20 +27,16 @@ import ( ) var ( - command string - metadataServer string - localServerName string - deviceName string - nicPriorityMatrixPath string - fileSize int + command string + metadataServer string + localServerName string + fileSize int ) func main() { flag.StringVar(&command, "cmd", "trainer", "Command: trainer|inferencer") flag.StringVar(&metadataServer, "metadata_server", "localhost:2379", "Metadata server address") flag.StringVar(&localServerName, "local_server_name", "", "Local server name") - flag.StringVar(&deviceName, "device_name", "mlx5_2", "RNIC device name") - flag.StringVar(&nicPriorityMatrixPath, "nic_priority_matrix", "", "Path to NIC priority matrix file (Advanced)") flag.IntVar(&fileSize, "file_size_mb", 2048, "File size in MB") flag.Parse() @@ -112,7 +107,7 @@ func trainer() { ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second) defer cancel() - store, err := p2pstore.NewP2PStore(metadataServer, localServerName, getPriorityMatrix()) + store, err := p2pstore.NewP2PStore(metadataServer, localServerName) if err != nil { fmt.Fprintf(os.Stderr, "Error creating checkpoint engine: %v\n", err) os.Exit(1) @@ -129,19 +124,6 @@ func trainer() { fmt.Println("ALL DONE") } -func getPriorityMatrix() string { - if len(nicPriorityMatrixPath) != 0 { - data, err := 
ioutil.ReadFile(nicPriorityMatrixPath) - if err != nil { - fmt.Println("Error reading file:", err) - os.Exit(1) - } - return string(data) - } else { - return "{ \"cpu:0\": [[\"" + deviceName + "\"], []]}" - } -} - func doInferencer(ctx context.Context, store *p2pstore.P2PStore, name string) { addr, err := syscall.Mmap(-1, 0, fileSize, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE) if err != nil { @@ -178,7 +160,7 @@ func inferencer() { ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second) defer cancel() - store, err := p2pstore.NewP2PStore(metadataServer, localServerName, getPriorityMatrix()) + store, err := p2pstore.NewP2PStore(metadataServer, localServerName) if err != nil { fmt.Fprintf(os.Stderr, "Error creating checkpoint engine: %v\n", err) os.Exit(1) diff --git a/mooncake-p2p-store/src/p2pstore/core.go b/mooncake-p2p-store/src/p2pstore/core.go index 5a759ed..005ccf0 100644 --- a/mooncake-p2p-store/src/p2pstore/core.go +++ b/mooncake-p2p-store/src/p2pstore/core.go @@ -37,13 +37,13 @@ type P2PStore struct { transfer *TransferEngine } -func NewP2PStore(metadataUri string, localSegmentName string, nicPriorityMatrix string) (*P2PStore, error) { +func NewP2PStore(metadataUri string, localSegmentName string) (*P2PStore, error) { metadata, err := NewMetadata(metadataUri, METADATA_KEY_PREFIX) if err != nil { return nil, err } - transferEngine, err := NewTransferEngine(metadataUri, localSegmentName, nicPriorityMatrix) + transferEngine, err := NewTransferEngine(metadataUri, localSegmentName) if err != nil { innerErr := metadata.Close() if innerErr != nil { diff --git a/mooncake-p2p-store/src/p2pstore/transfer_engine.go b/mooncake-p2p-store/src/p2pstore/transfer_engine.go index 94ee61e..c1f67a4 100644 --- a/mooncake-p2p-store/src/p2pstore/transfer_engine.go +++ b/mooncake-p2p-store/src/p2pstore/transfer_engine.go @@ -22,8 +22,11 @@ package p2pstore * All the C functions used here follow this convention. 
*/ -//#cgo LDFLAGS: -L../../../build/mooncake-transfer-engine/src -L../../../thirdparties/lib -ltransfer_engine -lstdc++ -lnuma -lglog -libverbs -ljsoncpp -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc -//#include "../../../mooncake-transfer-engine/include/transfer_engine_c.h" +/* +#cgo LDFLAGS: -L../../../build/mooncake-transfer-engine/src -L../../../thirdparties/lib -ltransfer_engine -lstdc++ -lnuma -lglog -libverbs -ljsoncpp -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc +#include "../../../mooncake-transfer-engine/include/transfer_engine_c.h" +#include +*/ import "C" import ( @@ -36,7 +39,6 @@ type BatchID int64 type TransferEngine struct { engine C.transfer_engine_t - xport C.transport_t } func parseServerName(serverName string) (host string, port int) { @@ -53,49 +55,28 @@ func parseServerName(serverName string) (host string, port int) { return host, port } -const ( - rdmaCStr = C.CString("rdma") -) - -func NewTransferEngine(metadata_uri string, local_server_name string, nic_priority_matrix string) (*TransferEngine, error) { +func NewTransferEngine(metadata_uri string, local_server_name string) (*TransferEngine, error) { // For simplifiy, local_server_name must be a valid IP address or hostname connectable_name, rpc_port := parseServerName(local_server_name) metadataUri := C.CString(metadata_uri) localServerName := C.CString(local_server_name) connectableName := C.CString(connectable_name) - nicPriorityMatrix := C.CString(nic_priority_matrix) defer C.free(unsafe.Pointer(metadataUri)) defer C.free(unsafe.Pointer(localServerName)) defer C.free(unsafe.Pointer(connectableName)) - defer C.free(unsafe.Pointer(nicPriorityMatrix)) native_engine := C.createTransferEngine(metadataUri, localServerName, connectableName, C.uint64_t(rpc_port)) if native_engine == nil { return nil, ErrTransferEngine } - var args [2]unsafe.Pointer - args[0] = unsafe.Pointer(nicPriorityMatrix) - args[1] = nil - xport := C.installTransport(native_engine, rdmaCStr, &args[0]) - if xport == nil { - 
C.destroyTransferEngine(native_engine) - return nil, ErrTransferEngine - } - return &TransferEngine{ engine: native_engine, - xport: xport, }, nil } func (engine *TransferEngine) Close() error { - ret := C.uninstallTransport(engine.engine, rdmaCStr) - if ret < 0 { - return ErrTransferEngine - } - C.destroyTransferEngine(engine.engine) return nil } diff --git a/mooncake-transfer-engine/example/memory_pool.cpp b/mooncake-transfer-engine/example/memory_pool.cpp index 2a39ea6..43b3485 100644 --- a/mooncake-transfer-engine/example/memory_pool.cpp +++ b/mooncake-transfer-engine/example/memory_pool.cpp @@ -76,7 +76,8 @@ int target() { auto nic_priority_matrix = loadNicPriorityMatrix(); const size_t dram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); void **args = (void **)malloc(2 * sizeof(void *)); args[0] = (void *)nic_priority_matrix.c_str(); diff --git a/mooncake-transfer-engine/example/transfer_engine_bench.cpp b/mooncake-transfer-engine/example/transfer_engine_bench.cpp index e633c64..fb7c8bc 100644 --- a/mooncake-transfer-engine/example/transfer_engine_bench.cpp +++ b/mooncake-transfer-engine/example/transfer_engine_bench.cpp @@ -41,7 +41,8 @@ static void checkCudaError(cudaError_t result, const char *message) { #endif -#define NR_SOCKETS (2) +const static int NR_SOCKETS = + numa_available() >= 0 ? 
numa_num_configured_nodes() : 1; static std::string getHostname(); @@ -66,6 +67,7 @@ DEFINE_int32(batch_size, 128, "Batch size"); DEFINE_uint64(block_size, 4096, "Block size for each transfer request"); DEFINE_int32(duration, 10, "Test duration in seconds"); DEFINE_int32(threads, 4, "Task submission threads"); +DEFINE_bool(auto_discovery, false, "Enable auto discovery"); #ifdef USE_CUDA DEFINE_bool(use_vram, true, "Allocate memory from GPU VRAM"); @@ -227,27 +229,29 @@ std::string loadNicPriorityMatrix() { } int initiator() { - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(FLAGS_auto_discovery); auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name); engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(), hostname_port.first.c_str(), hostname_port.second); - Transport *xport = nullptr; - if (FLAGS_protocol == "rdma") { - auto nic_priority_matrix = loadNicPriorityMatrix(); - void **args = (void **)malloc(2 * sizeof(void *)); - args[0] = (void *)nic_priority_matrix.c_str(); - args[1] = nullptr; - xport = engine->installTransport("rdma", args); - } else if (FLAGS_protocol == "tcp") { - xport = engine->installTransport("tcp", nullptr); - } else { - LOG(ERROR) << "Unsupported protocol"; + if (!FLAGS_auto_discovery) { + Transport *xport = nullptr; + if (FLAGS_protocol == "rdma") { + auto nic_priority_matrix = loadNicPriorityMatrix(); + void **args = (void **)malloc(2 * sizeof(void *)); + args[0] = (void *)nic_priority_matrix.c_str(); + args[1] = nullptr; + xport = engine->installTransport("rdma", args); + } else if (FLAGS_protocol == "tcp") { + xport = engine->installTransport("tcp", nullptr); + } else { + LOG(ERROR) << "Unsupported protocol"; + } + LOG_ASSERT(xport); } - LOG_ASSERT(xport); - void *addr[NR_SOCKETS] = {nullptr}; int buffer_num = NR_SOCKETS; @@ -306,22 +310,25 @@ int initiator() { } int target() { - auto engine = std::make_unique(); + // disable topology auto 
discovery for testing. + auto engine = std::make_unique(FLAGS_auto_discovery); auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name); engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(), hostname_port.first.c_str(), hostname_port.second); - if (FLAGS_protocol == "rdma") { - auto nic_priority_matrix = loadNicPriorityMatrix(); - void **args = (void **)malloc(2 * sizeof(void *)); - args[0] = (void *)nic_priority_matrix.c_str(); - args[1] = nullptr; - engine->installTransport("rdma", args); - } else if (FLAGS_protocol == "tcp") { - engine->installTransport("tcp", nullptr); - } else { - LOG(ERROR) << "Unsupported protocol"; + if (!FLAGS_auto_discovery) { + if (FLAGS_protocol == "rdma") { + auto nic_priority_matrix = loadNicPriorityMatrix(); + void **args = (void **)malloc(2 * sizeof(void *)); + args[0] = (void *)nic_priority_matrix.c_str(); + args[1] = nullptr; + engine->installTransport("rdma", args); + } else if (FLAGS_protocol == "tcp") { + engine->installTransport("tcp", nullptr); + } else { + LOG(ERROR) << "Unsupported protocol"; + } } void *addr[NR_SOCKETS] = {nullptr}; @@ -364,4 +371,4 @@ int main(int argc, char **argv) { LOG(ERROR) << "Unsupported mode: must be 'initiator' or 'target'"; exit(EXIT_FAILURE); -} \ No newline at end of file +} diff --git a/mooncake-transfer-engine/include/multi_transport.h b/mooncake-transfer-engine/include/multi_transport.h index aef3b69..cdbd37f 100644 --- a/mooncake-transfer-engine/include/multi_transport.h +++ b/mooncake-transfer-engine/include/multi_transport.h @@ -44,7 +44,8 @@ class MultiTransport { int getTransferStatus(BatchID batch_id, size_t task_id, TransferStatus &status); - Transport *installTransport(const std::string &proto, void **args); + Transport *installTransport(const std::string &proto, + std::shared_ptr topo); Transport *getTransport(const std::string &proto); diff --git a/mooncake-transfer-engine/include/transfer_engine.h b/mooncake-transfer-engine/include/transfer_engine.h 
index 4abfdd9..8f3c224 100644 --- a/mooncake-transfer-engine/include/transfer_engine.h +++ b/mooncake-transfer-engine/include/transfer_engine.h @@ -43,17 +43,20 @@ using BufferEntry = Transport::BufferEntry; class TransferEngine { public: - TransferEngine() : metadata_(nullptr) {} + TransferEngine(bool auto_discover = true) + : metadata_(nullptr), + local_topology_(std::make_shared()), + auto_discover_(auto_discover) {} ~TransferEngine() { freeEngine(); } int init(const std::string &metadata_conn_string, const std::string &local_server_name, - const std::string &ip_or_host_name, - uint64_t rpc_port = 12345); + const std::string &ip_or_host_name, uint64_t rpc_port = 12345); int freeEngine(); + // Only for testing. Transport *installTransport(const std::string &proto, void **args); int uninstallTransport(const std::string &proto); @@ -110,6 +113,10 @@ class TransferEngine { std::string local_server_name_; std::shared_ptr multi_transports_; std::vector local_memory_regions_; + std::shared_ptr local_topology_; + // Discover topology and install transports automatically when it's true. + // Set it to false only for testing. + bool auto_discover_; }; } // namespace mooncake diff --git a/mooncake-transfer-engine/include/transfer_engine_c.h b/mooncake-transfer-engine/include/transfer_engine_c.h index bd3a942..9396855 100644 --- a/mooncake-transfer-engine/include/transfer_engine_c.h +++ b/mooncake-transfer-engine/include/transfer_engine_c.h @@ -83,7 +83,6 @@ typedef struct buffer_entry buffer_entry_t; typedef struct segment_desc segment_desc_t; typedef void *transfer_engine_t; -typedef void *transport_t; /* * All memory pointed to by the "char *" parameters will not be used @@ -91,18 +90,13 @@ typedef void *transport_t; * This means that the caller can free the memory pointed to by "char *" * parameters, after the call is completed. * All the C functions here follow this convention. 
-*/ + */ transfer_engine_t createTransferEngine(const char *metadata_conn_string, const char *local_server_name, const char *ip_or_host_name, uint64_t rpc_port); -transport_t installTransport(transfer_engine_t engine, const char *proto, - void **args); - -int uninstallTransport(transfer_engine_t engine, const char *proto); - segment_id_t openSegment(transfer_engine_t engine, const char *segment_name); int closeSegment(transfer_engine_t engine, segment_id_t segment_id); diff --git a/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h b/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h index 4cc6f09..efc1a57 100644 --- a/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h +++ b/mooncake-transfer-engine/include/transport/cxl_transport/cxl_transport.h @@ -56,10 +56,11 @@ class CxlTransport : public Transport { private: int install(std::string &local_server_name, - std::shared_ptr meta, void **args) override; + std::shared_ptr meta, + std::shared_ptr topo) override; - int registerLocalMemory(void *addr, size_t length, const std::string &location, - bool remote_accessible, + int registerLocalMemory(void *addr, size_t length, + const std::string &location, bool remote_accessible, bool update_metadata) override; int unregisterLocalMemory(void *addr, diff --git a/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h b/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h index bcaeab7..1db3deb 100644 --- a/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h +++ b/mooncake-transfer-engine/include/transport/nvmeof_transport/nvmeof_transport.h @@ -64,10 +64,11 @@ class NVMeoFTransport : public Transport { }; int install(std::string &local_server_name, - std::shared_ptr meta, void **args) override; + std::shared_ptr meta, + std::shared_ptr topo) override; - int registerLocalMemory(void *addr, size_t length, const std::string &location, - 
bool remote_accessible, + int registerLocalMemory(void *addr, size_t length, + const std::string &location, bool remote_accessible, bool update_metadata) override; int unregisterLocalMemory(void *addr, diff --git a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h index 8b798cc..6bc30ee 100644 --- a/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h +++ b/mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h @@ -54,7 +54,8 @@ class RdmaTransport : public Transport { ~RdmaTransport(); int install(std::string &local_server_name, - std::shared_ptr meta, void **args) override; + std::shared_ptr meta, + std::shared_ptr topo) override; const char *getName() const override { return "rdma"; } @@ -113,7 +114,7 @@ class RdmaTransport : public Transport { private: std::vector> context_list_; std::atomic next_segment_id_; - Topology local_topology_; + std::shared_ptr local_topology_; }; using TransferRequest = Transport::TransferRequest; diff --git a/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h b/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h index 62aba16..a302e7c 100644 --- a/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h +++ b/mooncake-transfer-engine/include/transport/tcp_transport/tcp_transport.h @@ -49,15 +49,17 @@ class TcpTransport : public Transport { int submitTransfer(BatchID batch_id, const std::vector &entries) override; - int submitTransferTask(const std::vector &request_list, - const std::vector &task_list) override; + int submitTransferTask( + const std::vector &request_list, + const std::vector &task_list) override; int getTransferStatus(BatchID batch_id, size_t task_id, TransferStatus &status) override; private: int install(std::string &local_server_name, - std::shared_ptr meta, void **args) override; + std::shared_ptr meta, + 
std::shared_ptr topo); int allocateLocalSegmentID(); diff --git a/mooncake-transfer-engine/include/transport/transport.h b/mooncake-transfer-engine/include/transport/transport.h index c59ee40..c9bdcce 100644 --- a/mooncake-transfer-engine/include/transport/transport.h +++ b/mooncake-transfer-engine/include/transport/transport.h @@ -186,7 +186,8 @@ class Transport { protected: virtual int install(std::string &local_server_name, - std::shared_ptr meta, void **args); + std::shared_ptr meta, + std::shared_ptr topo); std::string local_server_name_; std::shared_ptr metadata_; diff --git a/mooncake-transfer-engine/rust/src/main.rs b/mooncake-transfer-engine/rust/src/main.rs index a32fff2..cc3a729 100644 --- a/mooncake-transfer-engine/rust/src/main.rs +++ b/mooncake-transfer-engine/rust/src/main.rs @@ -40,9 +40,6 @@ pub struct Args { help = "Operation type: read or write")] pub operation: String, - #[clap(long, default_value_t = String::from("{\"cpu:0\": [[\"mlx5_0\"], []]}"))] - pub nic_priority_matrix: String, - #[clap(long, default_value_t = String::from("127.0.0.1"), help = "Segment ID to access data")] pub segment_id: String, @@ -161,7 +158,6 @@ fn initiator(args: Args) -> Result<()> { let engine = Arc::new(TransferEngine::new( &args.metadata_server, &get_host_ip()?, - &args.nic_priority_matrix, 12345, )?); @@ -216,7 +212,6 @@ fn target(args: Args) -> Result<()> { let engine = TransferEngine::new( &args.metadata_server, &get_host_ip()?, - &args.nic_priority_matrix, 12345, )?; diff --git a/mooncake-transfer-engine/rust/src/transfer_engine.rs b/mooncake-transfer-engine/rust/src/transfer_engine.rs index c9466fb..1621a1d 100644 --- a/mooncake-transfer-engine/rust/src/transfer_engine.rs +++ b/mooncake-transfer-engine/rust/src/transfer_engine.rs @@ -62,66 +62,35 @@ pub struct BufferEntry { pub struct TransferEngine { engine: bindings::transfer_engine_t, - xport: bindings::transport_t, } impl TransferEngine { pub fn new( metadata_uri: &str, local_server_name: &str, - 
nic_priority_matrix: &str, rpc_port: u64, ) -> Result { let metadata_uri_c = CString::new(metadata_uri).map_err(|_| anyhow!("CString::new failed"))?; let local_server_name_c = CString::new(local_server_name).map_err(|_| anyhow!("CString::new failed"))?; - let nic_priority_matrix_c = - CString::new(nic_priority_matrix).map_err(|_| anyhow!("CString::new failed"))?; - let proto_c = CString::new("rdma").map_err(|_| anyhow!("CString::new failed"))?; - let engine = unsafe { bindings::createTransferEngine(metadata_uri_c.as_ptr()) }; - if engine.is_null() { - bail!("Failed to create TransferEngine") - } - - let ret = unsafe { - bindings::initTransferEngine( - engine, + let engine = unsafe { + bindings::createTransferEngine( + metadata_uri_c.as_ptr(), local_server_name_c.as_ptr(), local_server_name_c.as_ptr(), rpc_port, ) }; - if ret < 0 { - unsafe { - bindings::destroyTransferEngine(engine); - } + if engine.is_null() { bail!("Failed to create TransferEngine") } - let mut args = vec![ - nic_priority_matrix_c.into_raw() as *mut c_void, - std::ptr::null_mut(), - ]; - let xport = - unsafe { bindings::installTransport(engine, proto_c.as_ptr(), args.as_mut_ptr()) }; - - if xport.is_null() { - unsafe { - bindings::destroyTransferEngine(engine); - } - bail!("Failed to install or get Transport") - } - Ok(Self { engine, xport }) + Ok(Self { engine }) } pub fn close(&mut self) -> Result<()> { - let proto_c = CString::new("rdma").map_err(|_| anyhow!("CString::new failed"))?; - let ret = unsafe { bindings::uninstallTransport(self.engine, proto_c.as_ptr()) }; - if ret < 0 { - bail!("Failed to uninstall Transport"); - } unsafe { bindings::destroyTransferEngine(self.engine); } diff --git a/mooncake-transfer-engine/src/multi_transport.cpp b/mooncake-transfer-engine/src/multi_transport.cpp index f2c1cc3..5d489c9 100644 --- a/mooncake-transfer-engine/src/multi_transport.cpp +++ b/mooncake-transfer-engine/src/multi_transport.cpp @@ -124,7 +124,7 @@ int 
MultiTransport::getTransferStatus(BatchID batch_id, size_t task_id, } Transport *MultiTransport::installTransport(const std::string &proto, - void **args) { + std::shared_ptr topo) { Transport *transport = nullptr; if (std::string(proto) == "rdma") { transport = new RdmaTransport(); @@ -143,7 +143,7 @@ Transport *MultiTransport::installTransport(const std::string &proto, return nullptr; } - if (transport->install(local_server_name_, metadata_, args)) { + if (transport->install(local_server_name_, metadata_, topo)) { return nullptr; } diff --git a/mooncake-transfer-engine/src/topology.cpp b/mooncake-transfer-engine/src/topology.cpp index 41f5627..34cf065 100644 --- a/mooncake-transfer-engine/src/topology.cpp +++ b/mooncake-transfer-engine/src/topology.cpp @@ -49,7 +49,7 @@ static std::vector listInfiniBandDevices() { std::vector devices; if (dir == NULL) { - LOG(WARNING) << "Failed to list /sys/class/infiniband"; + PLOG(WARNING) << "Failed to open /sys/class/infiniband"; return {}; } while ((entry = readdir(dir))) { @@ -91,7 +91,7 @@ static std::vector discoverCpuTopology( std::vector topology; if (dir == NULL) { - LOG(WARNING) << "Failed to list /sys/devices/system/node"; + PLOG(WARNING) << "Failed to open /sys/devices/system/node"; return {}; } while ((entry = readdir(dir))) { diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp index 6b3768b..69046f1 100644 --- a/mooncake-transfer-engine/src/transfer_engine.cpp +++ b/mooncake-transfer-engine/src/transfer_engine.cpp @@ -26,10 +26,27 @@ int TransferEngine::init(const std::string &metadata_conn_string, metadata_ = std::make_shared(metadata_conn_string); multi_transports_ = std::make_shared(metadata_, local_server_name_); + TransferMetadata::RpcMetaDesc desc; desc.ip_or_host_name = ip_or_host_name; desc.rpc_port = rpc_port; - return metadata_->addRpcMetaEntry(local_server_name_, desc); + int ret = metadata_->addRpcMetaEntry(local_server_name_, desc); + if 
(ret) return ret; + + if (auto_discover_) { + // discover topology automatically + local_topology_->discover(); + + if (local_topology_->getHcaList().size() > 0) { + // only install RDMA transport when there is at least one HCA + multi_transports_->installTransport("rdma", local_topology_); + } else { + multi_transports_->installTransport("tcp", nullptr); + } + // TODO: install other transports automatically + } + + return 0; } int TransferEngine::freeEngine() { @@ -40,6 +57,7 @@ int TransferEngine::freeEngine() { return 0; } +// Only for testing Transport *TransferEngine::installTransport(const std::string &proto, void **args) { Transport *transport = multi_transports_->getTransport(proto); @@ -47,7 +65,17 @@ Transport *TransferEngine::installTransport(const std::string &proto, LOG(INFO) << "Transport " << proto << " already installed"; return transport; } - transport = multi_transports_->installTransport(proto, args); + + if (args != nullptr && args[0] != nullptr) { + const std::string nic_priority_matrix = static_cast(args[0]); + int ret = local_topology_->parse(nic_priority_matrix); + if (ret) { + LOG(ERROR) << "Failed to parse NIC priority matrix"; + return nullptr; + } + } + + transport = multi_transports_->installTransport(proto, local_topology_); if (!transport) return nullptr; for (auto &entry : local_memory_regions_) { int ret = transport->registerLocalMemory( diff --git a/mooncake-transfer-engine/src/transfer_engine_c.cpp b/mooncake-transfer-engine/src/transfer_engine_c.cpp index 95f9920..1bc16ba 100644 --- a/mooncake-transfer-engine/src/transfer_engine_c.cpp +++ b/mooncake-transfer-engine/src/transfer_engine_c.cpp @@ -36,17 +36,6 @@ transfer_engine_t createTransferEngine(const char *metadata_conn_string, return (transfer_engine_t)native; } -transport_t installTransport(transfer_engine_t engine, const char *proto, - void **args) { - TransferEngine *native = (TransferEngine *)engine; - return (transport_t)native->installTransport(proto, args); -} - -int 
uninstallTransport(transfer_engine_t engine, const char *proto) { - TransferEngine *native = (TransferEngine *)engine; - return native->uninstallTransport(proto); -} - void destroyTransferEngine(transfer_engine_t engine) { TransferEngine *native = (TransferEngine *)engine; delete native; diff --git a/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp b/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp index 284f88e..862e723 100644 --- a/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp +++ b/mooncake-transfer-engine/src/transport/cxl_transport/cxl_transport.cpp @@ -54,7 +54,8 @@ int CxlTransport::submitTransfer(BatchID batch_id, int CxlTransport::freeBatchID(BatchID batch_id) { return 0; } int CxlTransport::install(std::string &local_server_name, - std::shared_ptr meta, void **args) { + std::shared_ptr meta, + std::shared_ptr topo) { return 0; } diff --git a/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp b/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp index 6a6e5e5..852273c 100644 --- a/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp +++ b/mooncake-transfer-engine/src/transport/nvmeof_transport/nvmeof_transport.cpp @@ -86,7 +86,8 @@ int NVMeoFTransport::getTransferStatus(BatchID batch_id, size_t task_id, for (size_t i = slice_id; i < slice_id + slice_num; ++i) { // LOG(INFO) << "task " << task_id << " i " << i << " upper bound " << // slice_num; - auto event = desc_pool_->getTransferStatus(nvmeof_desc.desc_idx_, slice_id); + auto event = + desc_pool_->getTransferStatus(nvmeof_desc.desc_idx_, slice_id); transfer_status.s = from_cufile_transfer_status(event.status); // TODO(FIXME): what to do if multi slices have different status? 
if (transfer_status.s == COMPLETED) { @@ -113,7 +114,8 @@ int NVMeoFTransport::submitTransfer( size_t task_id = batch_desc.task_list.size(); size_t slice_id = desc_pool_->getSliceNum(nvmeof_desc.desc_idx_); batch_desc.task_list.resize(task_id + entries.size()); - std::unordered_map> segment_desc_map; + std::unordered_map> + segment_desc_map; // segment_desc_map[LOCAL_SEGMENT_ID] = // getSegmentDescByID(LOCAL_SEGMENT_ID); for (auto &request : entries) { @@ -202,8 +204,8 @@ int NVMeoFTransport::freeBatchID(BatchID batch_id) { int NVMeoFTransport::install(std::string &local_server_name, std::shared_ptr meta, - void **args) { - return Transport::install(local_server_name, meta, args); + std::shared_ptr topo) { + return Transport::install(local_server_name, meta, topo); } int NVMeoFTransport::registerLocalMemory(void *addr, size_t length, @@ -244,7 +246,8 @@ void NVMeoFTransport::addSliceToCUFileBatch( uint64_t desc_id, TransferRequest::OpCode op, CUfileHandle_t fh) { CUfileIOParams_t params; params.mode = CUFILE_BATCH; - params.opcode = op == Transport::TransferRequest::READ ? CUFILE_READ : CUFILE_WRITE; + params.opcode = + op == Transport::TransferRequest::READ ? CUFILE_READ : CUFILE_WRITE; params.cookie = (void *)0; params.u.batch.devPtr_base = source_addr; params.u.batch.devPtr_offset = 0; diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp index 8277c0a..bedd9a9 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp @@ -44,23 +44,17 @@ RdmaTransport::~RdmaTransport() { int RdmaTransport::install(std::string &local_server_name, std::shared_ptr meta, - void **args) { - const std::string nic_priority_matrix = static_cast(args[0]); - bool dry_run = args[1] ? 
*static_cast(args[1]) : false; - - if (dry_run) return 0; + std::shared_ptr topo) { + if (topo == nullptr) { + LOG(ERROR) << "RdmaTransport: missing topology"; + return ERR_INVALID_ARGUMENT; + } metadata_ = meta; local_server_name_ = local_server_name; + local_topology_ = topo; - int ret = local_topology_.parse(nic_priority_matrix); - if (ret) { - LOG(ERROR) << "RdmaTransport: incorrect NIC priority matrix: " - << nic_priority_matrix; - return ret; - } - - ret = initializeRdmaResources(); + auto ret = initializeRdmaResources(); if (ret) { LOG(ERROR) << "RdmaTransport: cannot initialize RDMA resources"; return ret; @@ -134,7 +128,7 @@ int RdmaTransport::allocateLocalSegmentID() { device_desc.gid = entry->gid(); desc->devices.push_back(device_desc); } - desc->topology = local_topology_; + desc->topology = *(local_topology_.get()); metadata_->addLocalSegment(LOCAL_SEGMENT_ID, local_server_name_, std::move(desc)); return 0; @@ -353,7 +347,7 @@ int RdmaTransport::onSetupRdmaConnections(const HandShakeDesc &peer_desc, std::shared_ptr context; int index = 0; - for (auto &entry : local_topology_.getHcaList()) { + for (auto &entry : local_topology_->getHcaList()) { if (entry == local_nic_name) { context = context_list_[index]; break; @@ -372,13 +366,13 @@ int RdmaTransport::onSetupRdmaConnections(const HandShakeDesc &peer_desc, } int RdmaTransport::initializeRdmaResources() { - if (local_topology_.empty()) { + if (local_topology_->empty()) { LOG(ERROR) << "RdmaTransport: No available RNIC"; return ERR_DEVICE_NOT_FOUND; } std::vector device_speed_list; - for (auto &device_name : local_topology_.getHcaList()) { + for (auto &device_name : local_topology_->getHcaList()) { auto context = std::make_shared(*this, device_name); if (!context) return ERR_MEMORY; diff --git a/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp b/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp index bce4e1d..ef74d9f 100644 --- 
a/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp +++ b/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp @@ -202,7 +202,8 @@ TcpTransport::~TcpTransport() { } int TcpTransport::install(std::string &local_server_name, - std::shared_ptr meta, void **args) { + std::shared_ptr meta, + std::shared_ptr topo) { metadata_ = meta; local_server_name_ = local_server_name; diff --git a/mooncake-transfer-engine/src/transport/transport.cpp b/mooncake-transfer-engine/src/transport/transport.cpp index 3350eee..12df7eb 100644 --- a/mooncake-transfer-engine/src/transport/transport.cpp +++ b/mooncake-transfer-engine/src/transport/transport.cpp @@ -51,7 +51,8 @@ int Transport::freeBatchID(BatchID batch_id) { } int Transport::install(std::string &local_server_name, - std::shared_ptr meta, void **args) { + std::shared_ptr meta, + std::shared_ptr topo) { local_server_name_ = local_server_name; metadata_ = meta; return 0; diff --git a/mooncake-transfer-engine/tests/rdma_transport_test.cpp b/mooncake-transfer-engine/tests/rdma_transport_test.cpp index a2c7a29..7af0662 100644 --- a/mooncake-transfer-engine/tests/rdma_transport_test.cpp +++ b/mooncake-transfer-engine/tests/rdma_transport_test.cpp @@ -240,7 +240,8 @@ std::string loadNicPriorityMatrix() { int initiator() { const size_t ram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name); engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(), @@ -285,7 +286,8 @@ int initiator() { int target() { const size_t ram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. 
+ auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name); engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(), diff --git a/mooncake-transfer-engine/tests/rdma_transport_test2.cpp b/mooncake-transfer-engine/tests/rdma_transport_test2.cpp index 617fc52..2758d67 100644 --- a/mooncake-transfer-engine/tests/rdma_transport_test2.cpp +++ b/mooncake-transfer-engine/tests/rdma_transport_test2.cpp @@ -121,7 +121,8 @@ class RDMATransportTest : public ::testing::Test { LOG(INFO) << "HERE \n"; google::InitGoogleLogging("RDMATransportTest"); FLAGS_logtostderr = 1; - engine = std::make_unique(); + // disable topology auto discovery for testing. + engine = std::make_unique(false); hostname_port = parseHostNameWithPort(FLAGS_local_server_name); engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(), hostname_port.first.c_str(), diff --git a/mooncake-transfer-engine/tests/tcp_transport_test.cpp b/mooncake-transfer-engine/tests/tcp_transport_test.cpp index 862d88c..d64a3bf 100644 --- a/mooncake-transfer-engine/tests/tcp_transport_test.cpp +++ b/mooncake-transfer-engine/tests/tcp_transport_test.cpp @@ -87,7 +87,8 @@ static void *allocateMemoryPool(size_t size, int socket_id, } TEST_F(TCPTransportTest, GetTcpTest) { - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(local_server_name); auto rc = engine->init(metadata_server, local_server_name, hostname_port.first.c_str(), hostname_port.second); @@ -101,7 +102,8 @@ TEST_F(TCPTransportTest, Writetest) { const size_t kDataLength = 4096000; void *addr = nullptr; const size_t ram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. 
+ auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(local_server_name); auto rc = engine->init(metadata_server, local_server_name, hostname_port.first.c_str(), hostname_port.second); @@ -145,7 +147,8 @@ TEST_F(TCPTransportTest, WriteAndReadtest) { const size_t kDataLength = 4096000; void *addr = nullptr; const size_t ram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(local_server_name); engine->init(metadata_server, local_server_name, hostname_port.first.c_str(), hostname_port.second); @@ -216,7 +219,8 @@ TEST_F(TCPTransportTest, WriteAndRead2test) { const size_t kDataLength = 4096000; void *addr = nullptr; const size_t ram_buffer_size = 1ull << 30; - auto engine = std::make_unique(); + // disable topology auto discovery for testing. + auto engine = std::make_unique(false); auto hostname_port = parseHostNameWithPort(local_server_name); engine->init(metadata_server, local_server_name, hostname_port.first.c_str(), hostname_port.second);
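
Review note: the transport-selection branch this patch adds to `TransferEngine::init()` can be read in isolation as the sketch below. It is a minimal illustration, not the project's code. `SketchTopology` and `selectTransport` are hypothetical names introduced only here; the real patch calls `local_topology_->discover()` and `multi_transports_->installTransport(...)` instead of returning a string.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for the Topology class touched by the patch.
// Only the HCA list matters for the selection logic shown here.
struct SketchTopology {
    std::vector<std::string> hca_list;
    const std::vector<std::string> &getHcaList() const { return hca_list; }
};

// Mirrors the branch added to TransferEngine::init(): returns the protocol
// name that would be installed automatically, or an empty string when auto
// discovery is disabled and the caller must install a transport manually
// (as the updated tests do by constructing the engine with `false`).
std::string selectTransport(bool auto_discover,
                            const std::shared_ptr<SketchTopology> &topo) {
    if (!auto_discover) return "";
    // RDMA is chosen only when at least one HCA was discovered.
    if (topo && !topo->getHcaList().empty()) return "rdma";
    // Otherwise fall back to TCP, which needs no topology.
    return "tcp";
}
```

Under these assumptions, a topology listing one HCA selects `"rdma"`, an empty topology selects `"tcp"`, and passing `false` for `auto_discover` selects nothing, which is why the tests in this patch still call `installTransport` explicitly.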