Merged
14 changes: 14 additions & 0 deletions doc/command-line-flags.md
Expand Up @@ -64,6 +64,15 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant
### binlogsyncer-max-reconnect-attempts
`--binlogsyncer-max-reconnect-attempts=0`, the maximum number of attempts to re-establish a broken inspector connection for sync binlog. `0` or `negative number` means infinite retry, default `0`

### checkpoint

`--checkpoint` enables periodic checkpoints of `gh-ost`'s state so that a migration can be resumed from the latest checkpoint with `--resume`. Checkpoints are written to a separate table named `_${original_table_name}_ghk`. Using `--gtid` together with `--checkpoint` is recommended.
See also: [`resuming-migrations`](resume.md)

### checkpoint-seconds

`--checkpoint-seconds` specifies the number of seconds between checkpoints. Default is 300; values below 10 are rejected at startup.

### conf

`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:
Expand Down Expand Up @@ -226,6 +235,11 @@ Optionally involve the process ID, for example: `--replica-server-id=$((10000000
It's on you to choose a number that does not collide with another `gh-ost` or another running replica.
See also: [`concurrent-migrations`](cheatsheet.md#concurrent-migrations) on the cheatsheet.

### resume

`--resume` attempts to resume a previously interrupted migration from its last checkpoint. The original `gh-ost` invocation must have run with `--checkpoint` and successfully written at least one checkpoint for `--resume` to work.
See also: [`resuming-migrations`](resume.md)

### serve-socket-file

Defaults to an auto-determined and advertised upon startup file. Defines Unix socket file to serve on.
Expand Down
42 changes: 42 additions & 0 deletions doc/resume.md
@@ -0,0 +1,42 @@
# Resuming Migrations

`gh-ost` can attempt to resume an interrupted migration from a checkpoint if the following conditions are met:
- The first `gh-ost` process was invoked with `--checkpoint`
- The first `gh-ost` process had at least one successful checkpoint
- The binlogs from the last checkpoint's binlog coordinates still exist on the replica gh-ost is inspecting (specified by `--host`)

To resume, invoke `gh-ost` again with the same arguments plus the `--resume` flag.

> [!WARNING]
> It is recommended to use `--checkpoint` with `--gtid` enabled so that checkpoint binlog coordinates store GTID sets rather than file positions. In that case, `gh-ost` can resume using a different replica than the one it originally attached to.

## Example
The migration starts with a `gh-ost` invocation such as:
```shell
gh-ost \
--chunk-size=100 \
--host=replica1.company.com \
--database="mydb" \
--table="mytable" \
--alter="add column mycol varchar(20)" \
--gtid \
--checkpoint \
--checkpoint-seconds=60 \
--execute
```

In this example `gh-ost` writes a checkpoint to a table `_mytable_ghk` every 60 seconds. After `gh-ost` is interrupted/killed, the migration can be resumed with:
```shell
# resume migration
gh-ost \
--chunk-size=100 \
--host=replica1.company.com \
--database="mydb" \
--table="mytable" \
--alter="add column mycol varchar(20)" \
--gtid \
--resume \
--execute
```

`gh-ost` then reconnects at the binlog coordinates of the last checkpoint and resumes copying rows at the chunk specified by the checkpoint. The data integrity of the ghost table is preserved because `gh-ost` applies row DMLs and copies rows in an idempotent way.
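
The idempotency argument above can be illustrated with a toy model. This is a minimal sketch, not `gh-ost` code: it assumes that row copy behaves like `insert ignore` and that binlog DML is applied like `replace into` / `delete`. The `table` and `event` types and all function names are hypothetical.

```go
package main

import "fmt"

// table is a toy stand-in for a MySQL table, keyed by primary key.
type table map[int]string

// event is a simplified row-level binlog event (hypothetical type).
type event struct {
	key     int
	value   string
	deleted bool
}

// copyChunk copies rows with keys in [minKey, maxKey], skipping rows that the
// ghost table already has (the "insert ignore" assumption).
func copyChunk(ghost, source table, minKey, maxKey int) {
	for k, v := range source {
		if k < minKey || k > maxKey {
			continue
		}
		if _, exists := ghost[k]; !exists {
			ghost[k] = v
		}
	}
}

// applyEvent overwrites or deletes by key (the "replace into" assumption), so
// applying the same event twice ends in the same state as applying it once.
func applyEvent(ghost table, e event) {
	if e.deleted {
		delete(ghost, e.key)
		return
	}
	ghost[e.key] = e.value
}

// equal reports whether two tables hold exactly the same rows.
func equal(a, b table) bool {
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if bv, ok := b[k]; !ok || bv != v {
			return false
		}
	}
	return true
}

// replayIsIdempotent compares an uninterrupted run with a run that is
// interrupted after its only checkpoint and then resumed from it.
func replayIsIdempotent() bool {
	source := table{1: "a2", 3: "c", 4: "d"}
	events := []event{{key: 1, value: "a2"}, {key: 2, deleted: true}, {key: 4, value: "d"}}

	run := func(ghost table, evs []event) {
		copyChunk(ghost, source, 1, 4)
		for _, e := range evs {
			applyEvent(ghost, e)
		}
	}

	uninterrupted := table{}
	run(uninterrupted, events)

	resumed := table{}
	run(resumed, events)     // work done before the interruption
	run(resumed, events[1:]) // resume: the checkpoint only covered events[0]

	return equal(uninterrupted, resumed)
}

func main() {
	fmt.Println("replay is idempotent:", replayIsIdempotent())
}
```

Re-copying the chunk and re-applying the events after the checkpoint leaves the toy ghost table in the same state as the uninterrupted run, which is the property a resume relies on.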
13 changes: 13 additions & 0 deletions go/base/context.go
Expand Up @@ -103,6 +103,7 @@ type MigrationContext struct {
GoogleCloudPlatform bool
AzureMySQL bool
AttemptInstantDDL bool
Resume bool

// SkipPortValidation allows skipping the port validation in `ValidateConnection`
// This is useful when connecting to a MySQL instance where the external port
Expand Down Expand Up @@ -153,6 +154,8 @@ type MigrationContext struct {
HooksHintToken string
HooksStatusIntervalSec int64
PanicOnWarnings bool
Checkpoint bool
CheckpointIntervalSeconds int64

DropServeSocket bool
ServeSocketFile string
Expand Down Expand Up @@ -239,6 +242,7 @@ type MigrationContext struct {
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
InitialStreamerCoords mysql.BinlogCoordinates
ForceTmpTableName string

IncludeTriggers bool
Expand Down Expand Up @@ -380,6 +384,15 @@ func (this *MigrationContext) GetChangelogTableName() string {
}
}

// GetCheckpointTableName generates the name of checkpoint table.
func (this *MigrationContext) GetCheckpointTableName() string {
if this.ForceTmpTableName != "" {
return getSafeTableName(this.ForceTmpTableName, "ghk")
} else {
return getSafeTableName(this.OriginalTableName, "ghk")
}
}
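
The body of `getSafeTableName` is not part of this diff. As a rough sketch of what the checkpoint table name could look like, assuming the helper wraps the base name as `_<base>_ghk` and trims the base to respect MySQL's 64-character identifier limit (the `checkpointTableName` function below is hypothetical):

```go
package main

import "fmt"

// maxTableNameLength is MySQL's identifier length limit.
const maxTableNameLength = 64

// checkpointTableName is a hypothetical stand-in for GetCheckpointTableName.
// It prefers the forced temporary name when one is given, and trims the base
// name so that the "_" prefix and "_ghk" suffix always fit.
// Note: trimming is byte-based, so multi-byte names would need extra care.
func checkpointTableName(originalTable, forcedTmpName string) string {
	base := originalTable
	if forcedTmpName != "" {
		base = forcedTmpName
	}
	name := fmt.Sprintf("_%s_ghk", base)
	if len(name) <= maxTableNameLength {
		return name
	}
	excess := len(name) - maxTableNameLength
	return fmt.Sprintf("_%s_ghk", base[:len(base)-excess])
}

func main() {
	fmt.Println(checkpointTableName("mytable", ""))
	fmt.Println(checkpointTableName("mytable", "custom"))
}
```

For the table in the documentation example, this yields `_mytable_ghk`, matching the name given in `doc/resume.md`.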

// GetVoluntaryLockName returns a name of a voluntary lock to be used throughout
// the swap-tables process.
func (this *MigrationContext) GetVoluntaryLockName() string {
Expand Down
7 changes: 7 additions & 0 deletions go/cmd/gh-ost/main.go
Expand Up @@ -145,6 +145,9 @@ func main() {
flag.StringVar(&migrationContext.TriggerSuffix, "trigger-suffix", "", "Add a suffix to the trigger name (i.e '_v2'). Requires '--include-triggers'")
flag.BoolVar(&migrationContext.RemoveTriggerSuffix, "remove-trigger-suffix-if-exists", false, "Remove given suffix from name of trigger. Requires '--include-triggers' and '--trigger-suffix'")
flag.BoolVar(&migrationContext.SkipPortValidation, "skip-port-validation", false, "Skip port validation for MySQL connections")
flag.BoolVar(&migrationContext.Checkpoint, "checkpoint", false, "Enable migration checkpoints")
flag.Int64Var(&migrationContext.CheckpointIntervalSeconds, "checkpoint-seconds", 300, "The number of seconds between checkpoints")
flag.BoolVar(&migrationContext.Resume, "resume", false, "Attempt to resume migration from checkpoint")

maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits")
Expand Down Expand Up @@ -284,6 +287,9 @@ func main() {
if *storageEngine == "rocksdb" {
migrationContext.Log.Warning("RocksDB storage engine support is experimental")
}
if migrationContext.CheckpointIntervalSeconds < 10 {
migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10")
}

switch *cutOver {
case "atomic", "default", "":
Expand Down Expand Up @@ -316,6 +322,7 @@ func main() {
}
migrationContext.CliPassword = string(bytePassword)
}

migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
migrationContext.SetNiceRatio(*niceRatio)
migrationContext.SetChunkSize(*chunkSize)
Expand Down
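
The three new flags and the interval check can be exercised in isolation. The sketch below reuses the flag names, defaults, and the `>=10` rule from this diff, but it returns an error instead of calling `Log.Fatalf`; the `checkpointOptions` type and `parseCheckpointFlags` function are hypothetical and exist only for illustration.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

// checkpointOptions is a hypothetical holder for the three new settings.
type checkpointOptions struct {
	Checkpoint      bool
	IntervalSeconds int64
	Resume          bool
}

// parseCheckpointFlags parses only the checkpoint-related flags and applies
// the same lower bound on --checkpoint-seconds as the diff above.
func parseCheckpointFlags(args []string) (*checkpointOptions, error) {
	opts := &checkpointOptions{}
	fs := flag.NewFlagSet("gh-ost-sketch", flag.ContinueOnError)
	fs.BoolVar(&opts.Checkpoint, "checkpoint", false, "Enable migration checkpoints")
	fs.Int64Var(&opts.IntervalSeconds, "checkpoint-seconds", 300, "The number of seconds between checkpoints")
	fs.BoolVar(&opts.Resume, "resume", false, "Attempt to resume migration from checkpoint")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	if opts.IntervalSeconds < 10 {
		return nil, errors.New("--checkpoint-seconds should be >=10")
	}
	return opts, nil
}

func main() {
	opts, err := parseCheckpointFlags([]string{"--checkpoint", "--checkpoint-seconds=60"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("checkpoint=%v interval=%ds resume=%v\n", opts.Checkpoint, opts.IntervalSeconds, opts.Resume)
}
```

As in the diff, the interval is validated whether or not `--checkpoint` is set, so an out-of-range value is rejected even when checkpoints are disabled.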
147 changes: 142 additions & 5 deletions go/logic/applier.go
@@ -1,5 +1,5 @@
/*
Copyright 2021 GitHub Inc.
Copyright 2025 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

Expand All @@ -21,6 +21,9 @@ import (
"context"
"database/sql/driver"

"errors"
"sync"

"github.com/github/gh-ost/go/mysql"
drivermysql "github.com/go-sql-driver/mysql"
"github.com/openark/golib/sqlutils"
Expand All @@ -31,6 +34,9 @@ const (
atomicCutOverMagicHint = "ghost-cut-over-sentry"
)

// ErrNoCheckpointFound is returned when an empty checkpoint table is queried.
var ErrNoCheckpointFound = errors.New("no checkpoint found in _ghk table")

type dmlBuildResult struct {
query string
args []interface{}
Expand Down Expand Up @@ -66,9 +72,17 @@ type Applier struct {
finishedMigrating int64
name string

dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
CurrentCoordinatesMutex sync.Mutex
CurrentCoordinates mysql.BinlogCoordinates

LastIterationRangeMutex sync.Mutex
LastIterationRangeMinValues *sql.ColumnValues
LastIterationRangeMaxValues *sql.ColumnValues

dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder
}

func NewApplier(migrationContext *base.MigrationContext) *Applier {
Expand Down Expand Up @@ -144,6 +158,15 @@ func (this *Applier) prepareQueries() (err error) {
); err != nil {
return err
}
if this.migrationContext.Checkpoint {
if this.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder(
this.migrationContext.DatabaseName,
this.migrationContext.GetCheckpointTableName(),
&this.migrationContext.UniqueKey.Columns,
); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -400,6 +423,54 @@ func (this *Applier) CreateChangelogTable() error {
return nil
}

// CreateCheckpointTable creates the checkpoint table to store the chunk copy and applier state.
// There are two sets of columns with the same types as the shared unique key,
// one for IterationMinValues and one for IterationMaxValues.
func (this *Applier) CreateCheckpointTable() error {
if err := this.DropCheckpointTable(); err != nil {
return err
}
colDefs := []string{
"`gh_ost_chk_id` bigint auto_increment primary key",
"`gh_ost_chk_timestamp` bigint",
"`gh_ost_chk_coords` varchar(4096)",
"`gh_ost_chk_iteration` bigint",
"`gh_ost_rows_copied` bigint",
"`gh_ost_dml_applied` bigint",
}
for _, col := range this.migrationContext.UniqueKey.Columns.Columns() {
if col.MySQLType == "" {
return fmt.Errorf("CreateCheckpointTable: column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name))
}
minColName := sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4) + "_min"
colDef := fmt.Sprintf("%s %s", sql.EscapeName(minColName), col.MySQLType)
if !col.Nullable {
colDef += " NOT NULL"
}
colDefs = append(colDefs, colDef)
}

for _, col := range this.migrationContext.UniqueKey.Columns.Columns() {
maxColName := sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4) + "_max"
colDef := fmt.Sprintf("%s %s", sql.EscapeName(maxColName), col.MySQLType)
if !col.Nullable {
colDef += " NOT NULL"
}
colDefs = append(colDefs, colDef)
}

query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetCheckpointTableName()),
strings.Join(colDefs, ",\n "),
)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
this.migrationContext.Log.Infof("Created checkpoint table")
return nil
}
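
To make the column layout concrete, here is a standalone sketch of how the `_min` / `_max` column definitions could be derived from the unique key. `sql.TruncateColumnName`, `sql.EscapeName`, and `sql.MaxColumnNameLength` are not shown in this diff, so `truncateName`, `escapeName`, and the limit of 64 below are assumptions, as is the `keyColumn` type.

```go
package main

import (
	"fmt"
	"strings"
)

// maxColumnNameLength is assumed to be MySQL's 64-character identifier limit.
const maxColumnNameLength = 64

// keyColumn is a hypothetical, reduced view of a unique key column.
type keyColumn struct {
	Name      string
	MySQLType string
	Nullable  bool
}

// truncateName cuts a name to at most limit bytes (assumed behavior).
func truncateName(name string, limit int) string {
	if len(name) <= limit {
		return name
	}
	return name[:limit]
}

// escapeName quotes an identifier with backticks (assumed behavior).
func escapeName(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// rangeColumnDefs returns one definition per key column for the range minimum,
// followed by one per key column for the range maximum.
func rangeColumnDefs(cols []keyColumn) ([]string, error) {
	var defs []string
	for _, suffix := range []string{"_min", "_max"} {
		for _, col := range cols {
			if col.MySQLType == "" {
				return nil, fmt.Errorf("column %s has no type information", escapeName(col.Name))
			}
			name := truncateName(col.Name, maxColumnNameLength-len(suffix)) + suffix
			def := fmt.Sprintf("%s %s", escapeName(name), col.MySQLType)
			if !col.Nullable {
				def += " NOT NULL"
			}
			defs = append(defs, def)
		}
	}
	return defs, nil
}

func main() {
	defs, err := rangeColumnDefs([]keyColumn{
		{Name: "id", MySQLType: "bigint"},
		{Name: "created_at", MySQLType: "datetime", Nullable: true},
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(strings.Join(defs, ",\n"))
}
```

Reserving four characters for the suffix keeps every generated column name within the identifier limit, which appears to be the purpose of `MaxColumnNameLength-4` in the diff.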

// dropTable drops a given table on the applied host
func (this *Applier) dropTable(tableName string) error {
query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
Expand Down Expand Up @@ -494,6 +565,11 @@ func (this *Applier) DropChangelogTable() error {
return this.dropTable(this.migrationContext.GetChangelogTableName())
}

// DropCheckpointTable drops the checkpoint table on applier host
func (this *Applier) DropCheckpointTable() error {
return this.dropTable(this.migrationContext.GetCheckpointTableName())
}

// DropOldTable drops the _Old table on the applier host
func (this *Applier) DropOldTable() error {
return this.dropTable(this.migrationContext.GetOldTableName())
Expand Down Expand Up @@ -542,6 +618,60 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
return this.WriteAndLogChangelog("state", value)
}

// WriteCheckpoint writes a checkpoint to the _ghk table.
func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) {
var insertId int64
uniqueKeyArgs := sqlutils.Args(chk.IterationRangeMin.AbstractValues()...)
uniqueKeyArgs = append(uniqueKeyArgs, chk.IterationRangeMax.AbstractValues()...)
query, uniqueKeyArgs, err := this.checkpointInsertQueryBuilder.BuildQuery(uniqueKeyArgs)
if err != nil {
return insertId, err
}
args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied)
args = append(args, uniqueKeyArgs...)
res, err := this.db.Exec(query, args...)
if err != nil {
return insertId, err
}
return res.LastInsertId()
}

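// ReadLastCheckpoint reads the most recent checkpoint from the _ghk table.
// It returns ErrNoCheckpointFound if the table is empty.
func (this *Applier) ReadLastCheckpoint() (*Checkpoint, error) {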
row := this.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, this.migrationContext.DatabaseName, this.migrationContext.GetCheckpointTableName()))
chk := &Checkpoint{
IterationRangeMin: sql.NewColumnValues(this.migrationContext.UniqueKey.Columns.Len()),
IterationRangeMax: sql.NewColumnValues(this.migrationContext.UniqueKey.Columns.Len()),
}

var coordStr string
var timestamp int64
ptrs := []interface{}{&chk.Id, &timestamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied}
ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...)
ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...)
err := row.Scan(ptrs...)
if err != nil {
if errors.Is(err, gosql.ErrNoRows) {
return nil, ErrNoCheckpointFound
}
return nil, err
}
chk.Timestamp = time.Unix(timestamp, 0)
if this.migrationContext.UseGTIDs {
gtidCoords, err := mysql.NewGTIDBinlogCoordinates(coordStr)
if err != nil {
return nil, err
}
chk.LastTrxCoords = gtidCoords
} else {
fileCoords, err := mysql.ParseFileBinlogCoordinates(coordStr)
if err != nil {
return nil, err
}
chk.LastTrxCoords = fileCoords
}
return chk, nil
}
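
`ReadLastCheckpoint` stores coordinates as a string and parses them back according to `UseGTIDs`. The round trip for the file-based case might look like the sketch below. It assumes a `file:position` text format, which this diff does not show; `fileCoords` and `parseFileCoords` are hypothetical and are not the `mysql.ParseFileBinlogCoordinates` implementation.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// fileCoords is a hypothetical file-based binlog position.
type fileCoords struct {
	LogFile string
	LogPos  int64
}

// String renders the coordinates in the assumed "file:position" format.
func (c fileCoords) String() string {
	return fmt.Sprintf("%s:%d", c.LogFile, c.LogPos)
}

// parseFileCoords is the inverse of String. It splits on the last colon so
// that file names containing a colon would still parse.
func parseFileCoords(s string) (fileCoords, error) {
	idx := strings.LastIndex(s, ":")
	if idx <= 0 || idx == len(s)-1 {
		return fileCoords{}, fmt.Errorf("invalid binlog coordinates: %q", s)
	}
	pos, err := strconv.ParseInt(s[idx+1:], 10, 64)
	if err != nil {
		return fileCoords{}, fmt.Errorf("invalid binlog position in %q: %w", s, err)
	}
	return fileCoords{LogFile: s[:idx], LogPos: pos}, nil
}

func main() {
	stored := fileCoords{LogFile: "mysql-bin.000042", LogPos: 1234}.String()
	parsed, err := parseFileCoords(stored)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(parsed.LogFile, parsed.LogPos)
}
```

File positions like these are only meaningful on the server that produced them, which is why the documentation recommends `--gtid` when a resume may attach to a different replica.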

// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
// This is done asynchronously
func (this *Applier) InitiateHeartbeat() {
Expand Down Expand Up @@ -686,8 +816,15 @@ func (this *Applier) ReadMigrationRangeValues() error {
// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
// which will be used for copying the next chunk of rows. It returns "false" if there is
// no further chunk to work through, i.e. we're past the last chunk and are done with
// iterating the range (and this done with copying row chunks)
// iterating the range (and thus done with copying row chunks)
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
this.LastIterationRangeMutex.Lock()
if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil {
this.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone()
this.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone()
}
this.LastIterationRangeMutex.Unlock()

this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
if this.migrationContext.MigrationIterationRangeMinValues == nil {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
Expand Down