From 3daf82c694890693258b68f2444445abf07aa01f Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Wed, 26 Jun 2019 21:24:14 -0700 Subject: [PATCH] vindexes: alt approach for lookup sorted inserts Looks like too many callers are affected if the passed in values are sorted by the `Create` function. Instead of changing all the call sites, it's better that `Create` sorts a copy of the values and leave the external behavior unchanged. Signed-off-by: Sugu Sougoumarane --- go/vt/vtgate/endtoend/lookup_test.go | 45 +++++++++++++++++++ go/vt/vtgate/endtoend/main_test.go | 39 ++++++++++++++++ go/vt/vtgate/vindexes/consistent_lookup.go | 2 +- go/vt/vtgate/vindexes/lookup.go | 4 +- go/vt/vtgate/vindexes/lookup_hash.go | 4 +- go/vt/vtgate/vindexes/lookup_internal.go | 17 +++---- go/vt/vtgate/vindexes/lookup_test.go | 10 +---- .../vindexes/lookup_unicodeloosemd5_hash.go | 4 +- 8 files changed, 101 insertions(+), 24 deletions(-) diff --git a/go/vt/vtgate/endtoend/lookup_test.go b/go/vt/vtgate/endtoend/lookup_test.go index 0491e5f1dc2..46d8e189aae 100644 --- a/go/vt/vtgate/endtoend/lookup_test.go +++ b/go/vt/vtgate/endtoend/lookup_test.go @@ -213,6 +213,51 @@ func TestConsistentLookupMultiInsert(t *testing.T) { if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(5)]]"; got != want { t.Errorf("select:\n%v want\n%v", got, want) } + exec(t, conn, "delete from t1 where id1=1") + exec(t, conn, "delete from t1 where id1=2") + exec(t, conn, "delete from t1 where id1=3") + exec(t, conn, "delete from t1 where id1=4") + exec(t, conn, "delete from t1_id2_idx where id2=4") +} + +func TestHashLookupMultiInsertIgnore(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + // conn2 is for queries that target shards. + conn2, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn2.Close() + + // DB should start out clean + qr := exec(t, conn, "select count(*) from t2_id4_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select count(*) from t2") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Try inserting a bunch of ids at once + exec(t, conn, "begin") + exec(t, conn, "insert ignore into t2(id3, id4) values(50,60), (30,40), (10,20)") + exec(t, conn, "commit") + + // Verify + qr = exec(t, conn, "select id3, id4 from t2 order by id3") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select id3, id4 from t2_id4_idx order by id3") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } } func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index d56d7789215..f2fcef7c49d 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -61,6 +61,20 @@ create table aggr_test( val2 bigint, primary key(id) ) Engine=InnoDB; + +create table t2( + id3 bigint, + id4 bigint, + primary key(id3) +) Engine=InnoDB; + +create table t2_id4_idx( + id bigint not null auto_increment, + id4 bigint, + id3 bigint, + primary key(id), + key idx_id4(id4) +) Engine=InnoDB; ` vschema = &vschemapb.Keyspace{ @@ -78,6 +92,16 @@ create table aggr_test( }, Owner: "t1", }, + "t2_id4_idx": { + Type: "lookup_hash", + Params: map[string]string{ + "table": "t2_id4_idx", + "from": "id4", + "to": "id3", + "autocommit": "true", + }, + Owner: "t2", + }, }, Tables: map[string]*vschemapb.Table{ "t1": { @@ -95,6 +119,21 @@ create table aggr_test( Name: "hash", }}, }, + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id3", + Name: "hash", + }, { + Column: "id4", + Name: "t2_id4_idx", + }}, + }, + "t2_id4_idx": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id4", + Name: "hash", + }}, + }, "vstream_test": { ColumnVindexes: []*vschemapb.ColumnVindex{{ Column: "id", diff --git a/go/vt/vtgate/vindexes/consistent_lookup.go b/go/vt/vtgate/vindexes/consistent_lookup.go index 413d71604d9..f040947a7f4 100644 --- a/go/vt/vtgate/vindexes/consistent_lookup.go +++ b/go/vt/vtgate/vindexes/consistent_lookup.go @@ -210,7 +210,7 @@ func (lu *clCommon) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte // Create reserves the id by inserting it into the vindex table. func (lu *clCommon) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error { - err := lu.lkp.createCustom(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE) + err := lu.lkp.createCustom(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE) if err == nil { return nil } diff --git a/go/vt/vtgate/vindexes/lookup.go b/go/vt/vtgate/vindexes/lookup.go index a0c6d938639..fd4b816235e 100644 --- a/go/vt/vtgate/vindexes/lookup.go +++ b/go/vt/vtgate/vindexes/lookup.go @@ -108,7 +108,7 @@ func (ln *LookupNonUnique) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [ // Create reserves the id by inserting it into the vindex table. func (ln *LookupNonUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error { - return ln.lkp.Create(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode) + return ln.lkp.Create(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode) } // Delete deletes the entry from the vindex table. @@ -261,7 +261,7 @@ func (lu *LookupUnique) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][] // Create reserves the id by inserting it into the vindex table. func (lu *LookupUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error { - return lu.lkp.Create(vcursor, rowsColValues, ksids, ksidsToValues(ksids), ignoreMode) + return lu.lkp.Create(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode) } // Update updates the entry in the vindex table. diff --git a/go/vt/vtgate/vindexes/lookup_hash.go b/go/vt/vtgate/vindexes/lookup_hash.go index 47eea8fbfe8..330ef13e408 100644 --- a/go/vt/vtgate/vindexes/lookup_hash.go +++ b/go/vt/vtgate/vindexes/lookup_hash.go @@ -155,7 +155,7 @@ func (lh *LookupHash) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, if err != nil { return fmt.Errorf("lookup.Create.vunhash: %v", err) } - return lh.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode) + return lh.lkp.Create(vcursor, rowsColValues, values, ignoreMode) } // Update updates the entry in the vindex table. @@ -309,7 +309,7 @@ func (lhu *LookupHashUnique) Create(vcursor VCursor, rowsColValues [][]sqltypes. if err != nil { return fmt.Errorf("lookup.Create.vunhash: %v", err) } - return lhu.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode) + return lhu.lkp.Create(vcursor, rowsColValues, values, ignoreMode) } // Delete deletes the entry from the vindex table. diff --git a/go/vt/vtgate/vindexes/lookup_internal.go b/go/vt/vtgate/vindexes/lookup_internal.go index e93dc6454b0..1eae733425e 100644 --- a/go/vt/vtgate/vindexes/lookup_internal.go +++ b/go/vt/vtgate/vindexes/lookup_internal.go @@ -107,7 +107,6 @@ func (lkp *lookupInternal) Verify(vcursor VCursor, ids, values []sqltypes.Value) type sorter struct { rowsColValues [][]sqltypes.Value - ksids [][]byte toValues []sqltypes.Value } @@ -133,7 +132,6 @@ func (v *sorter) Less(i, j int) bool { func (v *sorter) Swap(i, j int) { v.toValues[i], v.toValues[j] = v.toValues[j], v.toValues[i] - v.ksids[i], v.ksids[j] = v.ksids[j], v.ksids[i] v.rowsColValues[i], v.rowsColValues[j] = v.rowsColValues[j], v.rowsColValues[i] } @@ -147,14 +145,14 @@ func (v *sorter) Swap(i, j int) { // If we assume that the primary vindex is on column_c. The call to create will look like this: // Create(vcursor, [[value_a0, value_b0,], [value_a1, value_b1]], [binary(value_c0), binary(value_c1)]) // Notice that toValues contains the computed binary value of the keyspace_id. -func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, toValues []sqltypes.Value, ignoreMode bool) error { +func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool) error { if lkp.Autocommit { - return lkp.createCustom(vcursor, rowsColValues, ksids, toValues, ignoreMode, vtgatepb.CommitOrder_AUTOCOMMIT) + return lkp.createCustom(vcursor, rowsColValues, toValues, ignoreMode, vtgatepb.CommitOrder_AUTOCOMMIT) } - return lkp.createCustom(vcursor, rowsColValues, ksids, toValues, ignoreMode, vtgatepb.CommitOrder_NORMAL) + return lkp.createCustom(vcursor, rowsColValues, toValues, ignoreMode, vtgatepb.CommitOrder_NORMAL) } -func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error { +func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error { if len(rowsColValues) == 0 { // This code is unreachable. It's just a failsafe. return nil @@ -176,7 +174,10 @@ func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqlty fmt.Fprintf(buf, "%s) values(", lkp.To) bindVars := make(map[string]*querypb.BindVariable, 2*len(rowsColValues)) - sort.Sort(&sorter{rowsColValues: rowsColValues, ksids: ksids, toValues: toValues}) + // Make a copy before sorting. + rowsColValues = append([][]sqltypes.Value(nil), rowsColValues...) + toValues = append([]sqltypes.Value(nil), toValues...) + sort.Sort(&sorter{rowsColValues: rowsColValues, toValues: toValues}) for rowIdx := range toValues { colIds := rowsColValues[rowIdx] if rowIdx != 0 { @@ -254,7 +255,7 @@ func (lkp *lookupInternal) Update(vcursor VCursor, oldValues []sqltypes.Value, k if err := lkp.Delete(vcursor, [][]sqltypes.Value{oldValues}, toValue, vtgatepb.CommitOrder_NORMAL); err != nil { return err } - return lkp.Create(vcursor, [][]sqltypes.Value{newValues}, [][]byte{ksid}, []sqltypes.Value{toValue}, false /* ignoreMode */) + return lkp.Create(vcursor, [][]sqltypes.Value{newValues}, []sqltypes.Value{toValue}, false /* ignoreMode */) } func (lkp *lookupInternal) initDelStmt() string { diff --git a/go/vt/vtgate/vindexes/lookup_test.go b/go/vt/vtgate/vindexes/lookup_test.go index a18104327b5..e1a58e45bf1 100644 --- a/go/vt/vtgate/vindexes/lookup_test.go +++ b/go/vt/vtgate/vindexes/lookup_test.go @@ -377,18 +377,10 @@ func TestLookupNonUniqueCreate(t *testing.T) { // With ignore. vc.queries = nil - rowsColsValues := [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}} - ksids := [][]byte{[]byte("test2"), []byte("test1")} - err = lookupNonUnique.(Lookup).Create(vc, rowsColsValues, ksids, true /* ignoreMode */) + err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}}, [][]byte{[]byte("test2"), []byte("test1")}, true /* ignoreMode */) if err != nil { t.Error(err) } - if !reflect.DeepEqual(rowsColsValues, [][]sqltypes.Value{{sqltypes.NewInt64(1)}, {sqltypes.NewInt64(2)}}) { - t.Errorf("inserts not reordered. Lookup table inserts get reordered on a bulk insert to avoid locking") - } - if !reflect.DeepEqual(ksids, [][]byte{[]byte("test1"), []byte("test2")}) { - t.Errorf("keyspace ids not reordered. Keyspace ids must also get reordered on a bulk insert") - } wantqueries[0].Sql = "insert ignore into t(fromc, toc) values(:fromc0, :toc0), (:fromc1, :toc1)" if !reflect.DeepEqual(vc.queries, wantqueries) { diff --git a/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go b/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go index d6a22d83e92..ebf52a6caf8 100644 --- a/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go +++ b/go/vt/vtgate/vindexes/lookup_unicodeloosemd5_hash.go @@ -168,7 +168,7 @@ func (lh *LookupUnicodeLooseMD5Hash) Create(vcursor VCursor, rowsColValues [][]s if err != nil { return fmt.Errorf("lookup.Create.convert: %v", err) } - return lh.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode) + return lh.lkp.Create(vcursor, rowsColValues, values, ignoreMode) } // Update updates the entry in the vindex table. @@ -333,7 +333,7 @@ func (lhu *LookupUnicodeLooseMD5HashUnique) Create(vcursor VCursor, rowsColValue if err != nil { return fmt.Errorf("lookup.Create.convert: %v", err) } - return lhu.lkp.Create(vcursor, rowsColValues, ksids, values, ignoreMode) + return lhu.lkp.Create(vcursor, rowsColValues, values, ignoreMode) } // Delete deletes the entry from the vindex table.