Skip to content

Commit

Permalink
Merge pull request #6145 from planetscale/fix-consistentlookup-ownedt…
Browse files Browse the repository at this point in the history
…able

Fix owned table name formatting and duplicate error code in consistent lookup
  • Loading branch information
harshit-gangal authored May 4, 2020
2 parents 3079017 + f8ea17d commit ab29699
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 42 deletions.
12 changes: 7 additions & 5 deletions go/test/endtoend/vtgate/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package vtgate
import (
"context"
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/test/utils"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -59,10 +60,11 @@ func TestConsistentLookup(t *testing.T) {
exec(t, conn, "begin")
_, err = conn.ExecuteFetch("insert into t1(id1, id2) values(1, 4)", 1000, false)
exec(t, conn, "rollback")
want := "duplicate entry"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("second insert: %v, must contain %s", err, want)
}
require.Error(t, err)
mysqlErr := err.(*mysql.SQLError)
assert.Equal(t, 1062, mysqlErr.Num)
assert.Equal(t, "23000", mysqlErr.State)
assert.Contains(t, mysqlErr.Message, "Duplicate entry")

// Simple delete.
exec(t, conn, "begin")
Expand Down
132 changes: 119 additions & 13 deletions go/test/endtoend/vtgate/sequence/seq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
"testing"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
Expand All @@ -32,11 +34,11 @@ import (
)

var (
clusterInstance *cluster.LocalProcessCluster
keyspaceName = "ks"
cell = "zone1"
hostname = "localhost"
sqlSchema = `
clusterInstance *cluster.LocalProcessCluster
cell = "zone1"
hostname = "localhost"
unshardedKs = "uks"
unshardedSQLSchema = `
create table sequence_test(
id bigint,
val varchar(16),
Expand All @@ -49,9 +51,14 @@ var (
cache bigint default null,
primary key(id)
) comment 'vitess_sequence' Engine=InnoDB;
CREATE TABLE id_seq ( id INT, next_id BIGINT, cache BIGINT, PRIMARY KEY(id)) comment 'vitess_sequence';
INSERT INTO id_seq (id, next_id, cache) values (0, 1, 1000);
`

vSchema = `
unshardedVSchema = `
{
"sharded":false,
"vindexes": {
Expand All @@ -74,10 +81,77 @@ var (
},
"sequence_test_seq": {
"type": "sequence"
}
},
"id_seq": {
"type": "sequence"
}
}
}
`
`

shardedKeyspaceName = `sks`

shardedSQLSchema = `
CREATE TABLE ` + "`dotted.tablename`" + ` (
id BIGINT NOT NULL,
c1 DOUBLE NOT NULL,
c2 BIGINT,
PRIMARY KEY (id),
UNIQUE KEY (c1, c2)
);
CREATE TABLE lookup_vindex (
c1 DOUBLE NOT NULL,
c2 BIGINT,
keyspace_id BLOB,
UNIQUE KEY (c1, c2)
);
`

shardedVSchema = `
{
"sharded": true,
"vindexes": {
"lookup_vindex": {
"type": "consistent_lookup",
"params": {
"from": "c1,c2",
"table": "lookup_vindex",
"to": "keyspace_id"
},
"owner": "dotted.tablename"
},
"hash": {
"type": "hash"
}
},
"tables": {
"dotted.tablename": {
"columnVindexes": [
{
"column": "id",
"name": "hash"
},
{
"name": "lookup_vindex",
"columns": [ "c1", "c2" ]
}
],
"autoIncrement": {
"column": "id",
"sequence": "id_seq"
}
},
"lookup_vindex": {
"columnVindexes": [
{
"column": "c1",
"name": "hash"
}
]
}
}
}`
)

func TestMain(m *testing.M) {
Expand All @@ -94,12 +168,21 @@ func TestMain(m *testing.M) {
}

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
VSchema: vSchema,
uKeyspace := &cluster.Keyspace{
Name: unshardedKs,
SchemaSQL: unshardedSQLSchema,
VSchema: unshardedVSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*uKeyspace, 1, false); err != nil {
return 1
}

sKeyspace := &cluster.Keyspace{
Name: shardedKeyspaceName,
SchemaSQL: shardedSQLSchema,
VSchema: shardedVSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil {
if err := clusterInstance.StartKeyspace(*sKeyspace, []string{"-80", "80-"}, 1, false); err != nil {
return 1
}

Expand Down Expand Up @@ -172,3 +255,26 @@ func TestSeq(t *testing.T) {
}

}

func TestDotTableSeq(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
DbName: shardedKeyspaceName,
}
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

_, err = conn.ExecuteFetch("insert into `dotted.tablename` (c1,c2) values (10,10)", 1000, true)
require.NoError(t, err)

_, err = conn.ExecuteFetch("insert into `dotted.tablename` (c1,c2) values (10,10)", 1000, true)
require.Error(t, err)
mysqlErr := err.(*mysql.SQLError)
assert.Equal(t, 1062, mysqlErr.Num)
assert.Equal(t, "23000", mysqlErr.State)
assert.Contains(t, mysqlErr.Message, "Duplicate entry")
}
13 changes: 8 additions & 5 deletions go/vt/vtgate/endtoend/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package endtoend
import (
"context"
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
)
Expand Down Expand Up @@ -57,10 +60,10 @@ func TestConsistentLookup(t *testing.T) {
exec(t, conn, "begin")
_, err = conn.ExecuteFetch("insert into t1(id1, id2) values(1, 4)", 1000, false)
exec(t, conn, "rollback")
want := "duplicate entry"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("second insert: %v, must contain %s", err, want)
}
require.Error(t, err)
mysqlErr := err.(*mysql.SQLError)
assert.Equal(t, 1062, mysqlErr.Num)
assert.Equal(t, "23000", mysqlErr.State)

// Simple delete.
exec(t, conn, "begin")
Expand Down
18 changes: 8 additions & 10 deletions go/vt/vtgate/vindexes/consistent_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtgate"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
)

var (
Expand Down Expand Up @@ -211,7 +209,7 @@ func newCLCommon(name string, m map[string]string) (*clCommon, error) {

func (lu *clCommon) SetOwnerInfo(keyspace, table string, cols []sqlparser.ColIdent) error {
lu.keyspace = keyspace
lu.ownerTable = table
lu.ownerTable = sqlparser.String(sqlparser.NewTableIdent(table))
if len(cols) != len(lu.lkp.FromColumns) {
return fmt.Errorf("owner table column count does not match vindex %s", lu.name)
}
Expand Down Expand Up @@ -245,22 +243,22 @@ func (lu *clCommon) Verify(vcursor VCursor, ids []sqltypes.Value, ksids [][]byte

// Create reserves the id by inserting it into the vindex table.
func (lu *clCommon) Create(vcursor VCursor, rowsColValues [][]sqltypes.Value, ksids [][]byte, ignoreMode bool) error {
err := lu.lkp.createCustom(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE)
if err == nil {
origErr := lu.lkp.createCustom(vcursor, rowsColValues, ksidsToValues(ksids), ignoreMode, vtgatepb.CommitOrder_PRE)
if origErr == nil {
return nil
}
if !strings.Contains(err.Error(), "Duplicate entry") {
return err
if !strings.Contains(origErr.Error(), "Duplicate entry") {
return origErr
}
for i, row := range rowsColValues {
if err := lu.handleDup(vcursor, row, ksids[i]); err != nil {
if err := lu.handleDup(vcursor, row, ksids[i], origErr); err != nil {
return err
}
}
return nil
}

func (lu *clCommon) handleDup(vcursor VCursor, values []sqltypes.Value, ksid []byte) error {
func (lu *clCommon) handleDup(vcursor VCursor, values []sqltypes.Value, ksid []byte, dupError error) error {
bindVars := make(map[string]*querypb.BindVariable, len(values))
for colnum, val := range values {
bindVars[lu.lkp.FromColumns[colnum]] = sqltypes.ValueBindVariable(val)
Expand All @@ -285,7 +283,7 @@ func (lu *clCommon) handleDup(vcursor VCursor, values []sqltypes.Value, ksid []b
return err
}
if len(qr.Rows) >= 1 {
return vterrors.Errorf(vtrpcpb.Code_ALREADY_EXISTS, "duplicate entry %v", values)
return dupError
}
if bytes.Equal(existingksid, ksid) {
return nil
Expand Down
16 changes: 7 additions & 9 deletions go/vt/vtgate/vindexes/consistent_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestConsistentLookupCreateThenUpdate(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc10, :fromc20, :toc0) [{fromc10 1} {fromc20 2} {toc0 test1}] true",
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false",
"ExecuteKeyspaceID select fc1 from t1 where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false",
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false",
"ExecutePre update t set toc=:toc where fromc1 = :fromc1 and fromc2 = :fromc2 [{fromc1 1} {fromc2 2} {toc test1}] true",
})
}
Expand All @@ -308,14 +308,14 @@ func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc10, :fromc20, :toc0) [{fromc10 1} {fromc20 2} {toc0 1}] true",
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc 1}] false",
"ExecuteKeyspaceID select fc1 from t1 where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc 1}] false",
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc 1}] false",
})
}

func TestConsistentLookupCreateThenDupkey(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(nil, errors.New("Duplicate entry"))
vc.AddResult(nil, errors.New("Duplicate entry, pass mysql error as it is"))
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(&sqltypes.Result{}, nil)
Expand All @@ -327,14 +327,12 @@ func TestConsistentLookupCreateThenDupkey(t *testing.T) {
}},
[][]byte{[]byte("test1")},
false /* ignoreMode */)
want := "duplicate entry"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("lookup(query fail) err: %v, must contain %s", err, want)
}
require.Error(t, err)
assert.Contains(t, err.Error(), "Duplicate entry, pass mysql error as it is")
vc.verifyLog(t, []string{
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc10, :fromc20, :toc0) [{fromc10 1} {fromc20 2} {toc0 test1}] true",
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false",
"ExecuteKeyspaceID select fc1 from t1 where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false",
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false",
})
}

Expand Down Expand Up @@ -499,7 +497,7 @@ func createConsistentLookup(t *testing.T, name string, writeOnly bool) SingleCol
sqlparser.NewColIdent("fc1"),
sqlparser.NewColIdent("fc2"),
}
if err := l.(WantOwnerInfo).SetOwnerInfo("ks", "t1", cols); err != nil {
if err := l.(WantOwnerInfo).SetOwnerInfo("ks", "dot.t1", cols); err != nil {
t.Fatal(err)
}
return l.(SingleColumn)
Expand Down

0 comments on commit ab29699

Please sign in to comment.