From 8f522d1e2a1949feb89880969cf6aeec54770076 Mon Sep 17 00:00:00 2001 From: Michael Pawliszyn Date: Mon, 10 Jun 2019 11:36:58 -0400 Subject: [PATCH] Reorder keyspace ids when reordering bulk lookup table inserts. Otherwise bulk `insert ignore` queries will fail as the validate assumes the values and ids are still in the same order. Signed-off-by: Michael Pawliszyn --- go/vt/vtgate/vindexes/consistent_lookup.go | 2 +- go/vt/vtgate/vindexes/lookup.go | 8 ++++---- go/vt/vtgate/vindexes/lookup_hash.go | 8 ++++---- go/vt/vtgate/vindexes/lookup_internal.go | 18 ++++++++++-------- go/vt/vtgate/vindexes/lookup_test.go | 10 +++++++++- .../vindexes/lookup_unicodeloosemd5_hash.go | 8 ++++---- 6 files changed, 32 insertions(+), 22 deletions(-) diff --git a/go/vt/vtgate/vindexes/consistent_lookup.go b/go/vt/vtgate/vindexes/consistent_lookup.go index f040947a7f4..413d71604d9 100644 --- a/go/vt/vtgate/vindexes/consistent_lookup.go +++ b/go/vt/vtgate/vindexes/consistent_lookup.go @@ -210,7 +210,7 @@ func (lu *clCommon) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte // Create reserves the id by inserting it into the vindex table. func (lu *clCommon) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error { - err := lu.lkp.createCustom(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE) + err := lu.lkp.createCustom(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE) if err == nil { return nil } diff --git a/go/vt/vtgate/vindexes/lookup.go b/go/vt/vtgate/vindexes/lookup.go index 875dc057063..a0c6d938639 100644 --- a/go/vt/vtgate/vindexes/lookup.go +++ b/go/vt/vtgate/vindexes/lookup.go @@ -108,7 +108,7 @@ func (ln *LookupNonUnique) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [ // Create reserves the id by inserting it into the vindex table. func (ln *LookupNonUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error { - return ln.lkp.Create(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode) + return ln.lkp.Create(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode) } // Delete deletes the entry from the vindex table. @@ -118,7 +118,7 @@ func (ln *LookupNonUnique) Delete(vcursor VCursor, rowsColValues [][]sqltypes.Va // Update updates the entry in the vindex table. func (ln *LookupNonUnique) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid []byte, newValues []sqltypes.Value) error { - return ln.lkp.Update(vcursor, oldValues, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid), newValues) + return ln.lkp.Update(vcursor, oldValues, ksid, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid), newValues) } // MarshalJSON returns a JSON representation of LookupHash. @@ -261,12 +261,12 @@ func (lu *LookupUnique) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][] // Create reserves the id by inserting it into the vindex table. func (lu *LookupUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error { - return lu.lkp.Create(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode) + return lu.lkp.Create(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode) } // Update updates the entry in the vindex table. func (lu *LookupUnique) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid []byte, newValues []sqltypes.Value) error { - return lu.lkp.Update(vcursor, oldValues, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid), newValues) + return lu.lkp.Update(vcursor, oldValues, ksid, sqltypes.MakeTrusted(sqltypes.VarBinary, ksid), newValues) } // Delete deletes the entry from the vindex table. diff --git a/go/vt/vtgate/vindexes/lookup_hash.go b/go/vt/vtgate/vindexes/lookup_hash.go index 644149079c3..47eea8fbfe8 100644 --- a/go/vt/vtgate/vindexes/lookup_hash.go +++ b/go/vt/vtgate/vindexes/lookup_hash.go @@ -155,7 +155,7 @@ func (lh *LookupHash) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, if err != nil { return fmt.Errorf("lookup.Create.vunhash: %v", err) } - return lh.lkp.Create(vcursor, rowsColValues, values, ignoreMode) + return lh.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode) } // Update updates the entry in the vindex table. @@ -164,7 +164,7 @@ func (lh *LookupHash) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid [ if err != nil { return fmt.Errorf("lookup.Update.vunhash: %v", err) } - return lh.lkp.Update(vcursor, oldValues, sqltypes.NewUint64(v), newValues) + return lh.lkp.Update(vcursor, oldValues, ksid, sqltypes.NewUint64(v), newValues) } // Delete deletes the entry from the vindex table. @@ -309,7 +309,7 @@ func (lhu *LookupHashUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes. if err != nil { return fmt.Errorf("lookup.Create.vunhash: %v", err) } - return lhu.lkp.Create(vcursor, rowsColValues, values, ignoreMode) + return lhu.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode) } // Delete deletes the entry from the vindex table. @@ -327,7 +327,7 @@ func (lhu *LookupHashUnique) Update(vcursor VCursor, oldValues []sqltypes.Value, if err != nil { return fmt.Errorf("lookup.Update.vunhash: %v", err) } - return lhu.lkp.Update(vcursor, oldValues, sqltypes.NewUint64(v), newValues) + return lhu.lkp.Update(vcursor, oldValues, ksid, sqltypes.NewUint64(v), newValues) } // MarshalJSON returns a JSON representation of LookupHashUnique. diff --git a/go/vt/vtgate/vindexes/lookup_internal.go b/go/vt/vtgate/vindexes/lookup_internal.go index d3e80c6d4d9..e93dc6454b0 100644 --- a/go/vt/vtgate/vindexes/lookup_internal.go +++ b/go/vt/vtgate/vindexes/lookup_internal.go @@ -107,6 +107,7 @@ func (lkp *lookupInternal) Verify(vcursor VCursor, ids, values []sqltypes.Value) type sorter struct { rowsColValues [][]sqltypes.Value + ksids [][]byte toValues []sqltypes.Value } @@ -132,6 +133,7 @@ func (v *sorter) Less(i, j int) bool { func (v *sorter) Swap(i, j int) { v.toValues[i], v.toValues[j] = v.toValues[j], v.toValues[i] + v.ksids[i], v.ksids[j] = v.ksids[j], v.ksids[i] v.rowsColValues[i], v.rowsColValues[j] = v.rowsColValues[j], v.rowsColValues[i] } @@ -145,14 +147,14 @@ func (v *sorter) Swap(i, j int) { // If we assume that the primary vindex is on column_c. The call to create will look like this: // Create(vcursor, [[value_a0, value_b0,], [value_a1, value_b1]], [binary(value_c0), binary(value_c1)]) // Notice that toValues contains the computed binary value of the keyspace_id. -func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool) error { +func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, toValues []sqltypes.Value, ignoreMode bool) error { if lkp.Autocommit { - return lkp.createCustom(vcursor, rowsColValues, toValues, ignoreMode, vtgatepb.CommitOrder_AUTOCOMMIT) + return lkp.createCustom(vcursor, rowsColValues, ksids, toValues, ignoreMode, vtgatepb.CommitOrder_AUTOCOMMIT) } - return lkp.createCustom(vcursor, rowsColValues, toValues, ignoreMode, vtgatepb.CommitOrder_NORMAL) + return lkp.createCustom(vcursor, rowsColValues, ksids, toValues, ignoreMode, vtgatepb.CommitOrder_NORMAL) } -func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error { +func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error { if len(rowsColValues) == 0 { // This code is unreachable. It's just a failsafe. return nil @@ -174,7 +176,7 @@ func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqlty fmt.Fprintf(buf, "%s) values(", lkp.To) bindVars := make(map[string]*querypb.BindVariable, 2*len(rowsColValues)) - sort.Sort(&sorter{rowsColValues: rowsColValues, toValues: toValues}) + sort.Sort(&sorter{rowsColValues: rowsColValues, ksids: ksids, toValues: toValues}) for rowIdx := range toValues { colIds := rowsColValues[rowIdx] if rowIdx != 0 { @@ -248,11 +250,11 @@ func (lkp *lookupInternal) Delete(vcursor VCursor, rowsColValues [][]sqltypes.Va } // Update implements the update functionality. -func (lkp *lookupInternal) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid sqltypes.Value, newValues []sqltypes.Value) error { - if err := lkp.Delete(vcursor, [][]sqltypes.Value{oldValues}, ksid, vtgatepb.CommitOrder_NORMAL); err != nil { +func (lkp *lookupInternal) Update(vcursor VCursor, oldValues []sqltypes.Value, ksid []byte, toValue sqltypes.Value, newValues []sqltypes.Value) error { + if err := lkp.Delete(vcursor, [][]sqltypes.Value{oldValues}, toValue, vtgatepb.CommitOrder_NORMAL); err != nil { return err } - return lkp.Create(vcursor, [][]sqltypes.Value{newValues}, []sqltypes.Value{ksid}, false /* ignoreMode */) + return lkp.Create(vcursor, [][]sqltypes.Value{newValues}, [][]byte{ksid}, []sqltypes.Value{toValue}, false /* ignoreMode */) } func (lkp *lookupInternal) initDelStmt() string { diff --git a/go/vt/vtgate/vindexes/lookup_test.go b/go/vt/vtgate/vindexes/lookup_test.go index e1a58e45bf1..a18104327b5 100644 --- a/go/vt/vtgate/vindexes/lookup_test.go +++ b/go/vt/vtgate/vindexes/lookup_test.go @@ -377,10 +377,18 @@ func TestLookupNonUniqueCreate(t *testing.T) { // With ignore. vc.queries = nil - err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}}, [][]byte{[]byte("test2"), []byte("test1")}, true /* ignoreMode */) + rowsColsValues := [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}} + ksids := [][]byte{[]byte("test2"), []byte("test1")} + err = lookupNonUnique.(Lookup).Create(vc, rowsColsValues, ksids, true /* ignoreMode */) if err != nil { t.Error(err) } + if !reflect.DeepEqual(rowsColsValues, [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}) { + t.Errorf("inserts not reordered. Lookup table inserts get reordered on a bulk insert to avoid locking") + } + if !reflect.DeepEqual(ksids, [][]byte{[]byte("test1"), []byte("test2")}) { + t.Errorf("keyspace ids not reordered. Keyspace ids must also get reordered on a bulk insert") + } wantqueries[0].Sql = "insert ignore into t(fromc, toc) values(:fromc0, :toc0), (:fromc1, :toc1)" if !reflect.DeepEqual(vc.queries, wantqueries) { diff --git a/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go b/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go index 5f617364732..d6a22d83e92 100644 --- a/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go +++ b/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go @@ -168,7 +168,7 @@ func (lh *LookupUnicodeLooseMD5Hash) Create(vcursor VCursor, rowsColValues [][]s if err != nil { return fmt.Errorf("lookup.Create.convert: %v", err) } - return lh.lkp.Create(vcursor, rowsColValues, values, ignoreMode) + return lh.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode) } // Update updates the entry in the vindex table. @@ -185,7 +185,7 @@ func (lh *LookupUnicodeLooseMD5Hash) Update(vcursor VCursor, oldValues []sqltype if err != nil { return fmt.Errorf("lookup.Update.convert: %v", err) } - return lh.lkp.Update(vcursor, oldValues, sqltypes.NewUint64(v), newValues) + return lh.lkp.Update(vcursor, oldValues, ksid, sqltypes.NewUint64(v), newValues) } // Delete deletes the entry from the vindex table. @@ -333,7 +333,7 @@ func (lhu *LookupUnicodeLooseMD5HashUnique) Create(vcursor VCursor, rowsColValue if err != nil { return fmt.Errorf("lookup.Create.convert: %v", err) } - return lhu.lkp.Create(vcursor, rowsColValues, values, ignoreMode) + return lhu.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode) } // Delete deletes the entry from the vindex table. @@ -363,7 +363,7 @@ func (lhu *LookupUnicodeLooseMD5HashUnique) Update(vcursor VCursor, oldValues [] if err != nil { return fmt.Errorf("lookup.Update.convert: %v", err) } - return lhu.lkp.Update(vcursor, oldValues, sqltypes.NewUint64(v), newValues) + return lhu.lkp.Update(vcursor, oldValues, ksid, sqltypes.NewUint64(v), newValues) } // MarshalJSON returns a JSON representation of LookupHashUnique.