Skip to content

Commit

Permalink
Merge pull request #4829 from planetscale/ss-vcopy
Browse files Browse the repository at this point in the history
vreplication: SplitClone & VerticalSplitClone
  • Loading branch information
sougou authored Apr 23, 2019
2 parents eb85db2 + 78058b8 commit c585c8f
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 7 deletions.
2 changes: 1 addition & 1 deletion examples/local/vttablet-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ source $script_root/env.sh

init_db_sql_file="$VTROOT/config/init_db.sql"

export EXTRA_MY_CNF=$VTROOT/config/mycnf/default-fast.cnf
export EXTRA_MY_CNF=$VTROOT/config/mycnf/default-fast.cnf:$VTROOT/config/mycnf/rbr.cnf

case "$MYSQL_FLAVOR" in
"MySQL56")
Expand Down
2 changes: 1 addition & 1 deletion go/vt/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func ParseKeyRangeParts(start, end string) (*topodatapb.KeyRange, error) {
// KeyRangeString prints a topodatapb.KeyRange
func KeyRangeString(k *topodatapb.KeyRange) string {
if k == nil {
return "<nil>"
return "-"
}
return hex.EncodeToString(k.Start) + "-" + hex.EncodeToString(k.End)
}
Expand Down
14 changes: 14 additions & 0 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,20 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string)
return result, nil
}

// GetOnlyShard returns the single ShardInfo of an unsharded keyspace.
func (ts *Server) GetOnlyShard(ctx context.Context, keyspace string) (*ShardInfo, error) {
allShards, err := ts.FindAllShardsInKeyspace(ctx, keyspace)
if err != nil {
return nil, err
}
if len(allShards) == 1 {
for _, s := range allShards {
return s, nil
}
}
return nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "keyspace %s must have one and only one shard: %v", keyspace, allShards)
}

// DeleteKeyspace wraps the underlying Conn.Delete
// and dispatches the event.
func (ts *Server) DeleteKeyspace(ctx context.Context, keyspace string) error {
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ var commands = []commandGroup{
{"ValidateKeyspace", commandValidateKeyspace,
"[-ping-tablets] <keyspace name>",
"Validates that all nodes reachable from the specified keyspace are consistent."},
{"SplitClone", commandSplitClone,
"<keyspace> <from_shards> <to_shards>",
"Start the SplitClone process to perform horizontal resharding. Example: SplitClone ks '0' '-80,80-'"},
{"VerticalSplitClone", commandVerticalSplitClone,
"<from_keyspace> <to_keyspace> <tables>",
"Start the VerticalSplitClone process to perform vertical resharding. Example: SplitClone from_ks to_ks 'a,/b.*/'"},
{"MigrateServedTypes", commandMigrateServedTypes,
"[-cells=c1,c2,...] [-reverse] [-skip-refresh-state] <keyspace/shard> <served tablet type>",
"Migrates a serving type from the source shard to the shards that it replicates to. This command also rebuilds the serving graph. The <keyspace/shard> argument can specify any of the shards involved in the migration."},
Expand Down Expand Up @@ -1718,6 +1724,32 @@ func commandValidateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlag
return wr.ValidateKeyspace(ctx, keyspace, *pingTablets)
}

func commandSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 3 {
return fmt.Errorf("three arguments are required: keyspace, from_shards, to_shards")
}
keyspace := subFlags.Arg(0)
from := strings.Split(subFlags.Arg(1), ",")
to := strings.Split(subFlags.Arg(2), ",")
return wr.SplitClone(ctx, keyspace, from, to)
}

func commandVerticalSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 3 {
return fmt.Errorf("three arguments are required: from_keyspace, to_keyspace, tables")
}
fromKeyspace := subFlags.Arg(0)
toKeyspace := subFlags.Arg(1)
tables := strings.Split(subFlags.Arg(2), ",")
return wr.VerticalSplitClone(ctx, fromKeyspace, toKeyspace, tables)
}

func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward. Use in case of trouble")
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,13 @@ func buildQuery(tableName, filter string) string {
}

func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, lastpk *sqltypes.Result) (*TablePlan, error) {
sel, fromTable, err := analyzeSelectFrom(rule.Filter)
query := rule.Filter
if query == "" {
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select * from %v", sqlparser.NewTableIdent(rule.Match))
query = buf.String()
}
sel, fromTable, err := analyzeSelectFrom(query)
if err != nil {
return nil, err
}
Expand All @@ -162,7 +168,7 @@ func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, last
if !expr.TableName.IsEmpty() {
return nil, fmt.Errorf("unsupported qualifier for '*' expression: %v", sqlparser.String(expr))
}
sendRule.Filter = rule.Filter
sendRule.Filter = query
tablePlan := &TablePlan{
TargetName: rule.Match,
SendRule: sendRule,
Expand Down
11 changes: 8 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
// play is not resumable. If pausePos is set, play returns without updating the vreplication state.
func (vp *vplayer) play(ctx context.Context) error {
if !vp.stopPos.IsZero() && vp.startPos.AtLeast(vp.stopPos) {
return vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("Stop position %v already reached: %v", vp.startPos, vp.stopPos))
if vp.saveStop {
return vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("Stop position %v already reached: %v", vp.startPos, vp.stopPos))
}
return nil
}

plan, err := buildReplicatorPlan(vp.vr.source.Filter, vp.vr.tableKeys, vp.copyState)
Expand Down Expand Up @@ -324,8 +327,10 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
}
if !vp.pos.Equal(vp.stopPos) && vp.pos.AtLeast(vp.stopPos) {
// Code is unreachable, but bad data can cause this to happen.
if err := vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("next event position %v exceeds stop pos %v, exiting without applying", vp.pos, vp.stopPos)); err != nil {
return err
if vp.saveStop {
if err := vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("next event position %v exceeds stop pos %v, exiting without applying", vp.pos, vp.stopPos)); err != nil {
return err
}
}
return io.EOF
}
Expand Down
102 changes: 102 additions & 0 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/topotools/events"
"vitess.io/vitess/go/vt/vterrors"
)

const (
Expand Down Expand Up @@ -90,6 +91,107 @@ func (wr *Wrangler) SetKeyspaceShardingInfo(ctx context.Context, keyspace, shard
return wr.ts.UpdateKeyspace(ctx, ki)
}

// SplitClone initiates a SplitClone workflow.
func (wr *Wrangler) SplitClone(ctx context.Context, keyspace string, from, to []string) error {
var fromShards, toShards []*topo.ShardInfo
for _, shard := range from {
si, err := wr.ts.GetShard(ctx, keyspace, shard)
if err != nil {
return vterrors.Wrapf(err, "GetShard(%s) failed", shard)
}
fromShards = append(fromShards, si)
}
for _, shard := range to {
si, err := wr.ts.GetShard(ctx, keyspace, shard)
if err != nil {
return vterrors.Wrapf(err, "GetShard(%s) failed", shard)
}
toShards = append(toShards, si)
}
// TODO(sougou): validate from and to shards.

for _, dest := range toShards {
master, err := wr.ts.GetTablet(ctx, dest.MasterAlias)
if err != nil {
return vterrors.Wrapf(err, "GetTablet(%v) failed", dest.MasterAlias)
}
var ids []uint64
for _, source := range fromShards {
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*",
Filter: key.KeyRangeString(dest.KeyRange),
}},
}
bls := &binlogdatapb.BinlogSource{
Keyspace: keyspace,
Shard: source.ShardName(),
Filter: filter,
}
cmd := binlogplayer.CreateVReplicationState("VSplitClone", bls, "", binlogplayer.BlpStopped, master.DbName())
qr, err := wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd)
if err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd)
}
if err := wr.SourceShardAdd(ctx, keyspace, dest.ShardName(), uint32(qr.InsertId), keyspace, source.ShardName(), source.Shard.KeyRange, nil); err != nil {
return vterrors.Wrapf(err, "SourceShardAdd(%s, %s) failed", dest.ShardName(), source.ShardName())
}
ids = append(ids, qr.InsertId)
}
// Start vreplication only if all metadata was successfully created.
for _, id := range ids {
cmd := fmt.Sprintf("update _vt.vreplication set state='%s' where id=%d", binlogplayer.VReplicationInit, id)
if _, err = wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd)
}
}
}
return wr.refreshMasters(ctx, toShards)
}

// VerticalSplitClone initiates a VerticalSplitClone workflow.
func (wr *Wrangler) VerticalSplitClone(ctx context.Context, fromKeyspace, toKeyspace string, tables []string) error {
source, err := wr.ts.GetOnlyShard(ctx, fromKeyspace)
if err != nil {
return vterrors.Wrapf(err, "GetOnlyShard(%s) failed", fromKeyspace)
}
dest, err := wr.ts.GetOnlyShard(ctx, toKeyspace)
if err != nil {
return vterrors.Wrapf(err, "GetOnlyShard(%s) failed", toKeyspace)
}
// TODO(sougou): validate from and to shards.

master, err := wr.ts.GetTablet(ctx, dest.MasterAlias)
if err != nil {
return vterrors.Wrapf(err, "GetTablet(%v) failed", dest.MasterAlias)
}
filter := &binlogdatapb.Filter{}
for _, table := range tables {
filter.Rules = append(filter.Rules, &binlogdatapb.Rule{
Match: table,
})
}
bls := &binlogdatapb.BinlogSource{
Keyspace: fromKeyspace,
Shard: source.ShardName(),
Filter: filter,
}
cmd := binlogplayer.CreateVReplicationState("VSplitClone", bls, "", binlogplayer.BlpStopped, master.DbName())
qr, err := wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd)
if err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd)
}
if err := wr.SourceShardAdd(ctx, toKeyspace, dest.ShardName(), uint32(qr.InsertId), fromKeyspace, source.ShardName(), nil, tables); err != nil {
return vterrors.Wrapf(err, "SourceShardAdd(%s, %s) failed", dest.ShardName(), source.ShardName())
}
// Start vreplication only if metadata was successfully created.
cmd = fmt.Sprintf("update _vt.vreplication set state='%s' where id=%d", binlogplayer.VReplicationInit, qr.InsertId)
if _, err = wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd)
}
return wr.refreshMasters(ctx, []*topo.ShardInfo{dest})
}

// ShowResharding shows all resharding related metadata for the keyspace/shard.
func (wr *Wrangler) ShowResharding(ctx context.Context, keyspace, shard string) (err error) {
ki, err := wr.ts.GetKeyspace(ctx, keyspace)
Expand Down

0 comments on commit c585c8f

Please sign in to comment.