From 0bdcaa35c319c1253609fd0e2eda888e13060b74 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 27 Apr 2021 15:44:48 +0300 Subject: [PATCH 1/8] VReplication: tracking rows_copied Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/binlog/binlogplayer/binlog_player.go | 13 +++++++------ go/vt/binlog/binlogplayer/binlog_player_test.go | 8 ++++---- go/vt/vttablet/onlineddl/executor.go | 4 ++-- .../vttablet/tabletmanager/vreplication/vcopier.go | 4 ++-- .../vttablet/tabletmanager/vreplication/vplayer.go | 14 +++++++------- go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | 3 --- 6 files changed, 22 insertions(+), 24 deletions(-) diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index 0caa6dfc907..3d113558ad9 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -476,7 +476,7 @@ func (blp *BinlogPlayer) writeRecoveryPosition(tx *binlogdatapb.BinlogTransactio } now := time.Now().Unix() - updateRecovery := GenerateUpdatePos(blp.uid, position, now, tx.EventToken.Timestamp) + updateRecovery := GenerateUpdatePos(blp.uid, position, now, tx.EventToken.Timestamp, blp.blplStats.CopyRowCount.Get()) qr, err := blp.exec(updateRecovery) if err != nil { @@ -554,6 +554,7 @@ var AlterVReplicationTable = []string{ "ALTER TABLE _vt.vreplication ADD COLUMN db_name VARBINARY(255) NOT NULL", "ALTER TABLE _vt.vreplication MODIFY source BLOB NOT NULL", "ALTER TABLE _vt.vreplication ADD KEY workflow_idx (workflow(64))", + "ALTER TABLE _vt.vreplication ADD COLUMN rows_copied BIGINT(20) NOT NULL DEFAULT 0", } // VRSettings contains the settings of a vreplication table. @@ -624,16 +625,16 @@ func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, // GenerateUpdatePos returns a statement to update a value in the // _vt.vreplication table. -func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64) string { +func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64, rowsCopied int64) string { if txTimestamp != 0 { return fmt.Sprintf( - "update _vt.vreplication set pos=%v, time_updated=%v, transaction_timestamp=%v, message='' where id=%v", - encodeString(mysql.EncodePosition(pos)), timeUpdated, txTimestamp, uid) + "update _vt.vreplication set pos=%v, time_updated=%v, transaction_timestamp=%v, rows_copied=%v, message='' where id=%v", + encodeString(mysql.EncodePosition(pos)), timeUpdated, txTimestamp, rowsCopied, uid) } return fmt.Sprintf( - "update _vt.vreplication set pos=%v, time_updated=%v, message='' where id=%v", - encodeString(mysql.EncodePosition(pos)), timeUpdated, uid) + "update _vt.vreplication set pos=%v, time_updated=%v, rows_copied=%v, message='' where id=%v", + encodeString(mysql.EncodePosition(pos)), timeUpdated, rowsCopied, uid) } // GenerateUpdateTime returns a statement to update time_updated in the _vt.vreplication table. diff --git a/go/vt/binlog/binlogplayer/binlog_player_test.go b/go/vt/binlog/binlogplayer/binlog_player_test.go index df3000862eb..f937734d1dc 100644 --- a/go/vt/binlog/binlogplayer/binlog_player_test.go +++ b/go/vt/binlog/binlogplayer/binlog_player_test.go @@ -358,10 +358,10 @@ func TestCreateVReplicationTables(t *testing.T) { func TestUpdateVReplicationPos(t *testing.T) { gtid := mysql.MustParseGTID("MariaDB", "0-1-8283") want := "update _vt.vreplication " + - "set pos='MariaDB/0-1-8283', time_updated=88822, message='' " + + "set pos='MariaDB/0-1-8283', time_updated=88822, rows_copied=0, message='' " + "where id=78522" - got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0) + got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0, 0) if got != want { t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want) } @@ -370,10 +370,10 @@ func TestUpdateVReplicationPos(t *testing.T) { func TestUpdateVReplicationTimestamp(t *testing.T) { gtid := mysql.MustParseGTID("MariaDB", "0-2-582") want := "update _vt.vreplication " + - "set pos='MariaDB/0-2-582', time_updated=88822, transaction_timestamp=481828, message='' " + + "set pos='MariaDB/0-2-582', time_updated=88822, transaction_timestamp=481828, rows_copied=0, message='' " + "where id=78522" - got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828) + got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828, 0) if got != want { t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want) } diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index ab3db8bb475..146b1ee87c3 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -105,7 +105,7 @@ var ghostOverridePath = flag.String("gh-ost-path", "", "override default gh-ost var ptOSCOverridePath = flag.String("pt-osc-path", "", "override default pt-online-schema-change binary full path") var migrationCheckInterval = flag.Duration("migration_check_interval", 1*time.Minute, "Interval between migration checks") var retainOnlineDDLTables = flag.Duration("retain_online_ddl_tables", 24*time.Hour, "How long should vttablet keep an old migrated table before purging it") -var migrationNextCheckIntervals = []time.Duration{1 * time.Second, 5 * time.Second} +var migrationNextCheckIntervals = []time.Duration{1 * time.Second, 5 * time.Second, 10 * time.Second, 20 * time.Second} const ( maxPasswordLength = 32 // MySQL's *replication* password may not exceed 32 characters @@ -1635,12 +1635,12 @@ func (e *Executor) evaluateDeclarativeDiff(ctx context.Context, onlineDDL *schem // - what's the migration strategy? // The function invokes the appropriate handlers for each of those cases. func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) error { + defer e.triggerNextCheckInterval() failMigration := func(err error) error { _ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed) if err != nil { _ = e.updateMigrationMessage(ctx, onlineDDL.UUID, err.Error()) } - e.triggerNextCheckInterval() return err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index c9e796550fb..bc3e62b73a9 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -342,8 +342,8 @@ func (vc *vcopier) fastForward(ctx context.Context, copyState map[string]*sqltyp return err } if settings.StartPos.IsZero() { - update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0) - _, err := vc.vr.dbClient.Execute(update) + update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0, vc.vr.stats.CopyRowCount.Get()) + _, err := withDDL.Exec(vc.vr.vre.ctx, update, vc.vr.dbClient.Execute) return err } return newVPlayer(vc.vr, settings, copyState, pos, "fastforward").play(ctx) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 1ec20f87c9a..2961891d7e6 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -199,7 +199,7 @@ func (vp *vplayer) applyStmtEvent(ctx context.Context, event *binlogdatapb.VEven } if event.Type == binlogdatapb.VEventType_SAVEPOINT || vp.canAcceptStmtEvents { start := time.Now() - _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql) + _, err := withDDL.Exec(vp.vr.vre.ctx, sql, vp.vr.dbClient.ExecuteWithRetry) vp.vr.stats.QueryTimings.Record(vp.phase, start) vp.vr.stats.QueryCount.Add(vp.phase, 1) return err @@ -216,7 +216,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) { stats := NewVrLogStats("ROWCHANGE") start := time.Now() - qr, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql) + qr, err := withDDL.Exec(vp.vr.vre.ctx, sql, vp.vr.dbClient.ExecuteWithRetry) vp.vr.stats.QueryCount.Add(vp.phase, 1) vp.vr.stats.QueryTimings.Record(vp.phase, start) stats.Send(sql) @@ -231,8 +231,8 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) { vp.numAccumulatedHeartbeats = 0 - update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts) - if _, err := vp.vr.dbClient.Execute(update); err != nil { + update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get()) + if _, err := withDDL.Exec(vp.vr.vre.ctx, update, vp.vr.dbClient.Execute); err != nil { return false, fmt.Errorf("error %v updating position", err) } vp.unsavedEvent = nil @@ -255,7 +255,7 @@ func (vp *vplayer) updateCurrentTime(tm int64) error { if err != nil { return err } - if _, err := vp.vr.dbClient.Execute(update); err != nil { + if _, err := withDDL.Exec(vp.vr.vre.ctx, update, vp.vr.dbClient.Execute); err != nil { return fmt.Errorf("error %v updating time", err) } return nil @@ -550,7 +550,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m // So, we apply the DDL first, and then save the position. // Manual intervention may be needed if there is a partial // failure here. - if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Statement); err != nil { + if _, err := withDDL.Exec(vp.vr.vre.ctx, event.Statement, vp.vr.dbClient.ExecuteWithRetry); err != nil { return err } stats.Send(fmt.Sprintf("%v", event.Statement)) @@ -562,7 +562,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return io.EOF } case binlogdatapb.OnDDLAction_EXEC_IGNORE: - if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Statement); err != nil { + if _, err := withDDL.Exec(vp.vr.vre.ctx, event.Statement, vp.vr.dbClient.ExecuteWithRetry); err != nil { log.Infof("Ignoring error: %v for DDL: %s", err, event.Statement) } stats.Send(fmt.Sprintf("%v", event.Statement)) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 0252e3fa333..a72fbab2d37 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -80,9 +80,6 @@ type vstreamer struct { vse *Engine } -// CopyState contains the last PK for tables to be copied -type CopyState map[string][]*sqltypes.Result - // streamerPlan extends the original plan to also include // the TableMap, which comes from the binlog. It's used // to extract values from the ROW events. From 0b1968fb342e3ca08738344b564af3664b67fa09 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 27 Apr 2021 16:45:35 +0300 Subject: [PATCH 2/8] fix engine_test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletmanager/vreplication/engine_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go index 1fddd276412..ad6894dceb3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go @@ -498,6 +498,7 @@ func TestCreateDBAndTable(t *testing.T) { dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD COLUMN db_name.*", &sqltypes.Result{}, nil) dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication MODIFY source.*", &sqltypes.Result{}, nil) dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD KEY.*", &sqltypes.Result{}, nil) + dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD COLUMN rows_copied.*", &sqltypes.Result{}, nil) dbClient.ExpectRequestRE("create table if not exists _vt.resharding_journal.*", &sqltypes.Result{}, nil) dbClient.ExpectRequestRE("create table if not exists _vt.copy_state.*", &sqltypes.Result{}, nil) } From 6bd98b034ed00996f18babde20aab08b24cb2854 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 27 Apr 2021 18:02:46 +0300 Subject: [PATCH 3/8] testing: support query list preceeded with WithDDL, or not preceeded... Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vreplication/framework_test.go | 32 ++++++++++++------- go/vt/withddl/withddl.go | 5 +++ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 6531821651a..787465cbbc0 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -460,11 +460,17 @@ func expectLogsAndUnsubscribe(t *testing.T, logs []LogExpectation, logCh chan in } func expectDBClientQueries(t *testing.T, queries []string) { + queriesWithDDLs := append(withDDL.DDLs(), queries...) + // Either 'queries' or 'queriesWithDDLs' must match globalDBQueries t.Helper() failed := false - for i, query := range queries { + for i, queryWithDDL := range queriesWithDDLs { + query := "N/A" + if i < len(queries) { + query = queries[i] + } if failed { - t.Errorf("no query received, expecting %s", query) + t.Errorf("no query received, expecting %s or %s", query, queryWithDDL) continue } var got string @@ -477,21 +483,23 @@ func expectDBClientQueries(t *testing.T, queries []string) { goto retry } - var match bool - if query[0] == '/' { - result, err := regexp.MatchString(query[1:], got) - if err != nil { - panic(err) + matchQuery := func(q string) bool { + if q[0] == '/' { + result, err := regexp.MatchString(q[1:], got) + if err != nil { + panic(err) + } + return result } - match = result - } else { - match = (got == query) + return (got == q) } + + match := matchQuery(query) || matchQuery(queryWithDDL) if !match { - t.Errorf("query:\n%q, does not match query %d:\n%q", got, i, query) + t.Errorf("query:\n%q, does not match query %d:\n%q, or\n%q", got, i, query, queryWithDDL) } case <-time.After(5 * time.Second): - t.Errorf("no query received, expecting %s", query) + t.Errorf("no query received, expecting %s or %s", query, queryWithDDL) failed = true } } diff --git a/go/vt/withddl/withddl.go b/go/vt/withddl/withddl.go index 92526d5055d..fe5fcfd9566 100644 --- a/go/vt/withddl/withddl.go +++ b/go/vt/withddl/withddl.go @@ -45,6 +45,11 @@ func New(ddls []string) *WithDDL { } } +// DDLs returns a copy of the ddls +func (wd *WithDDL) DDLs() []string { + return wd.ddls[:] +} + // Exec executes the query using the supplied function. // If there are any schema errors, it applies the DDLs and retries. // Funcs can be any of these types: From e2a5aa156548b0ee3e86286428df46a81f2ab4f3 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 27 Apr 2021 21:27:57 +0300 Subject: [PATCH 4/8] attempt to fix unit test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vreplication/framework_test.go | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 787465cbbc0..cf899c4718f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -460,17 +460,13 @@ func expectLogsAndUnsubscribe(t *testing.T, logs []LogExpectation, logCh chan in } func expectDBClientQueries(t *testing.T, queries []string) { - queriesWithDDLs := append(withDDL.DDLs(), queries...) + ddls := withDDL.DDLs() // Either 'queries' or 'queriesWithDDLs' must match globalDBQueries t.Helper() failed := false - for i, queryWithDDL := range queriesWithDDLs { - query := "N/A" - if i < len(queries) { - query = queries[i] - } + for i, query := range queries { if failed { - t.Errorf("no query received, expecting %s or %s", query, queryWithDDL) + t.Errorf("no query received, expecting %s", query) continue } var got string @@ -482,24 +478,28 @@ func expectDBClientQueries(t *testing.T, queries []string) { if heartbeatRe.MatchString(got) { goto retry } + for _, ddl := range ddls { + if got == ddl { + goto retry + } + } - matchQuery := func(q string) bool { - if q[0] == '/' { - result, err := regexp.MatchString(q[1:], got) + matchQuery := func() bool { + if query[0] == '/' { + result, err := regexp.MatchString(query[1:], got) if err != nil { panic(err) } return result } - return (got == q) + return (got == query) } - match := matchQuery(query) || matchQuery(queryWithDDL) - if !match { - t.Errorf("query:\n%q, does not match query %d:\n%q, or\n%q", got, i, query, queryWithDDL) + if !matchQuery() { + t.Errorf("query:\n%q, does not match query %d:\n%q", got, i, query) } case <-time.After(5 * time.Second): - t.Errorf("no query received, expecting %s or %s", query, queryWithDDL) + t.Errorf("no query received, expecting %s", query) failed = true } } From 6677c051b6be3df3883a3216f3d69ffc3dd1cca3 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 28 Apr 2021 08:15:36 +0300 Subject: [PATCH 5/8] align back with original match logic/code Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vreplication/framework_test.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index cf899c4718f..a8880a23576 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -484,18 +484,17 @@ func expectDBClientQueries(t *testing.T, queries []string) { } } - matchQuery := func() bool { - if query[0] == '/' { - result, err := regexp.MatchString(query[1:], got) - if err != nil { - panic(err) - } - return result + var match bool + if query[0] == '/' { + result, err := regexp.MatchString(query[1:], got) + if err != nil { + panic(err) } - return (got == query) + match = result + } else { + match = (got == query) } - - if !matchQuery() { + if !match { t.Errorf("query:\n%q, does not match query %d:\n%q", got, i, query) } case <-time.After(5 * time.Second): From c1779bfc19261b2c46258066420ae8ba857434ab Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 28 Apr 2021 11:25:43 +0300 Subject: [PATCH 6/8] ensure to run withDDL exactly once upon initialization Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/binlog/binlogplayer/binlog_player.go | 5 +++++ .../tabletmanager/vreplication/controller.go | 5 +++++ .../tabletmanager/vreplication/controller_test.go | 11 +++++++++++ go/vt/vttablet/tabletmanager/vreplication/engine.go | 3 +++ .../tabletmanager/vreplication/framework_test.go | 7 ++++--- go/vt/vttablet/tabletmanager/vreplication/vcopier.go | 2 +- go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 12 ++++++------ 7 files changed, 35 insertions(+), 10 deletions(-) diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index 3d113558ad9..eebaa736bfa 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -557,6 +557,11 @@ var AlterVReplicationTable = []string{ "ALTER TABLE _vt.vreplication ADD COLUMN rows_copied BIGINT(20) NOT NULL DEFAULT 0", } +var WithDDLInitialQueries = []string{ + "SELECT db_name FROM _vt.vreplication LIMIT 0", + "SELECT rows_copied FROM _vt.vreplication LIMIT 0", +} + // VRSettings contains the settings of a vreplication table. type VRSettings struct { StartPos mysql.Position diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 5a35f346f27..319d287eaf4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -197,6 +197,11 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if err := dbClient.Connect(); err != nil { return vterrors.Wrap(err, "can't connect to database") } + for _, query := range withDDLInitialQueries { + if _, err := withDDL.Exec(ctx, query, dbClient.ExecuteFetch); err != nil { + log.Errorf("cannot apply withDDL init query '%s': %v", query, err) + } + } defer dbClient.Close() var tablet *topodatapb.Tablet diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go index f47a113376e..3fb6647d075 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go @@ -54,6 +54,12 @@ var ( testPos = "MariaDB/0-1-1083" ) +func expectWithDDLInitialQueries(dbClient *binlogplayer.MockDBClient) { + for _, query := range withDDLInitialQueries { + dbClient.ExpectRequest(query, nil, nil) + } +} + func TestControllerKeyRange(t *testing.T) { resetBinlogClient() wantTablet := addTablet(100) @@ -66,6 +72,7 @@ func TestControllerKeyRange(t *testing.T) { } dbClient := binlogplayer.NewMockDBClient(t) + expectWithDDLInitialQueries(dbClient) dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil) dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil) dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil) @@ -102,6 +109,7 @@ func TestControllerTables(t *testing.T) { } dbClient := binlogplayer.NewMockDBClient(t) + expectWithDDLInitialQueries(dbClient) dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil) dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil) dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil) @@ -195,6 +203,7 @@ func TestControllerOverrides(t *testing.T) { } dbClient := binlogplayer.NewMockDBClient(t) + expectWithDDLInitialQueries(dbClient) dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil) dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil) dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil) @@ -260,6 +269,7 @@ func TestControllerRetry(t *testing.T) { } dbClient := binlogplayer.NewMockDBClient(t) + expectWithDDLInitialQueries(dbClient) dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil) dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil) dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", nil, errors.New("(expected error)")) @@ -295,6 +305,7 @@ func TestControllerStopPosition(t *testing.T) { } dbClient := binlogplayer.NewMockDBClient(t) + expectWithDDLInitialQueries(dbClient) dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil) dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil) withStop := &sqltypes.Result{ diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 5a4114e37ab..6dbdeaac666 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -63,6 +63,7 @@ const ( ) var withDDL *withddl.WithDDL +var withDDLInitialQueries []string const ( throttlerAppName = "vreplication" @@ -73,6 +74,8 @@ func init() { allddls = append(allddls, binlogplayer.AlterVReplicationTable...) allddls = append(allddls, createReshardingJournalTable, createCopyState) withDDL = withddl.New(allddls) + + withDDLInitialQueries = append(withDDLInitialQueries, binlogplayer.WithDDLInitialQueries...) } // this are the default tablet_types that will be used by the tablet picker to find sources for a vreplication stream diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index a8880a23576..41526f4f517 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -460,7 +460,8 @@ func expectLogsAndUnsubscribe(t *testing.T, logs []LogExpectation, logCh chan in } func expectDBClientQueries(t *testing.T, queries []string) { - ddls := withDDL.DDLs() + extraQueries := withDDL.DDLs() + extraQueries = append(extraQueries, withDDLInitialQueries...) // Either 'queries' or 'queriesWithDDLs' must match globalDBQueries t.Helper() failed := false @@ -478,8 +479,8 @@ func expectDBClientQueries(t *testing.T, queries []string) { if heartbeatRe.MatchString(got) { goto retry } - for _, ddl := range ddls { - if got == ddl { + for _, extraQuery := range extraQueries { + if got == extraQuery { goto retry } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index bc3e62b73a9..f55d7f44e8b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -343,7 +343,7 @@ func (vc *vcopier) fastForward(ctx context.Context, copyState map[string]*sqltyp } if settings.StartPos.IsZero() { update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0, vc.vr.stats.CopyRowCount.Get()) - _, err := withDDL.Exec(vc.vr.vre.ctx, update, vc.vr.dbClient.Execute) + _, err := vc.vr.dbClient.Execute(update) return err } return newVPlayer(vc.vr, settings, copyState, pos, "fastforward").play(ctx) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 2961891d7e6..ff3f40a11b7 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -199,7 +199,7 @@ func (vp *vplayer) applyStmtEvent(ctx context.Context, event *binlogdatapb.VEven } if event.Type == binlogdatapb.VEventType_SAVEPOINT || vp.canAcceptStmtEvents { start := time.Now() - _, err := withDDL.Exec(vp.vr.vre.ctx, sql, vp.vr.dbClient.ExecuteWithRetry) + _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql) vp.vr.stats.QueryTimings.Record(vp.phase, start) vp.vr.stats.QueryCount.Add(vp.phase, 1) return err @@ -216,7 +216,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) { stats := NewVrLogStats("ROWCHANGE") start := time.Now() - qr, err := withDDL.Exec(vp.vr.vre.ctx, sql, vp.vr.dbClient.ExecuteWithRetry) + qr, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql) vp.vr.stats.QueryCount.Add(vp.phase, 1) vp.vr.stats.QueryTimings.Record(vp.phase, start) stats.Send(sql) @@ -232,7 +232,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) { vp.numAccumulatedHeartbeats = 0 update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get()) - if _, err := withDDL.Exec(vp.vr.vre.ctx, update, vp.vr.dbClient.Execute); err != nil { + if _, err := vp.vr.dbClient.Execute(update); err != nil { return false, fmt.Errorf("error %v updating position", err) } vp.unsavedEvent = nil @@ -255,7 +255,7 @@ func (vp *vplayer) updateCurrentTime(tm int64) error { if err != nil { return err } - if _, err := withDDL.Exec(vp.vr.vre.ctx, update, vp.vr.dbClient.Execute); err != nil { + if _, err := vp.vr.dbClient.Execute(update); err != nil { return fmt.Errorf("error %v updating time", err) } return nil @@ -550,7 +550,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m // So, we apply the DDL first, and then save the position. // Manual intervention may be needed if there is a partial // failure here. - if _, err := withDDL.Exec(vp.vr.vre.ctx, event.Statement, vp.vr.dbClient.ExecuteWithRetry); err != nil { + if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Statement); err != nil { return err } stats.Send(fmt.Sprintf("%v", event.Statement)) @@ -562,7 +562,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return io.EOF } case binlogdatapb.OnDDLAction_EXEC_IGNORE: - if _, err := withDDL.Exec(vp.vr.vre.ctx, event.Statement, vp.vr.dbClient.ExecuteWithRetry); err != nil { + if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Statement); err != nil { log.Infof("Ignoring error: %v for DDL: %s", err, event.Statement) } stats.Send(fmt.Sprintf("%v", event.Statement)) From c58049e786d8641d33420301941565d439de27d9 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 28 Apr 2021 11:49:28 +0200 Subject: [PATCH 7/8] Fix controller tests Signed-off-by: Rohit Nayak --- go/vt/binlog/binlogplayer/mock_dbclient.go | 49 ++++++++++++++----- .../vreplication/controller_test.go | 11 ----- 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/go/vt/binlog/binlogplayer/mock_dbclient.go b/go/vt/binlog/binlogplayer/mock_dbclient.go index 70fe4fcdf68..82571ec8de8 100644 --- a/go/vt/binlog/binlogplayer/mock_dbclient.go +++ b/go/vt/binlog/binlogplayer/mock_dbclient.go @@ -18,6 +18,7 @@ package binlogplayer import ( "regexp" + "strings" "testing" "time" @@ -30,11 +31,12 @@ const mockClientUNameDba = "Dba" // MockDBClient mocks a DBClient. // It must be configured to expect requests in a specific order. type MockDBClient struct { - t *testing.T - UName string - expect []*mockExpect - currentResult int - done chan struct{} + t *testing.T + UName string + expect []*mockExpect + currentResult int + done chan struct{} + queriesToIgnore []*mockExpect // these queries will return a standard nil result, you SHOULD NOT expect them in the tests } type mockExpect struct { @@ -44,20 +46,38 @@ type mockExpect struct { err error } +func getQueriesToIgnore() []*mockExpect { + var queriesToIgnore []*mockExpect + for _, query := range WithDDLInitialQueries { + exp := &mockExpect{ + query: query, + re: nil, + result: &sqltypes.Result{}, + err: nil, + } + queriesToIgnore = append(queriesToIgnore, exp) + + } + return queriesToIgnore +} + // NewMockDBClient returns a new DBClientMock with the default "Filtered" UName. func NewMockDBClient(t *testing.T) *MockDBClient { return &MockDBClient{ - t: t, - UName: mockClientUNameFiltered, - done: make(chan struct{}), + t: t, + UName: mockClientUNameFiltered, + done: make(chan struct{}), + queriesToIgnore: getQueriesToIgnore(), } } +// NewMockDbaClient returns a new DBClientMock with the default "Dba" UName. func NewMockDbaClient(t *testing.T) *MockDBClient { return &MockDBClient{ - t: t, - UName: mockClientUNameDba, - done: make(chan struct{}), + t: t, + UName: mockClientUNameDba, + done: make(chan struct{}), + queriesToIgnore: getQueriesToIgnore(), } } @@ -142,6 +162,13 @@ func (dc *MockDBClient) Close() { func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) { dc.t.Helper() dc.t.Logf("DBClient query: %v", query) + + for _, q := range dc.queriesToIgnore { + if strings.EqualFold(q.query, query) || strings.Contains(strings.ToLower(query), strings.ToLower(q.query)) { + return q.result, q.err + } + } + if dc.currentResult >= len(dc.expect) { dc.t.Fatalf("DBClientMock: query: %s, no more requests are expected", query) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go index 3fb6647d075..f47a113376e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go @@ -54,12 +54,6 @@ var ( testPos = "MariaDB/0-1-1083" ) -func expectWithDDLInitialQueries(dbClient *binlogplayer.MockDBClient) { - for _, query := range withDDLInitialQueries { - dbClient.ExpectRequest(query, nil, nil) - } -} - func TestControllerKeyRange(t *testing.T) { resetBinlogClient() wantTablet := addTablet(100) @@ -72,7 +66,6 @@ func TestControllerKeyRange(t *testing.T) { } dbClient := binlogplayer.NewMockDBClient(t) - expectWithDDLInitialQueries(dbClient) dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil) dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil) dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil) @@ -109,7 +102,6 @@ func TestControllerTables(t *testing.T) { } dbClient := binlogplayer.NewMockDBClient(t) - expectWithDDLInitialQueries(dbClient) dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil) dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil) dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil) @@ -203,7 +195,6 @@ func TestControllerOverrides(t *testing.T) { } dbClient := binlogplayer.NewMockDBClient(t) - expectWithDDLInitialQueries(dbClient) dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil) dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil) dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil) @@ -269,7 +260,6 @@ func TestControllerRetry(t *testing.T) { } dbClient := binlogplayer.NewMockDBClient(t) - expectWithDDLInitialQueries(dbClient) dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil) dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil) dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", nil, errors.New("(expected error)")) @@ -305,7 +295,6 @@ func TestControllerStopPosition(t *testing.T) { } dbClient := binlogplayer.NewMockDBClient(t) - expectWithDDLInitialQueries(dbClient) dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil) dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil) withStop := &sqltypes.Result{ From 8f01ad9a53deb88dbad6835c99f56013d62d4214 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 28 Apr 2021 13:12:49 +0300 Subject: [PATCH 8/8] fix tests Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletmanager/vreplication/framework_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 41526f4f517..cb0b9abb2f8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -519,6 +519,7 @@ func expectNontxQueries(t *testing.T, queries []string) { t.Helper() failed := false + skipQueries := withDDLInitialQueries for i, query := range queries { if failed { t.Errorf("no query received, expecting %s", query) @@ -531,6 +532,11 @@ func expectNontxQueries(t *testing.T, queries []string) { if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "update _vt.vreplication set pos") || heartbeatRe.MatchString(got) { goto retry } + for _, skipQuery := range skipQueries { + if got == skipQuery { + goto retry + } + } var match bool if query[0] == '/' {