diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index c4e7b5173..e839aad6d 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -58,3 +58,25 @@ docker buildx create --name kagent-builder-v0.23.0 --platform linux/amd64,linux/ ``` Then run the `make helm-install` command again. + +### Run kagent and an agent locally. + +create a minimal cluster with kind. scale kagent to 0 replicas, as we will run it locally. + +```bash +make create-kind-cluster helm-install-provider helm-tools push-test-agent +kubectl scale -n kagent deployment kagent-controller --replicas 0 +``` + +Run kagent with `KAGENT_A2A_DEBUG_ADDR=localhost:8080` environment variable set, and when it connect to agents it will go to "localhost:8080" instead of the Kubernetes service. + +Run the agent locally as well, with `--net=host` option, so it can connect to the kagent service on localhost. For example: + +```bash +docker run --rm \ + -e KAGENT_URL=http://localhost:8083 \ + -e KAGENT_NAME=kebab-agent \ + -e KAGENT_NAMESPACE=kagent \ + --net=host \ + localhost:5001/kebab:latest +``` diff --git a/go/internal/controller/a2a/a2a_reconciler.go b/go/internal/controller/a2a/a2a_reconciler.go index 26139724f..357d0014d 100644 --- a/go/internal/controller/a2a/a2a_reconciler.go +++ b/go/internal/controller/a2a/a2a_reconciler.go @@ -3,6 +3,9 @@ package a2a import ( "context" "fmt" + "net" + "net/http" + "os" "github.com/kagent-dev/kagent/go/api/v1alpha2" "github.com/kagent-dev/kagent/go/internal/a2a" @@ -57,6 +60,7 @@ func (a *a2aReconciler) ReconcileAgent( agentRef := common.GetObjectRef(agent) client, err := a2aclient.NewA2AClient(card.URL, + debugOpt(), a2aclient.WithBuffer(a.streamingInitialBufSize, a.streamingMaxBufSize), a2aclient.WithHTTPReqHandler(auth.A2ARequestHandler(a.authenticator)), ) @@ -80,3 +84,19 @@ func (a *a2aReconciler) ReconcileAgentDeletion( ) { a.a2aHandler.RemoveAgentHandler(agentRef) } + +func debugOpt() a2aclient.Option { + debugAddr := os.Getenv("KAGENT_A2A_DEBUG_ADDR") + if debugAddr != "" { + client := new(http.Client) + client.Transport = &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + var zeroDialer net.Dialer + return zeroDialer.DialContext(ctx, network, debugAddr) + }, + } + return a2aclient.WithHTTPClient(client) + } else { + return func(*a2aclient.A2AClient) {} + } +} diff --git a/go/internal/controller/reconciler/reconciler.go b/go/internal/controller/reconciler/reconciler.go index 8c0aeef69..134b837b7 100644 --- a/go/internal/controller/reconciler/reconciler.go +++ b/go/internal/controller/reconciler/reconciler.go @@ -146,15 +146,15 @@ func (a *kagentReconciler) reconcileAgentStatus(ctx context.Context, agent *v1al conditionChanged := meta.SetStatusCondition(&agent.Status.Conditions, metav1.Condition{ Type: v1alpha2.AgentConditionTypeAccepted, Status: status, - LastTransitionTime: metav1.Now(), Reason: reason, Message: message, + ObservedGeneration: agent.Generation, }) deployedCondition := metav1.Condition{ Type: v1alpha2.AgentConditionTypeReady, Status: metav1.ConditionUnknown, - LastTransitionTime: metav1.Now(), + ObservedGeneration: agent.Generation, } // Check if the deployment exists @@ -393,18 +393,18 @@ func (a *kagentReconciler) reconcileRemoteMCPServerStatus( if err != nil { status = metav1.ConditionFalse message = err.Error() - reason = "AgentReconcileFailed" + reason = "ReconcileFailed" reconcileLog.Error(err, "failed to reconcile agent", "tool_server", utils.GetObjectRef(toolServer)) } else { status = metav1.ConditionTrue - reason = "AgentReconciled" + reason = "Reconciled" } conditionChanged := meta.SetStatusCondition(&toolServer.Status.Conditions, metav1.Condition{ Type: v1alpha2.AgentConditionTypeAccepted, Status: status, - LastTransitionTime: metav1.Now(), Reason: reason, Message: message, + ObservedGeneration: toolServer.Generation, }) // only update if the status has changed to prevent looping the reconciler @@ -418,7 +418,7 @@ func (a *kagentReconciler) reconcileRemoteMCPServerStatus( toolServer.Status.DiscoveredTools = discoveredTools if err := a.kube.Status().Update(ctx, toolServer); err != nil { - return fmt.Errorf("failed to update agent status: %v", err) + return fmt.Errorf("failed to update remote mcp server status: %v", err) } return nil diff --git a/go/test/e2e/agents/kebab/README.md b/go/test/e2e/agents/kebab/README.md index 504d9178d..a601f392b 100644 --- a/go/test/e2e/agents/kebab/README.md +++ b/go/test/e2e/agents/kebab/README.md @@ -12,4 +12,18 @@ docker build . --push -t localhost:5001/kebab:latest ```bash kubectl apply -f agent.yaml +``` + +# Run manually + +You can run the agent manually for testing purposes. Make sure you have Python 3.11+ installed. + +```bash +cd go/test/e2e/agents/kebab +docker run --rm \ + -e KAGENT_URL=http://localhost:8083 \ + -e KAGENT_NAME=kebab-agent \ + -e KAGENT_NAMESPACE=kagent \ + --net=host \ + localhost:5001/kebab:latest ``` \ No newline at end of file diff --git a/go/test/e2e/agents/kebab/kebab/agent.py b/go/test/e2e/agents/kebab/kebab/agent.py index c24b150af..560e8148d 100644 --- a/go/test/e2e/agents/kebab/kebab/agent.py +++ b/go/test/e2e/agents/kebab/kebab/agent.py @@ -1,4 +1,5 @@ +import logging import random from google.adk.agents.invocation_context import InvocationContext @@ -7,6 +8,8 @@ from typing import AsyncGenerator, override from google.genai import types +logger = logging.getLogger(__name__) + class KebabAgent(BaseAgent): def __init__(self): @@ -27,8 +30,11 @@ async def _run_async_impl( Yields: Event: the events generated by the agent. """ - - text = "kebab" + session = ctx.session + text = f"kebab for {session.user_id} in session {session.id} " + + logger.info(f"Generating response: {text}") + model_response_event = Event( id=Event.new_id(), invocation_id=ctx.invocation_id, diff --git a/go/test/e2e/invoke_api_test.go b/go/test/e2e/invoke_api_test.go index 6607da18d..83d02e9bc 100644 --- a/go/test/e2e/invoke_api_test.go +++ b/go/test/e2e/invoke_api_test.go @@ -164,6 +164,6 @@ func TestInvokeExternalAgent(t *testing.T) { jsn, err := json.Marshal(taskResult) require.NoError(t, err) // Prime numbers - require.Contains(t, text, "kebab", string(jsn)) + require.Contains(t, text, "kebab for user@example.com", string(jsn)) }) } diff --git a/python/packages/kagent-adk/src/kagent_adk/a2a.py b/python/packages/kagent-adk/src/kagent_adk/a2a.py index 52ef410ef..3f5388a1b 100644 --- a/python/packages/kagent-adk/src/kagent_adk/a2a.py +++ b/python/packages/kagent-adk/src/kagent_adk/a2a.py @@ -27,9 +27,6 @@ from ._task_store import KAgentTaskStore from ._token import KAgentTokenService -# --- Constants --- -USER_ID = "admin@kagent.dev" - # --- Configure Logging --- logger = logging.getLogger(__name__) @@ -40,7 +37,7 @@ def __init__(self, user_id: str): @property def is_authenticated(self) -> bool: - return False + return True @property def user_name(self) -> str: @@ -52,9 +49,8 @@ class KAgentRequestContextBuilder(SimpleRequestContextBuilder): A request context builder that will be used to hack in the user_id for now. """ - def __init__(self, user_id: str, task_store: TaskStore): + def __init__(self, task_store: TaskStore): super().__init__(task_store=task_store) - self.user_id = user_id async def build( self, @@ -64,10 +60,12 @@ async def build( task: Task | None = None, context: ServerCallContext | None = None, ) -> RequestContext: - if not context: - context = ServerCallContext(user=KAgentUser(user_id=self.user_id)) - else: - context.user = KAgentUser(user_id=self.user_id) + if context: + # grab the user id from the header + headers = context.state.get("headers", {}) + user_id = headers.get("x-user-id", None) + if user_id: + context.user = KAgentUser(user_id=user_id) request_context = await super().build(params, task_id, context_id, task, context) return request_context @@ -103,7 +101,7 @@ def __init__( def build(self) -> FastAPI: token_service = KAgentTokenService(self.app_name) - http_client = httpx.AsyncClient( + http_client = httpx.AsyncClient( # TODO: add user and agent headers base_url=kagent_url_override or self.kagent_url, event_hooks=token_service.event_hooks() ) session_service = KAgentSessionService(http_client) @@ -121,7 +119,7 @@ def create_runner() -> Runner: kagent_task_store = KAgentTaskStore(http_client) - request_context_builder = KAgentRequestContextBuilder(user_id=USER_ID, task_store=kagent_task_store) + request_context_builder = KAgentRequestContextBuilder(task_store=kagent_task_store) request_handler = DefaultRequestHandler( agent_executor=agent_executor, task_store=kagent_task_store,