From 39ff41a32fe960f68691b6667d89d8f68079f427 Mon Sep 17 00:00:00 2001 From: Yuan Tang Date: Fri, 24 Mar 2023 14:32:36 -0400 Subject: [PATCH] fix: DB sessions are recreated whenever controller configmap updates. Fixes #10498 (#10734) --- persist/sqldb/sqldb.go | 24 ++++++++++++------------ workflow/controller/config.go | 30 ++++++++++++++++-------------- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/persist/sqldb/sqldb.go b/persist/sqldb/sqldb.go index b51f4e0703e4..5b10d8aa65a5 100644 --- a/persist/sqldb/sqldb.go +++ b/persist/sqldb/sqldb.go @@ -65,12 +65,7 @@ func CreatePostGresDBSession(kubectlConfig kubernetes.Interface, namespace strin if err != nil { return nil, "", err } - - if persistPool != nil { - session.SetMaxOpenConns(persistPool.MaxOpenConns) - session.SetMaxIdleConns(persistPool.MaxIdleConns) - session.SetConnMaxLifetime(time.Duration(persistPool.ConnMaxLifetime)) - } + session = ConfigureDBSession(session, persistPool) return session, cfg.TableName, nil } @@ -100,12 +95,7 @@ func CreateMySQLDBSession(kubectlConfig kubernetes.Interface, namespace string, if err != nil { return nil, "", err } - - if persistPool != nil { - session.SetMaxOpenConns(persistPool.MaxOpenConns) - session.SetMaxIdleConns(persistPool.MaxIdleConns) - session.SetConnMaxLifetime(time.Duration(persistPool.ConnMaxLifetime)) - } + session = ConfigureDBSession(session, persistPool) // this is needed to make MySQL run in a Golang-compatible UTF-8 character set. _, err = session.Exec("SET NAMES 'utf8mb4'") if err != nil { @@ -117,3 +107,13 @@ func CreateMySQLDBSession(kubectlConfig kubernetes.Interface, namespace string, } return session, cfg.TableName, nil } + +// ConfigureDBSession configures the DB session +func ConfigureDBSession(session sqlbuilder.Database, persistPool *config.ConnectionPool) sqlbuilder.Database { + if persistPool != nil { + session.SetMaxOpenConns(persistPool.MaxOpenConns) + session.SetMaxIdleConns(persistPool.MaxIdleConns) + session.SetConnMaxLifetime(time.Duration(persistPool.ConnMaxLifetime)) + } + return session +} diff --git a/workflow/controller/config.go b/workflow/controller/config.go index 51494e302488..292f83a6f1d3 100644 --- a/workflow/controller/config.go +++ b/workflow/controller/config.go @@ -23,7 +23,6 @@ func (wfc *WorkflowController) updateConfig() error { return err } log.Info("Configuration:\n" + string(bytes)) - wfc.session = nil wfc.artifactRepositories = artifactrepositories.New(wfc.kubeclientset, wfc.namespace, &wfc.Config.ArtifactRepository) wfc.offloadNodeStatusRepo = sqldb.ExplosiveOffloadNodeStatusRepo wfc.wfArchive = sqldb.NullWorkflowArchive @@ -31,23 +30,26 @@ func (wfc *WorkflowController) updateConfig() error { persistence := wfc.Config.Persistence if persistence != nil { log.Info("Persistence configuration enabled") - session, tableName, err := sqldb.CreateDBSession(wfc.kubeclientset, wfc.namespace, persistence) - if err != nil { - return err - } - log.Info("Persistence Session created successfully") - if !persistence.SkipMigration { - err = sqldb.NewMigrate(session, persistence.GetClusterName(), tableName).Exec(context.Background()) + var tableName string + if wfc.session == nil { + session, tableName, err := sqldb.CreateDBSession(wfc.kubeclientset, wfc.namespace, persistence) if err != nil { return err } - } else { - log.Info("DB migration is disabled") + log.Info("Persistence Session created successfully") + if !persistence.SkipMigration { + err = sqldb.NewMigrate(session, persistence.GetClusterName(), tableName).Exec(context.Background()) + if err != nil { + return err + } + } else { + log.Info("DB migration is disabled") + } + wfc.session = session } - - wfc.session = session + sqldb.ConfigureDBSession(wfc.session, persistence.ConnectionPool) if persistence.NodeStatusOffload { - wfc.offloadNodeStatusRepo, err = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName) + wfc.offloadNodeStatusRepo, err = sqldb.NewOffloadNodeStatusRepo(wfc.session, persistence.GetClusterName(), tableName) if err != nil { return err } @@ -62,7 +64,7 @@ func (wfc *WorkflowController) updateConfig() error { if err != nil { return err } - wfc.wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), wfc.managedNamespace, instanceIDService) + wfc.wfArchive = sqldb.NewWorkflowArchive(wfc.session, persistence.GetClusterName(), wfc.managedNamespace, instanceIDService) log.Info("Workflow archiving is enabled") } else { log.Info("Workflow archiving is disabled")