@@ -28,8 +28,7 @@ use crate::models::ResponseItem;
2828use crate :: openai_tools:: create_tools_json;
2929use crate :: util:: backoff;
3030
31- /// Implementation for the classic Chat Completions API. This is intentionally
32- /// minimal: we only stream back plain assistant text.
31+ /// Implementation for the classic Chat Completions API.
3332pub ( crate ) async fn stream_chat_completions (
3433 prompt : & Prompt ,
3534 model : & str ,
@@ -43,17 +42,67 @@ pub(crate) async fn stream_chat_completions(
4342 messages. push ( json ! ( { "role" : "system" , "content" : full_instructions} ) ) ;
4443
4544 for item in & prompt. input {
46- if let ResponseItem :: Message { role, content } = item {
47- let mut text = String :: new ( ) ;
48- for c in content {
49- match c {
50- ContentItem :: InputText { text : t } | ContentItem :: OutputText { text : t } => {
51- text. push_str ( t) ;
45+ match item {
46+ ResponseItem :: Message { role, content } => {
47+ let mut text = String :: new ( ) ;
48+ for c in content {
49+ match c {
50+ ContentItem :: InputText { text : t }
51+ | ContentItem :: OutputText { text : t } => {
52+ text. push_str ( t) ;
53+ }
54+ _ => { }
5255 }
53- _ => { }
5456 }
57+ messages. push ( json ! ( { "role" : role, "content" : text} ) ) ;
58+ }
59+ ResponseItem :: FunctionCall {
60+ name,
61+ arguments,
62+ call_id,
63+ } => {
64+ messages. push ( json ! ( {
65+ "role" : "assistant" ,
66+ "content" : null,
67+ "tool_calls" : [ {
68+ "id" : call_id,
69+ "type" : "function" ,
70+ "function" : {
71+ "name" : name,
72+ "arguments" : arguments,
73+ }
74+ } ]
75+ } ) ) ;
76+ }
77+ ResponseItem :: LocalShellCall {
78+ id,
79+ call_id : _,
80+ status,
81+ action,
82+ } => {
83+ // Confirm with API team.
84+ messages. push ( json ! ( {
85+ "role" : "assistant" ,
86+ "content" : null,
87+ "tool_calls" : [ {
88+ "id" : id. clone( ) . unwrap_or_else( || "" . to_string( ) ) ,
89+ "type" : "local_shell_call" ,
90+ "status" : status,
91+ "action" : action,
92+ } ]
93+ } ) ) ;
94+ }
95+ ResponseItem :: FunctionCallOutput { call_id, output } => {
96+ messages. push ( json ! ( {
97+ "role" : "tool" ,
98+ "tool_call_id" : call_id,
99+ "content" : output. content,
100+ } ) ) ;
101+ }
102+ ResponseItem :: Reasoning { .. } | ResponseItem :: Other => {
103+ // Omit these items from the conversation history.
104+ continue ;
55105 }
56- messages. push ( json ! ( { "role" : role, "content" : text} ) ) ;
57106 }
58107 }
59108
@@ -165,6 +214,21 @@ where
165214
166215 let idle_timeout = * OPENAI_STREAM_IDLE_TIMEOUT_MS ;
167216
217+ // State to accumulate a function call across streaming chunks.
218+ // OpenAI may split the `arguments` string over multiple `delta` events
219+ // until the chunk whose `finish_reason` is `tool_calls` is emitted. We
220+ // keep collecting the pieces here and forward a single
221+ // `ResponseItem::FunctionCall` once the call is complete.
222+ #[ derive( Default ) ]
223+ struct FunctionCallState {
224+ name : Option < String > ,
225+ arguments : String ,
226+ call_id : Option < String > ,
227+ active : bool ,
228+ }
229+
230+ let mut fn_call_state = FunctionCallState :: default ( ) ;
231+
168232 loop {
169233 let sse = match timeout ( idle_timeout, stream. next ( ) ) . await {
170234 Ok ( Some ( Ok ( ev) ) ) => ev,
@@ -204,23 +268,89 @@ where
204268 Ok ( v) => v,
205269 Err ( _) => continue ,
206270 } ;
271+ trace ! ( "chat_completions received SSE chunk: {chunk:?}" ) ;
272+
273+ let choice_opt = chunk. get ( "choices" ) . and_then ( |c| c. get ( 0 ) ) ;
274+
275+ if let Some ( choice) = choice_opt {
276+ // Handle assistant content tokens.
277+ if let Some ( content) = choice
278+ . get ( "delta" )
279+ . and_then ( |d| d. get ( "content" ) )
280+ . and_then ( |c| c. as_str ( ) )
281+ {
282+ let item = ResponseItem :: Message {
283+ role : "assistant" . to_string ( ) ,
284+ content : vec ! [ ContentItem :: OutputText {
285+ text: content. to_string( ) ,
286+ } ] ,
287+ } ;
288+
289+ let _ = tx_event. send ( Ok ( ResponseEvent :: OutputItemDone ( item) ) ) . await ;
290+ }
291+
292+ // Handle streaming function / tool calls.
293+ if let Some ( tool_calls) = choice
294+ . get ( "delta" )
295+ . and_then ( |d| d. get ( "tool_calls" ) )
296+ . and_then ( |tc| tc. as_array ( ) )
297+ {
298+ if let Some ( tool_call) = tool_calls. first ( ) {
299+ // Mark that we have an active function call in progress.
300+ fn_call_state. active = true ;
301+
302+ // Extract call_id if present.
303+ if let Some ( id) = tool_call. get ( "id" ) . and_then ( |v| v. as_str ( ) ) {
304+ fn_call_state. call_id . get_or_insert_with ( || id. to_string ( ) ) ;
305+ }
306+
307+ // Extract function details if present.
308+ if let Some ( function) = tool_call. get ( "function" ) {
309+ if let Some ( name) = function. get ( "name" ) . and_then ( |n| n. as_str ( ) ) {
310+ fn_call_state. name . get_or_insert_with ( || name. to_string ( ) ) ;
311+ }
312+
313+ if let Some ( args_fragment) =
314+ function. get ( "arguments" ) . and_then ( |a| a. as_str ( ) )
315+ {
316+ fn_call_state. arguments . push_str ( args_fragment) ;
317+ }
318+ }
319+ }
320+ }
321+
322+ // Emit end-of-turn when finish_reason signals completion.
323+ if let Some ( finish_reason) = choice. get ( "finish_reason" ) . and_then ( |v| v. as_str ( ) ) {
324+ match finish_reason {
325+ "tool_calls" if fn_call_state. active => {
326+ // Build the FunctionCall response item.
327+ let item = ResponseItem :: FunctionCall {
328+ name : fn_call_state. name . clone ( ) . unwrap_or_else ( || "" . to_string ( ) ) ,
329+ arguments : fn_call_state. arguments . clone ( ) ,
330+ call_id : fn_call_state. call_id . clone ( ) . unwrap_or_else ( String :: new) ,
331+ } ;
332+
333+ // Emit it downstream.
334+ let _ = tx_event. send ( Ok ( ResponseEvent :: OutputItemDone ( item) ) ) . await ;
335+ }
336+ "stop" => {
337+ // Regular turn without tool-call.
338+ }
339+ _ => { }
340+ }
207341
208- let content_opt = chunk
209- . get ( "choices" )
210- . and_then ( |c| c. get ( 0 ) )
211- . and_then ( |c| c. get ( "delta" ) )
212- . and_then ( |d| d. get ( "content" ) )
213- . and_then ( |c| c. as_str ( ) ) ;
214-
215- if let Some ( content) = content_opt {
216- let item = ResponseItem :: Message {
217- role : "assistant" . to_string ( ) ,
218- content : vec ! [ ContentItem :: OutputText {
219- text: content. to_string( ) ,
220- } ] ,
221- } ;
222-
223- let _ = tx_event. send ( Ok ( ResponseEvent :: OutputItemDone ( item) ) ) . await ;
342+ // Emit Completed regardless of reason so the agent can advance.
343+ let _ = tx_event
344+ . send ( Ok ( ResponseEvent :: Completed {
345+ response_id : String :: new ( ) ,
346+ } ) )
347+ . await ;
348+
349+ // Prepare for potential next turn (should not happen in same stream).
350+ // fn_call_state = FunctionCallState::default();
351+
352+ return ; // End processing for this SSE stream.
353+ }
224354 }
225355 }
226356}
@@ -267,20 +397,28 @@ where
267397 Poll :: Ready ( None ) => return Poll :: Ready ( None ) ,
268398 Poll :: Ready ( Some ( Err ( e) ) ) => return Poll :: Ready ( Some ( Err ( e) ) ) ,
269399 Poll :: Ready ( Some ( Ok ( ResponseEvent :: OutputItemDone ( item) ) ) ) => {
270- // Accumulate *assistant* text but do not emit yet.
271- if let crate :: models:: ResponseItem :: Message { role, content } = & item {
272- if role == "assistant" {
400+ // If this is an incremental assistant message chunk, accumulate but
401+ // do NOT emit yet. Forward any other item (e.g. FunctionCall) right
402+ // away so downstream consumers see it.
403+
404+ let is_assistant_delta = matches ! ( & item, crate :: models:: ResponseItem :: Message { role, .. } if role == "assistant" ) ;
405+
406+ if is_assistant_delta {
407+ if let crate :: models:: ResponseItem :: Message { content, .. } = & item {
273408 if let Some ( text) = content. iter ( ) . find_map ( |c| match c {
274409 crate :: models:: ContentItem :: OutputText { text } => Some ( text) ,
275410 _ => None ,
276411 } ) {
277412 this. cumulative . push_str ( text) ;
278413 }
279414 }
415+
416+ // Swallow partial assistant chunk; keep polling.
417+ continue ;
280418 }
281419
282- // Swallow partial event; keep polling .
283- continue ;
420+ // Not an assistant message – forward immediately .
421+ return Poll :: Ready ( Some ( Ok ( ResponseEvent :: OutputItemDone ( item ) ) ) ) ;
284422 }
285423 Poll :: Ready ( Some ( Ok ( ResponseEvent :: Completed { response_id } ) ) ) => {
286424 if !this. cumulative . is_empty ( ) {
0 commit comments