11use super :: * ;
22use anyhow;
33
4-
54#[ derive( Debug , Clone , Serialize , Deserialize ) ]
65pub enum Action {
76 GetNumNewMatchedTokens ( GetNumNewMatchedTokensInput , GetNumNewMatchedTokensOutput ) ,
@@ -33,8 +32,7 @@ pub struct UpdateStateAfterAllocInput {
3332}
3433
3534#[ derive( Debug , Clone , Serialize , Deserialize ) ]
36- pub struct UpdateStateAfterAllocOutput {
37- }
35+ pub struct UpdateStateAfterAllocOutput { }
3836
3937#[ derive( Debug , Clone , Serialize , Deserialize ) ]
4038pub struct BuildConnectorMetaInput {
@@ -43,7 +41,7 @@ pub struct BuildConnectorMetaInput {
4341
4442#[ derive( Debug , Clone , Serialize , Deserialize ) ]
4543pub struct BuildConnectorMetaOutput {
46- metadata : Vec < u8 > ,
44+ metadata : ConnectorMetadata ,
4745}
4846
4947#[ derive( Debug , Clone , Serialize , Deserialize ) ]
@@ -74,8 +72,7 @@ pub struct CreateSlotInput {
7472}
7573
7674#[ derive( Debug , Clone , Serialize , Deserialize ) ]
77- pub struct CreateSlotOutput {
78- }
75+ pub struct CreateSlotOutput { }
7976
8077#[ derive( Debug ) ]
8178pub struct KvConnectorLeaderRecorder {
@@ -109,9 +106,11 @@ impl KvConnectorLeaderRecorder {
109106 let output_path = "/tmp/records.jsonl" ;
110107 tracing:: info!( "recording events to {}" , output_path) ;
111108
112- let recorder = drt. runtime ( ) . primary ( ) . block_on ( async {
113- Recorder :: new ( token, & output_path, None , None , None ) . await
114- } ) . unwrap ( ) ;
109+ let recorder = drt
110+ . runtime ( )
111+ . primary ( )
112+ . block_on ( async { Recorder :: new ( token, & output_path, None , None , None ) . await } )
113+ . unwrap ( ) ;
115114
116115 let connector_leader = KvConnectorLeader {
117116 slot_manager : ConnectorSlotManager :: new ( block_manager. clone ( ) , leader, drt. clone ( ) ) ,
@@ -124,7 +123,10 @@ impl KvConnectorLeaderRecorder {
124123 let ( unbounded_tx, unbounded_rx) = mpsc:: unbounded_channel ( ) ;
125124 let recorder_tx = recorder. event_sender ( ) ;
126125
127- let _ = drt. runtime ( ) . primary ( ) . spawn ( Self :: forward_unbounded_to_sender ( unbounded_rx, recorder_tx) ) ;
126+ // todo(kvbm): make this a critical task
127+ drt. runtime ( )
128+ . primary ( )
129+ . spawn ( Self :: forward_unbounded_to_sender ( unbounded_rx, recorder_tx) ) ;
128130
129131 Self {
130132 _recorder : recorder,
@@ -160,16 +162,22 @@ impl Leader for KvConnectorLeaderRecorder {
160162 ) -> anyhow:: Result < ( usize , bool ) > {
161163 let input_copy = GetNumNewMatchedTokensInput {
162164 request_id : request_id. clone ( ) ,
163- request_num_tokens : request_num_tokens . clone ( ) ,
164- num_computed_tokens : num_computed_tokens . clone ( ) ,
165+ request_num_tokens,
166+ num_computed_tokens,
165167 } ;
166- let output = self . connector_leader . get_num_new_matched_tokens ( request_id, request_num_tokens, num_computed_tokens) ;
167- let output_copy = output. as_ref ( ) . unwrap ( ) . clone ( ) ;
168- let _ = self . unbounded_tx . send ( Action :: GetNumNewMatchedTokens ( input_copy, GetNumNewMatchedTokensOutput {
169- num_new_matched_tokens : output_copy. 0 ,
170- has_matched : output_copy. 1 ,
171- } ) ) ;
172- output
168+ let output = self . connector_leader . get_num_new_matched_tokens (
169+ request_id,
170+ request_num_tokens,
171+ num_computed_tokens,
172+ ) ?;
173+ let _ = self . unbounded_tx . send ( Action :: GetNumNewMatchedTokens (
174+ input_copy,
175+ GetNumNewMatchedTokensOutput {
176+ num_new_matched_tokens : output. 0 ,
177+ has_matched : output. 1 ,
178+ } ,
179+ ) ) ;
180+ Ok ( output)
173181 }
174182
175183 /// We drop the need to pass in the KvCacheBlocks and the num_external_tokens as they are captured
@@ -186,10 +194,17 @@ impl Leader for KvConnectorLeaderRecorder {
186194 let input_copy = UpdateStateAfterAllocInput {
187195 request_id : request_id. clone ( ) ,
188196 block_ids : block_ids. clone ( ) ,
189- num_external_tokens : num_external_tokens . clone ( ) ,
197+ num_external_tokens,
190198 } ;
191- let _ = self . connector_leader . update_state_after_alloc ( request_id, block_ids, num_external_tokens) . unwrap ( ) ;
192- let _ = self . unbounded_tx . send ( Action :: UpdateStateAfterAlloc ( input_copy, UpdateStateAfterAllocOutput { } ) ) ;
199+ self . connector_leader . update_state_after_alloc (
200+ request_id,
201+ block_ids,
202+ num_external_tokens,
203+ ) ?;
204+ let _ = self . unbounded_tx . send ( Action :: UpdateStateAfterAlloc (
205+ input_copy,
206+ UpdateStateAfterAllocOutput { } ,
207+ ) ) ;
193208 Ok ( ( ) )
194209 }
195210
@@ -200,39 +215,48 @@ impl Leader for KvConnectorLeaderRecorder {
200215 let input_copy = BuildConnectorMetaInput {
201216 scheduler_output : scheduler_output. clone ( ) ,
202217 } ;
203- let output = self . connector_leader . build_connector_metadata ( scheduler_output) ;
204- let output_copy = output. as_ref ( ) . unwrap ( ) . clone ( ) ;
205- let _ = self . unbounded_tx
206- . send ( Action :: BuildConnectorMeta ( input_copy, BuildConnectorMetaOutput {
207- metadata : output_copy,
208- } ) ) ;
209- output
218+ let output = self
219+ . connector_leader
220+ . build_connector_metadata ( scheduler_output) ?;
221+ let _ = self . unbounded_tx . send ( Action :: BuildConnectorMeta (
222+ input_copy,
223+ BuildConnectorMetaOutput {
224+ metadata : serde_json:: from_slice ( & output) ?,
225+ } ,
226+ ) ) ;
227+ Ok ( output)
210228 }
211229
212- fn request_finished ( & mut self , request_id : String , block_ids : Vec < BlockId > ) -> anyhow:: Result < bool > {
230+ fn request_finished (
231+ & mut self ,
232+ request_id : String ,
233+ block_ids : Vec < BlockId > ,
234+ ) -> anyhow:: Result < bool > {
213235 let input_copy = RequestFinishedInput {
214236 request_id : request_id. clone ( ) ,
215237 block_ids : block_ids. clone ( ) ,
216238 } ;
217- let output = self . connector_leader . request_finished ( request_id, block_ids) ;
218- let output_copy = output. as_ref ( ) . unwrap ( ) . clone ( ) ;
219- let _ = self . unbounded_tx
220- . send ( Action :: RequestFinished ( input_copy, RequestFinishedOutput {
221- is_finished : output_copy,
222- } ) ) ;
223- output
239+ let output = self
240+ . connector_leader
241+ . request_finished ( request_id, block_ids) ?;
242+ let _ = self . unbounded_tx . send ( Action :: RequestFinished (
243+ input_copy,
244+ RequestFinishedOutput {
245+ is_finished : output,
246+ } ,
247+ ) ) ;
248+ Ok ( output)
224249 }
225250
226251 fn has_slot ( & self , request_id : String ) -> bool {
227252 let input_copy = HasSlotInput {
228253 request_id : request_id. clone ( ) ,
229254 } ;
230255 let output = self . connector_leader . has_slot ( request_id) ;
231- let output_copy = output. clone ( ) ;
232- let _ = self . unbounded_tx
233- . send ( Action :: HasSlot ( input_copy, HasSlotOutput {
234- result : output_copy,
235- } ) ) ;
256+ let _ = self . unbounded_tx . send ( Action :: HasSlot (
257+ input_copy,
258+ HasSlotOutput { result : output } ,
259+ ) ) ;
236260 output
237261 }
238262
@@ -244,7 +268,9 @@ impl Leader for KvConnectorLeaderRecorder {
244268 tokens : tokens. clone ( ) ,
245269 } ;
246270 let _ = self . connector_leader . create_slot ( request, tokens) ;
247- let _ = self . unbounded_tx . send ( Action :: CreateSlot ( input_copy, CreateSlotOutput { } ) ) ;
271+ let _ = self
272+ . unbounded_tx
273+ . send ( Action :: CreateSlot ( input_copy, CreateSlotOutput { } ) ) ;
248274 Ok ( ( ) )
249275 }
250- }
276+ }
0 commit comments