diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 0c727162236..af813ae13e9 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -107,7 +107,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") default: } - candidates := tp.getMatchingTablets(ctx) + candidates := tp.GetMatchingTablets(ctx) if len(candidates) == 0 { // if no candidates were found, sleep and try again @@ -145,9 +145,9 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table } } -// getMatchingTablets returns a list of TabletInfo for tablets +// GetMatchingTablets returns a list of TabletInfo for tablets // that match the cells, keyspace, shard and tabletTypes for this TabletPicker -func (tp *TabletPicker) getMatchingTablets(ctx context.Context) []*topo.TabletInfo { +func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletInfo { // Special handling for MASTER tablet type // Since there is only one master, we ignore cell and find the master aliases := make([]*topodatapb.TabletAlias, 0) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 1168c87d94c..4fa1c963894 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2132,9 +2132,14 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla s := "" var progress wrangler.TableCopyProgress for table := range *copyProgress { + var rowCountPct, tableSizePct int64 progress = *(*copyProgress)[table] - rowCountPct := 100.0 * progress.TargetRowCount / progress.SourceRowCount - tableSizePct := 100.0 * progress.TargetTableSize / progress.SourceTableSize + if progress.SourceRowCount > 0 { + rowCountPct = 100.0 * progress.TargetRowCount / progress.SourceRowCount + } + if progress.SourceTableSize > 0 { + tableSizePct = 100.0 * progress.TargetTableSize / progress.SourceTableSize + } s += fmt.Sprintf("%s: rows copied %d/%d (%d%%), size copied %d/%d (%d%%)\n", table, progress.TargetRowCount, progress.SourceRowCount, rowCountPct, progress.TargetTableSize, progress.SourceTableSize, tableSizePct) diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 910e94d9b42..9ddf5c75473 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -736,8 +736,11 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error { select { case <-ctx.Done(): - log.Errorf("Error waiting for pos: %s, last pos: %s: %v, wait time: %v", pos, qr.Rows[0][0].ToString(), ctx.Err(), time.Since(start)) - return fmt.Errorf("error waiting for pos: %s, last pos: %s: %v, wait time: %v", pos, qr.Rows[0][0].ToString(), ctx.Err(), time.Since(start)) + err = fmt.Errorf("error waiting for pos: %s, last pos: %s: %v, wait time: %v: %s", + pos, qr.Rows[0][0].ToString(), ctx.Err(), time.Since(start), + "possibly no tablets are available to stream in the source keyspace for your cell and tablet_types setting") + log.Error(err.Error()) + return err case <-vre.ctx.Done(): return fmt.Errorf("vreplication is closing: %v", vre.ctx.Err()) case <-tkr.C: diff --git a/go/vt/wrangler/fake_dbclient_test.go b/go/vt/wrangler/fake_dbclient_test.go index 9aeef6541c2..8c24f39a361 100644 --- a/go/vt/wrangler/fake_dbclient_test.go +++ b/go/vt/wrangler/fake_dbclient_test.go @@ -100,6 +100,11 @@ func (dc *fakeDBClient) addQueryRE(query string, result *sqltypes.Result, err er dc.queriesRE[query] = &dbResults{results: []*dbResult{dbr}, err: err} } +func (dc *fakeDBClient) getInvariant(query string) *sqltypes.Result { + return dc.invariants[query] +} + +// note: addInvariant will replace a previous result for a query with the provided one: this is used in the tests func (dc *fakeDBClient) addInvariant(query string, result *sqltypes.Result) { dc.invariants[query] = result } @@ -138,6 +143,7 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re if testMode == "debug" { fmt.Printf("ExecuteFetch: %s\n", query) } + if dbrs := dc.queries[query]; dbrs != nil { return dbrs.next(query) } diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go index 5d856e579a4..a758d849824 100644 --- a/go/vt/wrangler/resharder.go +++ b/go/vt/wrangler/resharder.go @@ -80,6 +80,7 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour if err != nil { return vterrors.Wrap(err, "buildResharder") } + rs.stopAfterCopy = stopAfterCopy if !skipSchemaCopy { if err := rs.copySchema(ctx); err != nil { diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 381e4bf1987..e07a86b1753 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -26,6 +26,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/json2" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" @@ -367,8 +369,52 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam return sw.logs(), nil } +func (wr *Wrangler) areTabletsAvailableToStreamFrom(ctx context.Context, ts *trafficSwitcher, keyspace string, shards []*topo.ShardInfo) error { + var cells []string + tabletTypes := ts.optTabletTypes + if ts.optCells != "" { + cells = strings.Split(ts.optCells, ",") + } + // FIXME: currently there is a default setting in the tablet that is used if user does not specify a tablet type, + // we use the value specified in the tablet flag `-vreplication_tablet_type` + // but ideally we should populate the vreplication table with a default value when we setup the workflow + if tabletTypes == "" { + tabletTypes = "MASTER,REPLICA" + } + + var wg sync.WaitGroup + allErrors := &concurrency.AllErrorRecorder{} + for _, shard := range shards { + wg.Add(1) + go func(cells []string, keyspace string, shard *topo.ShardInfo) { + defer wg.Done() + if cells == nil { + cells = append(cells, shard.MasterAlias.Cell) + } + tp, err := discovery.NewTabletPicker(wr.ts, cells, keyspace, shard.ShardName(), tabletTypes) + if err != nil { + allErrors.RecordError(err) + return + } + tablets := tp.GetMatchingTablets(ctx) + if len(tablets) == 0 { + allErrors.RecordError(fmt.Errorf("no tablet found to source data in keyspace %s, shard %s", keyspace, shard.ShardName())) + return + } + }(cells, keyspace, shard) + } + + wg.Wait() + if allErrors.HasErrors() { + log.Errorf("%s", allErrors.Error()) + return allErrors.Error() + } + return nil +} + // SwitchWrites is a generic way of migrating write traffic for a resharding workflow. -func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowName string, timeout time.Duration, cancel, reverse, reverseReplication bool, dryRun bool) (journalID int64, dryRunResults *[]string, err error) { +func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowName string, timeout time.Duration, + cancel, reverse, reverseReplication bool, dryRun bool) (journalID int64, dryRunResults *[]string, err error) { ts, ws, err := wr.getWorkflowState(ctx, targetKeyspace, workflowName) _ = ws if err != nil { @@ -399,6 +445,13 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa return 0, nil, err } + if reverseReplication { + err := wr.areTabletsAvailableToStreamFrom(ctx, ts, ts.targetKeyspace, ts.targetShards()) + if err != nil { + return 0, nil, err + } + } + // Need to lock both source and target keyspaces. tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.sourceKeyspace, "SwitchWrites") if lockErr != nil { diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index d9e49802f87..e9c1a8f9f22 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -314,9 +314,8 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe tme.startTablets(t) tme.createDBClients(ctx, t) tme.setMasterPositions() - for i, targetShard := range targetShards { - var rows []string + var rows, rowsRdOnly []string for j, sourceShard := range sourceShards { if !key.KeyRangesIntersect(tme.targetKeyRanges[i], tme.sourceKeyRanges[j]) { continue @@ -332,12 +331,18 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe }, } rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls)) + rowsRdOnly = append(rows, fmt.Sprintf("%d|%v|||RDONLY", j+1, bls)) } tme.dbTargetClients[i].addInvariant(vreplQueryks, sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|source|message|cell|tablet_types", "int64|varchar|varchar|varchar|varchar"), rows...), ) + tme.dbTargetClients[i].addInvariant(vreplQueryks+"-rdonly", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types", + "int64|varchar|varchar|varchar|varchar"), + rowsRdOnly...), + ) } tme.targetKeyspace = "ks" diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 4241c2fbd35..21c21e57dba 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -1735,6 +1735,303 @@ func TestReverseVReplicationUpdateQuery(t *testing.T) { } } +func TestShardMigrateNoAvailableTabletsForReverseReplication(t *testing.T) { + ctx := context.Background() + tme := newTestShardMigrater(ctx, t, []string{"-40", "40-"}, []string{"-80", "80-"}) + defer tme.stopTablets(t) + + // Initial check + checkServedTypes(t, tme.ts, "ks:-40", 3) + checkServedTypes(t, tme.ts, "ks:40-", 3) + checkServedTypes(t, tme.ts, "ks:-80", 0) + checkServedTypes(t, tme.ts, "ks:80-", 0) + + tme.expectNoPreviousJournals() + //------------------------------------------------------------------------------------------------------------------- + // Single cell RDONLY migration. + _, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, []string{"cell1"}, workflow.DirectionForward, false) + if err != nil { + t.Fatal(err) + } + checkCellServedTypes(t, tme.ts, "ks:-40", "cell1", 2) + checkCellServedTypes(t, tme.ts, "ks:40-", "cell1", 2) + checkCellServedTypes(t, tme.ts, "ks:-80", "cell1", 1) + checkCellServedTypes(t, tme.ts, "ks:80-", "cell1", 1) + checkCellServedTypes(t, tme.ts, "ks:-40", "cell2", 3) + checkCellServedTypes(t, tme.ts, "ks:40-", "cell2", 3) + checkCellServedTypes(t, tme.ts, "ks:-80", "cell2", 0) + checkCellServedTypes(t, tme.ts, "ks:80-", "cell2", 0) + verifyQueries(t, tme.allDBClients) + + tme.expectNoPreviousJournals() + //------------------------------------------------------------------------------------------------------------------- + // Other cell REPLICA migration. + _, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_REPLICA}, []string{"cell2"}, workflow.DirectionForward, false) + if err != nil { + t.Fatal(err) + } + checkCellServedTypes(t, tme.ts, "ks:-40", "cell1", 2) + checkCellServedTypes(t, tme.ts, "ks:40-", "cell1", 2) + checkCellServedTypes(t, tme.ts, "ks:-80", "cell1", 1) + checkCellServedTypes(t, tme.ts, "ks:80-", "cell1", 1) + checkCellServedTypes(t, tme.ts, "ks:-40", "cell2", 1) + checkCellServedTypes(t, tme.ts, "ks:40-", "cell2", 1) + checkCellServedTypes(t, tme.ts, "ks:-80", "cell2", 2) + checkCellServedTypes(t, tme.ts, "ks:80-", "cell2", 2) + verifyQueries(t, tme.allDBClients) + + tme.expectNoPreviousJournals() + //------------------------------------------------------------------------------------------------------------------- + // Single cell backward REPLICA migration. + _, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_REPLICA}, []string{"cell2"}, workflow.DirectionBackward, false) + if err != nil { + t.Fatal(err) + } + checkCellServedTypes(t, tme.ts, "ks:-40", "cell1", 2) + checkCellServedTypes(t, tme.ts, "ks:40-", "cell1", 2) + checkCellServedTypes(t, tme.ts, "ks:-80", "cell1", 1) + checkCellServedTypes(t, tme.ts, "ks:80-", "cell1", 1) + checkCellServedTypes(t, tme.ts, "ks:-40", "cell2", 3) + checkCellServedTypes(t, tme.ts, "ks:40-", "cell2", 3) + checkCellServedTypes(t, tme.ts, "ks:-80", "cell2", 0) + checkCellServedTypes(t, tme.ts, "ks:80-", "cell2", 0) + verifyQueries(t, tme.allDBClients) + + tme.expectNoPreviousJournals() + //------------------------------------------------------------------------------------------------------------------- + // Switch all RDONLY. + // This is an extra step that does not exist in the tables test. + // The per-cell migration mechanism is different for tables. So, this + // extra step is needed to bring things in sync. + _, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionForward, false) + if err != nil { + t.Fatal(err) + } + checkServedTypes(t, tme.ts, "ks:-40", 2) + checkServedTypes(t, tme.ts, "ks:40-", 2) + checkServedTypes(t, tme.ts, "ks:-80", 1) + checkServedTypes(t, tme.ts, "ks:80-", 1) + verifyQueries(t, tme.allDBClients) + + tme.expectNoPreviousJournals() + //------------------------------------------------------------------------------------------------------------------- + // Switch all REPLICA. + _, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_REPLICA}, nil, workflow.DirectionForward, false) + if err != nil { + t.Fatal(err) + } + checkServedTypes(t, tme.ts, "ks:-40", 1) + checkServedTypes(t, tme.ts, "ks:40-", 1) + checkServedTypes(t, tme.ts, "ks:-80", 2) + checkServedTypes(t, tme.ts, "ks:80-", 2) + verifyQueries(t, tme.allDBClients) + + tme.expectNoPreviousJournals() + //------------------------------------------------------------------------------------------------------------------- + // All cells RDONLY backward migration. + _, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionBackward, false) + if err != nil { + t.Fatal(err) + } + checkServedTypes(t, tme.ts, "ks:-40", 2) + checkServedTypes(t, tme.ts, "ks:40-", 2) + checkServedTypes(t, tme.ts, "ks:-80", 1) + checkServedTypes(t, tme.ts, "ks:80-", 1) + verifyQueries(t, tme.allDBClients) + + //------------------------------------------------------------------------------------------------------------------- + // Can't switch master with SwitchReads. + _, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_MASTER}, nil, workflow.DirectionForward, false) + want := "tablet type must be REPLICA or RDONLY: MASTER" + if err == nil || err.Error() != want { + t.Errorf("SwitchReads(master) err: %v, want %v", err, want) + } + verifyQueries(t, tme.allDBClients) + + //------------------------------------------------------------------------------------------------------------------- + // Test SwitchWrites cancelation on failure. + + tme.expectNoPreviousJournals() + // Switch all the reads first. + _, err = tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, nil, workflow.DirectionForward, false) + if err != nil { + t.Fatal(err) + } + checkServedTypes(t, tme.ts, "ks:-40", 1) + checkServedTypes(t, tme.ts, "ks:40-", 1) + checkServedTypes(t, tme.ts, "ks:-80", 2) + checkServedTypes(t, tme.ts, "ks:80-", 2) + checkIsMasterServing(t, tme.ts, "ks:-40", true) + checkIsMasterServing(t, tme.ts, "ks:40-", true) + checkIsMasterServing(t, tme.ts, "ks:-80", false) + checkIsMasterServing(t, tme.ts, "ks:80-", false) + + checkJournals := func() { + tme.dbSourceClients[0].addQuery("select val from _vt.resharding_journal where id=6432976123657117097", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select val from _vt.resharding_journal where id=6432976123657117097", &sqltypes.Result{}, nil) + } + checkJournals() + + stopStreams := func() { + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped' and message != 'FROZEN'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse' and state = 'Stopped' and message != 'FROZEN'", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id, workflow, source, pos from _vt.vreplication where db_name='vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + } + stopStreams() + + deleteReverseReplicaion := func() { + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid3, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test_reverse'", resultid34, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.vreplication where id in (3)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.vreplication where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("delete from _vt.copy_state where vrepl_id in (3)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("delete from _vt.copy_state where vrepl_id in (3, 4)", &sqltypes.Result{}, nil) + } + cancelMigration := func() { + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow != 'test_reverse'", &sqltypes.Result{}, nil) + + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid2, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", runningResult(1), nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", runningResult(2), nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", runningResult(2), nil) + + deleteReverseReplicaion() + } + cancelMigration() + + _, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 0*time.Second, false, false, true, false) + want = "DeadlineExceeded" + if err == nil || !strings.Contains(err.Error(), want) { + t.Errorf("SwitchWrites(0 timeout) err: %v, must contain %v", err, want) + } + + verifyQueries(t, tme.allDBClients) + checkServedTypes(t, tme.ts, "ks:-40", 1) + checkServedTypes(t, tme.ts, "ks:40-", 1) + checkServedTypes(t, tme.ts, "ks:-80", 2) + checkServedTypes(t, tme.ts, "ks:80-", 2) + checkIsMasterServing(t, tme.ts, "ks:-40", true) + checkIsMasterServing(t, tme.ts, "ks:40-", true) + checkIsMasterServing(t, tme.ts, "ks:-80", false) + checkIsMasterServing(t, tme.ts, "ks:80-", false) + + //------------------------------------------------------------------------------------------------------------------- + // Test successful SwitchWrites. + + checkJournals() + stopStreams() + + waitForCatchup := func() { + // mi.waitForCatchup-> mi.wr.tmc.VReplicationWaitForPos + state := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "pos|state|message", + "varchar|varchar|varchar"), + "MariaDB/5-456-892|Running", + ) + tme.dbTargetClients[0].addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) + tme.dbTargetClients[1].addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) + tme.dbTargetClients[0].addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) + + // mi.waitForCatchup-> mi.wr.tmc.VReplicationExec('stopped for cutover') + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where id = 1", resultid1, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where id = 2", resultid2, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where id = 2", resultid2, nil) + tme.dbTargetClients[1].addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + } + waitForCatchup() + + createReverseVReplication := func() { + deleteReverseReplicaion() + + tme.dbSourceClients[0].addQueryRE("insert into _vt.vreplication.*-80.*-40.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) + tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*-80.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 1}, nil) + tme.dbSourceClients[1].addQueryRE("insert into _vt.vreplication.*80-.*40-.*MariaDB/5-456-893.*Stopped", &sqltypes.Result{InsertID: 2}, nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + } + createReverseVReplication() + + createJournals := func() { + journal1 := "insert into _vt.resharding_journal.*6432976123657117097.*migration_type:SHARDS.*local_position.*MariaDB/5-456-892.*shard_gtids.*-80.*MariaDB/5-456-893.*participants.*40.*40" + tme.dbSourceClients[0].addQueryRE(journal1, &sqltypes.Result{}, nil) + journal2 := "insert into _vt.resharding_journal.*6432976123657117097.*migration_type:SHARDS.*local_position.*MariaDB/5-456-892.*shard_gtids.*80.*MariaDB/5-456-893.*shard_gtids.*80.*MariaDB/5-456-893.*participants.*40.*40" + tme.dbSourceClients[1].addQueryRE(journal2, &sqltypes.Result{}, nil) + } + createJournals() + + startReverseVReplication := func() { + tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) + tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) + tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil) + tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil) + tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil) + } + startReverseVReplication() + + freezeTargetVReplication := func() { + tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid12, nil) + tme.dbTargetClients[0].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (1, 2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil) + tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid2, nil) + tme.dbTargetClients[1].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (2)", &sqltypes.Result{}, nil) + tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil) + } + freezeTargetVReplication() + + // Temporarily set tablet types to RDONLY to test that SwitchWrites fails if no tablets of rdonly are available + invariants := make(map[string]*sqltypes.Result) + for i := range tme.targetShards { + invariants[fmt.Sprintf("%s-%d", vreplQueryks, i)] = tme.dbTargetClients[i].getInvariant(vreplQueryks) + tme.dbTargetClients[i].addInvariant(vreplQueryks, tme.dbTargetClients[i].getInvariant(vreplQueryks+"-rdonly")) + } + _, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, true, false) + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), "no tablet found")) + require.True(t, strings.Contains(err.Error(), "-80")) + require.True(t, strings.Contains(err.Error(), "80-")) + require.False(t, strings.Contains(err.Error(), "40")) + for i := range tme.targetShards { + tme.dbTargetClients[i].addInvariant(vreplQueryks, invariants[fmt.Sprintf("%s-%d", vreplQueryks, i)]) + } + + journalID, _, err := tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, true, false) + if err != nil { + t.Fatal(err) + } + if journalID != 6432976123657117097 { + t.Errorf("journal id: %d, want 6432976123657117097", journalID) + } + + verifyQueries(t, tme.allDBClients) + + checkServedTypes(t, tme.ts, "ks:-40", 0) + checkServedTypes(t, tme.ts, "ks:40-", 0) + checkServedTypes(t, tme.ts, "ks:-80", 3) + checkServedTypes(t, tme.ts, "ks:80-", 3) + + checkIsMasterServing(t, tme.ts, "ks:-40", false) + checkIsMasterServing(t, tme.ts, "ks:40-", false) + checkIsMasterServing(t, tme.ts, "ks:-80", true) + checkIsMasterServing(t, tme.ts, "ks:80-", true) + + verifyQueries(t, tme.allDBClients) +} + func checkRouting(t *testing.T, wr *Wrangler, want map[string][]string) { t.Helper() ctx := context.Background()