diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 3d33d3366c6..28d2736869a 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -834,6 +834,16 @@ func (jd *HandleT) init() { jd.dbHandle = sqlDB } jd.workersAndAuxSetup() + + // Database schema migration should happen early, even before jobsdb is started, + // so that we can be sure that all the necessary tables are created and considered to be in + // the latest schema version, before rudder-migrator starts introducing new tables. + jd.dsListLock.WithLock(func(l lock.LockToken) { + switch jd.ownerType { + case Write, ReadWrite: + jd.setupDatabaseTables(l, jd.clearAll) + } + }) } func (jd *HandleT) workersAndAuxSetup() { @@ -898,24 +908,19 @@ func (jd *HandleT) Start() error { jd.backgroundGroup = g if !jd.skipSetupDBSetup { - jd.setUpForOwnerType(ctx, jd.ownerType, jd.clearAll) - - // Avoid clearing the database, if .Start() is called again. - jd.clearAll = false + jd.setUpForOwnerType(ctx, jd.ownerType) } return nil } -func (jd *HandleT) setUpForOwnerType(ctx context.Context, ownerType OwnerType, clearAll bool) { +func (jd *HandleT) setUpForOwnerType(ctx context.Context, ownerType OwnerType) { jd.dsListLock.WithLock(func(l lock.LockToken) { switch ownerType { case Read: jd.readerSetup(ctx, l) case Write: - jd.setupDatabaseTables(l, clearAll) jd.writerSetup(ctx, l) case ReadWrite: - jd.setupDatabaseTables(l, clearAll) jd.readerWriterSetup(ctx, l) } })