5959 ToolCallItem ,
6060 ToolCallItemTypes ,
6161 TResponseInputItem ,
62+ normalize_function_call_output_payload ,
6263)
6364from .lifecycle import AgentHooksBase , RunHooks , RunHooksBase
6465from .logger import logger
@@ -742,10 +743,15 @@ async def run(
742743 # Resuming from a saved state
743744 run_state = cast (RunState [TContext ], input )
744745 original_user_input = run_state ._original_input
745- # Normalize items to remove top-level providerData (API doesn't accept it there)
746+ # Normalize items to remove top-level providerData and convert protocol to API format
747+ # Then filter incomplete function calls to ensure API compatibility
746748 if isinstance (original_user_input , list ):
747- prepared_input : str | list [TResponseInputItem ] = AgentRunner ._normalize_input_items (
748- original_user_input
749+ # Normalize first (converts protocol format to API format, normalizes field names)
750+ normalized = AgentRunner ._normalize_input_items (original_user_input )
751+ # Filter incomplete function calls after normalization
752+ # This ensures consistent field names (call_id vs callId) for matching
753+ prepared_input : str | list [TResponseInputItem ] = (
754+ AgentRunner ._filter_incomplete_function_calls (normalized )
749755 )
750756 else :
751757 prepared_input = original_user_input
@@ -787,12 +793,16 @@ async def run(
787793 if is_resumed_state and run_state is not None :
788794 # Restore state from RunState
789795 current_turn = run_state ._current_turn
790- # Normalize original_input to remove top-level providerData
791- # ( API doesn't accept it there)
796+ # Normalize original_input: remove top-level providerData,
797+ # convert protocol to API format, then filter incomplete function calls
792798 raw_original_input = run_state ._original_input
793799 if isinstance (raw_original_input , list ):
800+ # Normalize first (converts protocol to API format, normalizes field names)
801+ normalized = AgentRunner ._normalize_input_items (raw_original_input )
802+ # Filter incomplete function calls after normalization
803+ # This ensures consistent field names (call_id vs callId) for matching
794804 original_input : str | list [TResponseInputItem ] = (
795- AgentRunner ._normalize_input_items ( raw_original_input )
805+ AgentRunner ._filter_incomplete_function_calls ( normalized )
796806 )
797807 else :
798808 original_input = raw_original_input
@@ -861,8 +871,40 @@ async def run(
861871 )
862872 in output_call_ids
863873 ]
864- # Save both function_call and function_call_output together
865- items_to_save = tool_call_items + tool_output_items
874+ # Check which items are already in the session to avoid duplicates
875+ # Get existing items from session and extract their call_ids
876+ existing_items = await session .get_items ()
877+ existing_call_ids : set [str ] = set ()
878+ for existing_item in existing_items :
879+ if isinstance (existing_item , dict ):
880+ item_type = existing_item .get ("type" )
881+ if item_type in ("function_call" , "function_call_output" ):
882+ existing_call_id = existing_item .get (
883+ "call_id"
884+ ) or existing_item .get ("callId" )
885+ if existing_call_id and isinstance (existing_call_id , str ):
886+ existing_call_ids .add (existing_call_id )
887+
888+ # Filter out items that are already in the session
889+ items_to_save : list [RunItem ] = []
890+ for item in tool_call_items + tool_output_items :
891+ item_call_id : str | None = None
892+ if isinstance (item .raw_item , dict ):
893+ raw_call_id = item .raw_item .get ("call_id" ) or item .raw_item .get (
894+ "callId"
895+ )
896+ item_call_id = (
897+ cast (str | None , raw_call_id ) if raw_call_id else None
898+ )
899+ elif hasattr (item .raw_item , "call_id" ):
900+ item_call_id = cast (
901+ str | None , getattr (item .raw_item , "call_id" , None )
902+ )
903+
904+ # Only save if not already in session
905+ if item_call_id is None or item_call_id not in existing_call_ids :
906+ items_to_save .append (item )
907+
866908 if items_to_save :
867909 await self ._save_result_to_session (session , [], items_to_save )
868910 # Clear the current step since we've handled it
@@ -1419,11 +1461,12 @@ async def _start_streaming(
14191461 # Resuming from state - normalize items to remove top-level providerData
14201462 # and filter incomplete function_call pairs
14211463 if isinstance (starting_input , list ):
1422- # Filter incomplete function_call pairs before normalizing
1423- filtered = AgentRunner ._filter_incomplete_function_calls (starting_input )
1424- prepared_input : str | list [TResponseInputItem ] = (
1425- AgentRunner ._normalize_input_items (filtered )
1426- )
1464+ # Normalize field names first (camelCase -> snake_case) to ensure
1465+ # consistent field names for filtering
1466+ normalized_input = AgentRunner ._normalize_input_items (starting_input )
1467+ # Filter incomplete function_call pairs after normalizing
1468+ filtered = AgentRunner ._filter_incomplete_function_calls (normalized_input )
1469+ prepared_input : str | list [TResponseInputItem ] = filtered
14271470 else :
14281471 prepared_input = starting_input
14291472 else :
@@ -2600,33 +2643,67 @@ def _normalize_input_items(items: list[TResponseInputItem]) -> list[TResponseInp
26002643 """
26012644 from .run_state import _normalize_field_names
26022645
2646+ def _coerce_to_dict (value : TResponseInputItem ) -> dict [str , Any ] | None :
2647+ if isinstance (value , dict ):
2648+ return dict (value )
2649+ if hasattr (value , "model_dump" ):
2650+ try :
2651+ return cast (dict [str , Any ], value .model_dump (exclude_unset = True ))
2652+ except Exception :
2653+ return None
2654+ return None
2655+
26032656 normalized : list [TResponseInputItem ] = []
26042657 for item in items :
2605- if isinstance (item , dict ):
2606- # Create a copy to avoid modifying the original
2607- normalized_item = dict (item )
2608- # Remove top-level providerData/provider_data - these should only be in content
2609- # The API doesn't accept providerData at the top level of input items
2610- normalized_item .pop ("providerData" , None )
2611- normalized_item .pop ("provider_data" , None )
2612- # Normalize item type: API expects 'function_call_output',
2613- # not 'function_call_result'
2614- item_type = normalized_item .get ("type" )
2615- if item_type == "function_call_result" :
2616- normalized_item ["type" ] = "function_call_output"
2617- item_type = "function_call_output"
2618- # Remove invalid fields based on item type
2619- # function_call_output items should not have 'name' field
2620- if item_type == "function_call_output" :
2621- normalized_item .pop ("name" , None )
2622- # Normalize field names (callId -> call_id, responseId -> response_id)
2623- normalized_item = _normalize_field_names (normalized_item )
2624- normalized .append (cast (TResponseInputItem , normalized_item ))
2625- else :
2626- # For non-dict items, keep as-is (they should already be in correct format)
2658+ coerced = _coerce_to_dict (item )
2659+ if coerced is None :
26272660 normalized .append (item )
2661+ continue
2662+
2663+ normalized_item = dict (coerced )
2664+ normalized_item .pop ("providerData" , None )
2665+ normalized_item .pop ("provider_data" , None )
2666+ item_type = normalized_item .get ("type" )
2667+ if item_type == "function_call_result" :
2668+ normalized_item ["type" ] = "function_call_output"
2669+ item_type = "function_call_output"
2670+ if item_type == "function_call_output" :
2671+ normalized_item .pop ("name" , None )
2672+ normalized_item .pop ("status" , None )
2673+ normalized_item = normalize_function_call_output_payload (normalized_item )
2674+ normalized_item = _normalize_field_names (normalized_item )
2675+ normalized .append (cast (TResponseInputItem , normalized_item ))
26282676 return normalized
26292677
2678+ @staticmethod
2679+ def _ensure_api_input_item (item : TResponseInputItem ) -> TResponseInputItem :
2680+ """Ensure item is in API format (function_call_output, snake_case fields)."""
2681+
2682+ def _coerce_dict (value : TResponseInputItem ) -> dict [str , Any ] | None :
2683+ if isinstance (value , dict ):
2684+ return dict (value )
2685+ if hasattr (value , "model_dump" ):
2686+ try :
2687+ return cast (dict [str , Any ], value .model_dump (exclude_unset = True ))
2688+ except Exception :
2689+ return None
2690+ return None
2691+
2692+ coerced = _coerce_dict (item )
2693+ if coerced is None :
2694+ return item
2695+
2696+ normalized = dict (coerced )
2697+ item_type = normalized .get ("type" )
2698+ if item_type == "function_call_result" :
2699+ normalized ["type" ] = "function_call_output"
2700+ normalized .pop ("name" , None )
2701+ normalized .pop ("status" , None )
2702+
2703+ if normalized .get ("type" ) == "function_call_output" :
2704+ normalized = normalize_function_call_output_payload (normalized )
2705+ return cast (TResponseInputItem , normalized )
2706+
26302707 @classmethod
26312708 async def _prepare_input_with_session (
26322709 cls ,
@@ -2651,13 +2728,19 @@ async def _prepare_input_with_session(
26512728 # Get previous conversation history
26522729 history = await session .get_items ()
26532730
2731+ # Convert protocol format items from session to API format.
2732+ # TypeScript may save protocol format (function_call_result) to sessions,
2733+ # but the API expects API format (function_call_output).
2734+ converted_history = [cls ._ensure_api_input_item (item ) for item in history ]
2735+
26542736 # Convert input to list format
26552737 new_input_list = ItemHelpers .input_to_new_input_list (input )
2738+ new_input_list = [cls ._ensure_api_input_item (item ) for item in new_input_list ]
26562739
26572740 if session_input_callback is None :
2658- merged = history + new_input_list
2741+ merged = converted_history + new_input_list
26592742 elif callable (session_input_callback ):
2660- res = session_input_callback (history , new_input_list )
2743+ res = session_input_callback (converted_history , new_input_list )
26612744 if inspect .isawaitable (res ):
26622745 merged = await res
26632746 else :
@@ -2711,10 +2794,19 @@ async def _save_result_to_session(
27112794 return
27122795
27132796 # Convert original input to list format if needed
2714- input_list = ItemHelpers .input_to_new_input_list (original_input )
2797+ input_list = [
2798+ cls ._ensure_api_input_item (item )
2799+ for item in ItemHelpers .input_to_new_input_list (original_input )
2800+ ]
2801+
2802+ # Filter out tool_approval_item items before converting to input format
2803+ # These items represent pending approvals and shouldn't be sent to the API
2804+ items_to_convert = [item for item in new_items if item .type != "tool_approval_item" ]
27152805
27162806 # Convert new items to input format
2717- new_items_as_input = [item .to_input_item () for item in new_items ]
2807+ new_items_as_input = [
2808+ cls ._ensure_api_input_item (item .to_input_item ()) for item in items_to_convert
2809+ ]
27182810
27192811 # Save all items from this turn
27202812 items_to_save = input_list + new_items_as_input
0 commit comments