Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions go/internal/a2a/a2a_registrar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package a2a

import (
"context"
"fmt"
"net"
"net/http"
"os"
"reflect"
"time"

"github.com/go-logr/logr"
"github.com/kagent-dev/kagent/go/api/v1alpha2"
agent_translator "github.com/kagent-dev/kagent/go/internal/controller/translator/agent"
authimpl "github.com/kagent-dev/kagent/go/internal/httpserver/auth"
common "github.com/kagent-dev/kagent/go/internal/utils"
"github.com/kagent-dev/kagent/go/pkg/auth"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
crcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
)

type A2ARegistrar struct {
cache crcache.Cache
translator agent_translator.AdkApiTranslator
handlerMux A2AHandlerMux
a2aBaseUrl string
authenticator auth.AuthProvider
a2aBaseOptions []a2aclient.Option
}

var _ manager.Runnable = (*A2ARegistrar)(nil)

func NewA2ARegistrar(
cache crcache.Cache,
translator agent_translator.AdkApiTranslator,
mux A2AHandlerMux,
a2aBaseUrl string,
authenticator auth.AuthProvider,
streamingMaxBuf int,
streamingInitialBuf int,
streamingTimeout time.Duration,
) *A2ARegistrar {
reg := &A2ARegistrar{
cache: cache,
translator: translator,
handlerMux: mux,
a2aBaseUrl: a2aBaseUrl,
authenticator: authenticator,
a2aBaseOptions: []a2aclient.Option{
a2aclient.WithTimeout(streamingTimeout),
a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf),
debugOpt(),
},
}

return reg
}

func (a *A2ARegistrar) NeedLeaderElection() bool {
return false
}

func (a *A2ARegistrar) Start(ctx context.Context) error {
log := ctrllog.FromContext(ctx).WithName("a2a-registrar")

informer, err := a.cache.GetInformer(ctx, &v1alpha2.Agent{})
if err != nil {
return fmt.Errorf("failed to get cache informer: %w", err)
}

if _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if agent, ok := obj.(*v1alpha2.Agent); ok {
if err := a.upsertAgentHandler(ctx, agent, log); err != nil {
log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(agent))
}
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldAgent, ok1 := oldObj.(*v1alpha2.Agent)
newAgent, ok2 := newObj.(*v1alpha2.Agent)
if !ok1 || !ok2 {
return
}
if oldAgent.Generation != newAgent.Generation || !reflect.DeepEqual(oldAgent.Spec, newAgent.Spec) {
if err := a.upsertAgentHandler(ctx, newAgent, log); err != nil {
log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(newAgent))
}
}
},
DeleteFunc: func(obj interface{}) {
agent, ok := obj.(*v1alpha2.Agent)
if !ok {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
if a2, ok := tombstone.Obj.(*v1alpha2.Agent); ok {
agent = a2
}
}
}
if agent == nil {
return
}
ref := common.GetObjectRef(agent)
a.handlerMux.RemoveAgentHandler(ref)
log.V(1).Info("removed A2A handler", "agent", ref)
},
}); err != nil {
return fmt.Errorf("failed to add informer event handler: %w", err)
}

if ok := a.cache.WaitForCacheSync(ctx); !ok {
return fmt.Errorf("cache sync failed")
}

<-ctx.Done()
return nil
}

func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent *v1alpha2.Agent, log logr.Logger) error {
agentRef := types.NamespacedName{Namespace: agent.GetNamespace(), Name: agent.GetName()}
card, err := a.translator.TranslateAgentCard(ctx, agent)
if err != nil {
return fmt.Errorf("translate agent %s: %w", agentRef, err)
}

client, err := a2aclient.NewA2AClient(
card.URL,
append(
a.a2aBaseOptions,
a2aclient.WithHTTPReqHandler(
authimpl.A2ARequestHandler(
a.authenticator,
agentRef,
),
),
)...,
)
if err != nil {
return fmt.Errorf("create A2A client for %s: %w", agentRef, err)
}

cardCopy := *card
cardCopy.URL = fmt.Sprintf("%s/%s/", a.a2aBaseUrl, agentRef)

if err := a.handlerMux.SetAgentHandler(agentRef.String(), client, cardCopy); err != nil {
return fmt.Errorf("set handler for %s: %w", agentRef, err)
}

log.V(1).Info("registered/updated A2A handler", "agent", agentRef)
return nil
}

func debugOpt() a2aclient.Option {
debugAddr := os.Getenv("KAGENT_A2A_DEBUG_ADDR")
if debugAddr != "" {
client := new(http.Client)
client.Transport = &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
var zeroDialer net.Dialer
return zeroDialer.DialContext(ctx, network, debugAddr)
},
}
return a2aclient.WithHTTPClient(client)
} else {
return func(*a2aclient.A2AClient) {}
}
}
109 changes: 0 additions & 109 deletions go/internal/controller/a2a/a2a_reconciler.go

This file was deleted.

20 changes: 0 additions & 20 deletions go/internal/controller/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/util/retry"
"trpc.group/trpc-go/trpc-a2a-go/server"

"github.com/kagent-dev/kagent/go/api/v1alpha2"
"github.com/kagent-dev/kagent/go/internal/controller/a2a"
"github.com/kagent-dev/kagent/go/internal/controller/translator"
agent_translator "github.com/kagent-dev/kagent/go/internal/controller/translator/agent"
"github.com/kagent-dev/kagent/go/internal/database"
Expand Down Expand Up @@ -53,7 +51,6 @@ type KagentReconciler interface {

type kagentReconciler struct {
adkTranslator agent_translator.AdkApiTranslator
a2aReconciler a2a.A2AReconciler

kube client.Client
dbClient database.Client
Expand All @@ -69,14 +66,12 @@ func NewKagentReconciler(
kube client.Client,
dbClient database.Client,
defaultModelConfig types.NamespacedName,
a2aReconciler a2a.A2AReconciler,
) KagentReconciler {
return &kagentReconciler{
adkTranslator: translator,
kube: kube,
dbClient: dbClient,
defaultModelConfig: defaultModelConfig,
a2aReconciler: a2aReconciler,
}
}

Expand All @@ -100,9 +95,6 @@ func (a *kagentReconciler) ReconcileKagentAgent(ctx context.Context, req ctrl.Re
}

func (a *kagentReconciler) handleAgentDeletion(req ctrl.Request) error {
// remove a2a handler if it exists
a.a2aReconciler.ReconcileAgentDeletion(req.String())

if err := a.dbClient.DeleteAgent(req.String()); err != nil {
return fmt.Errorf("failed to delete agent %s: %w",
req.String(), err)
Expand Down Expand Up @@ -499,10 +491,6 @@ func (a *kagentReconciler) reconcileAgent(ctx context.Context, agent *v1alpha2.A
return fmt.Errorf("failed to reconcile owned objects: %v", err)
}

if err := a.reconcileA2A(ctx, agent, agentOutputs.AgentCard); err != nil {
return fmt.Errorf("failed to reconcile A2A for agent %s/%s: %v", agent.Namespace, agent.Name, err)
}

if err := a.upsertAgent(ctx, agent, agentOutputs); err != nil {
return fmt.Errorf("failed to upsert agent %s/%s: %v", agent.Namespace, agent.Name, err)
}
Expand Down Expand Up @@ -744,14 +732,6 @@ func (a *kagentReconciler) getDiscoveredMCPTools(ctx context.Context, serverRef
return discoveredTools, nil
}

func (a *kagentReconciler) reconcileA2A(
ctx context.Context,
agent *v1alpha2.Agent,
card server.AgentCard,
) error {
return a.a2aReconciler.ReconcileAgent(ctx, agent, card)
}

func convertTool(tool *database.Tool) (*v1alpha2.MCPTool, error) {
return &v1alpha2.MCPTool{
Name: tool.ID,
Expand Down
Loading
Loading