From 4cc8f7475032f7b0e284a4ffca5f3b5897b9bd3c Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sun, 17 May 2020 19:56:50 -0400 Subject: [PATCH 1/6] Cancel any row count queries before attempting to cut over Closes #830. Switches from using `QueryRow` to `QueryRowContext`, and stores a context.CancelFunc in the migration context, which is called to halt any running row count query before beginning the cut over. --- go/base/context.go | 1 + go/logic/inspect.go | 16 ++++++++++++++-- go/logic/migrator.go | 17 +++++++++++++---- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 52d02dd34..46cca33a2 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -80,6 +80,7 @@ type MigrationContext struct { OriginalTableName string AlterStatement string + CountTableRowsCancelFunc func() CountTableRows bool ConcurrentCountTableRows bool AllowedRunningOnMaster bool diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 31184b0c2..e27284673 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -6,6 +6,7 @@ package logic import ( + "context" gosql "database/sql" "fmt" "reflect" @@ -521,7 +522,7 @@ func (this *Inspector) estimateTableRowsViaExplain() error { } // CountTableRows counts exact number of rows on the original table -func (this *Inspector) CountTableRows() error { +func (this *Inspector) CountTableRows(ctx context.Context) error { atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 1) defer atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 0) @@ -529,9 +530,20 @@ func (this *Inspector) CountTableRows() error { query := fmt.Sprintf(`select /* gh-ost */ count(*) as rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) var rowsEstimate int64 - if err := this.db.QueryRow(query).Scan(&rowsEstimate); err != nil { + if err := this.db.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { return err } + select { + case <-ctx.Done(): + log.Infof("exact row count cancelled (%s), likely because I'm about to cut over", ctx.Err()) + return nil + default: + // row count query finished. nil out the cancel func, so the main migration thread + // doesn't bother calling it after row copy is done. + this.migrationContext.CountTableRowsCancelFunc = nil + break + } + atomic.StoreInt64(&this.migrationContext.RowsEstimate, rowsEstimate) this.migrationContext.UsedRowsEstimateMethod = base.CountRowsEstimate diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b1a238fda..c7e99e554 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -6,6 +6,7 @@ package logic import ( + "context" "fmt" "io" "math" @@ -281,8 +282,8 @@ func (this *Migrator) countTableRows() (err error) { return nil } - countRowsFunc := func() error { - if err := this.inspector.CountTableRows(); err != nil { + countRowsFunc := func(ctx context.Context) error { + if err := this.inspector.CountTableRows(ctx); err != nil { return err } if err := this.hooksExecutor.onRowCountComplete(); err != nil { @@ -292,12 +293,16 @@ func (this *Migrator) countTableRows() (err error) { } if this.migrationContext.ConcurrentCountTableRows { + // store a cancel func so we can stop this query before a cut over + rowCountContext, rowCountCancel := context.WithCancel(context.Background()) + this.migrationContext.CountTableRowsCancelFunc = rowCountCancel + log.Infof("As instructed, counting rows in the background; meanwhile I will use an estimated count, and will update it later on") - go countRowsFunc() + go countRowsFunc(rowCountContext) // and we ignore errors, because this turns to be a background job return nil } - return countRowsFunc() + return countRowsFunc(context.Background()) } func (this *Migrator) createFlagFiles() (err error) { @@ -401,6 +406,10 @@ func (this *Migrator) Migrate() (err error) { } this.printStatus(ForcePrintStatusRule) + if this.migrationContext.CountTableRowsCancelFunc != nil { + log.Info("stopping query for exact row count, because that can accidentally lock out the cut over") + this.migrationContext.CountTableRowsCancelFunc() + } if err := this.hooksExecutor.onBeforeCutOver(); err != nil { return err } From 687e1d0d4053764af571a5d1387c228356677857 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sun, 28 Jun 2020 10:53:44 -0400 Subject: [PATCH 2/6] Make it threadsafe --- go/base/context.go | 31 +++++++++++++++++++++++++++++++ go/logic/inspect.go | 2 +- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/go/base/context.go b/go/base/context.go index 46cca33a2..685771297 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -80,6 +80,7 @@ type MigrationContext struct { OriginalTableName string AlterStatement string + countMutex sync.Mutex CountTableRowsCancelFunc func() CountTableRows bool ConcurrentCountTableRows bool @@ -397,6 +398,36 @@ func (this *MigrationContext) IsTransactionalTable() bool { return false } +// SetCountTableRowsCancelFunc sets the cancel function for the CountTableRows query context +func (this *MigrationContext) SetCountTableRowsCancelFunc(f func()) { + this.countMutex.Lock() + defer this.countMutex.Unlock() + + this.CountTableRowsCancelFunc = f +} + +// IsCountingTableRows returns true if the migration has a table count query running +func (this *MigrationContext) IsCountingTableRows() bool { + this.countMutex.Lock() + defer this.countMutex.Unlock() + + return this.CountTableRowsCancelFunc != nil +} + +// CancelTableRowsCount cancels the CountTableRows query context. It is safe to +// call function even when IsCountingTableRows is false. +func (this *MigrationContext) CancelTableRowsCount() { + this.countMutex.Lock() + defer this.countMutex.Unlock() + + if this.CountTableRowsCancelFunc == nil { + return + } + + this.CountTableRowsCancelFunc() + this.CountTableRowsCancelFunc = nil +} + // ElapsedTime returns time since very beginning of the process func (this *MigrationContext) ElapsedTime() time.Duration { return time.Since(this.StartTime) diff --git a/go/logic/inspect.go b/go/logic/inspect.go index e27284673..edb5cc120 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -540,7 +540,7 @@ func (this *Inspector) CountTableRows(ctx context.Context) error { default: // row count query finished. nil out the cancel func, so the main migration thread // doesn't bother calling it after row copy is done. - this.migrationContext.CountTableRowsCancelFunc = nil + this.migrationContext.SetCountTableRowsCancelFunc(nil) break } From 96281cd60d4765127829cd9709f9ec3c8d7e12f6 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sun, 28 Jun 2020 13:54:32 -0400 Subject: [PATCH 3/6] Kill the count query on the database side as well * Explicitly grab a connection to run the count, store its connection id * When the query context is canceled, run a `KILL QUERY ?` on that connection id --- go/logic/inspect.go | 43 +++++++++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/go/logic/inspect.go b/go/logic/inspect.go index edb5cc120..b4c5b2c90 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -521,6 +521,14 @@ func (this *Inspector) estimateTableRowsViaExplain() error { return nil } +// Kill kills a query for connectionID. +// - @amason: this should go somewhere _other_ than `logic`, but I couldn't decide +// between `base`, `sql`, or `mysql`. +func Kill(db *gosql.DB, connectionID string) error { + _, err := db.Exec(`KILL QUERY %s`, connectionID) + return err +} + // CountTableRows counts exact number of rows on the original table func (this *Inspector) CountTableRows(ctx context.Context) error { atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 1) @@ -528,22 +536,33 @@ func (this *Inspector) CountTableRows(ctx context.Context) error { log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while") - query := fmt.Sprintf(`select /* gh-ost */ count(*) as rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) - var rowsEstimate int64 - if err := this.db.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { + conn, err := this.db.Conn(ctx) + if err != nil { return err } - select { - case <-ctx.Done(): - log.Infof("exact row count cancelled (%s), likely because I'm about to cut over", ctx.Err()) - return nil - default: - // row count query finished. nil out the cancel func, so the main migration thread - // doesn't bother calling it after row copy is done. - this.migrationContext.SetCountTableRowsCancelFunc(nil) - break + defer conn.Close() + + var connectionID string + if err := conn.QueryRowContext(ctx, `SELECT /* gh-ost */ CONNECTION_ID()`).Scan(&connectionID); err != nil { + return err + } + + query := fmt.Sprintf(`select /* gh-ost */ count(*) as rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) + var rowsEstimate int64 + if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { + switch err { + case context.Canceled, context.DeadlineExceeded: + log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err()) + return Kill(this.db, connectionID) + default: + return err + } } + // row count query finished. nil out the cancel func, so the main migration thread + // doesn't bother calling it after row copy is done. + this.migrationContext.SetCountTableRowsCancelFunc(nil) + atomic.StoreInt64(&this.migrationContext.RowsEstimate, rowsEstimate) this.migrationContext.UsedRowsEstimateMethod = base.CountRowsEstimate From c1181aa84f8e4804997dad1e1572850939c4be1b Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sun, 28 Jun 2020 14:31:41 -0400 Subject: [PATCH 4/6] Rewrite these to use the threadsafe functions, stop exporting the cancel func --- go/base/context.go | 12 ++++++------ go/logic/migrator.go | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 685771297..d42582973 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -81,7 +81,7 @@ type MigrationContext struct { AlterStatement string countMutex sync.Mutex - CountTableRowsCancelFunc func() + countTableRowsCancelFunc func() CountTableRows bool ConcurrentCountTableRows bool AllowedRunningOnMaster bool @@ -403,7 +403,7 @@ func (this *MigrationContext) SetCountTableRowsCancelFunc(f func()) { this.countMutex.Lock() defer this.countMutex.Unlock() - this.CountTableRowsCancelFunc = f + this.countTableRowsCancelFunc = f } // IsCountingTableRows returns true if the migration has a table count query running @@ -411,7 +411,7 @@ func (this *MigrationContext) IsCountingTableRows() bool { this.countMutex.Lock() defer this.countMutex.Unlock() - return this.CountTableRowsCancelFunc != nil + return this.countTableRowsCancelFunc != nil } // CancelTableRowsCount cancels the CountTableRows query context. It is safe to @@ -420,12 +420,12 @@ func (this *MigrationContext) CancelTableRowsCount() { this.countMutex.Lock() defer this.countMutex.Unlock() - if this.CountTableRowsCancelFunc == nil { + if this.countTableRowsCancelFunc == nil { return } - this.CountTableRowsCancelFunc() - this.CountTableRowsCancelFunc = nil + this.countTableRowsCancelFunc() + this.countTableRowsCancelFunc = nil } // ElapsedTime returns time since very beginning of the process diff --git a/go/logic/migrator.go b/go/logic/migrator.go index c7e99e554..3015c82d1 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -295,7 +295,7 @@ func (this *Migrator) countTableRows() (err error) { if this.migrationContext.ConcurrentCountTableRows { // store a cancel func so we can stop this query before a cut over rowCountContext, rowCountCancel := context.WithCancel(context.Background()) - this.migrationContext.CountTableRowsCancelFunc = rowCountCancel + this.migrationContext.SetCountTableRowsCancelFunc(rowCountCancel) log.Infof("As instructed, counting rows in the background; meanwhile I will use an estimated count, and will update it later on") go countRowsFunc(rowCountContext) @@ -406,9 +406,9 @@ func (this *Migrator) Migrate() (err error) { } this.printStatus(ForcePrintStatusRule) - if this.migrationContext.CountTableRowsCancelFunc != nil { + if this.migrationContext.IsCountingTableRows() { log.Info("stopping query for exact row count, because that can accidentally lock out the cut over") - this.migrationContext.CountTableRowsCancelFunc() + this.migrationContext.CancelTableRowsCount() } if err := this.hooksExecutor.onBeforeCutOver(); err != nil { return err From 2844e9589c1e16928488640345738fae98e9c3fd Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sun, 5 Jun 2022 20:24:59 +0200 Subject: [PATCH 5/6] Update logger --- go/logic/inspect.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 074f52540..becda1888 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -564,7 +564,7 @@ func (this *Inspector) CountTableRows(ctx context.Context) error { if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { switch err { case context.Canceled, context.DeadlineExceeded: - log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err()) + this.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err()) return Kill(this.db, connectionID) default: return err From 9a657ea4f161e2826feb178d859c1f5ad7dc8745 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sun, 5 Jun 2022 20:25:55 +0200 Subject: [PATCH 6/6] Update logger --- go/logic/migrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b1609eb08..f6cf6f936 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -417,7 +417,7 @@ func (this *Migrator) Migrate() (err error) { this.printStatus(ForcePrintStatusRule) if this.migrationContext.IsCountingTableRows() { - log.Info("stopping query for exact row count, because that can accidentally lock out the cut over") + this.migrationContext.Log.Info("stopping query for exact row count, because that can accidentally lock out the cut over") this.migrationContext.CancelTableRowsCount() } if err := this.hooksExecutor.onBeforeCutOver(); err != nil {