Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Routing Rules: Improved escaping of reserved and invalid names for tables and keyspaces #9522

Merged
merged 2 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions go/sqlescape/ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,12 @@ func EscapeIDs(identifiers []string) []string {
}
return result
}

// UnescapeID reverses any backticking in the input string.
func UnescapeID(in string) string {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
l := len(in)
if l >= 2 && in[0] == '`' && in[l-1] == '`' {
return in[1 : l-1]
}
return in
}
2 changes: 0 additions & 2 deletions go/test/endtoend/sharding/base_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ func checkStreamHealthEqualsBinlogPlayerVars(t *testing.T, vttablet cluster.Vtta

streamCountStr := fmt.Sprintf("%v", reflect.ValueOf(tabletVars["VReplicationStreamCount"]))
streamCount, _ := strconv.Atoi(streamCountStr)
log.Infof(">>>>>>>>>>>>>>>>>> tabletVars are %+v", tabletVars)
vreplicationLagMaxStr := fmt.Sprintf("%v", reflect.ValueOf(tabletVars["VReplicationLagSecondsMax"]))
vreplicationLagMax, _ := strconv.ParseFloat(vreplicationLagMaxStr, 64)

Expand All @@ -203,7 +202,6 @@ func checkStreamHealthEqualsBinlogPlayerVars(t *testing.T, vttablet cluster.Vtta
assert.NotNil(t, streamHealthResponse.RealtimeStats.BinlogPlayersCount)

assert.Equal(t, streamCount, int(streamHealthResponse.RealtimeStats.BinlogPlayersCount))
log.Infof(">>>>>>>>> vreplicationLagMax %v, FilteredReplicationLagSeconds %v", vreplicationLagMax, streamHealthResponse.RealtimeStats.FilteredReplicationLagSeconds)
assert.Equal(t, vreplicationLagMax, float64(streamHealthResponse.RealtimeStats.FilteredReplicationLagSeconds))
}

Expand Down
3 changes: 2 additions & 1 deletion go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func getClusterConfig(idx int, dataRootDir string) *ClusterConfig {
}

return &ClusterConfig{
hostname: "localhost",
// The ipv4 loopback address is used with the mysql client so that tcp is used in the test ("localhost" causes the socket file to be used, which fails)
hostname: "127.0.0.1",
topoPort: etcdPort,
vtctldPort: basePort,
vtctldGrpcPort: basePort + 999,
Expand Down
20 changes: 13 additions & 7 deletions go/test/endtoend/vreplication/config.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package vreplication

// The product, customer, and tenant tables are used to exercise and test most Workflow variants.
// The product, customer, and Lead tables are used to exercise and test most Workflow variants.
// We violate the NO_ZERO_DATES and NO_ZERO_IN_DATE sql_modes that are enabled by default in
// MySQL 5.7+ and MariaDB 10.2+ to ensure that vreplication still works everywhere and the
// permissive sql_mode now used in vreplication causes no unwanted side effects.
// The Lead table also allows us to test several things:
// 1. Mixed case identifiers
// 2. Column names with special characters in them, namely a dash
// 3. Identifiers using reserved words, as lead is a reserved word in MySQL 8.0+ (https://dev.mysql.com/doc/refman/8.0/en/keywords.html)
var (
initialProductSchema = `
create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid));
Expand All @@ -15,7 +19,7 @@ create table orders(oid int, cid int, pid int, mname varchar(128), price int, qt
create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table customer2(cid int, name varbinary(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid));
create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table tenant(tenant_id binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (tenant_id));
create table ` + "`Lead`(`Lead-id`" + ` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (` + "`Lead-id`" + `));
`

initialProductVSchema = `
Expand All @@ -35,7 +39,7 @@ create table tenant(tenant_id binary(16), name varbinary(16), date1 datetime not
"order_seq": {
"type": "sequence"
},
"tenant": {}
"Lead": {}
}
}
`
Expand Down Expand Up @@ -76,10 +80,10 @@ create table tenant(tenant_id binary(16), name varbinary(16), date1 datetime not
"sequence": "customer_seq2"
}
},
"tenant": {
"Lead": {
"column_vindexes": [
{
"column": "tenant_id",
"column": "Lead-id",
"name": "bmd5"
}
]
Expand Down Expand Up @@ -241,11 +245,13 @@ create table tenant(tenant_id binary(16), name varbinary(16), date1 datetime not
}
}
`

// the merchant-type keyspace allows us to test keyspace names with special characters in them (dash)
materializeMerchantOrdersSpec = `
{
"workflow": "morders",
"sourceKeyspace": "customer",
"targetKeyspace": "merchant",
"targetKeyspace": "merchant-type",
"tableSettings": [{
"targetTable": "morders",
"sourceExpression": "select oid, cid, mname, pid, price, qty, total from orders",
Expand All @@ -258,7 +264,7 @@ create table tenant(tenant_id binary(16), name varbinary(16), date1 datetime not
{
"workflow": "msales",
"sourceKeyspace": "customer",
"targetKeyspace": "merchant",
"targetKeyspace": "merchant-type",
"tableSettings": [{
"targetTable": "msales",
"sourceExpression": "select mname as merchant_name, count(*) as kount, sum(price) as amount from orders group by merchant_name",
Expand Down
7 changes: 7 additions & 0 deletions go/test/endtoend/vreplication/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/http"
"os/exec"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -288,3 +289,9 @@ func printRoutingRules(t *testing.T, vc *VitessCluster, msg string) error {
fmt.Printf("Routing Rules::%s:\n%s\n", msg, output)
return nil
}

func osExec(t *testing.T, command string, args []string) (string, error) {
cmd := exec.Command(command, args...)
output, err := cmd.CombinedOutput()
return string(output), err
}
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func testMoveTablesV2Workflow(t *testing.T) {

testVSchemaForSequenceAfterMoveTables(t)

createMoveTablesWorkflow(t, "tenant")
createMoveTablesWorkflow(t, "Lead")
output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1")

Expand Down
6 changes: 4 additions & 2 deletions go/test/endtoend/vreplication/unsharded_init_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ insert into orders(oid, cid, mname, pid, price, qty) values(3, 2, 'monoprice', 2
insert into customer2(cid, name, typ, sport) values(1, 'john',1,'football,baseball');
insert into customer2(cid, name, typ, sport) values(2, 'paul','soho','cricket');
insert into customer2(cid, name, typ, sport) values(3, 'ringo','enterprise','');
-- for testing edge case where inserted binary value is 15 bytes, field is 16, mysql adds a null while storing but binlog returns 15 bytes
insert into tenant(tenant_id, name) values (x'02BD00987932461E8820C908E84BAE', 'abc');
-- for testing edge cases:
-- 1. where inserted binary value is 15 bytes, field is 16, mysql adds a null while storing but binlog returns 15 bytes
-- 2. where mixed case, special characters, or reserved words are used in identifiers
insert into `Lead`(`Lead-id`, name) values (x'02BD00987932461E8820C908E84BAE', 'abc');


83 changes: 52 additions & 31 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ var (

// for some tests we keep an open transaction during a SwitchWrites and commit it afterwards, to reproduce https://github.com/vitessio/vitess/issues/9400
// we also then delete the extra row (if) added so that the row counts for the future count comparisons stay the same
const openTxQuery = "insert into customer(cid, name, typ, sport, meta) values(4, 'openTxQuery',1,'football,baseball','{}');"
const deleteOpenTxQuery = "delete from customer where name = 'openTxQuery'"
const (
openTxQuery = "insert into customer(cid, name, typ, sport, meta) values(4, 'openTxQuery',1,'football,baseball','{}');"
deleteOpenTxQuery = "delete from customer where name = 'openTxQuery'"

merchantKeyspace = "merchant-type"
)

func init() {
defaultRdonly = 0
Expand Down Expand Up @@ -118,12 +122,11 @@ func TestBasicVreplicationWorkflow(t *testing.T) {
verifyClusterHealth(t, vc)
insertInitialData(t)
materializeRollup(t)

shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName, false)

// the tenant table was to test a specific case with binary sharding keys. Drop it now so that we don't
// the Lead table was to test a specific case with binary sharding keys. Drop it now so that we don't
// have to update the rest of the tests
execVtgateQuery(t, vtgateConn, "customer", "drop table tenant")
execVtgateQuery(t, vtgateConn, "customer", "drop table `Lead`")
validateRollupReplicates(t)
shardOrders(t)
shardMerchant(t)
Expand Down Expand Up @@ -341,7 +344,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
defaultCell := cells[0]
custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"]

tables := "customer,tenant"
tables := "customer,Lead"
moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables)

customerTab1 := custKs.Shards["-80"].Tablets["zone1-200"].Vttablet
Expand All @@ -356,7 +359,15 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
insertQuery1 := "insert into customer(cid, name) values(1001, 'tempCustomer1')"
matchInsertQuery1 := "insert into customer(cid, `name`) values (:vtg1, :vtg2)"
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1))
execVtgateQuery(t, vtgateConn, "product", "update tenant set name='xyz'")

// confirm that the backticking of table names in the routing rules works
output, err := osExec(t, "mysql", []string{"-u", "vtdba", "-P", fmt.Sprintf("%d", vc.ClusterConfig.vtgateMySQLPort),
"--host=127.0.0.1", "-e", "select * from Lead"})
if err != nil {
require.FailNow(t, output)
}
execVtgateQuery(t, vtgateConn, "product", "update Lead set name='xyz'")

vdiff(t, ksWorkflow, "")
switchReadsDryRun(t, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard)
switchReads(t, allCellNames, ksWorkflow)
Expand Down Expand Up @@ -485,7 +496,7 @@ func reshardCustomer2to4Split(t *testing.T, cells []*Cell, sourceCellOrAlias str

func reshardMerchant2to3SplitMerge(t *testing.T) {
t.Run("reshardMerchant2to3SplitMerge", func(t *testing.T) {
ksName := "merchant"
ksName := merchantKeyspace
counts := map[string]int{"zone1-1600": 0, "zone1-1700": 2, "zone1-1800": 0}
reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryRunResultsSwitchWritesM2m3, nil, "")
validateCount(t, vtgateConn, ksName, "merchant", 2)
Expand All @@ -505,16 +516,18 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {
}

for _, shard := range strings.Split("-40,40-c0,c0-", ",") {
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
ksShard := fmt.Sprintf("%s:%s", merchantKeyspace, shard)
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard)
if err != nil {
t.Fatalf("GetShard merchant failed for: %s: %v", shard, err)
}
assert.NotContains(t, output, "node doesn't exist", "GetShard failed for valid shard merchant:"+shard)
assert.Contains(t, output, "primary_alias", "GetShard failed for valid shard merchant:"+shard)
assert.NotContains(t, output, "node doesn't exist", "GetShard failed for valid shard "+ksShard)
assert.Contains(t, output, "primary_alias", "GetShard failed for valid shard "+ksShard)
}

for _, shard := range strings.Split("-40,40-c0,c0-", ",") {
expectNumberOfStreams(t, vtgateConn, "reshardMerchant2to3SplitMerge", "m2m3", "merchant:"+shard, 0)
ksShard := fmt.Sprintf("%s:%s", merchantKeyspace, shard)
expectNumberOfStreams(t, vtgateConn, "reshardMerchant2to3SplitMerge", "m2m3", ksShard, 0)
}

var found bool
Expand All @@ -529,7 +542,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {

func reshardMerchant3to1Merge(t *testing.T) {
t.Run("reshardMerchant3to1Merge", func(t *testing.T) {
ksName := "merchant"
ksName := merchantKeyspace
counts := map[string]int{"zone1-2000": 3}
reshard(t, ksName, "merchant", "m3m1", "-40,40-c0,c0-", "0", 2000, counts, nil, nil, "")
validateCount(t, vtgateConn, ksName, "merchant", 3)
Expand Down Expand Up @@ -635,33 +648,41 @@ func shardMerchant(t *testing.T) {
workflow := "p2m"
cell := defaultCell.Name
sourceKs := "product"
targetKs := "merchant"
targetKs := merchantKeyspace
tables := "merchant"
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "merchant", "-80,80-", merchantVSchema, "", defaultReplicas, defaultRdonly, 400); err != nil {
if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, merchantKeyspace, "-80,80-", merchantVSchema, "", defaultReplicas, defaultRdonly, 400); err != nil {
t.Fatal(err)
}
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "merchant", "-80"), 1); err != nil {
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", merchantKeyspace, "-80"), 1); err != nil {
t.Fatal(err)
}
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "merchant", "80-"), 1); err != nil {
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", merchantKeyspace, "80-"), 1); err != nil {
t.Fatal(err)
}
moveTables(t, cell, workflow, sourceKs, targetKs, tables)
merchantKs := vc.Cells[defaultCell.Name].Keyspaces["merchant"]
merchantKs := vc.Cells[defaultCell.Name].Keyspaces[merchantKeyspace]
merchantTab1 := merchantKs.Shards["-80"].Tablets["zone1-400"].Vttablet
merchantTab2 := merchantKs.Shards["80-"].Tablets["zone1-500"].Vttablet
catchup(t, merchantTab1, workflow, "MoveTables")
catchup(t, merchantTab2, workflow, "MoveTables")

vdiff(t, "merchant.p2m", "")
vdiff(t, fmt.Sprintf("%s.%s", merchantKeyspace, workflow), "")
switchReads(t, allCellNames, ksWorkflow)
switchWrites(t, ksWorkflow, false)
printRoutingRules(t, vc, "After merchant movetables")

// confirm that the backticking of keyspaces in the routing rules works
output, err := osExec(t, "mysql", []string{"-u", "vtdba", "-P", fmt.Sprintf("%d", vc.ClusterConfig.vtgateMySQLPort),
fmt.Sprintf("--host=%s", vc.ClusterConfig.hostname), "-e", "select * from merchant"})
if err != nil {
require.FailNow(t, output)
}
dropSources(t, ksWorkflow)

validateCountInTablet(t, merchantTab1, "merchant", "merchant", 1)
validateCountInTablet(t, merchantTab2, "merchant", "merchant", 1)
validateCount(t, vtgateConn, "merchant", "merchant", 2)
validateCountInTablet(t, merchantTab1, merchantKeyspace, "merchant", 1)
validateCountInTablet(t, merchantTab2, merchantKeyspace, "merchant", 1)
validateCount(t, vtgateConn, merchantKeyspace, "merchant", 2)
})
}

Expand Down Expand Up @@ -844,29 +865,29 @@ func materializeMerchantSales(t *testing.T) {
t.Run("materializeMerchantSales", func(t *testing.T) {
workflow := "msales"
materialize(t, materializeMerchantSalesSpec)
merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, "merchant", "primary")
merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, merchantKeyspace, "primary")
for _, tab := range merchantTablets {
catchup(t, tab, workflow, "Materialize")
}
validateCountInTablet(t, merchantTablets["zone1-400"], "merchant", "msales", 1)
validateCountInTablet(t, merchantTablets["zone1-500"], "merchant", "msales", 1)
validateCount(t, vtgateConn, "merchant", "msales", 2)
validateCountInTablet(t, merchantTablets["zone1-400"], merchantKeyspace, "msales", 1)
validateCountInTablet(t, merchantTablets["zone1-500"], merchantKeyspace, "msales", 1)
validateCount(t, vtgateConn, merchantKeyspace, "msales", 2)
})
}

func materializeMerchantOrders(t *testing.T) {
t.Run("materializeMerchantOrders", func(t *testing.T) {
workflow := "morders"
keyspace := "merchant"
keyspace := merchantKeyspace
applyVSchema(t, merchantOrdersVSchema, keyspace)
materialize(t, materializeMerchantOrdersSpec)
merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, "merchant", "primary")
merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, merchantKeyspace, "primary")
for _, tab := range merchantTablets {
catchup(t, tab, workflow, "Materialize")
}
validateCountInTablet(t, merchantTablets["zone1-400"], "merchant", "morders", 2)
validateCountInTablet(t, merchantTablets["zone1-500"], "merchant", "morders", 1)
validateCount(t, vtgateConn, "merchant", "morders", 3)
validateCountInTablet(t, merchantTablets["zone1-400"], merchantKeyspace, "morders", 2)
validateCountInTablet(t, merchantTablets["zone1-500"], merchantKeyspace, "morders", 1)
validateCount(t, vtgateConn, merchantKeyspace, "morders", 3)
})
}

Expand Down
Loading