File tree Expand file tree Collapse file tree 1 file changed +12
-0
lines changed
vllm/distributed/kv_transfer/kv_connector/v1 Expand file tree Collapse file tree 1 file changed +12
-0
lines changed Original file line number Diff line number Diff line change @@ -1025,6 +1025,11 @@ def get_finished(self) -> tuple[set[str], set[str]]:
10251025 # Sorted dict, oldest requests are put first so we can exit early.
10261026 if now < expires :
10271027 break
1028+ count = self .consumer_notification_counts_by_req .pop (req_id , 0 )
1029+ logger .warning (
1030+ "Releasing expired KV blocks for request %s which were "
1031+ "retrieved by %d decode worker(s) within %d seconds." , req_id ,
1032+ count , envs .VLLM_NIXL_ABORT_REQUEST_TIMEOUT )
10281033 del self ._reqs_to_send [req_id ]
10291034 done_sending .add (req_id )
10301035
@@ -1040,6 +1045,13 @@ def _get_new_notifs(self) -> set[str]:
10401045 for notifs in self .nixl_wrapper .get_new_notifs ().values ():
10411046 for notif in notifs :
10421047 req_id , tp_ratio = notif .decode ("utf-8" ).rsplit (":" , 1 )
1048+ if req_id not in self ._reqs_to_send :
1049+ logger .error (
1050+ "Potentially invalid KV blocks for "
1051+ "unrecognized request %s were retrieved by "
1052+ "a decode worker. They may have expired." , req_id )
1053+ continue
1054+
10431055 self .consumer_notification_counts_by_req [req_id ] += 1
10441056 # Wait all consumers (D) to be done reading before freeing.
10451057 if self .consumer_notification_counts_by_req [req_id ] == int (
You can’t perform that action at this time.
0 commit comments