[Feature] Implement Persistence and Tracing for CrewAI BYO Agents #951
Conversation
Force-pushed from b378d5a to 2c52e01.
EItanya left a comment:
Mostly looking good, just nits so far
Force-pushed from 409c0b8 to 8bbad04.
Reviewed snippet:

```yaml
secretKeyRef:
  name: kagent-google
  key: GOOGLE_API_KEY
name: kagent-openai
```
Have you tested with both LLM providers? Did you notice any differences for spans between the two?
For Gemini it seems the instrumentation library does not instrument all the spans, but for OpenAI it works fine.
Force-pushed from e6220bf to 0d70579.
Signed-off-by: Jet Chiang <jetjiang.ez@gmail.com>
Force-pushed from 0d70579 to a843e4a.
Pull Request Overview
This PR implements persistence and tracing support for CrewAI BYO (Bring Your Own) agents in the KAgent platform. It adds session-based memory storage for CrewAI crews and flow state persistence for CrewAI flows, along with OpenTelemetry tracing integration.
- Implements session-scoped memory storage for CrewAI crews using the LongTermMemory interface
- Adds flow state persistence for CrewAI flows with checkpointing capabilities
- Integrates OpenTelemetry tracing with `opentelemetry-instrumentation-crewai`
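The session-scoped memory idea can be sketched as follows. Names here are illustrative, not the actual `_memory.py` interface, and a plain shared dict stands in for the kagent backend: the point is only that every memory item is keyed by session ID, so crews in different sessions never see each other's long-term memories.

```python
from typing import Any


class SessionScopedMemoryStorage:
    """Sketch of session-scoped long-term memory (hypothetical names;
    the real KagentMemoryStorage talks to the kagent backend)."""

    def __init__(self, session_id: str, backend: dict[str, list[dict[str, Any]]]):
        self.session_id = session_id
        self.backend = backend

    def save(self, value: str, metadata: dict[str, Any]) -> None:
        # Every item is stored under this session's ID.
        self.backend.setdefault(self.session_id, []).append(
            {"value": value, "metadata": metadata}
        )

    def load(self) -> list[dict[str, Any]]:
        # Only items written by the same session are ever returned.
        return self.backend.get(self.session_id, [])


shared: dict[str, list[dict[str, Any]]] = {}
a = SessionScopedMemoryStorage("session-a", shared)
b = SessionScopedMemoryStorage("session-b", shared)
a.save("user prefers haiku", {"source": "crew"})
print(len(a.load()), len(b.load()))  # → 1 0
```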
Reviewed Changes
Copilot reviewed 28 out of 30 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| python/samples/crewai/research-crew/src/research_crew/crew.py | Adds commented memory configuration option |
| python/samples/crewai/research-crew/src/research_crew/config/agents.yaml | Removes hardcoded LLM configuration |
| python/samples/crewai/poem_flow/src/poem_flow/main.py | Implements persistence with @persist decorator and flow state continuation |
| python/packages/kagent-crewai/src/kagent/crewai/_state.py | New KagentFlowPersistence class for CrewAI flow state management |
| python/packages/kagent-crewai/src/kagent/crewai/_memory.py | New KagentMemoryStorage class for session-scoped memory |
| python/packages/kagent-crewai/src/kagent/crewai/_executor.py | Updates executor to support memory and flow persistence |
| python/packages/kagent-crewai/src/kagent/crewai/_a2a.py | Adds CrewAI OpenTelemetry instrumentation |
| go/internal/httpserver/handlers/crewai.go | New Go handler for CrewAI memory and flow state endpoints |
| go/internal/database/models.go | Adds database models for CrewAI memory and flow state |
| go/test/e2e/invoke_api_test.go | Adds comprehensive e2e test for CrewAI agent with persistence |
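As a rough sketch of what flow-state checkpointing involves (hypothetical names; the real `KagentFlowPersistence` delegates to the Go backend), each checkpoint upserts the serialized flow state keyed by flow ID, so a restarted flow can resume from its last saved state. SQLite stands in for the kagent database here:

```python
import json
import sqlite3
from typing import Any, Optional


class FlowStatePersistence:
    """Illustrative checkpoint store; not the actual kagent implementation."""

    def __init__(self, path: str = ":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS flow_states (flow_id TEXT PRIMARY KEY, state TEXT)"
        )

    def save_state(self, flow_id: str, state: dict[str, Any]) -> None:
        # Upsert so each checkpoint replaces the previous one for this flow.
        self.conn.execute(
            "INSERT INTO flow_states (flow_id, state) VALUES (?, ?) "
            "ON CONFLICT(flow_id) DO UPDATE SET state = excluded.state",
            (flow_id, json.dumps(state)),
        )

    def load_state(self, flow_id: str) -> Optional[dict[str, Any]]:
        row = self.conn.execute(
            "SELECT state FROM flow_states WHERE flow_id = ?", (flow_id,)
        ).fetchone()
        return json.loads(row[0]) if row else None


store = FlowStatePersistence()
store.save_state("flow-1", {"step": "generate_poem", "sentence_count": 3})
store.save_state("flow-1", {"step": "save_poem", "sentence_count": 3})
print(store.load_state("flow-1"))  # → {'step': 'save_poem', 'sentence_count': 3}
```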
@EItanya for the existing E2E tests, would it be a good idea to also use Artifacts instead of history when checking the agent response, since artifacts are the usual way of returning task output? I tried it on the ADK inline / declarative agents and it passes.
Signed-off-by: Jet Chiang <jetjiang.ez@gmail.com>
Reviewed snippet:

```python
# Check if a TracerProvider already exists (e.g., set by CrewAI)
current_provider = trace.get_tracer_provider()
if isinstance(current_provider, TracerProvider):
    # TracerProvider already exists, just add our processor to it
```
I think we also need to add our labels here, e.g. `Resource({"service.name": "kagent"})`.
CrewAI already creates a tracing provider that uses `crewAI-telemetry` as its service name (https://github.com/crewAIInc/crewAI/blob/8d93361cb305e39638cb6b1c257c572e129ac9a1/src/crewai/telemetry/telemetry.py). I tried disabling CrewAI's built-in tracing provider so we create our own kagent one, but that resulted in no spans being sent.
A reliable workaround I found is to create a custom span processor wrapper that overwrites the resource service name before exporting:

```python
from typing import Optional

from opentelemetry.context import Context
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter


class ResourceOverrideSpanProcessor(SpanProcessor):
    """A span processor that overrides the resource attributes before exporting."""

    def __init__(self, exporter: SpanExporter, resource: Resource):
        self.batch_processor = BatchSpanProcessor(exporter)
        self.override_resource = resource

    def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
        self.batch_processor.on_start(span, parent_context)

    def on_end(self, span: ReadableSpan) -> None:
        # Override the resource before passing to the batch processor.
        # This ensures the service.name is always the one in the resource (i.e. kagent).
        span._resource = self.override_resource
        self.batch_processor.on_end(span)

    def shutdown(self) -> None:
        self.batch_processor.shutdown()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        return self.batch_processor.force_flush(timeout_millis)
```

And we can add it like this:

```python
exporter = OTLPSpanExporter(endpoint=trace_endpoint)
processor = ResourceOverrideSpanProcessor(exporter, resource)
existing_crewai_trace_provider.add_span_processor(processor)
```

This would work for any framework that already creates an OTEL tracer provider, and all of its spans will be renamed to kagent. How does this sound?
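The override-before-delegate pattern can be sanity-checked without the OpenTelemetry SDK. All classes below are stand-ins, not the real OTel types; the sketch only shows that the wrapped processor ever observes the overridden resource:

```python
class StubSpan:
    """Stand-in for a span carrying a resource attribute."""
    def __init__(self, resource: str):
        self._resource = resource


class RecordingProcessor:
    """Stands in for BatchSpanProcessor; records what it sees on_end."""
    def __init__(self):
        self.seen = []

    def on_end(self, span: StubSpan) -> None:
        self.seen.append(span._resource)


class ResourceOverride:
    """Same shape as the wrapper above, using the stub types."""
    def __init__(self, inner: RecordingProcessor, resource: str):
        self.inner = inner
        self.override_resource = resource

    def on_end(self, span: StubSpan) -> None:
        # Rewrite the resource before delegating to the inner processor.
        span._resource = self.override_resource
        self.inner.on_end(span)


inner = RecordingProcessor()
proc = ResourceOverride(inner, "kagent")
proc.on_end(StubSpan("crewAI-telemetry"))
print(inner.seen)  # → ['kagent']
```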
It's interesting, because this should probably be configurable. I think let's merge this as is and we can potentially revisit.
Signed-off-by: Jet Chiang <jetjiang.ez@gmail.com>
Force-pushed from 9a104fa to 990efd9.
This PR is a follow-up for #920

Features
- Creates Go handlers for `/crewai` routes for storing and retrieving memory items
- Updates the database handler to store and retrieve long-term memory items from CrewAI Crew and state for CrewAI Flow
- Adds a custom memory store to agents before execution to allow for session-based persistence
- Updates samples to show usage of memory by setting `memory=True` and persistence via `@persist()`
- Supports tracing with `opentelemetry-instrumentation-crewai` for CrewAI-specific spans, as suggested in code review

Tests
An e2e test is added for the CrewAI Poem flow sample agent using the mock LLM server. The test case creates the agent resource, creates the mock LLM server using the mock response in `invoke_creawi_agent.json`, and tests synchronous, streaming, and persistence behavior for the agent. It requires the agent container to be built and pushed to the registry (by running `make poem-flow-sample` or the Dockerfile directly in `samples/crewai/poem_flow`).

The following changes are made to helper functions in the e2e test:
1. `runSyncTest` accepts an optional `contextID` to be included in the mock message to test session persistence
2. `runSyncTest` accepts an optional `useArtifacts` argument to indicate whether the expected output should be checked in the history messages or in the artifact returned by the A2A server, since the A2A protocol specifies that `Artifacts` are the standard way to convey the final outputs of a task
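The artifact-vs-history check can be sketched in Python (the actual helper is Go code in `go/test/e2e/invoke_api_test.go`; the field names below loosely follow the A2A task shape and are illustrative):

```python
from typing import Any


def response_contains(response: dict[str, Any], expected: str, use_artifacts: bool) -> bool:
    """Check whether the expected output appears in the task's artifacts
    (the standard place for final outputs) or in the message history."""
    if use_artifacts:
        parts = [p for a in response.get("artifacts", []) for p in a.get("parts", [])]
    else:
        parts = [p for m in response.get("history", []) for p in m.get("parts", [])]
    return any(expected in p.get("text", "") for p in parts)


# Illustrative A2A-style task: the final output lives in an artifact,
# while history holds the conversation messages.
task = {
    "artifacts": [{"parts": [{"text": "a poem about kubernetes"}]}],
    "history": [{"parts": [{"text": "write me a poem"}]}],
}
print(response_contains(task, "poem about", use_artifacts=True))   # → True
print(response_contains(task, "poem about", use_artifacts=False))  # → False
```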