Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorder keyspace ids when reordering bulk lookup table inserts. #4919

Merged
merged 1 commit into from
Jun 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/vtgate/vindexes/consistent_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (lu *clCommon) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte

// Create reserves the id by inserting it into the vindex table.
func (lu *clCommon) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error {
err := lu.lkp.createCustom(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE)
err := lu.lkp.createCustom(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE)
if err == nil {
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/vindexes/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (ln *LookupNonUnique) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [

// Create reserves the id by inserting it into the vindex table.
func (ln *LookupNonUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error {
return ln.lkp.Create(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode)
return ln.lkp.Create(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode)
}

// Delete deletes the entry from the vindex table.
Expand All @@ -118,7 +118,7 @@ func (ln *LookupNonUnique) Delete(vcursor VCursor, rowsColValues [][]sqltypes.Va

// Update updates the entry in the vindex table.
func (ln *LookupNonUnique) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid []byte, newValues []sqltypes.Value) error {
return ln.lkp.Update(vcursor, oldValues, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid), newValues)
return ln.lkp.Update(vcursor, oldValues, ksid, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid), newValues)
}

// MarshalJSON returns a JSON representation of LookupHash.
Expand Down Expand Up @@ -261,12 +261,12 @@ func (lu *LookupUnique) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]

// Create reserves the id by inserting it into the vindex table.
func (lu *LookupUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error {
return lu.lkp.Create(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode)
return lu.lkp.Create(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode)
}

// Update updates the entry in the vindex table.
func (lu *LookupUnique) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid []byte, newValues []sqltypes.Value) error {
return lu.lkp.Update(vcursor, oldValues, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid), newValues)
return lu.lkp.Update(vcursor, oldValues, ksid, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid), newValues)
}

// Delete deletes the entry from the vindex table.
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/vindexes/lookup_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (lh *LookupHash) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value,
if err != nil {
return fmt.Errorf("lookup.Create.vunhash: %v", err)
}
return lh.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
return lh.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
}

// Update updates the entry in the vindex table.
Expand All @@ -164,7 +164,7 @@ func (lh *LookupHash) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid [
if err != nil {
return fmt.Errorf("lookup.Update.vunhash: %v", err)
}
return lh.lkp.Update(vcursor, oldValues, sqltypes.NewUint64(v), newValues)
return lh.lkp.Update(vcursor, oldValues, ksid, sqltypes.NewUint64(v), newValues)
}

// Delete deletes the entry from the vindex table.
Expand Down Expand Up @@ -309,7 +309,7 @@ func (lhu *LookupHashUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.
if err != nil {
return fmt.Errorf("lookup.Create.vunhash: %v", err)
}
return lhu.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
return lhu.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
}

// Delete deletes the entry from the vindex table.
Expand All @@ -327,7 +327,7 @@ func (lhu *LookupHashUnique) Update(vcursor VCursor, oldValues []sqltypes.Value,
if err != nil {
return fmt.Errorf("lookup.Update.vunhash: %v", err)
}
return lhu.lkp.Update(vcursor, oldValues, sqltypes.NewUint64(v), newValues)
return lhu.lkp.Update(vcursor, oldValues, ksid, sqltypes.NewUint64(v), newValues)
}

// MarshalJSON returns a JSON representation of LookupHashUnique.
Expand Down
18 changes: 10 additions & 8 deletions go/vt/vtgate/vindexes/lookup_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (lkp *lookupInternal) Verify(vcursor VCursor, ids, values []sqltypes.Value)

type sorter struct {
rowsColValues [][]sqltypes.Value
ksids [][]byte
toValues []sqltypes.Value
}

Expand All @@ -132,6 +133,7 @@ func (v *sorter) Less(i, j int) bool {

func (v *sorter) Swap(i, j int) {
v.toValues[i], v.toValues[j] = v.toValues[j], v.toValues[i]
v.ksids[i], v.ksids[j] = v.ksids[j], v.ksids[i]
v.rowsColValues[i], v.rowsColValues[j] = v.rowsColValues[j], v.rowsColValues[i]
}

Expand All @@ -145,14 +147,14 @@ func (v *sorter) Swap(i, j int) {
// If we assume that the primary vindex is on column_c. The call to create will look like this:
// Create(vcursor, [[value_a0, value_b0,], [value_a1, value_b1]], [binary(value_c0), binary(value_c1)])
// Notice that toValues contains the computed binary value of the keyspace_id.
func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool) error {
func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, toValues []sqltypes.Value, ignoreMode bool) error {
if lkp.Autocommit {
return lkp.createCustom(vcursor, rowsColValues, toValues, ignoreMode, vtgatepb.CommitOrder_AUTOCOMMIT)
return lkp.createCustom(vcursor, rowsColValues, ksids, toValues, ignoreMode, vtgatepb.CommitOrder_AUTOCOMMIT)
}
return lkp.createCustom(vcursor, rowsColValues, toValues, ignoreMode, vtgatepb.CommitOrder_NORMAL)
return lkp.createCustom(vcursor, rowsColValues, ksids, toValues, ignoreMode, vtgatepb.CommitOrder_NORMAL)
}

func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error {
func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error {
if len(rowsColValues) == 0 {
// This code is unreachable. It's just a failsafe.
return nil
Expand All @@ -174,7 +176,7 @@ func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqlty
fmt.Fprintf(buf, "%s) values(", lkp.To)

bindVars := make(map[string]*querypb.BindVariable, 2*len(rowsColValues))
sort.Sort(&sorter{rowsColValues: rowsColValues, toValues: toValues})
sort.Sort(&sorter{rowsColValues: rowsColValues, ksids: ksids, toValues: toValues})
for rowIdx := range toValues {
colIds := rowsColValues[rowIdx]
if rowIdx != 0 {
Expand Down Expand Up @@ -248,11 +250,11 @@ func (lkp *lookupInternal) Delete(vcursor VCursor, rowsColValues [][]sqltypes.Va
}

// Update implements the update functionality.
func (lkp *lookupInternal) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid sqltypes.Value, newValues []sqltypes.Value) error {
if err := lkp.Delete(vcursor, [][]sqltypes.Value{oldValues}, ksid, vtgatepb.CommitOrder_NORMAL); err != nil {
func (lkp *lookupInternal) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid []byte, toValue sqltypes.Value, newValues []sqltypes.Value) error {
if err := lkp.Delete(vcursor, [][]sqltypes.Value{oldValues}, toValue, vtgatepb.CommitOrder_NORMAL); err != nil {
return err
}
return lkp.Create(vcursor, [][]sqltypes.Value{newValues}, []sqltypes.Value{ksid}, false /* ignoreMode */)
return lkp.Create(vcursor, [][]sqltypes.Value{newValues}, [][]byte{ksid}, []sqltypes.Value{toValue}, false /* ignoreMode */)
}

func (lkp *lookupInternal) initDelStmt() string {
Expand Down
10 changes: 9 additions & 1 deletion go/vt/vtgate/vindexes/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,18 @@ func TestLookupNonUniqueCreate(t *testing.T) {

// With ignore.
vc.queries = nil
err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}}, [][]byte{[]byte("test2"), []byte("test1")}, true /* ignoreMode */)
rowsColsValues := [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}}
ksids := [][]byte{[]byte("test2"), []byte("test1")}
err = lookupNonUnique.(Lookup).Create(vc, rowsColsValues, ksids, true /* ignoreMode */)
if err != nil {
t.Error(err)
}
if !reflect.DeepEqual(rowsColsValues, [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}) {
t.Errorf("inserts not reordered. Lookup table inserts get reordered on a bulk insert to avoid locking")
}
if !reflect.DeepEqual(ksids, [][]byte{[]byte("test1"), []byte("test2")}) {
t.Errorf("keyspace ids not reordered. Keyspace ids must also get reordered on a bulk insert")
}

wantqueries[0].Sql = "insert ignore into t(fromc, toc) values(:fromc0, :toc0), (:fromc1, :toc1)"
if !reflect.DeepEqual(vc.queries, wantqueries) {
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (lh *LookupUnicodeLooseMD5Hash) Create(vcursor VCursor, rowsColValues [][]s
if err != nil {
return fmt.Errorf("lookup.Create.convert: %v", err)
}
return lh.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
return lh.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
}

// Update updates the entry in the vindex table.
Expand All @@ -185,7 +185,7 @@ func (lh *LookupUnicodeLooseMD5Hash) Update(vcursor VCursor, oldValues []sqltype
if err != nil {
return fmt.Errorf("lookup.Update.convert: %v", err)
}
return lh.lkp.Update(vcursor, oldValues, sqltypes.NewUint64(v), newValues)
return lh.lkp.Update(vcursor, oldValues, ksid, sqltypes.NewUint64(v), newValues)
}

// Delete deletes the entry from the vindex table.
Expand Down Expand Up @@ -333,7 +333,7 @@ func (lhu *LookupUnicodeLooseMD5HashUnique) Create(vcursor VCursor, rowsColValue
if err != nil {
return fmt.Errorf("lookup.Create.convert: %v", err)
}
return lhu.lkp.Create(vcursor, rowsColValues, values, ignoreMode)
return lhu.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode)
}

// Delete deletes the entry from the vindex table.
Expand Down Expand Up @@ -363,7 +363,7 @@ func (lhu *LookupUnicodeLooseMD5HashUnique) Update(vcursor VCursor, oldValues []
if err != nil {
return fmt.Errorf("lookup.Update.convert: %v", err)
}
return lhu.lkp.Update(vcursor, oldValues, sqltypes.NewUint64(v), newValues)
return lhu.lkp.Update(vcursor, oldValues, ksid, sqltypes.NewUint64(v), newValues)
}

// MarshalJSON returns a JSON representation of LookupHashUnique.
Expand Down