Skip to content

Commit

Permalink
chore: ensure internal migration of large job-status datasets (#2748)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth authored Dec 1, 2022
1 parent 3bb21c9 commit a0fc4f5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
2 changes: 2 additions & 0 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ var (
backupRowsBatchSize int64
backupMaxTotalPayloadSize int64
pkgLogger logger.Logger
jobStatusCountMigrationCheck bool // TODO: Remove this in next release
)

// Loads DB config and migration-related config from the config file
Expand Down Expand Up @@ -684,6 +685,7 @@ func loadConfig() {
config.RegisterDurationConfigVariable(5, &refreshDSListLoopSleepDuration, true, time.Second, []string{"JobsDB.refreshDSListLoopSleepDuration", "JobsDB.refreshDSListLoopSleepDurationInS"}...)
config.RegisterDurationConfigVariable(5, &backupCheckSleepDuration, true, time.Second, []string{"JobsDB.backupCheckSleepDuration", "JobsDB.backupCheckSleepDurationIns"}...)
config.RegisterDurationConfigVariable(5, &cacheExpiration, true, time.Minute, []string{"JobsDB.cacheExpiration"}...)
config.RegisterBoolConfigVariable(false, &jobStatusCountMigrationCheck, true, "JobsDB.jobStatusCountMigrationCheck")
}

func Init2() {
Expand Down
18 changes: 10 additions & 8 deletions jobsdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (jd *HandleT) checkIfMigrateDS(ds dataSetT) (
queryStat.Start()
defer queryStat.End()

var delCount, totalCount int
var delCount, totalCount, statusCount int
sqlStatement := fmt.Sprintf(`SELECT COUNT(*) from %q`, ds.JobTable)
row := jd.dbHandle.QueryRow(sqlStatement)
err := row.Scan(&totalCount)
Expand All @@ -424,11 +424,13 @@ func (jd *HandleT) checkIfMigrateDS(ds dataSetT) (
err = row.Scan(&delCount)
jd.assertError(err)

if totalCount == 0 {
jd.assert(
delCount == 0,
fmt.Sprintf("delCount: %d. Either of them is not 0", delCount))
return false, false, 0
if jobStatusCountMigrationCheck {
// Total number of job statuses. If this table grows too big (e.g. because of many retries),
// we migrate to a new table and get rid of the old job statuses.
sqlStatement = fmt.Sprintf(`SELECT COUNT(*) from %q`, ds.JobStatusTable)
row = jd.dbHandle.QueryRow(sqlStatement)
err = row.Scan(&statusCount)
jd.assertError(err)
}

recordsLeft = totalCount - delCount
Expand Down Expand Up @@ -466,10 +468,10 @@ func (jd *HandleT) checkIfMigrateDS(ds dataSetT) (

smallThreshold := jobMinRowsMigrateThres * float64(*jd.MaxDSSize)
isSmall := func() bool {
return float64(totalCount) < smallThreshold
return float64(totalCount) < smallThreshold && float64(statusCount) < smallThreshold
}

if float64(delCount)/float64(totalCount) > jobDoneMigrateThres {
if float64(delCount)/float64(totalCount) > jobDoneMigrateThres || (float64(statusCount)/float64(totalCount) > jobStatusMigrateThres) {
return true, isSmall(), recordsLeft
}

Expand Down

0 comments on commit a0fc4f5

Please sign in to comment.