Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,25 @@ docker buildx create --name kagent-builder-v0.23.0 --platform linux/amd64,linux/
```

Then run the `make helm-install` command again.

### Run kagent and an agent locally.

create a minimal cluster with kind. scale kagent to 0 replicas, as we will run it locally.

```bash
make create-kind-cluster helm-install-provider helm-tools push-test-agent
kubectl scale -n kagent deployment kagent-controller --replicas 0
```

Run kagent with `KAGENT_A2A_DEBUG_ADDR=localhost:8080` environment variable set, and when it connect to agents it will go to "localhost:8080" instead of the Kubernetes service.

Run the agent locally as well, with `--net=host` option, so it can connect to the kagent service on localhost. For example:

```bash
docker run --rm \
-e KAGENT_URL=http://localhost:8083 \
-e KAGENT_NAME=kebab-agent \
-e KAGENT_NAMESPACE=kagent \
--net=host \
localhost:5001/kebab:latest
```
20 changes: 20 additions & 0 deletions go/internal/controller/a2a/a2a_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package a2a
import (
"context"
"fmt"
"net"
"net/http"
"os"

"github.com/kagent-dev/kagent/go/api/v1alpha2"
"github.com/kagent-dev/kagent/go/internal/a2a"
Expand Down Expand Up @@ -57,6 +60,7 @@ func (a *a2aReconciler) ReconcileAgent(
agentRef := common.GetObjectRef(agent)

client, err := a2aclient.NewA2AClient(card.URL,
debugOpt(),
a2aclient.WithBuffer(a.streamingInitialBufSize, a.streamingMaxBufSize),
a2aclient.WithHTTPReqHandler(auth.A2ARequestHandler(a.authenticator)),
)
Expand All @@ -80,3 +84,19 @@ func (a *a2aReconciler) ReconcileAgentDeletion(
) {
a.a2aHandler.RemoveAgentHandler(agentRef)
}

func debugOpt() a2aclient.Option {
debugAddr := os.Getenv("KAGENT_A2A_DEBUG_ADDR")
if debugAddr != "" {
client := new(http.Client)
client.Transport = &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
var zeroDialer net.Dialer
return zeroDialer.DialContext(ctx, network, debugAddr)
},
}
return a2aclient.WithHTTPClient(client)
} else {
return func(*a2aclient.A2AClient) {}
}
}
12 changes: 6 additions & 6 deletions go/internal/controller/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,15 @@ func (a *kagentReconciler) reconcileAgentStatus(ctx context.Context, agent *v1al
conditionChanged := meta.SetStatusCondition(&agent.Status.Conditions, metav1.Condition{
Type: v1alpha2.AgentConditionTypeAccepted,
Status: status,
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
ObservedGeneration: agent.Generation,
})

deployedCondition := metav1.Condition{
Type: v1alpha2.AgentConditionTypeReady,
Status: metav1.ConditionUnknown,
LastTransitionTime: metav1.Now(),
ObservedGeneration: agent.Generation,
Copy link

Copilot AI Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LastTransitionTime field is removed but should be set when creating status conditions. Without this field, the condition may not properly track when state changes occurred, which is important for Kubernetes status reporting.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the previous condition, this deployedCondition is missing the LastTransitionTime field which is required for proper Kubernetes status condition tracking.

Suggested change
ObservedGeneration: agent.Generation,
ObservedGeneration: agent.Generation,
LastTransitionTime: metav1.Now(),

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

controller runtime handles it automatically. it is not needed.

}

// Check if the deployment exists
Expand Down Expand Up @@ -393,18 +393,18 @@ func (a *kagentReconciler) reconcileRemoteMCPServerStatus(
if err != nil {
status = metav1.ConditionFalse
message = err.Error()
reason = "AgentReconcileFailed"
reason = "ReconcileFailed"
reconcileLog.Error(err, "failed to reconcile agent", "tool_server", utils.GetObjectRef(toolServer))
} else {
status = metav1.ConditionTrue
reason = "AgentReconciled"
reason = "Reconciled"
}
conditionChanged := meta.SetStatusCondition(&toolServer.Status.Conditions, metav1.Condition{
Type: v1alpha2.AgentConditionTypeAccepted,
Status: status,
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
ObservedGeneration: toolServer.Generation,
Copy link

Copilot AI Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LastTransitionTime field is removed from this condition as well. This field should be included to properly track when the condition state changed.

Suggested change
ObservedGeneration: toolServer.Generation,
ObservedGeneration: toolServer.Generation,
LastTransitionTime: metav1.Now(),

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

controller runtime handles it automatically. it is not needed.

})

// only update if the status has changed to prevent looping the reconciler
Expand All @@ -418,7 +418,7 @@ func (a *kagentReconciler) reconcileRemoteMCPServerStatus(
toolServer.Status.DiscoveredTools = discoveredTools

if err := a.kube.Status().Update(ctx, toolServer); err != nil {
return fmt.Errorf("failed to update agent status: %v", err)
return fmt.Errorf("failed to update remote mcp server status: %v", err)
}

return nil
Expand Down
14 changes: 14 additions & 0 deletions go/test/e2e/agents/kebab/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,18 @@ docker build . --push -t localhost:5001/kebab:latest

```bash
kubectl apply -f agent.yaml
```

# Run manually

You can run the agent manually for testing purposes. Make sure you have Python 3.11+ installed.

```bash
cd go/test/e2e/agents/kebab
docker run --rm \
-e KAGENT_URL=http://localhost:8083 \
-e KAGENT_NAME=kebab-agent \
-e KAGENT_NAMESPACE=kagent \
--net=host \
localhost:5001/kebab:latest
```
10 changes: 8 additions & 2 deletions go/test/e2e/agents/kebab/kebab/agent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

import logging
import random

from google.adk.agents.invocation_context import InvocationContext
Expand All @@ -7,6 +8,8 @@
from typing import AsyncGenerator, override
from google.genai import types

logger = logging.getLogger(__name__)

class KebabAgent(BaseAgent):

def __init__(self):
Expand All @@ -27,8 +30,11 @@ async def _run_async_impl(
Yields:
Event: the events generated by the agent.
"""

text = "kebab"
session = ctx.session
text = f"kebab for {session.user_id} in session {session.id} "

logger.info(f"Generating response: {text}")

model_response_event = Event(
id=Event.new_id(),
invocation_id=ctx.invocation_id,
Expand Down
2 changes: 1 addition & 1 deletion go/test/e2e/invoke_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,6 @@ func TestInvokeExternalAgent(t *testing.T) {
jsn, err := json.Marshal(taskResult)
require.NoError(t, err)
// Prime numbers
require.Contains(t, text, "kebab", string(jsn))
require.Contains(t, text, "kebab for user@example.com", string(jsn))
})
}
22 changes: 10 additions & 12 deletions python/packages/kagent-adk/src/kagent_adk/a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
from ._task_store import KAgentTaskStore
from ._token import KAgentTokenService

# --- Constants ---
USER_ID = "admin@kagent.dev"

# --- Configure Logging ---
logger = logging.getLogger(__name__)

Expand All @@ -40,7 +37,7 @@ def __init__(self, user_id: str):

@property
def is_authenticated(self) -> bool:
return False
return True

@property
def user_name(self) -> str:
Expand All @@ -52,9 +49,8 @@ class KAgentRequestContextBuilder(SimpleRequestContextBuilder):
A request context builder that will be used to hack in the user_id for now.
"""

def __init__(self, user_id: str, task_store: TaskStore):
def __init__(self, task_store: TaskStore):
super().__init__(task_store=task_store)
self.user_id = user_id

async def build(
self,
Expand All @@ -64,10 +60,12 @@ async def build(
task: Task | None = None,
context: ServerCallContext | None = None,
) -> RequestContext:
if not context:
context = ServerCallContext(user=KAgentUser(user_id=self.user_id))
else:
context.user = KAgentUser(user_id=self.user_id)
if context:
# grab the user id from the header
headers = context.state.get("headers", {})
user_id = headers.get("x-user-id", None)
if user_id:
context.user = KAgentUser(user_id=user_id)
Copy link

Copilot AI Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The user_id from the x-user-id header is used directly without validation. This could allow header injection attacks or unauthorized access if the header is spoofed. Consider adding input validation to ensure the user_id is properly formatted and authorized.

Suggested change
context.user = KAgentUser(user_id=user_id)
if user_id and is_valid_user_id(user_id):
context.user = KAgentUser(user_id=user_id)
else:
if user_id is not None:
logger.warning(f"Invalid user_id in x-user-id header: {user_id!r}")

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's true, but not worse than current state. the auth story will evolve over time.

request_context = await super().build(params, task_id, context_id, task, context)
return request_context

Expand Down Expand Up @@ -103,7 +101,7 @@ def __init__(

def build(self) -> FastAPI:
token_service = KAgentTokenService(self.app_name)
http_client = httpx.AsyncClient(
http_client = httpx.AsyncClient( # TODO: add user and agent headers
base_url=kagent_url_override or self.kagent_url, event_hooks=token_service.event_hooks()
)
session_service = KAgentSessionService(http_client)
Expand All @@ -121,7 +119,7 @@ def create_runner() -> Runner:

kagent_task_store = KAgentTaskStore(http_client)

request_context_builder = KAgentRequestContextBuilder(user_id=USER_ID, task_store=kagent_task_store)
request_context_builder = KAgentRequestContextBuilder(task_store=kagent_task_store)
request_handler = DefaultRequestHandler(
agent_executor=agent_executor,
task_store=kagent_task_store,
Expand Down
Loading