From 1ab972b2298b6e3cb5a583b47175add66fad1390 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Thu, 4 Apr 2019 21:13:33 -0700 Subject: [PATCH 1/2] vreplication: SplitClone & VerticalSplitClone This is an alternate and experimental version where we replace the vtworker SplitClone and VerticalSplitClone with a vreplication based approach. I want to get this submitted so people can try this and provide feedback. If the performance is acceptable, then we can solidify this with more checks and tests. Signed-off-by: Sugu Sougoumarane --- examples/local/vttablet-up.sh | 2 +- go/vt/key/key.go | 2 +- go/vt/topo/keyspace.go | 14 +++ go/vt/vtctl/vtctl.go | 32 ++++++ .../vreplication/table_plan_builder.go | 10 +- .../tabletmanager/vreplication/vplayer.go | 11 +- go/vt/wrangler/keyspace.go | 102 ++++++++++++++++++ 7 files changed, 166 insertions(+), 7 deletions(-) diff --git a/examples/local/vttablet-up.sh b/examples/local/vttablet-up.sh index 00cea50a5b5..bdd16ba6df6 100755 --- a/examples/local/vttablet-up.sh +++ b/examples/local/vttablet-up.sh @@ -38,7 +38,7 @@ source $script_root/env.sh init_db_sql_file="$VTROOT/config/init_db.sql" -export EXTRA_MY_CNF=$VTROOT/config/mycnf/default-fast.cnf +export EXTRA_MY_CNF=$VTROOT/config/mycnf/default-fast.cnf:$VTROOT/config/mycnf/rbr.cnf case "$MYSQL_FLAVOR" in "MySQL56") diff --git a/go/vt/key/key.go b/go/vt/key/key.go index 88c51e26f22..367311ba5c8 100644 --- a/go/vt/key/key.go +++ b/go/vt/key/key.go @@ -144,7 +144,7 @@ func ParseKeyRangeParts(start, end string) (*topodatapb.KeyRange, error) { // KeyRangeString prints a topodatapb.KeyRange func KeyRangeString(k *topodatapb.KeyRange) string { if k == nil { - return "" + return "-" } return hex.EncodeToString(k.Start) + "-" + hex.EncodeToString(k.End) } diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index 96a58fd970b..3519c1ebc92 100644 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -255,6 +255,20 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string) return result, nil } +// GetOnlyShard returns the single ShardInfo of an unsharded keyspace. +func (ts *Server) GetOnlyShard(ctx context.Context, keyspace string) (*ShardInfo, error) { + allShards, err := ts.FindAllShardsInKeyspace(ctx, keyspace) + if err != nil { + return nil, err + } + if len(allShards) == 1 { + for _, s := range allShards { + return s, nil + } + } + return nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "keyspace %s must have one and only one shard: %v", keyspace, allShards) +} + // DeleteKeyspace wraps the underlying Conn.Delete // and dispatches the event. func (ts *Server) DeleteKeyspace(ctx context.Context, keyspace string) error { diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 80fbedc9859..ec24995f734 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -307,6 +307,12 @@ var commands = []commandGroup{ {"ValidateKeyspace", commandValidateKeyspace, "[-ping-tablets] ", "Validates that all nodes reachable from the specified keyspace are consistent."}, + {"SplitClone", commandSplitClone, + " ", + "Start the SplitClone process to perform horizontal resharding. Example: SplitClone ks '0' '-80,80-'"}, + {"VerticalSplitClone", commandVerticalSplitClone, + " ", + "Start the VerticalSplitClone process to perform vertical resharding. Example: SplitClone from_ks to_ks 'a,/b.*/'"}, {"MigrateServedTypes", commandMigrateServedTypes, "[-cells=c1,c2,...] [-reverse] [-skip-refresh-state] ", "Migrates a serving type from the source shard to the shards that it replicates to. This command also rebuilds the serving graph. The argument can specify any of the shards involved in the migration."}, @@ -1718,6 +1724,32 @@ func commandValidateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlag return wr.ValidateKeyspace(ctx, keyspace, *pingTablets) } +func commandSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + if err := subFlags.Parse(args); err != nil { + return err + } + if subFlags.NArg() != 3 { + return fmt.Errorf("three arguments are required: keyspace, from_shards, to_shards") + } + keyspace := subFlags.Arg(0) + from := strings.Split(subFlags.Arg(1), ",") + to := strings.Split(subFlags.Arg(2), ",") + return wr.SplitClone(ctx, keyspace, from, to) +} + +func commandVerticalSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { + if err := subFlags.Parse(args); err != nil { + return err + } + if subFlags.NArg() != 3 { + return fmt.Errorf("three arguments are required: from_keyspace, to_keyspace, tables") + } + fromKeyspace := subFlags.Arg(0) + toKeyspace := subFlags.Arg(1) + tables := strings.Split(subFlags.Arg(2), ",") + return wr.VerticalSplitClone(ctx, fromKeyspace, toKeyspace, tables) +} + func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update") reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward. Use in case of trouble") diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 59eb78b6077..c913f656a35 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -147,7 +147,13 @@ func buildQuery(tableName, filter string) string { } func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, lastpk *sqltypes.Result) (*TablePlan, error) { - sel, fromTable, err := analyzeSelectFrom(rule.Filter) + query := rule.Filter + if query == "" { + buf := sqlparser.NewTrackedBuffer(nil) + buf.Myprintf("select * from %v", sqlparser.NewTableIdent(rule.Match)) + query = buf.String() + } + sel, fromTable, err := analyzeSelectFrom(query) if err != nil { return nil, err } @@ -162,7 +168,7 @@ func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, last if !expr.TableName.IsEmpty() { return nil, fmt.Errorf("unsupported qualifier for '*' expression: %v", sqlparser.String(expr)) } - sendRule.Filter = rule.Filter + sendRule.Filter = query tablePlan := &TablePlan{ TargetName: rule.Match, SendRule: sendRule, diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index bb4f52a56e4..8e2e630d6b4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -78,7 +78,10 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map // play is not resumable. If pausePos is set, play returns without updating the vreplication state. func (vp *vplayer) play(ctx context.Context) error { if !vp.stopPos.IsZero() && vp.startPos.AtLeast(vp.stopPos) { - return vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("Stop position %v already reached: %v", vp.startPos, vp.stopPos)) + if vp.saveStop { + return vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("Stop position %v already reached: %v", vp.startPos, vp.stopPos)) + } + return nil } plan, err := buildReplicatorPlan(vp.vr.source.Filter, vp.vr.tableKeys, vp.copyState) @@ -324,8 +327,10 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m } if !vp.pos.Equal(vp.stopPos) && vp.pos.AtLeast(vp.stopPos) { // Code is unreachable, but bad data can cause this to happen. - if err := vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("next event position %v exceeds stop pos %v, exiting without applying", vp.pos, vp.stopPos)); err != nil { - return err + if vp.saveStop { + if err := vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("next event position %v exceeds stop pos %v, exiting without applying", vp.pos, vp.stopPos)); err != nil { + return err + } } return io.EOF } diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index 9ef846f58f4..5113a04742f 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/topotools/events" + "vitess.io/vitess/go/vt/vterrors" ) const ( @@ -90,6 +91,107 @@ func (wr *Wrangler) SetKeyspaceShardingInfo(ctx context.Context, keyspace, shard return wr.ts.UpdateKeyspace(ctx, ki) } +// SplitClone initiates a SplitClone workflow. +func (wr *Wrangler) SplitClone(ctx context.Context, keyspace string, from, to []string) error { + var fromShards, toShards []*topo.ShardInfo + for _, shard := range from { + si, err := wr.ts.GetShard(ctx, keyspace, shard) + if err != nil { + return vterrors.Wrapf(err, "GetShard(%s) failed", shard) + } + fromShards = append(fromShards, si) + } + for _, shard := range to { + si, err := wr.ts.GetShard(ctx, keyspace, shard) + if err != nil { + return vterrors.Wrapf(err, "GetShard(%s) failed", shard) + } + toShards = append(toShards, si) + } + // TODO(sougou): validate from and to shards. + + for _, dest := range toShards { + master, err := wr.ts.GetTablet(ctx, dest.MasterAlias) + if err != nil { + return vterrors.Wrapf(err, "GetTablet(%v) failed", dest.MasterAlias) + } + var ids []uint64 + for _, source := range fromShards { + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/.*", + Filter: key.KeyRangeString(dest.KeyRange), + }}, + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: keyspace, + Shard: source.ShardName(), + Filter: filter, + } + cmd := binlogplayer.CreateVReplicationState("VSplitClone", bls, "", binlogplayer.BlpStopped, master.DbName()) + qr, err := wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd) + if err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd) + } + if err := wr.SourceShardAdd(ctx, keyspace, dest.ShardName(), uint32(qr.InsertId), keyspace, source.ShardName(), source.Shard.KeyRange, nil); err != nil { + return vterrors.Wrapf(err, "SourceShardAdd(%s, %s) failed", dest.ShardName(), source.ShardName()) + } + ids = append(ids, qr.InsertId) + } + // Start vreplication only if all metadata was successfully created. + for _, id := range ids { + cmd := fmt.Sprintf("update _vt.vreplication set state='%s' where id=%d", binlogplayer.VReplicationInit, id) + if _, err = wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd); err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd) + } + } + } + return nil +} + +// VerticalSplitClone initiates a VerticalSplitClone workflow. +func (wr *Wrangler) VerticalSplitClone(ctx context.Context, fromKeyspace, toKeyspace string, tables []string) error { + source, err := wr.ts.GetOnlyShard(ctx, fromKeyspace) + if err != nil { + return vterrors.Wrapf(err, "GetOnlyShard(%s) failed", fromKeyspace) + } + dest, err := wr.ts.GetOnlyShard(ctx, toKeyspace) + if err != nil { + return vterrors.Wrapf(err, "GetOnlyShard(%s) failed", toKeyspace) + } + // TODO(sougou): validate from and to shards. + + master, err := wr.ts.GetTablet(ctx, dest.MasterAlias) + if err != nil { + return vterrors.Wrapf(err, "GetTablet(%v) failed", dest.MasterAlias) + } + filter := &binlogdatapb.Filter{} + for _, table := range tables { + filter.Rules = append(filter.Rules, &binlogdatapb.Rule{ + Match: table, + }) + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: fromKeyspace, + Shard: source.ShardName(), + Filter: filter, + } + cmd := binlogplayer.CreateVReplicationState("VSplitClone", bls, "", binlogplayer.BlpStopped, master.DbName()) + qr, err := wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd) + if err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd) + } + if err := wr.SourceShardAdd(ctx, toKeyspace, dest.ShardName(), uint32(qr.InsertId), fromKeyspace, source.ShardName(), nil, tables); err != nil { + return vterrors.Wrapf(err, "SourceShardAdd(%s, %s) failed", dest.ShardName(), source.ShardName()) + } + // Start vreplication only if metadata was successfully created. + cmd = fmt.Sprintf("update _vt.vreplication set state='%s' where id=%d", binlogplayer.VReplicationInit, qr.InsertId) + if _, err = wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd); err != nil { + return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd) + } + return nil +} + // ShowResharding shows all resharding related metadata for the keyspace/shard. func (wr *Wrangler) ShowResharding(ctx context.Context, keyspace, shard string) (err error) { ki, err := wr.ts.GetKeyspace(ctx, keyspace) From 78058b8c33b40b3d75e69739e31e228468a30279 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Mon, 22 Apr 2019 14:35:27 -0700 Subject: [PATCH 2/2] vreplication: refresh masters at end of splitclone Signed-off-by: Sugu Sougoumarane --- go/vt/wrangler/keyspace.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index 5113a04742f..3b193f87cd7 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -146,7 +146,7 @@ func (wr *Wrangler) SplitClone(ctx context.Context, keyspace string, from, to [] } } } - return nil + return wr.refreshMasters(ctx, toShards) } // VerticalSplitClone initiates a VerticalSplitClone workflow. @@ -189,7 +189,7 @@ func (wr *Wrangler) VerticalSplitClone(ctx context.Context, fromKeyspace, toKeys if _, err = wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd); err != nil { return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd) } - return nil + return wr.refreshMasters(ctx, []*topo.ShardInfo{dest}) } // ShowResharding shows all resharding related metadata for the keyspace/shard.