-
Notifications
You must be signed in to change notification settings - Fork 389
feat: add HITL with DataPart support and event-based sync #1025
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add HITL with DataPart support and event-based sync #1025
Conversation
332efa9 to
3b100c7
Compare
EItanya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks so so much for the PR, the solution is looking really cool. I just have a few nits and questions!
python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py
Outdated
Show resolved
Hide resolved
|
@EItanya - realized i hadn't pushed some stuff i had locally. pushed and addressed some of your other comments. |
python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py
Outdated
Show resolved
Hide resolved
python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py
Outdated
Show resolved
Hide resolved
python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py
Outdated
Show resolved
Hide resolved
python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py
Outdated
Show resolved
Hide resolved
python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py
Outdated
Show resolved
Hide resolved
| "app_name": self.app_name, | ||
| }, | ||
| "project_name": self.app_name, | ||
| "run_name": "kagent-langgraph-resume", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are there issues in having the name be constant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You didn't answer this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kept it constant to match the initial execution pattern (line 96 also uses constant "kagent-langgraph-exec").
The run_name is used for LangSmith/tracing to categorize run types. Since tags already include dynamic values (task_id, context_id, thread_id), I figured the run_name should be a constant category label. I'm not 100% certain this is correct. Should run_name include unique identifiers? Or is constant appropriate for grouping trace types? I can change it to f"kagent-langgraph-resume-{task_id}" if that's better for observability.
| # Determine decision from message | ||
| message_text = context.get_user_input().lower() | ||
|
|
||
| if "approved" in message_text or "proceed" in message_text: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would this be meant for approve/deny scenarios only? what if we want to ask for specific input from the user and resume execution with that?
I think the decisions (e.g. "approve"/"deny") should be constants coming from the client -- clicking Approve/Deny (or whatever UX we go with) should automatically send True/False or some enum that we know will always signal an approve/deny decision, so we don't have to check for specific strings in message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, but going to leave TextPart as a fallback for now, if that's ok
Address maintainer feedback by standardizing HITL (Human-in-the-Loop) functionality in kagent-core to enable uniform interrupt handling across all executor types (LangGraph, CrewAI, ADK). Changes: - Add kagent-core/a2a/_hitl.py with framework-agnostic HITL types and utilities - Add HITL constants to kagent-core/a2a/_consts.py - Refactor kagent-langgraph executor to use core HITL utilities - Extract backtick escaping to dedicated function - Implement two-tier decision detection (DataPart priority, TextPart fallback) - Change security default from approve to deny - Fix: Use JSON for checkpoint metadata serialization (LangGraph 1.0 compatibility) - Add 10 comprehensive tests for HITL functionality Addresses: kagent-dev#1025 (comments kagent-dev#2, kagent-dev#4, kagent-dev#5, kagent-dev#6, kagent-dev#8) Signed-off-by: apexlnc <43242113+apexlnc@users.noreply.github.com>
2c1ab75 to
c37cf69
Compare
Standardize HITL (Human-in-the-Loop) functionality in kagent-core to enable uniform interrupt handling across all executor types (LangGraph, CrewAI, ADK). Changes: - Add kagent-core/a2a/_hitl.py with framework-agnostic HITL types and utilities - Add HITL constants to kagent-core/a2a/_consts.py with KAGENT_HITL_ prefix - Refactor kagent-langgraph executor to use core HITL utilities - Extract backtick escaping to dedicated function - Implement two-tier decision detection (DataPart priority, TextPart fallback) - Change security default from approve to deny - Fix: Use JSON for checkpoint metadata serialization (LangGraph 1.0 compatibility) - Add 10 comprehensive tests for HITL functionality Signed-off-by: apexlnc <43242113+apexlnc@users.noreply.github.com>
c37cf69 to
7320883
Compare
EItanya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple of last questions/nits, overall looking awesome!
|
|
||
| type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) | ||
| serialized_metadata = self.jsonplus_serde.dumps(get_checkpoint_metadata(config, metadata)) | ||
| # Serialize metadata as JSON (simpler, no type needed) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain this change a little more? I initially used that serializer in order to match the langraph code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- LangGraph 1.0 changed the API: removed .dumps(), now only .dumps_typed() exists, and this was causing:
AttributeError: 'JsonPlusSerializer' object has no attribute 'dumps'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for why JSON instead of .dumps_typed():
- Go backend expects JSON - The database schema comment explicitly says: Metadata string // JSON serialized metadata
- No metadata_type field - Go schema has checkpoint_type for checkpoints but no metadata_type for metadata
- Cross-language - JSON works Python ↔ Go, msgpack would need type info
| LangGraph's format and delegates to the generic handler in kagent-core. | ||
| """ | ||
| # Extract interrupt details from LangGraph format | ||
| if not interrupt_data: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Python does this make sure it has len > 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah Python empty lists are falsy, so this checks if the list is empty and returns early if so
| "app_name": self.app_name, | ||
| }, | ||
| "project_name": self.app_name, | ||
| "run_name": "kagent-langgraph-resume", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You didn't answer this
Let me know if you have any other questions! |
…#1025) Improves the LangGraph executor HITL implementation with structured decision support and event-based synchronization, plus fixes for checkpoint duplicate key errors. Controller (Go): - Fix checkpoint duplicate key errors using GORM's OnConflict{DoNothing: true} - Use errors.Is() for proper wrapped error detection kagent-core: - Add event-based task save synchronization (wait_for_save()) - Replace arbitrary sleep delays with reactive event signaling kagent-langgraph: - DataPart Support: Check structured DataPart for decision_type before text parsing - Event-based sync: Use wait_for_save() instead of asyncio.sleep(0.5) - Bug fix: Properly access part.root for RootModel types - Complete HITL implementation with interrupt/resume logic Tested end-to-end with live HITL workflows. Follows A2A protocol patterns from deepagents and uses idiomatic GORM patterns for database operations. Signed-off-by: apexlnc <43242113+apexlnc@users.noreply.github.com> Signed-off-by: killjoycircuit <rutujdhawale@gmail.com>
Improves the LangGraph executor HITL implementation with structured decision support and event-based synchronization, plus fixes for checkpoint duplicate key errors.
Controller (Go):
kagent-core:
kagent-langgraph:
Tested end-to-end with live HITL workflows. Follows A2A protocol patterns from deepagents and uses idiomatic GORM patterns for database operations.