Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binary PK: fix bug where padding of binary columns was being done incorrectly #6963

Merged
merged 3 commits into from
Oct 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,12 +872,7 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
l := int(data[pos])
mdata := data[pos+1 : pos+1+l]
if sqltypes.IsBinary(styp) {
// Fixed length binaries have to be padded with zeroes
// up to the length of the field. Otherwise, equality checks
// fail against saved data. See https://github.com/vitessio/vitess/issues/3984.
ret := make([]byte, max)
copy(ret, mdata)
return sqltypes.MakeTrusted(querypb.Type_BINARY, ret), l + 1, nil
return sqltypes.MakeTrusted(querypb.Type_BINARY, mdata), l + 1, nil
}
return sqltypes.MakeTrusted(querypb.Type_VARCHAR, mdata), l + 1, nil

Expand Down
10 changes: 3 additions & 7 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -515,7 +514,6 @@ func expectNontxQueries(t *testing.T, queries []string) {
retry:
select {
case got = <-globalDBQueries:

if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "update _vt.vreplication set pos") || heartbeatRe.MatchString(got) {
goto retry
}
Expand All @@ -530,11 +528,9 @@ func expectNontxQueries(t *testing.T, queries []string) {
} else {
match = (got == query)
}
if !match {
t.Errorf("query:\n%q, does not match query %d:\n%q", got, i, query)
}
require.True(t, match, "query %d:: got:%s, want:%s", i, got, query)
case <-time.After(5 * time.Second):
t.Errorf("no query received, expecting %s", query)
t.Fatalf("no query received, expecting %s", query)
failed = true
}
}
Expand Down
29 changes: 23 additions & 6 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ type colExpr struct {
// references contains all the column names referenced in the expression.
references map[string]bool

isGrouped bool
isPK bool
isGrouped bool
isPK bool
dataType string
columnType string
}

// operation is the opcode for the colExpr.
Expand Down Expand Up @@ -470,12 +472,14 @@ func (tpb *tablePlanBuilder) analyzePK(pkInfoMap map[string][]*PrimaryKeyInfo) e
for _, pkcol := range pkcols {
cexpr := tpb.findCol(sqlparser.NewColIdent(pkcol.Name))
if cexpr == nil {
return fmt.Errorf("primary key column %s not found in select list", pkcol)
return fmt.Errorf("primary key column %v not found in select list", pkcol)
}
if cexpr.operation != opExpr {
return fmt.Errorf("primary key column %s is not allowed to reference an aggregate expression", pkcol)
return fmt.Errorf("primary key column %v is not allowed to reference an aggregate expression", pkcol)
}
cexpr.isPK = true
cexpr.dataType = pkcol.DataType
cexpr.columnType = pkcol.ColumnType
tpb.pkCols = append(tpb.pkCols, cexpr)
}
return nil
Expand Down Expand Up @@ -662,16 +666,29 @@ func (tpb *tablePlanBuilder) generateDeleteStatement() *sqlparser.ParsedQuery {
return buf.ParsedQuery()
}

// castIfNecessary writes cexpr.expr into buf for use as a comparison value.
// For binary(n) column types, the value in the where clause needs to be padded
// with nulls up to the length of the column for MySQL comparison to work
// properly. This is achieved by casting the value to the column type; every
// other data type is written out unchanged.
func castIfNecessary(buf *sqlparser.TrackedBuffer, cexpr *colExpr) {
	if cexpr.dataType != "binary" {
		// Not a fixed-length binary column: emit the expression as is.
		buf.Myprintf("%v", cexpr.expr)
		return
	}
	buf.Myprintf("cast(%v as %s)", cexpr.expr, cexpr.columnType)
}

func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) {
buf.WriteString(" where ")
bvf.mode = bvBefore
separator := ""
for _, cexpr := range tpb.pkCols {
if _, ok := cexpr.expr.(*sqlparser.ColName); ok {
buf.Myprintf("%s%v=%v", separator, cexpr.colName, cexpr.expr)
buf.Myprintf("%s%v=", separator, cexpr.colName)
castIfNecessary(buf, cexpr)
} else {
// Parenthesize non-trivial expressions.
buf.Myprintf("%s%v=(%v)", separator, cexpr.colName, cexpr.expr)
buf.Myprintf("%s%v=(", separator, cexpr.colName)
castIfNecessary(buf, cexpr)
buf.Myprintf(")")
}
separator = " and "
}
Expand Down
100 changes: 100 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,106 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
)

// TestPlayerCopyCharPK exercises the copy and catchup phases for a table whose
// primary key is a binary(2) column. A source row is updated while the copy is
// in progress, and the test expects the resulting update on dst to compare the
// primary key through cast(... as binary(2)).
func TestPlayerCopyCharPK(t *testing.T) {
defer deleteTablet(addTablet(100))

savedPacketSize := *vstreamer.PacketSize
// PacketSize of 1 byte will send at most one row at a time.
*vstreamer.PacketSize = 1
defer func() { *vstreamer.PacketSize = savedPacketSize }()

savedCopyTimeout := copyTimeout
// copyTimeout should be low enough to have time to send one row.
copyTimeout = 500 * time.Millisecond
defer func() { copyTimeout = savedCopyTimeout }()

savedWaitRetryTime := waitRetryTime
// waitRetryTime should be very low to cause the wait loop to execute multiple times.
waitRetryTime = 10 * time.Millisecond
defer func() { waitRetryTime = savedWaitRetryTime }()

// Both tables use a binary(2) primary key; the single-character values
// inserted below are shorter than the column length.
execStatements(t, []string{
"create table src(idc binary(2) , val int, primary key(idc))",
"insert into src values('a', 1), ('c', 2)",
fmt.Sprintf("create table %s.dst(idc binary(2), val int, primary key(idc))", vrepldb),
})
defer execStatements(t, []string{
"drop table src",
fmt.Sprintf("drop table %s.dst", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

count := 0
vstreamRowsSendHook = func(ctx context.Context) {
defer func() { count++ }()
// Allow the first two calls to go through: field info and one row.
if count <= 1 {
return
}
// Update the source row whose key is 'a\000'. The expected queries below
// show this change being applied to dst as an update before the second
// row is copied.
execStatements(t, []string{
"update src set val = 3 where idc = 'a\000'",
})
// Wait for context to expire and then send the row.
// This will cause the copier to abort and go back to catchup mode.
<-ctx.Done()
// Do this no more than once.
vstreamRowsSendHook = nil
}

vstreamHook = func(context.Context) {
// Sleeping 50ms guarantees that the catchup wait loop executes multiple times.
// This is because waitRetryTime is set to 10ms.
time.Sleep(50 * time.Millisecond)
// Do this no more than once.
vstreamHook = nil
}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "dst",
Filter: "select * from src",
}},
}

bls := &binlogdatapb.BinlogSource{
Keyspace: env.KeyspaceName,
Shard: env.ShardName,
Filter: filter,
OnDdl: binlogdatapb.OnDDLAction_IGNORE,
}

query := binlogplayer.CreateVReplicationState("test", bls, "", binlogplayer.VReplicationInit, playerEngine.dbName)
qr, err := playerEngine.Exec(query)
if err != nil {
t.Fatal(err)
}
// Remove the vreplication row created above when the test finishes.
defer func() {
query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID)
if _, err := playerEngine.Exec(query); err != nil {
t.Fatal(err)
}
expectDeleteQueries(t)
}()

// Expected sequence: the first row is copied, the update is applied with the
// key cast to binary(2), and then the second row is copied.
expectNontxQueries(t, []string{
"/insert into _vt.vreplication",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"insert into dst(idc,val) values ('a\\0',1)",
`/update _vt.copy_state set lastpk='fields:<name:\\"idc\\" type:BINARY > rows:<lengths:2 values:\\"a\\\\000\\" > ' where vrepl_id=.*`,
`update dst set val=3 where idc=cast('a' as binary(2)) and ('a') <= ('a\0')`,
"insert into dst(idc,val) values ('c\\0',2)",
`/update _vt.copy_state set lastpk='fields:<name:\\"idc\\" type:BINARY > rows:<lengths:2 values:\\"c\\\\000\\" > ' where vrepl_id=.*`,
"/delete from _vt.copy_state.*dst",
"/update _vt.vreplication set state='Running'",
})
// The stored keys carry the trailing zero byte of the binary(2) column.
expectData(t, "dst", [][]string{
{"a\000", "3"},
{"c\000", "2"},
})
}

// TestPlayerCopyVarcharPKCaseInsensitive tests the copy/catchup phase for a table with a varchar primary key
// which is case insensitive.
func TestPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
Expand Down
137 changes: 132 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,133 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// TestCharPK checks the statements the player generates for inserts and updates
// on tables whose primary key column is binary(2), char(2), varbinary(2) or
// varchar(2). Only the binary(2) update is expected to wrap the key value in
// cast(... as binary(2)); the other column types compare against the plain value.
func TestCharPK(t *testing.T) {
defer deleteTablet(addTablet(100))

// One source table and one target table (in vrepldb) per primary key type.
execStatements(t, []string{
"create table t1(id int, val binary(2), primary key(val))",
fmt.Sprintf("create table %s.t1(id int, val binary(2), primary key(val))", vrepldb),
"create table t2(id int, val char(2), primary key(val))",
fmt.Sprintf("create table %s.t2(id int, val char(2), primary key(val))", vrepldb),
"create table t3(id int, val varbinary(2), primary key(val))",
fmt.Sprintf("create table %s.t3(id int, val varbinary(2), primary key(val))", vrepldb),
"create table t4(id int, val varchar(2), primary key(val))",
fmt.Sprintf("create table %s.t4(id int, val varchar(2), primary key(val))", vrepldb),
})
defer execStatements(t, []string{
"drop table t1",
fmt.Sprintf("drop table %s.t1", vrepldb),
"drop table t2",
fmt.Sprintf("drop table %s.t2", vrepldb),
"drop table t3",
fmt.Sprintf("drop table %s.t3", vrepldb),
"drop table t4",
fmt.Sprintf("drop table %s.t4", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
}, {
Match: "t2",
Filter: "select * from t2",
}, {
Match: "t3",
Filter: "select * from t3",
}, {
Match: "t4",
Filter: "select * from t4",
}},
}
bls := &binlogdatapb.BinlogSource{
Keyspace: env.KeyspaceName,
Shard: env.ShardName,
Filter: filter,
OnDdl: binlogdatapb.OnDDLAction_IGNORE,
}
cancel, _ := startVReplication(t, bls, "")
defer cancel()

// Each case runs input on the source, expects output as the statement applied
// on the target, and then compares the contents of table with data.
// Cases are order dependent: every update follows the insert of the same row.
testcases := []struct {
input string
output string
table string
data [][]string
}{{ // binary(2): the stored value carries a trailing zero byte.
input: "insert into t1 values(1, 'a')",
output: "insert into t1(id,val) values (1,'a')",
table: "t1",
data: [][]string{
{"1", "a\000"},
},
}, {
input: "update t1 set id = 2 where val = 'a\000'",
output: "update t1 set id=2 where val=cast('a' as binary(2))",
table: "t1",
data: [][]string{
{"2", "a\000"},
},
}, { // char(2)
input: "insert into t2 values(1, 'a')",
output: "insert into t2(id,val) values (1,'a')",
table: "t2",
data: [][]string{
{"1", "a"},
},
}, {
input: "update t2 set id = 2 where val = 'a'",
output: "update t2 set id=2 where val='a'",
table: "t2",
data: [][]string{
{"2", "a"},
},
}, { // varbinary(2)
input: "insert into t3 values(1, 'a')",
output: "insert into t3(id,val) values (1,'a')",
table: "t3",
data: [][]string{
{"1", "a"},
},
}, {
input: "update t3 set id = 2 where val = 'a'",
output: "update t3 set id=2 where val='a'",
table: "t3",
data: [][]string{
{"2", "a"},
},
}, { // varchar(2)
input: "insert into t4 values(1, 'a')",
output: "insert into t4(id,val) values (1,'a')",
table: "t4",
data: [][]string{
{"1", "a"},
},
}, {
input: "update t4 set id = 2 where val = 'a'",
output: "update t4 set id=2 where val='a'",
table: "t4",
data: [][]string{
{"2", "a"},
},
}}

for _, tcases := range testcases {
execStatements(t, []string{tcases.input})
// Each statement is expected in its own transaction, together with the
// vreplication position update.
output := []string{
"begin",
tcases.output,
"/update _vt.vreplication set pos",
"commit",
}
expectDBClientQueries(t, output)
if tcases.table != "" {
expectData(t, tcases.table, tcases.data)
}
}
}

func TestRollup(t *testing.T) {
defer deleteTablet(addTablet(100))

Expand Down Expand Up @@ -1153,10 +1280,10 @@ func TestPlayerTypes(t *testing.T) {
},
}, {
input: "insert into vitess_strings values('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'a', 'a,b')",
output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d\\0\\0\\0','e','f','g','h','1','3')",
output: "insert into vitess_strings(vb,c,vc,b,tb,bl,ttx,tx,en,s) values ('a','b','c','d','e','f','g','h','1','3')",
table: "vitess_strings",
data: [][]string{
{"a", "b", "c", "d\x00\x00\x00", "e", "f", "g", "h", "a", "a,b"},
{"a", "b", "c", "d\000\000\000", "e", "f", "g", "h", "a", "a,b"},
},
}, {
input: "insert into vitess_misc values(1, '\x01', '2012-01-01', '2012-01-01 15:45:45', '15:45:45', point(1, 2))",
Expand All @@ -1174,15 +1301,15 @@ func TestPlayerTypes(t *testing.T) {
},
}, {
input: "insert into binary_pk values('a', 'aaa')",
output: "insert into binary_pk(b,val) values ('a\\0\\0\\0','aaa')",
output: "insert into binary_pk(b,val) values ('a','aaa')",
table: "binary_pk",
data: [][]string{
{"a\x00\x00\x00", "aaa"},
{"a\000\000\000", "aaa"},
},
}, {
// Binary pk is a special case: https://github.com/vitessio/vitess/issues/3984
input: "update binary_pk set val='bbb' where b='a\\0\\0\\0'",
output: "update binary_pk set val='bbb' where b='a\\0\\0\\0'",
output: "update binary_pk set val='bbb' where b=cast('a' as binary(4))",
table: "binary_pk",
data: [][]string{
{"a\x00\x00\x00", "bbb"},
Expand Down
Loading