Skip to content

Commit

Permalink
Pick db conn pool fix to 1.1 (#13313)
Browse files Browse the repository at this point in the history
fix db connection pool leaking

Approved by: @xzxiong, @sukki37
  • Loading branch information
gavinyue authored Dec 11, 2023
1 parent 20f2847 commit 65fe967
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
21 changes: 15 additions & 6 deletions pkg/util/export/etl/db/db_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,22 @@ func SetDBConn(conn *sql.DB) {
db.Store(conn)
}

func InitOrRefreshDBConn(forceNewConn bool, randomCN bool) (*sql.DB, error) {
func CloseDBConn() {
dbVal := db.Load()
if dbVal == nil {
return
}
dbConn := dbVal.(*sql.DB)
if dbConn != nil {
dbConn.Close()
}
}

func GetOrInitDBConn(forceNewConn bool, randomCN bool) (*sql.DB, error) {
initFunc := func() error {
dbMux.Lock()
defer dbMux.Unlock()
CloseDBConn()
dbUser, _ := GetSQLWriterDBUser()
if dbUser == nil {
return errNotReady
Expand Down Expand Up @@ -137,13 +149,10 @@ func WriteRowRecords(records [][]string, tbl *table.Table, timeout time.Duration
var dbConn *sql.DB

if DBConnErrCount.Load() > DBConnRetryThreshold {
if dbConn != nil {
dbConn.Close()
}
dbConn, err = InitOrRefreshDBConn(true, true)
dbConn, err = GetOrInitDBConn(true, true)
DBConnErrCount.Store(0)
} else {
dbConn, err = InitOrRefreshDBConn(false, false)
dbConn, err = GetOrInitDBConn(false, false)
}
if err != nil {
return 0, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/export/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (m *Merge) doMergeFiles(ctx context.Context, files []*FileMeta) error {
}

// Check if the first record already exists in the database
existed, err = db_holder.IsRecordExisted(ctx, firstLine, m.table, db_holder.InitOrRefreshDBConn)
existed, err = db_holder.IsRecordExisted(ctx, firstLine, m.table, db_holder.GetOrInitDBConn)
if err != nil {
m.logger.Error("error checking if the first record exists",
logutil.TableField(m.table.GetIdentify()),
Expand Down

0 comments on commit 65fe967

Please sign in to comment.