Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: fix the case-sensitive issue (#1738) #1886

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ import (
"github.com/chaos-mesh/go-sqlsmith"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/pingcap/tidb-tools/pkg/dbutil"

config2 "github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/conn"
Expand Down Expand Up @@ -90,6 +89,17 @@ func newTask(ctx context.Context, cli pb.MasterClient, taskFile string, schema s
if err2 != nil {
return nil, err2
}
if taskCfg.CaseSensitive {
lcSetting, err2 := utils.FetchLowerCaseTableNamesSetting(ctx, conn.baseConn.DBConn)
if err2 != nil {
return nil, err2
}
if lcSetting == utils.LCTableNamesMixed {
msg := "can not set `case-sensitive = true` when upstream `lower_case_table_names = 2`"
log.L().Error(msg, zap.Any("instance", cfg))
return nil, errors.New(msg)
}
}
sourceDBs = append(sourceDBs, db)
sourceConns = append(sourceConns, conn)
res = append(res, singleResult{})
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210415081931-48e7f467fd74
github.com/pingcap/tidb v1.1.0-beta.0.20210330094614-60111e1c4b6f
github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible
github.com/pingcap/tidb-tools v5.1.0-alpha.0.20210603090026-288ab02f1c79+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/rakyll/statik v0.1.6
github.com/shopspring/decimal v0.0.0-20200105231215-408a2507e114
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -847,8 +847,8 @@ github.com/pingcap/tidb-tools v4.0.1+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnw
github.com/pingcap/tidb-tools v4.0.5-0.20200820082341-afeaaaaaa153+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.5-0.20200820092506-34ea90c93237+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible h1:jvsCYfIx30AnyQQxfzoCdzm8xRX0ZRJT3BxpFSKqrTo=
github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v5.1.0-alpha.0.20210603090026-288ab02f1c79+incompatible h1:yCmJpDxoxm7kU+K5ORfLmzwLwJ16CdIYoJtP7INyN4w=
github.com/pingcap/tidb-tools v5.1.0-alpha.0.20210603090026-288ab02f1c79+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200604070248-508f03b0b342/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
Expand Down
2 changes: 1 addition & 1 deletion loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ func (l *Loader) skipSchemaAndTable(table *filter.Table) bool {
table.Name = unescapePercent(table.Name, l.logger)

tbs := []*filter.Table{table}
tbs = l.baList.ApplyOn(tbs)
tbs = l.baList.Apply(tbs)
return len(tbs) == 0
}

Expand Down
21 changes: 15 additions & 6 deletions pkg/parser/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"

"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
Expand Down Expand Up @@ -46,13 +47,20 @@ func Parse(p *parser.Parser, sql, charset, collation string) (stmt []ast.StmtNod

// ref: https://github.com/pingcap/tidb/blob/09feccb529be2830944e11f5fed474020f50370f/server/sql_info_fetcher.go#L46
type tableNameExtractor struct {
curDB string
names []*filter.Table
curDB string
flavor utils.LowerCaseTableNamesFlavor
names []*filter.Table
}

func (tne *tableNameExtractor) Enter(in ast.Node) (ast.Node, bool) {
if t, ok := in.(*ast.TableName); ok {
tb := &filter.Table{Schema: t.Schema.L, Name: t.Name.L}
var tb *filter.Table
if tne.flavor == utils.LCTableNamesSensitive {
tb = &filter.Table{Schema: t.Schema.O, Name: t.Name.O}
} else {
tb = &filter.Table{Schema: t.Schema.L, Name: t.Name.L}
}

if tb.Schema == "" {
tb.Schema = tne.curDB
}
Expand All @@ -71,7 +79,7 @@ func (tne *tableNameExtractor) Leave(in ast.Node) (ast.Node, bool) {
// specifically, for `create table like` DDL, result contains [sourceTableName, sourceRefTableName]
// for rename table ddl, result contains [old1, new1, old1, new1, old2, new2, old3, new3, ...] because of TiDB parser
// for other DDL, order of tableName is the node visit order.
func FetchDDLTableNames(schema string, stmt ast.StmtNode) ([]*filter.Table, error) {
func FetchDDLTableNames(schema string, stmt ast.StmtNode, flavor utils.LowerCaseTableNamesFlavor) ([]*filter.Table, error) {
switch stmt.(type) {
case ast.DDLNode:
default:
Expand All @@ -89,8 +97,9 @@ func FetchDDLTableNames(schema string, stmt ast.StmtNode) ([]*filter.Table, erro
}

e := &tableNameExtractor{
curDB: schema,
names: make([]*filter.Table, 0),
curDB: schema,
flavor: flavor,
names: make([]*filter.Table, 0),
}
stmt.Accept(e)

Expand Down
15 changes: 8 additions & 7 deletions pkg/parser/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb-tools/pkg/filter"

"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

var _ = Suite(&testParserSuite{})
Expand Down Expand Up @@ -63,11 +64,11 @@ var testCases = []testCase{
[]string{"DROP DATABASE IF EXISTS `xs1`"},
},
{
"drop table `s1`.`t1`",
[]string{"DROP TABLE IF EXISTS `s1`.`t1`"},
[][]*filter.Table{{genTableName("s1", "t1")}},
[][]*filter.Table{{genTableName("xs1", "xt1")}},
[]string{"DROP TABLE IF EXISTS `xs1`.`xt1`"},
"drop table `Ss1`.`tT1`",
[]string{"DROP TABLE IF EXISTS `Ss1`.`tT1`"},
[][]*filter.Table{{genTableName("Ss1", "tT1")}},
[][]*filter.Table{{genTableName("xSs1", "xtT1")}},
[]string{"DROP TABLE IF EXISTS `xSs1`.`xtT1`"},
},
{
"drop table `s1`.`t1`, `s2`.`t2`",
Expand Down Expand Up @@ -374,7 +375,7 @@ func (t *testParserSuite) TestError(c *C) {

stmts, err := Parse(p, dml, "", "")
c.Assert(err, IsNil)
_, err = FetchDDLTableNames("test", stmts[0])
_, err = FetchDDLTableNames("test", stmts[0], utils.LCTableNamesInsensitive)
c.Assert(terror.ErrUnknownTypeDDL.Equal(err), IsTrue)

_, err = RenameDDLTable(stmts[0], nil)
Expand Down Expand Up @@ -406,7 +407,7 @@ func (t *testParserSuite) TestResolveDDL(c *C) {
c.Assert(err, IsNil)
c.Assert(s, HasLen, 1)

tableNames, err := FetchDDLTableNames("test", s[0])
tableNames, err := FetchDDLTableNames("test", s[0], utils.LCTableNamesSensitive)
c.Assert(err, IsNil)
c.Assert(tableNames, DeepEquals, tbs[j])

Expand Down
52 changes: 52 additions & 0 deletions pkg/shardddl/optimism/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,55 @@ func deleteDroppedColumnByColumnOp(lockID, column string) clientv3.Op {
func deleteDroppedColumnsByLockOp(lockID string) clientv3.Op {
return clientv3.OpDelete(common.ShardDDLOptimismDroppedColumnsKeyAdapter.Encode(lockID), clientv3.WithPrefix())
}

// deleteSourceDroppedColumnsOp return a DELETE etcd operation for the specified lock relate to a upstream table.
func deleteSourceDroppedColumnsOp(lockID, column, source, upSchema, upTable string) clientv3.Op {
return clientv3.OpDelete(common.ShardDDLOptimismDroppedColumnsKeyAdapter.Encode(lockID, column, source, upSchema, upTable))
}

// CheckColumns try to check and fix all the schema and table names for delete columns infos.
func CheckColumns(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error {
allColInfos, _, err := GetAllDroppedColumns(cli)
if err != nil {
return err
}

for lockID, colDropInfo := range allColInfos {
for columnName, sourceDropInfo := range colDropInfo {
tableInfos, ok := sourceDropInfo[source]
if !ok {
continue
}
for schema, tableDropInfo := range tableInfos {
realSchema, hasChange := schemaMap[schema]
if !hasChange {
realSchema = schema
}
tableMap := tablesMap[schema]
for table, stage := range tableDropInfo {
realTable, tblChange := tableMap[table]
if !tblChange {
realTable = table
tblChange = hasChange
}
if tblChange {
key := common.ShardDDLOptimismDroppedColumnsKeyAdapter.Encode(lockID, columnName, source, realSchema, realTable)
val, err := json.Marshal(stage)
if err != nil {
return err
}
opPut := clientv3.OpPut(key, string(val))
opDel := deleteSourceDroppedColumnsOp(lockID, columnName, source, schema, table)

_, _, err = etcdutil.DoOpsInOneTxnWithRetry(cli, opPut, opDel)
if err != nil {
return err
}
}
}
}
}
}

return nil
}
44 changes: 44 additions & 0 deletions pkg/shardddl/optimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,47 @@ func (oldInfo *OldInfo) toInfo() Info {
TableInfosAfter: []*model.TableInfo{oldInfo.TableInfoAfter},
}
}

// CheckDDLInfos try to check and fix all the schema and table names for DDL info.
func CheckDDLInfos(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error {
allInfos, _, err := GetAllInfo(cli)
if err != nil {
return err
}

for _, taskTableInfos := range allInfos {
sourceInfos, ok := taskTableInfos[source]
if !ok {
continue
}
for schema, tblInfos := range sourceInfos {
realSchema, hasChange := schemaMap[schema]
if !hasChange {
realSchema = schema
}

tblMap := tablesMap[schema]
for tbl, info := range tblInfos {
realTable, tableChange := tblMap[tbl]
if !tableChange {
realTable = tbl
tableChange = hasChange
}
if tableChange {
delOp := deleteInfoOp(info)
info.UpSchema = realSchema
info.UpTable = realTable
putOp, err := putInfoOp(info)
if err != nil {
return err
}
_, _, err = etcdutil.DoOpsInOneTxnWithRetry(cli, delOp, putOp)
if err != nil {
return err
}
}
}
}
}
return nil
}
45 changes: 45 additions & 0 deletions pkg/shardddl/optimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,48 @@ func WatchOperationPut(ctx context.Context, cli *clientv3.Client,
func deleteOperationOp(op Operation) clientv3.Op {
return clientv3.OpDelete(common.ShardDDLOptimismOperationKeyAdapter.Encode(op.Task, op.Source, op.UpSchema, op.UpTable))
}

// CheckOperations try to check and fix all the schema and table names for operation infos.
func CheckOperations(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error {
allOperations, rev, err := GetAllOperations(cli)
if err != nil {
return err
}

for _, taskTableOps := range allOperations {
sourceOps, ok := taskTableOps[source]
if !ok {
continue
}
for schema, tblOps := range sourceOps {
realSchema, hasChange := schemaMap[schema]
if !hasChange {
realSchema = schema
}

tblMap := tablesMap[schema]
for tbl, info := range tblOps {
realTable, tblChange := tblMap[tbl]
if !tblChange {
realTable = tbl
tblChange = hasChange
}
if tblChange {
newOperation := info
newOperation.UpSchema = realSchema
newOperation.UpTable = realTable
_, _, err = PutOperation(cli, false, newOperation, rev)
if err != nil {
return err
}
deleteOp := deleteOperationOp(info)
_, _, err = etcdutil.DoOpsInOneTxnWithRetry(cli, deleteOp)
if err != nil {
return err
}
}
}
}
}
return err
}
55 changes: 55 additions & 0 deletions pkg/shardddl/optimism/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,58 @@ func putSourceTablesOp(st SourceTables) (clientv3.Op, error) {
key := common.ShardDDLOptimismSourceTablesKeyAdapter.Encode(st.Task, st.Source)
return clientv3.OpPut(key, value), nil
}

// CheckSourceTables try to check and fix all the source schemas and table names.
func CheckSourceTables(cli *clientv3.Client, source string, schemaMap map[string]string, talesMap map[string]map[string]string) error {
allSourceTables, _, err := GetAllSourceTables(cli)
if err != nil {
return err
}

for _, taskSourceTables := range allSourceTables {
sourceTables, ok := taskSourceTables[source]
if !ok {
continue
}
schemaKeys := make([]string, 0)
tblKeys := make([]string, 0)
hasChange := false
for _, tableSources := range sourceTables.Tables {
for _, sources := range tableSources {
for schema, tbls := range sources {
if _, ok := schemaMap[schema]; ok {
schemaKeys = append(schemaKeys, schema)
hasChange = true
}

tblMap, ok := talesMap[schema]
if !ok {
continue
}
for tbl := range tbls {
if t, ok := tblMap[tbl]; ok {
tblKeys = append(tblKeys, t)
hasChange = true
}
}
for _, t := range tblKeys {
tbls[tblMap[t]] = tbls[t]
delete(tbls, t)
}
tblKeys = tblKeys[:0]
}
for _, s := range schemaKeys {
sources[schemaMap[s]] = sources[s]
delete(sources, s)
}
schemaKeys = schemaKeys[:0]
}
}
if hasChange {
if _, err = PutSourceTables(cli, sourceTables); err != nil {
return err
}
}
}
return err
}
Loading