Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(jobsdb): add support for schema migration changesets to run always #2746

Merged
merged 4 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 112 additions & 48 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,15 +835,57 @@ func (jd *HandleT) init() {
}
jd.workersAndAuxSetup()

// Database schema migration should happen early, even before jobsdb is started,
// so that we can be sure that all the necessary tables are created and considered to be in
// the latest schema version, before rudder-migrator starts introducing new tables.
jd.dsListLock.WithLock(func(l lock.LockToken) {
switch jd.ownerType {
case Write, ReadWrite:
jd.setupDatabaseTables(l, jd.clearAll)
}
err := jd.WithTx(func(tx *Tx) error {
// only one migration should run at a time and block all other processes from adding or removing tables
return jd.withDistributedLock(context.Background(), tx, "schema_migrate", func() error {
// Database schema migration should happen early, even before jobsdb is started,
// so that we can be sure that all the necessary tables are created and considered to be in
// the latest schema version, before rudder-migrator starts introducing new tables.
jd.dsListLock.WithLock(func(l lock.LockToken) {
writer := jd.ownerType == Write || jd.ownerType == ReadWrite
if writer && jd.clearAll {
jd.dropDatabaseTables(l)
}
templateData := func() map[string]interface{} {
// Important: if jobsdb type is acting as a writer then refreshDSList
// doesn't return the full list of datasets, only the rightmost two.
// But we need to run the schema migration against all datasets, no matter
// whether jobsdb is a writer or not.
datasets := getDSList(jd, jd.dbHandle, jd.tablePrefix)

datasetIndices := make([]string, 0)
for _, dataset := range datasets {
datasetIndices = append(datasetIndices, dataset.Index)
}

return map[string]interface{}{
"Prefix": jd.tablePrefix,
"Datasets": datasetIndices,
}
}()

if writer {
jd.setupDatabaseTables(templateData)
}

// Run changesets that should always run for both writer and reader jobsdbs.
//
// When running separate gw and processor instances we cannot control the order of execution
// and we cannot guarantee that after a gw migration completes, processor
// will not create new tables using the old schema.
//
// Changesets that always run can help in such cases, by bringing non-migrated tables into a usable state.
jd.runAlwaysChangesets(templateData)

// finally refresh the dataset list to make sure [datasetList] field is populated
jd.refreshDSList(l)
})
return nil
})
})
if err != nil {
panic(fmt.Errorf("failed to run schema migration for %s: %w", jd.tablePrefix, err))
}
}

func (jd *HandleT) workersAndAuxSetup() {
Expand Down Expand Up @@ -2695,47 +2737,51 @@ func (jd *HandleT) addNewDSLoop(ctx context.Context) {
var releaseDsListLock chan<- lock.LockToken
// start a transaction
err := jd.WithTx(func(tx *Tx) error {
// acquire an advisory transaction level blocking lock, which is released once the transaction ends.
sqlStatement := fmt.Sprintf(`SELECT pg_advisory_xact_lock(%d);`, advisoryLock)
_, err := tx.ExecContext(context.TODO(), sqlStatement)
if err != nil {
return fmt.Errorf("error while acquiring advisory lock %d: %w", advisoryLock, err)
}

// We acquire the list lock only after we have acquired the advisory lock.
// We will release the list lock after the transaction ends, that's why we need to use an async lock
dsListLock, releaseDsListLock, err = jd.dsListLock.AsyncLockWithCtx(ctx)
if err != nil {
return err
}
// refresh ds list
var dsList []dataSetT
var nextDSIdx string
// make sure we are operating on the latest version of the list
dsList = getDSList(jd, tx, jd.tablePrefix)
latestDS := dsList[len(dsList)-1]
full, err := jd.checkIfFullDSInTx(tx, latestDS)
if err != nil {
return fmt.Errorf("error while checking if DS is full: %w", err)
}
// checkIfFullDS is true for last DS in the list
if full {
if _, err = tx.Exec(fmt.Sprintf(`LOCK TABLE %q IN EXCLUSIVE MODE;`, latestDS.JobTable)); err != nil {
return fmt.Errorf("error locking table %s: %w", latestDS.JobTable, err)
}

nextDSIdx = jd.doComputeNewIdxForAppend(dsList)
jd.logger.Infof("[[ %s : addNewDSLoop ]]: NewDS", jd.tablePrefix)
if err = jd.addNewDSInTx(tx, dsListLock, dsList, newDataSet(jd.tablePrefix, nextDSIdx)); err != nil {
return fmt.Errorf("error adding new DS: %w", err)
}
return jd.withDistributedSharedLock(context.TODO(), tx, "schema_migrate", func() error { // cannot run while schema migration is running
return jd.withDistributedLock(context.TODO(), tx, "add_ds", func() error { // only one add_ds can run at a time
// acquire an advisory transaction level blocking lock, which is released once the transaction ends.
sqlStatement := fmt.Sprintf(`SELECT pg_advisory_xact_lock(%d);`, advisoryLock)
_, err := tx.ExecContext(context.TODO(), sqlStatement)
if err != nil {
return fmt.Errorf("error while acquiring advisory lock %d: %w", advisoryLock, err)
}

// previous DS should become read only
if err = setReadonlyDsInTx(tx, latestDS); err != nil {
return fmt.Errorf("error making dataset read only: %w", err)
}
}
return nil
// We acquire the list lock only after we have acquired the advisory lock.
// We will release the list lock after the transaction ends, that's why we need to use an async lock
dsListLock, releaseDsListLock, err = jd.dsListLock.AsyncLockWithCtx(ctx)
if err != nil {
return err
}
// refresh ds list
var dsList []dataSetT
var nextDSIdx string
// make sure we are operating on the latest version of the list
dsList = getDSList(jd, tx, jd.tablePrefix)
latestDS := dsList[len(dsList)-1]
full, err := jd.checkIfFullDSInTx(tx, latestDS)
if err != nil {
return fmt.Errorf("error while checking if DS is full: %w", err)
}
// checkIfFullDS is true for last DS in the list
if full {
if _, err = tx.Exec(fmt.Sprintf(`LOCK TABLE %q IN EXCLUSIVE MODE;`, latestDS.JobTable)); err != nil {
return fmt.Errorf("error locking table %s: %w", latestDS.JobTable, err)
}

nextDSIdx = jd.doComputeNewIdxForAppend(dsList)
jd.logger.Infof("[[ %s : addNewDSLoop ]]: NewDS", jd.tablePrefix)
if err = jd.addNewDSInTx(tx, dsListLock, dsList, newDataSet(jd.tablePrefix, nextDSIdx)); err != nil {
return fmt.Errorf("error adding new DS: %w", err)
}

// previous DS should become read only
if err = setReadonlyDsInTx(tx, latestDS); err != nil {
return fmt.Errorf("error making dataset read only: %w", err)
}
}
return nil
})
})
})
jd.assertError(err)

Expand Down Expand Up @@ -3683,3 +3729,21 @@ type smallDS struct {
ds dataSetT
recordsLeft int
}

// withDistributedLock serializes the named operation across all processes sharing
// the database: it acquires an exclusive transaction-level advisory lock (released
// automatically when tx ends) and only then invokes f. The lock id is derived from
// the operation name via getAdvisoryLockForOperation.
func (jd *HandleT) withDistributedLock(ctx context.Context, tx *Tx, operation string, f func() error) error {
	lockID := jd.getAdvisoryLockForOperation(operation)
	// pg_advisory_xact_lock blocks until the exclusive lock is granted.
	query := fmt.Sprintf(`SELECT pg_advisory_xact_lock(%d);`, lockID)
	if _, err := tx.ExecContext(ctx, query); err != nil {
		return fmt.Errorf("error while acquiring advisory lock %d for operation %s: %w", lockID, operation, err)
	}
	return f()
}

// withDistributedSharedLock acquires a shared transaction-level advisory lock for
// the named operation before invoking f. Multiple holders of the shared lock can
// proceed concurrently, but all are mutually exclusive with any holder of the
// exclusive lock taken by withDistributedLock for the same operation. The lock is
// released automatically when tx ends.
func (jd *HandleT) withDistributedSharedLock(ctx context.Context, tx *Tx, operation string, f func() error) error {
	lockID := jd.getAdvisoryLockForOperation(operation)
	// pg_advisory_xact_lock_shared blocks until the shared lock is granted.
	query := fmt.Sprintf(`SELECT pg_advisory_xact_lock_shared(%d);`, lockID)
	if _, err := tx.ExecContext(ctx, query); err != nil {
		return fmt.Errorf("error while acquiring a shared advisory lock %d for operation %s: %w", lockID, operation, err)
	}
	return f()
}
2 changes: 1 addition & 1 deletion jobsdb/jobsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ func TestSortDnumList(t *testing.T) {

func Test_GetAdvisoryLockForOperation_Unique(t *testing.T) {
calculated := map[int64]string{}
for _, operation := range []string{"add_ds", "migrate_ds"} {
for _, operation := range []string{"add_ds", "migrate_ds", "schema_migrate"} {
for _, prefix := range []string{"gw", "rt", "batch_rt", "proc_error"} {
h := &HandleT{tablePrefix: prefix}
key := fmt.Sprintf("%s_%s", prefix, operation)
Expand Down
118 changes: 60 additions & 58 deletions jobsdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,80 +67,82 @@ func (jd *HandleT) doMigrateDS(ctx context.Context) error {
var l lock.LockToken
var lockChan chan<- lock.LockToken
err := jd.WithTx(func(tx *Tx) error {
// Take the lock and run actual migration
if !jd.dsMigrationLock.TryLockWithCtx(ctx) {
return fmt.Errorf("failed to acquire lock: %w", ctx.Err())
}
defer jd.dsMigrationLock.Unlock()
// repeat the check after the dsMigrationLock is acquired to get correct pending jobs count.
// the pending jobs count cannot change after the dsMigrationLock is acquired
if migrateFrom, pendingJobsCount, insertBeforeDS = jd.getMigrationList(dsList); len(migrateFrom) == 0 {
return nil
}

if pendingJobsCount > 0 { // migrate incomplete jobs
var destination dataSetT
err := jd.dsListLock.WithLockInCtx(ctx, func(l lock.LockToken) error {
destination = newDataSet(jd.tablePrefix, jd.computeNewIdxForIntraNodeMigration(l, insertBeforeDS))
return jd.withDistributedSharedLock(ctx, tx, "schema_migrate", func() error { // cannot run while schema migration is running
// Take the lock and run actual migration
if !jd.dsMigrationLock.TryLockWithCtx(ctx) {
return fmt.Errorf("failed to acquire lock: %w", ctx.Err())
}
defer jd.dsMigrationLock.Unlock()
// repeat the check after the dsMigrationLock is acquired to get correct pending jobs count.
// the pending jobs count cannot change after the dsMigrationLock is acquired
if migrateFrom, pendingJobsCount, insertBeforeDS = jd.getMigrationList(dsList); len(migrateFrom) == 0 {
return nil
})
if err != nil {
return err
}

jd.logger.Infof("[[ migrateDSLoop ]]: Migrate from: %v", migrateFrom)
jd.logger.Infof("[[ migrateDSLoop ]]: To: %v", destination)
jd.logger.Infof("[[ migrateDSLoop ]]: Next: %v", insertBeforeDS)
if pendingJobsCount > 0 { // migrate incomplete jobs
var destination dataSetT
err := jd.dsListLock.WithLockInCtx(ctx, func(l lock.LockToken) error {
destination = newDataSet(jd.tablePrefix, jd.computeNewIdxForIntraNodeMigration(l, insertBeforeDS))
return nil
})
if err != nil {
return err
}

jd.logger.Infof("[[ migrateDSLoop ]]: Migrate from: %v", migrateFrom)
jd.logger.Infof("[[ migrateDSLoop ]]: To: %v", destination)
jd.logger.Infof("[[ migrateDSLoop ]]: Next: %v", insertBeforeDS)

opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFrom, To: destination})
if err != nil {
return err
}
opID, err := jd.JournalMarkStartInTx(tx, migrateCopyOperation, opPayload)
if err != nil {
return err
}

err = jd.addDSInTx(tx, destination)
if err != nil {
return err
}

totalJobsMigrated := 0
var noJobsMigrated int
for _, source := range migrateFrom {
jd.logger.Infof("[[ migrateDSLoop ]]: Migrate: %v to: %v", source, destination)
noJobsMigrated, err = jd.migrateJobsInTx(ctx, tx, source, destination)
if err != nil {
return err
}
totalJobsMigrated += noJobsMigrated
}
err = jd.journalMarkDoneInTx(tx, opID)
if err != nil {
return err
}
jd.logger.Infof("[[ migrateDSLoop ]]: Total migrated %d jobs", totalJobsMigrated)
}

opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFrom, To: destination})
opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFrom})
if err != nil {
return err
}
opID, err := jd.JournalMarkStartInTx(tx, migrateCopyOperation, opPayload)
opID, err := jd.JournalMarkStartInTx(tx, postMigrateDSOperation, opPayload)
if err != nil {
return err
}

err = jd.addDSInTx(tx, destination)
// acquire an async lock, as this needs to be released after the transaction commits
l, lockChan, err = jd.dsListLock.AsyncLockWithCtx(ctx)
if err != nil {
return err
}

totalJobsMigrated := 0
var noJobsMigrated int
for _, source := range migrateFrom {
jd.logger.Infof("[[ migrateDSLoop ]]: Migrate: %v to: %v", source, destination)
noJobsMigrated, err = jd.migrateJobsInTx(ctx, tx, source, destination)
if err != nil {
return err
}
totalJobsMigrated += noJobsMigrated
}
err = jd.journalMarkDoneInTx(tx, opID)
err = jd.postMigrateHandleDS(tx, migrateFrom)
if err != nil {
return err
}
jd.logger.Infof("[[ migrateDSLoop ]]: Total migrated %d jobs", totalJobsMigrated)
}

opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFrom})
if err != nil {
return err
}
opID, err := jd.JournalMarkStartInTx(tx, postMigrateDSOperation, opPayload)
if err != nil {
return err
}
// acquire an async lock, as this needs to be released after the transaction commits
l, lockChan, err = jd.dsListLock.AsyncLockWithCtx(ctx)
if err != nil {
return err
}
err = jd.postMigrateHandleDS(tx, migrateFrom)
if err != nil {
return err
}
return jd.journalMarkDoneInTx(tx, opID)
return jd.journalMarkDoneInTx(tx, opID)
})
})
if l != nil {
if err == nil {
Expand Down
Loading