Skip to content

Commit

Permalink
chore(jobsdb): add support for schema migration changesets to run always (#2746)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum authored Dec 1, 2022
1 parent 0cf5fd6 commit 463533e
Show file tree
Hide file tree
Showing 28 changed files with 205 additions and 286 deletions.
160 changes: 112 additions & 48 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,15 +835,57 @@ func (jd *HandleT) init() {
}
jd.workersAndAuxSetup()

// Database schema migration should happen early, even before jobsdb is started,
// so that we can be sure that all the necessary tables are created and considered to be in
// the latest schema version, before rudder-migrator starts introducing new tables.
jd.dsListLock.WithLock(func(l lock.LockToken) {
switch jd.ownerType {
case Write, ReadWrite:
jd.setupDatabaseTables(l, jd.clearAll)
}
err := jd.WithTx(func(tx *Tx) error {
// only one migration should run at a time and block all other processes from adding or removing tables
return jd.withDistributedLock(context.Background(), tx, "schema_migrate", func() error {
// Database schema migration should happen early, even before jobsdb is started,
// so that we can be sure that all the necessary tables are created and considered to be in
// the latest schema version, before rudder-migrator starts introducing new tables.
jd.dsListLock.WithLock(func(l lock.LockToken) {
writer := jd.ownerType == Write || jd.ownerType == ReadWrite
if writer && jd.clearAll {
jd.dropDatabaseTables(l)
}
templateData := func() map[string]interface{} {
// Important: if jobsdb type is acting as a writer then refreshDSList
// doesn't return the full list of datasets, only the rightmost two.
// But we need to run the schema migration against all datasets, no matter
// whether jobsdb is a writer or not.
datasets := getDSList(jd, jd.dbHandle, jd.tablePrefix)

datasetIndices := make([]string, 0)
for _, dataset := range datasets {
datasetIndices = append(datasetIndices, dataset.Index)
}

return map[string]interface{}{
"Prefix": jd.tablePrefix,
"Datasets": datasetIndices,
}
}()

if writer {
jd.setupDatabaseTables(templateData)
}

// Run changesets that should always run for both writer and reader jobsdbs.
//
// When running separate gw and processor instances we cannot control the order of execution
// and we cannot guarantee that after a gw migration completes, processor
// will not create new tables using the old schema.
//
// Changesets that run always can help in such cases, by bringing non-migrated tables into a usable state.
jd.runAlwaysChangesets(templateData)

// finally refresh the dataset list to make sure [datasetList] field is populated
jd.refreshDSList(l)
})
return nil
})
})
if err != nil {
panic(fmt.Errorf("failed to run schema migration for %s: %w", jd.tablePrefix, err))
}
}

func (jd *HandleT) workersAndAuxSetup() {
Expand Down Expand Up @@ -2695,47 +2737,51 @@ func (jd *HandleT) addNewDSLoop(ctx context.Context) {
var releaseDsListLock chan<- lock.LockToken
// start a transaction
err := jd.WithTx(func(tx *Tx) error {
// acquire a advisory transaction level blocking lock, which is released once the transaction ends.
sqlStatement := fmt.Sprintf(`SELECT pg_advisory_xact_lock(%d);`, advisoryLock)
_, err := tx.ExecContext(context.TODO(), sqlStatement)
if err != nil {
return fmt.Errorf("error while acquiring advisory lock %d: %w", advisoryLock, err)
}

// We acquire the list lock only after we have acquired the advisory lock.
// We will release the list lock after the transaction ends, that's why we need to use an async lock
dsListLock, releaseDsListLock, err = jd.dsListLock.AsyncLockWithCtx(ctx)
if err != nil {
return err
}
// refresh ds list
var dsList []dataSetT
var nextDSIdx string
// make sure we are operating on the latest version of the list
dsList = getDSList(jd, tx, jd.tablePrefix)
latestDS := dsList[len(dsList)-1]
full, err := jd.checkIfFullDSInTx(tx, latestDS)
if err != nil {
return fmt.Errorf("error while checking if DS is full: %w", err)
}
// checkIfFullDS is true for last DS in the list
if full {
if _, err = tx.Exec(fmt.Sprintf(`LOCK TABLE %q IN EXCLUSIVE MODE;`, latestDS.JobTable)); err != nil {
return fmt.Errorf("error locking table %s: %w", latestDS.JobTable, err)
}

nextDSIdx = jd.doComputeNewIdxForAppend(dsList)
jd.logger.Infof("[[ %s : addNewDSLoop ]]: NewDS", jd.tablePrefix)
if err = jd.addNewDSInTx(tx, dsListLock, dsList, newDataSet(jd.tablePrefix, nextDSIdx)); err != nil {
return fmt.Errorf("error adding new DS: %w", err)
}
return jd.withDistributedSharedLock(context.TODO(), tx, "schema_migrate", func() error { // cannot run while schema migration is running
return jd.withDistributedLock(context.TODO(), tx, "add_ds", func() error { // only one add_ds can run at a time
// acquire a advisory transaction level blocking lock, which is released once the transaction ends.
sqlStatement := fmt.Sprintf(`SELECT pg_advisory_xact_lock(%d);`, advisoryLock)
_, err := tx.ExecContext(context.TODO(), sqlStatement)
if err != nil {
return fmt.Errorf("error while acquiring advisory lock %d: %w", advisoryLock, err)
}

// previous DS should become read only
if err = setReadonlyDsInTx(tx, latestDS); err != nil {
return fmt.Errorf("error making dataset read only: %w", err)
}
}
return nil
// We acquire the list lock only after we have acquired the advisory lock.
// We will release the list lock after the transaction ends, that's why we need to use an async lock
dsListLock, releaseDsListLock, err = jd.dsListLock.AsyncLockWithCtx(ctx)
if err != nil {
return err
}
// refresh ds list
var dsList []dataSetT
var nextDSIdx string
// make sure we are operating on the latest version of the list
dsList = getDSList(jd, tx, jd.tablePrefix)
latestDS := dsList[len(dsList)-1]
full, err := jd.checkIfFullDSInTx(tx, latestDS)
if err != nil {
return fmt.Errorf("error while checking if DS is full: %w", err)
}
// checkIfFullDS is true for last DS in the list
if full {
if _, err = tx.Exec(fmt.Sprintf(`LOCK TABLE %q IN EXCLUSIVE MODE;`, latestDS.JobTable)); err != nil {
return fmt.Errorf("error locking table %s: %w", latestDS.JobTable, err)
}

nextDSIdx = jd.doComputeNewIdxForAppend(dsList)
jd.logger.Infof("[[ %s : addNewDSLoop ]]: NewDS", jd.tablePrefix)
if err = jd.addNewDSInTx(tx, dsListLock, dsList, newDataSet(jd.tablePrefix, nextDSIdx)); err != nil {
return fmt.Errorf("error adding new DS: %w", err)
}

// previous DS should become read only
if err = setReadonlyDsInTx(tx, latestDS); err != nil {
return fmt.Errorf("error making dataset read only: %w", err)
}
}
return nil
})
})
})
jd.assertError(err)

Expand Down Expand Up @@ -3683,3 +3729,21 @@ type smallDS struct {
ds dataSetT
recordsLeft int
}

// withDistributedLock runs f while holding an exclusive, transaction-scoped
// postgres advisory lock derived from the operation name. pg_advisory_xact_lock
// blocks until the lock is available and releases it automatically when tx
// commits or rolls back, so no explicit unlock is needed.
func (jd *HandleT) withDistributedLock(ctx context.Context, tx *Tx, operation string, f func() error) error {
	advisoryLock := jd.getAdvisoryLockForOperation(operation)
	// use a bind parameter instead of fmt.Sprintf-built SQL
	if _, err := tx.ExecContext(ctx, `SELECT pg_advisory_xact_lock($1);`, advisoryLock); err != nil {
		return fmt.Errorf("error while acquiring advisory lock %d for operation %s: %w", advisoryLock, operation, err)
	}
	return f()
}

// withDistributedSharedLock runs f while holding a shared, transaction-scoped
// postgres advisory lock derived from the operation name. Multiple transactions
// may hold the shared lock concurrently, but it conflicts with the exclusive
// lock taken by withDistributedLock for the same operation. The lock is
// released automatically when tx commits or rolls back.
func (jd *HandleT) withDistributedSharedLock(ctx context.Context, tx *Tx, operation string, f func() error) error {
	advisoryLock := jd.getAdvisoryLockForOperation(operation)
	// use a bind parameter instead of fmt.Sprintf-built SQL
	if _, err := tx.ExecContext(ctx, `SELECT pg_advisory_xact_lock_shared($1);`, advisoryLock); err != nil {
		return fmt.Errorf("error while acquiring a shared advisory lock %d for operation %s: %w", advisoryLock, operation, err)
	}
	return f()
}
2 changes: 1 addition & 1 deletion jobsdb/jobsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ func TestSortDnumList(t *testing.T) {

func Test_GetAdvisoryLockForOperation_Unique(t *testing.T) {
calculated := map[int64]string{}
for _, operation := range []string{"add_ds", "migrate_ds"} {
for _, operation := range []string{"add_ds", "migrate_ds", "schema_migrate"} {
for _, prefix := range []string{"gw", "rt", "batch_rt", "proc_error"} {
h := &HandleT{tablePrefix: prefix}
key := fmt.Sprintf("%s_%s", prefix, operation)
Expand Down
118 changes: 60 additions & 58 deletions jobsdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,80 +67,82 @@ func (jd *HandleT) doMigrateDS(ctx context.Context) error {
var l lock.LockToken
var lockChan chan<- lock.LockToken
err := jd.WithTx(func(tx *Tx) error {
// Take the lock and run actual migration
if !jd.dsMigrationLock.TryLockWithCtx(ctx) {
return fmt.Errorf("failed to acquire lock: %w", ctx.Err())
}
defer jd.dsMigrationLock.Unlock()
// repeat the check after the dsMigrationLock is acquired to get correct pending jobs count.
// the pending jobs count cannot change after the dsMigrationLock is acquired
if migrateFrom, pendingJobsCount, insertBeforeDS = jd.getMigrationList(dsList); len(migrateFrom) == 0 {
return nil
}

if pendingJobsCount > 0 { // migrate incomplete jobs
var destination dataSetT
err := jd.dsListLock.WithLockInCtx(ctx, func(l lock.LockToken) error {
destination = newDataSet(jd.tablePrefix, jd.computeNewIdxForIntraNodeMigration(l, insertBeforeDS))
return jd.withDistributedSharedLock(ctx, tx, "schema_migrate", func() error { // cannot run while schema migration is running
// Take the lock and run actual migration
if !jd.dsMigrationLock.TryLockWithCtx(ctx) {
return fmt.Errorf("failed to acquire lock: %w", ctx.Err())
}
defer jd.dsMigrationLock.Unlock()
// repeat the check after the dsMigrationLock is acquired to get correct pending jobs count.
// the pending jobs count cannot change after the dsMigrationLock is acquired
if migrateFrom, pendingJobsCount, insertBeforeDS = jd.getMigrationList(dsList); len(migrateFrom) == 0 {
return nil
})
if err != nil {
return err
}

jd.logger.Infof("[[ migrateDSLoop ]]: Migrate from: %v", migrateFrom)
jd.logger.Infof("[[ migrateDSLoop ]]: To: %v", destination)
jd.logger.Infof("[[ migrateDSLoop ]]: Next: %v", insertBeforeDS)
if pendingJobsCount > 0 { // migrate incomplete jobs
var destination dataSetT
err := jd.dsListLock.WithLockInCtx(ctx, func(l lock.LockToken) error {
destination = newDataSet(jd.tablePrefix, jd.computeNewIdxForIntraNodeMigration(l, insertBeforeDS))
return nil
})
if err != nil {
return err
}

jd.logger.Infof("[[ migrateDSLoop ]]: Migrate from: %v", migrateFrom)
jd.logger.Infof("[[ migrateDSLoop ]]: To: %v", destination)
jd.logger.Infof("[[ migrateDSLoop ]]: Next: %v", insertBeforeDS)

opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFrom, To: destination})
if err != nil {
return err
}
opID, err := jd.JournalMarkStartInTx(tx, migrateCopyOperation, opPayload)
if err != nil {
return err
}

err = jd.addDSInTx(tx, destination)
if err != nil {
return err
}

totalJobsMigrated := 0
var noJobsMigrated int
for _, source := range migrateFrom {
jd.logger.Infof("[[ migrateDSLoop ]]: Migrate: %v to: %v", source, destination)
noJobsMigrated, err = jd.migrateJobsInTx(ctx, tx, source, destination)
if err != nil {
return err
}
totalJobsMigrated += noJobsMigrated
}
err = jd.journalMarkDoneInTx(tx, opID)
if err != nil {
return err
}
jd.logger.Infof("[[ migrateDSLoop ]]: Total migrated %d jobs", totalJobsMigrated)
}

opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFrom, To: destination})
opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFrom})
if err != nil {
return err
}
opID, err := jd.JournalMarkStartInTx(tx, migrateCopyOperation, opPayload)
opID, err := jd.JournalMarkStartInTx(tx, postMigrateDSOperation, opPayload)
if err != nil {
return err
}

err = jd.addDSInTx(tx, destination)
// acquire an async lock, as this needs to be released after the transaction commits
l, lockChan, err = jd.dsListLock.AsyncLockWithCtx(ctx)
if err != nil {
return err
}

totalJobsMigrated := 0
var noJobsMigrated int
for _, source := range migrateFrom {
jd.logger.Infof("[[ migrateDSLoop ]]: Migrate: %v to: %v", source, destination)
noJobsMigrated, err = jd.migrateJobsInTx(ctx, tx, source, destination)
if err != nil {
return err
}
totalJobsMigrated += noJobsMigrated
}
err = jd.journalMarkDoneInTx(tx, opID)
err = jd.postMigrateHandleDS(tx, migrateFrom)
if err != nil {
return err
}
jd.logger.Infof("[[ migrateDSLoop ]]: Total migrated %d jobs", totalJobsMigrated)
}

opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFrom})
if err != nil {
return err
}
opID, err := jd.JournalMarkStartInTx(tx, postMigrateDSOperation, opPayload)
if err != nil {
return err
}
// acquire an async lock, as this needs to be released after the transaction commits
l, lockChan, err = jd.dsListLock.AsyncLockWithCtx(ctx)
if err != nil {
return err
}
err = jd.postMigrateHandleDS(tx, migrateFrom)
if err != nil {
return err
}
return jd.journalMarkDoneInTx(tx, opID)
return jd.journalMarkDoneInTx(tx, opID)
})
})
if l != nil {
if err == nil {
Expand Down
Loading

0 comments on commit 463533e

Please sign in to comment.