Skip to content

Commit

Permalink
Merge pull request #5669 from planetscale/ss-vrepl-prep-lookup
Browse files Browse the repository at this point in the history
vrepl: Prep for lookup vindex backfill
  • Loading branch information
deepthi authored Jan 13, 2020
2 parents 803b736 + 2660176 commit 3670303
Show file tree
Hide file tree
Showing 9 changed files with 374 additions and 166 deletions.
224 changes: 118 additions & 106 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions go/vt/vtgate/vindexes/consistent_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtgate"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -83,6 +84,12 @@ func (lu *ConsistentLookup) NeedsVCursor() bool {
// Map can map ids to key.Destination objects.
func (lu *ConsistentLookup) Map(vcursor VCursor, ids []sqltypes.Value) ([]key.Destination, error) {
out := make([]key.Destination, 0, len(ids))
if lu.writeOnly {
for range ids {
out = append(out, key.DestinationKeyRange{KeyRange: &topodatapb.KeyRange{}})
}
return out, nil
}

results, err := lu.lkp.Lookup(vcursor, ids)
if err != nil {
Expand Down Expand Up @@ -142,6 +149,13 @@ func (lu *ConsistentLookupUnique) NeedsVCursor() bool {
// Map can map ids to key.Destination objects.
func (lu *ConsistentLookupUnique) Map(vcursor VCursor, ids []sqltypes.Value) ([]key.Destination, error) {
out := make([]key.Destination, 0, len(ids))
if lu.writeOnly {
for range ids {
out = append(out, key.DestinationKeyRange{KeyRange: &topodatapb.KeyRange{}})
}
return out, nil
}

results, err := lu.lkp.Lookup(vcursor, ids)
if err != nil {
return nil, err
Expand All @@ -166,6 +180,7 @@ func (lu *ConsistentLookupUnique) Map(vcursor VCursor, ids []sqltypes.Value) ([]
// Unique and a Lookup.
type clCommon struct {
name string
writeOnly bool
lkp lookupInternal
keyspace string
ownerTable string
Expand All @@ -180,6 +195,11 @@ type clCommon struct {
// newCLCommon is commone code for the consistent lookup vindexes.
func newCLCommon(name string, m map[string]string) (*clCommon, error) {
lu := &clCommon{name: name}
var err error
lu.writeOnly, err = boolFromMap(m, "write_only")
if err != nil {
return nil, err
}

if err := lu.lkp.Init(m, false /* autocommit */, false /* upsert */); err != nil {
return nil, err
Expand Down Expand Up @@ -211,6 +231,13 @@ func (lu *clCommon) String() string {

// Verify returns true if ids maps to ksids.
func (lu *clCommon) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte) ([]bool, error) {
if lu.writeOnly {
out := make([]bool, len(ids))
for i := range ids {
out[i] = true
}
return out, nil
}
return lu.lkp.VerifyCustom(vcursor, ids, ksidsToValues(ksids), vtgate.CommitOrder_PRE)
}

Expand Down
102 changes: 81 additions & 21 deletions go/vt/vtgate/vindexes/consistent_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
"vitess.io/vitess/go/vt/sqlparser"
)

func TestConsistentLookupInit(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", true)
cols := []sqlparser.ColIdent{
sqlparser.NewColIdent("fc"),
}
Expand All @@ -43,26 +44,29 @@ func TestConsistentLookupInit(t *testing.T) {
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("SetOwnerInfo: %v, want %v", err, want)
}
if got := lookup.(*ConsistentLookup).writeOnly; !got {
t.Errorf("lookup.writeOnly: false, want true")
}
}

func TestConsistentLookupInfo(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
assert.Equal(t, 20, lookup.Cost())
assert.Equal(t, "consistent_lookup", lookup.String())
assert.False(t, lookup.IsUnique())
assert.True(t, lookup.NeedsVCursor())
}

func TestConsistentLookupUniqueInfo(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup_unique")
lookup := createConsistentLookup(t, "consistent_lookup_unique", false)
assert.Equal(t, 10, lookup.Cost())
assert.Equal(t, "consistent_lookup_unique", lookup.String())
assert.True(t, lookup.IsUnique())
assert.True(t, lookup.NeedsVCursor())
}

func TestConsistentLookupMap(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResult(2), nil)
vc.AddResult(makeTestResult(2), nil)
Expand Down Expand Up @@ -98,8 +102,28 @@ func TestConsistentLookupMap(t *testing.T) {
}
}

func TestConsistentLookupMapWriteOnly(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", true)

got, err := lookup.Map(nil, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
if err != nil {
t.Error(err)
}
want := []key.Destination{
key.DestinationKeyRange{
KeyRange: &topodatapb.KeyRange{},
},
key.DestinationKeyRange{
KeyRange: &topodatapb.KeyRange{},
},
}
if !reflect.DeepEqual(got, want) {
t.Errorf("Map(): %#v, want %+v", got, want)
}
}

func TestConsistentLookupUniqueMap(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup_unique")
lookup := createConsistentLookup(t, "consistent_lookup_unique", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResult(0), nil)
vc.AddResult(makeTestResult(1), nil)
Expand Down Expand Up @@ -129,8 +153,28 @@ func TestConsistentLookupUniqueMap(t *testing.T) {
}
}

func TestConsistentLookupUniqueMapWriteOnly(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup_unique", true)

got, err := lookup.Map(nil, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
if err != nil {
t.Error(err)
}
want := []key.Destination{
key.DestinationKeyRange{
KeyRange: &topodatapb.KeyRange{},
},
key.DestinationKeyRange{
KeyRange: &topodatapb.KeyRange{},
},
}
if !reflect.DeepEqual(got, want) {
t.Errorf("Map(): %#v, want %+v", got, want)
}
}

func TestConsistentLookupMapAbsent(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResult(0), nil)
vc.AddResult(makeTestResult(0), nil)
Expand All @@ -153,7 +197,7 @@ func TestConsistentLookupMapAbsent(t *testing.T) {
}

func TestConsistentLookupVerify(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(makeTestResult(1), nil)
Expand All @@ -174,10 +218,21 @@ func TestConsistentLookupVerify(t *testing.T) {
if err == nil || err.Error() != want {
t.Errorf("lookup(query fail) err: %v, want %s", err, want)
}

// Test write_only.
lookup = createConsistentLookup(t, "consistent_lookup", true)
got, err := lookup.Verify(nil, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte(""), []byte("")})
if err != nil {
t.Error(err)
}
wantBools := []bool{true, true}
if !reflect.DeepEqual(got, wantBools) {
t.Errorf("lookup.Verify(writeOnly): %v, want %v", got, wantBools)
}
}

func TestConsistentLookupCreateSimple(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(&sqltypes.Result{}, nil)

Expand All @@ -199,7 +254,7 @@ func TestConsistentLookupCreateSimple(t *testing.T) {
}

func TestConsistentLookupCreateThenRecreate(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(nil, errors.New("Duplicate entry"))
vc.AddResult(&sqltypes.Result{}, nil)
Expand All @@ -222,7 +277,7 @@ func TestConsistentLookupCreateThenRecreate(t *testing.T) {
}

func TestConsistentLookupCreateThenUpdate(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(nil, errors.New("Duplicate entry"))
vc.AddResult(makeTestResult(1), nil)
Expand All @@ -247,7 +302,7 @@ func TestConsistentLookupCreateThenUpdate(t *testing.T) {
}

func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(nil, errors.New("Duplicate entry"))
vc.AddResult(makeTestResult(1), nil)
Expand All @@ -271,7 +326,7 @@ func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
}

func TestConsistentLookupCreateThenDupkey(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(nil, errors.New("Duplicate entry"))
vc.AddResult(makeTestResult(1), nil)
Expand All @@ -297,7 +352,7 @@ func TestConsistentLookupCreateThenDupkey(t *testing.T) {
}

func TestConsistentLookupCreateNonDupError(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(nil, errors.New("general error"))

Expand All @@ -318,7 +373,7 @@ func TestConsistentLookupCreateNonDupError(t *testing.T) {
}

func TestConsistentLookupCreateThenBadRows(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(nil, errors.New("Duplicate entry"))
vc.AddResult(makeTestResult(2), nil)
Expand All @@ -341,7 +396,7 @@ func TestConsistentLookupCreateThenBadRows(t *testing.T) {
}

func TestConsistentLookupDelete(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(&sqltypes.Result{}, nil)

Expand All @@ -359,7 +414,7 @@ func TestConsistentLookupDelete(t *testing.T) {
}

func TestConsistentLookupUpdate(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(&sqltypes.Result{}, nil)
vc.AddResult(&sqltypes.Result{}, nil)
Expand All @@ -383,7 +438,7 @@ func TestConsistentLookupUpdate(t *testing.T) {
}

func TestConsistentLookupNoUpdate(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup")
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(&sqltypes.Result{}, nil)
vc.AddResult(&sqltypes.Result{}, nil)
Expand All @@ -403,12 +458,17 @@ func TestConsistentLookupNoUpdate(t *testing.T) {
vc.verifyLog(t, []string{})
}

func createConsistentLookup(t *testing.T, name string) SingleColumn {
func createConsistentLookup(t *testing.T, name string, writeOnly bool) SingleColumn {
t.Helper()
write := "false"
if writeOnly {
write = "true"
}
l, err := CreateVindex(name, name, map[string]string{
"table": "t",
"from": "fromc1,fromc2",
"to": "toc",
"table": "t",
"from": "fromc1,fromc2",
"to": "toc",
"write_only": write,
})
if err != nil {
t.Fatal(err)
Expand Down
67 changes: 67 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,73 @@ func TestPlayerCopyTablesNone(t *testing.T) {
})
}

func TestPlayerCopyTablesStopAfterCopy(t *testing.T) {
defer deleteTablet(addTablet(100))

execStatements(t, []string{
"create table src1(id int, val varbinary(128), primary key(id))",
"insert into src1 values(2, 'bbb'), (1, 'aaa')",
fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), primary key(id))", vrepldb),
})
defer execStatements(t, []string{
"drop table src1",
fmt.Sprintf("drop table %s.dst1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "dst1",
Filter: "select * from src1",
}},
}

bls := &binlogdatapb.BinlogSource{
Keyspace: env.KeyspaceName,
Shard: env.ShardName,
Filter: filter,
OnDdl: binlogdatapb.OnDDLAction_IGNORE,
StopAfterCopy: true,
}
query := binlogplayer.CreateVReplicationState("test", bls, "", binlogplayer.VReplicationInit, playerEngine.dbName)
qr, err := playerEngine.Exec(query)
if err != nil {
t.Fatal(err)
}
defer func() {
query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID)
if _, err := playerEngine.Exec(query); err != nil {
t.Fatal(err)
}
expectDeleteQueries(t)
}()

expectDBClientQueries(t, []string{
"/insert into _vt.vreplication",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"commit",
"rollback",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set pos=",
"begin",
"insert into dst1(id,val) values (1,'aaa'), (2,'bbb')",
`/update _vt.copy_state set lastpk='fields:<name:\\"id\\" type:INT32 > rows:<lengths:1 values:\\"2\\" > ' where vrepl_id=.*`,
"commit",
// copy of dst1 is done: delete from copy_state.
"/delete from _vt.copy_state.*dst1",
"rollback",
// All tables copied. Stop vreplication because we requested it.
"/update _vt.vreplication set state='Stopped'",
})
expectData(t, "dst1", [][]string{
{"1", "aaa"},
{"2", "bbb"},
})
}

func TestPlayerCopyTableCancel(t *testing.T) {
defer deleteTablet(addTablet(100))

Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func (vr *vreplicator) Replicate(ctx context.Context) error {
return err
}
default:
if vr.source.StopAfterCopy {
return vr.setState(binlogplayer.BlpStopped, "Stopped after copy.")
}
if err := vr.setState(binlogplayer.BlpRunning, ""); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 3670303

Please sign in to comment.