diff --git a/go/internal/controller/a2a/a2a_reconciler.go b/go/internal/controller/a2a/a2a_reconciler.go index 357d0014d..f9f693f9c 100644 --- a/go/internal/controller/a2a/a2a_reconciler.go +++ b/go/internal/controller/a2a/a2a_reconciler.go @@ -6,6 +6,7 @@ import ( "net" "net/http" "os" + "time" "github.com/kagent-dev/kagent/go/api/v1alpha2" "github.com/kagent-dev/kagent/go/internal/a2a" @@ -27,28 +28,30 @@ type A2AReconciler interface { ) } -type a2aReconciler struct { - a2aHandler a2a.A2AHandlerMux - a2aBaseUrl string +type ClientOptions struct { + StreamingMaxBufSize int + StreamingInitialBufSize int + Timeout time.Duration +} - streamingMaxBufSize int - streamingInitialBufSize int - authenticator auth.AuthProvider +type a2aReconciler struct { + a2aHandler a2a.A2AHandlerMux + a2aBaseUrl string + authenticator auth.AuthProvider + clientOptions ClientOptions } func NewReconciler( a2aHandler a2a.A2AHandlerMux, a2aBaseUrl string, - streamingMaxBufSize int, - streamingInitialBufSize int, + clientOptions ClientOptions, authenticator auth.AuthProvider, ) A2AReconciler { return &a2aReconciler{ - a2aHandler: a2aHandler, - a2aBaseUrl: a2aBaseUrl, - streamingMaxBufSize: streamingMaxBufSize, - streamingInitialBufSize: streamingInitialBufSize, - authenticator: authenticator, + a2aHandler: a2aHandler, + a2aBaseUrl: a2aBaseUrl, + clientOptions: clientOptions, + authenticator: authenticator, } } @@ -60,8 +63,9 @@ func (a *a2aReconciler) ReconcileAgent( agentRef := common.GetObjectRef(agent) client, err := a2aclient.NewA2AClient(card.URL, + a2aclient.WithTimeout(a.clientOptions.Timeout), + a2aclient.WithBuffer(a.clientOptions.StreamingInitialBufSize, a.clientOptions.StreamingMaxBufSize), debugOpt(), - a2aclient.WithBuffer(a.streamingInitialBufSize, a.streamingMaxBufSize), a2aclient.WithHTTPReqHandler(auth.A2ARequestHandler(a.authenticator)), ) if err != nil { diff --git a/go/pkg/app/app.go b/go/pkg/app/app.go index 9a6a33f20..840cfc436 100644 --- a/go/pkg/app/app.go +++ b/go/pkg/app/app.go @@ -25,6 +25,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/kagent-dev/kagent/go/internal/version" @@ -101,6 +102,7 @@ type Config struct { Streaming struct { MaxBufSize resource.QuantityValue `default:"1Mi"` InitialBufSize resource.QuantityValue `default:"4Ki"` + Timeout time.Duration `default:"60s"` } LeaderElection bool ProbeAddr string @@ -145,6 +147,7 @@ func (cfg *Config) SetFlags(commandLine *flag.FlagSet) { commandLine.Var(&cfg.Streaming.MaxBufSize, "streaming-max-buf-size", "The maximum size of the streaming buffer.") commandLine.Var(&cfg.Streaming.InitialBufSize, "streaming-initial-buf-size", "The initial size of the streaming buffer.") + commandLine.DurationVar(&cfg.Streaming.Timeout, "streaming-timeout", 60*time.Second, "The timeout for the streaming connection.") } func Start(authenticator auth.AuthProvider, authorizer auth.Authorizer) { @@ -301,8 +304,11 @@ func Start(authenticator auth.AuthProvider, authorizer auth.Authorizer) { a2aReconciler := a2a_reconciler.NewReconciler( a2aHandler, cfg.A2ABaseUrl+httpserver.APIPathA2A, - int(cfg.Streaming.MaxBufSize.Value()), - int(cfg.Streaming.InitialBufSize.Value()), + a2a_reconciler.ClientOptions{ + StreamingMaxBufSize: int(cfg.Streaming.MaxBufSize.Value()), + StreamingInitialBufSize: int(cfg.Streaming.InitialBufSize.Value()), + Timeout: cfg.Streaming.Timeout, + }, authenticator, ) diff --git a/helm/kagent/templates/controller-deployment.yaml b/helm/kagent/templates/controller-deployment.yaml index 2ff7a0a63..9a1def876 100644 --- a/helm/kagent/templates/controller-deployment.yaml +++ b/helm/kagent/templates/controller-deployment.yaml @@ -54,6 +54,8 @@ spec: - {{ .Values.controller.streaming.maxBufSize | quote }} - -streaming-initial-buf-size - {{ .Values.controller.streaming.initialBufSize | quote }} + - -streaming-timeout + - {{ .Values.controller.streaming.timeout | quote }} - -database-type - {{ .Values.database.type }} {{- if eq .Values.database.type "sqlite" }} diff --git a/helm/kagent/values.yaml b/helm/kagent/values.yaml index 0399bda49..85dea2f3d 100644 --- a/helm/kagent/values.yaml +++ b/helm/kagent/values.yaml @@ -67,7 +67,7 @@ controller: streaming: # Streaming buffer size for A2A communication maxBufSize: 1Mi # 1024 * 1024 initialBufSize: 4Ki # 4 * 1024 - + timeout: 60s # 60 seconds # -- Namespaces the controller should watch. # If empty, the controller will watch ALL available namespaces. # @default -- [] (watches all available namespaces)