Skip to content

Commit

Permalink
chore: sync.OnceValues for dsRange
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Oct 22, 2024
1 parent ac9e24f commit 52427e4
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 26 deletions.
63 changes: 41 additions & 22 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ type dataSetRangeT struct {
ds dataSetT
}

type dsRangeMinMax struct {
minJobID sql.NullInt64
maxJobID sql.NullInt64
}

/*
Handle is the main type implementing the database for implementing
jobs. The caller must call the SetUp function on a Handle object
Expand All @@ -455,6 +460,7 @@ type Handle struct {

datasetList []dataSetT
datasetRangeList []dataSetRangeT
dsRangeFuncMap map[string]func() (dsRangeMinMax, error)
dsListLock *lock.Locker
dsMigrationLock *lock.Locker
noResultsCache *cache.NoResultsCache[ParameterFilterT]
Expand Down Expand Up @@ -755,6 +761,7 @@ func (jd *Handle) init() {
if jd.logger == nil {
jd.logger = logger.NewLogger().Child("jobsdb").Child(jd.tablePrefix)
}
jd.dsRangeFuncMap = make(map[string]func() (dsRangeMinMax, error))

if jd.config == nil {
jd.config = config.Default
Expand Down Expand Up @@ -1155,23 +1162,37 @@ func (jd *Handle) doRefreshDSRangeList(l lock.LockToken) error {
}
var datasetRangeList []dataSetRangeT

for idx, ds := range dsList {
for idx := 0; idx < len(dsList)-1; idx++ {
ds := dsList[idx]
jd.assert(ds.Index != "", "ds.Index is empty")

getIndex := func() (sql.NullInt64, sql.NullInt64, error) {
var minID, maxID sql.NullInt64
sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %q`, ds.JobTable)
row := jd.dbHandle.QueryRow(sqlStatement)
if err := row.Scan(&minID, &maxID); err != nil {
return sql.NullInt64{}, sql.NullInt64{}, fmt.Errorf("scanning min & max jobID %w", err)
if _, ok := jd.dsRangeFuncMap[ds.Index]; !ok {
getIndex := func() (sql.NullInt64, sql.NullInt64, error) {
var minID, maxID sql.NullInt64
sqlStatement := fmt.Sprintf(`SELECT MIN(job_id), MAX(job_id) FROM %q`, ds.JobTable)
row := jd.dbHandle.QueryRow(sqlStatement)
if err := row.Scan(&minID, &maxID); err != nil {
return sql.NullInt64{}, sql.NullInt64{}, fmt.Errorf("scanning min & max jobID %w", err)
}
jd.logger.Debug(sqlStatement, minID, maxID)
return minID, maxID, nil
}
jd.logger.Debug(sqlStatement, minID, maxID)
return minID, maxID, nil
jd.dsRangeFuncMap[ds.Index] = sync.OnceValues(func() (dsRangeMinMax, error) {
minID, maxID, err := getIndex()
if err != nil {
return dsRangeMinMax{}, fmt.Errorf("getIndex %w", err)
}
return dsRangeMinMax{
minJobID: minID,
maxJobID: maxID,
}, nil
})
}
minID, maxID, err := getIndex()
minMax, err := jd.dsRangeFuncMap[ds.Index]()
if err != nil {
return err
}
minID, maxID := minMax.minJobID, minMax.maxJobID

// We store ranges EXCEPT for
// 1. the last element (which is being actively written to)
Expand All @@ -1185,18 +1206,16 @@ func (jd *Handle) doRefreshDSRangeList(l lock.LockToken) error {
continue
}

if idx < len(dsList)-1 {
// TODO: Cleanup - Remove the line below and jd.inProgressMigrationTargetDS
jd.assert(minID.Valid && maxID.Valid, fmt.Sprintf("minID.Valid: %v, maxID.Valid: %v. Either of them is false for table: %s", minID.Valid, maxID.Valid, ds.JobTable))
jd.assert(idx == 0 || prevMax < minID.Int64, fmt.Sprintf("idx: %d != 0 and prevMax: %d >= minID.Int64: %v of table: %s", idx, prevMax, minID.Int64, ds.JobTable))
datasetRangeList = append(datasetRangeList,
dataSetRangeT{
minJobID: minID.Int64,
maxJobID: maxID.Int64,
ds: ds,
})
prevMax = maxID.Int64
}
// TODO: Cleanup - Remove the line below and jd.inProgressMigrationTargetDS
jd.assert(minID.Valid && maxID.Valid, fmt.Sprintf("minID.Valid: %v, maxID.Valid: %v. Either of them is false for table: %s", minID.Valid, maxID.Valid, ds.JobTable))
jd.assert(idx == 0 || prevMax < minID.Int64, fmt.Sprintf("idx: %d != 0 and prevMax: %d >= minID.Int64: %v of table: %s", idx, prevMax, minID.Int64, ds.JobTable))
datasetRangeList = append(datasetRangeList,
dataSetRangeT{
minJobID: minID.Int64,
maxJobID: maxID.Int64,
ds: ds,
})
prevMax = maxID.Int64
}
jd.datasetRangeList = datasetRangeList
return nil
Expand Down
14 changes: 10 additions & 4 deletions jobsdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/lib/pq"
"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/jobsdb/internal/dsindex"
"github.com/rudderlabs/rudder-server/jobsdb/internal/lock"
Expand Down Expand Up @@ -110,9 +111,13 @@ func (jd *Handle) doMigrateDS(ctx context.Context) error {
return err
}

jd.logger.Infof("[[ migrateDSLoop ]]: Migrate from: %v", migrateFrom)
jd.logger.Infof("[[ migrateDSLoop ]]: To: %v", destination)
jd.logger.Infof("[[ migrateDSLoop ]]: Next: %v", insertBeforeDS)
jd.logger.Infon(
"[[ migrateDSLoop ]]",
logger.NewField("pendingJobsCount", pendingJobsCount),
logger.NewField("migrateFrom", migrateFrom),
logger.NewField("to", destination),
logger.NewField("insert before", insertBeforeDS),
)

opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFrom, To: destination})
if err != nil {
Expand Down Expand Up @@ -491,6 +496,7 @@ func (jd *Handle) postMigrateHandleDS(tx *Tx, migrateFrom []dataSetT) error {
if err := jd.dropDSInTx(tx, ds); err != nil {
return err
}
delete(jd.dsRangeFuncMap, ds.Index)
}
return nil
}
Expand Down Expand Up @@ -535,7 +541,7 @@ func (jd *Handle) checkIfMigrateDS(ds dataSetT) (
(select count(*) from %[1]q) as totalJobCount,
(select count(*) from %[2]q where job_state = ANY($1)) as terminalJobCount,
(select created_at from %[1]q order by job_id desc limit 1) as maxCreatedAt,
(select exec_time < $2 from %[2]q where job_state = ANY($1) order by id asc limit 1) as retentionExpired
(select COALESCE(exec_time, NOW()) < $2 from %[2]q where job_state = ANY($1) order by id asc limit 1) as retentionExpired
)
select totalJobCount, terminalJobCount, maxCreatedAt, retentionExpired from combinedResult`,
ds.JobTable, ds.JobStatusTable)
Expand Down

0 comments on commit 52427e4

Please sign in to comment.