Skip to content

Commit

Permalink
consistent lookup: handle insert ignore
Browse files Browse the repository at this point in the history
Fixes #4974

Insert ignore wasn't working correctly because the verify was using
the normal transaction instead of the 'pre' transaction where the
rows were actually created.

This implementation changes the consistent lookup's Verify function
to also use the 'pre' transaction always.

There is another code path that gets used if the vindex is unowned.
That will also cause a 'pre' transaction to be created. Without this
change, the transaction would have been a normal one. This mildly
affects the commit order, but there should be no material difference.

Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
  • Loading branch information
sougou committed Jul 3, 2019
1 parent c341ec1 commit cf7fe76
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 11 deletions.
42 changes: 41 additions & 1 deletion go/vt/vtgate/endtoend/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestConsistentLookupMultiInsert(t *testing.T) {
exec(t, conn, "delete from t1_id2_idx where id2=4")
}

func TestHashLookupMultiInsertIgnore(t *testing.T) {
func TestLookupMultiInsertIgnore(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
Expand Down Expand Up @@ -260,6 +260,46 @@ func TestHashLookupMultiInsertIgnore(t *testing.T) {
}
}

func TestConsistentLookupMultiInsertIgnore(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
// conn2 is for queries that target shards.
conn2, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
defer conn2.Close()

// DB should start out clean
qr := exec(t, conn, "select count(*) from t1_id2_idx")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}
qr = exec(t, conn, "select count(*) from t1")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}

// Try inserting a bunch of ids at once
exec(t, conn, "begin")
exec(t, conn, "insert ignore into t1(id1, id2) values(50,60), (30,40), (10,20)")
exec(t, conn, "commit")

// Verify
qr = exec(t, conn, "select id1, id2 from t1 order by id1")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}
qr = exec(t, conn, "select id2 from t1_id2_idx order by id2")
if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(20)] [INT64(40)] [INT64(60)]]"; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}
}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtgate/vindexes/consistent_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtgate"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -205,7 +206,7 @@ func (lu *clCommon) IsFunctional() bool {

// Verify returns true if ids maps to ksids.
func (lu *clCommon) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte) ([]bool, error) {
return lu.lkp.Verify(vcursor, ids, ksidsToValues(ksids))
return lu.lkp.VerifyCustom(vcursor, ids, ksidsToValues(ksids), vtgate.CommitOrder_PRE)
}

// Create reserves the id by inserting it into the vindex table.
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/vindexes/consistent_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ func TestConsistentLookupVerify(t *testing.T) {
t.Error(err)
}
vc.verifyLog(t, []string{
"Execute select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 1} {toc test1}] false",
"Execute select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 2} {toc test2}] false",
"ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 1} {toc test1}] false",
"ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 2} {toc test2}] false",
})

// Test query fail.
Expand Down
16 changes: 9 additions & 7 deletions go/vt/vtgate/vindexes/lookup_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,21 @@ func (lkp *lookupInternal) Lookup(vcursor VCursor, ids []sqltypes.Value) ([]*sql

// Verify returns true if ids map to values.
func (lkp *lookupInternal) Verify(vcursor VCursor, ids, values []sqltypes.Value) ([]bool, error) {
co := vtgatepb.CommitOrder_NORMAL
if lkp.Autocommit {
co = vtgatepb.CommitOrder_AUTOCOMMIT
}
return lkp.VerifyCustom(vcursor, ids, values, co)
}

func (lkp *lookupInternal) VerifyCustom(vcursor VCursor, ids, values []sqltypes.Value, co vtgatepb.CommitOrder) ([]bool, error) {
out := make([]bool, len(ids))
for i, id := range ids {
bindVars := map[string]*querypb.BindVariable{
lkp.FromColumns[0]: sqltypes.ValueBindVariable(id),
lkp.To: sqltypes.ValueBindVariable(values[i]),
}
var err error
var result *sqltypes.Result
co := vtgatepb.CommitOrder_NORMAL
if lkp.Autocommit {
co = vtgatepb.CommitOrder_AUTOCOMMIT
}
result, err = vcursor.Execute("VindexVerify", lkp.ver, bindVars, false /* isDML */, co)
result, err := vcursor.Execute("VindexVerify", lkp.ver, bindVars, false /* isDML */, co)
if err != nil {
return nil, fmt.Errorf("lookup.Verify: %v", err)
}
Expand Down

0 comments on commit cf7fe76

Please sign in to comment.