Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 0 additions & 83 deletions lib/bindings/python/tests/test_kv_bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@


import asyncio
import ctypes
import os
import subprocess
from ctypes import c_char_p, c_int64, c_uint32
from time import sleep
from typing import List

Expand Down Expand Up @@ -211,86 +208,6 @@ def remove_event(self):
self.event_id_counter += 1


# [TODO] to be deprecated
# KV events
class DynamoResult:
OK = 0
ERR = 1


class CtypesEventPublisher:
def __init__(
self, namespace: str, component: str, worker_id: int, kv_block_size: int
):
self.event_id_counter = 0
self.block_ids: List[int] = []

# load event publisher library
self.lib = ctypes.CDLL(os.environ["VLLM_KV_CAPI_PATH"])
self.lib.dynamo_llm_init.argtypes = [c_char_p, c_char_p, c_int64, c_uint32]
self.lib.dynamo_llm_init.restype = c_uint32
result = self.lib.dynamo_llm_init(
namespace.encode(), component.encode(), worker_id, kv_block_size
)
assert result == DynamoResult.OK

self.lib.dynamo_kv_event_publish_stored.argtypes = [
ctypes.c_uint64, # event_id
ctypes.POINTER(ctypes.c_uint32), # token_ids
ctypes.POINTER(ctypes.c_size_t), # num_block_tokens
ctypes.POINTER(ctypes.c_uint64), # block_ids
ctypes.c_size_t, # num_blocks
ctypes.POINTER(ctypes.c_uint64), # parent_hash
ctypes.c_uint64, # lora_id
]
self.lib.dynamo_kv_event_publish_stored.restype = (
ctypes.c_uint32
) # dynamo_llm_result_t

self.lib.dynamo_kv_event_publish_removed.argtypes = [
ctypes.c_uint64, # event_id
ctypes.POINTER(ctypes.c_uint64), # block_ids
ctypes.c_size_t, # num_blocks
]
self.lib.dynamo_kv_event_publish_removed.restype = (
ctypes.c_uint32
) # dynamo_llm_result_t

def store_event(self, tokens, lora_id):
parent_hash = (
(ctypes.c_uint64 * 1)(self.event_id_counter)
if self.event_id_counter > 0
else None
)
result = self.lib.dynamo_kv_event_publish_stored(
self.event_id_counter, # uint64_t event_id
(ctypes.c_uint32 * len(tokens))(*tokens), # const uint32_t *token_ids
(ctypes.c_size_t * 1)(len(tokens)), # const uintptr_t *num_block_tokens
(ctypes.c_uint64 * 1)(self.event_id_counter), # const uint64_t *block_ids
1, # uintptr_t num_blocks
parent_hash, # const uint64_t *parent_hash
lora_id, # uint64_t lora_id
)
self.block_ids.append(self.event_id_counter)
self.event_id_counter += 1

assert result == DynamoResult.OK

def remove_event(self):
result = self.lib.dynamo_kv_event_publish_removed(
self.event_id_counter, # uint64_t event_id
(ctypes.c_uint64 * 1)(self.block_ids[-1]), # const uint64_t *block_ids
1, # uintptr_t num_blocks
)
self.event_id_counter += 1

assert result == DynamoResult.OK

def shutdown(self):
result = self.lib.dynamo_llm_shutdown()
assert result == DynamoResult.OK


async def test_metrics_aggregator(distributed_runtime):
namespace = "kv_test"
component = "metrics"
Expand Down
Loading