Skip to content

Commit

Permalink
Merge pull request #4965 from planetscale/ss-alt-vindex-sort
Browse files Browse the repository at this point in the history
vindexes: alt approach for lookup sorted inserts
  • Loading branch information
sougou authored Jun 27, 2019
2 parents 84a36ab + 3daf82c commit e6dd092
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 24 deletions.
45 changes: 45 additions & 0 deletions go/vt/vtgate/endtoend/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,51 @@ func TestConsistentLookupMultiInsert(t *testing.T) {
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(5)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}
exec(t, conn, "delete from t1 where id1=1")
exec(t, conn, "delete from t1 where id1=2")
exec(t, conn, "delete from t1 where id1=3")
exec(t, conn, "delete from t1 where id1=4")
exec(t, conn, "delete from t1_id2_idx where id2=4")
}

func TestHashLookupMultiInsertIgnore(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
// conn2 is for queries that target shards.
conn2, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
defer conn2.Close()

// DB should start out clean
qr := exec(t, conn, "select count(*) from t2_id4_idx")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}
qr = exec(t, conn, "select count(*) from t2")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}

// Try inserting a bunch of ids at once
exec(t, conn, "begin")
exec(t, conn, "insert ignore into t2(id3, id4) values(50,60), (30,40), (10,20)")
exec(t, conn, "commit")

// Verify
qr = exec(t, conn, "select id3, id4 from t2 order by id3")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}
qr = exec(t, conn, "select id3, id4 from t2_id4_idx order by id3")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}
}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
Expand Down
39 changes: 39 additions & 0 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ create table aggr_test(
val2 bigint,
primary key(id)
) Engine=InnoDB;
create table t2(
id3 bigint,
id4 bigint,
primary key(id3)
) Engine=InnoDB;
create table t2_id4_idx(
id bigint not null auto_increment,
id4 bigint,
id3 bigint,
primary key(id),
key idx_id4(id4)
) Engine=InnoDB;
`

vschema = &vschemapb.Keyspace{
Expand All @@ -78,6 +92,16 @@ create table aggr_test(
},
Owner: "t1",
},
"t2_id4_idx": {
Type: "lookup_hash",
Params: map[string]string{
"table": "t2_id4_idx",
"from": "id4",
"to": "id3",
"autocommit": "true",
},
Owner: "t2",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
Expand All @@ -95,6 +119,21 @@ create table aggr_test(
Name: "hash",
}},
},
"t2": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id3",
Name: "hash",
}, {
Column: "id4",
Name: "t2_id4_idx",
}},
},
"t2_id4_idx": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id4",
Name: "hash",
}},
},
"vstream_test": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id",
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/vindexes/consistent_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (lu *clCommon) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte

// Create reserves the id by inserting it into the vindex table.
func (lu *clCommon) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error {
err := lu.lkp.createCustom(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE)
err := lu.lkp.createCustom(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE)
if err == nil {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/vindexes/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (ln *LookupNonUnique) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [

// Create reserves the id by inserting it into the vindex table.
func (ln *LookupNonUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error {
return ln.lkp.Create(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode)
return ln.lkp.Create(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode)
}

// Delete deletes the entry from the vindex table.
Expand Down Expand Up @@ -261,7 +261,7 @@ func (lu *LookupUnique) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]

// Create reserves the id by inserting it into the vindex table.
func (lu *LookupUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error {
return lu.lkp.Create(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode)
return lu.lkp.Create(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode)
}

// Update updates the entry in the vindex table.
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/vindexes/lookup_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (lh *LookupHash) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value,
if err != nil {
return fmt.Errorf("lookup.Create.vunhash: %v", err)
}
return lh.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
return lh.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
}

// Update updates the entry in the vindex table.
Expand Down Expand Up @@ -309,7 +309,7 @@ func (lhu *LookupHashUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.
if err != nil {
return fmt.Errorf("lookup.Create.vunhash: %v", err)
}
return lhu.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
return lhu.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
}

// Delete deletes the entry from the vindex table.
Expand Down
17 changes: 9 additions & 8 deletions go/vt/vtgate/vindexes/lookup_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func (lkp *lookupInternal) Verify(vcursor VCursor, ids, values []sqltypes.Value)

type sorter struct {
rowsColValues [][]sqltypes.Value
ksids [][]byte
toValues []sqltypes.Value
}

Expand All @@ -133,7 +132,6 @@ func (v *sorter) Less(i, j int) bool {

func (v *sorter) Swap(i, j int) {
v.toValues[i], v.toValues[j] = v.toValues[j], v.toValues[i]
v.ksids[i], v.ksids[j] = v.ksids[j], v.ksids[i]
v.rowsColValues[i], v.rowsColValues[j] = v.rowsColValues[j], v.rowsColValues[i]
}

Expand All @@ -147,14 +145,14 @@ func (v *sorter) Swap(i, j int) {
// If we assume that the primary vindex is on column_c. The call to create will look like this:
// Create(vcursor, [[value_a0, value_b0,], [value_a1, value_b1]], [binary(value_c0), binary(value_c1)])
// Notice that toValues contains the computed binary value of the keyspace_id.
func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, toValues []sqltypes.Value, ignoreMode bool) error {
func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool) error {
if lkp.Autocommit {
return lkp.createCustom(vcursor, rowsColValues, ksids, toValues, ignoreMode, vtgatepb.CommitOrder_AUTOCOMMIT)
return lkp.createCustom(vcursor, rowsColValues, toValues, ignoreMode, vtgatepb.CommitOrder_AUTOCOMMIT)
}
return lkp.createCustom(vcursor, rowsColValues, ksids, toValues, ignoreMode, vtgatepb.CommitOrder_NORMAL)
return lkp.createCustom(vcursor, rowsColValues, toValues, ignoreMode, vtgatepb.CommitOrder_NORMAL)
}

func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error {
func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error {
if len(rowsColValues) == 0 {
// This code is unreachable. It's just a failsafe.
return nil
Expand All @@ -176,7 +174,10 @@ func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqlty
fmt.Fprintf(buf, "%s) values(", lkp.To)

bindVars := make(map[string]*querypb.BindVariable, 2*len(rowsColValues))
sort.Sort(&sorter{rowsColValues: rowsColValues, ksids: ksids, toValues: toValues})
// Make a copy before sorting.
rowsColValues = append([][]sqltypes.Value(nil), rowsColValues...)
toValues = append([]sqltypes.Value(nil), toValues...)
sort.Sort(&sorter{rowsColValues: rowsColValues, toValues: toValues})
for rowIdx := range toValues {
colIds := rowsColValues[rowIdx]
if rowIdx != 0 {
Expand Down Expand Up @@ -254,7 +255,7 @@ func (lkp *lookupInternal) Update(vcursor VCursor, oldValues []sqltypes.Value, k
if err := lkp.Delete(vcursor, [][]sqltypes.Value{oldValues}, toValue, vtgatepb.CommitOrder_NORMAL); err != nil {
return err
}
return lkp.Create(vcursor, [][]sqltypes.Value{newValues}, [][]byte{ksid}, []sqltypes.Value{toValue}, false /* ignoreMode */)
return lkp.Create(vcursor, [][]sqltypes.Value{newValues}, []sqltypes.Value{toValue}, false /* ignoreMode */)
}

func (lkp *lookupInternal) initDelStmt() string {
Expand Down
10 changes: 1 addition & 9 deletions go/vt/vtgate/vindexes/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,18 +377,10 @@ func TestLookupNonUniqueCreate(t *testing.T) {

// With ignore.
vc.queries = nil
rowsColsValues := [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}}
ksids := [][]byte{[]byte("test2"), []byte("test1")}
err = lookupNonUnique.(Lookup).Create(vc, rowsColsValues, ksids, true /* ignoreMode */)
err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}}, [][]byte{[]byte("test2"), []byte("test1")}, true /* ignoreMode */)
if err != nil {
t.Error(err)
}
if !reflect.DeepEqual(rowsColsValues, [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}) {
t.Errorf("inserts not reordered. Lookup table inserts get reordered on a bulk insert to avoid locking")
}
if !reflect.DeepEqual(ksids, [][]byte{[]byte("test1"), []byte("test2")}) {
t.Errorf("keyspace ids not reordered. Keyspace ids must also get reordered on a bulk insert")
}

wantqueries[0].Sql = "insert ignore into t(fromc, toc) values(:fromc0, :toc0), (:fromc1, :toc1)"
if !reflect.DeepEqual(vc.queries, wantqueries) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (lh *LookupUnicodeLooseMD5Hash) Create(vcursor VCursor, rowsColValues [][]s
if err != nil {
return fmt.Errorf("lookup.Create.convert: %v", err)
}
return lh.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
return lh.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
}

// Update updates the entry in the vindex table.
Expand Down Expand Up @@ -333,7 +333,7 @@ func (lhu *LookupUnicodeLooseMD5HashUnique) Create(vcursor VCursor, rowsColValue
if err != nil {
return fmt.Errorf("lookup.Create.convert: %v", err)
}
return lhu.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
return lhu.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
}

// Delete deletes the entry from the vindex table.
Expand Down

0 comments on commit e6dd092

Please sign in to comment.