Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switchwrites: error if no tablets available on target for reverse replication #8142

Merged
merged 2 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
default:
}
candidates := tp.getMatchingTablets(ctx)
candidates := tp.GetMatchingTablets(ctx)

if len(candidates) == 0 {
// if no candidates were found, sleep and try again
Expand Down Expand Up @@ -145,9 +145,9 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
}
}

// getMatchingTablets returns a list of TabletInfo for tablets
// GetMatchingTablets returns a list of TabletInfo for tablets
// that match the cells, keyspace, shard and tabletTypes for this TabletPicker
func (tp *TabletPicker) getMatchingTablets(ctx context.Context) []*topo.TabletInfo {
func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletInfo {
// Special handling for MASTER tablet type
// Since there is only one master, we ignore cell and find the master
aliases := make([]*topodatapb.TabletAlias, 0)
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2132,9 +2132,14 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
s := ""
var progress wrangler.TableCopyProgress
for table := range *copyProgress {
var rowCountPct, tableSizePct int64
progress = *(*copyProgress)[table]
rowCountPct := 100.0 * progress.TargetRowCount / progress.SourceRowCount
tableSizePct := 100.0 * progress.TargetTableSize / progress.SourceTableSize
if progress.SourceRowCount > 0 {
rowCountPct = 100.0 * progress.TargetRowCount / progress.SourceRowCount
}
if progress.SourceTableSize > 0 {
tableSizePct = 100.0 * progress.TargetTableSize / progress.SourceTableSize
}
s += fmt.Sprintf("%s: rows copied %d/%d (%d%%), size copied %d/%d (%d%%)\n",
table, progress.TargetRowCount, progress.SourceRowCount, rowCountPct,
progress.TargetTableSize, progress.SourceTableSize, tableSizePct)
Expand Down
7 changes: 5 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,11 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error {

select {
case <-ctx.Done():
log.Errorf("Error waiting for pos: %s, last pos: %s: %v, wait time: %v", pos, qr.Rows[0][0].ToString(), ctx.Err(), time.Since(start))
return fmt.Errorf("error waiting for pos: %s, last pos: %s: %v, wait time: %v", pos, qr.Rows[0][0].ToString(), ctx.Err(), time.Since(start))
err = fmt.Errorf("error waiting for pos: %s, last pos: %s: %v, wait time: %v: %s",
pos, qr.Rows[0][0].ToString(), ctx.Err(), time.Since(start),
"possibly no tablets are available to stream in the source keyspace for your cell and tablet_types setting")
log.Error(err.Error())
return err
case <-vre.ctx.Done():
return fmt.Errorf("vreplication is closing: %v", vre.ctx.Err())
case <-tkr.C:
Expand Down
6 changes: 6 additions & 0 deletions go/vt/wrangler/fake_dbclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func (dc *fakeDBClient) addQueryRE(query string, result *sqltypes.Result, err er
dc.queriesRE[query] = &dbResults{results: []*dbResult{dbr}, err: err}
}

func (dc *fakeDBClient) getInvariant(query string) *sqltypes.Result {
return dc.invariants[query]
}

// note: addInvariant will replace a previous result for a query with the provided one: this is used in the tests
func (dc *fakeDBClient) addInvariant(query string, result *sqltypes.Result) {
dc.invariants[query] = result
}
Expand Down Expand Up @@ -138,6 +143,7 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
if testMode == "debug" {
fmt.Printf("ExecuteFetch: %s\n", query)
}

if dbrs := dc.queries[query]; dbrs != nil {
return dbrs.next(query)
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/wrangler/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour
if err != nil {
return vterrors.Wrap(err, "buildResharder")
}

rs.stopAfterCopy = stopAfterCopy
if !skipSchemaCopy {
if err := rs.copySchema(ctx); err != nil {
Expand Down
55 changes: 54 additions & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/discovery"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
Expand Down Expand Up @@ -367,8 +369,52 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam
return sw.logs(), nil
}

func (wr *Wrangler) areTabletsAvailableToStreamFrom(ctx context.Context, ts *trafficSwitcher, keyspace string, shards []*topo.ShardInfo) error {
var cells []string
tabletTypes := ts.optTabletTypes
if ts.optCells != "" {
cells = strings.Split(ts.optCells, ",")
}
// FIXME: currently there is a default setting in the tablet that is used if user does not specify a tablet type,
// we use the value specified in the tablet flag `-vreplication_tablet_type`
// but ideally we should populate the vreplication table with a default value when we setup the workflow
if tabletTypes == "" {
tabletTypes = "MASTER,REPLICA"
}

var wg sync.WaitGroup
allErrors := &concurrency.AllErrorRecorder{}
for _, shard := range shards {
wg.Add(1)
go func(cells []string, keyspace string, shard *topo.ShardInfo) {
defer wg.Done()
if cells == nil {
cells = append(cells, shard.MasterAlias.Cell)
}
tp, err := discovery.NewTabletPicker(wr.ts, cells, keyspace, shard.ShardName(), tabletTypes)
if err != nil {
allErrors.RecordError(err)
return
}
tablets := tp.GetMatchingTablets(ctx)
if len(tablets) == 0 {
allErrors.RecordError(fmt.Errorf("no tablet found to source data in keyspace %s, shard %s", keyspace, shard.ShardName()))
return
}
}(cells, keyspace, shard)
}

wg.Wait()
if allErrors.HasErrors() {
log.Errorf("%s", allErrors.Error())
return allErrors.Error()
}
return nil
}

// SwitchWrites is a generic way of migrating write traffic for a resharding workflow.
func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowName string, timeout time.Duration, cancel, reverse, reverseReplication bool, dryRun bool) (journalID int64, dryRunResults *[]string, err error) {
func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowName string, timeout time.Duration,
cancel, reverse, reverseReplication bool, dryRun bool) (journalID int64, dryRunResults *[]string, err error) {
ts, ws, err := wr.getWorkflowState(ctx, targetKeyspace, workflowName)
_ = ws
if err != nil {
Expand Down Expand Up @@ -399,6 +445,13 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
return 0, nil, err
}

if reverseReplication {
err := wr.areTabletsAvailableToStreamFrom(ctx, ts, ts.targetKeyspace, ts.targetShards())
if err != nil {
return 0, nil, err
}
}

// Need to lock both source and target keyspaces.
tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.sourceKeyspace, "SwitchWrites")
if lockErr != nil {
Expand Down
9 changes: 7 additions & 2 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,8 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
tme.startTablets(t)
tme.createDBClients(ctx, t)
tme.setMasterPositions()

for i, targetShard := range targetShards {
var rows []string
var rows, rowsRdOnly []string
for j, sourceShard := range sourceShards {
if !key.KeyRangesIntersect(tme.targetKeyRanges[i], tme.sourceKeyRanges[j]) {
continue
Expand All @@ -332,12 +331,18 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
},
}
rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls))
rowsRdOnly = append(rows, fmt.Sprintf("%d|%v|||RDONLY", j+1, bls))
}
tme.dbTargetClients[i].addInvariant(vreplQueryks, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
rows...),
)
tme.dbTargetClients[i].addInvariant(vreplQueryks+"-rdonly", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
rowsRdOnly...),
)
}

tme.targetKeyspace = "ks"
Expand Down
Loading