Skip to content

feat(a2a): streaming TaskProcessor routed through full agent loop #583

@bug-ops

Description

@bug-ops

Parent

Part of #581. Depends on LoopbackChannel issue.

Summary

Refactor TaskProcessor trait to support streaming and rewrite AgentTaskProcessor to route A2A requests through the full agent loop (skills, memory, tools, MCP) instead of direct provider.chat().

Scope

Files: crates/zeph-a2a/src/server/handlers.rs, crates/zeph-a2a/src/server/mod.rs, src/main.rs

TaskProcessor trait change

// Current
async fn process(&self, task_id: &str, message: Message) -> Result<ProcessResult>;

// New
async fn process(&self, task_id: &str, message: Message, event_tx: Sender<TaskEvent>) -> Result<()>;

Processor emits events via event_tx:

  • TaskArtifactUpdateEvent { is_final: false } per token chunk
  • TaskStatusUpdateEvent { state: Working } on status changes
  • TaskStatusUpdateEvent { state: Completed, is_final: true } on completion

SSE handler update

Pipe event_tx → SSE stream directly. Remove current "process() then emit status" pattern.

AgentTaskProcessor rewrite

  • Holds LoopbackHandle from LoopbackChannel
  • process(): sends user text via input_tx, reads LoopbackEvents from output_rx
  • Maps: ChunkArtifactUpdate, FlushStatusUpdate(Completed), StatusStatusUpdate(Working)

Acceptance Criteria

  • A2A message/send returns agent response with skills/memory/tools active (not bare LLM)
  • A2A message/stream emits per-token ArtifactUpdate SSE events
  • Existing A2A tests updated for new trait signature

Metadata

Metadata

Assignees

No one assigned

    Labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions