diff --git a/docs/design/v1/p2p_nccl_connector.md b/docs/design/v1/p2p_nccl_connector.md
new file mode 100644
index 000000000000..c24b53763709
--- /dev/null
+++ b/docs/design/v1/p2p_nccl_connector.md
@@ -0,0 +1,337 @@
+An implementation of xPyD with dynamic scaling based on point-to-point communication, partly inspired by Dynamo.
+
+# Detailed Design
+
+## Overall Process
+As shown in Figure 1, the overall process of this **PD disaggregation** solution is described through a request flow:
+
+1. The client sends an HTTP request to the Proxy/Router's `/v1/completions` interface.
+2. The Proxy/Router selects a **1P1D (1 Prefill instance + 1 Decode instance)** through either round-robin or random selection, generates a `request_id` (the generation rule is described later), modifies the `max_tokens` in the HTTP request message to **1**, and then forwards the request to the **P instance**.
+3. Immediately afterward, the Proxy/Router forwards the **original HTTP request** to the **D instance**.
+4. The **P instance** performs **Prefill** and then **actively sends the generated KV cache** to the D instance (using **PUT_ASYNC** mode). The D instance's `zmq_addr` can be resolved from the `request_id`.
+5. The **D instance** has a **dedicated thread** for receiving the KV cache (to avoid blocking the main process). The received KV cache is saved into the **GPU memory buffer**, the size of which is determined by the vLLM startup parameter `kv_buffer_size`. When the GPU buffer is full, the KV cache is stored in the **local Tensor memory pool**.
+6. During the **Decode** phase, the D instance's main process retrieves the KV cache (transmitted by the P instance) from either the **GPU buffer** or the **memory pool**, thereby **skipping Prefill**.
+7. After completing **Decode**, the D instance returns the result to the **Proxy/Router**, which then forwards it to the **client**.
+
+![image1](https://github.com/user-attachments/assets/fb01bde6-755b-49f7-ad45-48a94b1e10a7)
+
+## Proxy/Router (Demo)
+
+A simple HTTP service acts as the entry point for client requests and starts a background thread to listen for P/D instances reporting their HTTP IP and PORT, as well as ZMQ IP and PORT. It maintains a dictionary of `http_addr -> zmq_addr`. The `http_addr` is the IP:PORT on which the vLLM instance serves requests, while the `zmq_addr` is the address used for the KV cache handshake and metadata reception.
+
+The Proxy/Router is responsible for selecting 1P1D based on the characteristics of the client request, such as the prompt, and generating a corresponding `request_id`, for example:
+
+```
+cmpl-___prefill_addr_10.0.1.2:21001___decode_addr_10.0.1.3:22001_93923d63113b4b338973f24d19d4bf11-0
+```
+
+Currently, to quickly verify that xPyD works, 1P1D is selected via round-robin. In the future, a trie combined with the load status of each instance is planned for selecting the appropriate P and D.
+
+Each P/D instance periodically sends a heartbeat packet to the Proxy/Router (currently every 3 seconds) to register (i.e., report `http_addr -> zmq_addr`) and keep the connection alive. If an instance crashes and fails to send a ping for a certain period of time, the Proxy/Router will remove the timed-out instance (this feature has not yet been developed).
+
+## KV Cache Transfer Methods
+
+There are three methods for KV cache transfer: PUT, GET, and PUT_ASYNC. The method can be specified via the `--kv-transfer-config` and `kv_connector_extra_config` parameters, specifically through the `send_type` field.
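+
+For example, the `PUT_ASYNC` mode used in the example commands below is selected with the following `--kv-transfer-config` value (reformatted here for readability; the concrete ports and sizes are the placeholders from the Prefill1 command):
+
+```json
+{
+    "kv_connector": "P2pNcclConnector",
+    "kv_role": "kv_producer",
+    "kv_buffer_size": "1e1",
+    "kv_port": "21001",
+    "kv_connector_extra_config": {
+        "proxy_ip": "10.0.1.1",
+        "proxy_port": "30001",
+        "http_port": "20005",
+        "send_type": "PUT_ASYNC",
+        "nccl_num_channels": "16"
+    }
+}
+```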
+
+Both PUT and PUT_ASYNC involve the P instance actively sending the KV cache to the D instance. The difference is that PUT is a synchronous transfer method that blocks the main process, while PUT_ASYNC is asynchronous: it uses a dedicated thread to send the KV cache, so it does not block the main process. In contrast, with the GET method the P instance saves the KV cache to its memory buffer after computing the prefill, and the D instance actively retrieves it from the P instance once it has allocated space for it.
+
+Experimental results show that the performance of these methods, from highest to lowest, is: PUT_ASYNC → GET → PUT.
+
+## P2P Communication via ZMQ & NCCL
+
+As long as the address of the counterpart is known, point-to-point KV cache transfer (using NCCL) can be performed, without being constrained by rank or world size. This is what enables dynamic scaling (expansion and contraction) of instances with PD disaggregation: adding or removing P/D instances does not require a full system restart.
+
+Each P/D instance only needs to create a single `P2pNcclEngine` instance. This instance maintains a ZMQ server, which runs a dedicated thread listening on the `zmq_addr` address and receiving control-flow requests from other instances. These include requests to establish an NCCL connection and requests carrying KV cache metadata (such as tensor shapes and data types); the actual KV cache data is not transmitted over this channel.
+
+When a P instance and a D instance transmit KV cache for the first time, they establish a ZMQ connection and an NCCL group. For subsequent transmissions, this ZMQ connection and NCCL group are reused. Each NCCL group consists of only two ranks, i.e., the world size is always 2, which is precisely what allows P/D instances to be added or removed without restarting the whole system.
+
+## NCCL Group Topology
+
+Currently, only symmetric TP (Tensor Parallelism) is supported for KV cache transmission. Asymmetric TP and PP (Pipeline Parallelism) will be supported in the future. Figure 2 illustrates a 1P2D setup, where each instance has a TP degree of 2. There are 7 NCCL groups in total: each of the three vLLM instances has its own NCCL group for TP=2 (3 groups); in addition, the 0th GPU of the P instance forms an NCCL group with the 0th GPU of each D instance (2 groups), and likewise the 1st GPU of the P instance forms an NCCL group with the 1st GPU of each D instance (2 groups).
+
+![image2](https://github.com/user-attachments/assets/837e61d6-365e-4cbf-8640-6dd7ab295b36)
+
+Each NCCL group occupies a certain amount of GPU memory for its communication buffers, the size of which is primarily influenced by the `NCCL_MAX_NCHANNELS` environment variable. With `NCCL_MAX_NCHANNELS=16`, an NCCL group typically occupies about 100MB; with `NCCL_MAX_NCHANNELS=8`, about 52MB. For large-scale xPyD configurations such as DeepSeek's 96P144D, this implementation is therefore not yet feasible. Moving forward, we are considering using RDMA for point-to-point communication and are also keeping an eye on UCCL.
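+
+The sketch below condenses how a two-rank NCCL group is created on demand, following the logic in `P2pNcclEngine` (`_create_connect` on the initiating side and `_listen_for_requests` on the receiving side). It is illustrative only: socket setup is omitted, and `sock`/`router` stand for an already-connected ZMQ DEALER socket and the peer's ROUTER socket.
+
+```python
+# Minimal sketch of the on-demand 2-rank NCCL group handshake over ZMQ.
+import msgpack
+import torch
+
+from vllm.distributed.device_communicators.pynccl_wrapper import NCCLLibrary
+
+nccl = NCCLLibrary()
+
+
+def connect_as_initiator(sock, device: torch.device):
+    # Rank 0 generates the NCCL unique ID and ships it to the peer over ZMQ.
+    unique_id = nccl.ncclGetUniqueId()
+    sock.send(msgpack.dumps({"cmd": "NEW",
+                             "unique_id": bytes(unique_id.internal)}))
+    with torch.cuda.device(device):
+        # world_size = 2, my rank is 0.
+        return nccl.ncclCommInitRank(2, unique_id, 0)
+
+
+def connect_as_listener(router, device: torch.device):
+    # Rank 1 receives the unique ID and joins the same 2-rank group.
+    _remote_address, message = router.recv_multipart()
+    data = msgpack.loads(message)
+    assert data["cmd"] == "NEW"
+    unique_id = nccl.unique_id_from_bytes(bytes(data["unique_id"]))
+    with torch.cuda.device(device):
+        # world_size = 2, my rank is 1.
+        return nccl.ncclCommInitRank(2, unique_id, 1)
+```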
+
+## GPU Memory Buffer and Tensor Memory Pool
+
+The trade-off in sizing the memory buffer is as follows. For P instances, no memory buffer is required in PUT and PUT_ASYNC modes, but one is necessary in GET mode. For D instances, a memory buffer is needed in all three modes. The buffer should not be too large: on a D instance it is used to temporarily store KV cache sent by P instances, and if it is too large it reduces the KV cache space available for normal inference, which shrinks the inference batch size and ultimately lowers output throughput (the same consideration applies to P instances in GET mode). The buffer size is configured by the `kv_buffer_size` parameter, measured in bytes, and is typically set to 5%~10% of the GPU memory size.
+
+If the `--max-num-seqs` parameter of the P instances is set to a large value, the large batch size causes the P instances to generate a large amount of KV cache simultaneously. This may exceed the capacity of the D instances' memory buffer, resulting in KV cache loss. Once KV cache is lost, the D instance must recompute Prefill, which is equivalent to performing Prefill twice. Consequently, the time-to-first-token (TTFT) increases significantly, degrading performance.
+
+To address this, I have designed and developed a local Tensor memory pool for storing KV cache, inspired by the buddy system used in the Linux memory management subsystem. Since host memory is sufficiently large, typically in the TB range on servers, there is no need for prefix caching or block-based designs to reuse memory and save space. When the GPU memory buffer is insufficient, KV cache can be stored directly in the Tensor memory pool, from which D instances can later retrieve it. Reads and writes go through PCIe; PCIe 4.0 provides roughly 21 GB/s, which is usually faster than the rate at which Prefill produces KV cache (otherwise, solutions such as Mooncake and lmcache would not be worthwhile). The Tensor memory pool acts as a flood-diversion area: it is typically unused except during sudden traffic surges. In the worst case, my solution performs no worse than the normal situation with a cache store.
+
+# Install vLLM
+
+```shell
+# Enter the home directory or your working directory.
+cd /home
+
+# Download the installation package (I will update the commit id in time). You can copy the command directly.
+wget https://vllm-wheels.s3.us-west-2.amazonaws.com/9112b443a042d8d815880b8780633882ad32b183/vllm-1.0.0.dev-cp38-abi3-manylinux1_x86_64.whl
+
+# Download the code repository.
+git clone -b xpyd-v1 https://github.com/Abatom/vllm.git
+cd vllm
+
+# Set the installation package path.
+export VLLM_PRECOMPILED_WHEEL_LOCATION=/home/vllm-1.0.0.dev-cp38-abi3-manylinux1_x86_64.whl
+
+# Install.
+pip install -e . -v
+```
+
+# Run xPyD
+
+## Instructions
+- The following examples are run on an A800 (80GB) device, using the Meta-Llama-3.1-8B-Instruct model.
+- Pay attention to the setting of `kv_buffer_size` (in bytes); the empirical value is 10% of the GPU memory size. This is related to the KV cache size. If it is too small, the GPU memory buffer for temporarily storing the received KV cache will overflow, causing the KV cache to be stored in the tensor memory pool and increasing latency. If it is too large, the KV cache available for inference will be reduced, leading to a smaller batch size and decreased throughput.
+- For Prefill instances, when using non-GET mode, the `kv_buffer_size` can be set to 1, as Prefill currently does not need to receive kvcache. However, when using GET mode, a larger `kv_buffer_size` is required because it needs to store the kvcache sent to the D instance. +- You may need to modify the `kv_buffer_size` and `port` in the following commands (if there is a conflict). +- `PUT_ASYNC` offers the best performance and should be prioritized. +- The `--port` must be consistent with the `http_port` in the `--kv-transfer-config`. +- The `disagg_prefill_proxy_xpyd.py` script will use port 10001 (for receiving client requests) and port 30001 (for receiving service discovery from P and D instances). +- The node running the proxy must have `quart` installed. +- Supports multiple nodes; you just need to modify the `proxy_ip` and `proxy_port` in `--kv-transfer-config`. +- In the following examples, it is assumed that **the proxy's IP is 10.0.1.1**. + +## Run 1P3D + +### Proxy (e.g. 10.0.1.1) + +```shell +cd {your vllm directory}/examples/online_serving/disagg_xpyd/ +python3 disagg_prefill_proxy_xpyd.py & +``` + +### Prefill1 (e.g. 10.0.1.2 or 10.0.1.1) + +```shell +VLLM_USE_V1=1 CUDA_VISIBLE_DEVICES=0 vllm serve {your model directory} \ + --host 0.0.0.0 \ + --port 20005 \ + --tensor-parallel-size 1 \ + --seed 1024 \ + --served-model-name base_model \ + --dtype float16 \ + --max-model-len 10000 \ + --max-num-batched-tokens 10000 \ + --max-num-seqs 256 \ + --trust-remote-code \ + --gpu-memory-utilization 0.9 \ + --disable-log-request \ + --kv-transfer-config \ + '{"kv_connector":"P2pNcclConnector","kv_role":"kv_producer","kv_buffer_size":"1e1","kv_port":"21001","kv_connector_extra_config":{"proxy_ip":"10.0.1.1","proxy_port":"30001","http_port":"20005","send_type":"PUT_ASYNC","nccl_num_channels":"16"}}' > /var/vllm.log 2>&1 & +``` + +### Decode1 (e.g. 10.0.1.3 or 10.0.1.1) + +```shell +VLLM_USE_V1=1 CUDA_VISIBLE_DEVICES=1 vllm serve {your model directory} \ + --host 0.0.0.0 \ + --port 20009 \ + --tensor-parallel-size 1 \ + --seed 1024 \ + --served-model-name base_model \ + --dtype float16 \ + --max-model-len 10000 \ + --max-num-batched-tokens 10000 \ + --max-num-seqs 256 \ + --trust-remote-code \ + --gpu-memory-utilization 0.7 \ + --disable-log-request \ + --kv-transfer-config \ + '{"kv_connector":"P2pNcclConnector","kv_role":"kv_consumer","kv_buffer_size":"8e9","kv_port":"22001","kv_connector_extra_config":{"proxy_ip":"10.0.1.1","proxy_port":"30001","http_port":"20009","send_type":"PUT_ASYNC","nccl_num_channels":"16"}}' > /var/vllm.log 2>&1 & +``` + +### Decode2 (e.g. 10.0.1.4 or 10.0.1.1) + +```shell +VLLM_USE_V1=1 CUDA_VISIBLE_DEVICES=2 vllm serve {your model directory} \ + --host 0.0.0.0 \ + --port 20003 \ + --tensor-parallel-size 1 \ + --seed 1024 \ + --served-model-name base_model \ + --dtype float16 \ + --max-model-len 10000 \ + --max-num-batched-tokens 10000 \ + --max-num-seqs 256 \ + --trust-remote-code \ + --gpu-memory-utilization 0.7 \ + --disable-log-request \ + --kv-transfer-config \ + '{"kv_connector":"P2pNcclConnector","kv_role":"kv_consumer","kv_buffer_size":"8e9","kv_port":"23001","kv_connector_extra_config":{"proxy_ip":"10.0.1.1","proxy_port":"30001","http_port":"20003","send_type":"PUT_ASYNC","nccl_num_channels":"16"}}' > /var/vllm.log 2>&1 & +``` + +### Decode3 (e.g. 
10.0.1.5 or 10.0.1.1) + +```shell +VLLM_USE_V1=1 CUDA_VISIBLE_DEVICES=3 vllm serve {your model directory} \ + --host 0.0.0.0 \ + --port 20008 \ + --tensor-parallel-size 1 \ + --seed 1024 \ + --served-model-name base_model \ + --dtype float16 \ + --max-model-len 10000 \ + --max-num-batched-tokens 10000 \ + --max-num-seqs 256 \ + --trust-remote-code \ + --gpu-memory-utilization 0.7 \ + --disable-log-request \ + --kv-transfer-config \ + '{"kv_connector":"P2pNcclConnector","kv_role":"kv_consumer","kv_buffer_size":"8e9","kv_port":"24001","kv_connector_extra_config":{"proxy_ip":"10.0.1.1","proxy_port":"30001","http_port":"20008","send_type":"PUT_ASYNC","nccl_num_channels":"16"}}' > /var/vllm.log 2>&1 & +``` + +## Run 3P1D + +### Proxy (e.g. 10.0.1.1) + +```shell +cd {your vllm directory}/examples/online_serving/disagg_xpyd/ +python3 disagg_prefill_proxy_xpyd.py & +``` + +### Prefill1 (e.g. 10.0.1.2 or 10.0.1.1) + +```shell +VLLM_USE_V1=1 CUDA_VISIBLE_DEVICES=0 vllm serve {your model directory} \ + --host 0.0.0.0 \ + --port 20005 \ + --tensor-parallel-size 1 \ + --seed 1024 \ + --served-model-name base_model \ + --dtype float16 \ + --max-model-len 10000 \ + --max-num-batched-tokens 10000 \ + --max-num-seqs 256 \ + --trust-remote-code \ + --gpu-memory-utilization 0.9 \ + --disable-log-request \ + --kv-transfer-config \ + '{"kv_connector":"P2pNcclConnector","kv_role":"kv_producer","kv_buffer_size":"1e1","kv_port":"21001","kv_connector_extra_config":{"proxy_ip":"10.0.1.1","proxy_port":"30001","http_port":"20005","send_type":"PUT_ASYNC","nccl_num_channels":"16"}}' > /var/vllm.log 2>&1 & +``` + +### Prefill2 (e.g. 10.0.1.3 or 10.0.1.1) + +```shell +VLLM_USE_V1=1 CUDA_VISIBLE_DEVICES=1 vllm serve {your model directory} \ + --host 0.0.0.0 \ + --port 20009 \ + --tensor-parallel-size 1 \ + --seed 1024 \ + --served-model-name base_model \ + --dtype float16 \ + --max-model-len 10000 \ + --max-num-batched-tokens 10000 \ + --max-num-seqs 256 \ + --trust-remote-code \ + --gpu-memory-utilization 0.9 \ + --disable-log-request \ + --kv-transfer-config \ + '{"kv_connector":"P2pNcclConnector","kv_role":"kv_producer","kv_buffer_size":"1e1","kv_port":"22001","kv_connector_extra_config":{"proxy_ip":"10.0.1.1","proxy_port":"30001","http_port":"20009","send_type":"PUT_ASYNC","nccl_num_channels":"16"}}' > /var/vllm.log 2>&1 & +``` + +### Prefill3 (e.g. 10.0.1.4 or 10.0.1.1) + +```shell +VLLM_USE_V1=1 CUDA_VISIBLE_DEVICES=2 vllm serve {your model directory} \ + --host 0.0.0.0 \ + --port 20003 \ + --tensor-parallel-size 1 \ + --seed 1024 \ + --served-model-name base_model \ + --dtype float16 \ + --max-model-len 10000 \ + --max-num-batched-tokens 10000 \ + --max-num-seqs 256 \ + --trust-remote-code \ + --gpu-memory-utilization 0.9 \ + --disable-log-request \ + --kv-transfer-config \ + '{"kv_connector":"P2pNcclConnector","kv_role":"kv_producer","kv_buffer_size":"1e1","kv_port":"23001","kv_connector_extra_config":{"proxy_ip":"10.0.1.1","proxy_port":"30001","http_port":"20003","send_type":"PUT_ASYNC","nccl_num_channels":"16"}}' > /var/vllm.log 2>&1 & +``` + +### Decode1 (e.g. 
10.0.1.5 or 10.0.1.1) + +```shell +VLLM_USE_V1=1 CUDA_VISIBLE_DEVICES=3 vllm serve {your model directory} \ + --host 0.0.0.0 \ + --port 20008 \ + --tensor-parallel-size 1 \ + --seed 1024 \ + --served-model-name base_model \ + --dtype float16 \ + --max-model-len 10000 \ + --max-num-batched-tokens 10000 \ + --max-num-seqs 256 \ + --trust-remote-code \ + --gpu-memory-utilization 0.7 \ + --disable-log-request \ + --kv-transfer-config \ + '{"kv_connector":"P2pNcclConnector","kv_role":"kv_consumer","kv_buffer_size":"8e9","kv_port":"24001","kv_connector_extra_config":{"proxy_ip":"10.0.1.1","proxy_port":"30001","http_port":"20008","send_type":"PUT_ASYNC","nccl_num_channels":"16"}}' > /var/vllm.log 2>&1 & +``` + +# Single request + +```shell +curl -X POST -s http://10.0.1.1:10001/v1/completions \ +-H "Content-Type: application/json" \ +-d '{ + "model": "base_model", + "prompt": "San Francisco is a", + "max_tokens": 10, + "temperature": 0 +}' +``` + +# Benchmark + +```shell +python3 benchmark_serving.py \ + --backend vllm \ + --model base_model \ + --tokenizer meta-llama/Llama-3.1-8B-Instruct \ + --dataset-name "random" \ + --host 10.0.1.1 \ + --port 10001 \ + --random-input-len 1024 \ + --random-output-len 1024 \ + --ignore-eos \ + --burstiness 100 \ + --percentile-metrics "ttft,tpot,itl,e2el" \ + --metric-percentiles "90,95,99" \ + --seed $(date +%s) \ + --trust-remote-code \ + --request-rate 3 \ + --num-prompts 1000 +``` + +# Shut down + +```shell +pgrep python | xargs kill -9 && pkill -f python +``` + +# Test data + +## **Scenario 1**: 1K input & 1K output tokens, E2E P99 latency ~20s +- **1P5D (6×A800) vs vLLM (1×A800)**: + - Throughput ↑7.2% (1085 → 6979/6) + - ITL (P99) ↓81.3% (120ms → 22.9ms) + - TTFT (P99) ↑26.8% (175ms → 222ms) + - TPOT: No change + +- **1P6D (7×A800) vs vLLM (1×A800)**: + - Throughput ↑9.6% (1085 → 8329/7) + - ITL (P99) ↓81.0% (120ms → 22.7ms) + - TTFT (P99) ↑210% (175ms →543ms) + - TPOT: No change + +## **Scenario 2**: 1K input & 200 output tokens, E2E P99 latency ~4s +- **1P1D (2×A800) vs vLLM (1×A800)**: + - Throughput ↑37.4% (537 → 1476/2) + - ITL (P99) ↓81.8% (127ms → 23.1ms) + - TTFT (P99) ↑41.8% (160ms → 227ms) + - TPOT: No change + +![testdata](https://github.com/user-attachments/assets/f791bfc7-9f3d-4e5c-9171-a42f9f4da627) diff --git a/examples/online_serving/disagg_xpyd/disagg_prefill_proxy_xpyd.py b/examples/online_serving/disagg_xpyd/disagg_prefill_proxy_xpyd.py new file mode 100644 index 000000000000..73f2caaa0dbd --- /dev/null +++ b/examples/online_serving/disagg_xpyd/disagg_prefill_proxy_xpyd.py @@ -0,0 +1,154 @@ +# SPDX-License-Identifier: Apache-2.0 + +import os +import socket +import threading +import uuid + +import aiohttp +import msgpack +import zmq +from quart import Quart, make_response, request + +count = 0 +prefill_instances: dict[str, str] = {} # http_address: zmq_address +decode_instances: dict[str, str] = {} # http_address: zmq_address + +prefill_cv = threading.Condition() +decode_cv = threading.Condition() + + +def _listen_for_register(poller, router_socket): + while True: + socks = dict(poller.poll()) + if router_socket in socks: + remote_address, message = router_socket.recv_multipart() + # data: {"type": "P", "http_address": "ip:port", + # "zmq_address": "ip:port"} + data = msgpack.loads(message) + if data["type"] == "P": + global prefill_instances + global prefill_cv + with prefill_cv: + prefill_instances[data["http_address"]] = data["zmq_address"] + elif data["type"] == "D": + global decode_instances + global decode_cv + with decode_cv: + 
decode_instances[data["http_address"]] = data["zmq_address"] + else: + print( + "Unexpected, Received message from %s, data: %s", + remote_address, + data, + ) + + +def start_service_discovery(hostname, port): + if not hostname: + hostname = socket.gethostname() + if port == 0: + raise ValueError("Port cannot be 0") + + context = zmq.Context() + router_socket = context.socket(zmq.ROUTER) + router_socket.bind(f"tcp://{hostname}:{port}") + + poller = zmq.Poller() + poller.register(router_socket, zmq.POLLIN) + + _listener_thread = threading.Thread( + target=_listen_for_register, args=[poller, router_socket], daemon=True + ) + _listener_thread.start() + return _listener_thread + + +AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60) + +app = Quart(__name__) + + +def random_uuid() -> str: + return str(uuid.uuid4().hex) + + +async def forward_request(url, data, request_id): + async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session: + headers = { + "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}", + "X-Request-Id": request_id, + } + async with session.post(url=url, json=data, headers=headers) as response: + if response.status == 200: + if True: + async for chunk_bytes in response.content.iter_chunked(1024): + yield chunk_bytes + else: + content = await response.read() + yield content + + +@app.route("/v1/completions", methods=["POST"]) +async def handle_request(): + try: + original_request_data = await request.get_json() + + prefill_request = original_request_data.copy() + # change max_tokens = 1 to let it only do prefill + prefill_request["max_tokens"] = 1 + + global count + global prefill_instances + global prefill_cv + with prefill_cv: + prefill_list = list(prefill_instances.items()) + prefill_addr, prefill_zmq_addr = prefill_list[count % len(prefill_list)] + + global decode_instances + global decode_cv + with decode_cv: + decode_list = list(decode_instances.items()) + decode_addr, decode_zmq_addr = decode_list[count % len(decode_list)] + + print( + f"handle_request count: {count}, [HTTP:{prefill_addr}, " + f"ZMQ:{prefill_zmq_addr}] 👉 [HTTP:{decode_addr}, " + f"ZMQ:{decode_zmq_addr}]" + ) + count += 1 + + request_id = ( + f"___prefill_addr_{prefill_zmq_addr}___decode_addr_" + f"{decode_zmq_addr}_{random_uuid()}" + ) + + # finish prefill + async for _ in forward_request( + f"http://{prefill_addr}/v1/completions", prefill_request, request_id + ): + continue + + # return decode + generator = forward_request( + f"http://{decode_addr}/v1/completions", original_request_data, request_id + ) + response = await make_response(generator) + response.timeout = None + + return response + + except Exception as e: + import sys + import traceback + + exc_info = sys.exc_info() + print("Error occurred in disagg prefill proxy server") + print(e) + print("".join(traceback.format_exception(*exc_info))) + + +if __name__ == "__main__": + t = start_service_discovery("0.0.0.0", 30001) + app.run(host="0.0.0.0", port=10001) + t.join() diff --git a/vllm/distributed/device_communicators/pynccl_wrapper.py b/vllm/distributed/device_communicators/pynccl_wrapper.py index 04a4d0147f5d..3018a92da07c 100644 --- a/vllm/distributed/device_communicators/pynccl_wrapper.py +++ b/vllm/distributed/device_communicators/pynccl_wrapper.py @@ -272,6 +272,14 @@ def ncclGetUniqueId(self) -> ncclUniqueId: ctypes.byref(unique_id))) return unique_id + def unique_id_from_bytes(self, data: bytes) -> ncclUniqueId: + if len(data) != 128: + raise ValueError( + f"Expected 128 bytes for ncclUniqueId, got {len(data)} 
bytes") + unique_id = ncclUniqueId() + ctypes.memmove(ctypes.addressof(unique_id.internal), data, 128) + return unique_id + def ncclCommInitRank(self, world_size: int, unique_id: ncclUniqueId, rank: int) -> ncclComm_t: comm = ncclComm_t() diff --git a/vllm/distributed/kv_transfer/kv_connector/factory.py b/vllm/distributed/kv_transfer/kv_connector/factory.py index 58dfa251c735..be9ce72dea67 100644 --- a/vllm/distributed/kv_transfer/kv_connector/factory.py +++ b/vllm/distributed/kv_transfer/kv_connector/factory.py @@ -112,6 +112,11 @@ def create_connector_v1( "vllm.distributed.kv_transfer.kv_connector.v1.shared_storage_connector", "SharedStorageConnector") +KVConnectorFactory.register_connector( + "P2pNcclConnector", + "vllm.distributed.kv_transfer.kv_connector.v1.p2p.p2p_nccl_connector", + "P2pNcclConnector") + KVConnectorFactory.register_connector( "LMCacheConnectorV1", "vllm.distributed.kv_transfer.kv_connector.v1.lmcache_connector", diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/p2p/__init__.py b/vllm/distributed/kv_transfer/kv_connector/v1/p2p/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py new file mode 100644 index 000000000000..a47deaf91272 --- /dev/null +++ b/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py @@ -0,0 +1,481 @@ +# SPDX-License-Identifier: Apache-2.0 + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Optional + +import regex as re +import torch + +from vllm.config import VllmConfig +from vllm.distributed.kv_transfer.kv_connector.v1.base import ( + KVConnectorBase_V1, KVConnectorMetadata, KVConnectorRole) +from vllm.distributed.kv_transfer.kv_connector.v1.p2p.p2p_nccl_engine import ( + P2pNcclEngine) +from vllm.distributed.parallel_state import get_world_group +from vllm.forward_context import get_forward_context +from vllm.logger import init_logger +from vllm.v1.attention.backends.mla.common import MLACommonMetadata +from vllm.v1.core.sched.output import SchedulerOutput + +if TYPE_CHECKING: + from vllm.attention.backends.abstract import AttentionMetadata + from vllm.forward_context import ForwardContext + from vllm.v1.core.kv_cache_manager import KVCacheBlocks + from vllm.v1.request import Request + +logger = init_logger(__name__) + + +@dataclass +class ReqMeta: + # Request Id + request_id: str + # Request tokens + token_ids: torch.Tensor + # Slot mappings, should have the same length as token_ids + slot_mapping: torch.Tensor + + @staticmethod + def make_meta(request_id: str, token_ids: list[int], block_ids: list[int], + block_size: int) -> "ReqMeta": + valid_num_tokens = len(token_ids) + token_ids_tensor = torch.tensor(token_ids) + block_ids_tensor = torch.tensor(block_ids) + num_blocks = block_ids_tensor.shape[0] + block_offsets = torch.arange(0, block_size) + slot_mapping = block_offsets.reshape((1, block_size)) + \ + block_ids_tensor.reshape((num_blocks, 1)) * block_size + slot_mapping = slot_mapping.flatten()[:valid_num_tokens] + + return ReqMeta( + request_id=request_id, + token_ids=token_ids_tensor, + slot_mapping=slot_mapping, + ) + + +@dataclass +class P2pNcclConnectorMetadata(KVConnectorMetadata): + requests: list[ReqMeta] + + def __init__(self): + self.requests = [] + + def add_request( + self, + request_id: str, + token_ids: list[int], + block_ids: list[int], + block_size: int, + ) -> None: + self.requests.append( + 
ReqMeta.make_meta(request_id, token_ids, block_ids, block_size)) + + +class P2pNcclConnector(KVConnectorBase_V1): + + def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole): + super().__init__(vllm_config=vllm_config, role=role) + self._block_size = vllm_config.cache_config.block_size + self._requests_need_load: dict[str, Any] = {} + self.config = vllm_config.kv_transfer_config + self.is_producer = self.config.is_kv_producer + self.chunked_prefill: dict[str, Any] = {} + + self._rank = get_world_group().rank \ + if role == KVConnectorRole.WORKER else 0 + self._local_rank = get_world_group().local_rank \ + if role == KVConnectorRole.WORKER else 0 + + self.p2p_nccl_engine = P2pNcclEngine( + local_rank=self._local_rank, + config=self.config, + hostname="", + port_offset=self._rank, + ) if role == KVConnectorRole.WORKER else None + + # ============================== + # Worker-side methods + # ============================== + + def start_load_kv(self, forward_context: "ForwardContext", + **kwargs) -> None: + """Start loading the KV cache from the connector buffer to vLLM's + paged KV buffer. + + Args: + forward_context (ForwardContext): the forward context. + **kwargs: additional arguments for the load operation + + Note: + The number of elements in kv_caches and layer_names should be + the same. + """ + + # Only consumer/decode loads KV Cache + if self.is_producer: + return + + assert self.p2p_nccl_engine is not None + + attn_metadata = forward_context.attn_metadata + if attn_metadata is None: + return + + def inject_kv_into_layer( + dst_kv_cache_layer: torch.Tensor, + src_kv_cache: torch.Tensor, + slot_mapping: torch.Tensor, + request_id: str, + ) -> None: + """Inject the KV cache into the layer. + + Args: + dst_kv_cache_layer (torch.Tensor): the destination KV cache + layer. In shape [2, num_pages, page_size, xxx] if not + using MLA, [num_pages, page_size, xxx] otherwise. + src_kv_cache (torch.Tensor): the source KV cache. In shape + [2, num_tokens, xxx] if not using MLA, [num_tokens, xxx] + otherwise. + slot_mapping (torch.Tensor): the slot mapping. In shape + [num_tokens]. + request_id (str): request id for log + """ + dst_kv_cache_layer_shape = dst_kv_cache_layer.shape + if isinstance(attn_metadata, MLACommonMetadata): + num_pages = dst_kv_cache_layer_shape[0] + page_size = dst_kv_cache_layer_shape[1] + dst_kv_cache_layer = dst_kv_cache_layer.reshape( + num_pages * page_size, -1) + self.check_tensors_except_dim(dst_kv_cache_layer, src_kv_cache, + 0) + num_token = src_kv_cache.shape[0] + if len(slot_mapping) == num_token: + dst_kv_cache_layer[slot_mapping, ...] = src_kv_cache + else: + dst_kv_cache_layer[slot_mapping[:num_token], + ...] = src_kv_cache + logger.warning( + "🚧src_kv_cache does not match, num_slot:%d, " + "num_token:%d, request_id:%s", len(slot_mapping), + num_token, request_id) + + dst_kv_cache_layer.reshape(dst_kv_cache_layer_shape) + else: + num_pages = dst_kv_cache_layer_shape[1] + page_size = dst_kv_cache_layer_shape[2] + dst_kv_cache_layer = dst_kv_cache_layer.reshape( + 2, num_pages * page_size, -1) + self.check_tensors_except_dim(dst_kv_cache_layer, src_kv_cache, + 1) + num_token = src_kv_cache.shape[1] + if len(slot_mapping) == num_token: + dst_kv_cache_layer[:, slot_mapping, ...] = src_kv_cache + else: + dst_kv_cache_layer[:, slot_mapping[:num_token], + ...] 
= src_kv_cache + logger.warning( + "🚧src_kv_cache does not match, num_slot:%d, " + "num_token:%d, request_id:%s", len(slot_mapping), + num_token, request_id) + + dst_kv_cache_layer.reshape(dst_kv_cache_layer_shape) + + # Get the metadata + metadata: KVConnectorMetadata = \ + self._get_connector_metadata() + assert isinstance(metadata, P2pNcclConnectorMetadata) + + if metadata is None: + return + + # Load the KV for each request each layer + for request in metadata.requests: + for layer_name in forward_context.no_compile_layers: + attn_layer = forward_context.no_compile_layers[layer_name] + kv_cache_layer = attn_layer.kv_cache[ \ + forward_context.virtual_engine] + + kv_cache = self.p2p_nccl_engine.recv_tensor( + request.request_id + "#" + layer_name) + + if kv_cache is None: + logger.warning("🚧src_kv_cache is None, %s", + request.request_id) + continue + + inject_kv_into_layer(kv_cache_layer, kv_cache, + request.slot_mapping, request.request_id) + + def wait_for_layer_load(self, layer_name: str) -> None: + """Blocking until the KV for a specific layer is loaded into vLLM's + paged buffer. + + This interface will be useful for layer-by-layer pipelining. + + Args: + layer_name: the name of that layer + """ + return + + def save_kv_layer(self, layer_name: str, kv_layer: torch.Tensor, + attn_metadata: "AttentionMetadata", **kwargs) -> None: + """Start saving the KV cache of the layer from vLLM's paged buffer + to the connector. + + Args: + layer_name (str): the name of the layer. + kv_layer (torch.Tensor): the paged KV buffer of the current + layer in vLLM. + attn_metadata (AttentionMetadata): the attention metadata. + **kwargs: additional arguments for the save operation. + """ + + # Only producer/prefill saves KV Cache + if not self.is_producer: + return + + assert self.p2p_nccl_engine is not None + + def extract_kv_from_layer( + layer: torch.Tensor, + slot_mapping: torch.Tensor, + ) -> torch.Tensor: + """Extract the KV cache from the layer. + + Assume the shape of the layer is (2, num_pages, page_size, xxx) + if MLA is not used, and (num_pages, page_size, xxx) otherwise. + """ + if isinstance(attn_metadata, MLACommonMetadata): + num_pages, page_size = layer.shape[0], layer.shape[1] + return layer.reshape(num_pages * page_size, -1)[slot_mapping, + ...] + num_pages, page_size = layer.shape[1], layer.shape[2] + return layer.reshape(2, num_pages * page_size, -1)[:, slot_mapping, + ...] + + connector_metadata = self._get_connector_metadata() + assert isinstance(connector_metadata, P2pNcclConnectorMetadata) + for request in connector_metadata.requests: + request_id = request.request_id + ip, port = self.parse_request_id(request_id, True) + remote_address = ip + ":" + str(port + self._rank) + kv_cache = extract_kv_from_layer(kv_layer, request.slot_mapping) + self.p2p_nccl_engine.send_tensor(request_id + "#" + layer_name, + kv_cache, remote_address) + + def wait_for_save(self): + if self.is_producer: + assert self.p2p_nccl_engine is not None + self.p2p_nccl_engine.wait_for_sent() + + def get_finished( + self, finished_req_ids: set[str], + **kwargs) -> tuple[Optional[set[str]], Optional[set[str]]]: + """ + Notifies worker-side connector ids of requests that have + finished generating tokens. + + Returns: + ids of requests that have finished asynchronous transfer, + tuple of (sending/saving ids, recving/loading ids). + The finished saves/sends req ids must belong to a set provided in a + call to this method (this call or a prior one). 
+ """ + + assert self.p2p_nccl_engine is not None + + forward_context: ForwardContext = get_forward_context() + return self.p2p_nccl_engine.get_finished(finished_req_ids, + forward_context) + + # ============================== + # Scheduler-side methods + # ============================== + + def get_num_new_matched_tokens( + self, + request: "Request", + num_computed_tokens: int, + ) -> tuple[int, bool]: + """ + Get number of new tokens that can be loaded from the + external KV cache beyond the num_computed_tokens. + + Args: + request (Request): the request object. + num_computed_tokens (int): the number of locally + computed tokens for this request + + Returns: + the number of tokens that can be loaded from the + external KV cache beyond what is already computed. + """ + if self.is_producer: + return 0, False + + num_external_tokens = (len(request.prompt_token_ids) - 1 - + num_computed_tokens) + + if num_external_tokens < 0: + num_external_tokens = 0 + + return num_external_tokens, False + + def update_state_after_alloc(self, request: "Request", + blocks: "KVCacheBlocks", + num_external_tokens: int): + """ + Update KVConnector state after block allocation. + """ + if not self.is_producer and num_external_tokens > 0: + self._requests_need_load[request.request_id] = ( + request, blocks.get_block_ids()[0]) + + def build_connector_meta( + self, + scheduler_output: SchedulerOutput, + ) -> KVConnectorMetadata: + """Build the connector metadata for this step. + + This function should NOT modify any fields in the scheduler_output. + Also, calling this function will reset the state of the connector. + + Args: + scheduler_output (SchedulerOutput): the scheduler output object. + """ + + meta = P2pNcclConnectorMetadata() + + for new_req in scheduler_output.scheduled_new_reqs: + if self.is_producer: + num_scheduled_tokens = ( + scheduler_output.num_scheduled_tokens)[new_req.req_id] + num_tokens = num_scheduled_tokens + new_req.num_computed_tokens + # the request's prompt is chunked prefill + if num_tokens < len(new_req.prompt_token_ids): + # 'CachedRequestData' has no attribute 'prompt_token_ids' + self.chunked_prefill[new_req.req_id] = ( + new_req.block_ids[0], new_req.prompt_token_ids) + continue + # the request's prompt is not chunked prefill + meta.add_request(request_id=new_req.req_id, + token_ids=new_req.prompt_token_ids, + block_ids=new_req.block_ids[0], + block_size=self._block_size) + continue + if new_req.req_id in self._requests_need_load: + meta.add_request(request_id=new_req.req_id, + token_ids=new_req.prompt_token_ids, + block_ids=new_req.block_ids[0], + block_size=self._block_size) + self._requests_need_load.pop(new_req.req_id) + + for cached_req in scheduler_output.scheduled_cached_reqs: + if self.is_producer: + num_scheduled_tokens = ( + scheduler_output.num_scheduled_tokens)[cached_req.req_id] + num_tokens = (num_scheduled_tokens + + cached_req.num_computed_tokens) + assert cached_req.req_id in self.chunked_prefill + block_ids = cached_req.new_block_ids[0] + if not cached_req.resumed_from_preemption: + block_ids = (self.chunked_prefill[cached_req.req_id][0] + + block_ids) + prompt_token_ids = self.chunked_prefill[cached_req.req_id][1] + # the request's prompt is chunked prefill again + if num_tokens < len(prompt_token_ids): + self.chunked_prefill[cached_req.req_id] = ( + block_ids, prompt_token_ids) + continue + # the request's prompt is all prefilled finally + meta.add_request(request_id=cached_req.req_id, + token_ids=prompt_token_ids, + block_ids=block_ids, + 
block_size=self._block_size) + self.chunked_prefill.pop(cached_req.req_id, None) + continue + + # NOTE(rob): here we rely on the resumed requests being + # the first N requests in the list scheduled_cache_reqs. + if not cached_req.resumed_from_preemption: + break + if cached_req.req_id in self._requests_need_load: + request, _ = self._requests_need_load.pop(cached_req.req_id) + total_tokens = cached_req.num_computed_tokens + 1 + token_ids = request.all_token_ids[:total_tokens] + + # NOTE(rob): For resumed req, new_block_ids is all + # of the block_ids for the request. + block_ids = cached_req.new_block_ids[0] + + meta.add_request(request_id=cached_req.req_id, + token_ids=token_ids, + block_ids=block_ids, + block_size=self._block_size) + + # Requests loaded asynchronously are not in the scheduler_output. + # for request_id in self._requests_need_load: + # request, block_ids = self._requests_need_load[request_id] + # meta.add_request(request_id=request.request_id, + # token_ids=request.prompt_token_ids, + # block_ids=block_ids, + # block_size=self._block_size) + + self._requests_need_load.clear() + return meta + + def request_finished( + self, + request: "Request", + block_ids: list[int], + ) -> tuple[bool, Optional[dict[str, Any]]]: + """ + Called when a request has finished, before its blocks are freed. + + Returns: + True if the request is being saved/sent asynchronously and blocks + should not be freed until the request_id is returned from + get_finished(). + Optional KVTransferParams to be included in the request outputs + returned by the engine. + """ + + self.chunked_prefill.pop(request.request_id, None) + + return False, None + + # ============================== + # Static methods + # ============================== + + @staticmethod + def parse_request_id(request_id: str, is_prefill=True) -> tuple[str, int]: + # Regular expression to match the string hostname and integer port + if is_prefill: + pattern = r"___decode_addr_(.*):(\d+)" + else: + pattern = r"___prefill_addr_(.*):(\d+)___" + + # Use re.search to find the pattern in the request_id + match = re.search(pattern, request_id) + if match: + # Extract the ranks + ip = match.group(1) + port = int(match.group(2)) + + return ip, port + raise ValueError( + f"Request id {request_id} does not contain hostname and port") + + @staticmethod + def check_tensors_except_dim(tensor1, tensor2, dim): + shape1 = tensor1.size() + shape2 = tensor2.size() + + if len(shape1) != len(shape2) or not all( + s1 == s2 + for i, (s1, s2) in enumerate(zip(shape1, shape2)) if i != dim): + raise NotImplementedError( + "Currently, only symmetric TP is supported. 
Asymmetric TP, PP," + "and others will be supported in future PRs.") diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_engine.py b/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_engine.py new file mode 100644 index 000000000000..81f7a2525896 --- /dev/null +++ b/vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_engine.py @@ -0,0 +1,531 @@ +# SPDX-License-Identifier: Apache-2.0 + +import logging +import os +import threading +import time +import typing +from collections import deque +from contextlib import contextmanager +from typing import TYPE_CHECKING, Any, Optional + +import msgpack +import torch +import zmq + +from vllm.config import KVTransferConfig +from vllm.distributed.device_communicators.pynccl_wrapper import ( + NCCLLibrary, buffer_type, cudaStream_t, ncclComm_t, ncclDataTypeEnum) +from vllm.distributed.kv_transfer.kv_connector.v1.p2p.tensor_memory_pool import ( # noqa: E501 + TensorMemoryPool) +from vllm.utils import current_stream, get_ip + +if TYPE_CHECKING: + from vllm.forward_context import ForwardContext + +logger = logging.getLogger(__name__) + +DEFAULT_MEM_POOL_SIZE_GB = 32 + + +@contextmanager +def set_p2p_nccl_context(num_channels: str): + original_values: dict[str, Any] = {} + env_vars = [ + 'NCCL_MAX_NCHANNELS', + 'NCCL_MIN_NCHANNELS', + 'NCCL_CUMEM_ENABLE', + 'NCCL_BUFFSIZE', + 'NCCL_PROTO', # LL,LL128,SIMPLE + 'NCCL_ALGO', # RING,TREE + ] + + for var in env_vars: + original_values[var] = os.environ.get(var) + + logger.info("set_p2p_nccl_context, original_values: %s", original_values) + + try: + os.environ['NCCL_MAX_NCHANNELS'] = num_channels + os.environ['NCCL_MIN_NCHANNELS'] = num_channels + os.environ['NCCL_CUMEM_ENABLE'] = '1' + yield + finally: + for var in env_vars: + if original_values[var] is not None: + os.environ[var] = original_values[var] + else: + os.environ.pop(var, None) + + +class P2pNcclEngine: + + def __init__(self, + local_rank: int, + config: KVTransferConfig, + hostname: str = "", + port_offset: int = 0, + library_path: Optional[str] = None) -> None: + self.config = config + self.rank = port_offset + self.local_rank = local_rank + self.device = torch.device(f"cuda:{self.local_rank}") + self.nccl = NCCLLibrary(library_path) + + if not hostname: + hostname = get_ip() + port = int(self.config.kv_port) + port_offset + if port == 0: + raise ValueError("Port cannot be 0") + self._hostname = hostname + self._port = port + + # Each card corresponds to a ZMQ address. + self.zmq_address = f"{self._hostname}:{self._port}" + + # The `http_port` must be consistent with the port of OpenAI. + self.http_address = ( + f"{self._hostname}:" + f"{self.config.kv_connector_extra_config['http_port']}") + + # If `proxy_ip` or `proxy_port` is `""`, + # then the ping thread will not be enabled. 
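+        # (Note) When enabled, the ping thread periodically (every 3 seconds)
+        # registers this instance with the proxy by sending
+        # {"type": "P"/"D", "http_address": ..., "zmq_address": ...}; see _ping().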
+ proxy_ip = self.config.get_from_extra_config("proxy_ip", "") + proxy_port = self.config.get_from_extra_config("proxy_port", "") + if proxy_ip == "" or proxy_port == "": + self.proxy_address = "" + else: + self.proxy_address = proxy_ip + ":" + proxy_port + + self.context = zmq.Context() + self.router_socket = self.context.socket(zmq.ROUTER) + self.router_socket.bind(f"tcp://{self.zmq_address}") + + self.poller = zmq.Poller() + self.poller.register(self.router_socket, zmq.POLLIN) + + self.send_store_cv = threading.Condition() + self.send_queue_cv = threading.Condition() + self.recv_store_cv = threading.Condition() + + self.send_stream = torch.cuda.Stream() + self.recv_stream = torch.cuda.Stream() + + mem_pool_size_gb = self.config.get_from_extra_config( + "mem_pool_size_gb", DEFAULT_MEM_POOL_SIZE_GB) + self.pool = TensorMemoryPool(max_block_size=int(mem_pool_size_gb) * + 1024**3) # GB + + # The sending type includes tree mutually exclusive options: + # PUT, GET, PUT_ASYNC. + self.send_type = self.config.get_from_extra_config("send_type", "PUT") + if self.send_type == "GET": + # tensor_id: torch.Tensor + self.send_store: dict[str, torch.Tensor] = {} + else: + # PUT or PUT_ASYNC + # tensor_id: torch.Tensor + self.send_queue: deque[list[Any]] = deque() + self.send_request_id_to_tensor_ids: dict[str, set[str]] = {} + if self.send_type == "PUT_ASYNC": + self._send_thread = threading.Thread(target=self._send_async, + daemon=True) + self._send_thread.start() + + # tensor_id: torch.Tensor/(addr, dtype, shape) + self.recv_store: dict[str, Any] = {} + self.recv_request_id_to_tensor_ids: dict[str, set[str]] = {} + self.socks: dict[str, Any] = {} # remote_address: client socket + self.comms: dict[str, Any] = {} # remote_address: (ncclComm_t, rank) + + self.buffer_size = 0 + self.buffer_size_threshold = float(self.config.kv_buffer_size) + + self.nccl_num_channels = self.config.get_from_extra_config( + "nccl_num_channels", "8") + + self._listener_thread = threading.Thread( + target=self._listen_for_requests, daemon=True) + self._listener_thread.start() + + self._ping_thread = None + if port_offset == 0 and self.proxy_address != "": + self._ping_thread = threading.Thread(target=self._ping, + daemon=True) + self._ping_thread.start() + + logger.info( + "💯P2pNcclEngine init, rank:%d, local_rank:%d, http_address:%s, " + "zmq_address:%s, proxy_address:%s, send_type:%s, buffer_size_" + "threshold:%.2f, nccl_num_channels:%s", self.rank, self.local_rank, + self.http_address, self.zmq_address, self.proxy_address, + self.send_type, self.buffer_size_threshold, self.nccl_num_channels) + + def _create_connect(self, remote_address: typing.Optional[str] = None): + assert remote_address is not None + if remote_address not in self.socks: + sock = self.context.socket(zmq.DEALER) + sock.setsockopt_string(zmq.IDENTITY, self.zmq_address) + sock.connect(f"tcp://{remote_address}") + self.socks[remote_address] = sock + if remote_address in self.comms: + logger.info("👋comm exists, remote_address:%s, comms:%s", + remote_address, self.comms) + return sock, self.comms[remote_address] + + unique_id = self.nccl.ncclGetUniqueId() + data = {"cmd": "NEW", "unique_id": bytes(unique_id.internal)} + sock.send(msgpack.dumps(data)) + + with torch.cuda.device(self.device): + rank = 0 + with set_p2p_nccl_context(self.nccl_num_channels): + comm: ncclComm_t = self.nccl.ncclCommInitRank( + 2, unique_id, rank) + self.comms[remote_address] = (comm, rank) + logger.info("🤝ncclCommInitRank Success, %s👉%s, MyRank: %s", + self.zmq_address, 
remote_address, rank) + + return self.socks[remote_address], self.comms[remote_address] + + def send_tensor( + self, + tensor_id: str, + tensor: torch.Tensor, + remote_address: typing.Optional[str] = None, + ) -> bool: + if remote_address is None: + with self.recv_store_cv: + self.recv_store[tensor_id] = tensor + self.recv_store_cv.notify() + return True + else: + if self.send_type == "PUT": + return self._send_sync(tensor_id, tensor, remote_address) + elif self.send_type == "PUT_ASYNC": + with self.send_queue_cv: + self.send_queue.append([tensor_id, remote_address, tensor]) + self.send_queue_cv.notify() + else: # GET + with self.send_store_cv: + tensor_size = tensor.element_size() * tensor.numel() + while (self.buffer_size + tensor_size + > self.buffer_size_threshold): + oldest_tenser_id = next(iter(self.send_store)) + oldest_tenser = self.send_store.pop(oldest_tenser_id) + oldest_tenser_size = oldest_tenser.element_size( + ) * oldest_tenser.numel() + self.buffer_size -= oldest_tenser_size + logger.info( + "⛔[GET]Send to %s, tensor_id:%s, tensor_size:%d," + " buffer_size:%d, oldest_tenser_size:%d, rank:%d", + remote_address, tensor_id, tensor_size, + self.buffer_size, oldest_tenser_size, self.rank) + + self.send_store[tensor_id] = tensor + self.buffer_size += tensor_size + logger.debug( + "🔵[GET]Send to %s, tensor_id:%s, tensor_size:%d, " + "shape:%s, rank:%d, buffer_size:%d(%.2f%%)", + remote_address, tensor_id, tensor_size, tensor.shape, + self.rank, self.buffer_size, + self.buffer_size / self.buffer_size_threshold * 100) + + return True + + def recv_tensor( + self, + tensor_id: str, + remote_address: typing.Optional[str] = None, + ) -> torch.Tensor: + if self.send_type == "PUT" or self.send_type == "PUT_ASYNC": + start_time = time.time() + with self.recv_store_cv: + while tensor_id not in self.recv_store: + self.recv_store_cv.wait() + tensor = self.recv_store[tensor_id] + + if tensor is not None: + if isinstance(tensor, tuple): + addr, dtype, shape = tensor + tensor = self.pool.load_tensor(addr, dtype, shape, + self.device) + else: + self.buffer_size -= (tensor.element_size() * + tensor.numel()) + else: + duration = time.time() - start_time + logger.warning( + "🔴[PUT]Recv From %s, tensor_id:%s, duration:%.3fms, " + "rank:%d", remote_address, tensor_id, duration * 1000, + self.rank) + return tensor + + # GET + if remote_address is None: + return None + + if remote_address not in self.socks: + self._create_connect(remote_address) + + sock = self.socks[remote_address] + comm, rank = self.comms[remote_address] + + data = {"cmd": "GET", "tensor_id": tensor_id} + sock.send(msgpack.dumps(data)) + + message = sock.recv() + data = msgpack.loads(message) + if data["ret"] != 0: + logger.warning("🔴[GET]Recv From %s, tensor_id: %s, ret: %d", + remote_address, tensor_id, data["ret"]) + return None + + tensor = torch.empty(data["shape"], + dtype=getattr(torch, data["dtype"]), + device=self.device) + + self._recv(comm, tensor, rank ^ 1, self.recv_stream) + + return tensor + + def _listen_for_requests(self): + while True: + socks = dict(self.poller.poll()) + if self.router_socket in socks: + remote_address, message = self.router_socket.recv_multipart() + data = msgpack.loads(message) + if data["cmd"] == "NEW": + unique_id = self.nccl.unique_id_from_bytes( + bytes(data["unique_id"])) + with torch.cuda.device(self.device): + rank = 1 + with set_p2p_nccl_context(self.nccl_num_channels): + comm: ncclComm_t = self.nccl.ncclCommInitRank( + 2, unique_id, rank) + self.comms[remote_address.decode()] = (comm, 
rank) + logger.info( + "🤝ncclCommInitRank Success, %s👈%s, MyRank:%s", + self.zmq_address, remote_address.decode(), rank) + elif data["cmd"] == "PUT": + tensor_id = data["tensor_id"] + try: + tensor = torch.empty(data["shape"], + dtype=getattr( + torch, data["dtype"]), + device=self.device) + self.router_socket.send_multipart( + [remote_address, b"0"]) + comm, rank = self.comms[remote_address.decode()] + self._recv(comm, tensor, rank ^ 1, self.recv_stream) + tensor_size = tensor.element_size() * tensor.numel() + if (self.buffer_size + tensor_size + > self.buffer_size_threshold): + # Store Tensor in memory pool + addr = self.pool.store_tensor(tensor) + tensor = (addr, tensor.dtype, tensor.shape) + logger.warning( + "🔴[PUT]Recv Tensor, Out Of Threshold, " + "%s👈%s, data:%s, addr:%d", self.zmq_address, + remote_address.decode(), data, addr) + else: + self.buffer_size += tensor_size + + except torch.cuda.OutOfMemoryError: + self.router_socket.send_multipart( + [remote_address, b"1"]) + tensor = None + logger.warning( + "🔴[PUT]Recv Tensor, Out Of Memory, %s👈%s, " + "data:%s", self.zmq_address, + remote_address.decode(), data) + + with self.recv_store_cv: + self.recv_store[tensor_id] = tensor + self._have_received_tensor_id(tensor_id) + self.recv_store_cv.notify() + + elif data["cmd"] == "GET": + tensor_id = data["tensor_id"] + with self.send_store_cv: + tensor = self.send_store.pop(tensor_id, None) + if tensor is not None: + data = { + "ret": 0, + "shape": tensor.shape, + "dtype": + str(tensor.dtype).replace("torch.", "") + } + # LRU + self.send_store[tensor_id] = tensor + self._have_sent_tensor_id(tensor_id) + else: + data = {"ret": 1} + + self.router_socket.send_multipart( + [remote_address, msgpack.dumps(data)]) + + if data["ret"] == 0: + comm, rank = self.comms[remote_address.decode()] + self._send(comm, tensor.to(self.device), rank ^ 1, + self.send_stream) + else: + logger.warning( + "🚧Unexpected, Received message from %s, data:%s", + remote_address, data) + + def _have_sent_tensor_id(self, tensor_id: str): + request_id = tensor_id.split('#')[0] + if request_id not in self.send_request_id_to_tensor_ids: + self.send_request_id_to_tensor_ids[request_id] = set() + self.send_request_id_to_tensor_ids[request_id].add(tensor_id) + + def _have_received_tensor_id(self, tensor_id: str): + request_id = tensor_id.split('#')[0] + if request_id not in self.recv_request_id_to_tensor_ids: + self.recv_request_id_to_tensor_ids[request_id] = set() + self.recv_request_id_to_tensor_ids[request_id].add(tensor_id) + + def _send_async(self): + while True: + with self.send_queue_cv: + while not self.send_queue: + self.send_queue_cv.wait() + tensor_id, remote_address, tensor = self.send_queue.popleft() + if not self.send_queue: + self.send_queue_cv.notify() + self._send_sync(tensor_id, tensor, remote_address) + + def wait_for_sent(self): + if self.send_type == "PUT_ASYNC": + start_time = time.time() + with self.send_queue_cv: + while self.send_queue: + self.send_queue_cv.wait() + duration = time.time() - start_time + logger.debug( + "🚧[PUT_ASYNC]It took %.3fms to wait for the send_queue" + " to be empty, rank:%d", duration * 1000, self.rank) + + def _send_sync( + self, + tensor_id: str, + tensor: torch.Tensor, + remote_address: typing.Optional[str] = None, + ) -> bool: + if remote_address is None: + return False + if remote_address not in self.socks: + self._create_connect(remote_address) + + sock = self.socks[remote_address] + comm, rank = self.comms[remote_address] + data = { + "cmd": "PUT", + "tensor_id": 
tensor_id, + "shape": tensor.shape, + "dtype": str(tensor.dtype).replace("torch.", "") + } + sock.send(msgpack.dumps(data)) + + response = sock.recv() + if response != b"0": + logger.error( + "🔴Send Tensor, Peer Out Of Memory/Threshold, %s 👉 %s, " + "MyRank:%s, data:%s, tensor:%s, size:%fGB, response:%s", + self.zmq_address, remote_address, rank, data, tensor.shape, + tensor.element_size() * tensor.numel() / 1024**3, + response.decode()) + return False + + self._send(comm, tensor.to(self.device), rank ^ 1, self.send_stream) + + if self.send_type == "PUT_ASYNC": + self._have_sent_tensor_id(tensor_id) + + return True + + def get_finished( + self, finished_req_ids: set[str], forward_context: "ForwardContext" + ) -> tuple[Optional[set[str]], Optional[set[str]]]: + """ + Notifies worker-side connector ids of requests that have + finished generating tokens. + + Returns: + ids of requests that have finished asynchronous transfer, + tuple of (sending/saving ids, recving/loading ids). + The finished saves/sends req ids must belong to a set provided in a + call to this method (this call or a prior one). + """ + + # Clear the buffer upon request completion. + for request_id in finished_req_ids: + for layer_name in forward_context.no_compile_layers: + tensor_id = request_id + "#" + layer_name + if tensor_id in self.recv_store: + with self.recv_store_cv: + tensor = self.recv_store.pop(tensor_id, None) + self.send_request_id_to_tensor_ids.pop( + request_id, None) + self.recv_request_id_to_tensor_ids.pop( + request_id, None) + addr = 0 + if isinstance(tensor, tuple): + addr, _, _ = tensor + self.pool.free(addr) + + # TODO:Retrieve requests that have already sent the KV cache. + finished_sending: set[str] = set() + + # TODO:Retrieve requests that have already received the KV cache. 
+ finished_recving: set[str] = set() + + return finished_sending or None, finished_recving or None + + def _ping(self): + sock = self.context.socket(zmq.DEALER) + sock.setsockopt_string(zmq.IDENTITY, self.zmq_address) + logger.debug("ping start, zmq_address:%s", self.zmq_address) + sock.connect(f"tcp://{self.proxy_address}") + data = { + "type": "P" if self.config.is_kv_producer else "D", + "http_address": self.http_address, + "zmq_address": self.zmq_address + } + while True: + sock.send(msgpack.dumps(data)) + time.sleep(3) + + def _send(self, comm, tensor: torch.Tensor, dst: int, stream=None): + assert tensor.device == self.device, ( + f"this nccl communicator is created to work on {self.device}, " + f"but the input tensor is on {tensor.device}") + if stream is None: + stream = current_stream() + + with torch.cuda.stream(stream): + self.nccl.ncclSend(buffer_type(tensor.data_ptr()), tensor.numel(), + ncclDataTypeEnum.from_torch(tensor.dtype), dst, + comm, cudaStream_t(stream.cuda_stream)) + stream.synchronize() + + def _recv(self, comm, tensor: torch.Tensor, src: int, stream=None): + assert tensor.device == self.device, ( + f"this nccl communicator is created to work on {self.device}, " + f"but the input tensor is on {tensor.device}") + if stream is None: + stream = current_stream() + + with torch.cuda.stream(stream): + self.nccl.ncclRecv(buffer_type(tensor.data_ptr()), tensor.numel(), + ncclDataTypeEnum.from_torch(tensor.dtype), src, + comm, cudaStream_t(stream.cuda_stream)) + stream.synchronize() + + def close(self) -> None: + self._listener_thread.join() + if self.send_type == "PUT_ASYNC": + self._send_thread.join() + if self._ping_thread is not None: + self._ping_thread.join() diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/p2p/tensor_memory_pool.py b/vllm/distributed/kv_transfer/kv_connector/v1/p2p/tensor_memory_pool.py new file mode 100644 index 000000000000..303619a3fdd0 --- /dev/null +++ b/vllm/distributed/kv_transfer/kv_connector/v1/p2p/tensor_memory_pool.py @@ -0,0 +1,264 @@ +# SPDX-License-Identifier: Apache-2.0 + +import atexit +import ctypes +import math +from dataclasses import dataclass + +import torch + +from vllm.logger import init_logger + +logger = init_logger(__name__) + + +@dataclass +class MemoryBlock: + size: int + addr: int + + +"""A memory pool for managing pinned host memory allocations for tensors. + +This class implements a buddy allocation system to efficiently manage pinned +host memory for tensor storage. It supports allocation, deallocation, and +tensor storage/retrieval operations. + +Key Features: +- Uses power-of-two block sizes for efficient buddy allocation +- Supports splitting and merging of memory blocks +- Provides methods to store CUDA tensors in pinned host memory +- Allows loading tensors from pinned memory back to device +- Automatically cleans up memory on destruction + +Attributes: + max_block_size (int): Maximum block size (rounded to nearest power of two) + min_block_size (int): Minimum block size (rounded to nearest power of two) + free_lists (dict): Dictionary of free memory blocks by size + allocated_blocks (dict): Dictionary of currently allocated blocks + base_tensor (torch.Tensor): Base pinned memory tensor + base_address (int): Base memory address of the pinned memory region + +Example: + >>> pool = TensorMemoryPool(max_block_size=1024*1024) + >>> tensor = torch.randn(100, device='cuda') + >>> addr = pool.store_tensor(tensor) + >>> loaded_tensor = pool.load_tensor(addr, tensor.dtype, + ... 
+    ...                                   tensor.shape, 'cuda')
+    >>> pool.free(addr)
+"""
+
+
+class TensorMemoryPool:
+    """Initializes the memory pool with given size constraints.
+
+    Args:
+        max_block_size (int): Maximum size of memory blocks to manage
+        min_block_size (int, optional): Minimum size of memory blocks
+            to manage. Defaults to 512.
+
+    Raises:
+        ValueError: If block sizes are invalid or max_block_size is less
+            than min_block_size
+    """
+
+    def __init__(self, max_block_size: int, min_block_size: int = 512):
+        if max_block_size <= 0 or min_block_size <= 0:
+            raise ValueError("Block sizes must be positive")
+        if max_block_size < min_block_size:
+            raise ValueError(
+                "Max block size must be greater than min block size")
+
+        self.max_block_size = self._round_to_power_of_two(max_block_size)
+        self.min_block_size = self._round_to_power_of_two(min_block_size)
+
+        self.free_lists: dict[int, dict[int, MemoryBlock]] = {}
+        self.allocated_blocks: dict[int, MemoryBlock] = {}
+
+        self._initialize_free_lists()
+        self._allocate_pinned_memory()
+
+        atexit.register(self.cleanup)
+
+    def _round_to_power_of_two(self, size: int) -> int:
+        return 1 << (size - 1).bit_length()
+
+    def _initialize_free_lists(self):
+        size = self.max_block_size
+        while size >= self.min_block_size:
+            self.free_lists[size] = {}
+            size //= 2
+
+    def _allocate_pinned_memory(self):
+        self.base_tensor = torch.empty(self.max_block_size // 4,
+                                       dtype=torch.float32,
+                                       pin_memory=True)
+        self.base_address = self.base_tensor.data_ptr()
+        initial_block = MemoryBlock(size=self.max_block_size,
+                                    addr=self.base_address)
+        self.free_lists[self.max_block_size][
+            initial_block.addr] = initial_block
+        logger.debug("TensorMemoryPool, base_address:%s, offset:%s",
+                     self.base_address,
+                     self.base_address % self.max_block_size)
+
+    def allocate(self, size: int) -> int:
+        """Allocates a memory block of at least the requested size.
+
+        Args:
+            size (int): Minimum size of memory to allocate
+
+        Returns:
+            int: Address of the allocated memory block
+
+        Raises:
+            ValueError: If size is invalid or insufficient memory is available
+        """
+        if size <= 0:
+            raise ValueError("Allocation size must be positive")
+
+        required_size = self._round_to_power_of_two(
+            max(size, self.min_block_size))
+        if required_size > self.max_block_size:
+            raise ValueError("Requested size exceeds maximum block size")
+
+        current_size = required_size
+        while current_size <= self.max_block_size:
+            if self.free_lists[current_size]:
+                _, block = self.free_lists[current_size].popitem()
+                self._split_block(block, required_size)
+                self.allocated_blocks[block.addr] = block
+                return block.addr
+            current_size *= 2
+
+        raise ValueError("Insufficient memory")
+
+    def _split_block(self, block: MemoryBlock, required_size: int):
+        while (block.size > required_size
+               and block.size // 2 >= self.min_block_size):
+            buddy_size = block.size // 2
+            buddy_addr = block.addr + buddy_size
+
+            buddy = MemoryBlock(size=buddy_size, addr=buddy_addr)
+            block.size = buddy_size
+
+            self.free_lists[buddy_size][buddy.addr] = buddy
+
+    def free(self, addr: int):
+        """Frees an allocated memory block.
+
+        Args:
+            addr (int): Address of the block to free
+
+        Raises:
+            ValueError: If address is invalid or not allocated
+        """
+        if addr not in self.allocated_blocks:
+            raise ValueError("Invalid address to free")
+
+        block = self.allocated_blocks.pop(addr)
+        self._merge_buddies(block)
+
+    def _merge_buddies(self, block: MemoryBlock):
+        MAX_MERGE_DEPTH = 30
+        depth = 0
+
+        while depth < MAX_MERGE_DEPTH:
+            buddy_offset = block.size if (block.addr - self.base_address) % (
+                2 * block.size) == 0 else -block.size
+            buddy_addr = block.addr + buddy_offset
+            buddy = self.free_lists[block.size].get(buddy_addr)
+            if buddy:
+                del self.free_lists[buddy.size][buddy.addr]
+                merged_addr = min(block.addr, buddy.addr)
+                merged_size = block.size * 2
+                block = MemoryBlock(size=merged_size, addr=merged_addr)
+                depth += 1
+            else:
+                break
+        self.free_lists[block.size][block.addr] = block
+
+    def store_tensor(self, tensor: torch.Tensor) -> int:
+        """Stores a CUDA tensor in pinned host memory.
+
+        Args:
+            tensor (torch.Tensor): CUDA tensor to store
+
+        Returns:
+            int: Address where the tensor is stored
+
+        Raises:
+            ValueError: If tensor is not on CUDA or allocation fails
+        """
+        if not tensor.is_cuda:
+            raise ValueError("Only CUDA tensors can be stored")
+
+        size = tensor.element_size() * tensor.numel()
+        addr = self.allocate(size)
+        block = self.allocated_blocks[addr]
+
+        if block.size < size:
+            self.free(addr)
+            raise ValueError(
+                f"Allocated block size {block.size} is smaller than "
+                f"required size {size}")
+
+        try:
+            buffer = (ctypes.c_byte * block.size).from_address(block.addr)
+            cpu_tensor = torch.frombuffer(buffer,
+                                          dtype=tensor.dtype,
+                                          count=tensor.numel()).reshape(
+                                              tensor.shape)
+        except ValueError as err:
+            self.free(addr)
+            raise ValueError(f"Failed to create tensor view: {err}") from err
+
+        cpu_tensor.copy_(tensor)
+
+        return addr
+
+    def load_tensor(self, addr: int, dtype: torch.dtype,
+                    shape: tuple[int, ...], device) -> torch.Tensor:
+        """Loads a tensor from pinned host memory to the specified device.
+
+        Args:
+            addr (int): Address where tensor is stored
+            dtype (torch.dtype): Data type of the tensor
+            shape (tuple[int, ...]): Shape of the tensor
+            device: Target device for the loaded tensor
+
+        Returns:
+            torch.Tensor: The loaded tensor on the specified device
+
+        Raises:
+            ValueError: If address is invalid or sizes don't match
+        """
+        if addr not in self.allocated_blocks:
+            raise ValueError("Invalid address to load")
+
+        block = self.allocated_blocks[addr]
+        num_elements = math.prod(shape)
+        dtype_size = torch.tensor([], dtype=dtype).element_size()
+        required_size = num_elements * dtype_size
+
+        if required_size > block.size:
+            raise ValueError("Requested tensor size exceeds block size")
+
+        buffer = (ctypes.c_byte * block.size).from_address(block.addr)
+        cpu_tensor = torch.frombuffer(buffer, dtype=dtype,
+                                      count=num_elements).reshape(shape)
+
+        cuda_tensor = torch.empty(shape, dtype=dtype, device=device)
+
+        cuda_tensor.copy_(cpu_tensor)
+
+        return cuda_tensor
+
+    def cleanup(self):
+        """Cleans up all memory resources and resets the pool state."""
+        self.free_lists.clear()
+        self.allocated_blocks.clear()
+        if hasattr(self, 'base_tensor'):
+            del self.base_tensor
+
+    def __del__(self):
+        self.cleanup()
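
For reference, the `TensorMemoryPool` above can be exercised on its own. The sketch below is a minimal, hypothetical usage example, assuming a CUDA-capable host and that vLLM is installed so the module path added in this diff is importable; it is not part of the connector code itself.

```python
# Minimal usage sketch for TensorMemoryPool (assumes a CUDA device is present
# and that the module path added in this diff is importable).
import torch

from vllm.distributed.kv_transfer.kv_connector.v1.p2p.tensor_memory_pool import (
    TensorMemoryPool)

if torch.cuda.is_available():
    # 64 MiB of pinned host memory, managed with buddy allocation.
    pool = TensorMemoryPool(max_block_size=64 * 1024 * 1024)

    # Store a CUDA tensor into pinned host memory and remember its address.
    src = torch.randn(1024, 128, dtype=torch.float16, device="cuda")
    addr = pool.store_tensor(src)

    # Later (e.g. at decode time), copy it back onto the GPU.
    dst = pool.load_tensor(addr, src.dtype, tuple(src.shape), "cuda")
    assert torch.equal(src, dst)

    # Release the block so its buddy can be merged and reused.
    pool.free(addr)
```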
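The buddy arithmetic in `_merge_buddies` relies on every block's offset from `base_address` being a multiple of its size: a block whose offset is also a multiple of twice its size is the "left" half of its pair, otherwise it is the "right" half. A tiny worked example with illustrative numbers:

```python
# Worked example of the buddy-address arithmetic used in _merge_buddies().
# Offsets are relative to base_address; the numbers are illustrative only.
base_address = 0
block_size = 2048

# A block at offset 0 is the left buddy: its partner sits at +block_size.
assert (0 - base_address) % (2 * block_size) == 0
left_partner_addr = 0 + block_size      # 2048

# A block at offset 2048 is the right buddy: its partner sits at -block_size.
assert (2048 - base_address) % (2 * block_size) != 0
right_partner_addr = 2048 - block_size  # 0
```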
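The `_ping` loop above registers each P/D instance with the Proxy/Router every 3 seconds by sending a msgpack-encoded dict over a ZMQ DEALER socket whose identity is the instance's `zmq_address`. A minimal, hypothetical listener that such a loop could talk to might look like the sketch below; the actual Proxy/Router demo is a separate component, and the bind address and variable names here are illustrative only.

```python
# Hypothetical proxy-side listener for the heartbeat messages sent by _ping().
# Not the actual Proxy/Router demo; shown only to illustrate the wire format.
import msgpack
import zmq

context = zmq.Context()
router = context.socket(zmq.ROUTER)
router.bind("tcp://0.0.0.0:30001")  # illustrative proxy address

# http_addr -> zmq_addr, split by instance role ("P" or "D").
prefill_instances: dict[str, str] = {}
decode_instances: dict[str, str] = {}

while True:
    # DEALER sockets with an explicit IDENTITY arrive as [identity, payload].
    identity, payload = router.recv_multipart()
    data = msgpack.loads(payload)
    table = prefill_instances if data["type"] == "P" else decode_instances
    table[data["http_address"]] = data["zmq_address"]
```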