diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index 7b5efd9fb..c706a4fe4 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -64,6 +64,15 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant ### binlogsyncer-max-reconnect-attempts `--binlogsyncer-max-reconnect-attempts=0`, the maximum number of attempts to re-establish a broken inspector connection for sync binlog. `0` or `negative number` means infinite retry, default `0` +### checkpoint + +`--checkpoint` enables periodic checkpoints of `gh-ost`'s state so that `gh-ost` can resume a migration from the last checkpoint with `--resume`. Checkpoints are written to a separate table named `_${original_table_name}_ghk`. It is recommended to use `--checkpoint` together with `--gtid`. +See also: [`resuming-migrations`](resume.md) + +### checkpoint-seconds + +`--checkpoint-seconds` specifies the number of seconds between checkpoints. Default is 300; the minimum is 10. + ### conf `--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format: @@ -226,6 +235,11 @@ Optionally involve the process ID, for example: `--replica-server-id=$((10000000 It's on you to choose a number that does not collide with another `gh-ost` or another running replica. See also: [`concurrent-migrations`](cheatsheet.md#concurrent-migrations) on the cheatsheet. +### resume + +`--resume` attempts to resume a previously interrupted migration from its last checkpoint. The first `gh-ost` invocation must run with `--checkpoint` and have successfully written a checkpoint in order for `--resume` to work. +See also: [`resuming-migrations`](resume.md) + ### serve-socket-file Defaults to an auto-determined and advertised upon startup file. Defines Unix socket file to serve on. 
diff --git a/doc/resume.md b/doc/resume.md new file mode 100644 index 000000000..46d6ac909 --- /dev/null +++ b/doc/resume.md @@ -0,0 +1,42 @@ +# Resuming Migrations + +`gh-ost` can attempt to resume an interrupted migration from a checkpoint if the following conditions are met: +- The first `gh-ost` process was invoked with `--checkpoint` +- The first `gh-ost` process had at least one successful checkpoint +- The binlogs from the last checkpoint's binlog coordinates still exist on the replica `gh-ost` is inspecting (specified by `--host`) + +To resume, invoke `gh-ost` again with the same arguments plus the `--resume` flag. + +> [!WARNING] +> It is recommended to use `--checkpoint` with `--gtid` enabled so that checkpoint binlog coordinates store GTID sets rather than file positions. In that case, `gh-ost` can resume using a different replica than it originally attached to. + +## Example +The migration starts with a `gh-ost` invocation such as: +```shell +gh-ost \ +--chunk-size=100 \ +--host=replica1.company.com \ +--database="mydb" \ +--table="mytable" \ +--alter="add column mycol varchar(20)" \ +--gtid \ +--checkpoint \ +--checkpoint-seconds=60 \ +--execute +``` + +In this example `gh-ost` writes a checkpoint to a table `_mytable_ghk` every 60 seconds. After `gh-ost` is interrupted/killed, the migration can be resumed with: +```shell +# resume migration +gh-ost \ +--chunk-size=100 \ +--host=replica1.company.com \ +--database="mydb" \ +--table="mytable" \ +--alter="add column mycol varchar(20)" \ +--gtid \ +--resume \ +--execute +``` + +`gh-ost` then reconnects at the binlog coordinates of the last checkpoint and resumes copying rows at the chunk specified by the checkpoint. The data integrity of the ghost table is preserved because `gh-ost` applies row DMLs and copies rows in an idempotent way. 
diff --git a/go/base/context.go b/go/base/context.go index 0a1cae739..35030e30b 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -103,6 +103,7 @@ type MigrationContext struct { GoogleCloudPlatform bool AzureMySQL bool AttemptInstantDDL bool + Resume bool // SkipPortValidation allows skipping the port validation in `ValidateConnection` // This is useful when connecting to a MySQL instance where the external port @@ -153,6 +154,8 @@ type MigrationContext struct { HooksHintToken string HooksStatusIntervalSec int64 PanicOnWarnings bool + Checkpoint bool + CheckpointIntervalSeconds int64 DropServeSocket bool ServeSocketFile string @@ -239,6 +242,7 @@ type MigrationContext struct { Iteration int64 MigrationIterationRangeMinValues *sql.ColumnValues MigrationIterationRangeMaxValues *sql.ColumnValues + InitialStreamerCoords mysql.BinlogCoordinates ForceTmpTableName string IncludeTriggers bool @@ -380,6 +384,15 @@ func (this *MigrationContext) GetChangelogTableName() string { } } +// GetCheckpointTableName generates the name of checkpoint table. +func (this *MigrationContext) GetCheckpointTableName() string { + if this.ForceTmpTableName != "" { + return getSafeTableName(this.ForceTmpTableName, "ghk") + } else { + return getSafeTableName(this.OriginalTableName, "ghk") + } +} + // GetVoluntaryLockName returns a name of a voluntary lock to be used throughout // the swap-tables process. func (this *MigrationContext) GetVoluntaryLockName() string { diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 6391cf4fb..0a2a8afa4 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -145,6 +145,9 @@ func main() { flag.StringVar(&migrationContext.TriggerSuffix, "trigger-suffix", "", "Add a suffix to the trigger name (i.e '_v2'). Requires '--include-triggers'") flag.BoolVar(&migrationContext.RemoveTriggerSuffix, "remove-trigger-suffix-if-exists", false, "Remove given suffix from name of trigger. 
Requires '--include-triggers' and '--trigger-suffix'") flag.BoolVar(&migrationContext.SkipPortValidation, "skip-port-validation", false, "Skip port validation for MySQL connections") + flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Enable migration checkpoints") + flag.Int64Var(&migrationContext.CheckpointIntervalSeconds, "checkpoint-seconds", 300, "The number of seconds between checkpoints") + flag.BoolVar(&migrationContext.Resume, "resume", false, "Attempt to resume migration from checkpoint") maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes") criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits") @@ -284,6 +287,9 @@ func main() { if *storageEngine == "rocksdb" { migrationContext.Log.Warning("RocksDB storage engine support is experimental") } + if migrationContext.CheckpointIntervalSeconds < 10 { + migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10") + } switch *cutOver { case "atomic", "default", "": @@ -316,6 +322,7 @@ func main() { } migrationContext.CliPassword = string(bytePassword) } + migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis) migrationContext.SetNiceRatio(*niceRatio) migrationContext.SetChunkSize(*chunkSize) diff --git a/go/logic/applier.go b/go/logic/applier.go index 30ac97695..3fe7f2287 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1,5 +1,5 @@ /* - Copyright 2021 GitHub Inc. + Copyright 2025 GitHub Inc. 
See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -21,6 +21,9 @@ import ( "context" "database/sql/driver" + "errors" + "sync" + "github.com/github/gh-ost/go/mysql" drivermysql "github.com/go-sql-driver/mysql" "github.com/openark/golib/sqlutils" @@ -31,6 +34,9 @@ const ( atomicCutOverMagicHint = "ghost-cut-over-sentry" ) +// ErrNoCheckpointFound is returned when an empty checkpoint table is queried. +var ErrNoCheckpointFound = errors.New("no checkpoint found in _ghk table") + type dmlBuildResult struct { query string args []interface{} @@ -66,9 +72,17 @@ type Applier struct { finishedMigrating int64 name string - dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder - dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder - dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder + CurrentCoordinatesMutex sync.Mutex + CurrentCoordinates mysql.BinlogCoordinates + + LastIterationRangeMutex sync.Mutex + LastIterationRangeMinValues *sql.ColumnValues + LastIterationRangeMaxValues *sql.ColumnValues + + dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder + dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder + dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder + checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder } func NewApplier(migrationContext *base.MigrationContext) *Applier { @@ -144,6 +158,15 @@ func (this *Applier) prepareQueries() (err error) { ); err != nil { return err } + if this.migrationContext.Checkpoint { + if this.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder( + this.migrationContext.DatabaseName, + this.migrationContext.GetCheckpointTableName(), + &this.migrationContext.UniqueKey.Columns, + ); err != nil { + return err + } + } return nil } @@ -400,6 +423,54 @@ func (this *Applier) CreateChangelogTable() error { return nil } +// Create the checkpoint table to store the chunk copy and applier state. +// There are two sets of columns with the same types as the shared unique key, +// one for IterationMinValues and one for IterationMaxValues. 
+func (this *Applier) CreateCheckpointTable() error { + if err := this.DropCheckpointTable(); err != nil { + return err + } + colDefs := []string{ + "`gh_ost_chk_id` bigint auto_increment primary key", + "`gh_ost_chk_timestamp` bigint", + "`gh_ost_chk_coords` varchar(4096)", + "`gh_ost_chk_iteration` bigint", + "`gh_ost_rows_copied` bigint", + "`gh_ost_dml_applied` bigint", + } + for _, col := range this.migrationContext.UniqueKey.Columns.Columns() { + if col.MySQLType == "" { + return fmt.Errorf("CreateCheckpointTable: column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name)) + } + minColName := sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4) + "_min" + colDef := fmt.Sprintf("%s %s", sql.EscapeName(minColName), col.MySQLType) + if !col.Nullable { + colDef += " NOT NULL" + } + colDefs = append(colDefs, colDef) + } + + for _, col := range this.migrationContext.UniqueKey.Columns.Columns() { + maxColName := sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4) + "_max" + colDef := fmt.Sprintf("%s %s", sql.EscapeName(maxColName), col.MySQLType) + if !col.Nullable { + colDef += " NOT NULL" + } + colDefs = append(colDefs, colDef) + } + + query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)", + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetCheckpointTableName()), + strings.Join(colDefs, ",\n "), + ) + if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { + return err + } + this.migrationContext.Log.Infof("Created checkpoint table") + return nil +} + // dropTable drops a given table on the applied host func (this *Applier) dropTable(tableName string) error { query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`, @@ -494,6 +565,11 @@ func (this *Applier) DropChangelogTable() error { return this.dropTable(this.migrationContext.GetChangelogTableName()) } +// DropCheckpointTable drops the checkpoint table on applier host +func (this *Applier) 
DropCheckpointTable() error { + return this.dropTable(this.migrationContext.GetCheckpointTableName()) +} + // DropOldTable drops the _Old table on the applier host func (this *Applier) DropOldTable() error { return this.dropTable(this.migrationContext.GetOldTableName()) @@ -542,6 +618,61 @@ func (this *Applier) WriteChangelogState(value string) (string, error) { return this.WriteAndLogChangelog("state", value) } +// WriteCheckpoint writes a checkpoint to the _ghk table and returns the id of the inserted row. +func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { + var insertId int64 + uniqueKeyArgs := sqlutils.Args(chk.IterationRangeMin.AbstractValues()...) + uniqueKeyArgs = append(uniqueKeyArgs, chk.IterationRangeMax.AbstractValues()...) + query, uniqueKeyArgs, err := this.checkpointInsertQueryBuilder.BuildQuery(uniqueKeyArgs) + if err != nil { + return insertId, err + } + args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied) + args = append(args, uniqueKeyArgs...) + res, err := this.db.Exec(query, args...) + if err != nil { + return insertId, err + } + return res.LastInsertId() +} + +// ReadLastCheckpoint reads the checkpoint with the highest gh_ost_chk_id from the _ghk table. It returns ErrNoCheckpointFound when the table is empty. +func (this *Applier) ReadLastCheckpoint() (*Checkpoint, error) { + row := this.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetCheckpointTableName()))) + chk := &Checkpoint{ + IterationRangeMin: sql.NewColumnValues(this.migrationContext.UniqueKey.Columns.Len()), + IterationRangeMax: sql.NewColumnValues(this.migrationContext.UniqueKey.Columns.Len()), + } + + var coordStr string + var timestamp int64 + ptrs := []interface{}{&chk.Id, &timestamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied} + ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...) + ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...) + err := row.Scan(ptrs...) 
+ if err != nil { + if errors.Is(err, gosql.ErrNoRows) { + return nil, ErrNoCheckpointFound + } + return nil, err + } + chk.Timestamp = time.Unix(timestamp, 0) + if this.migrationContext.UseGTIDs { + gtidCoords, err := mysql.NewGTIDBinlogCoordinates(coordStr) + if err != nil { + return nil, err + } + chk.LastTrxCoords = gtidCoords + } else { + fileCoords, err := mysql.ParseFileBinlogCoordinates(coordStr) + if err != nil { + return nil, err + } + chk.LastTrxCoords = fileCoords + } + return chk, nil +} + // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table. // This is done asynchronously func (this *Applier) InitiateHeartbeat() { @@ -686,8 +817,15 @@ func (this *Applier) ReadMigrationRangeValues() error { // CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, // which will be used for copying the next chunk of rows. Ir returns "false" if there is // no further chunk to work through, i.e. we're past the last chunk and are done with -// iterating the range (and this done with copying row chunks) +// iterating the range (and thus done with copying row chunks) func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) { + this.LastIterationRangeMutex.Lock() + if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil { + this.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone() + this.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone() + } + this.LastIterationRangeMutex.Unlock() + this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues if this.migrationContext.MigrationIterationRangeMinValues == nil { this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go 
index 13e8a4d3b..232349d16 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -1,5 +1,5 @@ /* - Copyright 2022 GitHub Inc. + Copyright 2025 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -15,12 +15,13 @@ import ( "github.com/stretchr/testify/suite" "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/modules/mysql" + testmysql "github.com/testcontainers/testcontainers-go/modules/mysql" "fmt" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" "github.com/testcontainers/testcontainers-go/wait" ) @@ -207,11 +208,11 @@ type ApplierTestSuite struct { func (suite *ApplierTestSuite) SetupSuite() { ctx := context.Background() - mysqlContainer, err := mysql.Run(ctx, + mysqlContainer, err := testmysql.Run(ctx, testMysqlContainerImage, - mysql.WithDatabase(testMysqlDatabase), - mysql.WithUsername(testMysqlUser), - mysql.WithPassword(testMysqlPass), + testmysql.WithDatabase(testMysqlDatabase), + testmysql.WithUsername(testMysqlUser), + testmysql.WithPassword(testMysqlPass), testcontainers.WithWaitStrategy(wait.ForExposedPort()), ) suite.Require().NoError(err) @@ -631,6 +632,92 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai suite.Require().Contains(applier.migrationContext.MigrationLastInsertSQLWarnings[0], "Warning: Data truncated for column 'name' at row 1") } +func (suite *ApplierTestSuite) TestWriteCheckpoint() { + ctx := context.Background() + + var err error + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id int not null, id2 char(4) CHARACTER SET utf8mb4, primary key(id, id2))", getTestTableName())) + suite.Require().NoError(err) + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, id2 char(4) CHARACTER SET utf8mb4, name varchar(20), primary key(id, id2));", getTestGhostTableName())) + 
suite.Require().NoError(err) + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, id2) VALUES (?,?), (?,?), (?,?)", getTestTableName()), 411, "君子懷德", 411, "小人懷土", 212, "君子不器") + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + migrationContext.AlterStatementOptions = "add column name varchar(20)" + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.Checkpoint = true + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id", "id2"}), + } + + inspector := NewInspector(migrationContext) + suite.Require().NoError(inspector.InitDBConnections()) + + err = inspector.applyColumnTypes(testMysqlDatabase, testMysqlTableName, &migrationContext.UniqueKey.Columns) + suite.Require().NoError(err) + + applier := NewApplier(migrationContext) + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + err = applier.CreateChangelogTable() + suite.Require().NoError(err) + + err = applier.CreateCheckpointTable() + suite.Require().NoError(err) + + err = applier.prepareQueries() + suite.Require().NoError(err) + + err = applier.ReadMigrationRangeValues() + suite.Require().NoError(err) + + // checkpoint table is empty + _, err = applier.ReadLastCheckpoint() + suite.Require().ErrorIs(err, ErrNoCheckpointFound) + + // write a checkpoint and read it back + coords := mysql.NewFileBinlogCoordinates("mysql-bin.000003", int64(219202907)) + + chk := &Checkpoint{ + 
LastTrxCoords: coords, + IterationRangeMin: applier.migrationContext.MigrationRangeMinValues, + IterationRangeMax: applier.migrationContext.MigrationRangeMaxValues, + Iteration: 2, + RowsCopied: 100000, + DMLApplied: 200000, + } + id, err := applier.WriteCheckpoint(chk) + suite.Require().NoError(err) + suite.Require().Equal(int64(1), id) + + gotChk, err := applier.ReadLastCheckpoint() + suite.Require().NoError(err) + + suite.Require().Equal(chk.Iteration, gotChk.Iteration) + suite.Require().Equal(chk.LastTrxCoords.String(), gotChk.LastTrxCoords.String()) + suite.Require().Equal(chk.IterationRangeMin.String(), gotChk.IterationRangeMin.String()) + suite.Require().Equal(chk.IterationRangeMax.String(), gotChk.IterationRangeMax.String()) + suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied) + suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied) +} + func TestApplier(t *testing.T) { suite.Run(t, new(ApplierTestSuite)) } diff --git a/go/logic/checkpoint.go b/go/logic/checkpoint.go new file mode 100644 index 000000000..69cc2bd20 --- /dev/null +++ b/go/logic/checkpoint.go @@ -0,0 +1,31 @@ +/* + Copyright 2025 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "time" + + "github.com/github/gh-ost/go/mysql" + "github.com/github/gh-ost/go/sql" +) + +// Checkpoint holds state necessary to resume a migration. +type Checkpoint struct { + Id int64 + Timestamp time.Time + // LastTrxCoords are coordinates of a transaction + // that has been applied on ghost table. + LastTrxCoords mysql.BinlogCoordinates + // IterationRangeMin is the min shared key value + // for the chunk copier range. + IterationRangeMin *sql.ColumnValues + // IterationRangeMax is the max shared key value + // for the chunk copier range. 
+ IterationRangeMax *sql.ColumnValues + Iteration int64 + RowsCopied int64 + DMLApplied int64 +} diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 7a7dc8424..044360153 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -1,5 +1,5 @@ /* - Copyright 2022 GitHub Inc. + Copyright 2025 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -711,12 +711,17 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL columnName := m.GetString("COLUMN_NAME") columnType := m.GetString("COLUMN_TYPE") columnOctetLength := m.GetUint("CHARACTER_OCTET_LENGTH") + isNullable := m.GetString("IS_NULLABLE") extra := m.GetString("EXTRA") for _, columnsList := range columnsLists { column := columnsList.GetColumn(columnName) if column == nil { continue } + column.MySQLType = columnType + if isNullable == "YES" { + column.Nullable = true + } if strings.Contains(columnType, "unsigned") { column.IsUnsigned = true diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4d7074b22..33271d01a 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1,5 +1,5 @@ /* - Copyright 2022 GitHub Inc. + Copyright 2025 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -26,6 +26,7 @@ var ( ErrMigratorUnsupportedRenameAlter = errors.New("ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost.") ErrMigrationNotAllowedOnMaster = errors.New("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (this reduces load from the master). 
To proceed please provide --allow-on-master.") RetrySleepFn = time.Sleep + checkpointTimeout = 2 * time.Second ) type ChangelogState string @@ -46,6 +47,7 @@ type tableWriteFunc func() error type applyEventStruct struct { writeFunc *tableWriteFunc dmlEvent *binlog.BinlogDMLEvent + coords mysql.BinlogCoordinates } func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct { @@ -53,8 +55,8 @@ func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct { return result } -func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct { - result := &applyEventStruct{dmlEvent: dmlEvent} +func newApplyEventStructByDML(dmlEntry *binlog.BinlogEntry) *applyEventStruct { + result := &applyEventStruct{dmlEvent: dmlEntry.DmlEvent, coords: dmlEntry.Coordinates} return result } @@ -91,8 +93,6 @@ type Migrator struct { copyRowsQueue chan tableWriteFunc applyEventsQueue chan *applyEventStruct - handledChangelogStates map[string]bool - finishedMigrating int64 } @@ -107,10 +107,9 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { rowCopyComplete: make(chan error), allEventsUpToLockProcessed: make(chan string), - copyRowsQueue: make(chan tableWriteFunc), - applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), - handledChangelogStates: make(map[string]bool), - finishedMigrating: 0, + copyRowsQueue: make(chan tableWriteFunc), + applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), + finishedMigrating: 0, } return migrator } @@ -201,20 +200,20 @@ func (this *Migrator) canStopStreaming() bool { } // onChangelogEvent is called when a binlog event operation on the changelog table is intercepted. -func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { +func (this *Migrator) onChangelogEvent(dmlEntry *binlog.BinlogEntry) (err error) { // Hey, I created the changelog table, I know the type of columns it has! 
- switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint { + switch hint := dmlEntry.DmlEvent.NewColumnValues.StringColumn(2); hint { case "state": - return this.onChangelogStateEvent(dmlEvent) + return this.onChangelogStateEvent(dmlEntry) case "heartbeat": - return this.onChangelogHeartbeatEvent(dmlEvent) + return this.onChangelogHeartbeatEvent(dmlEntry) default: return nil } } -func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { - changelogStateString := dmlEvent.NewColumnValues.StringColumn(3) +func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err error) { + changelogStateString := dmlEntry.DmlEvent.NewColumnValues.StringColumn(3) changelogState := ReadChangelogState(changelogStateString) this.migrationContext.Log.Infof("Intercepted changelog state %s", changelogState) switch changelogState { @@ -242,14 +241,17 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er return nil } -func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { - changelogHeartbeatString := dmlEvent.NewColumnValues.StringColumn(3) +func (this *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (err error) { + changelogHeartbeatString := dmlEntry.DmlEvent.NewColumnValues.StringColumn(3) heartbeatTime, err := time.Parse(time.RFC3339Nano, changelogHeartbeatString) if err != nil { return this.migrationContext.Log.Errore(err) } else { this.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime) + this.applier.CurrentCoordinatesMutex.Lock() + this.applier.CurrentCoordinates = dmlEntry.Coordinates + this.applier.CurrentCoordinatesMutex.Unlock() return nil } } @@ -352,8 +354,14 @@ func (this *Migrator) Migrate() (err error) { if err := this.initiateInspector(); err != nil { return err } - if err := this.initiateStreaming(); err != nil { - return err + // If we are resuming, we will initiateStreaming later when we know + // the coordinates 
to resume streaming. + // If not resuming, the streamer must be initiated before the applier, + // so that the "GhostTableMigrated" event gets processed. + if !this.migrationContext.Resume { + if err := this.initiateStreaming(); err != nil { + return err + } } if err := this.initiateApplier(); err != nil { return err @@ -384,9 +392,11 @@ func (this *Migrator) Migrate() (err error) { } initialLag, _ := this.inspector.getReplicationLag() - this.migrationContext.Log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag) - <-this.ghostTableMigrated - this.migrationContext.Log.Debugf("ghost table migrated") + if !this.migrationContext.Resume { + this.migrationContext.Log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag) + <-this.ghostTableMigrated + this.migrationContext.Log.Debugf("ghost table migrated") + } // Yay! We now know the Ghost and Changelog tables are good to examine! // When running on replica, this means the replica has those tables. When running // on master this is always true, of course, and yet it also implies this knowledge @@ -394,10 +404,38 @@ func (this *Migrator) Migrate() (err error) { if err := this.inspector.inspectOriginalAndGhostTables(); err != nil { return err } + // We can prepare some of the queries on the applier if err := this.applier.prepareQueries(); err != nil { return err } + + // inspectOriginalAndGhostTables must be called before creating checkpoint table. 
+ if this.migrationContext.Checkpoint && !this.migrationContext.Resume { + if err := this.applier.CreateCheckpointTable(); err != nil { + this.migrationContext.Log.Errorf("Unable to create checkpoint table, see further error details.") + } + } + + if this.migrationContext.Resume { + lastCheckpoint, err := this.applier.ReadLastCheckpoint() + if err != nil { + return this.migrationContext.Log.Errorf("No checkpoint found, unable to resume: %+v", err) + } + this.migrationContext.Log.Infof("Resuming from checkpoint coords=%+v range_min=%+v range_max=%+v iteration=%d", + lastCheckpoint.LastTrxCoords, lastCheckpoint.IterationRangeMin.String(), lastCheckpoint.IterationRangeMax.String(), lastCheckpoint.Iteration) + + this.migrationContext.MigrationIterationRangeMinValues = lastCheckpoint.IterationRangeMin + this.migrationContext.MigrationIterationRangeMaxValues = lastCheckpoint.IterationRangeMax + this.migrationContext.Iteration = lastCheckpoint.Iteration + this.migrationContext.TotalRowsCopied = lastCheckpoint.RowsCopied + this.migrationContext.TotalDMLEventsApplied = lastCheckpoint.DMLApplied + this.migrationContext.InitialStreamerCoords = lastCheckpoint.LastTrxCoords + if err := this.initiateStreaming(); err != nil { + return err + } + } + // Validation complete! 
We're good to execute this migration if err := this.hooksExecutor.onValidated(); err != nil { return err @@ -427,6 +465,9 @@ func (this *Migrator) Migrate() (err error) { go this.iterateChunks() this.migrationContext.MarkRowCopyStartTime() go this.initiateStatus() + if this.migrationContext.Checkpoint { + go this.checkpointLoop() + } this.migrationContext.Log.Debugf("Operating until row copy is complete") this.consumeRowCopyComplete() @@ -1082,7 +1123,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), len(this.applyEventsQueue), cap(this.applyEventsQueue), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), - currentBinlogCoordinates, + currentBinlogCoordinates.DisplayString(), this.migrationContext.GetCurrentLagDuration().Seconds(), this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(), state, @@ -1119,8 +1160,8 @@ func (this *Migrator) initiateStreaming() error { false, this.migrationContext.DatabaseName, this.migrationContext.GetChangelogTableName(), - func(dmlEvent *binlog.BinlogDMLEvent) error { - return this.onChangelogEvent(dmlEvent) + func(dmlEntry *binlog.BinlogEntry) error { + return this.onChangelogEvent(dmlEntry) }, ) @@ -1153,8 +1194,8 @@ func (this *Migrator) addDMLEventsListener() error { false, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, - func(dmlEvent *binlog.BinlogDMLEvent) error { - this.applyEventsQueue <- newApplyEventStructByDML(dmlEvent) + func(dmlEntry *binlog.BinlogEntry) error { + this.applyEventsQueue <- newApplyEventStructByDML(dmlEntry) return nil }, ) @@ -1179,32 +1220,33 @@ func (this *Migrator) initiateApplier() error { if err := this.applier.InitDBConnections(); err != nil { return err } - if err := this.applier.ValidateOrDropExistingTables(); err != nil { - return err - } - if err := this.applier.CreateChangelogTable(); err != 
nil { - this.migrationContext.Log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") - return err - } - if err := this.applier.CreateGhostTable(); err != nil { - this.migrationContext.Log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") - return err - } - - if err := this.applier.AlterGhost(); err != nil { - this.migrationContext.Log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") - return err - } - - if this.migrationContext.OriginalTableAutoIncrement > 0 && !this.parser.IsAutoIncrementDefined() { - // Original table has AUTO_INCREMENT value and the -alter statement does not indicate any override, - // so we should copy AUTO_INCREMENT value onto our ghost table. - if err := this.applier.AlterGhostAutoIncrement(); err != nil { - this.migrationContext.Log.Errorf("Unable to ALTER ghost table AUTO_INCREMENT value, see further error details. Bailing out") + if !this.migrationContext.Resume { + if err := this.applier.ValidateOrDropExistingTables(); err != nil { + return err + } + if err := this.applier.CreateChangelogTable(); err != nil { + this.migrationContext.Log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") + return err + } + if err := this.applier.CreateGhostTable(); err != nil { + this.migrationContext.Log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") return err } + if err := this.applier.AlterGhost(); err != nil { + this.migrationContext.Log.Errorf("Unable to ALTER ghost table, see further error details. 
Bailing out") + return err + } + + if this.migrationContext.OriginalTableAutoIncrement > 0 && !this.parser.IsAutoIncrementDefined() { + // Original table has AUTO_INCREMENT value and the -alter statement does not indicate any override, + // so we should copy AUTO_INCREMENT value onto our ghost table. + if err := this.applier.AlterGhostAutoIncrement(); err != nil { + this.migrationContext.Log.Errorf("Unable to ALTER ghost table AUTO_INCREMENT value, see further error details. Bailing out") + return err + } + } + this.applier.WriteChangelogState(string(GhostTableMigrated)) } - this.applier.WriteChangelogState(string(GhostTableMigrated)) if err := this.applier.StateMetadataLockInstrument(); err != nil { this.migrationContext.Log.Errorf("Unable to enable metadata lock instrument, see further error details. Bailing out") return err @@ -1339,6 +1381,11 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { if err := this.retryOperation(applyEventFunc); err != nil { return this.migrationContext.Log.Errore(err) } + // update applier coordinates + this.applier.CurrentCoordinatesMutex.Lock() + this.applier.CurrentCoordinates = eventStruct.coords + this.applier.CurrentCoordinatesMutex.Unlock() + if nonDmlStructToApply != nil { // We pulled DML events from the queue, and then we hit a non-DML event. Wait! // We need to handle it! @@ -1350,6 +1397,72 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { return nil } +// Checkpoint attempts to write a checkpoint of the Migrator's current state. +// It gets the binlog coordinates of the last received trx and waits until the +// applier reaches that trx. At that point it's safe to resume from these coordinates. 
+func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { + coords := this.eventsStreamer.GetCurrentBinlogCoordinates() + this.applier.LastIterationRangeMutex.Lock() + if this.applier.LastIterationRangeMaxValues == nil || this.applier.LastIterationRangeMinValues == nil { + this.applier.LastIterationRangeMutex.Unlock() + return nil, errors.New("iteration range is empty, not checkpointing...") + } + chk := &Checkpoint{ + Iteration: this.migrationContext.GetIteration(), + IterationRangeMin: this.applier.LastIterationRangeMinValues.Clone(), + IterationRangeMax: this.applier.LastIterationRangeMaxValues.Clone(), + LastTrxCoords: coords, + RowsCopied: atomic.LoadInt64(&this.migrationContext.TotalRowsCopied), + DMLApplied: atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), + } + this.applier.LastIterationRangeMutex.Unlock() + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + this.applier.CurrentCoordinatesMutex.Lock() + if coords.SmallerThanOrEquals(this.applier.CurrentCoordinates) { + id, err := this.applier.WriteCheckpoint(chk) + chk.Id = id + this.applier.CurrentCoordinatesMutex.Unlock() + return chk, err + } + this.applier.CurrentCoordinatesMutex.Unlock() + time.Sleep(500 * time.Millisecond) + } + } +} + +func (this *Migrator) checkpointLoop() { + if this.migrationContext.Noop { + this.migrationContext.Log.Debugf("Noop operation; not really checkpointing") + return + } + checkpointInterval := time.Duration(this.migrationContext.CheckpointIntervalSeconds) * time.Second + ticker := time.NewTicker(checkpointInterval) + for t := range ticker.C { + if atomic.LoadInt64(&this.finishedMigrating) > 0 { + return + } + this.migrationContext.Log.Infof("starting checkpoint at %+v", t) + ctx, cancel := context.WithTimeout(context.Background(), checkpointTimeout) + chk, err := this.Checkpoint(ctx) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + this.migrationContext.Log.Errorf("checkpoint attempt timed 
out after %+v", checkpointTimeout) + } else { + this.migrationContext.Log.Errorf("error attempting checkpoint: %+v", err) + } + } else { + this.migrationContext.Log.Infof("checkpoint success at coords=%+v range_min=%+v range_max=%+v iteration=%d", + chk.LastTrxCoords.DisplayString(), chk.IterationRangeMin.String(), chk.IterationRangeMax.String(), chk.Iteration) + } + cancel() + } +} + // executeWriteFuncs writes data via applier: both the rowcopy and the events backlog. // This is where the ghost table gets the data. The function fills the data single-threaded. // Both event backlog and rowcopy events are polled; the backlog events have precedence. @@ -1428,6 +1541,9 @@ func (this *Migrator) finalCleanup() error { if err := this.retryOperation(this.applier.DropChangelogTable); err != nil { return err } + if err := this.retryOperation(this.applier.DropCheckpointTable); err != nil { + return err + } if this.migrationContext.OkToDropTable && !this.migrationContext.TestOnReplica { if err := this.retryOperation(this.applier.DropOldTable); err != nil { return err diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 86c060acf..b268054ab 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -21,12 +21,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/testcontainers/testcontainers-go/modules/mysql" + testmysql "github.com/testcontainers/testcontainers-go/modules/mysql" "runtime" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" @@ -35,6 +36,7 @@ import ( func TestMigratorOnChangelogEvent(t *testing.T) { migrationContext := base.NewMigrationContext() migrator := NewMigrator(migrationContext, "1.2.3") + migrator.applier = NewApplier(migrationContext) 
t.Run("heartbeat", func(t *testing.T) { columnValues := sql.ToColumnValues([]interface{}{ @@ -43,10 +45,12 @@ func TestMigratorOnChangelogEvent(t *testing.T) { "heartbeat", "2022-08-16T00:45:10.52Z", }) - require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ - DatabaseName: "test", - DML: binlog.InsertDML, - NewColumnValues: columnValues, + require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues}, + Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)), })) }) @@ -66,10 +70,12 @@ func TestMigratorOnChangelogEvent(t *testing.T) { "state", AllEventsUpToLockProcessed, }) - require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ - DatabaseName: "test", - DML: binlog.InsertDML, - NewColumnValues: columnValues, + require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues}, + Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)), })) wg.Wait() }) @@ -85,10 +91,12 @@ func TestMigratorOnChangelogEvent(t *testing.T) { "state", GhostTableMigrated, }) - require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ - DatabaseName: "test", - DML: binlog.InsertDML, - NewColumnValues: columnValues, + require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues}, + Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)), })) }) @@ -99,10 +107,12 @@ func TestMigratorOnChangelogEvent(t *testing.T) { "state", Migrated, }) - require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ - DatabaseName: "test", - DML: binlog.InsertDML, - NewColumnValues: columnValues, + require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogEntry{ + DmlEvent: 
&binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues}, + Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)), })) }) @@ -113,10 +123,12 @@ func TestMigratorOnChangelogEvent(t *testing.T) { "state", ReadMigrationRangeValues, }) - require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogDMLEvent{ - DatabaseName: "test", - DML: binlog.InsertDML, - NewColumnValues: columnValues, + require.Nil(t, migrator.onChangelogEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: columnValues}, + Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)), })) }) } @@ -283,11 +295,11 @@ type MigratorTestSuite struct { func (suite *MigratorTestSuite) SetupSuite() { ctx := context.Background() - mysqlContainer, err := mysql.Run(ctx, + mysqlContainer, err := testmysql.Run(ctx, testMysqlContainerImage, - mysql.WithDatabase(testMysqlDatabase), - mysql.WithUsername(testMysqlUser), - mysql.WithPassword(testMysqlPass), + testmysql.WithDatabase(testMysqlDatabase), + testmysql.WithUsername(testMysqlUser), + testmysql.WithPassword(testMysqlPass), testcontainers.WithWaitStrategy(wait.ForExposedPort()), ) suite.Require().NoError(err) diff --git a/go/logic/streamer.go b/go/logic/streamer.go index fd0240ffd..63afc3f3d 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -24,7 +24,7 @@ type BinlogEventListener struct { async bool databaseName string tableName string - onDmlEvent func(event *binlog.BinlogDMLEvent) error + onDmlEvent func(event *binlog.BinlogEntry) error } const ( @@ -49,18 +49,19 @@ type EventsStreamer struct { func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer { return &EventsStreamer{ - connectionConfig: migrationContext.InspectorConnectionConfig, - migrationContext: migrationContext, - listeners: [](*BinlogEventListener){}, - listenersMutex: &sync.Mutex{}, - eventsChannel: 
make(chan *binlog.BinlogEntry, EventsChannelBufferSize), - name: "streamer", + connectionConfig: migrationContext.InspectorConnectionConfig, + migrationContext: migrationContext, + listeners: [](*BinlogEventListener){}, + listenersMutex: &sync.Mutex{}, + eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize), + name: "streamer", + initialBinlogCoordinates: migrationContext.InitialStreamerCoords, } } // AddListener registers a new listener for binlog events, on a per-table basis func (this *EventsStreamer) AddListener( - async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) { + async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogEntry) error) (err error) { this.listenersMutex.Lock() defer this.listenersMutex.Unlock() @@ -82,24 +83,24 @@ func (this *EventsStreamer) AddListener( // notifyListeners will notify relevant listeners with given DML event. Only // listeners registered for changes on the table on which the DML operates are notified. 
-func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) { +func (this *EventsStreamer) notifyListeners(binlogEntry *binlog.BinlogEntry) { this.listenersMutex.Lock() defer this.listenersMutex.Unlock() for _, listener := range this.listeners { listener := listener - if !strings.EqualFold(listener.databaseName, binlogEvent.DatabaseName) { + if !strings.EqualFold(listener.databaseName, binlogEntry.DmlEvent.DatabaseName) { continue } - if !strings.EqualFold(listener.tableName, binlogEvent.TableName) { + if !strings.EqualFold(listener.tableName, binlogEntry.DmlEvent.TableName) { continue } if listener.async { go func() { - listener.onDmlEvent(binlogEvent) + listener.onDmlEvent(binlogEntry) }() } else { - listener.onDmlEvent(binlogEvent) + listener.onDmlEvent(binlogEntry) } } } @@ -114,8 +115,10 @@ func (this *EventsStreamer) InitDBConnections() (err error) { return err } this.dbVersion = version - if err := this.readCurrentBinlogCoordinates(); err != nil { - return err + if this.initialBinlogCoordinates == nil || this.initialBinlogCoordinates.IsEmpty() { + if err := this.readCurrentBinlogCoordinates(); err != nil { + return err + } } if err := this.initBinlogReader(this.initialBinlogCoordinates); err != nil { return err @@ -176,7 +179,7 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { go func() { for binlogEntry := range this.eventsChannel { if binlogEntry.DmlEvent != nil { - this.notifyListeners(binlogEntry.DmlEvent) + this.notifyListeners(binlogEntry) } } }() diff --git a/go/logic/streamer_test.go b/go/logic/streamer_test.go index baa6076da..2c5d3886b 100644 --- a/go/logic/streamer_test.go +++ b/go/logic/streamer_test.go @@ -90,8 +90,8 @@ func (suite *EventsStreamerTestSuite) TestStreamEvents() { streamCtx, cancel := context.WithCancel(context.Background()) dmlEvents := make([]*binlog.BinlogDMLEvent, 0) - err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogDMLEvent) 
error { - dmlEvents = append(dmlEvents, event) + err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogEntry) error { + dmlEvents = append(dmlEvents, event.DmlEvent) // Stop once we've collected three events if len(dmlEvents) == 3 { @@ -165,8 +165,8 @@ func (suite *EventsStreamerTestSuite) TestStreamEventsAutomaticallyReconnects() streamCtx, cancel := context.WithCancel(context.Background()) dmlEvents := make([]*binlog.BinlogDMLEvent, 0) - err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogDMLEvent) error { - dmlEvents = append(dmlEvents, event) + err = streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogEntry) error { + dmlEvents = append(dmlEvents, event.DmlEvent) // Stop once we've collected three events if len(dmlEvents) == 3 { diff --git a/go/sql/builder.go b/go/sql/builder.go index f2683181f..757d74910 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -20,6 +20,7 @@ const ( GreaterThanOrEqualsComparisonSign ValueComparisonSign = ">=" GreaterThanComparisonSign ValueComparisonSign = ">" NotEqualsComparisonSign ValueComparisonSign = "!=" + MaxColumnNameLength = 64 ) // EscapeName will escape a db/table/column/... name by wrapping with backticks. @@ -32,6 +33,21 @@ func EscapeName(name string) string { return fmt.Sprintf("`%s`", name) } +// TruncateColumnName truncates a name so it can be used as a MySQL +// column name, taking into account UTF-8 characters. 
+func TruncateColumnName(name string, limit int) string { + truncatedName := name + chars := 0 + for byteIdx := range name { + if chars >= limit { + truncatedName = name[:byteIdx] + break + } + chars++ + } + return truncatedName +} + func buildColumnsPreparedValues(columns *ColumnList) []string { values := make([]string, columns.Len()) for i, column := range columns.Columns() { @@ -101,6 +117,68 @@ func BuildEqualsPreparedComparison(columns []string) (result string, err error) return BuildEqualsComparison(columns, values) } +// It holds the prepared query statement so it doesn't need to be recreated every time. +type CheckpointInsertQueryBuilder struct { + uniqueKeyColumns *ColumnList + preparedStatement string +} + +func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns *ColumnList) (*CheckpointInsertQueryBuilder, error) { + if uniqueKeyColumns.Len() == 0 { + return nil, fmt.Errorf("Got 0 columns in BuildSetCheckpointInsertQuery") + } + values := buildColumnsPreparedValues(uniqueKeyColumns) + minUniqueColNames := []string{} + maxUniqueColNames := []string{} + for _, name := range uniqueKeyColumns.Names() { + minColName := TruncateColumnName(name, MaxColumnNameLength-4) + "_min" + maxColName := TruncateColumnName(name, MaxColumnNameLength-4) + "_max" + minUniqueColNames = append(minUniqueColNames, minColName) + maxUniqueColNames = append(maxUniqueColNames, maxColName) + } + databaseName = EscapeName(databaseName) + tableName = EscapeName(tableName) + stmt := fmt.Sprintf(` + insert /* gh-ost */ + into %s.%s + (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, + gh_ost_rows_copied, gh_ost_dml_applied, + %s, %s) + values + (unix_timestamp(now()), ?, ?, + ?, ?, + %s, %s)`, + databaseName, tableName, + strings.Join(minUniqueColNames, ", "), + strings.Join(maxUniqueColNames, ", "), + strings.Join(values, ", "), + strings.Join(values, ", "), + ) + + b := &CheckpointInsertQueryBuilder{ + uniqueKeyColumns: uniqueKeyColumns, + 
preparedStatement: stmt, + } + return b, nil +} + +// BuildQuery builds the insert query. +func (b *CheckpointInsertQueryBuilder) BuildQuery(uniqueKeyArgs []interface{}) (string, []interface{}, error) { + if len(uniqueKeyArgs) != 2*b.uniqueKeyColumns.Len() { + return "", nil, fmt.Errorf("args count differs from 2 x unique key column count") + } + convertedArgs := make([]interface{}, 0, 2*b.uniqueKeyColumns.Len()) + for i, column := range b.uniqueKeyColumns.Columns() { + minArg := column.convertArg(uniqueKeyArgs[i], true) + convertedArgs = append(convertedArgs, minArg) + } + for i, column := range b.uniqueKeyColumns.Columns() { + minArg := column.convertArg(uniqueKeyArgs[i+b.uniqueKeyColumns.Len()], true) + convertedArgs = append(convertedArgs, minArg) + } + return b.preparedStatement, convertedArgs, nil +} + func BuildSetPreparedClause(columns *ColumnList) (result string, err error) { if columns.Len() == 0 { return "", fmt.Errorf("Got 0 columns in BuildSetPreparedClause") diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index a6735a324..06e402c89 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -790,3 +790,28 @@ func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) { require.Equal(t, []interface{}{uint8(253)}, uniqueKeyArgs) } } + +func TestCheckpointQueryBuilder(t *testing.T) { + databaseName := "mydb" + tableName := "_tbl_ghk" + valueArgs := []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)} + uniqueKeyColumns := NewColumnList([]string{"name", "position", "my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长很长很长"}) + builder, err := NewCheckpointQueryBuilder(databaseName, tableName, uniqueKeyColumns) + require.NoError(t, err) + query, uniqueKeyArgs, err := builder.BuildQuery(valueArgs) + require.NoError(t, err) + expected := ` + insert /* gh-ost */ into mydb._tbl_ghk + (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, + gh_ost_rows_copied, gh_ost_dml_applied, + name_min, 
position_min, my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长_min, + name_max, position_max, my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长_max) + values + (unix_timestamp(now()), ?, ?, + ?, ?, + ?, ?, ?, + ?, ?, ?) + ` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)}, uniqueKeyArgs) +} diff --git a/go/sql/types.go b/go/sql/types.go index aac52bc32..a01fb8bff 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -38,10 +38,12 @@ type CharacterSetConversion struct { } type Column struct { - Name string - IsUnsigned bool - IsVirtual bool - Charset string + Name string + IsUnsigned bool + IsVirtual bool + Charset string + // Type represents a subset of MySQL types + // used for mapping columns to golang values. Type ColumnType EnumValues string timezoneConversion *TimezoneConversion @@ -50,6 +52,9 @@ type Column struct { // https://github.com/github/gh-ost/issues/909 BinaryOctetLength uint charsetConversion *CharacterSetConversion + CharacterSetName string + Nullable bool + MySQLType string } func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} { @@ -345,3 +350,9 @@ func (this *ColumnValues) String() string { } return strings.Join(stringValues, ",") } + +func (this *ColumnValues) Clone() *ColumnValues { + cv := NewColumnValues(len(this.abstractValues)) + copy(cv.abstractValues, this.abstractValues) + return cv +} diff --git a/localtests/test.sh b/localtests/test.sh index 7fe9c2ab2..6776d227a 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -151,7 +151,7 @@ sysbench_prepare() { --mysql-password=opensesame \ --mysql-db=test \ --tables=1 \ - --table-size=10000 \ + --table-size=20000 \ prepare } @@ -169,7 +169,7 @@ sysbench_run_cmd() { --threads=2 \ --time=30 \ --report-interval=10 \ - --rate=500 \ + --rate=200 \ run" echo $cmd } @@ -254,7 +254,6 @@ test_single() { 
table_name="gh_ost_test" ghost_table_name="_gh_ost_test_gho" - trap cleanup EXIT INT TERM # test with sysbench oltp write load if [[ "$test_name" == "sysbench" ]]; then if ! command -v sysbench &>/dev/null; then @@ -273,6 +272,7 @@ test_single() { echo -n "Started sysbench (PID $sysbench_pid): " echo $load_cmd fi + trap cleanup SIGINT # cmd="GOTRACEBACK=crash $ghost_binary \ @@ -299,6 +299,7 @@ test_single() { --verbose \ --debug \ --stack \ + --checkpoint \ --execute ${extra_args[@]}" echo_dot echo $cmd >$exec_command_file