From 9256a540dab4836f917f90bf202dce7fad28c66d Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sat, 24 Apr 2021 11:39:28 -0400 Subject: [PATCH 1/6] Extract StreamMigrator code from package wrangler Signed-off-by: Andrew Mason --- go/vt/vtctl/workflow/stream_migrator.go | 657 +++++++++++++++++++++++ go/vt/vtctl/workflow/traffic_switcher.go | 36 ++ 2 files changed, 693 insertions(+) create mode 100644 go/vt/vtctl/workflow/stream_migrator.go diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go new file mode 100644 index 00000000000..a065e161d5f --- /dev/null +++ b/go/vt/vtctl/workflow/stream_migrator.go @@ -0,0 +1,657 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "fmt" + "strings" + "sync" + "text/template" + + "github.com/golang/protobuf/proto" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +// (TODO:@ajm188) Does this need to be exported? 
+type StreamType int + +const ( + StreamTypeUnknown = StreamType(iota) + StreamTypeSharded + StreamTypeReference +) + +type StreamMigrator struct { + streams map[string][]*VReplicationStream + workflows []string + templates []*VReplicationStream + ts ITrafficSwitcher + logger logutil.Logger +} + +// BuildStreamMigrator creates a new StreamMigrator based on the given +// TrafficSwitcher. +func BuildStreamMigrator(ctx context.Context, ts ITrafficSwitcher, cancelMigrate bool) (*StreamMigrator, error) { + sm := &StreamMigrator{ + ts: ts, + logger: ts.Logger(), + } + + if sm.ts.MigrationType() == binlogdatapb.MigrationType_TABLES { + // Source streams should be stopped only for shard migrations. + return sm, nil + } + + var err error + + sm.streams, err = sm.readSourceStreams(ctx, cancelMigrate) + if err != nil { + return nil, err + } + + // Loop executes only once. + for _, tabletStreams := range sm.streams { + tmpl, err := sm.templatize(ctx, tabletStreams) + if err != nil { + return nil, err + } + + sm.workflows = VReplicationStreams(tmpl).Workflows() + break + } + + return sm, nil +} + +// StreamMigratorFinalize finalizes the stream migration. +// +// (TODO:@ajm88) in the original implementation, "it's a standalone function +// because it does not use the streamMigrater state". That's still true, but +// moving this to a method on StreamMigrator would provide a cleaner namespacing +// in package workflow. But, we would have to update more callers in order to +// do that (*wrangler.switcher's streamMigrateFinalize would need to change its +// signature to also take a *workflow.StreamMigrator), so we will do that in a +// second PR. 
+func StreamMigratorFinalize(ctx context.Context, ts ITrafficSwitcher, workflows []string) error { + if len(workflows) == 0 { + return nil + } + workflowList := stringListify(workflows) + err := ts.ForAllSources(func(source *MigrationSource) error { + query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow in (%s)", encodeString(source.GetPrimary().DbName()), workflowList) + _, err := ts.VReplicationExec(ctx, source.GetPrimary().Alias, query) + return err + }) + if err != nil { + return err + } + err = ts.ForAllTargets(func(target *MigrationTarget) error { + query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s and workflow in (%s)", encodeString(target.GetPrimary().DbName()), workflowList) + _, err := ts.VReplicationExec(ctx, target.GetPrimary().Alias, query) + return err + }) + return err +} + +func (sm *StreamMigrator) CancelMigration(ctx context.Context) { + if sm.streams == nil { + return + } + + _ = sm.deleteTargetStreams(ctx) + + err := sm.ts.ForAllSources(func(source *MigrationSource) error { + query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos=null, message='' where db_name=%s and workflow != %s", encodeString(source.GetPrimary().DbName()), encodeString(sm.ts.ReverseWorkflowName())) + _, err := sm.ts.VReplicationExec(ctx, source.GetPrimary().Alias, query) + return err + }) + if err != nil { + sm.logger.Errorf("Cancel migration failed: could not restart source streams: %v", err) + } +} + +func (sm *StreamMigrator) MigrateStreams(ctx context.Context) error { + if sm.streams == nil { + return nil + } + + if err := sm.deleteTargetStreams(ctx); err != nil { + return err + } + + return sm.createTargetStreams(ctx, sm.templates) +} + +func (sm *StreamMigrator) StopStreams(ctx context.Context) ([]string, error) { + if sm.streams == nil { + return nil, nil + } + + if err := sm.stopSourceStreams(ctx); err != nil { + return nil, err + } + + positions, err := sm.syncSourceStreams(ctx) + if 
err != nil { + return nil, err + } + + return sm.verifyStreamPositions(ctx, positions) +} + +/* tablet streams */ + +func (sm *StreamMigrator) readTabletStreams(ctx context.Context, ti *topo.TabletInfo, constraint string) ([]*VReplicationStream, error) { + var query string + if constraint == "" { + query = fmt.Sprintf("select id, workflow, source, pos from _vt.vreplication where db_name=%s and workflow != %s", encodeString(ti.DbName()), encodeString(sm.ts.ReverseWorkflowName())) + } else { + query = fmt.Sprintf("select id, workflow, source, pos from _vt.vreplication where db_name=%s and workflow != %s and %s", encodeString(ti.DbName()), encodeString(sm.ts.ReverseWorkflowName()), constraint) + } + p3qr, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, ti.Tablet, query) + if err != nil { + return nil, err + } + qr := sqltypes.Proto3ToResult(p3qr) + + tabletStreams := make([]*VReplicationStream, 0, len(qr.Rows)) + for _, row := range qr.Rows { + id, err := evalengine.ToInt64(row[0]) + if err != nil { + return nil, err + } + workflowName := row[1].ToString() + if workflowName == "" { + return nil, fmt.Errorf("VReplication streams must have named workflows for migration: shard: %s:%s, stream: %d", ti.Keyspace, ti.Shard, id) + } + if workflowName == sm.ts.WorkflowName() { + return nil, fmt.Errorf("VReplication stream has the same workflow name as the resharding workflow: shard: %s:%s, stream: %d", ti.Keyspace, ti.Shard, id) + } + var bls binlogdatapb.BinlogSource + if err := proto.UnmarshalText(row[2].ToString(), &bls); err != nil { + return nil, err + } + isReference, err := sm.blsIsReference(&bls) + if err != nil { + return nil, vterrors.Wrap(err, "blsIsReference") + } + if isReference { + sm.ts.Logger().Infof("readTabletStreams: ignoring reference table %+v", bls) + continue + } + pos, err := mysql.DecodePosition(row[3].ToString()) + if err != nil { + return nil, err + } + tabletStreams = append(tabletStreams, &VReplicationStream{ + ID: uint32(id), + Workflow: 
workflowName, + BinlogSource: &bls, + Position: pos, + }) + } + return tabletStreams, nil +} + +/* source streams */ + +func (sm *StreamMigrator) readSourceStreams(ctx context.Context, cancelMigrate bool) (map[string][]*VReplicationStream, error) { + streams := make(map[string][]*VReplicationStream) + var mu sync.Mutex + err := sm.ts.ForAllSources(func(source *MigrationSource) error { + if !cancelMigrate { + // This flow protects us from the following scenario: When we create streams, + // we always do it in two phases. We start them off as Stopped, and then + // update them to Running. If such an operation fails, we may be left with + // lingering Stopped streams. They should actually be cleaned up by the user. + // In the current workflow, we stop streams and restart them. + // Once existing streams are stopped, there will be confusion about which of + // them can be restarted because they will be no different from the lingering streams. + // To prevent this confusion, we first check if there are any stopped streams. + // If so, we request the operator to clean them up, or restart them before going ahead. + // This allows us to assume that all stopped streams can be safely restarted + // if we cancel the operation. + stoppedStreams, err := sm.readTabletStreams(ctx, source.GetPrimary(), "state = 'Stopped' and message != 'FROZEN'") + if err != nil { + return err + } + if len(stoppedStreams) != 0 { + return fmt.Errorf("cannot migrate until all streams are running: %s: %d", source.GetShard().ShardName(), source.GetPrimary().Alias.Uid) + } + } + tabletStreams, err := sm.readTabletStreams(ctx, source.GetPrimary(), "") + if err != nil { + return err + } + if len(tabletStreams) == 0 { + // No VReplication is running. So, we have no work to do. 
+ return nil + } + p3qr, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, fmt.Sprintf("select vrepl_id from _vt.copy_state where vrepl_id in %s", VReplicationStreams(tabletStreams).Values())) + if err != nil { + return err + } + if len(p3qr.Rows) != 0 { + return fmt.Errorf("cannot migrate while vreplication streams in source shards are still copying: %s", source.GetShard().ShardName()) + } + + mu.Lock() + defer mu.Unlock() + streams[source.GetShard().ShardName()] = tabletStreams + return nil + }) + if err != nil { + return nil, err + } + // Validate that streams match across source shards. + streams2 := make(map[string][]*VReplicationStream) + var reference []*VReplicationStream + var refshard string + for k, v := range streams { + if reference == nil { + refshard = k + reference = v + continue + } + streams2[k] = append([]*VReplicationStream(nil), v...) + } + for shard, tabletStreams := range streams2 { + nextStream: + for _, refStream := range reference { + for i := 0; i < len(tabletStreams); i++ { + vrs := tabletStreams[i] + if refStream.Workflow == vrs.Workflow && + refStream.BinlogSource.Keyspace == vrs.BinlogSource.Keyspace && + refStream.BinlogSource.Shard == vrs.BinlogSource.Shard { + // Delete the matched item and scan for the next stream. + tabletStreams = append(tabletStreams[:i], tabletStreams[i+1:]...) 
+ continue nextStream + } + } + return nil, fmt.Errorf("streams are mismatched across source shards: %s vs %s", refshard, shard) + } + if len(tabletStreams) != 0 { + return nil, fmt.Errorf("streams are mismatched across source shards: %s vs %s", refshard, shard) + } + } + return streams, nil +} + +func (sm *StreamMigrator) stopSourceStreams(ctx context.Context) error { + stoppedStreams := make(map[string][]*VReplicationStream) + var mu sync.Mutex + err := sm.ts.ForAllSources(func(source *MigrationSource) error { + tabletStreams := sm.streams[source.GetShard().ShardName()] + if len(tabletStreams) == 0 { + return nil + } + query := fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for cutover' where id in %s", VReplicationStreams(tabletStreams).Values()) + _, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, query) + if err != nil { + return err + } + tabletStreams, err = sm.readTabletStreams(ctx, source.GetPrimary(), fmt.Sprintf("id in %s", VReplicationStreams(tabletStreams).Values())) + if err != nil { + return err + } + mu.Lock() + defer mu.Unlock() + stoppedStreams[source.GetShard().ShardName()] = tabletStreams + return nil + }) + if err != nil { + return err + } + sm.streams = stoppedStreams + return nil +} + +func (sm *StreamMigrator) syncSourceStreams(ctx context.Context) (map[string]mysql.Position, error) { + stopPositions := make(map[string]mysql.Position) + for _, tabletStreams := range sm.streams { + for _, vrs := range tabletStreams { + key := fmt.Sprintf("%s:%s", vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard) + pos, ok := stopPositions[key] + if !ok || vrs.Position.AtLeast(pos) { + sm.ts.Logger().Infof("syncSourceStreams setting stopPositions +%s %+v %d", key, vrs.Position, vrs.ID) + stopPositions[key] = vrs.Position + } + } + } + var wg sync.WaitGroup + allErrors := &concurrency.AllErrorRecorder{} + for shard, tabletStreams := range sm.streams { + for _, vrs := range tabletStreams { + key := 
fmt.Sprintf("%s:%s", vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard) + pos := stopPositions[key] + sm.ts.Logger().Infof("syncSourceStreams before go func +%s %+v %d", key, pos, vrs.ID) + if vrs.Position.Equal(pos) { + continue + } + wg.Add(1) + go func(vrs *VReplicationStream, shard string, pos mysql.Position) { + defer wg.Done() + sm.ts.Logger().Infof("syncSourceStreams beginning of go func %s %s %+v %d", shard, vrs.BinlogSource.Shard, pos, vrs.ID) + + si, err := sm.ts.TopoServer().GetShard(ctx, sm.ts.SourceKeyspaceName(), shard) + if err != nil { + allErrors.RecordError(err) + return + } + master, err := sm.ts.TopoServer().GetTablet(ctx, si.MasterAlias) + if err != nil { + allErrors.RecordError(err) + return + } + query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for cutover' where id=%d", mysql.EncodePosition(pos), vrs.ID) + if _, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, master.Tablet, query); err != nil { + allErrors.RecordError(err) + return + } + sm.ts.Logger().Infof("Waiting for keyspace:shard: %v:%v, position %v", sm.ts.SourceKeyspaceName(), shard, pos) + if err := sm.ts.TabletManagerClient().VReplicationWaitForPos(ctx, master.Tablet, int(vrs.ID), mysql.EncodePosition(pos)); err != nil { + allErrors.RecordError(err) + return + } + sm.ts.Logger().Infof("Position for keyspace:shard: %v:%v reached", sm.ts.SourceKeyspaceName(), shard) + }(vrs, shard, pos) + } + } + wg.Wait() + return stopPositions, allErrors.AggrError(vterrors.Aggregate) +} + +func (sm *StreamMigrator) verifyStreamPositions(ctx context.Context, stopPositions map[string]mysql.Position) ([]string, error) { + stoppedStreams := make(map[string][]*VReplicationStream) + var mu sync.Mutex + err := sm.ts.ForAllSources(func(source *MigrationSource) error { + tabletStreams := sm.streams[source.GetShard().ShardName()] + if len(tabletStreams) == 0 { + return nil + } + tabletStreams, err := sm.readTabletStreams(ctx, 
source.GetPrimary(), fmt.Sprintf("id in %s", VReplicationStreams(tabletStreams).Values())) + if err != nil { + return err + } + mu.Lock() + defer mu.Unlock() + stoppedStreams[source.GetShard().ShardName()] = tabletStreams + return nil + }) + if err != nil { + return nil, err + } + + // This is not really required because it's not used later. + // But we keep it up-to-date for good measure. + sm.streams = stoppedStreams + + var oneSet []*VReplicationStream + allErrors := &concurrency.AllErrorRecorder{} + for _, tabletStreams := range stoppedStreams { + if oneSet == nil { + oneSet = tabletStreams + } + for _, vrs := range tabletStreams { + key := fmt.Sprintf("%s:%s", vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard) + pos := stopPositions[key] + if !vrs.Position.Equal(pos) { + allErrors.RecordError(fmt.Errorf("%s: stream %d position: %s does not match %s", key, vrs.ID, mysql.EncodePosition(vrs.Position), mysql.EncodePosition(pos))) + } + } + } + if allErrors.HasErrors() { + return nil, allErrors.AggrError(vterrors.Aggregate) + } + sm.templates, err = sm.templatize(ctx, oneSet) + if err != nil { + // Unreachable: we've already templatized this before. 
+ return nil, err + } + return VReplicationStreams(sm.templates).Workflows(), allErrors.AggrError(vterrors.Aggregate) +} + +/* target streams */ + +func (sm *StreamMigrator) createTargetStreams(ctx context.Context, tmpl []*VReplicationStream) error { + if len(tmpl) == 0 { + return nil + } + + return sm.ts.ForAllTargets(func(target *MigrationTarget) error { + ig := vreplication.NewInsertGenerator(binlogplayer.BlpStopped, target.GetPrimary().DbName()) + tabletStreams := VReplicationStreams(tmpl).Copy().ToSlice() + + for _, vrs := range tabletStreams { + for _, rule := range vrs.BinlogSource.Filter.Rules { + buf := &strings.Builder{} + t := template.Must(template.New("").Parse(rule.Filter)) + if err := t.Execute(buf, key.KeyRangeString(target.GetShard().KeyRange)); err != nil { + return err + } + + rule.Filter = buf.String() + } + + ig.AddRow(vrs.Workflow, vrs.BinlogSource, mysql.EncodePosition(vrs.Position), "", "") + } + + _, err := sm.ts.VReplicationExec(ctx, target.GetPrimary().GetAlias(), ig.String()) + return err + }) +} + +func (sm *StreamMigrator) deleteTargetStreams(ctx context.Context) error { + if len(sm.workflows) == 0 { + return nil + } + + workflows := stringListify(sm.workflows) + err := sm.ts.ForAllTargets(func(target *MigrationTarget) error { + query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow in (%s)", encodeString(target.GetPrimary().DbName()), workflows) + _, err := sm.ts.VReplicationExec(ctx, target.GetPrimary().Alias, query) + return err + }) + + if err != nil { + sm.logger.Warningf("Could not delete migrated streams: %v", err) + } + + return err +} + +/* templatizing */ + +func (sm *StreamMigrator) templatize(ctx context.Context, tabletStreams []*VReplicationStream) ([]*VReplicationStream, error) { + tabletStreams = VReplicationStreams(tabletStreams).Copy().ToSlice() + var shardedStreams []*VReplicationStream + for _, vrs := range tabletStreams { + streamType := StreamTypeUnknown + for _, rule := range 
vrs.BinlogSource.Filter.Rules { + typ, err := sm.templatizeRule(ctx, rule) + if err != nil { + return nil, err + } + switch typ { + case StreamTypeSharded: + if streamType == StreamTypeReference { + return nil, fmt.Errorf("cannot migrate streams with a mix of reference and sharded tables: %v", vrs.BinlogSource) + } + streamType = StreamTypeSharded + case StreamTypeReference: + if streamType == StreamTypeSharded { + return nil, fmt.Errorf("cannot migrate streams with a mix of reference and sharded tables: %v", vrs.BinlogSource) + } + streamType = StreamTypeReference + } + } + if streamType == StreamTypeSharded { + shardedStreams = append(shardedStreams, vrs) + } + } + return shardedStreams, nil +} + +// templatizeRule replaces keyrange values with {{.}}. +// This can then be used by go's template package to substitute other keyrange values. +func (sm *StreamMigrator) templatizeRule(ctx context.Context, rule *binlogdatapb.Rule) (StreamType, error) { + vtable, ok := sm.ts.SourceKeyspaceSchema().Tables[rule.Match] + if !ok { + return StreamTypeUnknown, fmt.Errorf("table %v not found in vschema", rule.Match) + } + if vtable.Type == vindexes.TypeReference { + return StreamTypeReference, nil + } + switch { + case rule.Filter == "": + return StreamTypeUnknown, fmt.Errorf("rule %v does not have a select expression in vreplication", rule) + case key.IsKeyRange(rule.Filter): + rule.Filter = "{{.}}" + return StreamTypeSharded, nil + case rule.Filter == vreplication.ExcludeStr: + return StreamTypeUnknown, fmt.Errorf("unexpected rule in vreplication: %v", rule) + default: + err := sm.templatizeKeyRange(ctx, rule) + if err != nil { + return StreamTypeUnknown, err + } + return StreamTypeSharded, nil + } +} + +func (sm *StreamMigrator) templatizeKeyRange(ctx context.Context, rule *binlogdatapb.Rule) error { + statement, err := sqlparser.Parse(rule.Filter) + if err != nil { + return err + } + sel, ok := statement.(*sqlparser.Select) + if !ok { + return fmt.Errorf("unexpected query: 
%v", rule.Filter) + } + var expr sqlparser.Expr + if sel.Where != nil { + expr = sel.Where.Expr + } + exprs := sqlparser.SplitAndExpression(nil, expr) + for _, subexpr := range exprs { + funcExpr, ok := subexpr.(*sqlparser.FuncExpr) + if !ok || !funcExpr.Name.EqualString("in_keyrange") { + continue + } + var krExpr sqlparser.SelectExpr + switch len(funcExpr.Exprs) { + case 1: + krExpr = funcExpr.Exprs[0] + case 3: + krExpr = funcExpr.Exprs[2] + default: + return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) + } + aliased, ok := krExpr.(*sqlparser.AliasedExpr) + if !ok { + return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) + } + val, ok := aliased.Expr.(*sqlparser.Literal) + if !ok { + return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) + } + if strings.Contains(rule.Filter, "{{") { + return fmt.Errorf("cannot migrate queries that contain '{{' in their string: %s", rule.Filter) + } + val.Val = "{{.}}" + rule.Filter = sqlparser.String(statement) + return nil + } + // There was no in_keyrange expression. Create a new one. 
+ vtable := sm.ts.SourceKeyspaceSchema().Tables[rule.Match] + inkr := &sqlparser.FuncExpr{ + Name: sqlparser.NewColIdent("in_keyrange"), + Exprs: sqlparser.SelectExprs{ + &sqlparser.AliasedExpr{Expr: &sqlparser.ColName{Name: vtable.ColumnVindexes[0].Columns[0]}}, + &sqlparser.AliasedExpr{Expr: sqlparser.NewStrLiteral(vtable.ColumnVindexes[0].Type)}, + &sqlparser.AliasedExpr{Expr: sqlparser.NewStrLiteral("{{.}}")}, + }, + } + sel.AddWhere(inkr) + rule.Filter = sqlparser.String(statement) + return nil +} + +/* misc */ + +func (sm *StreamMigrator) blsIsReference(bls *binlogdatapb.BinlogSource) (bool, error) { + streamType := StreamTypeUnknown + for _, rule := range bls.Filter.Rules { + typ, err := sm.identifyRuleType(rule) + if err != nil { + return false, err + } + switch typ { + case StreamTypeSharded: + if streamType == StreamTypeReference { + return false, fmt.Errorf("cannot reshard streams with a mix of reference and sharded tables: %v", bls) + } + streamType = StreamTypeSharded + case StreamTypeReference: + if streamType == StreamTypeSharded { + return false, fmt.Errorf("cannot reshard streams with a mix of reference and sharded tables: %v", bls) + } + streamType = StreamTypeReference + } + } + return streamType == StreamTypeReference, nil +} + +func (sm *StreamMigrator) identifyRuleType(rule *binlogdatapb.Rule) (StreamType, error) { + vtable, ok := sm.ts.SourceKeyspaceSchema().Tables[rule.Match] + if !ok { + return 0, fmt.Errorf("table %v not found in vschema", rule.Match) + } + if vtable.Type == vindexes.TypeReference { + return StreamTypeReference, nil + } + // In this case, 'sharded' means that it's not a reference + // table. We don't care about any other subtleties. 
+ return StreamTypeSharded, nil +} + +func stringListify(ss []string) string { + var buf strings.Builder + + prefix := "" + for _, s := range ss { + fmt.Fprintf(&buf, "%s%s", prefix, encodeString(s)) + prefix = ", " + } + + return buf.String() +} diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 59ae0d474b2..a2be66c5155 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -29,11 +29,15 @@ import ( "github.com/golang/protobuf/proto" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tmclient" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( @@ -47,6 +51,38 @@ var ( ErrNoStreams = errors.New("no streams found") ) +// ITrafficSwitcher is a temporary hack to allow us to move streamMigrater out +// of package wrangler without also needing to move trafficSwitcher in the same +// changeset. +// +// After moving TrafficSwitcher to this package, this type should be removed, +// and StreamMigrator should be updated to contain a field of type +// *TrafficSwitcher instead of ITrafficSwitcher. +type ITrafficSwitcher interface { + /* Functions that expose types and behavior contained in *wrangler.Wrangler */ + + TopoServer() *topo.Server + TabletManagerClient() tmclient.TabletManagerClient + Logger() logutil.Logger + // VReplicationExec here is used when we want the (*wrangler.Wrangler) + // implementation, which does a topo lookup on the tablet alias before + // calling the underlying TabletManagerClient RPC. 
+ VReplicationExec(ctx context.Context, alias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error) + + /* Functions that expose fields on the *wrangler.trafficSwitcher */ + + MigrationType() binlogdatapb.MigrationType + ReverseWorkflowName() string + SourceKeyspaceName() string + SourceKeyspaceSchema() *vindexes.KeyspaceSchema + WorkflowName() string + + /* Functions that *wrangler.trafficSwitcher implements */ + + ForAllSources(f func(source *MigrationSource) error) error + ForAllTargets(f func(target *MigrationTarget) error) error +} + // TargetInfo contains the metadata for a set of targets involved in a workflow. type TargetInfo struct { Targets map[string]*MigrationTarget From adc065e60a7f132c1160a4775c632f7724de8fba Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sat, 24 Apr 2021 16:06:43 -0400 Subject: [PATCH 2/6] Formatting changes Signed-off-by: Andrew Mason --- go/vt/vtctl/workflow/stream_migrator.go | 191 ++++++++++++++++++------ 1 file changed, 144 insertions(+), 47 deletions(-) diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go index a065e161d5f..f6f627d7622 100644 --- a/go/vt/vtctl/workflow/stream_migrator.go +++ b/go/vt/vtctl/workflow/stream_migrator.go @@ -105,20 +105,24 @@ func StreamMigratorFinalize(ctx context.Context, ts ITrafficSwitcher, workflows if len(workflows) == 0 { return nil } + workflowList := stringListify(workflows) err := ts.ForAllSources(func(source *MigrationSource) error { query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow in (%s)", encodeString(source.GetPrimary().DbName()), workflowList) _, err := ts.VReplicationExec(ctx, source.GetPrimary().Alias, query) return err }) + if err != nil { return err } + err = ts.ForAllTargets(func(target *MigrationTarget) error { query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s and workflow in (%s)", encodeString(target.GetPrimary().DbName()), workflowList) _, err := 
ts.VReplicationExec(ctx, target.GetPrimary().Alias, query) return err }) + return err } @@ -172,46 +176,56 @@ func (sm *StreamMigrator) StopStreams(ctx context.Context) ([]string, error) { func (sm *StreamMigrator) readTabletStreams(ctx context.Context, ti *topo.TabletInfo, constraint string) ([]*VReplicationStream, error) { var query string - if constraint == "" { + + switch constraint { + case "": query = fmt.Sprintf("select id, workflow, source, pos from _vt.vreplication where db_name=%s and workflow != %s", encodeString(ti.DbName()), encodeString(sm.ts.ReverseWorkflowName())) - } else { + default: query = fmt.Sprintf("select id, workflow, source, pos from _vt.vreplication where db_name=%s and workflow != %s and %s", encodeString(ti.DbName()), encodeString(sm.ts.ReverseWorkflowName()), constraint) } + p3qr, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, ti.Tablet, query) if err != nil { return nil, err } - qr := sqltypes.Proto3ToResult(p3qr) + qr := sqltypes.Proto3ToResult(p3qr) tabletStreams := make([]*VReplicationStream, 0, len(qr.Rows)) + for _, row := range qr.Rows { id, err := evalengine.ToInt64(row[0]) if err != nil { return nil, err } + workflowName := row[1].ToString() - if workflowName == "" { + switch workflowName { + case "": return nil, fmt.Errorf("VReplication streams must have named workflows for migration: shard: %s:%s, stream: %d", ti.Keyspace, ti.Shard, id) - } - if workflowName == sm.ts.WorkflowName() { + case sm.ts.WorkflowName(): return nil, fmt.Errorf("VReplication stream has the same workflow name as the resharding workflow: shard: %s:%s, stream: %d", ti.Keyspace, ti.Shard, id) } + var bls binlogdatapb.BinlogSource if err := proto.UnmarshalText(row[2].ToString(), &bls); err != nil { return nil, err } + isReference, err := sm.blsIsReference(&bls) if err != nil { return nil, vterrors.Wrap(err, "blsIsReference") } + if isReference { sm.ts.Logger().Infof("readTabletStreams: ignoring reference table %+v", bls) continue } + pos, err := 
mysql.DecodePosition(row[3].ToString()) if err != nil { return nil, err } + tabletStreams = append(tabletStreams, &VReplicationStream{ ID: uint32(id), Workflow: workflowName, @@ -225,8 +239,11 @@ func (sm *StreamMigrator) readTabletStreams(ctx context.Context, ti *topo.Tablet /* source streams */ func (sm *StreamMigrator) readSourceStreams(ctx context.Context, cancelMigrate bool) (map[string][]*VReplicationStream, error) { - streams := make(map[string][]*VReplicationStream) - var mu sync.Mutex + var ( + mu sync.Mutex + streams = make(map[string][]*VReplicationStream) + ) + err := sm.ts.ForAllSources(func(source *MigrationSource) error { if !cancelMigrate { // This flow protects us from the following scenario: When we create streams, @@ -244,23 +261,28 @@ func (sm *StreamMigrator) readSourceStreams(ctx context.Context, cancelMigrate b if err != nil { return err } + if len(stoppedStreams) != 0 { return fmt.Errorf("cannot migrate until all streams are running: %s: %d", source.GetShard().ShardName(), source.GetPrimary().Alias.Uid) } } + tabletStreams, err := sm.readTabletStreams(ctx, source.GetPrimary(), "") if err != nil { return err } + if len(tabletStreams) == 0 { // No VReplication is running. So, we have no work to do. 
return nil } - p3qr, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, fmt.Sprintf("select vrepl_id from _vt.copy_state where vrepl_id in %s", VReplicationStreams(tabletStreams).Values())) - if err != nil { + + query := fmt.Sprintf("select vrepl_id from _vt.copy_state where vrepl_id in %s", VReplicationStreams(tabletStreams).Values()) + p3qr, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, query) + switch { + case err != nil: return err - } - if len(p3qr.Rows) != 0 { + case len(p3qr.Rows) != 0: return fmt.Errorf("cannot migrate while vreplication streams in source shards are still copying: %s", source.GetShard().ShardName()) } @@ -269,94 +291,127 @@ func (sm *StreamMigrator) readSourceStreams(ctx context.Context, cancelMigrate b streams[source.GetShard().ShardName()] = tabletStreams return nil }) + if err != nil { return nil, err } + // Validate that streams match across source shards. - streams2 := make(map[string][]*VReplicationStream) - var reference []*VReplicationStream - var refshard string + var ( + reference []*VReplicationStream + refshard string + streams2 = make(map[string][]*VReplicationStream) + ) + for k, v := range streams { if reference == nil { refshard = k reference = v continue } + streams2[k] = append([]*VReplicationStream(nil), v...) } + for shard, tabletStreams := range streams2 { - nextStream: - for _, refStream := range reference { - for i := 0; i < len(tabletStreams); i++ { - vrs := tabletStreams[i] - if refStream.Workflow == vrs.Workflow && - refStream.BinlogSource.Keyspace == vrs.BinlogSource.Keyspace && - refStream.BinlogSource.Shard == vrs.BinlogSource.Shard { - // Delete the matched item and scan for the next stream. - tabletStreams = append(tabletStreams[:i], tabletStreams[i+1:]...) 
- continue nextStream + err := func() error { + for _, refStream := range reference { + for i := 0; i < len(tabletStreams); i++ { + vrs := tabletStreams[i] + + if refStream.Workflow == vrs.Workflow && + refStream.BinlogSource.Keyspace == vrs.BinlogSource.Keyspace && + refStream.BinlogSource.Shard == vrs.BinlogSource.Shard { + // Delete the matched item and scan for the next stream. + tabletStreams = append(tabletStreams[:i], tabletStreams[i+1:]...) + return nil + } } + + return fmt.Errorf("streams are mismatched across source shards: %s vs %s", refshard, shard) } - return nil, fmt.Errorf("streams are mismatched across source shards: %s vs %s", refshard, shard) - } - if len(tabletStreams) != 0 { - return nil, fmt.Errorf("streams are mismatched across source shards: %s vs %s", refshard, shard) + + if len(tabletStreams) != 0 { + return fmt.Errorf("streams are mismatched across source shards: %s vs %s", refshard, shard) + } + + return nil + }() + + if err != nil { + return nil, err } } + return streams, nil } func (sm *StreamMigrator) stopSourceStreams(ctx context.Context) error { - stoppedStreams := make(map[string][]*VReplicationStream) - var mu sync.Mutex + var ( + mu sync.Mutex + stoppedStreams = make(map[string][]*VReplicationStream) + ) + err := sm.ts.ForAllSources(func(source *MigrationSource) error { tabletStreams := sm.streams[source.GetShard().ShardName()] if len(tabletStreams) == 0 { return nil } + query := fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for cutover' where id in %s", VReplicationStreams(tabletStreams).Values()) _, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, query) if err != nil { return err } + tabletStreams, err = sm.readTabletStreams(ctx, source.GetPrimary(), fmt.Sprintf("id in %s", VReplicationStreams(tabletStreams).Values())) if err != nil { return err } + mu.Lock() defer mu.Unlock() stoppedStreams[source.GetShard().ShardName()] = tabletStreams + return nil }) + if err != nil { 
return err } + sm.streams = stoppedStreams return nil } func (sm *StreamMigrator) syncSourceStreams(ctx context.Context) (map[string]mysql.Position, error) { stopPositions := make(map[string]mysql.Position) + for _, tabletStreams := range sm.streams { for _, vrs := range tabletStreams { key := fmt.Sprintf("%s:%s", vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard) - pos, ok := stopPositions[key] - if !ok || vrs.Position.AtLeast(pos) { + if pos, ok := stopPositions[key]; !ok || vrs.Position.AtLeast(pos) { sm.ts.Logger().Infof("syncSourceStreams setting stopPositions +%s %+v %d", key, vrs.Position, vrs.ID) stopPositions[key] = vrs.Position } } } - var wg sync.WaitGroup - allErrors := &concurrency.AllErrorRecorder{} + + var ( + wg sync.WaitGroup + allErrors concurrency.AllErrorRecorder + ) + for shard, tabletStreams := range sm.streams { for _, vrs := range tabletStreams { key := fmt.Sprintf("%s:%s", vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard) pos := stopPositions[key] sm.ts.Logger().Infof("syncSourceStreams before go func +%s %+v %d", key, pos, vrs.ID) + if vrs.Position.Equal(pos) { continue } + wg.Add(1) go func(vrs *VReplicationStream, shard string, pos mysql.Position) { defer wg.Done() @@ -367,46 +422,59 @@ func (sm *StreamMigrator) syncSourceStreams(ctx context.Context) (map[string]mys allErrors.RecordError(err) return } - master, err := sm.ts.TopoServer().GetTablet(ctx, si.MasterAlias) + + primary, err := sm.ts.TopoServer().GetTablet(ctx, si.MasterAlias) if err != nil { allErrors.RecordError(err) return } + query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for cutover' where id=%d", mysql.EncodePosition(pos), vrs.ID) - if _, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, master.Tablet, query); err != nil { + if _, err := sm.ts.TabletManagerClient().VReplicationExec(ctx, primary.Tablet, query); err != nil { allErrors.RecordError(err) return } + sm.ts.Logger().Infof("Waiting for 
keyspace:shard: %v:%v, position %v", sm.ts.SourceKeyspaceName(), shard, pos) - if err := sm.ts.TabletManagerClient().VReplicationWaitForPos(ctx, master.Tablet, int(vrs.ID), mysql.EncodePosition(pos)); err != nil { + if err := sm.ts.TabletManagerClient().VReplicationWaitForPos(ctx, primary.Tablet, int(vrs.ID), mysql.EncodePosition(pos)); err != nil { allErrors.RecordError(err) return } + sm.ts.Logger().Infof("Position for keyspace:shard: %v:%v reached", sm.ts.SourceKeyspaceName(), shard) }(vrs, shard, pos) } } + wg.Wait() + return stopPositions, allErrors.AggrError(vterrors.Aggregate) } func (sm *StreamMigrator) verifyStreamPositions(ctx context.Context, stopPositions map[string]mysql.Position) ([]string, error) { - stoppedStreams := make(map[string][]*VReplicationStream) - var mu sync.Mutex + var ( + mu sync.Mutex + stoppedStreams = make(map[string][]*VReplicationStream) + ) + err := sm.ts.ForAllSources(func(source *MigrationSource) error { tabletStreams := sm.streams[source.GetShard().ShardName()] if len(tabletStreams) == 0 { return nil } + tabletStreams, err := sm.readTabletStreams(ctx, source.GetPrimary(), fmt.Sprintf("id in %s", VReplicationStreams(tabletStreams).Values())) if err != nil { return err } + mu.Lock() defer mu.Unlock() stoppedStreams[source.GetShard().ShardName()] = tabletStreams + return nil }) + if err != nil { return nil, err } @@ -415,28 +483,34 @@ func (sm *StreamMigrator) verifyStreamPositions(ctx context.Context, stopPositio // But we keep it up-to-date for good measure. 
sm.streams = stoppedStreams - var oneSet []*VReplicationStream - allErrors := &concurrency.AllErrorRecorder{} + var ( + oneSet []*VReplicationStream + allErrors concurrency.AllErrorRecorder + ) + for _, tabletStreams := range stoppedStreams { if oneSet == nil { oneSet = tabletStreams } + for _, vrs := range tabletStreams { key := fmt.Sprintf("%s:%s", vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard) - pos := stopPositions[key] - if !vrs.Position.Equal(pos) { + if pos := stopPositions[key]; !vrs.Position.Equal(pos) { allErrors.RecordError(fmt.Errorf("%s: stream %d position: %s does not match %s", key, vrs.ID, mysql.EncodePosition(vrs.Position), mysql.EncodePosition(pos))) } } } + if allErrors.HasErrors() { return nil, allErrors.AggrError(vterrors.Aggregate) } + sm.templates, err = sm.templatize(ctx, oneSet) if err != nil { // Unreachable: we've already templatized this before. return nil, err } + return VReplicationStreams(sm.templates).Workflows(), allErrors.AggrError(vterrors.Aggregate) } @@ -454,6 +528,7 @@ func (sm *StreamMigrator) createTargetStreams(ctx context.Context, tmpl []*VRepl for _, vrs := range tabletStreams { for _, rule := range vrs.BinlogSource.Filter.Rules { buf := &strings.Builder{} + t := template.Must(template.New("").Parse(rule.Filter)) if err := t.Execute(buf, key.KeyRangeString(target.GetShard().KeyRange)); err != nil { return err @@ -492,15 +567,18 @@ func (sm *StreamMigrator) deleteTargetStreams(ctx context.Context) error { /* templatizing */ func (sm *StreamMigrator) templatize(ctx context.Context, tabletStreams []*VReplicationStream) ([]*VReplicationStream, error) { - tabletStreams = VReplicationStreams(tabletStreams).Copy().ToSlice() var shardedStreams []*VReplicationStream + + tabletStreams = VReplicationStreams(tabletStreams).Copy().ToSlice() for _, vrs := range tabletStreams { streamType := StreamTypeUnknown + for _, rule := range vrs.BinlogSource.Filter.Rules { typ, err := sm.templatizeRule(ctx, rule) if err != nil { return nil, 
err } + switch typ { case StreamTypeSharded: if streamType == StreamTypeReference { @@ -514,10 +592,12 @@ func (sm *StreamMigrator) templatize(ctx context.Context, tabletStreams []*VRepl streamType = StreamTypeReference } } + if streamType == StreamTypeSharded { shardedStreams = append(shardedStreams, vrs) } } + return shardedStreams, nil } @@ -528,9 +608,11 @@ func (sm *StreamMigrator) templatizeRule(ctx context.Context, rule *binlogdatapb if !ok { return StreamTypeUnknown, fmt.Errorf("table %v not found in vschema", rule.Match) } + if vtable.Type == vindexes.TypeReference { return StreamTypeReference, nil } + switch { case rule.Filter == "": return StreamTypeUnknown, fmt.Errorf("rule %v does not have a select expression in vreplication", rule) @@ -540,10 +622,10 @@ func (sm *StreamMigrator) templatizeRule(ctx context.Context, rule *binlogdatapb case rule.Filter == vreplication.ExcludeStr: return StreamTypeUnknown, fmt.Errorf("unexpected rule in vreplication: %v", rule) default: - err := sm.templatizeKeyRange(ctx, rule) - if err != nil { + if err := sm.templatizeKeyRange(ctx, rule); err != nil { return StreamTypeUnknown, err } + return StreamTypeSharded, nil } } @@ -553,20 +635,24 @@ func (sm *StreamMigrator) templatizeKeyRange(ctx context.Context, rule *binlogda if err != nil { return err } + sel, ok := statement.(*sqlparser.Select) if !ok { return fmt.Errorf("unexpected query: %v", rule.Filter) } + var expr sqlparser.Expr if sel.Where != nil { expr = sel.Where.Expr } + exprs := sqlparser.SplitAndExpression(nil, expr) for _, subexpr := range exprs { funcExpr, ok := subexpr.(*sqlparser.FuncExpr) if !ok || !funcExpr.Name.EqualString("in_keyrange") { continue } + var krExpr sqlparser.SelectExpr switch len(funcExpr.Exprs) { case 1: @@ -576,21 +662,26 @@ func (sm *StreamMigrator) templatizeKeyRange(ctx context.Context, rule *binlogda default: return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) } + aliased, ok := 
krExpr.(*sqlparser.AliasedExpr) if !ok { return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) } + val, ok := aliased.Expr.(*sqlparser.Literal) if !ok { return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) } + if strings.Contains(rule.Filter, "{{") { return fmt.Errorf("cannot migrate queries that contain '{{' in their string: %s", rule.Filter) } + val.Val = "{{.}}" rule.Filter = sqlparser.String(statement) return nil } + // There was no in_keyrange expression. Create a new one. vtable := sm.ts.SourceKeyspaceSchema().Tables[rule.Match] inkr := &sqlparser.FuncExpr{ @@ -615,19 +706,23 @@ func (sm *StreamMigrator) blsIsReference(bls *binlogdatapb.BinlogSource) (bool, if err != nil { return false, err } + switch typ { case StreamTypeSharded: if streamType == StreamTypeReference { return false, fmt.Errorf("cannot reshard streams with a mix of reference and sharded tables: %v", bls) } + streamType = StreamTypeSharded case StreamTypeReference: if streamType == StreamTypeSharded { return false, fmt.Errorf("cannot reshard streams with a mix of reference and sharded tables: %v", bls) } + streamType = StreamTypeReference } } + return streamType == StreamTypeReference, nil } @@ -636,9 +731,11 @@ func (sm *StreamMigrator) identifyRuleType(rule *binlogdatapb.Rule) (StreamType, if !ok { return 0, fmt.Errorf("table %v not found in vschema", rule.Match) } + if vtable.Type == vindexes.TypeReference { return StreamTypeReference, nil } + // In this case, 'sharded' means that it's not a reference // table. We don't care about any other subtleties. 
return StreamTypeSharded, nil From c232eb0250311c6ea4ac781857a933465439eef8 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sun, 25 Apr 2021 12:07:23 -0400 Subject: [PATCH 3/6] Fix break behavior Signed-off-by: Andrew Mason --- go/vt/vtctl/workflow/stream_migrator.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go index f6f627d7622..f0f49cc7d65 100644 --- a/go/vt/vtctl/workflow/stream_migrator.go +++ b/go/vt/vtctl/workflow/stream_migrator.go @@ -314,8 +314,8 @@ func (sm *StreamMigrator) readSourceStreams(ctx context.Context, cancelMigrate b } for shard, tabletStreams := range streams2 { - err := func() error { - for _, refStream := range reference { + for _, refStream := range reference { + err := func() error { for i := 0; i < len(tabletStreams); i++ { vrs := tabletStreams[i] @@ -329,17 +329,15 @@ func (sm *StreamMigrator) readSourceStreams(ctx context.Context, cancelMigrate b } return fmt.Errorf("streams are mismatched across source shards: %s vs %s", refshard, shard) - } + }() - if len(tabletStreams) != 0 { - return fmt.Errorf("streams are mismatched across source shards: %s vs %s", refshard, shard) + if err != nil { + return nil, err } + } - return nil - }() - - if err != nil { - return nil, err + if len(tabletStreams) != 0 { + return nil, fmt.Errorf("streams are mismatched across source shards: %s vs %s", refshard, shard) } } From 872fa4610a3794f6c1ce34526a12c2514415dceb Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sun, 25 Apr 2021 12:07:53 -0400 Subject: [PATCH 4/6] Begin migrating StreamMigrator tests to `package workflow` Signed-off-by: Andrew Mason --- go/vt/vtctl/workflow/stream_migrator_test.go | 348 ++++++++++++++++++ go/vt/vtctl/workflow/traffic_switcher_test.go | 11 + 2 files changed, 359 insertions(+) create mode 100644 go/vt/vtctl/workflow/stream_migrator_test.go diff --git a/go/vt/vtctl/workflow/stream_migrator_test.go 
b/go/vt/vtctl/workflow/stream_migrator_test.go new file mode 100644 index 00000000000..6b515679e77 --- /dev/null +++ b/go/vt/vtctl/workflow/stream_migrator_test.go @@ -0,0 +1,348 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/proto/vschema" + "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" +) + +func TestTemplatize(t *testing.T) { + tests := []struct { + in []*VReplicationStream + out string + err string + }{{ + // First test contains all fields. + in: []*VReplicationStream{{ + ID: 1, + Workflow: "test", + BinlogSource: &binlogdatapb.BinlogSource{ + Keyspace: "ks", + Shard: "80-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1 where in_keyrange('-80')", + }}, + }, + }, + }}, + out: `[{"ID":1,"Workflow":"test","BinlogSource":{"keyspace":"ks","shard":"80-","filter":{"rules":[{"match":"t1","filter":"select * from t1 where in_keyrange('{{.}}')"}]}}}]`, + }, { + // Reference table. 
+ in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "ref", + Filter: "", + }}, + }, + }, + }}, + out: "", + }, { + // Sharded table. + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "-80", + }}, + }, + }, + }}, + out: `[{"ID":0,"Workflow":"","BinlogSource":{"filter":{"rules":[{"match":"t1","filter":"{{.}}"}]}}}]`, + }, { + // table not found + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t3", + }}, + }, + }, + }}, + err: `table t3 not found in vschema`, + }, { + // sharded table with no filter + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + }, + }, + }}, + err: `rule match:"t1" does not have a select expression in vreplication`, + }, { + // Excluded table. 
+ in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: vreplication.ExcludeStr, + }}, + }, + }, + }}, + err: `unexpected rule in vreplication: match:"t1" filter:"exclude" `, + }, { + // Sharded table and ref table + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "-80", + }, { + Match: "ref", + Filter: "", + }}, + }, + }, + }}, + err: `cannot migrate streams with a mix of reference and sharded tables: filter: rules: > `, + }, { + // Ref table and sharded table (different code path) + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "ref", + Filter: "", + }, { + Match: "t2", + Filter: "-80", + }}, + }, + }, + }}, + err: `cannot migrate streams with a mix of reference and sharded tables: filter: rules: > `, + }, { + // Ref table with select expression + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "ref", + Filter: "select * from t1", + }}, + }, + }, + }}, + out: "", + }, { + // Select expression with no keyrange value + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + }, + }, + }}, + out: `[{"ID":0,"Workflow":"","BinlogSource":{"filter":{"rules":[{"match":"t1","filter":"select * from t1 where in_keyrange(c1, 'hash', '{{.}}')"}]}}}]`, + }, { + // Select expression with one keyrange value + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1 where in_keyrange('-80')", + }}, + }, + }, + }}, + out:
`[{"ID":0,"Workflow":"","BinlogSource":{"filter":{"rules":[{"match":"t1","filter":"select * from t1 where in_keyrange('{{.}}')"}]}}}]`, + }, { + // Select expresstion with three keyrange values + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1 where in_keyrange(col, vdx, '-80')", + }}, + }, + }, + }}, + out: `[{"ID":0,"Workflow":"","BinlogSource":{"filter":{"rules":[{"match":"t1","filter":"select * from t1 where in_keyrange(col, vdx, '{{.}}')"}]}}}]`, + }, { + // syntax error + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "bad syntax", + }}, + }, + }, + }}, + err: "syntax error at position 4 near 'bad'", + }, { + // invalid statement + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "update t set a=1", + }}, + }, + }, + }}, + err: "unexpected query: update t set a=1", + }, { + // invalid in_keyrange + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1 where in_keyrange(col, vdx, '-80', extra)", + }}, + }, + }, + }}, + err: "unexpected in_keyrange parameters: in_keyrange(col, vdx, '-80', extra)", + }, { + // * in_keyrange + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1 where in_keyrange(*)", + }}, + }, + }, + }}, + err: "unexpected in_keyrange parameters: in_keyrange(*)", + }, { + // non-string in_keyrange + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", 
+ Filter: "select * from t1 where in_keyrange(aa)", + }}, + }, + }, + }}, + err: "unexpected in_keyrange parameters: in_keyrange(aa)", + }, { + // '{{' in query + in: []*VReplicationStream{{ + BinlogSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select '{{' from t1 where in_keyrange('-80')", + }}, + }, + }, + }}, + err: "cannot migrate queries that contain '{{' in their string: select '{{' from t1 where in_keyrange('-80')", + }} + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschema.Vindex{ + "thash": { + Type: "hash", + }, + }, + Tables: map[string]*vschema.Table{ + "t1": { + ColumnVindexes: []*vschema.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "thash", + }}, + }, + "t2": { + ColumnVindexes: []*vschema.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "thash", + }}, + }, + "ref": { + Type: vindexes.TypeReference, + }, + }, + } + ksschema, err := vindexes.BuildKeyspaceSchema(vs, "ks") + require.NoError(t, err, "could not create test keyspace %+v", vs) + + ts := &testTrafficSwitcher{ + sourceKeyspaceSchema: ksschema, + } + for _, tt := range tests { + sm := &StreamMigrator{ts: ts} + out, err := sm.templatize(context.Background(), tt.in) + if tt.err != "" { + assert.Error(t, err, "templatize(%v) expected to get err=%s, got %+v", stringifyVRS(tt.in), tt.err, err) + } + + got := stringifyVRS(out) + assert.Equal(t, tt.out, got, "templatize(%v) mismatch", stringifyVRS(tt.in)) + } +} + +func stringifyVRS(streams []*VReplicationStream) string { + if len(streams) == 0 { + return "" + } + + type testVRS struct { + ID uint32 + Workflow string + BinlogSource *binlogdatapb.BinlogSource + } + + converted := make([]*testVRS, len(streams)) + for i, stream := range streams { + converted[i] = &testVRS{ + ID: stream.ID, + Workflow: stream.Workflow, + BinlogSource: stream.BinlogSource, + } + } + + b, _ := json.Marshal(converted) + return string(b) +} diff --git 
a/go/vt/vtctl/workflow/traffic_switcher_test.go b/go/vt/vtctl/workflow/traffic_switcher_test.go index 0c67e7a1879..447e47d7490 100644 --- a/go/vt/vtctl/workflow/traffic_switcher_test.go +++ b/go/vt/vtctl/workflow/traffic_switcher_test.go @@ -20,8 +20,19 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/vt/vtgate/vindexes" ) +type testTrafficSwitcher struct { + ITrafficSwitcher + sourceKeyspaceSchema *vindexes.KeyspaceSchema +} + +func (tts *testTrafficSwitcher) SourceKeyspaceSchema() *vindexes.KeyspaceSchema { + return tts.sourceKeyspaceSchema +} + func TestReverseWorkflowName(t *testing.T) { tests := []struct { in string From 5fb091dbfbffd2a01e4958717bbb7269169ecaa9 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sun, 25 Apr 2021 16:28:13 -0400 Subject: [PATCH 5/6] Migrate the rest of streamMigrater to `package workflow` Delegate the external interface used by other components in `package wrangler` to use the `workflow.StreamMigrator` under the hood. Also, delete the templatize tests, but keep the other streamMigrater tests, because they invoke streamMigrater only indirectly. Signed-off-by: Andrew Mason --- go/vt/vtctl/workflow/stream_migrator.go | 22 +- go/vt/wrangler/resharder.go | 24 +- go/vt/wrangler/stream_migrater.go | 590 +----------------------- go/vt/wrangler/stream_migrater_test.go | 326 ------------- go/vt/wrangler/switcher.go | 11 +- go/vt/wrangler/switcher_dry_run.go | 16 +- go/vt/wrangler/switcher_interface.go | 11 +- go/vt/wrangler/traffic_switcher.go | 50 +- 8 files changed, 105 insertions(+), 945 deletions(-) diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go index f0f49cc7d65..be59038b022 100644 --- a/go/vt/vtctl/workflow/stream_migrator.go +++ b/go/vt/vtctl/workflow/stream_migrator.go @@ -41,9 +41,13 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) -// (TODO:@ajm188) Does this need to be exported? 
+// StreamType is an enum representing the kind of stream. +// +// (TODO:@ajm188) This should be made package-private once the last references +// in package wrangler are removed. type StreamType int +// StreamType values. const ( StreamTypeUnknown = StreamType(iota) StreamTypeSharded @@ -126,6 +130,22 @@ func StreamMigratorFinalize(ctx context.Context, ts ITrafficSwitcher, workflows return err } +// Streams returns a deep-copy of the StreamMigrator's streams map. +func (sm *StreamMigrator) Streams() map[string][]*VReplicationStream { + streams := make(map[string][]*VReplicationStream, len(sm.streams)) + + for k, v := range sm.streams { + streams[k] = VReplicationStreams(v).Copy().ToSlice() + } + + return streams +} + +// Templates returns a copy of the StreamMigrator's template streams. +func (sm *StreamMigrator) Templates() []*VReplicationStream { + return VReplicationStreams(sm.templates).Copy().ToSlice() +} + func (sm *StreamMigrator) CancelMigration(ctx context.Context) { if sm.streams == nil { return diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go index 27de39c859b..bfff746dbb7 100644 --- a/go/vt/wrangler/resharder.go +++ b/go/vt/wrangler/resharder.go @@ -23,6 +23,7 @@ import ( "time" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vtctl/workflow" "github.com/golang/protobuf/proto" "github.com/pkg/errors" @@ -243,39 +244,40 @@ func (rs *resharder) readRefStreams(ctx context.Context) error { // blsIsReference is partially copied from streamMigrater.templatize. // It reuses the constants from that function also. 
func (rs *resharder) blsIsReference(bls *binlogdatapb.BinlogSource) (bool, error) { - streamType := unknown + streamType := workflow.StreamTypeUnknown for _, rule := range bls.Filter.Rules { typ, err := rs.identifyRuleType(rule) if err != nil { return false, err } + switch typ { - case sharded: - if streamType == reference { + case workflow.StreamTypeSharded: + if streamType == workflow.StreamTypeReference { return false, fmt.Errorf("cannot reshard streams with a mix of reference and sharded tables: %v", bls) } - streamType = sharded - case reference: - if streamType == sharded { + streamType = workflow.StreamTypeSharded + case workflow.StreamTypeReference: + if streamType == workflow.StreamTypeSharded { return false, fmt.Errorf("cannot reshard streams with a mix of reference and sharded tables: %v", bls) } - streamType = reference + streamType = workflow.StreamTypeReference } } - return streamType == reference, nil + return streamType == workflow.StreamTypeReference, nil } -func (rs *resharder) identifyRuleType(rule *binlogdatapb.Rule) (int, error) { +func (rs *resharder) identifyRuleType(rule *binlogdatapb.Rule) (workflow.StreamType, error) { vtable, ok := rs.vschema.Tables[rule.Match] if !ok { return 0, fmt.Errorf("table %v not found in vschema", rule.Match) } if vtable.Type == vindexes.TypeReference { - return reference, nil + return workflow.StreamTypeReference, nil } // In this case, 'sharded' means that it's not a reference // table. We don't care about any other subtleties. 
- return sharded, nil + return workflow.StreamTypeSharded, nil } func (rs *resharder) copySchema(ctx context.Context) error { diff --git a/go/vt/wrangler/stream_migrater.go b/go/vt/wrangler/stream_migrater.go index 6244d17c210..97a5257789f 100644 --- a/go/vt/wrangler/stream_migrater.go +++ b/go/vt/wrangler/stream_migrater.go @@ -18,615 +18,39 @@ package wrangler import ( "context" - "fmt" - "strings" - "sync" - "text/template" - "github.com/golang/protobuf/proto" - - "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/concurrency" - "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtctl/workflow" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vtgate/evalengine" - "vitess.io/vitess/go/vt/vtgate/vindexes" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" - - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) type streamMigrater struct { - streams map[string][]*workflow.VReplicationStream - workflows []string - templates []*workflow.VReplicationStream - ts *trafficSwitcher + sm *workflow.StreamMigrator } func buildStreamMigrater(ctx context.Context, ts *trafficSwitcher, cancelMigrate bool) (*streamMigrater, error) { - sm := &streamMigrater{ts: ts} - if sm.ts.migrationType == binlogdatapb.MigrationType_TABLES { - // Source streams should be stopped only for shard migrations. - return sm, nil - } - streams, err := sm.readSourceStreams(ctx, cancelMigrate) + sm, err := workflow.BuildStreamMigrator(ctx, ts, cancelMigrate) if err != nil { return nil, err } - sm.streams = streams - // Loop executes only once. 
- for _, tabletStreams := range sm.streams { - tmpl, err := sm.templatize(ctx, tabletStreams) - if err != nil { - return nil, err - } - sm.workflows = tabletStreamWorkflows(tmpl) - return sm, nil - } - return sm, nil -} - -func (sm *streamMigrater) readSourceStreams(ctx context.Context, cancelMigrate bool) (map[string][]*workflow.VReplicationStream, error) { - streams := make(map[string][]*workflow.VReplicationStream) - var mu sync.Mutex - err := sm.ts.forAllSources(func(source *workflow.MigrationSource) error { - if !cancelMigrate { - // This flow protects us from the following scenario: When we create streams, - // we always do it in two phases. We start them off as Stopped, and then - // update them to Running. If such an operation fails, we may be left with - // lingering Stopped streams. They should actually be cleaned up by the user. - // In the current workflow, we stop streams and restart them. - // Once existing streams are stopped, there will be confusion about which of - // them can be restarted because they will be no different from the lingering streams. - // To prevent this confusion, we first check if there are any stopped streams. - // If so, we request the operator to clean them up, or restart them before going ahead. - // This allows us to assume that all stopped streams can be safely restarted - // if we cancel the operation. - stoppedStreams, err := sm.readTabletStreams(ctx, source.GetPrimary(), "state = 'Stopped' and message != 'FROZEN'") - if err != nil { - return err - } - if len(stoppedStreams) != 0 { - return fmt.Errorf("cannot migrate until all streams are running: %s: %d", source.GetShard().ShardName(), source.GetPrimary().Alias.Uid) - } - } - tabletStreams, err := sm.readTabletStreams(ctx, source.GetPrimary(), "") - if err != nil { - return err - } - if len(tabletStreams) == 0 { - // No VReplication is running. So, we have no work to do. 
- return nil - } - p3qr, err := sm.ts.wr.tmc.VReplicationExec(ctx, source.GetPrimary().Tablet, fmt.Sprintf("select vrepl_id from _vt.copy_state where vrepl_id in %s", tabletStreamValues(tabletStreams))) - if err != nil { - return err - } - if len(p3qr.Rows) != 0 { - return fmt.Errorf("cannot migrate while vreplication streams in source shards are still copying: %s", source.GetShard().ShardName()) - } - - mu.Lock() - defer mu.Unlock() - streams[source.GetShard().ShardName()] = tabletStreams - return nil - }) - if err != nil { - return nil, err - } - // Validate that streams match across source shards. - streams2 := make(map[string][]*workflow.VReplicationStream) - var reference []*workflow.VReplicationStream - var refshard string - for k, v := range streams { - if reference == nil { - refshard = k - reference = v - continue - } - streams2[k] = append([]*workflow.VReplicationStream(nil), v...) - } - for shard, tabletStreams := range streams2 { - nextStream: - for _, refStream := range reference { - for i := 0; i < len(tabletStreams); i++ { - vrs := tabletStreams[i] - if refStream.Workflow == vrs.Workflow && - refStream.BinlogSource.Keyspace == vrs.BinlogSource.Keyspace && - refStream.BinlogSource.Shard == vrs.BinlogSource.Shard { - // Delete the matched item and scan for the next stream. - tabletStreams = append(tabletStreams[:i], tabletStreams[i+1:]...) 
- continue nextStream - } - } - return nil, fmt.Errorf("streams are mismatched across source shards: %s vs %s", refshard, shard) - } - if len(tabletStreams) != 0 { - return nil, fmt.Errorf("streams are mismatched across source shards: %s vs %s", refshard, shard) - } - } - return streams, nil + return &streamMigrater{sm: sm}, nil } func (sm *streamMigrater) stopStreams(ctx context.Context) ([]string, error) { - if sm.streams == nil { - return nil, nil - } - if err := sm.stopSourceStreams(ctx); err != nil { - return nil, err - } - positions, err := sm.syncSourceStreams(ctx) - if err != nil { - return nil, err - } - return sm.verifyStreamPositions(ctx, positions) -} - -// blsIsReference is partially copied from streamMigrater.templatize. -// It reuses the constants from that function also. -func (sm *streamMigrater) blsIsReference(bls *binlogdatapb.BinlogSource) (bool, error) { - streamType := unknown - for _, rule := range bls.Filter.Rules { - typ, err := sm.identifyRuleType(rule) - if err != nil { - return false, err - } - switch typ { - case sharded: - if streamType == reference { - return false, fmt.Errorf("cannot reshard streams with a mix of reference and sharded tables: %v", bls) - } - streamType = sharded - case reference: - if streamType == sharded { - return false, fmt.Errorf("cannot reshard streams with a mix of reference and sharded tables: %v", bls) - } - streamType = reference - } - } - return streamType == reference, nil -} - -func (sm *streamMigrater) identifyRuleType(rule *binlogdatapb.Rule) (int, error) { - vtable, ok := sm.ts.sourceKSSchema.Tables[rule.Match] - if !ok { - return 0, fmt.Errorf("table %v not found in vschema", rule.Match) - } - if vtable.Type == vindexes.TypeReference { - return reference, nil - } - // In this case, 'sharded' means that it's not a reference - // table. We don't care about any other subtleties. 
- return sharded, nil -} - -func (sm *streamMigrater) readTabletStreams(ctx context.Context, ti *topo.TabletInfo, constraint string) ([]*workflow.VReplicationStream, error) { - var query string - if constraint == "" { - query = fmt.Sprintf("select id, workflow, source, pos from _vt.vreplication where db_name=%s and workflow != %s", encodeString(ti.DbName()), encodeString(sm.ts.reverseWorkflow)) - } else { - query = fmt.Sprintf("select id, workflow, source, pos from _vt.vreplication where db_name=%s and workflow != %s and %s", encodeString(ti.DbName()), encodeString(sm.ts.reverseWorkflow), constraint) - } - p3qr, err := sm.ts.wr.tmc.VReplicationExec(ctx, ti.Tablet, query) - if err != nil { - return nil, err - } - qr := sqltypes.Proto3ToResult(p3qr) - - tabletStreams := make([]*workflow.VReplicationStream, 0, len(qr.Rows)) - for _, row := range qr.Rows { - id, err := evalengine.ToInt64(row[0]) - if err != nil { - return nil, err - } - workflowName := row[1].ToString() - if workflowName == "" { - return nil, fmt.Errorf("VReplication streams must have named workflows for migration: shard: %s:%s, stream: %d", ti.Keyspace, ti.Shard, id) - } - if workflowName == sm.ts.workflow { - return nil, fmt.Errorf("VReplication stream has the same workflow name as the resharding workflow: shard: %s:%s, stream: %d", ti.Keyspace, ti.Shard, id) - } - var bls binlogdatapb.BinlogSource - if err := proto.UnmarshalText(row[2].ToString(), &bls); err != nil { - return nil, err - } - isReference, err := sm.blsIsReference(&bls) - if err != nil { - return nil, vterrors.Wrap(err, "blsIsReference") - } - if isReference { - sm.ts.wr.Logger().Infof("readTabletStreams: ignoring reference table %+v", bls) - continue - } - pos, err := mysql.DecodePosition(row[3].ToString()) - if err != nil { - return nil, err - } - tabletStreams = append(tabletStreams, &workflow.VReplicationStream{ - ID: uint32(id), - Workflow: workflowName, - BinlogSource: &bls, - Position: pos, - }) - } - return tabletStreams, nil 
-} - -func (sm *streamMigrater) stopSourceStreams(ctx context.Context) error { - stoppedStreams := make(map[string][]*workflow.VReplicationStream) - var mu sync.Mutex - err := sm.ts.forAllSources(func(source *workflow.MigrationSource) error { - tabletStreams := sm.streams[source.GetShard().ShardName()] - if len(tabletStreams) == 0 { - return nil - } - query := fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for cutover' where id in %s", tabletStreamValues(tabletStreams)) - _, err := sm.ts.wr.tmc.VReplicationExec(ctx, source.GetPrimary().Tablet, query) - if err != nil { - return err - } - tabletStreams, err = sm.readTabletStreams(ctx, source.GetPrimary(), fmt.Sprintf("id in %s", tabletStreamValues(tabletStreams))) - if err != nil { - return err - } - mu.Lock() - defer mu.Unlock() - stoppedStreams[source.GetShard().ShardName()] = tabletStreams - return nil - }) - if err != nil { - return err - } - sm.streams = stoppedStreams - return nil -} - -func (sm *streamMigrater) syncSourceStreams(ctx context.Context) (map[string]mysql.Position, error) { - stopPositions := make(map[string]mysql.Position) - for _, tabletStreams := range sm.streams { - for _, vrs := range tabletStreams { - key := fmt.Sprintf("%s:%s", vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard) - pos, ok := stopPositions[key] - if !ok || vrs.Position.AtLeast(pos) { - sm.ts.wr.Logger().Infof("syncSourceStreams setting stopPositions +%s %+v %d", key, vrs.Position, vrs.ID) - stopPositions[key] = vrs.Position - } - } - } - var wg sync.WaitGroup - allErrors := &concurrency.AllErrorRecorder{} - for shard, tabletStreams := range sm.streams { - for _, vrs := range tabletStreams { - key := fmt.Sprintf("%s:%s", vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard) - pos := stopPositions[key] - sm.ts.wr.Logger().Infof("syncSourceStreams before go func +%s %+v %d", key, pos, vrs.ID) - if vrs.Position.Equal(pos) { - continue - } - wg.Add(1) - go func(vrs *workflow.VReplicationStream, shard string, pos 
mysql.Position) { - defer wg.Done() - sm.ts.wr.Logger().Infof("syncSourceStreams beginning of go func %s %s %+v %d", shard, vrs.BinlogSource.Shard, pos, vrs.ID) - - si, err := sm.ts.wr.ts.GetShard(ctx, sm.ts.sourceKeyspace, shard) - if err != nil { - allErrors.RecordError(err) - return - } - master, err := sm.ts.wr.ts.GetTablet(ctx, si.MasterAlias) - if err != nil { - allErrors.RecordError(err) - return - } - query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for cutover' where id=%d", mysql.EncodePosition(pos), vrs.ID) - if _, err := sm.ts.wr.tmc.VReplicationExec(ctx, master.Tablet, query); err != nil { - allErrors.RecordError(err) - return - } - sm.ts.wr.Logger().Infof("Waiting for keyspace:shard: %v:%v, position %v", sm.ts.sourceKeyspace, shard, pos) - if err := sm.ts.wr.tmc.VReplicationWaitForPos(ctx, master.Tablet, int(vrs.ID), mysql.EncodePosition(pos)); err != nil { - allErrors.RecordError(err) - return - } - sm.ts.wr.Logger().Infof("Position for keyspace:shard: %v:%v reached", sm.ts.sourceKeyspace, shard) - }(vrs, shard, pos) - } - } - wg.Wait() - return stopPositions, allErrors.AggrError(vterrors.Aggregate) -} - -func (sm *streamMigrater) verifyStreamPositions(ctx context.Context, stopPositions map[string]mysql.Position) ([]string, error) { - stoppedStreams := make(map[string][]*workflow.VReplicationStream) - var mu sync.Mutex - err := sm.ts.forAllSources(func(source *workflow.MigrationSource) error { - tabletStreams := sm.streams[source.GetShard().ShardName()] - if len(tabletStreams) == 0 { - return nil - } - tabletStreams, err := sm.readTabletStreams(ctx, source.GetPrimary(), fmt.Sprintf("id in %s", tabletStreamValues(tabletStreams))) - if err != nil { - return err - } - mu.Lock() - defer mu.Unlock() - stoppedStreams[source.GetShard().ShardName()] = tabletStreams - return nil - }) - if err != nil { - return nil, err - } - - // This is not really required because it's not used later. 
- // But we keep it up-to-date for good measure. - sm.streams = stoppedStreams - - var oneSet []*workflow.VReplicationStream - allErrors := &concurrency.AllErrorRecorder{} - for _, tabletStreams := range stoppedStreams { - if oneSet == nil { - oneSet = tabletStreams - } - for _, vrs := range tabletStreams { - key := fmt.Sprintf("%s:%s", vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard) - pos := stopPositions[key] - if !vrs.Position.Equal(pos) { - allErrors.RecordError(fmt.Errorf("%s: stream %d position: %s does not match %s", key, vrs.ID, mysql.EncodePosition(vrs.Position), mysql.EncodePosition(pos))) - } - } - } - if allErrors.HasErrors() { - return nil, allErrors.AggrError(vterrors.Aggregate) - } - sm.templates, err = sm.templatize(ctx, oneSet) - if err != nil { - // Unreachable: we've already templatized this before. - return nil, err - } - return tabletStreamWorkflows(sm.templates), allErrors.AggrError(vterrors.Aggregate) + return sm.sm.StopStreams(ctx) } func (sm *streamMigrater) migrateStreams(ctx context.Context) error { - if sm.streams == nil { - return nil - } - - // Delete any previous stray workflows that might have been left-over - // due to a failed migration. - if err := sm.deleteTargetStreams(ctx); err != nil { - return err - } - - return sm.createTargetStreams(ctx, sm.templates) -} - -const ( - unknown = iota - sharded - reference -) - -// templatizeRule replaces keyrange values with {{.}}. -// This can then be used by go's template package to substitute other keyrange values. 
-func (sm *streamMigrater) templatize(ctx context.Context, tabletStreams []*workflow.VReplicationStream) ([]*workflow.VReplicationStream, error) { - tabletStreams = copyTabletStreams(tabletStreams) - var shardedStreams []*workflow.VReplicationStream - for _, vrs := range tabletStreams { - streamType := unknown - for _, rule := range vrs.BinlogSource.Filter.Rules { - typ, err := sm.templatizeRule(ctx, rule) - if err != nil { - return nil, err - } - switch typ { - case sharded: - if streamType == reference { - return nil, fmt.Errorf("cannot migrate streams with a mix of reference and sharded tables: %v", vrs.BinlogSource) - } - streamType = sharded - case reference: - if streamType == sharded { - return nil, fmt.Errorf("cannot migrate streams with a mix of reference and sharded tables: %v", vrs.BinlogSource) - } - streamType = reference - } - } - if streamType == sharded { - shardedStreams = append(shardedStreams, vrs) - } - } - return shardedStreams, nil -} - -func (sm *streamMigrater) templatizeRule(ctx context.Context, rule *binlogdatapb.Rule) (int, error) { - vtable, ok := sm.ts.sourceKSSchema.Tables[rule.Match] - if !ok { - return 0, fmt.Errorf("table %v not found in vschema", rule.Match) - } - if vtable.Type == vindexes.TypeReference { - return reference, nil - } - switch { - case rule.Filter == "": - return unknown, fmt.Errorf("rule %v does not have a select expression in vreplication", rule) - case key.IsKeyRange(rule.Filter): - rule.Filter = "{{.}}" - return sharded, nil - case rule.Filter == vreplication.ExcludeStr: - return unknown, fmt.Errorf("unexpected rule in vreplication: %v", rule) - default: - err := sm.templatizeKeyRange(ctx, rule) - if err != nil { - return unknown, err - } - return sharded, nil - } -} - -func (sm *streamMigrater) templatizeKeyRange(ctx context.Context, rule *binlogdatapb.Rule) error { - statement, err := sqlparser.Parse(rule.Filter) - if err != nil { - return err - } - sel, ok := statement.(*sqlparser.Select) - if !ok { - return 
fmt.Errorf("unexpected query: %v", rule.Filter) - } - var expr sqlparser.Expr - if sel.Where != nil { - expr = sel.Where.Expr - } - exprs := sqlparser.SplitAndExpression(nil, expr) - for _, subexpr := range exprs { - funcExpr, ok := subexpr.(*sqlparser.FuncExpr) - if !ok || !funcExpr.Name.EqualString("in_keyrange") { - continue - } - var krExpr sqlparser.SelectExpr - switch len(funcExpr.Exprs) { - case 1: - krExpr = funcExpr.Exprs[0] - case 3: - krExpr = funcExpr.Exprs[2] - default: - return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) - } - aliased, ok := krExpr.(*sqlparser.AliasedExpr) - if !ok { - return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) - } - val, ok := aliased.Expr.(*sqlparser.Literal) - if !ok { - return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr)) - } - if strings.Contains(rule.Filter, "{{") { - return fmt.Errorf("cannot migrate queries that contain '{{' in their string: %s", rule.Filter) - } - val.Val = "{{.}}" - rule.Filter = sqlparser.String(statement) - return nil - } - // There was no in_keyrange expression. Create a new one. 
- vtable := sm.ts.sourceKSSchema.Tables[rule.Match] - inkr := &sqlparser.FuncExpr{ - Name: sqlparser.NewColIdent("in_keyrange"), - Exprs: sqlparser.SelectExprs{ - &sqlparser.AliasedExpr{Expr: &sqlparser.ColName{Name: vtable.ColumnVindexes[0].Columns[0]}}, - &sqlparser.AliasedExpr{Expr: sqlparser.NewStrLiteral(vtable.ColumnVindexes[0].Type)}, - &sqlparser.AliasedExpr{Expr: sqlparser.NewStrLiteral("{{.}}")}, - }, - } - sel.AddWhere(inkr) - rule.Filter = sqlparser.String(statement) - return nil -} - -func (sm *streamMigrater) createTargetStreams(ctx context.Context, tmpl []*workflow.VReplicationStream) error { - - if len(tmpl) == 0 { - return nil - } - return sm.ts.forAllTargets(func(target *workflow.MigrationTarget) error { - tabletStreams := copyTabletStreams(tmpl) - for _, vrs := range tabletStreams { - for _, rule := range vrs.BinlogSource.Filter.Rules { - buf := &strings.Builder{} - t := template.Must(template.New("").Parse(rule.Filter)) - if err := t.Execute(buf, key.KeyRangeString(target.GetShard().KeyRange)); err != nil { - return err - } - rule.Filter = buf.String() - } - } - - ig := vreplication.NewInsertGenerator(binlogplayer.BlpStopped, target.GetPrimary().DbName()) - for _, vrs := range tabletStreams { - ig.AddRow(vrs.Workflow, vrs.BinlogSource, mysql.EncodePosition(vrs.Position), "", "") - } - _, err := sm.ts.wr.VReplicationExec(ctx, target.GetPrimary().Alias, ig.String()) - return err - }) + return sm.sm.MigrateStreams(ctx) } func (sm *streamMigrater) cancelMigration(ctx context.Context) { - if sm.streams == nil { - return - } - - // Ignore error. We still want to restart the source streams if deleteTargetStreams fails. 
- _ = sm.deleteTargetStreams(ctx) - - err := sm.ts.forAllSources(func(source *workflow.MigrationSource) error { - query := fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos=null, message='' where db_name=%s and workflow != %s", encodeString(source.GetPrimary().DbName()), encodeString(sm.ts.reverseWorkflow)) - _, err := sm.ts.wr.VReplicationExec(ctx, source.GetPrimary().Alias, query) - return err - }) - if err != nil { - sm.ts.wr.Logger().Errorf("Cancel migration failed: could not restart source streams: %v", err) - } -} - -func (sm *streamMigrater) deleteTargetStreams(ctx context.Context) error { - if len(sm.workflows) == 0 { - return nil - } - workflowList := stringListify(sm.workflows) - err := sm.ts.forAllTargets(func(target *workflow.MigrationTarget) error { - query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow in (%s)", encodeString(target.GetPrimary().DbName()), workflowList) - _, err := sm.ts.wr.VReplicationExec(ctx, target.GetPrimary().Alias, query) - return err - }) - if err != nil { - sm.ts.wr.Logger().Warningf("Could not delete migrated streams: %v", err) - } - return err + sm.sm.CancelMigration(ctx) } // streamMigraterFinalize finalizes the stream migration. // It's a standalone function because it does not use the streamMigrater state. 
func streamMigraterfinalize(ctx context.Context, ts *trafficSwitcher, workflows []string) error { - if len(workflows) == 0 { - return nil - } - workflowList := stringListify(workflows) - err := ts.forAllSources(func(source *workflow.MigrationSource) error { - query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow in (%s)", encodeString(source.GetPrimary().DbName()), workflowList) - _, err := ts.wr.VReplicationExec(ctx, source.GetPrimary().Alias, query) - return err - }) - if err != nil { - return err - } - err = ts.forAllTargets(func(target *workflow.MigrationTarget) error { - query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s and workflow in (%s)", encodeString(target.GetPrimary().DbName()), workflowList) - _, err := ts.wr.VReplicationExec(ctx, target.GetPrimary().Alias, query) - return err - }) - return err -} - -func tabletStreamValues(tabletStreams []*workflow.VReplicationStream) string { - return workflow.VReplicationStreams(tabletStreams).Values() -} - -func tabletStreamWorkflows(tabletStreams []*workflow.VReplicationStream) []string { - return workflow.VReplicationStreams(tabletStreams).Workflows() -} - -func stringListify(in []string) string { - buf := &strings.Builder{} - prefix := "" - for _, str := range in { - fmt.Fprintf(buf, "%s%s", prefix, encodeString(str)) - prefix = ", " - } - return buf.String() + return workflow.StreamMigratorFinalize(ctx, ts, workflows) } func copyTabletStreams(in []*workflow.VReplicationStream) []*workflow.VReplicationStream { diff --git a/go/vt/wrangler/stream_migrater_test.go b/go/vt/wrangler/stream_migrater_test.go index 9009aa773a9..c470c82b6a1 100644 --- a/go/vt/wrangler/stream_migrater_test.go +++ b/go/vt/wrangler/stream_migrater_test.go @@ -17,9 +17,7 @@ limitations under the License. 
package wrangler import ( - "encoding/json" "fmt" - "reflect" "strings" "testing" "time" @@ -29,11 +27,6 @@ import ( "vitess.io/vitess/go/sqltypes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/proto/vschema" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" - "vitess.io/vitess/go/vt/vtctl/workflow" - "vitess.io/vitess/go/vt/vtgate/vindexes" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" ) var ( @@ -1436,322 +1429,3 @@ func TestStreamMigrateStreamsMismatch(t *testing.T) { } verifyQueries(t, tme.allDBClients) } - -func TestTemplatize(t *testing.T) { - tests := []struct { - in []*workflow.VReplicationStream - out string - err string - }{{ - // First test contains all fields. - in: []*workflow.VReplicationStream{{ - ID: 1, - Workflow: "test", - BinlogSource: &binlogdatapb.BinlogSource{ - Keyspace: "ks", - Shard: "80-", - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1 where in_keyrange('-80')", - }}, - }, - }, - }}, - out: `[{"ID":1,"Workflow":"test","Bls":{"keyspace":"ks","shard":"80-","filter":{"rules":[{"match":"t1","filter":"select * from t1 where in_keyrange('{{.}}')"}]}}}]`, - }, { - // Reference table. - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "ref", - Filter: "", - }}, - }, - }, - }}, - out: "", - }, { - // Sharded table. 
- in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "-80", - }}, - }, - }, - }}, - out: `[{"ID":0,"Workflow":"","Bls":{"filter":{"rules":[{"match":"t1","filter":"{{.}}"}]}}}]`, - }, { - // table not found - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t3", - }}, - }, - }, - }}, - err: `table t3 not found in vschema`, - }, { - // sharded table with no filter - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - }}, - }, - }, - }}, - err: `rule match:"t1" does not have a select expression in vreplication`, - }, { - // Excluded table. - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: vreplication.ExcludeStr, - }}, - }, - }, - }}, - err: `unexpected rule in vreplication: match:"t1" filter:"exclude" `, - }, { - // Sharded table and ref table - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "-80", - }, { - Match: "ref", - Filter: "", - }}, - }, - }, - }}, - err: `cannot migrate streams with a mix of reference and sharded tables: filter: rules: > `, - }, { - // Ref table and sharded table (different code path) - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "ref", - Filter: "", - }, { - Match: "t2", - Filter: "-80", - }}, - }, - }, - }}, - err: `cannot migrate streams with a mix of reference and sharded tables: filter: rules: > `, - }, { - // Ref table with select expression - in: 
[]*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "ref", - Filter: "select * from t1", - }}, - }, - }, - }}, - out: "", - }, { - // Select expresstion with no keyrange value - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1", - }}, - }, - }, - }}, - out: `[{"ID":0,"Workflow":"","Bls":{"filter":{"rules":[{"match":"t1","filter":"select * from t1 where in_keyrange(c1, 'hash', '{{.}}')"}]}}}]`, - }, { - // Select expresstion with one keyrange value - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1 where in_keyrange('-80')", - }}, - }, - }, - }}, - out: `[{"ID":0,"Workflow":"","Bls":{"filter":{"rules":[{"match":"t1","filter":"select * from t1 where in_keyrange('{{.}}')"}]}}}]`, - }, { - // Select expresstion with three keyrange values - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1 where in_keyrange(col, vdx, '-80')", - }}, - }, - }, - }}, - out: `[{"ID":0,"Workflow":"","Bls":{"filter":{"rules":[{"match":"t1","filter":"select * from t1 where in_keyrange(col, vdx, '{{.}}')"}]}}}]`, - }, { - // syntax error - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "bad syntax", - }}, - }, - }, - }}, - err: "syntax error at position 4 near 'bad'", - }, { - // invalid statement - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: 
"update t set a=1", - }}, - }, - }, - }}, - err: "unexpected query: update t set a=1", - }, { - // invalid in_keyrange - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1 where in_keyrange(col, vdx, '-80', extra)", - }}, - }, - }, - }}, - err: "unexpected in_keyrange parameters: in_keyrange(col, vdx, '-80', extra)", - }, { - // * in_keyrange - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1 where in_keyrange(*)", - }}, - }, - }, - }}, - err: "unexpected in_keyrange parameters: in_keyrange(*)", - }, { - // non-string in_keyrange - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1 where in_keyrange(aa)", - }}, - }, - }, - }}, - err: "unexpected in_keyrange parameters: in_keyrange(aa)", - }, { - // '{{' in query - in: []*workflow.VReplicationStream{{ - BinlogSource: &binlogdatapb.BinlogSource{ - Filter: &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select '{{' from t1 where in_keyrange('-80')", - }}, - }, - }, - }}, - err: "cannot migrate queries that contain '{{' in their string: select '{{' from t1 where in_keyrange('-80')", - }} - vs := &vschemapb.Keyspace{ - Sharded: true, - Vindexes: map[string]*vschema.Vindex{ - "thash": { - Type: "hash", - }, - }, - Tables: map[string]*vschema.Table{ - "t1": { - ColumnVindexes: []*vschema.ColumnVindex{{ - Columns: []string{"c1"}, - Name: "thash", - }}, - }, - "t2": { - ColumnVindexes: []*vschema.ColumnVindex{{ - Columns: []string{"c1"}, - Name: "thash", - }}, - }, - "ref": { - Type: vindexes.TypeReference, - }, - }, - } - ksschema, err := vindexes.BuildKeyspaceSchema(vs, "ks") - if err 
!= nil { - t.Fatal(err) - } - ts := &trafficSwitcher{ - sourceKSSchema: ksschema, - } - for _, tt := range tests { - sm := &streamMigrater{ts: ts} - out, err := sm.templatize(context.Background(), tt.in) - var gotErr string - if err != nil { - gotErr = err.Error() - } - if gotErr != tt.err { - t.Errorf("templatize(%v) err: %v, want %v", stringifyVRS(tt.in), err, tt.err) - } - got := stringifyVRS(out) - if !reflect.DeepEqual(tt.out, got) { - t.Errorf("templatize(%v):\n%v, want\n%v", stringifyVRS(tt.in), got, tt.out) - } - } -} - -type testVRS struct { - ID uint32 - Workflow string - Bls *binlogdatapb.BinlogSource -} - -func stringifyVRS(in []*workflow.VReplicationStream) string { - if len(in) == 0 { - return "" - } - var converted []*testVRS - for _, vrs := range in { - converted = append(converted, &testVRS{ - ID: vrs.ID, - Workflow: vrs.Workflow, - Bls: vrs.BinlogSource, - }) - } - b, _ := json.Marshal(converted) - return string(b) -} diff --git a/go/vt/wrangler/switcher.go b/go/vt/wrangler/switcher.go index aad7ae46ead..e5aa9982772 100644 --- a/go/vt/wrangler/switcher.go +++ b/go/vt/wrangler/switcher.go @@ -20,6 +20,7 @@ import ( "time" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vtctl/workflow" "context" ) @@ -87,8 +88,8 @@ func (r *switcher) createReverseVReplication(ctx context.Context) error { return r.ts.createReverseVReplication(ctx) } -func (r *switcher) migrateStreams(ctx context.Context, sm *streamMigrater) error { - return sm.migrateStreams(ctx) +func (r *switcher) migrateStreams(ctx context.Context, sm *workflow.StreamMigrator) error { + return sm.MigrateStreams(ctx) } func (r *switcher) waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error { @@ -99,11 +100,11 @@ func (r *switcher) stopSourceWrites(ctx context.Context) error { return r.ts.stopSourceWrites(ctx) } -func (r *switcher) stopStreams(ctx context.Context, sm *streamMigrater) ([]string, error) { - return sm.stopStreams(ctx) +func 
(r *switcher) stopStreams(ctx context.Context, sm *workflow.StreamMigrator) ([]string, error) { + return sm.StopStreams(ctx) } -func (r *switcher) cancelMigration(ctx context.Context, sm *streamMigrater) { +func (r *switcher) cancelMigration(ctx context.Context, sm *workflow.StreamMigrator) { r.ts.wr.Logger().Infof("Cancel was requested.") r.ts.cancelMigration(ctx, sm) } diff --git a/go/vt/wrangler/switcher_dry_run.go b/go/vt/wrangler/switcher_dry_run.go index c0b52c10589..8c069c05e08 100644 --- a/go/vt/wrangler/switcher_dry_run.go +++ b/go/vt/wrangler/switcher_dry_run.go @@ -148,14 +148,16 @@ func (dr *switcherDryRun) createReverseVReplication(ctx context.Context) error { return nil } -func (dr *switcherDryRun) migrateStreams(ctx context.Context, sm *streamMigrater) error { - if len(sm.templates) == 0 { +func (dr *switcherDryRun) migrateStreams(ctx context.Context, sm *workflow.StreamMigrator) error { + templates := sm.Templates() + + if len(templates) == 0 { return nil } logs := make([]string, 0) dr.drLog.Log(fmt.Sprintf("Migrate streams to %s:", dr.ts.targetKeyspace)) - for key, streams := range sm.streams { + for key, streams := range sm.Streams() { for _, stream := range streams { logs = append(logs, fmt.Sprintf("\tShard %s Id %d, Workflow %s, Pos %s, BinLogSource %v", key, stream.ID, stream.Workflow, mysql.EncodePosition(stream.Position), stream.BinlogSource)) } @@ -166,7 +168,7 @@ func (dr *switcherDryRun) migrateStreams(ctx context.Context, sm *streamMigrater logs = nil } for _, target := range dr.ts.targets { - tabletStreams := copyTabletStreams(sm.templates) + tabletStreams := templates for _, vrs := range tabletStreams { logs = append(logs, fmt.Sprintf("\t Keyspace %s, Shard %s, Tablet %d, Workflow %s, Id %d, Pos %v, BinLogSource %s", vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard, target.GetPrimary().Alias.Uid, vrs.Workflow, vrs.ID, mysql.EncodePosition(vrs.Position), vrs.BinlogSource)) @@ -197,9 +199,9 @@ func (dr *switcherDryRun) 
stopSourceWrites(ctx context.Context) error { return nil } -func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *streamMigrater) ([]string, error) { +func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *workflow.StreamMigrator) ([]string, error) { logs := make([]string, 0) - for _, streams := range sm.streams { + for _, streams := range sm.Streams() { for _, stream := range streams { logs = append(logs, fmt.Sprintf("\tId %d Keyspace %s Shard %s Rules %s at Position %v", stream.ID, stream.BinlogSource.Keyspace, stream.BinlogSource.Shard, stream.BinlogSource.Filter, stream.Position)) @@ -212,7 +214,7 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *streamMigrater) ( return nil, nil } -func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *streamMigrater) { +func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *workflow.StreamMigrator) { dr.drLog.Log("Cancel stream migrations as requested") } diff --git a/go/vt/wrangler/switcher_interface.go b/go/vt/wrangler/switcher_interface.go index 87272e6736b..399cb4c633b 100644 --- a/go/vt/wrangler/switcher_interface.go +++ b/go/vt/wrangler/switcher_interface.go @@ -17,20 +17,21 @@ limitations under the License. 
package wrangler import ( + "context" "time" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vtctl/workflow" - "context" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) type iswitcher interface { lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) - cancelMigration(ctx context.Context, sm *streamMigrater) - stopStreams(ctx context.Context, sm *streamMigrater) ([]string, error) + cancelMigration(ctx context.Context, sm *workflow.StreamMigrator) + stopStreams(ctx context.Context, sm *workflow.StreamMigrator) ([]string, error) stopSourceWrites(ctx context.Context) error waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error - migrateStreams(ctx context.Context, sm *streamMigrater) error + migrateStreams(ctx context.Context, sm *workflow.StreamMigrator) error createReverseVReplication(ctx context.Context) error createJournals(ctx context.Context, sourceWorkflows []string) error allowTargetWrites(ctx context.Context) error diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 6dfe7a90259..4243c807a53 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -34,6 +34,7 @@ import ( "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topotools" @@ -41,8 +42,10 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" + "vitess.io/vitess/go/vt/vttablet/tmclient" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) @@ -115,6 +118,39 @@ type trafficSwitcher struct { externalTopo *topo.Server } 
+/* +begin: implementation of workflow.ITrafficSwitcher + +(NOTE:@ajm188) Please see comments on that interface type for why this exists. +This is temporary to allow workflow.StreamMigrator to use this trafficSwitcher +code and should be removed in the very near-term when we move trafficSwitcher to +package workflow as well. +*/ + +var _ workflow.ITrafficSwitcher = (*trafficSwitcher)(nil) + +func (ts *trafficSwitcher) TopoServer() *topo.Server { return ts.wr.ts } +func (ts *trafficSwitcher) TabletManagerClient() tmclient.TabletManagerClient { return ts.wr.tmc } +func (ts *trafficSwitcher) Logger() logutil.Logger { return ts.wr.logger } +func (ts *trafficSwitcher) VReplicationExec(ctx context.Context, alias *topodatapb.TabletAlias, query string) (*querypb.QueryResult, error) { + return ts.wr.VReplicationExec(ctx, alias, query) +} + +func (ts *trafficSwitcher) MigrationType() binlogdatapb.MigrationType { return ts.migrationType } +func (ts *trafficSwitcher) ReverseWorkflowName() string { return ts.reverseWorkflow } +func (ts *trafficSwitcher) SourceKeyspaceName() string { return ts.sourceKSSchema.Keyspace.Name } +func (ts *trafficSwitcher) SourceKeyspaceSchema() *vindexes.KeyspaceSchema { return ts.sourceKSSchema } +func (ts *trafficSwitcher) WorkflowName() string { return ts.workflow } + +func (ts *trafficSwitcher) ForAllSources(f func(source *workflow.MigrationSource) error) error { + return ts.forAllSources(f) +} +func (ts *trafficSwitcher) ForAllTargets(f func(source *workflow.MigrationTarget) error) error { + return ts.forAllTargets(f) +} + +/* end: implementation of workflow.ITrafficSwitcher */ + // For a Reshard, to check whether we have switched reads for a tablet type, we check if any one of the source shards has // the query service disabled in its tablet control record func (wr *Wrangler) getCellsWithShardReadsSwitched(ctx context.Context, targetKeyspace string, si *topo.ShardInfo, tabletType string) ( @@ -444,15 +480,15 @@ func (wr *Wrangler) 
SwitchReads(ctx context.Context, targetKeyspace, workflow st } // SwitchWrites is a generic way of migrating write traffic for a resharding workflow. -func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflow string, timeout time.Duration, cancel, reverse, reverseReplication bool, dryRun bool) (journalID int64, dryRunResults *[]string, err error) { - ts, ws, err := wr.getWorkflowState(ctx, targetKeyspace, workflow) +func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowName string, timeout time.Duration, cancel, reverse, reverseReplication bool, dryRun bool) (journalID int64, dryRunResults *[]string, err error) { + ts, ws, err := wr.getWorkflowState(ctx, targetKeyspace, workflowName) _ = ws if err != nil { wr.Logger().Errorf("getWorkflowState failed: %v", err) return 0, nil, err } if ts == nil { - errorMsg := fmt.Sprintf("workflow %s not found in keyspace %s", workflow, targetKeyspace) + errorMsg := fmt.Sprintf("workflow %s not found in keyspace %s", workflowName, targetKeyspace) wr.Logger().Errorf(errorMsg) return 0, nil, fmt.Errorf(errorMsg) } @@ -501,7 +537,7 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflow s } if !journalsExist { ts.wr.Logger().Infof("No previous journals were found. 
Proceeding normally.") - sm, err := buildStreamMigrater(ctx, ts, cancel) + sm, err := workflow.BuildStreamMigrator(ctx, ts, cancel) if err != nil { ts.wr.Logger().Errorf("buildStreamMigrater failed: %v", err) return 0, nil, err @@ -514,7 +550,7 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflow s sourceWorkflows, err = sw.stopStreams(ctx, sm) if err != nil { ts.wr.Logger().Errorf("stopStreams failed: %v", err) - for key, streams := range sm.streams { + for key, streams := range sm.Streams() { for _, stream := range streams { ts.wr.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource) } @@ -1089,7 +1125,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati }) } -func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *streamMigrater) { +func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *workflow.StreamMigrator) { var err error if ts.migrationType == binlogdatapb.MigrationType_TABLES { err = ts.changeTableSourceWrites(ctx, allowWrites) @@ -1100,7 +1136,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *streamMigrat ts.wr.Logger().Errorf("Cancel migration failed:", err) } - sm.cancelMigration(ctx) + sm.CancelMigration(ctx) err = ts.forAllTargets(func(target *workflow.MigrationTarget) error { query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", encodeString(target.GetPrimary().DbName()), encodeString(ts.workflow)) From 8191ac6608ec5b7c10a59eef8f169ceadbf2c68f Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sun, 25 Apr 2021 16:30:42 -0400 Subject: [PATCH 6/6] Remove dead code Signed-off-by: Andrew Mason --- go/vt/wrangler/stream_migrater.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/go/vt/wrangler/stream_migrater.go b/go/vt/wrangler/stream_migrater.go index 97a5257789f..e01c22fc864 100644 --- 
a/go/vt/wrangler/stream_migrater.go +++ b/go/vt/wrangler/stream_migrater.go @@ -26,15 +26,6 @@ type streamMigrater struct { sm *workflow.StreamMigrator } -func buildStreamMigrater(ctx context.Context, ts *trafficSwitcher, cancelMigrate bool) (*streamMigrater, error) { - sm, err := workflow.BuildStreamMigrator(ctx, ts, cancelMigrate) - if err != nil { - return nil, err - } - - return &streamMigrater{sm: sm}, nil -} - func (sm *streamMigrater) stopStreams(ctx context.Context) ([]string, error) { return sm.sm.StopStreams(ctx) } @@ -52,7 +43,3 @@ func (sm *streamMigrater) cancelMigration(ctx context.Context) { func streamMigraterfinalize(ctx context.Context, ts *trafficSwitcher, workflows []string) error { return workflow.StreamMigratorFinalize(ctx, ts, workflows) } - -func copyTabletStreams(in []*workflow.VReplicationStream) []*workflow.VReplicationStream { - return workflow.VReplicationStreams(in).Copy().ToSlice() -}