Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Expose the Cron workflow workers as argument #11457

Merged
merged 3 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func NewRootCommand() *cobra.Command {
workflowWorkers int // --workflow-workers
workflowTTLWorkers int // --workflow-ttl-workers
podCleanupWorkers int // --pod-cleanup-workers
cronWorkflowWorkers int // --cron-workflow-workers
burst int
qps float32
namespaced bool // --namespaced
Expand Down Expand Up @@ -116,7 +117,7 @@ func NewRootCommand() *cobra.Command {
if leaderElectionOff == "true" {
log.Info("Leader election is turned off. Running in single-instance mode")
log.WithField("id", "single-instance").Info("starting leading")
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers)
go wfController.RunMetricsServer(ctx, false)
} else {
nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
Expand Down Expand Up @@ -146,7 +147,7 @@ func NewRootCommand() *cobra.Command {
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
dummyCancel()
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers)
go wfController.RunMetricsServer(ctx, false)
},
OnStoppedLeading: func() {
Expand Down Expand Up @@ -183,6 +184,7 @@ func NewRootCommand() *cobra.Command {
command.Flags().IntVar(&workflowWorkers, "workflow-workers", 32, "Number of workflow workers")
command.Flags().IntVar(&workflowTTLWorkers, "workflow-ttl-workers", 4, "Number of workflow TTL workers")
command.Flags().IntVar(&podCleanupWorkers, "pod-cleanup-workers", 4, "Number of pod cleanup workers")
command.Flags().IntVar(&cronWorkflowWorkers, "cron-workflow-workers", 8, "Number of cron workflow workers")
command.Flags().IntVar(&burst, "burst", 30, "Maximum burst for throttle.")
command.Flags().Float32Var(&qps, "qps", 20.0, "Queries per second")
command.Flags().BoolVar(&namespaced, "namespaced", false, "run workflow-controller as namespaced mode")
Expand Down
8 changes: 4 additions & 4 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ func (wfc *WorkflowController) runGCcontroller(ctx context.Context, workflowTTLW
}
}

func (wfc *WorkflowController) runCronController(ctx context.Context) {
func (wfc *WorkflowController) runCronController(ctx context.Context, cronWorkflowWorkers int) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager)
cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager, cronWorkflowWorkers)
cronController.Run(ctx)
}

Expand All @@ -235,7 +235,7 @@ var indexers = cache.Indexers{
}

// Run starts an Workflow resource controller
func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers int) {
func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers int) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

// init DB after leader election (if enabled)
Expand Down Expand Up @@ -305,7 +305,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
go wfc.archivedWorkflowGarbageCollector(ctx.Done())

go wfc.runGCcontroller(ctx, workflowTTLWorkers)
go wfc.runCronController(ctx)
go wfc.runCronController(ctx, cronWorkflowWorkers)
go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done())
go wait.Until(wfc.syncPodPhaseMetrics, 15*time.Second, ctx.Done())

Expand Down
7 changes: 4 additions & 3 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ type Controller struct {
dynamicInterface dynamic.Interface
metrics *metrics.Metrics
eventRecorderManager events.EventRecorderManager
cronWorkflowWorkers int
}

const (
cronWorkflowResyncPeriod = 20 * time.Minute
cronWorkflowWorkers = 8
)

var (
Expand All @@ -68,7 +68,7 @@ func init() {
log.WithField("cronSyncPeriod", cronSyncPeriod).Info("cron config")
}

func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager) *Controller {
func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager, cronWorkflowWorkers int) *Controller {
return &Controller{
wfClientset: wfclientset,
namespace: namespace,
Expand All @@ -80,6 +80,7 @@ func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic
cronWfQueue: metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "cron_wf_queue"),
metrics: metrics,
eventRecorderManager: eventRecorderManager,
cronWorkflowWorkers: cronWorkflowWorkers,
}
}

Expand Down Expand Up @@ -110,7 +111,7 @@ func (cc *Controller) Run(ctx context.Context) {

go wait.UntilWithContext(ctx, cc.syncAll, cronSyncPeriod)

for i := 0; i < cronWorkflowWorkers; i++ {
for i := 0; i < cc.cronWorkflowWorkers; i++ {
go wait.Until(cc.runCronWorker, time.Second, ctx.Done())
}

Expand Down