Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,16 @@ ENV PATH=$PATH:/.kagent/python/bin:/.kagent/python/.venv/bin
COPY --chown=python:pythongroup pyproject.toml .
COPY --chown=python:pythongroup .python-version .
COPY --chown=python:pythongroup uv.lock .
COPY --chown=python:pythongroup packages packages
COPY --chown=python:pythongroup packages/kagent-adk packages/kagent-adk
COPY --chown=python:pythongroup packages/kagent-core packages/kagent-core
COPY --chown=python:pythongroup README.md .

ARG VERSION

# Install dependencies
RUN echo "Installing dependencies..." \
# Install only kagent-adk package and its dependencies
RUN echo "Installing kagent-adk package..." \
&& uv venv --python=python$TOOLS_PYTHON_VERSION \
&& uv version ${VERSION%%-*} --package kagent-adk \
&& uv sync --locked --refresh \
&& uv build --package kagent-adk \
&& uv lock && uv sync --package kagent-adk \
&& uv cache prune

WORKDIR /app
Expand Down
8 changes: 8 additions & 0 deletions python/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,11 @@ audit: build
.PHONY: basic-langchain-sample
basic-langchain-sample:
docker build . -f samples/langgraph/currency/Dockerfile --tag localhost:5001/langgraph-currency:latest --push

.PHONY: research-crew-sample
research-crew-sample:
docker build . -f samples/crewai/research-crew/Dockerfile --tag localhost:5001/research-crew:latest --push

.PHONY: poem-flow-sample
poem-flow-sample:
docker build . -f samples/crewai/poem_flow/Dockerfile --tag localhost:5001/poem-flow:latest --push
47 changes: 47 additions & 0 deletions python/packages/kagent-crewai/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# KAgent CrewAI Integration

This package provides CrewAI integration for KAgent with A2A (Agent-to-Agent) server support.

## Features

- **A2A Server Integration**: Compatible with KAgent's Agent-to-Agent protocol
- **Event Streaming**: Real-time streaming of crew execution events
- **FastAPI Integration**: Ready-to-deploy web server for agent execution

## Quick Start

This package supports both CrewAI Crews and Flows. To get started, define your CrewAI crew or flow as you normally would, then replace the `kickoff` command with the `KAgentApp` which will handle A2A requests and execution.

```python
from kagent.crewai import KAgentApp
# This is the crew or flow you defined
from research_crew.crew import ResearchCrew

app = KAgentApp(crew=ResearchCrew().crew(), agent_card={
"name": "my-crewai-agent",
"description": "A CrewAI agent with KAgent integration",
"version": "0.1.0",
"capabilities": {"streaming": True},
"defaultInputModes": ["text"],
"defaultOutputModes": ["text"]
})

fastapi_app = app.build()
uvicorn.run(fastapi_app, host="0.0.0.0", port=8080)
```

## Architecture

The package mirrors the structure of `kagent-adk` and `kagent-langgraph` but uses CrewAI for multi-agent orchestration:

- **CrewAIAgentExecutor**: Executes CrewAI workflows within A2A protocol
- **KAgentApp**: FastAPI application builder with A2A integration
- **Event Converters**: Translates CrewAI events into A2A events for streaming.

## Deployment

The uses the same deployment approach as other KAgent A2A applications (ADK / LangGraph). You can refer to `samples/crewai/` for examples.

## Note

Due to the current design of the package, your tasks in CrewAI should expect a `input` parameter which contains the input text if available. We will support JSON input for more native CrewAI integration in the future. You can check out an example in `samples/crewai/research-crew/src/research_crew/config/tasks.yaml`.
37 changes: 37 additions & 0 deletions python/packages/kagent-crewai/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "kagent-crewai"
version = "0.1.0"
description = "CrewAI integration for KAgent with A2A server support"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"crewai[tools]>=0.193.2,<1.0.0",
"httpx>=0.25.0",
"fastapi>=0.100.0",
"pydantic>=2.0.0",
"typing-extensions>=4.0.0",
"uvicorn>=0.20.0",
"a2a-sdk>=0.3.1",
"kagent-core",
]

[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"black>=23.0.0",
"ruff>=0.1.0",
]

[tool.uv.sources]
kagent-core = {workspace = true}

[tool.hatch.build.targets.wheel]
packages = ["src/kagent"]

[tool.ruff]
extend = "../../pyproject.toml"
86 changes: 86 additions & 0 deletions python/packages/kagent-crewai/src/kagent/crewai/_a2a.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import faulthandler
import logging
from typing import Union

import httpx
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.types import AgentCard
from fastapi import FastAPI, Request
from fastapi.responses import PlainTextResponse

from crewai import Crew, Flow
from kagent.core import KAgentConfig, configure_tracing
from kagent.core.a2a import KAgentRequestContextBuilder, KAgentTaskStore

from ._executor import CrewAIAgentExecutor, CrewAIAgentExecutorConfig

logger = logging.getLogger(__name__)


def def_health_check(request: Request) -> PlainTextResponse:
return PlainTextResponse("OK")


def thread_dump(request: Request) -> PlainTextResponse:
import io

buf = io.StringIO()
faulthandler.dump_traceback(file=buf)
buf.seek(0)
return PlainTextResponse(buf.read())


class KAgentApp:
def __init__(
self,
*,
crew: Union[Crew, Flow],
agent_card: AgentCard,
config: KAgentConfig = KAgentConfig(),
executor_config: CrewAIAgentExecutorConfig | None = None,
tracing: bool = True,
):
self._crew = crew
self.agent_card = AgentCard.model_validate(agent_card)
self.config = config
self.executor_config = executor_config or CrewAIAgentExecutorConfig()
self.tracing = tracing

def build(self) -> FastAPI:
http_client = httpx.AsyncClient(base_url=self.config.url)

agent_executor = CrewAIAgentExecutor(
crew=self._crew,
app_name=self.config.app_name,
config=self.executor_config,
)

task_store = KAgentTaskStore(http_client)
request_context_builder = KAgentRequestContextBuilder(task_store=task_store)
request_handler = DefaultRequestHandler(
agent_executor=agent_executor,
task_store=task_store,
request_context_builder=request_context_builder,
)

a2a_app = A2AStarletteApplication(
agent_card=self.agent_card,
http_handler=request_handler,
)

faulthandler.enable()
app = FastAPI(
title=f"KAgent CrewAI: {self.config.app_name}",
description=f"CrewAI agent with KAgent integration: {self.agent_card.description}",
version=self.agent_card.version,
)

if self.tracing:
configure_tracing(app)

app.add_route("/health", methods=["GET"], route=def_health_check)
app.add_route("/thread_dump", methods=["GET"], route=thread_dump)
a2a_app.add_routes_to_app(app)

return app
151 changes: 151 additions & 0 deletions python/packages/kagent-crewai/src/kagent/crewai/_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import logging
import uuid
from datetime import datetime, timezone
from typing import Union, override

from a2a.server.agent_execution import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.types import (
Artifact,
DataPart,
Message,
Part,
Role,
TaskArtifactUpdateEvent,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
TextPart,
)
from pydantic import BaseModel

from crewai import Crew, Flow

from ._listeners import A2ACrewAIListener

logger = logging.getLogger(__name__)


class CrewAIAgentExecutorConfig(BaseModel):
execution_timeout: float = 300.0


class CrewAIAgentExecutor(AgentExecutor):
def __init__(
self,
*,
crew: Union[Crew, Flow],
app_name: str,
config: CrewAIAgentExecutorConfig | None = None,
):
super().__init__()
self._crew = crew
self.app_name = app_name
self._config = config or CrewAIAgentExecutorConfig()

@override
async def cancel(self, context: RequestContext, event_queue: EventQueue):
raise NotImplementedError("Cancellation is not implemented")

@override
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
):
if not context.message:
raise ValueError("A2A request must have a message")

if not context.current_task:
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=context.task_id,
status=TaskStatus(
state=TaskState.submitted,
message=context.message,
timestamp=datetime.now(timezone.utc).isoformat(),
),
context_id=context.context_id,
final=False,
)
)

await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=context.task_id,
status=TaskStatus(
state=TaskState.working,
timestamp=datetime.now(timezone.utc).isoformat(),
),
context_id=context.context_id,
final=False,
metadata={
"app_name": self.app_name,
"session_id": context.context_id,
},
)
)

# This listener will capture and convert CrewAI events and enqueue them to A2A event queue
A2ACrewAIListener(context, event_queue, self.app_name)

try:
inputs = None
if context.message and context.message.parts:
for part in context.message.parts:
if isinstance(part, DataPart):
inputs = part.root.data
break
if inputs is None:
user_input = context.get_user_input()
inputs = {"input": user_input} if user_input else {}

if isinstance(self._crew, Flow):
flow_class = type(self._crew)
flow_instance = flow_class()
result = await flow_instance.kickoff_async(inputs=inputs)
else:
result = await self._crew.kickoff_async(inputs=inputs)

await event_queue.enqueue_event(
TaskArtifactUpdateEvent(
task_id=context.task_id,
last_chunk=True,
context_id=context.context_id,
artifact=Artifact(
artifact_id=str(uuid.uuid4()),
parts=[Part(TextPart(text=str(result.raw)))],
),
)
)
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=context.task_id,
status=TaskStatus(
state=TaskState.completed,
timestamp=datetime.now(timezone.utc).isoformat(),
),
context_id=context.context_id,
final=True,
)
)

except Exception as e:
logger.error(f"Error during CrewAI execution: {e}", exc_info=True)
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=context.task_id,
status=TaskStatus(
state=TaskState.failed,
timestamp=datetime.now(timezone.utc).isoformat(),
message=Message(
message_id=str(uuid.uuid4()),
role=Role.agent,
parts=[Part(TextPart(text=str(e)))],
),
),
context_id=context.context_id,
final=True,
)
)
Loading
Loading