@@ -22,6 +22,7 @@ use codex_protocol::models::ReasoningItemContent;
2222use codex_protocol:: models:: ResponseItem ;
2323use codex_protocol:: protocol:: SessionSource ;
2424use codex_protocol:: protocol:: SubAgentSource ;
25+ use codex_protocol:: protocol:: TokenUsage ;
2526use eventsource_stream:: Eventsource ;
2627use futures:: Stream ;
2728use futures:: StreamExt ;
@@ -490,6 +491,42 @@ async fn append_reasoning_text(
490491 . await ;
491492 }
492493}
494+
495+ fn parse_openai_usage ( usage : & serde_json:: Value ) -> Option < TokenUsage > {
496+ let prompt_tokens = usage
497+ . get ( "prompt_tokens" )
498+ . and_then ( |v| v. as_i64 ( ) )
499+ . unwrap_or ( 0 ) ;
500+ let completion_tokens = usage
501+ . get ( "completion_tokens" )
502+ . and_then ( |v| v. as_i64 ( ) )
503+ . unwrap_or ( 0 ) ;
504+ let total_tokens = usage
505+ . get ( "total_tokens" )
506+ . and_then ( |v| v. as_i64 ( ) )
507+ . unwrap_or ( 0 ) ;
508+
509+ let cached_input_tokens = usage
510+ . get ( "prompt_tokens_details" )
511+ . and_then ( |d| d. get ( "cached_tokens" ) )
512+ . and_then ( |v| v. as_i64 ( ) )
513+ . unwrap_or ( 0 ) ;
514+
515+ let reasoning_output_tokens = usage
516+ . get ( "completion_tokens_details" )
517+ . and_then ( |d| d. get ( "reasoning_tokens" ) )
518+ . and_then ( |v| v. as_i64 ( ) )
519+ . unwrap_or ( 0 ) ;
520+
521+ Some ( TokenUsage {
522+ input_tokens : prompt_tokens,
523+ cached_input_tokens,
524+ output_tokens : completion_tokens,
525+ reasoning_output_tokens,
526+ total_tokens,
527+ } )
528+ }
529+
493530/// Lightweight SSE processor for the Chat Completions streaming format. The
494531/// output is mapped onto Codex's internal [`ResponseEvent`] so that the rest
495532/// of the pipeline can stay agnostic of the underlying wire format.
@@ -519,6 +556,8 @@ async fn process_chat_sse<S>(
519556 let mut fn_call_state = FunctionCallState :: default ( ) ;
520557 let mut assistant_item: Option < ResponseItem > = None ;
521558 let mut reasoning_item: Option < ResponseItem > = None ;
559+ let mut token_usage: Option < TokenUsage > = None ;
560+ let mut response_id = String :: new ( ) ;
522561
523562 loop {
524563 let start = std:: time:: Instant :: now ( ) ;
@@ -538,8 +577,8 @@ async fn process_chat_sse<S>(
538577 // Stream closed gracefully – emit Completed with dummy id.
539578 let _ = tx_event
540579 . send ( Ok ( ResponseEvent :: Completed {
541- response_id : String :: new ( ) ,
542- token_usage : None ,
580+ response_id : response_id . clone ( ) ,
581+ token_usage : token_usage . clone ( ) ,
543582 } ) )
544583 . await ;
545584 return ;
@@ -569,8 +608,8 @@ async fn process_chat_sse<S>(
569608
570609 let _ = tx_event
571610 . send ( Ok ( ResponseEvent :: Completed {
572- response_id : String :: new ( ) ,
573- token_usage : None ,
611+ response_id : response_id . clone ( ) ,
612+ token_usage : token_usage . clone ( ) ,
574613 } ) )
575614 . await ;
576615 return ;
@@ -583,6 +622,16 @@ async fn process_chat_sse<S>(
583622 } ;
584623 trace ! ( "chat_completions received SSE chunk: {chunk:?}" ) ;
585624
625+ if let Some ( id) = chunk. get ( "id" ) . and_then ( |s| s. as_str ( ) ) {
626+ if response_id. is_empty ( ) {
627+ response_id = id. to_string ( ) ;
628+ }
629+ }
630+
631+ if let Some ( usage) = chunk. get ( "usage" ) {
632+ token_usage = parse_openai_usage ( usage) ;
633+ }
634+
586635 let choice_opt = chunk. get ( "choices" ) . and_then ( |c| c. get ( 0 ) ) ;
587636
588637 if let Some ( choice) = choice_opt {
@@ -713,8 +762,8 @@ async fn process_chat_sse<S>(
713762 // Emit Completed regardless of reason so the agent can advance.
714763 let _ = tx_event
715764 . send ( Ok ( ResponseEvent :: Completed {
716- response_id : String :: new ( ) ,
717- token_usage : None ,
765+ response_id : response_id . clone ( ) ,
766+ token_usage : token_usage . clone ( ) ,
718767 } ) )
719768 . await ;
720769
0 commit comments