@@ -839,6 +839,75 @@ def make_multi_stats(nixl_count: int, foo_count: int) -> MultiKVConnectorStats:
839839 assert kv_connector_stats ["FooConnector" ].data ["num_foo_transfers" ] == 6
840840
841841
842+ @patch (
843+ "vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.NixlWrapper" ,
844+ FakeNixlWrapper ,
845+ )
846+ def test_scheduler_kv_connector_stats_aggregation ():
847+ """Test scheduler and worker KV connector stats aggregation."""
848+ from vllm .v1 .core .sched .output import SchedulerOutput
849+
850+ scheduler = create_scheduler (create_vllm_config ())
851+
852+ # Worker stats with transfer metrics
853+ worker_stats = NixlKVConnectorStats ()
854+ worker_stats .record_transfer (get_default_xfer_telemetry ())
855+ worker_stats .data ["remote_tokens" ] = []
856+
857+ # Scheduler stats with custom metric (needs dummy transfer to avoid being skipped)
858+ scheduler_stats = NixlKVConnectorStats ()
859+ scheduler_stats .data .update (
860+ { # dummy transfer just for testing, to bypass is_empty() check
861+ "transfer_duration" : [0 ],
862+ "post_duration" : [0 ],
863+ "bytes_transferred" : [0 ],
864+ "num_descriptors" : [0 ],
865+ "remote_tokens" : [128 ],
866+ }
867+ )
868+
869+ # Mock the scheduler connector's stats method
870+ scheduler .connector .get_kv_connector_stats = lambda : MultiKVConnectorStats (
871+ data = {"NixlConnector" : scheduler_stats }
872+ )
873+
874+ model_output = ModelRunnerOutput (
875+ req_ids = ["req_0" ],
876+ req_id_to_index = {"req_0" : 0 },
877+ sampled_token_ids = [[123 ]],
878+ logprobs = None ,
879+ prompt_logprobs_dict = {},
880+ pooler_output = [None ],
881+ kv_connector_output = KVConnectorOutput (
882+ kv_connector_stats = MultiKVConnectorStats (
883+ data = {"NixlConnector" : worker_stats }
884+ )
885+ ),
886+ )
887+ scheduler_output = SchedulerOutput (
888+ scheduled_new_reqs = [],
889+ scheduled_cached_reqs = None ,
890+ num_scheduled_tokens = {"req_0" : 1 },
891+ total_num_scheduled_tokens = 1 ,
892+ scheduled_spec_decode_tokens = {},
893+ scheduled_encoder_inputs = {},
894+ num_common_prefix_blocks = [0 ],
895+ finished_req_ids = set (),
896+ free_encoder_mm_hashes = set (),
897+ structured_output_request_ids = {},
898+ grammar_bitmask = None ,
899+ )
900+
901+ engine_core_outputs = scheduler .update_from_output (scheduler_output , model_output )
902+
903+ final_stats = next (
904+ iter (engine_core_outputs .values ())
905+ ).scheduler_stats .kv_connector_stats
906+ nixl_stats = final_stats ["NixlConnector" ]
907+ assert nixl_stats .num_successful_transfers == 2
908+ assert nixl_stats .data ["remote_tokens" ] == [128 ]
909+
910+
842911@pytest .mark .parametrize ("distributed_executor_backend" , ["ray" , None ])
843912@patch (
844913 "vllm.distributed.kv_transfer.kv_connector.v1.nixl_connector.NixlWrapper" ,
0 commit comments