diff --git a/workflow/controller/config.go b/workflow/controller/config.go index 483fe3f07182..37dc20ffc78b 100644 --- a/workflow/controller/config.go +++ b/workflow/controller/config.go @@ -27,6 +27,7 @@ func (wfc *WorkflowController) updateConfig() error { wfc.offloadNodeStatusRepo = sqldb.ExplosiveOffloadNodeStatusRepo wfc.wfArchive = sqldb.NullWorkflowArchive wfc.archiveLabelSelector = labels.Everything() + persistence := wfc.Config.Persistence if persistence != nil { log.Info("Persistence configuration enabled") @@ -40,14 +41,7 @@ func (wfc *WorkflowController) updateConfig() error { return err } log.Info("Persistence Session created successfully") - if !persistence.SkipMigration { - err = sqldb.NewMigrate(session, persistence.GetClusterName(), tableName).Exec(context.Background()) - if err != nil { - return err - } - } else { - log.Info("DB migration is disabled") - } + wfc.session = session } sqldb.ConfigureDBSession(wfc.session, persistence.ConnectionPool) @@ -75,6 +69,7 @@ func (wfc *WorkflowController) updateConfig() error { } else { log.Info("Persistence configuration disabled") } + wfc.hydrator = hydrator.New(wfc.offloadNodeStatusRepo) wfc.updateEstimatorFactory() wfc.rateLimiter = wfc.newRateLimiter() @@ -86,6 +81,27 @@ func (wfc *WorkflowController) updateConfig() error { return nil } +// initDB inits argo DB tables +func (wfc *WorkflowController) initDB() error { + persistence := wfc.Config.Persistence + if persistence != nil { + tableName, err := sqldb.GetTableName(persistence) + if err != nil { + return err + } + if !persistence.SkipMigration { + err = sqldb.NewMigrate(wfc.session, persistence.GetClusterName(), tableName).Exec(context.Background()) + if err != nil { + return err + } + } else { + log.Info("DB migration is disabled") + } + } + + return nil +} + func (wfc *WorkflowController) newRateLimiter() *rate.Limiter { return rate.NewLimiter(rate.Limit(wfc.Config.GetResourceRateLimit().Limit), wfc.Config.GetResourceRateLimit().Burst) } diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index c4ac77d78194..51ec47decd93 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -240,6 +240,11 @@ var indexers = cache.Indexers{ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers int) { defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) + // init DB after leader election (if enabled) + if err := wfc.initDB(); err != nil { + log.Fatalf("Failed to init db: %v", err) + } + ctx, cancel := context.WithCancel(ctx) defer cancel()