Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions go/internal/a2a/a2a_registrar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package a2a

import (
"context"
"fmt"
"net"
"net/http"
"os"
"reflect"
"time"

"github.com/go-logr/logr"
"github.com/kagent-dev/kagent/go/api/v1alpha2"
agent_translator "github.com/kagent-dev/kagent/go/internal/controller/translator/agent"
authimpl "github.com/kagent-dev/kagent/go/internal/httpserver/auth"
common "github.com/kagent-dev/kagent/go/internal/utils"
"github.com/kagent-dev/kagent/go/pkg/auth"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
crcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
)

type A2ARegistrar struct {
cache crcache.Cache
translator agent_translator.AdkApiTranslator
handlerMux A2AHandlerMux
a2aBaseUrl string
authenticator auth.AuthProvider
a2aBaseOptions []a2aclient.Option
}

var _ manager.Runnable = (*A2ARegistrar)(nil)

func NewA2ARegistrar(
cache crcache.Cache,
translator agent_translator.AdkApiTranslator,
mux A2AHandlerMux,
a2aBaseUrl string,
authenticator auth.AuthProvider,
streamingMaxBuf int,
streamingInitialBuf int,
streamingTimeout time.Duration,
) *A2ARegistrar {
reg := &A2ARegistrar{
cache: cache,
translator: translator,
handlerMux: mux,
a2aBaseUrl: a2aBaseUrl,
authenticator: authenticator,
a2aBaseOptions: []a2aclient.Option{
a2aclient.WithTimeout(streamingTimeout),
a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf),
debugOpt(),
},
}

return reg
}

func (a *A2ARegistrar) NeedLeaderElection() bool {
return false
}

func (a *A2ARegistrar) Start(ctx context.Context) error {
log := ctrllog.FromContext(ctx).WithName("a2a-registrar")

informer, err := a.cache.GetInformer(ctx, &v1alpha2.Agent{})
if err != nil {
return fmt.Errorf("failed to get cache informer: %w", err)
}

if _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason you used the cache directly here instead of creating a Controller like the rest of the k8s watchers? I usually prefer consistency across the various watchers so the codebase is easier to grok, but definitely open to this if there's a good reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I debated this a bit as well. I ended up just an informer implementation for a couple reasons.

  1. Controllers come with a bunch of overhead that I think is unnecessary for this implementation. We don't need the reconciliation semantics that controllers are designed for. The registrar doesn't need to loop and update to conmverge on desired state; it just needs to react to add/update/delete events to maintain an in-memory routing table. Additionally we don't need all the other overhead that comes with a controller including predicates, owning/watching relationships, woirk queues, rate limiting, retries, etc.
  2. I wanted to explicitly break from the existing controllers. This code needs to run on all controller replicas, hence the A2ARegistrar deliberately, and explicilty, returns false from NeedLeaderElection() - rather than making this configurable when using a Controller.

Finally, looking at it another way... if we were in future going to try and extract a kagent-api component, then it would feel very weird to me to be running a "Controller" within that component - but maybe that's just me 😆

Copy link
Contributor Author

@onematchfox onematchfox Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also.... having 2 AgentControllers in the same pod also feels really strange (and wrong).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these are all very good reasons!

AddFunc: func(obj interface{}) {
if agent, ok := obj.(*v1alpha2.Agent); ok {
if err := a.upsertAgentHandler(ctx, agent, log); err != nil {
log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(agent))
}
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldAgent, ok1 := oldObj.(*v1alpha2.Agent)
newAgent, ok2 := newObj.(*v1alpha2.Agent)
if !ok1 || !ok2 {
return
}
if oldAgent.Generation != newAgent.Generation || !reflect.DeepEqual(oldAgent.Spec, newAgent.Spec) {
if err := a.upsertAgentHandler(ctx, newAgent, log); err != nil {
log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(newAgent))
}
}
},
DeleteFunc: func(obj interface{}) {
agent, ok := obj.(*v1alpha2.Agent)
if !ok {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
if a2, ok := tombstone.Obj.(*v1alpha2.Agent); ok {
agent = a2
}
}
}
if agent == nil {
return
}
ref := common.GetObjectRef(agent)
a.handlerMux.RemoveAgentHandler(ref)
log.V(1).Info("removed A2A handler", "agent", ref)
},
}); err != nil {
return fmt.Errorf("failed to add informer event handler: %w", err)
}

if ok := a.cache.WaitForCacheSync(ctx); !ok {
return fmt.Errorf("cache sync failed")
}

<-ctx.Done()
return nil
}

func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent *v1alpha2.Agent, log logr.Logger) error {
agentRef := types.NamespacedName{Namespace: agent.GetNamespace(), Name: agent.GetName()}
card := agent_translator.GetA2AAgentCard(agent)

client, err := a2aclient.NewA2AClient(
card.URL,
append(
a.a2aBaseOptions,
a2aclient.WithHTTPReqHandler(
authimpl.A2ARequestHandler(
a.authenticator,
agentRef,
),
),
)...,
)
if err != nil {
return fmt.Errorf("create A2A client for %s: %w", agentRef, err)
}

cardCopy := *card
cardCopy.URL = fmt.Sprintf("%s/%s/", a.a2aBaseUrl, agentRef)

if err := a.handlerMux.SetAgentHandler(agentRef.String(), client, cardCopy); err != nil {
return fmt.Errorf("set handler for %s: %w", agentRef, err)
}

log.V(1).Info("registered/updated A2A handler", "agent", agentRef)
return nil
}

func debugOpt() a2aclient.Option {
debugAddr := os.Getenv("KAGENT_A2A_DEBUG_ADDR")
if debugAddr != "" {
client := new(http.Client)
client.Transport = &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
var zeroDialer net.Dialer
return zeroDialer.DialContext(ctx, network, debugAddr)
},
}
return a2aclient.WithHTTPClient(client)
} else {
return func(*a2aclient.A2AClient) {}
}
}
109 changes: 0 additions & 109 deletions go/internal/controller/a2a/a2a_reconciler.go

This file was deleted.

20 changes: 0 additions & 20 deletions go/internal/controller/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/util/retry"
"trpc.group/trpc-go/trpc-a2a-go/server"

"github.com/kagent-dev/kagent/go/api/v1alpha2"
"github.com/kagent-dev/kagent/go/internal/controller/a2a"
"github.com/kagent-dev/kagent/go/internal/controller/translator"
agent_translator "github.com/kagent-dev/kagent/go/internal/controller/translator/agent"
"github.com/kagent-dev/kagent/go/internal/database"
Expand Down Expand Up @@ -53,7 +51,6 @@ type KagentReconciler interface {

type kagentReconciler struct {
adkTranslator agent_translator.AdkApiTranslator
a2aReconciler a2a.A2AReconciler

kube client.Client
dbClient database.Client
Expand All @@ -69,14 +66,12 @@ func NewKagentReconciler(
kube client.Client,
dbClient database.Client,
defaultModelConfig types.NamespacedName,
a2aReconciler a2a.A2AReconciler,
) KagentReconciler {
return &kagentReconciler{
adkTranslator: translator,
kube: kube,
dbClient: dbClient,
defaultModelConfig: defaultModelConfig,
a2aReconciler: a2aReconciler,
}
}

Expand All @@ -100,9 +95,6 @@ func (a *kagentReconciler) ReconcileKagentAgent(ctx context.Context, req ctrl.Re
}

func (a *kagentReconciler) handleAgentDeletion(req ctrl.Request) error {
// remove a2a handler if it exists
a.a2aReconciler.ReconcileAgentDeletion(req.String())

if err := a.dbClient.DeleteAgent(req.String()); err != nil {
return fmt.Errorf("failed to delete agent %s: %w",
req.String(), err)
Expand Down Expand Up @@ -499,10 +491,6 @@ func (a *kagentReconciler) reconcileAgent(ctx context.Context, agent *v1alpha2.A
return fmt.Errorf("failed to reconcile owned objects: %v", err)
}

if err := a.reconcileA2A(ctx, agent, agentOutputs.AgentCard); err != nil {
return fmt.Errorf("failed to reconcile A2A for agent %s/%s: %v", agent.Namespace, agent.Name, err)
}

if err := a.upsertAgent(ctx, agent, agentOutputs); err != nil {
return fmt.Errorf("failed to upsert agent %s/%s: %v", agent.Namespace, agent.Name, err)
}
Expand Down Expand Up @@ -744,14 +732,6 @@ func (a *kagentReconciler) getDiscoveredMCPTools(ctx context.Context, serverRef
return discoveredTools, nil
}

func (a *kagentReconciler) reconcileA2A(
ctx context.Context,
agent *v1alpha2.Agent,
card server.AgentCard,
) error {
return a.a2aReconciler.ReconcileAgent(ctx, agent, card)
}

func convertTool(tool *database.Tool) (*v1alpha2.MCPTool, error) {
return &v1alpha2.MCPTool{
Name: tool.ID,
Expand Down
Loading
Loading