From 28bbe3f9a5e089d4453db95298911550a598f200 Mon Sep 17 00:00:00 2001 From: Brian Fox <878612+onematchfox@users.noreply.github.com> Date: Tue, 25 Nov 2025 10:19:35 +0100 Subject: [PATCH 1/3] feat(controller): decouple A2A API from controller reconciliation Removed A2A "reconciler" and replaced with a registrar that is responsible for registering/deregistering mux handlers. Main controller still manages cluster resources and DB/status. Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> --- go/internal/a2a/a2a_registrar.go | 171 ++++++++++++++++++ go/internal/controller/a2a/a2a_reconciler.go | 109 ----------- .../controller/reconciler/reconciler.go | 20 -- .../translator/agent/adk_api_translator.go | 107 +++++------ go/pkg/app/app.go | 32 ++-- 5 files changed, 244 insertions(+), 195 deletions(-) create mode 100644 go/internal/a2a/a2a_registrar.go delete mode 100644 go/internal/controller/a2a/a2a_reconciler.go diff --git a/go/internal/a2a/a2a_registrar.go b/go/internal/a2a/a2a_registrar.go new file mode 100644 index 000000000..91c6809c1 --- /dev/null +++ b/go/internal/a2a/a2a_registrar.go @@ -0,0 +1,171 @@ +package a2a + +import ( + "context" + "fmt" + "net" + "net/http" + "os" + "reflect" + "time" + + "github.com/go-logr/logr" + "github.com/kagent-dev/kagent/go/api/v1alpha2" + agent_translator "github.com/kagent-dev/kagent/go/internal/controller/translator/agent" + authimpl "github.com/kagent-dev/kagent/go/internal/httpserver/auth" + common "github.com/kagent-dev/kagent/go/internal/utils" + "github.com/kagent-dev/kagent/go/pkg/auth" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + crcache "sigs.k8s.io/controller-runtime/pkg/cache" + ctrllog "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" +) + +type A2ARegistrar struct { + cache crcache.Cache + translator agent_translator.AdkApiTranslator + handlerMux A2AHandlerMux + a2aBaseUrl string +
authenticator auth.AuthProvider + a2aBaseOptions []a2aclient.Option +} + +var _ manager.Runnable = (*A2ARegistrar)(nil) + +func NewA2ARegistrar( + cache crcache.Cache, + translator agent_translator.AdkApiTranslator, + mux A2AHandlerMux, + a2aBaseUrl string, + authenticator auth.AuthProvider, + streamingMaxBuf int, + streamingInitialBuf int, + streamingTimeout time.Duration, +) *A2ARegistrar { + reg := &A2ARegistrar{ + cache: cache, + translator: translator, + handlerMux: mux, + a2aBaseUrl: a2aBaseUrl, + authenticator: authenticator, + a2aBaseOptions: []a2aclient.Option{ + a2aclient.WithTimeout(streamingTimeout), + a2aclient.WithBuffer(streamingInitialBuf, streamingMaxBuf), + debugOpt(), + }, + } + + return reg +} + +func (a *A2ARegistrar) NeedLeaderElection() bool { + return false +} + +func (a *A2ARegistrar) Start(ctx context.Context) error { + log := ctrllog.FromContext(ctx).WithName("a2a-registrar") + + informer, err := a.cache.GetInformer(ctx, &v1alpha2.Agent{}) + if err != nil { + return fmt.Errorf("failed to get cache informer: %w", err) + } + + if _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if agent, ok := obj.(*v1alpha2.Agent); ok { + if err := a.upsertAgentHandler(ctx, agent, log); err != nil { + log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(agent)) + } + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldAgent, ok1 := oldObj.(*v1alpha2.Agent) + newAgent, ok2 := newObj.(*v1alpha2.Agent) + if !ok1 || !ok2 { + return + } + if oldAgent.Generation != newAgent.Generation || !reflect.DeepEqual(oldAgent.Spec, newAgent.Spec) { + if err := a.upsertAgentHandler(ctx, newAgent, log); err != nil { + log.Error(err, "failed to upsert A2A handler", "agent", common.GetObjectRef(newAgent)) + } + } + }, + DeleteFunc: func(obj interface{}) { + agent, ok := obj.(*v1alpha2.Agent) + if !ok { + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + if a2, ok := 
tombstone.Obj.(*v1alpha2.Agent); ok { + agent = a2 + } + } + } + if agent == nil { + return + } + ref := common.GetObjectRef(agent) + a.handlerMux.RemoveAgentHandler(ref) + log.V(1).Info("removed A2A handler", "agent", ref) + }, + }); err != nil { + return fmt.Errorf("failed to add informer event handler: %w", err) + } + + if ok := a.cache.WaitForCacheSync(ctx); !ok { + return fmt.Errorf("cache sync failed") + } + + <-ctx.Done() + return nil +} + +func (a *A2ARegistrar) upsertAgentHandler(ctx context.Context, agent *v1alpha2.Agent, log logr.Logger) error { + agentRef := types.NamespacedName{Namespace: agent.GetNamespace(), Name: agent.GetName()} + card, err := a.translator.TranslateAgentCard(ctx, agent) + if err != nil { + return fmt.Errorf("translate agent %s: %w", agentRef, err) + } + + client, err := a2aclient.NewA2AClient( + card.URL, + append( + a.a2aBaseOptions, + a2aclient.WithHTTPReqHandler( + authimpl.A2ARequestHandler( + a.authenticator, + agentRef, + ), + ), + )..., + ) + if err != nil { + return fmt.Errorf("create A2A client for %s: %w", agentRef, err) + } + + cardCopy := *card + cardCopy.URL = fmt.Sprintf("%s/%s/", a.a2aBaseUrl, agentRef) + + if err := a.handlerMux.SetAgentHandler(agentRef.String(), client, cardCopy); err != nil { + return fmt.Errorf("set handler for %s: %w", agentRef, err) + } + + log.V(1).Info("registered/updated A2A handler", "agent", agentRef) + return nil +} + +func debugOpt() a2aclient.Option { + debugAddr := os.Getenv("KAGENT_A2A_DEBUG_ADDR") + if debugAddr != "" { + client := new(http.Client) + client.Transport = &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + var zeroDialer net.Dialer + return zeroDialer.DialContext(ctx, network, debugAddr) + }, + } + return a2aclient.WithHTTPClient(client) + } else { + return func(*a2aclient.A2AClient) {} + } +} diff --git a/go/internal/controller/a2a/a2a_reconciler.go b/go/internal/controller/a2a/a2a_reconciler.go deleted file mode 100644 
index 8d71f94f5..000000000 --- a/go/internal/controller/a2a/a2a_reconciler.go +++ /dev/null @@ -1,109 +0,0 @@ -package a2a - -import ( - "context" - "fmt" - "net" - "net/http" - "os" - "time" - - "github.com/kagent-dev/kagent/go/api/v1alpha2" - "github.com/kagent-dev/kagent/go/internal/a2a" - authimpl "github.com/kagent-dev/kagent/go/internal/httpserver/auth" - common "github.com/kagent-dev/kagent/go/internal/utils" - "github.com/kagent-dev/kagent/go/pkg/auth" - "k8s.io/apimachinery/pkg/types" - a2aclient "trpc.group/trpc-go/trpc-a2a-go/client" - "trpc.group/trpc-go/trpc-a2a-go/server" -) - -type A2AReconciler interface { - ReconcileAgent( - ctx context.Context, - agent *v1alpha2.Agent, - card server.AgentCard, - ) error - - ReconcileAgentDeletion( - agentRef string, - ) -} - -type ClientOptions struct { - StreamingMaxBufSize int - StreamingInitialBufSize int - Timeout time.Duration -} - -type a2aReconciler struct { - a2aHandler a2a.A2AHandlerMux - a2aBaseUrl string - authenticator auth.AuthProvider - clientOptions ClientOptions -} - -func NewReconciler( - a2aHandler a2a.A2AHandlerMux, - a2aBaseUrl string, - clientOptions ClientOptions, - authenticator auth.AuthProvider, -) A2AReconciler { - return &a2aReconciler{ - a2aHandler: a2aHandler, - a2aBaseUrl: a2aBaseUrl, - clientOptions: clientOptions, - authenticator: authenticator, - } -} - -func (a *a2aReconciler) ReconcileAgent( - ctx context.Context, - agent *v1alpha2.Agent, - card server.AgentCard, -) error { - agentRef := common.GetObjectRef(agent) - agentNns := types.NamespacedName{Namespace: agent.GetNamespace(), Name: agent.GetName()} - - client, err := a2aclient.NewA2AClient(card.URL, - a2aclient.WithTimeout(a.clientOptions.Timeout), - a2aclient.WithBuffer(a.clientOptions.StreamingInitialBufSize, a.clientOptions.StreamingMaxBufSize), - debugOpt(), - a2aclient.WithHTTPReqHandler(authimpl.A2ARequestHandler(a.authenticator, agentNns)), - ) - if err != nil { - return err - } - - // Modify card for kagent proxy - 
cardCopy := card - cardCopy.URL = fmt.Sprintf("%s/%s/", a.a2aBaseUrl, agentRef) - - return a.a2aHandler.SetAgentHandler( - agentRef, - client, - cardCopy, - ) -} - -func (a *a2aReconciler) ReconcileAgentDeletion( - agentRef string, -) { - a.a2aHandler.RemoveAgentHandler(agentRef) -} - -func debugOpt() a2aclient.Option { - debugAddr := os.Getenv("KAGENT_A2A_DEBUG_ADDR") - if debugAddr != "" { - client := new(http.Client) - client.Transport = &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - var zeroDialer net.Dialer - return zeroDialer.DialContext(ctx, network, debugAddr) - }, - } - return a2aclient.WithHTTPClient(client) - } else { - return func(*a2aclient.A2AClient) {} - } -} diff --git a/go/internal/controller/reconciler/reconciler.go b/go/internal/controller/reconciler/reconciler.go index d7fdb1bbd..b7443c777 100644 --- a/go/internal/controller/reconciler/reconciler.go +++ b/go/internal/controller/reconciler/reconciler.go @@ -19,10 +19,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/util/retry" - "trpc.group/trpc-go/trpc-a2a-go/server" "github.com/kagent-dev/kagent/go/api/v1alpha2" - "github.com/kagent-dev/kagent/go/internal/controller/a2a" "github.com/kagent-dev/kagent/go/internal/controller/translator" agent_translator "github.com/kagent-dev/kagent/go/internal/controller/translator/agent" "github.com/kagent-dev/kagent/go/internal/database" @@ -53,7 +51,6 @@ type KagentReconciler interface { type kagentReconciler struct { adkTranslator agent_translator.AdkApiTranslator - a2aReconciler a2a.A2AReconciler kube client.Client dbClient database.Client @@ -69,14 +66,12 @@ func NewKagentReconciler( kube client.Client, dbClient database.Client, defaultModelConfig types.NamespacedName, - a2aReconciler a2a.A2AReconciler, ) KagentReconciler { return &kagentReconciler{ adkTranslator: translator, kube: kube, dbClient: dbClient, defaultModelConfig: 
defaultModelConfig, - a2aReconciler: a2aReconciler, } } @@ -100,9 +95,6 @@ func (a *kagentReconciler) ReconcileKagentAgent(ctx context.Context, req ctrl.Re } func (a *kagentReconciler) handleAgentDeletion(req ctrl.Request) error { - // remove a2a handler if it exists - a.a2aReconciler.ReconcileAgentDeletion(req.String()) - if err := a.dbClient.DeleteAgent(req.String()); err != nil { return fmt.Errorf("failed to delete agent %s: %w", req.String(), err) @@ -499,10 +491,6 @@ func (a *kagentReconciler) reconcileAgent(ctx context.Context, agent *v1alpha2.A return fmt.Errorf("failed to reconcile owned objects: %v", err) } - if err := a.reconcileA2A(ctx, agent, agentOutputs.AgentCard); err != nil { - return fmt.Errorf("failed to reconcile A2A for agent %s/%s: %v", agent.Namespace, agent.Name, err) - } - if err := a.upsertAgent(ctx, agent, agentOutputs); err != nil { return fmt.Errorf("failed to upsert agent %s/%s: %v", agent.Namespace, agent.Name, err) } @@ -744,14 +732,6 @@ func (a *kagentReconciler) getDiscoveredMCPTools(ctx context.Context, serverRef return discoveredTools, nil } -func (a *kagentReconciler) reconcileA2A( - ctx context.Context, - agent *v1alpha2.Agent, - card server.AgentCard, -) error { - return a.a2aReconciler.ReconcileAgent(ctx, agent, card) -} - func convertTool(tool *database.Tool) (*v1alpha2.MCPTool, error) { return &v1alpha2.MCPTool{ Name: tool.ID, diff --git a/go/internal/controller/translator/agent/adk_api_translator.go b/go/internal/controller/translator/agent/adk_api_translator.go index 65cb1701b..571b4688d 100644 --- a/go/internal/controller/translator/agent/adk_api_translator.go +++ b/go/internal/controller/translator/agent/adk_api_translator.go @@ -68,6 +68,10 @@ type AdkApiTranslator interface { ctx context.Context, agent *v1alpha2.Agent, ) (*AgentOutputs, error) + TranslateAgentCard( + ctx context.Context, + agent *v1alpha2.Agent, + ) (*server.AgentCard, error) GetOwnedResourceTypes() []client.Object } @@ -118,45 +122,43 @@ func (a 
*adkApiTranslator) TranslateAgent( return nil, err } + var cfg *adk.AgentConfig + var dep *resolvedDeployment + var secretHashBytes []byte + switch agent.Spec.Type { case v1alpha2.AgentType_Declarative: - - cfg, card, mdd, secretHashBytes, err := a.translateInlineAgent(ctx, agent) + var mdd *modelDeploymentData + cfg, mdd, secretHashBytes, err = a.translateInlineAgent(ctx, agent) if err != nil { return nil, err } - dep, err := a.resolveInlineDeployment(agent, mdd) + dep, err = a.resolveInlineDeployment(agent, mdd) if err != nil { return nil, err } - return a.buildManifest(ctx, agent, dep, cfg, card, secretHashBytes) case v1alpha2.AgentType_BYO: - dep, err := a.resolveByoDeployment(agent) + dep, err = a.resolveByoDeployment(agent) if err != nil { return nil, err } - // TODO: Resolve this from the actual pod - agentCard := &server.AgentCard{ - Name: strings.ReplaceAll(agent.Name, "-", "_"), - Description: agent.Spec.Description, - URL: fmt.Sprintf("http://%s.%s:8080", agent.Name, agent.Namespace), - Capabilities: server.AgentCapabilities{ - Streaming: ptr.To(true), - PushNotifications: ptr.To(false), - StateTransitionHistory: ptr.To(true), - }, - // Can't be null for Python, so set to empty list - Skills: []server.AgentSkill{}, - DefaultInputModes: []string{"text"}, - DefaultOutputModes: []string{"text"}, - } - return a.buildManifest(ctx, agent, dep, nil, agentCard, nil) default: return nil, fmt.Errorf("unknown agent type: %s", agent.Spec.Type) } + + card := a.buildAgentCard(agent) + + return a.buildManifest(ctx, agent, dep, cfg, card, secretHashBytes) +} + +func (a *adkApiTranslator) TranslateAgentCard( + ctx context.Context, + agent *v1alpha2.Agent, +) (*server.AgentCard, error) { + return a.buildAgentCard(agent), nil } // GetOwnedResourceTypes returns all the resource types that may be created for an agent. 
@@ -178,6 +180,29 @@ func (r *adkApiTranslator) GetOwnedResourceTypes() []client.Object { return ownedResources } +func (a *adkApiTranslator) buildAgentCard(agent *v1alpha2.Agent) *server.AgentCard { + card := server.AgentCard{ + Name: strings.ReplaceAll(agent.Name, "-", "_"), + Description: agent.Spec.Description, + URL: fmt.Sprintf("http://%s.%s:8080", agent.Name, agent.Namespace), + Capabilities: server.AgentCapabilities{ + Streaming: ptr.To(true), + PushNotifications: ptr.To(false), + StateTransitionHistory: ptr.To(true), + }, + // Can't be null for Python, so set to empty list + Skills: []server.AgentSkill{}, + DefaultInputModes: []string{"text"}, + DefaultOutputModes: []string{"text"}, + } + if agent.Spec.Type == v1alpha2.AgentType_Declarative && agent.Spec.Declarative.A2AConfig != nil { + card.Skills = slices.Collect(utils.Map(slices.Values(agent.Spec.Declarative.A2AConfig.Skills), func(skill v1alpha2.AgentSkill) server.AgentSkill { + return server.AgentSkill(skill) + })) + } + return &card +} + func (a *adkApiTranslator) validateAgent(ctx context.Context, agent *v1alpha2.Agent, state *tState) error { agentRef := utils.GetObjectRef(agent) @@ -508,16 +533,16 @@ func (a *adkApiTranslator) buildManifest( return outputs, a.runPlugins(ctx, agent, outputs) } -func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1alpha2.Agent) (*adk.AgentConfig, *server.AgentCard, *modelDeploymentData, []byte, error) { +func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1alpha2.Agent) (*adk.AgentConfig, *modelDeploymentData, []byte, error) { model, mdd, secretHashBytes, err := a.translateModel(ctx, agent.Namespace, agent.Spec.Declarative.ModelConfig) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, err } systemMessage, err := a.resolveSystemMessage(ctx, agent) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, err } cfg := &adk.AgentConfig{ @@ -526,26 +551,6 @@ func (a *adkApiTranslator) 
translateInlineAgent(ctx context.Context, agent *v1al Model: model, ExecuteCode: ptr.Deref(agent.Spec.Declarative.ExecuteCodeBlocks, false), } - agentCard := &server.AgentCard{ - Name: strings.ReplaceAll(agent.Name, "-", "_"), - Description: agent.Spec.Description, - URL: fmt.Sprintf("http://%s.%s:8080", agent.Name, agent.Namespace), - Capabilities: server.AgentCapabilities{ - Streaming: ptr.To(true), - PushNotifications: ptr.To(false), - StateTransitionHistory: ptr.To(true), - }, - // Can't be null for Python, so set to empty list - Skills: []server.AgentSkill{}, - DefaultInputModes: []string{"text"}, - DefaultOutputModes: []string{"text"}, - } - - if agent.Spec.Declarative.A2AConfig != nil { - agentCard.Skills = slices.Collect(utils.Map(slices.Values(agent.Spec.Declarative.A2AConfig.Skills), func(skill v1alpha2.AgentSkill) server.AgentSkill { - return server.AgentSkill(skill) - })) - } for _, tool := range agent.Spec.Declarative.Tools { // Skip tools that are not applicable to the model provider @@ -553,7 +558,7 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al case tool.McpServer != nil: err := a.translateMCPServerTarget(ctx, cfg, agent.Namespace, tool.McpServer, tool.HeadersFrom) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, err } case tool.Agent != nil: agentRef := types.NamespacedName{ @@ -562,14 +567,14 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al } if agentRef.Namespace == agent.Namespace && agentRef.Name == agent.Name { - return nil, nil, nil, nil, fmt.Errorf("agent tool cannot be used to reference itself, %s", agentRef) + return nil, nil, nil, fmt.Errorf("agent tool cannot be used to reference itself, %s", agentRef) } // Translate a nested tool toolAgent := &v1alpha2.Agent{} err := a.kube.Get(ctx, agentRef, toolAgent) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, err } switch toolAgent.Spec.Type { @@ -577,7 +582,7 @@ func (a 
*adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al url := fmt.Sprintf("http://%s.%s:8080", toolAgent.Name, toolAgent.Namespace) headers, err := tool.ResolveHeaders(ctx, a.kube, agent.Namespace) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, err } cfg.RemoteAgents = append(cfg.RemoteAgents, adk.RemoteAgentConfig{ @@ -587,15 +592,15 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al Description: toolAgent.Spec.Description, }) default: - return nil, nil, nil, nil, fmt.Errorf("unknown agent type: %s", toolAgent.Spec.Type) + return nil, nil, nil, fmt.Errorf("unknown agent type: %s", toolAgent.Spec.Type) } default: - return nil, nil, nil, nil, fmt.Errorf("tool must have a provider or tool server") + return nil, nil, nil, fmt.Errorf("tool must have a provider or tool server") } } - return cfg, agentCard, mdd, secretHashBytes, nil + return cfg, mdd, secretHashBytes, nil } func (a *adkApiTranslator) resolveSystemMessage(ctx context.Context, agent *v1alpha2.Agent) (string, error) { diff --git a/go/pkg/app/app.go b/go/pkg/app/app.go index 1ee1e65e1..de61918f9 100644 --- a/go/pkg/app/app.go +++ b/go/pkg/app/app.go @@ -40,7 +40,6 @@ import ( "github.com/kagent-dev/kagent/go/internal/database" versionmetrics "github.com/kagent-dev/kagent/go/internal/metrics" - a2a_reconciler "github.com/kagent-dev/kagent/go/internal/controller/a2a" "github.com/kagent-dev/kagent/go/internal/controller/reconciler" reconcilerutils "github.com/kagent-dev/kagent/go/internal/controller/reconciler/utils" agent_translator "github.com/kagent-dev/kagent/go/internal/controller/translator/agent" @@ -351,25 +350,11 @@ func Start(getExtensionConfig GetExtensionConfig) { extensionCfg.AgentPlugins, ) - a2aHandler := a2a.NewA2AHttpMux(httpserver.APIPathA2A, extensionCfg.Authenticator) - - a2aReconciler := a2a_reconciler.NewReconciler( - a2aHandler, - cfg.A2ABaseUrl+httpserver.APIPathA2A, - a2a_reconciler.ClientOptions{ - 
StreamingMaxBufSize: int(cfg.Streaming.MaxBufSize.Value()), - StreamingInitialBufSize: int(cfg.Streaming.InitialBufSize.Value()), - Timeout: cfg.Streaming.Timeout, - }, - extensionCfg.Authenticator, - ) - rcnclr := reconciler.NewKagentReconciler( apiTranslator, mgr.GetClient(), dbClient, cfg.DefaultModelConfig, - a2aReconciler, ) if err := (&controller.ServiceController{ @@ -418,6 +403,23 @@ func Start(getExtensionConfig GetExtensionConfig) { os.Exit(1) } + // Register A2A handlers on all replicas + a2aHandler := a2a.NewA2AHttpMux(httpserver.APIPathA2A, extensionCfg.Authenticator) + + if err := mgr.Add(a2a.NewA2ARegistrar( + mgr.GetCache(), + apiTranslator, + a2aHandler, + cfg.A2ABaseUrl+httpserver.APIPathA2A, + extensionCfg.Authenticator, + int(cfg.Streaming.MaxBufSize.Value()), + int(cfg.Streaming.InitialBufSize.Value()), + cfg.Streaming.Timeout, + )); err != nil { + setupLog.Error(err, "unable to set up a2a registrar") + os.Exit(1) + } + // +kubebuilder:scaffold:builder if metricsCertWatcher != nil { setupLog.Info("Adding metrics certificate watcher to manager") From daabc363c5e0b32b72a602fb1960fbcf6c9ab210 Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Mon, 24 Nov 2025 13:20:20 +0100 Subject: [PATCH 2/3] feat(helm): set leader elect flag using configured replicas for controller Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> --- helm/kagent/templates/_helpers.tpl | 7 +++++++ helm/kagent/templates/controller-configmap.yaml | 1 + 2 files changed, 8 insertions(+) diff --git a/helm/kagent/templates/_helpers.tpl b/helm/kagent/templates/_helpers.tpl index 73a933553..059b93e73 100644 --- a/helm/kagent/templates/_helpers.tpl +++ b/helm/kagent/templates/_helpers.tpl @@ -107,6 +107,13 @@ Engine labels app.kubernetes.io/component: engine {{- end }} +{{/* +Check if leader election should be enabled (more than 1 replica) +*/}} +{{- define "kagent.leaderElectionEnabled" -}} +{{- gt (.Values.controller.replicas | int) 1 -}} +{{- end -}} + {{/* Validate 
controller configuration */}} diff --git a/helm/kagent/templates/controller-configmap.yaml b/helm/kagent/templates/controller-configmap.yaml index 792b58c01..dcf8b34a3 100644 --- a/helm/kagent/templates/controller-configmap.yaml +++ b/helm/kagent/templates/controller-configmap.yaml @@ -15,6 +15,7 @@ data: IMAGE_REGISTRY: {{ .Values.controller.agentImage.registry | default .Values.registry | quote }} IMAGE_REPOSITORY: {{ .Values.controller.agentImage.repository | quote }} IMAGE_TAG: {{ coalesce .Values.controller.agentImage.tag .Values.tag .Chart.Version | quote }} + LEADER_ELECT: {{ include "kagent.leaderElectionEnabled" . | quote }} OTEL_EXPORTER_OTLP_ENDPOINT: {{ .Values.otel.tracing.exporter.otlp.endpoint | quote }} OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: {{ .Values.otel.logging.exporter.otlp.endpoint | quote }} OTEL_EXPORTER_OTLP_LOGS_INSECURE: {{ .Values.otel.logging.exporter.otlp.insecure | quote }} From 495e9158f952e42e264eff67f32d74d171166b17 Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Mon, 24 Nov 2025 15:23:18 +0100 Subject: [PATCH 3/3] feat(helm): add RBAC resources for leader election Signed-off-by: Brian Fox <878612+onematchfox@users.noreply.github.com> --- .../templates/rbac/leader-election-role.yaml | 28 +++++++++++++++++++ .../rbac/leader-election-rolebinding.yaml | 17 +++++++++++ 2 files changed, 45 insertions(+) create mode 100644 helm/kagent/templates/rbac/leader-election-role.yaml create mode 100644 helm/kagent/templates/rbac/leader-election-rolebinding.yaml diff --git a/helm/kagent/templates/rbac/leader-election-role.yaml b/helm/kagent/templates/rbac/leader-election-role.yaml new file mode 100644 index 000000000..82df4b781 --- /dev/null +++ b/helm/kagent/templates/rbac/leader-election-role.yaml @@ -0,0 +1,28 @@ +{{- if eq (include "kagent.leaderElectionEnabled" .) "true" }} +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: {{ include "kagent.fullname" . }}-leader-election-role + namespace: {{ include "kagent.namespace" . 
}} + labels: + {{- include "kagent.controller.labels" . | nindent 4 }} +rules: +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - get + - list + - watch + - create + - update + - patch +- apiGroups: + - "" + resources: + - events + verbs: + - create + - patch +{{- end }} \ No newline at end of file diff --git a/helm/kagent/templates/rbac/leader-election-rolebinding.yaml b/helm/kagent/templates/rbac/leader-election-rolebinding.yaml new file mode 100644 index 000000000..4a7a86949 --- /dev/null +++ b/helm/kagent/templates/rbac/leader-election-rolebinding.yaml @@ -0,0 +1,17 @@ +{{- if eq (include "kagent.leaderElectionEnabled" .) "true" }} +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: {{ include "kagent.fullname" . }}-leader-election-rolebinding + namespace: {{ include "kagent.namespace" . }} + labels: + {{- include "kagent.controller.labels" . | nindent 4 }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: {{ include "kagent.fullname" . }}-leader-election-role +subjects: +- kind: ServiceAccount + name: {{ include "kagent.fullname" . }}-controller + namespace: {{ include "kagent.namespace" . }} +{{- end }} \ No newline at end of file