Skip to content

Commit 5055069

Browse files
committed
feat: Add streaming task decomposition (backend only)
- Add streaming callbacks (on_stream_batch, on_stream_text) to workforce - Implement background task decomposition in chat_service - Support streaming task decomposition progress to frontend - Add coordinator context support for multi-turn conversations - Restore important logging from main branch
1 parent 644d3a9 commit 5055069

File tree

4 files changed

+256
-59
lines changed

4 files changed

+256
-59
lines changed

backend/app/service/chat_service.py

Lines changed: 151 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
TaskLock,
1818
delete_task_lock,
1919
set_current_task_id,
20+
ActionDecomposeProgressData,
21+
ActionDecomposeTextData,
2022
)
2123
from camel.toolkits import AgentCommunicationToolkit, ToolkitMessageIntegration
2224
from app.utils.toolkit.human_toolkit import HumanToolkit
@@ -267,6 +269,8 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
267269
last_completed_task_result = "" # Track the last completed task result
268270
summary_task_content = "" # Track task summary
269271
loop_iteration = 0
272+
event_loop = asyncio.get_running_loop()
273+
sub_tasks: list[Task] = []
270274

271275
logger.info("=" * 80)
272276
logger.info("🚀 [LIFECYCLE] step_solve STARTED", extra={"project_id": options.project_id, "task_id": options.task_id})
@@ -429,55 +433,116 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
429433
if len(options.attaches) > 0:
430434
camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches}
431435

436+
# Stream decomposition in background so queue items (decompose_text) are processed immediately
432437
logger.info(f"[NEW-QUESTION] 🧩 Starting task decomposition via workforce.eigent_make_sub_tasks")
433-
sub_tasks = await asyncio.to_thread(
434-
workforce.eigent_make_sub_tasks,
435-
camel_task,
436-
context_for_coordinator
437-
)
438-
logger.info(f"[NEW-QUESTION] ✅ Task decomposed into {len(sub_tasks)} subtasks")
438+
stream_state = {"subtasks": [], "seen_ids": set()}
439+
state_holder: dict[str, Any] = {"sub_tasks": [], "summary_task": ""}
439440

440-
logger.info(f"[NEW-QUESTION] Generating task summary")
441-
summary_task_agent = task_summary_agent(options)
442-
try:
443-
summary_task_content = await asyncio.wait_for(
444-
summary_task(summary_task_agent, camel_task), timeout=10
445-
)
446-
task_lock.summary_generated = True
447-
logger.info("[NEW-QUESTION] ✅ Summary generated successfully", extra={"project_id": options.project_id})
448-
except asyncio.TimeoutError:
449-
logger.warning("summary_task timeout", extra={"project_id": options.project_id, "task_id": options.task_id})
450-
# Fallback to a minimal summary to unblock UI
451-
fallback_name = "Task"
452-
content_preview = camel_task.content if hasattr(camel_task, "content") else ""
453-
if content_preview is None:
454-
content_preview = ""
455-
fallback_summary = (
456-
(content_preview[:80] + "...") if len(content_preview) > 80 else content_preview
457-
)
458-
summary_task_content = f"{fallback_name}|{fallback_summary}"
459-
task_lock.summary_generated = True
460-
461-
logger.info(f"[NEW-QUESTION] 📤 Sending to_sub_tasks SSE to frontend (task card)")
462-
logger.info(f"[NEW-QUESTION] to_sub_tasks data: task_id={camel_task.id}, summary={summary_task_content[:50]}..., subtasks_count={len(camel_task.subtasks)}")
463-
yield to_sub_tasks(camel_task, summary_task_content)
464-
logger.info(f"[NEW-QUESTION] ✅ to_sub_tasks SSE sent")
465-
# tracer.stop()
466-
# tracer.save("trace.json")
467-
468-
# Only auto-start in debug mode
469-
if env("debug") == "on":
470-
logger.info(f"[DEBUG] Auto-starting workforce in debug mode")
471-
task_lock.status = Status.processing
472-
task = asyncio.create_task(workforce.eigent_start(sub_tasks))
473-
task_lock.add_background_task(task)
441+
def on_stream_batch(new_tasks: list[Task], is_final: bool = False):
442+
fresh_tasks = [t for t in new_tasks if t.id not in stream_state["seen_ids"]]
443+
for t in fresh_tasks:
444+
stream_state["seen_ids"].add(t.id)
445+
stream_state["subtasks"].extend(fresh_tasks)
446+
447+
def on_stream_text(text_chunk: str):
448+
try:
449+
asyncio.run_coroutine_threadsafe(
450+
task_lock.put_queue(
451+
ActionDecomposeTextData(
452+
data={
453+
"project_id": options.project_id,
454+
"task_id": options.task_id,
455+
"content": text_chunk,
456+
}
457+
)
458+
),
459+
event_loop,
460+
)
461+
except Exception as e:
462+
logger.warning(f"Failed to stream decomposition text: {e}")
463+
464+
async def run_decomposition():
465+
nonlocal camel_task
466+
try:
467+
sub_tasks = await asyncio.to_thread(
468+
workforce.eigent_make_sub_tasks,
469+
camel_task,
470+
context_for_coordinator,
471+
on_stream_batch,
472+
on_stream_text,
473+
)
474+
if stream_state["subtasks"]:
475+
sub_tasks = stream_state["subtasks"]
476+
state_holder["sub_tasks"] = sub_tasks
477+
logger.info(f"[NEW-QUESTION] ✅ Task decomposed into {len(sub_tasks)} subtasks")
478+
try:
479+
setattr(task_lock, "decompose_sub_tasks", sub_tasks)
480+
except Exception:
481+
pass
482+
483+
logger.info(f"[NEW-QUESTION] Generating task summary")
484+
summary_task_agent = task_summary_agent(options)
485+
try:
486+
summary_task_content = await asyncio.wait_for(
487+
summary_task(summary_task_agent, camel_task), timeout=10
488+
)
489+
task_lock.summary_generated = True
490+
logger.info("[NEW-QUESTION] ✅ Summary generated successfully", extra={"project_id": options.project_id})
491+
except asyncio.TimeoutError:
492+
logger.warning("summary_task timeout", extra={"project_id": options.project_id, "task_id": options.task_id})
493+
task_lock.summary_generated = True
494+
fallback_name = "Task"
495+
content_preview = camel_task.content if hasattr(camel_task, "content") else ""
496+
if content_preview is None:
497+
content_preview = ""
498+
summary_task_content = (
499+
(content_preview[:80] + "...") if len(content_preview) > 80 else content_preview
500+
)
501+
summary_task_content = f"{fallback_name}|{summary_task_content}"
502+
except Exception:
503+
task_lock.summary_generated = True
504+
fallback_name = "Task"
505+
content_preview = camel_task.content if hasattr(camel_task, "content") else ""
506+
if content_preview is None:
507+
content_preview = ""
508+
summary_task_content = (
509+
(content_preview[:80] + "...") if len(content_preview) > 80 else content_preview
510+
)
511+
summary_task_content = f"{fallback_name}|{summary_task_content}"
512+
513+
state_holder["summary_task"] = summary_task_content
514+
try:
515+
setattr(task_lock, "summary_task_content", summary_task_content)
516+
except Exception:
517+
pass
518+
logger.info(f"[NEW-QUESTION] 📤 Sending to_sub_tasks SSE to frontend (task card)")
519+
logger.info(f"[NEW-QUESTION] to_sub_tasks data: task_id={camel_task.id}, summary={summary_task_content[:50]}..., subtasks_count={len(camel_task.subtasks)}")
520+
payload = {
521+
"project_id": options.project_id,
522+
"task_id": options.task_id,
523+
"sub_tasks": tree_sub_tasks(camel_task.subtasks),
524+
"delta_sub_tasks": tree_sub_tasks(sub_tasks),
525+
"is_final": True,
526+
"summary_task": summary_task_content,
527+
}
528+
await task_lock.put_queue(ActionDecomposeProgressData(data=payload))
529+
logger.info(f"[NEW-QUESTION] ✅ to_sub_tasks SSE sent")
530+
except Exception as e:
531+
logger.error(f"Error in background decomposition: {e}", exc_info=True)
532+
533+
bg_task = asyncio.create_task(run_decomposition())
534+
task_lock.add_background_task(bg_task)
474535

475536
elif item.action == Action.update_task:
476537
assert camel_task is not None
477538
update_tasks = {item.id: item for item in item.data.task}
539+
# Use stored decomposition results if available
540+
if not sub_tasks:
541+
sub_tasks = getattr(task_lock, "decompose_sub_tasks", [])
478542
sub_tasks = update_sub_tasks(sub_tasks, update_tasks)
479543
add_sub_tasks(camel_task, item.data.task)
480-
yield to_sub_tasks(camel_task, summary_task_content)
544+
summary_task_content_local = getattr(task_lock, "summary_task_content", summary_task_content)
545+
yield to_sub_tasks(camel_task, summary_task_content_local)
481546
elif item.action == Action.add_task:
482547

483548
# Check if this might be a misrouted second question
@@ -596,6 +661,8 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
596661
continue
597662

598663
task_lock.status = Status.processing
664+
if not sub_tasks:
665+
sub_tasks = getattr(task_lock, "decompose_sub_tasks", [])
599666
task = asyncio.create_task(workforce.eigent_start(sub_tasks))
600667
task_lock.add_background_task(task)
601668
elif item.action == Action.task_state:
@@ -700,11 +767,39 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
700767
context_for_multi_turn = build_context_for_workforce(task_lock, options)
701768

702769
logger.info(f"[LIFECYCLE] Multi-turn: calling workforce.handle_decompose_append_task for new task decomposition")
770+
stream_state = {"subtasks": [], "seen_ids": set()}
771+
772+
def on_stream_batch(new_tasks: list[Task], is_final: bool = False):
773+
fresh_tasks = [t for t in new_tasks if t.id not in stream_state["seen_ids"]]
774+
for t in fresh_tasks:
775+
stream_state["seen_ids"].add(t.id)
776+
stream_state["subtasks"].extend(fresh_tasks)
777+
778+
def on_stream_text(text_chunk: str):
779+
try:
780+
asyncio.run_coroutine_threadsafe(
781+
task_lock.put_queue(
782+
ActionDecomposeTextData(
783+
data={
784+
"project_id": options.project_id,
785+
"task_id": options.task_id,
786+
"content": text_chunk,
787+
}
788+
)
789+
),
790+
event_loop,
791+
)
792+
except Exception as e:
793+
logger.warning(f"Failed to stream decomposition text: {e}")
703794
new_sub_tasks = await workforce.handle_decompose_append_task(
704795
camel_task,
705796
reset=False,
706-
coordinator_context=context_for_multi_turn
797+
coordinator_context=context_for_multi_turn,
798+
on_stream_batch=on_stream_batch,
799+
on_stream_text=on_stream_text,
707800
)
801+
if stream_state["subtasks"]:
802+
new_sub_tasks = stream_state["subtasks"]
708803
logger.info(f"[LIFECYCLE] Multi-turn: task decomposed into {len(new_sub_tasks)} subtasks")
709804

710805
# Generate proper LLM summary for multi-turn tasks instead of hardcoded fallback
@@ -731,8 +826,16 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
731826
else:
732827
new_summary_content = f"Follow-up Task|{task_content_for_summary}"
733828

734-
# Send the extracted events
735-
yield to_sub_tasks(camel_task, new_summary_content)
829+
# Emit final subtasks once when decomposition is complete
830+
final_payload = {
831+
"project_id": options.project_id,
832+
"task_id": options.task_id,
833+
"sub_tasks": tree_sub_tasks(camel_task.subtasks),
834+
"delta_sub_tasks": tree_sub_tasks(new_sub_tasks),
835+
"is_final": True,
836+
"summary_task": new_summary_content,
837+
}
838+
await task_lock.put_queue(ActionDecomposeProgressData(data=final_payload))
736839

737840
# Update the context with new task data
738841
sub_tasks = new_sub_tasks
@@ -795,6 +898,10 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
795898
logger.info(f"Workforce resumed for project {options.project_id}")
796899
else:
797900
logger.warning(f"Cannot resume: workforce is None for project {options.project_id}")
901+
elif item.action == Action.decompose_text:
902+
yield sse_json("decompose_text", item.data)
903+
elif item.action == Action.decompose_progress:
904+
yield sse_json("to_sub_tasks", item.data)
798905
elif item.action == Action.new_agent:
799906
if workforce is not None:
800907
workforce.pause()

backend/app/service/task.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ class Action(str, Enum):
2020
update_task = "update_task" # user -> backend
2121
task_state = "task_state" # backend -> user
2222
new_task_state = "new_task_state" # backend -> user
23+
decompose_progress = "decompose_progress" # backend -> user (streaming decomposition)
24+
decompose_text = "decompose_text" # backend -> user (raw streaming text)
2325
start = "start" # user -> backend
2426
create_agent = "create_agent" # backend -> user
2527
activate_agent = "activate_agent" # backend -> user
@@ -64,6 +66,17 @@ class ActionTaskStateData(BaseModel):
6466
action: Literal[Action.task_state] = Action.task_state
6567
data: dict[Literal["task_id", "content", "state", "result", "failure_count"], str | int]
6668

69+
70+
class ActionDecomposeProgressData(BaseModel):
71+
action: Literal[Action.decompose_progress] = Action.decompose_progress
72+
data: dict
73+
74+
75+
class ActionDecomposeTextData(BaseModel):
76+
action: Literal[Action.decompose_text] = Action.decompose_text
77+
data: dict
78+
79+
6780
class ActionNewTaskStateData(BaseModel):
6881
action: Literal[Action.new_task_state] = Action.new_task_state
6982
data: dict[Literal["task_id", "content", "state", "result", "failure_count"], str | int]
@@ -227,6 +240,7 @@ class ActionSkipTaskData(BaseModel):
227240
| ActionAddTaskData
228241
| ActionRemoveTaskData
229242
| ActionSkipTaskData
243+
| ActionDecomposeTextData
230244
)
231245

232246

backend/app/utils/agent.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def __init__(
107107
pause_event: asyncio.Event | None = None,
108108
prune_tool_calls_from_memory: bool = False,
109109
enable_snapshot_clean: bool = False,
110+
**kwargs: Any,
110111
) -> None:
111112
super().__init__(
112113
system_message=system_message,
@@ -128,6 +129,7 @@ def __init__(
128129
pause_event=pause_event,
129130
prune_tool_calls_from_memory=prune_tool_calls_from_memory,
130131
enable_snapshot_clean=enable_snapshot_clean,
132+
**kwargs,
131133
)
132134
self.api_task_id = api_task_id
133135
self.agent_name = agent_name
@@ -503,6 +505,21 @@ def agent_model(
503505
)
504506
)
505507

508+
# Build model config, defaulting to streaming for planner
509+
extra_params = options.extra_params or {}
510+
model_config: dict[str, Any] = {}
511+
if options.is_cloud():
512+
model_config["user"] = str(options.project_id)
513+
model_config.update(
514+
{
515+
k: v
516+
for k, v in extra_params.items()
517+
if k not in ["model_platform", "model_type", "api_key", "url"]
518+
}
519+
)
520+
if agent_name in (Agents.task_agent):
521+
model_config["stream"] = True
522+
506523
return ListenChatAgent(
507524
options.project_id,
508525
agent_name,
@@ -512,23 +529,15 @@ def agent_model(
512529
model_type=options.model_type,
513530
api_key=options.api_key,
514531
url=options.api_url,
515-
model_config_dict={
516-
"user": str(options.project_id),
517-
}
518-
if options.is_cloud()
519-
else None,
520-
**{
521-
k: v
522-
for k, v in (options.extra_params or {}).items()
523-
if k not in ["model_platform", "model_type", "api_key", "url"]
524-
},
532+
model_config_dict=model_config or None,
525533
),
526534
# output_language=options.language,
527535
tools=tools,
528536
agent_id=agent_id,
529537
prune_tool_calls_from_memory=prune_tool_calls_from_memory,
530538
toolkits_to_register_agent=toolkits_to_register_agent,
531539
enable_snapshot_clean=enable_snapshot_clean,
540+
stream_accumulate=False,
532541
)
533542

534543

0 commit comments

Comments
 (0)