Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: progress & ETA for Vreplication migrations #8015

Merged
merged 5 commits into from
Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func TestSchemaChange(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, "online", "vtgate", "vrepl_col")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
testRows(t)
testMigrationRowCount(t, uuid)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
Expand All @@ -258,6 +259,7 @@ func TestSchemaChange(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "online", "vtctl", "vrepl_col")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
testRows(t)
testMigrationRowCount(t, uuid)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
Expand Down Expand Up @@ -372,6 +374,21 @@ func testRows(t *testing.T) {
require.Equal(t, countInserts, row.AsInt64("c", 0))
}

func testMigrationRowCount(t *testing.T, uuid string) {
insertMutex.Lock()
defer insertMutex.Unlock()

var totalRowsCopied uint64
// count sum of rows copied in all shards, that should be the total number of rows inserted to the table
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
rowsCopied := row.AsUint64("rows_copied", 0)
totalRowsCopied += rowsCopied
}
require.Equal(t, uint64(countInserts), totalRowsCopied)
}

func testWithInitialSchema(t *testing.T) {
// Create 4 tables
var sqlQuery = "" //nolint
Expand Down
5 changes: 5 additions & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,11 @@ func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTime
"update _vt.vreplication set pos=%v, time_updated=%v, rows_copied=%v, message='' where id=%v", strGTID, timeUpdated, rowsCopied, uid)
}

// GenerateUpdateRowsCopied returns a statement to update the rows_copied value in the _vt.vreplication table.
func GenerateUpdateRowsCopied(uid uint32, rowsCopied int64) string {
return fmt.Sprintf("update _vt.vreplication set rows_copied=%v where id=%v", rowsCopied, uid)
}

// GenerateUpdateTime returns a statement to update time_updated in the _vt.vreplication table.
func GenerateUpdateTime(uid uint32, timeUpdated int64) (string, error) {
if timeUpdated == 0 {
Expand Down
52 changes: 48 additions & 4 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
}()

// Tables are now swapped! Migration is successful
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow, rowsCopiedUnknown)
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow, s.rowsCopied)
return nil

// deferred function will re-enable writes now
Expand Down Expand Up @@ -789,7 +789,9 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem
if err := v.analyze(ctx, conn); err != nil {
return err
}

if err := e.updateMigrationTableRows(ctx, onlineDDL.UUID, v.tableRows); err != nil {
return err
}
if revertMigration == nil {
// Original ALTER TABLE request for vreplication
if err := e.validateTableForAlterAction(ctx, onlineDDL); err != nil {
Expand Down Expand Up @@ -2110,6 +2112,7 @@ func (e *Executor) readVReplStream(ctx context.Context, uuid string, okIfMissing
transactionTimestamp: row.AsInt64("transaction_timestamp", 0),
state: row.AsString("state", ""),
message: row.AsString("message", ""),
rowsCopied: row.AsInt64("rows_copied", 0),
bls: &binlogdatapb.BinlogSource{},
}
if err := prototext.Unmarshal([]byte(s.source), s.bls); err != nil {
Expand Down Expand Up @@ -2174,6 +2177,7 @@ func (e *Executor) isVReplMigrationReadyToCutOver(ctx context.Context, s *VReplS
return false, nil
}
}

return true, nil
}

Expand Down Expand Up @@ -2232,6 +2236,11 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
// migrationMutex lock and it's now safe to ensure vreplMigrationRunning is 1
atomic.StoreInt64(&e.vreplMigrationRunning, 1)
_ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuid)

_ = e.updateRowsCopied(ctx, uuid, s.rowsCopied)
_ = e.updateMigrationProgressByRowsCopied(ctx, uuid, s.rowsCopied)
_ = e.updateMigrationETASecondsByProgress(ctx, uuid)

isReady, err := e.isVReplMigrationReadyToCutOver(ctx, s)
if err != nil {
return countRunnning, cancellable, err
Expand Down Expand Up @@ -2579,7 +2588,7 @@ func (e *Executor) updateMySQLTable(ctx context.Context, uuid string, tableName
return err
}

func (e *Executor) updateETASeconds(ctx context.Context, uuid string, etaSeconds int64) error {
func (e *Executor) updateMigrationETASeconds(ctx context.Context, uuid string, etaSeconds int64) error {
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationETASeconds,
sqltypes.Int64BindVariable(etaSeconds),
sqltypes.StringBindVariable(uuid),
Expand Down Expand Up @@ -2609,6 +2618,41 @@ func (e *Executor) updateMigrationProgress(ctx context.Context, uuid string, pro
return err
}

func (e *Executor) updateMigrationProgressByRowsCopied(ctx context.Context, uuid string, rowsCopied int64) error {
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationProgressByRowsCopied,
sqltypes.Int64BindVariable(rowsCopied),
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
_, err = e.execQuery(ctx, query)
return err
}

func (e *Executor) updateMigrationETASecondsByProgress(ctx context.Context, uuid string) error {
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationETASecondsByProgress,
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
_, err = e.execQuery(ctx, query)
return err
}

func (e *Executor) updateMigrationTableRows(ctx context.Context, uuid string, tableRows int64) error {
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationTableRows,
sqltypes.Int64BindVariable(tableRows),
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
_, err = e.execQuery(ctx, query)
return err
}

func (e *Executor) updateRowsCopied(ctx context.Context, uuid string, rowsCopied int64) error {
if rowsCopied <= 0 {
// Number of rows can only be positive. Zero or negative must mean "no information" and
Expand Down Expand Up @@ -2769,7 +2813,7 @@ func (e *Executor) onSchemaMigrationStatus(ctx context.Context,
if err = e.updateMigrationProgress(ctx, uuid, progressPct); err != nil {
return err
}
if err = e.updateETASeconds(ctx, uuid, etaSeconds); err != nil {
if err = e.updateMigrationETASeconds(ctx, uuid, etaSeconds); err != nil {
return err
}
if err := e.updateRowsCopied(ctx, uuid, rowsCopied); err != nil {
Expand Down
32 changes: 31 additions & 1 deletion go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
alterSchemaMigrationsTableTableCompleteIndex = "ALTER TABLE _vt.schema_migrations add KEY table_complete_idx (migration_status, keyspace(64), mysql_table(64), completed_timestamp)"
alterSchemaMigrationsTableETASeconds = "ALTER TABLE _vt.schema_migrations add column eta_seconds bigint NOT NULL DEFAULT -1"
alterSchemaMigrationsTableRowsCopied = "ALTER TABLE _vt.schema_migrations add column rows_copied bigint unsigned NOT NULL DEFAULT 0"
alterSchemaMigrationsTableTableRows = "ALTER TABLE _vt.schema_migrations add column table_rows bigint NOT NULL DEFAULT 0"

sqlInsertMigration = `INSERT IGNORE INTO _vt.schema_migrations (
migration_uuid,
Expand Down Expand Up @@ -151,6 +152,32 @@ const (
WHERE
migration_uuid=%a
`
sqlUpdateMigrationTableRows = `UPDATE _vt.schema_migrations
SET table_rows=%a
WHERE
migration_uuid=%a
`
sqlUpdateMigrationProgressByRowsCopied = `UPDATE _vt.schema_migrations
SET
progress=CASE
WHEN table_rows=0 THEN 100
ELSE LEAST(100, 100*%a/table_rows)
END
WHERE
migration_uuid=%a
`
sqlUpdateMigrationETASecondsByProgress = `UPDATE _vt.schema_migrations
SET
eta_seconds=CASE
WHEN progress=0 THEN -1
WHEN table_rows=0 THEN 0
ELSE GREATEST(0,
TIMESTAMPDIFF(SECOND, started_timestamp, NOW())*((100/progress)-1)
)
END
WHERE
migration_uuid=%a
`
sqlRetryMigrationWhere = `UPDATE _vt.schema_migrations
SET
migration_status='queued',
Expand Down Expand Up @@ -330,6 +357,7 @@ const (
sqlDropTable = "DROP TABLE `%a`"
sqlAlterTableOptions = "ALTER TABLE `%a` %s"
sqlShowColumnsFrom = "SHOW COLUMNS FROM `%a`"
sqlShowTableStatus = "SHOW TABLE STATUS LIKE '%a'"
sqlGetAutoIncrement = `
SELECT
AUTO_INCREMENT
Expand All @@ -351,7 +379,8 @@ const (
time_updated,
transaction_timestamp,
state,
message
message,
rows_copied
FROM _vt.vreplication
WHERE
workflow=%a
Expand Down Expand Up @@ -402,4 +431,5 @@ var applyDDL = []string{
alterSchemaMigrationsTableTableCompleteIndex,
alterSchemaMigrationsTableETASeconds,
alterSchemaMigrationsTableRowsCopied,
alterSchemaMigrationsTableTableRows,
}
23 changes: 22 additions & 1 deletion go/vt/vttablet/onlineddl/vrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type VReplStream struct {
transactionTimestamp int64
state string
message string
rowsCopied int64
bls *binlogdatapb.BinlogSource
}

Expand All @@ -64,6 +65,7 @@ type VRepl struct {
targetTable string
pos string
alterOptions string
tableRows int64

sharedPKColumns *vrepl.ColumnList

Expand Down Expand Up @@ -173,6 +175,21 @@ func (v *VRepl) readTableColumns(ctx context.Context, conn *dbconnpool.DBConnect
return vrepl.NewColumnList(columnNames), vrepl.NewColumnList(virtualColumnNames), vrepl.NewColumnList(pkColumnNames), nil
}

// readTableStatus reads table status information
func (v *VRepl) readTableStatus(ctx context.Context, conn *dbconnpool.DBConnection, tableName string) (tableRows int64, err error) {
parsed := sqlparser.BuildParsedQuery(sqlShowTableStatus, tableName)
rs, err := conn.ExecuteFetch(parsed.Query, math.MaxInt64, true)
if err != nil {
return 0, err
}
row := rs.Named().Row()
if row == nil {
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Cannot SHOW TABLE STATUS LIKE '%s'", tableName)
}
tableRows, err = row.ToInt64("Rows")
return tableRows, err
}

// applyColumnTypes
func (v *VRepl) applyColumnTypes(ctx context.Context, conn *dbconnpool.DBConnection, tableName string, columnsLists ...*vrepl.ColumnList) error {
query, err := sqlparser.ParseAndBind(sqlSelectColumnTypes,
Expand Down Expand Up @@ -332,7 +349,11 @@ func (v *VRepl) analyzeAlter(ctx context.Context) error {
return nil
}

func (v *VRepl) analyzeTables(ctx context.Context, conn *dbconnpool.DBConnection) error {
func (v *VRepl) analyzeTables(ctx context.Context, conn *dbconnpool.DBConnection) (err error) {
v.tableRows, err = v.readTableStatus(ctx, conn, v.sourceTable)
if err != nil {
return err
}
// columns:
sourceColumns, sourceVirtualColumns, sourcePKColumns, err := v.readTableColumns(ctx, conn, v.sourceTable)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ var tabletTypesStr = flag.String("vreplication_tablet_type", "MASTER,REPLICA", "
// stop replicating.
var waitRetryTime = 1 * time.Second

// How frequently vcopier will update _vt.vreplication rows_copied
var rowsCopiedUpdateInterval = 30 * time.Second

// Engine is the engine for handling vreplication.
type Engine struct {
// mu synchronizes isOpen, cancelRetry, controllers and wg.
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,19 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
lastpkpb = sqltypes.ResultToProto3(lastpkqr)
}

rowsCopiedTicker := time.NewTicker(rowsCopiedUpdateInterval)
defer rowsCopiedTicker.Stop()

var pkfields []*querypb.Field
var updateCopyState *sqlparser.ParsedQuery
var bv map[string]*querypb.BindVariable
var sqlbuffer bytes2.Buffer
err = vc.vr.sourceVStreamer.VStreamRows(ctx, initialPlan.SendRule.Filter, lastpkpb, func(rows *binlogdatapb.VStreamRowsResponse) error {
for {
select {
case <-rowsCopiedTicker.C:
update := binlogplayer.GenerateUpdateRowsCopied(vc.vr.id, vc.vr.stats.CopyRowCount.Get())
_, _ = vc.vr.dbClient.Execute(update)
case <-ctx.Done():
return io.EOF
default:
Expand Down