diff --git a/cmd/workflow-controller/main.go b/cmd/workflow-controller/main.go index f9abfea00330..c2a4fabf273c 100644 --- a/cmd/workflow-controller/main.go +++ b/cmd/workflow-controller/main.go @@ -57,6 +57,7 @@ func NewRootCommand() *cobra.Command { workflowWorkers int // --workflow-workers workflowTTLWorkers int // --workflow-ttl-workers podCleanupWorkers int // --pod-cleanup-workers + cronWorkflowWorkers int // --cron-workflow-workers burst int qps float32 namespaced bool // --namespaced @@ -116,7 +117,7 @@ func NewRootCommand() *cobra.Command { if leaderElectionOff == "true" { log.Info("Leader election is turned off. Running in single-instance mode") log.WithField("id", "single-instance").Info("starting leading") - go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers) + go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers) go wfController.RunMetricsServer(ctx, false) } else { nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY") @@ -146,7 +147,7 @@ func NewRootCommand() *cobra.Command { Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { dummyCancel() - go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers) + go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers) go wfController.RunMetricsServer(ctx, false) }, OnStoppedLeading: func() { @@ -183,6 +184,7 @@ func NewRootCommand() *cobra.Command { command.Flags().IntVar(&workflowWorkers, "workflow-workers", 32, "Number of workflow workers") command.Flags().IntVar(&workflowTTLWorkers, "workflow-ttl-workers", 4, "Number of workflow TTL workers") command.Flags().IntVar(&podCleanupWorkers, "pod-cleanup-workers", 4, "Number of pod cleanup workers") + command.Flags().IntVar(&cronWorkflowWorkers, "cron-workflow-workers", 8, "Number of cron workflow workers") command.Flags().IntVar(&burst, "burst", 30, "Maximum burst for throttle.") command.Flags().Float32Var(&qps, "qps", 20.0, "Queries per second") command.Flags().BoolVar(&namespaced, "namespaced", false, "run workflow-controller as namespaced mode") diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index dd0e29db01ea..bbab1d57c5e3 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -217,10 +217,10 @@ func (wfc *WorkflowController) runGCcontroller(ctx context.Context, workflowTTLW } } -func (wfc *WorkflowController) runCronController(ctx context.Context) { +func (wfc *WorkflowController) runCronController(ctx context.Context, cronWorkflowWorkers int) { defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) - cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager) + cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager, cronWorkflowWorkers) cronController.Run(ctx) } @@ -235,7 +235,7 @@ var indexers = cache.Indexers{ } // Run starts an Workflow resource controller -func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers int) { +func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers int) { defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) // init DB after leader election (if enabled) @@ -305,7 +305,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo go wfc.archivedWorkflowGarbageCollector(ctx.Done()) go wfc.runGCcontroller(ctx, workflowTTLWorkers) - go wfc.runCronController(ctx) + go wfc.runCronController(ctx, cronWorkflowWorkers) go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done()) go wait.Until(wfc.syncPodPhaseMetrics, 15*time.Second, ctx.Done()) diff --git a/workflow/cron/controller.go b/workflow/cron/controller.go index 9e16e70ef528..8b9c25b67a34 100644 --- a/workflow/cron/controller.go +++ b/workflow/cron/controller.go @@ -48,11 +48,11 @@ type Controller struct { dynamicInterface dynamic.Interface metrics *metrics.Metrics eventRecorderManager events.EventRecorderManager + cronWorkflowWorkers int } const ( cronWorkflowResyncPeriod = 20 * time.Minute - cronWorkflowWorkers = 8 ) var ( @@ -68,7 +68,7 @@ func init() { log.WithField("cronSyncPeriod", cronSyncPeriod).Info("cron config") } -func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager) *Controller { +func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager, cronWorkflowWorkers int) *Controller { return &Controller{ wfClientset: wfclientset, namespace: namespace, @@ -80,6 +80,7 @@ func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic cronWfQueue: metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "cron_wf_queue"), metrics: metrics, eventRecorderManager: eventRecorderManager, + cronWorkflowWorkers: cronWorkflowWorkers, } } @@ -110,7 +111,7 @@ func (cc *Controller) Run(ctx context.Context) { go wait.UntilWithContext(ctx, cc.syncAll, cronSyncPeriod) - for i := 0; i < cronWorkflowWorkers; i++ { + for i := 0; i < cc.cronWorkflowWorkers; i++ { go wait.Until(cc.runCronWorker, time.Second, ctx.Done()) }