Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion openhands-sdk/openhands/sdk/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,9 @@ def step(
pending_actions = ConversationState.get_unmatched_actions(state.events)
if pending_actions:
logger.info(
"Confirmation mode: Executing %d pending action(s)",
"Confirmation mode: Executing %d pending action(s): %s",
len(pending_actions),
[(a.id, a.tool_call_id, a.tool_name) for a in pending_actions],
)
self._execute_actions(conversation, pending_actions, on_event)
return
Expand Down Expand Up @@ -626,6 +627,11 @@ def _execute_action_event(
tool_name=tool.name,
tool_call_id=action_event.tool_call.id,
)
logger.debug(
f"Creating ObservationEvent: id={obs_event.id}, "
f"action_id={action_event.id}, tool_call_id={action_event.tool_call.id}, "
f"tool={tool.name}"
)
on_event(obs_event)

# Set conversation state
Expand Down
21 changes: 21 additions & 0 deletions openhands-sdk/openhands/sdk/context/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ def filter_unmatched_tool_calls(
but don't have matching pairs. Also enforces batch atomicity - if any
ActionEvent in a batch is filtered out, all ActionEvents in that batch
are also filtered out.

Additionally filters out duplicate ObservationEvents with the same
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've got a PR in progress (#1649) that's trying to untangle all the structural assumptions and loops in the view. It'd be helpful for that initiative if we weren't tacking more functionality onto the existing functions.

Can we break this functionality out into a separate function (something like filter_duplicate_events) that gets called first thing in View.from_events?

tool_call_id to prevent malformed message histories that would cause
LLM API errors (e.g., Claude requires exactly one tool_result per tool_use).
"""
action_tool_call_ids = View._get_action_tool_call_ids(events)
observation_tool_call_ids = View._get_observation_tool_call_ids(events)
Expand Down Expand Up @@ -368,13 +372,30 @@ def filter_unmatched_tool_calls(
)

# Filter out removed events
# Also track seen observation tool_call_ids to filter duplicates
seen_observation_tool_call_ids: set[ToolCallID] = set()
result = []
for event in events:
if event.id in removed_event_ids:
continue
if isinstance(event, ObservationBaseEvent):
if event.tool_call_id in tool_call_ids_to_remove:
continue
# Filter duplicate observations with the same tool_call_id
# This enables recovery from stuck conversations where duplicate
# observations were persisted in the event stream
if event.tool_call_id is not None:
if event.tool_call_id in seen_observation_tool_call_ids:
logger.warning(
f"DUPLICATE_OBSERVATION_FILTERED: "
f"tool_call_id={event.tool_call_id}, "
f"event_id={event.id}, "
f"event_type={type(event).__name__}. "
f"This may indicate a bug in event stream management "
f"during conversation resume."
)
continue
seen_observation_tool_call_ids.add(event.tool_call_id)
result.append(event)

return result
Expand Down
21 changes: 19 additions & 2 deletions openhands-sdk/openhands/sdk/conversation/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,18 +390,35 @@ def get_unmatched_actions(events: Sequence[Event]) -> list[ActionEvent]:
List of ActionEvent objects that don't have corresponding observations,
in chronological order
"""
observed_action_ids = set()
observed_action_ids: set[str] = set()
observed_tool_call_ids: set[str] = set()
unmatched_actions = []
# Search in reverse - recent events are more likely to be unmatched
for event in reversed(events):
if isinstance(event, (ObservationEvent, UserRejectObservation)):
observed_action_ids.add(event.action_id)
# Also track tool_call_id to prevent re-execution of actions
# that already have observations (fixes duplicate observation bug)
if event.tool_call_id is not None:
observed_tool_call_ids.add(event.tool_call_id)
elif isinstance(event, ActionEvent):
# Only executable actions (validated) are considered pending
if event.action is not None and event.id not in observed_action_ids:
# Check both action_id AND tool_call_id to ensure we don't
# re-execute actions that already have observations
is_observed = event.id in observed_action_ids or (
event.tool_call_id is not None
and event.tool_call_id in observed_tool_call_ids
)
if event.action is not None and not is_observed:
# Insert at beginning to maintain chronological order in result
unmatched_actions.insert(0, event)

if unmatched_actions:
logger.debug(
f"get_unmatched_actions found {len(unmatched_actions)} unmatched: "
f"{[(a.id, a.tool_call_id, a.tool_name) for a in unmatched_actions]}"
)

return unmatched_actions

# ===== FIFOLock delegation methods =====
Expand Down
183 changes: 183 additions & 0 deletions tests/sdk/context/test_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -969,3 +969,186 @@ def test_should_keep_event_other_event_types() -> None:
message_event, action_tool_call_ids, observation_tool_call_ids
)
assert result is True


def test_filter_unmatched_tool_calls_duplicate_observations() -> None:
"""Test that duplicate observations with the same tool_call_id are filtered.

This reproduces the issue from GitHub issue #1782 where duplicate
ObservationEvents with the same tool_call_id cause LLM API errors
(Claude requires exactly one tool_result per tool_use).

The defensive fix should keep only the first observation for each
tool_call_id and filter out subsequent duplicates.
"""
# Create ActionEvent
action_event = create_autospec(ActionEvent, instance=True)
action_event.tool_call_id = "call_1"
action_event.id = "action_1"
action_event.llm_response_id = "response_1"

# Create first observation (should be kept)
observation_event_1 = create_autospec(ObservationEvent, instance=True)
observation_event_1.tool_call_id = "call_1"
observation_event_1.id = "obs_1"

# Create duplicate observation with same tool_call_id (should be filtered)
observation_event_duplicate = create_autospec(ObservationEvent, instance=True)
observation_event_duplicate.tool_call_id = "call_1"
observation_event_duplicate.id = "obs_duplicate"

# Create message events
msg1 = message_event("First message")
msg2 = message_event("Second message")

events = [
msg1,
action_event,
observation_event_1,
msg2,
observation_event_duplicate, # Duplicate - should be filtered
]

result = View.filter_unmatched_tool_calls(events) # type: ignore

# Should keep the action, first observation, and both message events
# Duplicate observation should be filtered out
assert len(result) == 4
assert action_event in result
assert observation_event_1 in result
assert msg1 in result
assert msg2 in result
assert observation_event_duplicate not in result


def test_filter_unmatched_tool_calls_multiple_duplicate_observations() -> None:
"""Test filtering multiple duplicate observations for the same tool_call_id.

Ensures that only the first observation is kept even when multiple
duplicates exist.
"""
# Create ActionEvent
action_event = create_autospec(ActionEvent, instance=True)
action_event.tool_call_id = "call_1"
action_event.id = "action_1"
action_event.llm_response_id = "response_1"

# Create first observation (should be kept)
observation_event_1 = create_autospec(ObservationEvent, instance=True)
observation_event_1.tool_call_id = "call_1"
observation_event_1.id = "obs_1"

# Create multiple duplicates (all should be filtered)
observation_event_dup1 = create_autospec(ObservationEvent, instance=True)
observation_event_dup1.tool_call_id = "call_1"
observation_event_dup1.id = "obs_dup1"

observation_event_dup2 = create_autospec(ObservationEvent, instance=True)
observation_event_dup2.tool_call_id = "call_1"
observation_event_dup2.id = "obs_dup2"

observation_event_dup3 = create_autospec(ObservationEvent, instance=True)
observation_event_dup3.tool_call_id = "call_1"
observation_event_dup3.id = "obs_dup3"

events = [
action_event,
observation_event_1,
observation_event_dup1,
observation_event_dup2,
observation_event_dup3,
]

result = View.filter_unmatched_tool_calls(events) # type: ignore

# Should keep only action and first observation
assert len(result) == 2
assert action_event in result
assert observation_event_1 in result
assert observation_event_dup1 not in result
assert observation_event_dup2 not in result
assert observation_event_dup3 not in result


def test_filter_unmatched_tool_calls_independent_tool_calls_with_duplicates() -> None:
"""Test that duplicate filtering is per-tool_call_id.

Multiple different tool_call_ids should each be allowed one observation,
even when some have duplicates.
"""
# First action-observation pair
action_event_1 = create_autospec(ActionEvent, instance=True)
action_event_1.tool_call_id = "call_1"
action_event_1.id = "action_1"
action_event_1.llm_response_id = "response_1"

observation_event_1 = create_autospec(ObservationEvent, instance=True)
observation_event_1.tool_call_id = "call_1"
observation_event_1.id = "obs_1"

# Duplicate for first call
observation_event_1_dup = create_autospec(ObservationEvent, instance=True)
observation_event_1_dup.tool_call_id = "call_1"
observation_event_1_dup.id = "obs_1_dup"

# Second action-observation pair (no duplicates)
action_event_2 = create_autospec(ActionEvent, instance=True)
action_event_2.tool_call_id = "call_2"
action_event_2.id = "action_2"
action_event_2.llm_response_id = "response_2"

observation_event_2 = create_autospec(ObservationEvent, instance=True)
observation_event_2.tool_call_id = "call_2"
observation_event_2.id = "obs_2"

events = [
action_event_1,
observation_event_1,
action_event_2,
observation_event_1_dup, # Duplicate for call_1
observation_event_2,
]

result = View.filter_unmatched_tool_calls(events) # type: ignore

# Should keep both action-observation pairs, filter the duplicate
assert len(result) == 4
assert action_event_1 in result
assert observation_event_1 in result
assert action_event_2 in result
assert observation_event_2 in result
assert observation_event_1_dup not in result


def test_filter_unmatched_tool_calls_duplicate_none_tool_call_id() -> None:
"""Test observations with None tool_call_id aren't affected by dup filtering.

Observations with None tool_call_id should still be filtered based on
the existing matching logic, not the duplicate detection.
"""
# Action without tool_call_id
action_event = create_autospec(ActionEvent, instance=True)
action_event.tool_call_id = None
action_event.id = "action_none"
action_event.llm_response_id = "response_1"

# Two observations with None tool_call_id (these get filtered by matching logic)
observation_event_1 = create_autospec(ObservationEvent, instance=True)
observation_event_1.tool_call_id = None
observation_event_1.id = "obs_none_1"

observation_event_2 = create_autospec(ObservationEvent, instance=True)
observation_event_2.tool_call_id = None
observation_event_2.id = "obs_none_2"

events = [
action_event,
observation_event_1,
observation_event_2,
]

result = View.filter_unmatched_tool_calls(events) # type: ignore

# All events with None tool_call_id should be filtered by matching logic
# (not duplicate detection)
assert len(result) == 0
Loading