diff --git a/chaos/cases/task.go b/chaos/cases/task.go index a983b7200f..6fbeeba283 100644 --- a/chaos/cases/task.go +++ b/chaos/cases/task.go @@ -24,11 +24,10 @@ import ( "github.com/chaos-mesh/go-sqlsmith" "github.com/pingcap/errors" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb-tools/pkg/dbutil" "go.uber.org/zap" "golang.org/x/sync/errgroup" - "github.com/pingcap/tidb-tools/pkg/dbutil" - config2 "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/conn" @@ -90,6 +89,17 @@ func newTask(ctx context.Context, cli pb.MasterClient, taskFile string, schema s if err2 != nil { return nil, err2 } + if taskCfg.CaseSensitive { + lcSetting, err2 := utils.FetchLowerCaseTableNamesSetting(ctx, conn.baseConn.DBConn) + if err2 != nil { + return nil, err2 + } + if lcSetting == utils.LCTableNamesMixed { + msg := "can not set `case-sensitive = true` when upstream `lower_case_table_names = 2`" + log.L().Error(msg, zap.Any("instance", cfg)) + return nil, errors.New(msg) + } + } sourceDBs = append(sourceDBs, db) sourceConns = append(sourceConns, conn) res = append(res, singleResult{}) diff --git a/go.mod b/go.mod index 66e0c4bb8e..71d08c926d 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 github.com/pingcap/parser v0.0.0-20210415081931-48e7f467fd74 github.com/pingcap/tidb v1.1.0-beta.0.20210330094614-60111e1c4b6f - github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible + github.com/pingcap/tidb-tools v5.1.0-alpha.0.20210603090026-288ab02f1c79+incompatible github.com/prometheus/client_golang v1.5.1 github.com/rakyll/statik v0.1.6 github.com/shopspring/decimal v0.0.0-20200105231215-408a2507e114 diff --git a/go.sum b/go.sum index 5a01efd618..3da7a17007 100644 --- a/go.sum +++ b/go.sum @@ -845,8 +845,8 @@ github.com/pingcap/tidb-tools v4.0.1+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnw github.com/pingcap/tidb-tools v4.0.5-0.20200820082341-afeaaaaaa153+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tidb-tools v4.0.5-0.20200820092506-34ea90c93237+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible h1:jvsCYfIx30AnyQQxfzoCdzm8xRX0ZRJT3BxpFSKqrTo= -github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= +github.com/pingcap/tidb-tools v5.1.0-alpha.0.20210603090026-288ab02f1c79+incompatible h1:yCmJpDxoxm7kU+K5ORfLmzwLwJ16CdIYoJtP7INyN4w= +github.com/pingcap/tidb-tools v5.1.0-alpha.0.20210603090026-288ab02f1c79+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20200604070248-508f03b0b342/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= diff --git a/loader/loader.go b/loader/loader.go index d9c835ed63..34cbb1acc0 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -656,7 +656,7 @@ func (l *Loader) skipSchemaAndTable(table *filter.Table) bool { table.Name = unescapePercent(table.Name, l.logger) tbs := []*filter.Table{table} - tbs = l.baList.ApplyOn(tbs) 
+ tbs = l.baList.Apply(tbs) return len(tbs) == 0 } diff --git a/pkg/parser/common.go b/pkg/parser/common.go index 028170bb63..bf91eb7549 100644 --- a/pkg/parser/common.go +++ b/pkg/parser/common.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/parser" "github.com/pingcap/parser/ast" @@ -46,13 +47,20 @@ func Parse(p *parser.Parser, sql, charset, collation string) (stmt []ast.StmtNod // ref: https://github.com/pingcap/tidb/blob/09feccb529be2830944e11f5fed474020f50370f/server/sql_info_fetcher.go#L46 type tableNameExtractor struct { - curDB string - names []*filter.Table + curDB string + flavor utils.LowerCaseTableNamesFlavor + names []*filter.Table } func (tne *tableNameExtractor) Enter(in ast.Node) (ast.Node, bool) { if t, ok := in.(*ast.TableName); ok { - tb := &filter.Table{Schema: t.Schema.L, Name: t.Name.L} + var tb *filter.Table + if tne.flavor == utils.LCTableNamesSensitive { + tb = &filter.Table{Schema: t.Schema.O, Name: t.Name.O} + } else { + tb = &filter.Table{Schema: t.Schema.L, Name: t.Name.L} + } + if tb.Schema == "" { tb.Schema = tne.curDB } @@ -71,7 +79,7 @@ func (tne *tableNameExtractor) Leave(in ast.Node) (ast.Node, bool) { // specifically, for `create table like` DDL, result contains [sourceTableName, sourceRefTableName] // for rename table ddl, result contains [old1, new1, old1, new1, old2, new2, old3, new3, ...] because of TiDB parser // for other DDL, order of tableName is the node visit order. -func FetchDDLTableNames(schema string, stmt ast.StmtNode) ([]*filter.Table, error) { +func FetchDDLTableNames(schema string, stmt ast.StmtNode, flavor utils.LowerCaseTableNamesFlavor) ([]*filter.Table, error) { switch stmt.(type) { case ast.DDLNode: default: @@ -89,8 +97,9 @@ func FetchDDLTableNames(schema string, stmt ast.StmtNode) ([]*filter.Table, erro } e := &tableNameExtractor{ - curDB: schema, - names: make([]*filter.Table, 0), + curDB: schema, + flavor: flavor, + names: make([]*filter.Table, 0), } stmt.Accept(e) diff --git a/pkg/parser/common_test.go b/pkg/parser/common_test.go index c29fbe520e..cc20fb31b1 100644 --- a/pkg/parser/common_test.go +++ b/pkg/parser/common_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb-tools/pkg/filter" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" ) var _ = Suite(&testParserSuite{}) @@ -63,11 +64,11 @@ var testCases = []testCase{ []string{"DROP DATABASE IF EXISTS `xs1`"}, }, { - "drop table `s1`.`t1`", - []string{"DROP TABLE IF EXISTS `s1`.`t1`"}, - [][]*filter.Table{{genTableName("s1", "t1")}}, - [][]*filter.Table{{genTableName("xs1", "xt1")}}, - []string{"DROP TABLE IF EXISTS `xs1`.`xt1`"}, + "drop table `Ss1`.`tT1`", + []string{"DROP TABLE IF EXISTS `Ss1`.`tT1`"}, + [][]*filter.Table{{genTableName("Ss1", "tT1")}}, + [][]*filter.Table{{genTableName("xSs1", "xtT1")}}, + []string{"DROP TABLE IF EXISTS `xSs1`.`xtT1`"}, }, { "drop table `s1`.`t1`, `s2`.`t2`", @@ -374,7 +375,7 @@ func (t *testParserSuite) TestError(c *C) { stmts, err := Parse(p, dml, "", "") c.Assert(err, IsNil) - _, err = FetchDDLTableNames("test", stmts[0]) + _, err = FetchDDLTableNames("test", stmts[0], utils.LCTableNamesInsensitive) c.Assert(terror.ErrUnknownTypeDDL.Equal(err), IsTrue) _, err = RenameDDLTable(stmts[0], nil) @@ -406,7 +407,7 @@ func (t *testParserSuite) TestResolveDDL(c *C) { c.Assert(err, IsNil) c.Assert(s, HasLen, 1) - tableNames, err := FetchDDLTableNames("test", s[0]) + tableNames, err := FetchDDLTableNames("test", 
s[0], utils.LCTableNamesSensitive)
 		c.Assert(err, IsNil)
 		c.Assert(tableNames, DeepEquals, tbs[j])
diff --git a/pkg/shardddl/optimism/column.go b/pkg/shardddl/optimism/column.go
index ef2c7fa619..db600bbcce 100644
--- a/pkg/shardddl/optimism/column.go
+++ b/pkg/shardddl/optimism/column.go
@@ -109,3 +109,55 @@ func deleteDroppedColumnByColumnOp(lockID, column string) clientv3.Op {
 func deleteDroppedColumnsByLockOp(lockID string) clientv3.Op {
 	return clientv3.OpDelete(common.ShardDDLOptimismDroppedColumnsKeyAdapter.Encode(lockID), clientv3.WithPrefix())
 }
+
+// deleteSourceDroppedColumnsOp returns a DELETE etcd operation for the specified lock related to an upstream table.
+func deleteSourceDroppedColumnsOp(lockID, column, source, upSchema, upTable string) clientv3.Op {
+	return clientv3.OpDelete(common.ShardDDLOptimismDroppedColumnsKeyAdapter.Encode(lockID, column, source, upSchema, upTable))
+}
+
+// CheckColumns tries to check and fix all the schema and table names for dropped column infos.
+func CheckColumns(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error {
+	allColInfos, _, err := GetAllDroppedColumns(cli)
+	if err != nil {
+		return err
+	}
+
+	for lockID, colDropInfo := range allColInfos {
+		for columnName, sourceDropInfo := range colDropInfo {
+			tableInfos, ok := sourceDropInfo[source]
+			if !ok {
+				continue
+			}
+			for schema, tableDropInfo := range tableInfos {
+				realSchema, hasChange := schemaMap[schema]
+				if !hasChange {
+					realSchema = schema
+				}
+				tableMap := tablesMap[schema]
+				for table, stage := range tableDropInfo {
+					realTable, tblChange := tableMap[table]
+					if !tblChange {
+						realTable = table
+						tblChange = hasChange
+					}
+					if tblChange {
+						key := common.ShardDDLOptimismDroppedColumnsKeyAdapter.Encode(lockID, columnName, source, realSchema, realTable)
+						val, err := json.Marshal(stage)
+						if err != nil {
+							return err
+						}
+						opPut := clientv3.OpPut(key, string(val))
+						opDel := deleteSourceDroppedColumnsOp(lockID, columnName, source, schema, table)
+
+						_, _, err = etcdutil.DoOpsInOneTxnWithRetry(cli, opPut, opDel)
+						if err != nil {
+							return err
+						}
+					}
+				}
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/shardddl/optimism/info.go b/pkg/shardddl/optimism/info.go
index 1865572c1b..d964d810b6 100644
--- a/pkg/shardddl/optimism/info.go
+++ b/pkg/shardddl/optimism/info.go
@@ -344,3 +344,47 @@ func (oldInfo *OldInfo) toInfo() Info {
 		TableInfosAfter: []*model.TableInfo{oldInfo.TableInfoAfter},
 	}
 }
+
+// CheckDDLInfos tries to check and fix all the schema and table names for DDL info.
+func CheckDDLInfos(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error { + allInfos, _, err := GetAllInfo(cli) + if err != nil { + return err + } + + for _, taskTableInfos := range allInfos { + sourceInfos, ok := taskTableInfos[source] + if !ok { + continue + } + for schema, tblInfos := range sourceInfos { + realSchema, hasChange := schemaMap[schema] + if !hasChange { + realSchema = schema + } + + tblMap := tablesMap[schema] + for tbl, info := range tblInfos { + realTable, tableChange := tblMap[tbl] + if !tableChange { + realTable = tbl + tableChange = hasChange + } + if tableChange { + delOp := deleteInfoOp(info) + info.UpSchema = realSchema + info.UpTable = realTable + putOp, err := putInfoOp(info) + if err != nil { + return err + } + _, _, err = etcdutil.DoOpsInOneTxnWithRetry(cli, delOp, putOp) + if err != nil { + return err + } + } + } + } + } + return nil +} diff --git a/pkg/shardddl/optimism/operation.go b/pkg/shardddl/optimism/operation.go index 2446bcd71f..c2ee8d5fed 100644 --- a/pkg/shardddl/optimism/operation.go +++ b/pkg/shardddl/optimism/operation.go @@ -282,3 +282,48 @@ func WatchOperationPut(ctx context.Context, cli *clientv3.Client, func deleteOperationOp(op Operation) clientv3.Op { return clientv3.OpDelete(common.ShardDDLOptimismOperationKeyAdapter.Encode(op.Task, op.Source, op.UpSchema, op.UpTable)) } + +// CheckOperations try to check and fix all the schema and table names for operation infos. +func CheckOperations(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error { + allOperations, rev, err := GetAllOperations(cli) + if err != nil { + return err + } + + for _, taskTableOps := range allOperations { + sourceOps, ok := taskTableOps[source] + if !ok { + continue + } + for schema, tblOps := range sourceOps { + realSchema, hasChange := schemaMap[schema] + if !hasChange { + realSchema = schema + } + + tblMap := tablesMap[schema] + for tbl, info := range tblOps { + realTable, tblChange := tblMap[tbl] + if !tblChange { + realTable = tbl + tblChange = hasChange + } + if tblChange { + newOperation := info + newOperation.UpSchema = realSchema + newOperation.UpTable = realTable + _, _, err = PutOperation(cli, false, newOperation, rev) + if err != nil { + return err + } + deleteOp := deleteOperationOp(info) + _, _, err = etcdutil.DoOpsInOneTxnWithRetry(cli, deleteOp) + if err != nil { + return err + } + } + } + } + } + return err +} diff --git a/pkg/shardddl/optimism/table.go b/pkg/shardddl/optimism/table.go index 17ec7feaa8..20615e854b 100644 --- a/pkg/shardddl/optimism/table.go +++ b/pkg/shardddl/optimism/table.go @@ -307,3 +307,58 @@ func putSourceTablesOp(st SourceTables) (clientv3.Op, error) { key := common.ShardDDLOptimismSourceTablesKeyAdapter.Encode(st.Task, st.Source) return clientv3.OpPut(key, value), nil } + +// CheckSourceTables try to check and fix all the source schemas and table names. 
+func CheckSourceTables(cli *clientv3.Client, source string, schemaMap map[string]string, tablesMap map[string]map[string]string) error {
+	allSourceTables, _, err := GetAllSourceTables(cli)
+	if err != nil {
+		return err
+	}
+
+	for _, taskSourceTables := range allSourceTables {
+		sourceTables, ok := taskSourceTables[source]
+		if !ok {
+			continue
+		}
+		schemaKeys := make([]string, 0)
+		tblKeys := make([]string, 0)
+		hasChange := false
+		for _, tableSources := range sourceTables.Tables {
+			for _, sources := range tableSources {
+				for schema, tbls := range sources {
+					if _, ok := schemaMap[schema]; ok {
+						schemaKeys = append(schemaKeys, schema)
+						hasChange = true
+					}
+
+					tblMap, ok := tablesMap[schema]
+					if !ok {
+						continue
+					}
+					for tbl := range tbls {
+						if t, ok := tblMap[tbl]; ok {
+							tblKeys = append(tblKeys, t)
+							hasChange = true
+						}
+					}
+					for _, t := range tblKeys {
+						tbls[tblMap[t]] = tbls[t]
+						delete(tbls, t)
+					}
+					tblKeys = tblKeys[:0]
+				}
+				for _, s := range schemaKeys {
+					sources[schemaMap[s]] = sources[s]
+					delete(sources, s)
+				}
+				schemaKeys = schemaKeys[:0]
+			}
+		}
+		if hasChange {
+			if _, err = PutSourceTables(cli, sourceTables); err != nil {
+				return err
+			}
+		}
+	}
+	return err
+}
diff --git a/pkg/utils/common.go b/pkg/utils/common.go
index d0b4a23621..21f5d359c9 100644
--- a/pkg/utils/common.go
+++ b/pkg/utils/common.go
@@ -84,7 +84,7 @@ func FetchAllDoTables(ctx context.Context, db *sql.DB, bw *filter.Filter) (map[s
 			Name: "", // schema level
 		})
 	}
-	ftSchemas = bw.ApplyOn(ftSchemas)
+	ftSchemas = bw.Apply(ftSchemas)
 	if len(ftSchemas) == 0 {
 		log.L().Warn("no schema need to sync")
 		return nil, nil
@@ -105,7 +105,7 @@
 				Name: table,
 			})
 		}
-		ftTables = bw.ApplyOn(ftTables)
+		ftTables = bw.Apply(ftTables)
 		if len(ftTables) == 0 {
 			log.L().Info("no tables need to sync", zap.String("schema", schema))
 			continue // NOTE: should we still keep it as an empty elem?
@@ -153,6 +153,35 @@ func FetchTargetDoTables(ctx context.Context, db *sql.DB, bw *filter.Filter, rou
 	return mapper, nil
 }
 
+// LowerCaseTableNamesFlavor represents the type of db `lower_case_table_names` settings.
+type LowerCaseTableNamesFlavor uint8
+
+const (
+	// LCTableNamesSensitive represents lower_case_table_names = 0, case sensitive.
+	LCTableNamesSensitive LowerCaseTableNamesFlavor = 0
+	// LCTableNamesInsensitive represents lower_case_table_names = 1, case insensitive.
+	LCTableNamesInsensitive = 1
+	// LCTableNamesMixed represents lower_case_table_names = 2, table names are case-sensitive, but case-insensitive in usage.
+	LCTableNamesMixed = 2
+)
+
+// FetchLowerCaseTableNamesSetting returns the `lower_case_table_names` setting of the target db.
+func FetchLowerCaseTableNamesSetting(ctx context.Context, conn *sql.Conn) (LowerCaseTableNamesFlavor, error) {
+	query := "SELECT @@lower_case_table_names;"
+	row := conn.QueryRowContext(ctx, query)
+	if row.Err() != nil {
+		return LCTableNamesSensitive, terror.ErrDBExecuteFailed.Delegate(row.Err(), query)
+	}
+	var res uint8
+	if err := row.Scan(&res); err != nil {
+		return LCTableNamesSensitive, terror.ErrDBExecuteFailed.Delegate(err, query)
+	}
+	if res > LCTableNamesMixed {
+		return LCTableNamesSensitive, terror.ErrDBUnExpect.Generate(fmt.Sprintf("invalid `lower_case_table_names` value '%d'", res))
+	}
+	return LowerCaseTableNamesFlavor(res), nil
+}
+
 // CompareShardingDDLs compares s and t ddls
 // only concern in content, ignore order of ddl.
func CompareShardingDDLs(s, t []string) bool { @@ -218,6 +247,22 @@ func NonRepeatStringsEqual(a, b []string) bool { return true } +// GenTableID generates table ID. +func GenTableID(schema, table string) (id string, isSchemaOnly bool) { + if len(table) == 0 { + return "`" + schema + "`", true + } + return "`" + schema + "`.`" + table + "`", false +} + +// UnpackTableID unpacks table ID to pair. +func UnpackTableID(id string) (string, string) { + parts := strings.Split(id, "`.`") + schema := strings.TrimLeft(parts[0], "`") + table := strings.TrimRight(parts[1], "`") + return schema, table +} + type session struct { sessionctx.Context vars *variable.SessionVars diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 975b95fd0b..75aa4b068e 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -247,6 +247,9 @@ type CheckPoint interface { // String return text of global position String() string + + // CheckAndUpdate check the checkpoint data consistency and try to fix them if possible + CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error } // RemoteCheckPoint implements CheckPoint @@ -840,6 +843,41 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error { return terror.WithScope(terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError), terror.ScopeDownstream) } +// CheckAndUpdate check the checkpoint data consistency and try to fix them if possible. +func (cp *RemoteCheckPoint) CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error { + cp.Lock() + hasChange := false + for lcSchema, tableMap := range tables { + tableCps, ok := cp.points[lcSchema] + if !ok { + continue + } + for lcTable, table := range tableMap { + tableCp, ok := tableCps[lcTable] + if !ok { + continue + } + tableCps[table] = tableCp + delete(tableCps, lcTable) + hasChange = true + } + } + for lcSchema, schema := range schemas { + if tableCps, ok := cp.points[lcSchema]; ok { + cp.points[schema] = tableCps + delete(cp.points, lcSchema) + hasChange = true + } + } + cp.Unlock() + + if hasChange { + tctx := tcontext.NewContext(ctx, log.L()) + return cp.FlushPointsExcept(tctx, nil, nil, nil) + } + return nil +} + // LoadMeta implements CheckPoint.LoadMeta. 
func (cp *RemoteCheckPoint) LoadMeta() error { cp.Lock() diff --git a/syncer/ddl.go b/syncer/ddl.go index bbd152336b..88325a56bc 100644 --- a/syncer/ddl.go +++ b/syncer/ddl.go @@ -139,7 +139,7 @@ func (s *Syncer) splitAndFilterDDL( return nil, nil, terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err2.Error()), "ddl %s", sql) } - tableNames, err2 := parserpkg.FetchDDLTableNames(schema, stmt2) + tableNames, err2 := parserpkg.FetchDDLTableNames(schema, stmt2, s.SourceTableNamesFlavor) if err2 != nil { return nil, nil, err2 } @@ -183,7 +183,7 @@ func (s *Syncer) routeDDL(p *parser.Parser, schema, sql string) (string, [][]*fi return "", nil, nil, terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err.Error()), "ddl %s", sql) } - tableNames, err := parserpkg.FetchDDLTableNames(schema, stmt) + tableNames, err := parserpkg.FetchDDLTableNames(schema, stmt, s.SourceTableNamesFlavor) if err != nil { return "", nil, nil, err } @@ -210,7 +210,7 @@ func (s *Syncer) handleOnlineDDL(tctx *tcontext.Context, p *parser.Parser, schem return []string{sql}, nil, nil } - tableNames, err := parserpkg.FetchDDLTableNames(schema, stmt) + tableNames, err := parserpkg.FetchDDLTableNames(schema, stmt, s.SourceTableNamesFlavor) if err != nil { return nil, nil, err } @@ -272,10 +272,10 @@ func (s *Syncer) dropSchemaInSharding(tctx *tcontext.Context, sourceSchema strin } // delete from sharding group firstly for name, tables := range sources { - targetSchema, targetTable := UnpackTableID(name) + targetSchema, targetTable := utils.UnpackTableID(name) sourceIDs := make([]string, 0, len(tables)) for _, table := range tables { - sourceID, _ := GenTableID(table[0], table[1]) + sourceID, _ := utils.GenTableID(table[0], table[1]) sourceIDs = append(sourceIDs, sourceID) } err := s.sgk.LeaveGroup(targetSchema, targetTable, sourceIDs) diff --git a/syncer/ddl_test.go b/syncer/ddl_test.go index a3d399caff..bce7da75fc 100644 --- a/syncer/ddl_test.go +++ b/syncer/ddl_test.go @@ -523,6 +523,10 @@ func (m mockOnlinePlugin) Clear(tctx *tcontext.Context) error { func (m mockOnlinePlugin) Close() { } +func (m mockOnlinePlugin) CheckAndUpdate(tctx *tcontext.Context, schemas map[string]string, tables map[string]map[string]string) error { + return nil +} + func (s *testSyncerSuite) TestClearOnlineDDL(c *C) { var ( targetDB = "target_db" diff --git a/syncer/filter.go b/syncer/filter.go index dfff7b86c4..7726e4842e 100644 --- a/syncer/filter.go +++ b/syncer/filter.go @@ -41,7 +41,7 @@ func (s *Syncer) skipQuery(tables []*filter.Table, stmt ast.StmtNode, sql string } if len(tables) > 0 { - tbs := s.baList.ApplyOn(tables) + tbs := s.baList.Apply(tables) if len(tbs) != len(tables) { return true, nil } @@ -86,7 +86,7 @@ func (s *Syncer) skipDMLEvent(schema string, table string, eventType replication } tbs := []*filter.Table{{Schema: schema, Name: table}} - tbs = s.baList.ApplyOn(tbs) + tbs = s.baList.Apply(tbs) if len(tbs) == 0 { return true, nil } diff --git a/syncer/handle_error.go b/syncer/handle_error.go index 4271bd14a5..1c58caff8c 100644 --- a/syncer/handle_error.go +++ b/syncer/handle_error.go @@ -89,7 +89,7 @@ func (s *Syncer) genEvents(ctx context.Context, sqls []string) ([]*replication.B switch node.(type) { case ast.DDLNode: - tableNames, err := parserpkg.FetchDDLTableNames("", node) + tableNames, err := parserpkg.FetchDDLTableNames("", node, s.SourceTableNamesFlavor) if err != nil { return nil, err } diff --git a/syncer/online-ddl-tools/ghost.go b/syncer/online-ddl-tools/ghost.go index db708b6a0b..f0e54f0306 100644 --- 
a/syncer/online-ddl-tools/ghost.go +++ b/syncer/online-ddl-tools/ghost.go @@ -177,3 +177,8 @@ func (g *Ghost) Close() { func (g *Ghost) ResetConn(tctx *tcontext.Context) error { return g.storge.ResetConn(tctx) } + +// CheckAndUpdate try to check and fix the schema/table case-sensitive issue. +func (g *Ghost) CheckAndUpdate(tctx *tcontext.Context, schemas map[string]string, tables map[string]map[string]string) error { + return g.storge.CheckAndUpdate(tctx, schemas, tables, g.RealName) +} diff --git a/syncer/online-ddl-tools/online_ddl.go b/syncer/online-ddl-tools/online_ddl.go index 227aaf8003..c94d2d6873 100644 --- a/syncer/online-ddl-tools/online_ddl.go +++ b/syncer/online-ddl-tools/online_ddl.go @@ -65,6 +65,8 @@ type OnlinePlugin interface { Clear(tctx *tcontext.Context) error // Close closes online ddl plugin Close() + // CheckAndUpdate try to check and fix the schema/table case-sensitive issue + CheckAndUpdate(tctx *tcontext.Context, schemas map[string]string, tables map[string]map[string]string) error } // TableType is type of table. @@ -225,7 +227,12 @@ func (s *Storage) Save(tctx *tcontext.Context, ghostSchema, ghostTable, realSche return nil } info.DDLs = append(info.DDLs, ddl) - ddlsBytes, err := json.Marshal(mSchema[ghostTable]) + err := s.saveToDB(tctx, ghostSchema, ghostTable, info) + return terror.WithScope(err, terror.ScopeDownstream) +} + +func (s *Storage) saveToDB(tctx *tcontext.Context, ghostSchema, ghostTable string, ddl *GhostDDLInfo) error { + ddlsBytes, err := json.Marshal(ddl) if err != nil { return terror.ErrSyncerUnitOnlineDDLInvalidMeta.Delegate(err) } @@ -243,7 +250,10 @@ func (s *Storage) Save(tctx *tcontext.Context, ghostSchema, ghostTable, realSche func (s *Storage) Delete(tctx *tcontext.Context, ghostSchema, ghostTable string) error { s.Lock() defer s.Unlock() + return s.delete(tctx, ghostSchema, ghostTable) +} +func (s *Storage) delete(tctx *tcontext.Context, ghostSchema, ghostTable string) error { mSchema, ok := s.ddls[ghostSchema] if !ok { return nil @@ -315,3 +325,50 @@ func (s *Storage) createTable(tctx *tcontext.Context) error { _, err := s.dbConn.ExecuteSQL(tctx, []string{sql}) return terror.WithScope(err, terror.ScopeDownstream) } + +// CheckAndUpdate try to check and fix the schema/table case-sensitive issue. 
+func (s *Storage) CheckAndUpdate( + tctx *tcontext.Context, + schemaMap map[string]string, + tablesMap map[string]map[string]string, + realNameFn func(table string) string, +) error { + s.Lock() + defer s.Unlock() + + changedSchemas := make([]string, 0) + for schema, tblDDLInfos := range s.ddls { + realSchema, hasChange := schemaMap[schema] + if !hasChange { + realSchema = schema + } else { + changedSchemas = append(changedSchemas, schema) + } + tblMap := tablesMap[schema] + for tbl, ddlInfos := range tblDDLInfos { + realTbl, tableChange := tblMap[tbl] + if !tableChange { + realTbl = tbl + tableChange = hasChange + } + if tableChange { + targetTable := realNameFn(realTbl) + ddlInfos.Table = targetTable + err := s.saveToDB(tctx, realSchema, realTbl, ddlInfos) + if err != nil { + return err + } + err = s.delete(tctx, schema, tbl) + if err != nil { + return err + } + } + } + } + for _, schema := range changedSchemas { + ddl := s.ddls[schema] + s.ddls[schemaMap[schema]] = ddl + delete(s.ddls, schema) + } + return nil +} diff --git a/syncer/online-ddl-tools/pt_osc.go b/syncer/online-ddl-tools/pt_osc.go index 73133261f0..14d3bbc1c5 100644 --- a/syncer/online-ddl-tools/pt_osc.go +++ b/syncer/online-ddl-tools/pt_osc.go @@ -178,3 +178,8 @@ func (p *PT) Close() { func (p *PT) ResetConn(tctx *tcontext.Context) error { return p.storge.ResetConn(tctx) } + +// CheckAndUpdate try to check and fix the schema/table case-sensitive issue. +func (p *PT) CheckAndUpdate(tctx *tcontext.Context, schemas map[string]string, tables map[string]map[string]string) error { + return p.storge.CheckAndUpdate(tctx, schemas, tables, p.RealName) +} diff --git a/syncer/shardddl/optimist.go b/syncer/shardddl/optimist.go index 073176b317..01cea5788c 100644 --- a/syncer/shardddl/optimist.go +++ b/syncer/shardddl/optimist.go @@ -202,3 +202,29 @@ func (o *Optimist) PendingOperation() *optimism.Operation { op := *o.pendingOp return &op } + +// CheckPersistentData check and fix the persistent data. +// +// NOTE: currently this function is not used because user will meet error at early version +// if set unsupported case-sensitive. +func (o *Optimist) CheckPersistentData(source string, schemas map[string]string, tables map[string]map[string]string) error { + if o.cli == nil { + return nil + } + err := optimism.CheckSourceTables(o.cli, source, schemas, tables) + if err != nil { + return err + } + + err = optimism.CheckDDLInfos(o.cli, source, schemas, tables) + if err != nil { + return err + } + + err = optimism.CheckOperations(o.cli, source, schemas, tables) + if err != nil { + return err + } + + return optimism.CheckColumns(o.cli, source, schemas, tables) +} diff --git a/syncer/sharding-meta/shardmeta.go b/syncer/sharding-meta/shardmeta.go index 40cd556688..c0b0c3c70b 100644 --- a/syncer/sharding-meta/shardmeta.go +++ b/syncer/sharding-meta/shardmeta.go @@ -276,3 +276,74 @@ func (meta *ShardingMeta) FlushData(sourceID, tableID string) ([]string, [][]int } return sqls, args } + +func (meta *ShardingMeta) genRemoveSQL(sourceID, tableID string) (string, []interface{}) { + sql := fmt.Sprintf("DELETE FROM %s where source_id=? and target_table_id=?", meta.tableName) + return sql, []interface{}{sourceID, tableID} +} + +// CheckAndUpdate check and fix schema and table names for all the sharding groups. 
+func (meta *ShardingMeta) CheckAndUpdate(targetID string, schemaMap map[string]string, tablesMap map[string]map[string]string) ([]string, [][]interface{}, error) { + if len(schemaMap) == 0 && len(tablesMap) == 0 { + return nil, nil, nil + } + + checkSourceID := func(source string) (string, bool) { + schemaName, tblName := utils.UnpackTableID(source) + realSchema, changed := schemaMap[schemaName] + if !changed { + realSchema = schemaName + } + tblMap := tablesMap[schemaName] + realTable, ok := tblMap[tblName] + if ok { + changed = true + } else { + realTable = tblName + } + newID, _ := utils.GenTableID(realSchema, realTable) + return newID, changed + } + + for _, item := range meta.global.Items { + newID, changed := checkSourceID(item.Source) + if changed { + item.Source = newID + } + } + + sourceIDsMap := make(map[string]string) + for sourceID, seqs := range meta.sources { + newSourceID, changed := checkSourceID(sourceID) + for _, item := range seqs.Items { + newID, hasChanged := checkSourceID(item.Source) + if hasChanged { + item.Source = newID + changed = true + } + } + if changed { + sourceIDsMap[sourceID] = newSourceID + } + } + var ( + sqls []string + args [][]interface{} + ) + for oldID, newID := range sourceIDsMap { + if oldID != newID { + seqs := meta.sources[oldID] + delete(meta.sources, oldID) + meta.sources[newID] = seqs + removeSQL, arg := meta.genRemoveSQL(oldID, targetID) + sqls = append(sqls, removeSQL) + args = append(args, arg) + } + log.L().Info("fix sharding meta", zap.String("old", oldID), zap.String("new", newID)) + fixedSQLs, fixedArgs := meta.FlushData(newID, targetID) + sqls = append(sqls, fixedSQLs...) + args = append(args, fixedArgs...) + } + + return sqls, args, nil +} diff --git a/syncer/sharding_group.go b/syncer/sharding_group.go index 0269260e37..f3fb51b54e 100644 --- a/syncer/sharding_group.go +++ b/syncer/sharding_group.go @@ -72,7 +72,6 @@ package syncer import ( "fmt" - "strings" "sync" "github.com/pingcap/dm/dm/config" @@ -82,6 +81,7 @@ import ( tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/cputil" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/syncer/dbconn" shardmeta "github.com/pingcap/dm/syncer/sharding-meta" @@ -281,7 +281,7 @@ func (sg *ShardingGroup) Tables() [][]string { sources := sg.Sources() tables := make([][]string, 0, len(sources)) for id := range sources { - schema, table := UnpackTableID(id) + schema, table := utils.UnpackTableID(id) tables = append(tables, []string{schema, table}) } return tables @@ -300,7 +300,7 @@ func (sg *ShardingGroup) UnresolvedTables() [][]string { tables := make([][]string, 0, len(sg.sources)) for id := range sg.sources { - schema, table := UnpackTableID(id) + schema, table := utils.UnpackTableID(id) tables = append(tables, []string{schema, table}) } return tables @@ -365,22 +365,6 @@ func (sg *ShardingGroup) FlushData(targetTableID string) ([]string, [][]interfac return sg.meta.FlushData(sg.sourceID, targetTableID) } -// GenTableID generates table ID. -func GenTableID(schema, table string) (id string, isSchemaOnly bool) { - if len(table) == 0 { - return fmt.Sprintf("`%s`", schema), true - } - return fmt.Sprintf("`%s`.`%s`", schema, table), false -} - -// UnpackTableID unpacks table ID to pair. 
-func UnpackTableID(id string) (string, string) { - parts := strings.Split(id, "`.`") - schema := strings.TrimLeft(parts[0], "`") - table := strings.TrimRight(parts[1], "`") - return schema, table -} - // ShardingGroupKeeper used to keep ShardingGroup. type ShardingGroupKeeper struct { sync.RWMutex @@ -414,8 +398,8 @@ func NewShardingGroupKeeper(tctx *tcontext.Context, cfg *config.SubTaskConfig) * func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, meta *shardmeta.ShardingMeta, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error) { // if need to support target table-level sharding DDL // we also need to support target schema-level sharding DDL - schemaID, _ := GenTableID(targetSchema, "") - targetTableID, _ := GenTableID(targetSchema, targetTable) + schemaID, _ := utils.GenTableID(targetSchema, "") + targetTableID, _ := utils.GenTableID(targetSchema, targetTable) k.Lock() defer k.Unlock() @@ -479,8 +463,8 @@ func (k *ShardingGroupKeeper) ResetGroups() { // LeaveGroup leaves group according to target schema, table and source IDs // LeaveGroup doesn't affect in syncing process. func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sources []string) error { - schemaID, _ := GenTableID(targetSchema, "") - targetTableID, _ := GenTableID(targetSchema, targetTable) + schemaID, _ := utils.GenTableID(targetSchema, "") + targetTableID, _ := utils.GenTableID(targetSchema, targetTable) k.Lock() defer k.Unlock() if group, ok := k.groups[targetTableID]; ok { @@ -512,7 +496,7 @@ func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sourc func (k *ShardingGroupKeeper) TrySync( targetSchema, targetTable, source string, location, endLocation binlog.Location, ddls []string) ( needShardingHandle bool, group *ShardingGroup, synced, active bool, remain int, err error) { - targetTableID, schemaOnly := GenTableID(targetSchema, targetTable) + targetTableID, schemaOnly := utils.GenTableID(targetSchema, targetTable) if schemaOnly { // NOTE: now we don't support syncing for schema only sharding DDL return false, nil, true, false, 0, nil @@ -562,7 +546,7 @@ func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, [][]string) { // Group returns target table's group, nil if not exist. func (k *ShardingGroupKeeper) Group(targetSchema, targetTable string) *ShardingGroup { - targetTableID, _ := GenTableID(targetSchema, targetTable) + targetTableID, _ := utils.GenTableID(targetSchema, targetTable) k.RLock() defer k.RUnlock() return k.groups[targetTableID] @@ -741,6 +725,26 @@ func (k *ShardingGroupKeeper) LoadShardMeta(flavor string, enableGTID bool) (map return meta, terror.WithScope(terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError), terror.ScopeDownstream) } +// CheckAndFix try to check and fix the schema/table case-sensitive issue. +// +// NOTE: CheckAndFix is called before sharding groups are inited. +func (k *ShardingGroupKeeper) CheckAndFix(metas map[string]*shardmeta.ShardingMeta, schemaMap map[string]string, tablesMap map[string]map[string]string) error { + k.Lock() + defer k.Unlock() + for targetID, meta := range metas { + sqls, args, err := meta.CheckAndUpdate(targetID, schemaMap, tablesMap) + if err != nil { + return err + } + _, err = k.dbConn.ExecuteSQL(k.tctx, sqls, args...) + if err != nil { + return err + } + } + + return nil +} + // ShardingReSync represents re-sync info for a sharding DDL group. 
type ShardingReSync struct { currLocation binlog.Location // current DDL's binlog location, initialize to first DDL's location diff --git a/syncer/sharding_group_test.go b/syncer/sharding_group_test.go index 9a555dc76d..ef2e021452 100644 --- a/syncer/sharding_group_test.go +++ b/syncer/sharding_group_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/dm/pkg/cputil" "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/syncer/dbconn" ) @@ -223,8 +224,8 @@ func (t *testShardingGroupSuite) TestTableID(c *C) { } for _, ca := range cases { // ignore isSchemaOnly - id, _ := GenTableID(ca[0], ca[1]) - schema, table := UnpackTableID(id) + id, _ := utils.GenTableID(ca[0], ca[1]) + schema, table := utils.UnpackTableID(id) c.Assert(schema, Equals, ca[0]) c.Assert(table, Equals, ca[1]) } diff --git a/syncer/syncer.go b/syncer/syncer.go index 6bbb06a19e..ad735bb713 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -201,6 +201,9 @@ type Syncer struct { addJobFunc func(*job) error + // `lower_case_table_names` setting of upstream db + SourceTableNamesFlavor utils.LowerCaseTableNamesFlavor + tsOffset atomic.Int64 // time offset between upstream and syncer, DM's timestamp - MySQL's timestamp secondsBehindMaster atomic.Int64 // current task delay second behind upstream workerLagMap map[string]*atomic.Int64 // worker's sync lag key:WorkerLagKey val: lag @@ -341,21 +344,30 @@ func (s *Syncer) Init(ctx context.Context) (err error) { return err } + var schemaMap map[string]string + var tableMap map[string]map[string]string + if s.SourceTableNamesFlavor == utils.LCTableNamesSensitive { + // TODO: we should avoid call this function multi times + allTables, err1 := utils.FetchAllDoTables(ctx, s.fromDB.BaseDB.DB, s.baList) + if err1 != nil { + return err1 + } + schemaMap, tableMap = buildLowerCaseTableNamesMap(allTables) + } + switch s.cfg.ShardMode { case config.ShardPessimistic: err = s.sgk.Init() if err != nil { return err } - - err = s.initShardingGroups(ctx) + err = s.initShardingGroups(ctx, true) if err != nil { return err } rollbackHolder.Add(fr.FuncRollback{Name: "close-sharding-group-keeper", Fn: s.sgk.Close}) case config.ShardOptimistic: - err = s.initOptimisticShardDDL(ctx) - if err != nil { + if err = s.initOptimisticShardDDL(ctx); err != nil { return err } } @@ -370,6 +382,17 @@ func (s *Syncer) Init(ctx context.Context) (err error) { if err != nil { return err } + if s.SourceTableNamesFlavor == utils.LCTableNamesSensitive { + if err = s.checkpoint.CheckAndUpdate(ctx, schemaMap, tableMap); err != nil { + return err + } + + if s.onlineDDL != nil { + if err = s.onlineDDL.CheckAndUpdate(s.tctx, schemaMap, tableMap); err != nil { + return err + } + } + } if s.cfg.EnableHeartbeat { s.heartbeat, err = GetHeartbeat(&HeartbeatConfig{ serverID: s.cfg.ServerID, @@ -398,9 +421,55 @@ func (s *Syncer) Init(ctx context.Context) (err error) { return nil } +// buildLowerCaseTableNamesMap build a lower case schema map and lower case table map for all tables +// Input: map of schema --> list of tables +// Output: schema names map: lower_case_schema_name --> schema_name +// tables names map: lower_case_schema_name --> lower_case_table_name --> table_name +// Note: the result will skip the schemas and tables that their lower_case_name are the same. 
+func buildLowerCaseTableNamesMap(tables map[string][]string) (map[string]string, map[string]map[string]string) { + schemaMap := make(map[string]string) + tablesMap := make(map[string]map[string]string) + lowerCaseSchemaSet := make(map[string]string) + for schema, tableNames := range tables { + lcSchema := strings.ToLower(schema) + // track if there are multiple schema names with the same lower case name. + // just skip this kind of schemas. + if rawSchema, ok := lowerCaseSchemaSet[lcSchema]; ok { + delete(schemaMap, lcSchema) + delete(tablesMap, lcSchema) + log.L().Warn("skip check schema with same lower case value", + zap.Strings("schemas", []string{schema, rawSchema})) + continue + } + lowerCaseSchemaSet[lcSchema] = schema + + if lcSchema != schema { + schemaMap[lcSchema] = schema + } + tblsMap := make(map[string]string) + lowerCaseTableSet := make(map[string]string) + for _, tb := range tableNames { + lcTbl := strings.ToLower(tb) + if rawTbl, ok := lowerCaseTableSet[lcTbl]; ok { + delete(tblsMap, lcTbl) + log.L().Warn("skip check tables with same lower case value", zap.String("schema", schema), + zap.Strings("table", []string{tb, rawTbl})) + continue + } + if lcTbl != tb { + tblsMap[lcTbl] = tb + } + } + if len(tblsMap) > 0 { + tablesMap[lcSchema] = tblsMap + } + } + return schemaMap, tablesMap +} + // initShardingGroups initializes sharding groups according to source MySQL, filter rules and router rules // NOTE: now we don't support modify router rules after task has started. -func (s *Syncer) initShardingGroups(ctx context.Context) error { +func (s *Syncer) initShardingGroups(ctx context.Context, needCheck bool) error { // fetch tables from source and filter them sourceTables, err := s.fromDB.fetchAllDoTables(ctx, s.baList) if err != nil { @@ -422,7 +491,7 @@ func (s *Syncer) initShardingGroups(ctx context.Context) error { if !ok { mSchema[targetTable] = make([]string, 0, len(tables)) } - ID, _ := GenTableID(schema, table) + ID, _ := utils.GenTableID(schema, table) mSchema[targetTable] = append(mSchema[targetTable], ID) } } @@ -431,11 +500,18 @@ func (s *Syncer) initShardingGroups(ctx context.Context) error { if err2 != nil { return err2 } + if needCheck && s.SourceTableNamesFlavor == utils.LCTableNamesSensitive { + // try fix persistent data before init + schemaMap, tableMap := buildLowerCaseTableNamesMap(sourceTables) + if err2 = s.sgk.CheckAndFix(loadMeta, schemaMap, tableMap); err2 != nil { + return err2 + } + } // add sharding group for targetSchema, mSchema := range mapper { for targetTable, sourceIDs := range mSchema { - tableID, _ := GenTableID(targetSchema, targetTable) + tableID, _ := utils.GenTableID(targetSchema, targetTable) _, _, _, _, err := s.sgk.AddGroup(targetSchema, targetTable, sourceIDs, loadMeta[tableID], false) if err != nil { return err @@ -1891,7 +1967,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err } if s.cfg.ShardMode == config.ShardPessimistic { - source, _ := GenTableID(originSchema, originTable) + source, _ := utils.GenTableID(originSchema, originTable) if s.sgk.InSyncing(schemaName, tableName, source, *ec.currentLocation) { // if in unsync stage and not before active DDL, ignore it // if in sharding re-sync stage and not before active DDL (the next DDL to be synced), ignore it @@ -2031,7 +2107,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o // if err2 != nil, stmts should be nil so below for-loop is skipped for _, stmt := range stmts { if _, ok := stmt.(ast.DDLNode); ok { - 
tableNames, err3 := parserpkg.FetchDDLTableNames(usedSchema, stmt) + tableNames, err3 := parserpkg.FetchDDLTableNames(usedSchema, stmt, s.SourceTableNamesFlavor) if err3 != nil { continue } @@ -2133,7 +2209,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o } continue case *ast.DropTableStmt: - sourceID, _ := GenTableID(tableNames[0][0].Schema, tableNames[0][0].Name) + sourceID, _ := utils.GenTableID(tableNames[0][0].Schema, tableNames[0][0].Name) err = s.sgk.LeaveGroup(tableNames[1][0].Schema, tableNames[1][0].Name, []string{sourceID}) if err != nil { return err @@ -2268,7 +2344,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o // so when restarting before sharding DDLs synced, this binlog can be re-sync again to trigger the TrySync startLocation := ec.startLocation - source, _ = GenTableID(ddlInfo.tableNames[0][0].Schema, ddlInfo.tableNames[0][0].Name) + source, _ = utils.GenTableID(ddlInfo.tableNames[0][0].Schema, ddlInfo.tableNames[0][0].Name) var annotate string switch ddlInfo.stmt.(type) { @@ -2311,7 +2387,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o } if needShardingHandle { - target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) + target, _ := utils.GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) metrics.UnsyncedTableGauge.WithLabelValues(s.cfg.Name, target, s.cfg.SourceID).Set(float64(remain)) err = ec.safeMode.IncrForTable(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try enable safe-mode when starting syncing for sharding group if err != nil { @@ -2759,6 +2835,15 @@ func (s *Syncer) createDBs(ctx context.Context) error { if err != nil { return err } + conn, err := s.fromDB.BaseDB.GetBaseConn(ctx) + if err != nil { + return err + } + lcFlavor, err := utils.FetchLowerCaseTableNamesSetting(ctx, conn.DBConn) + if err != nil { + return err + } + s.SourceTableNamesFlavor = lcFlavor hasSQLMode := false // get sql_mode from upstream db @@ -3034,7 +3119,7 @@ func (s *Syncer) Update(cfg *config.SubTaskConfig) error { return err } - err = s.initShardingGroups(context.Background()) // FIXME: fix context when re-implementing `Update` + err = s.initShardingGroups(context.Background(), false) // FIXME: fix context when re-implementing `Update` if err != nil { return err } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 515773f7d4..d6ade0d771 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -296,7 +296,7 @@ func (s *testSyncerSuite) TestSelectDB(c *C) { stmt, err := p.ParseOneStmt(query, "", "") c.Assert(err, IsNil) - tableNames, err := parserpkg.FetchDDLTableNames(string(ev.Schema), stmt) + tableNames, err := parserpkg.FetchDDLTableNames(string(ev.Schema), stmt, syncer.SourceTableNamesFlavor) c.Assert(err, IsNil) r, err := syncer.skipQuery(tableNames, stmt, query) @@ -434,7 +434,7 @@ func (s *testSyncerSuite) TestIgnoreDB(c *C) { stmt, err := p.ParseOneStmt(sql, "", "") c.Assert(err, IsNil) - tableNames, err := parserpkg.FetchDDLTableNames(sql, stmt) + tableNames, err := parserpkg.FetchDDLTableNames(sql, stmt, syncer.SourceTableNamesFlavor) c.Assert(err, IsNil) r, err := syncer.skipQuery(tableNames, stmt, sql) c.Assert(err, IsNil) @@ -1595,7 +1595,7 @@ func checkEventWithTableResult(c *C, syncer *Syncer, allEvents []*replication.Bi stmt, err := p.ParseOneStmt(sql, "", "") c.Assert(err, IsNil) - tableNames, err := parserpkg.FetchDDLTableNames(string(ev.Schema), 
stmt) + tableNames, err := parserpkg.FetchDDLTableNames(string(ev.Schema), stmt, syncer.SourceTableNamesFlavor) c.Assert(err, IsNil) r, err := syncer.skipQuery(tableNames, stmt, sql) c.Assert(err, IsNil) diff --git a/tests/_utils/check_metric b/tests/_utils/check_metric index 5a2b761121..600d1d0495 100755 --- a/tests/_utils/check_metric +++ b/tests/_utils/check_metric @@ -17,7 +17,7 @@ shift 3 counter=0 while [ $counter -lt $retry_count ]; do metric=$(curl -s http://127.0.0.1:$port/metrics | grep $metric_name | grep -v "#" | awk '{printf("%d",$2)}') - if [ $lower -lt $metric ] && [ $metric -lt $upper ]; then + if [ -n $metric ] && [ $lower -lt $metric ] && [ $metric -lt $upper ]; then exit 0 fi ((counter += 1)) diff --git a/tests/case_sensitive/conf/diff_config.toml b/tests/case_sensitive/conf/diff_config.toml new file mode 100644 index 0000000000..5e9f2123ce --- /dev/null +++ b/tests/case_sensitive/conf/diff_config.toml @@ -0,0 +1,17 @@ +# diff Configuration. + +log-level = "info" + +chunk-size = 1000 + +check-thread-count = 4 + +sample-percent = 100 + +use-checksum = true + +fix-sql-file = "fix.sql" + +dm-addr = "http://127.0.0.1:8261" + +dm-task = "test" diff --git a/tests/case_sensitive/conf/dm-master.toml b/tests/case_sensitive/conf/dm-master.toml new file mode 100644 index 0000000000..53a294e7d0 --- /dev/null +++ b/tests/case_sensitive/conf/dm-master.toml @@ -0,0 +1,6 @@ +# Master Configuration. +master-addr = ":8261" +advertise-addr = "127.0.0.1:8261" + +rpc-timeout = "30s" +auto-compaction-retention = "3s" diff --git a/tests/case_sensitive/conf/dm-task.yaml b/tests/case_sensitive/conf/dm-task.yaml new file mode 100644 index 0000000000..c187f61a07 --- /dev/null +++ b/tests/case_sensitive/conf/dm-task.yaml @@ -0,0 +1,78 @@ +--- +name: test +task-mode: all +is-sharding: false +meta-schema: "dm_meta" +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 +case-sensitive: true + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + session: + tidb_skip_utf8_check: 1 + tidb_disable_txn_auto_retry: off + tidb_retry_limit: "10" + +mysql-instances: + - source-id: "mysql-replica-01" + black-white-list: "instance" # compatible with deprecated config + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + route-rules: ["global"] + filter-rules: ["global"] + + - source-id: "mysql-replica-02" + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + route-rules: ["global"] + filter-rules: ["global"] + +black-white-list: # compatible with deprecated config + instance: + do-dbs: ["Upper_DB*", "lower_db"] + do-tables: + - db-name: "Upper_DB*" + tbl-name: "Do_Table*" + - db-name: "Upper_DB" + tbl-name: "lower_table*" + - db-name: "lower_db" + tbl-name: "Upper_Table*" + +routes: + global: + schema-pattern: "Upper_DB" + table-pattern: "Do_Table" + target-schema: "UPPER_DB_ROUTE" + target-table: "do_table_route" + +filters: + global: + schema-pattern: "Upper_DB*" + table-pattern: "Do_Table*" + events: ["truncate table"] + action: Ignore + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 + enable-ansi-quotes: false # compatible with deprecated config diff --git a/tests/case_sensitive/conf/dm-worker1.toml b/tests/case_sensitive/conf/dm-worker1.toml new file mode 100644 index 0000000000..7a72ea72bf --- 
/dev/null +++ b/tests/case_sensitive/conf/dm-worker1.toml @@ -0,0 +1,2 @@ +name = "worker1" +join = "127.0.0.1:8261" diff --git a/tests/case_sensitive/conf/dm-worker2.toml b/tests/case_sensitive/conf/dm-worker2.toml new file mode 100644 index 0000000000..010e21c73e --- /dev/null +++ b/tests/case_sensitive/conf/dm-worker2.toml @@ -0,0 +1,2 @@ +name = "worker2" +join = "127.0.0.1:8261" diff --git a/tests/case_sensitive/conf/source1.yaml b/tests/case_sensitive/conf/source1.yaml new file mode 100644 index 0000000000..7712f0200c --- /dev/null +++ b/tests/case_sensitive/conf/source1.yaml @@ -0,0 +1,18 @@ +# MySQL Configuration. + +source-id: mysql-replica-01 +flavor: '' +enable-gtid: true +enable-relay: false +relay-binlog-name: '' +relay-binlog-gtid: '' +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3306 +checker: + check-enable: true + backoff-rollback: 5m + backoff-max: 5m + diff --git a/tests/case_sensitive/conf/source2.yaml b/tests/case_sensitive/conf/source2.yaml new file mode 100644 index 0000000000..32f44eb948 --- /dev/null +++ b/tests/case_sensitive/conf/source2.yaml @@ -0,0 +1,21 @@ +# MySQL Configuration. + +source-id: mysql-replica-02 +flavor: '' +enable-gtid: true +enable-relay: false +relay-binlog-name: '' +relay-binlog-gtid: '' +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3307 + + +# let dm-worker2 use the default config for checker +#checker: +# check-enable: true +# backoff-rollback: 5m +# backoff-max: 5m + diff --git a/tests/case_sensitive/data/db1.increment.sql b/tests/case_sensitive/data/db1.increment.sql new file mode 100644 index 0000000000..dc89a519c9 --- /dev/null +++ b/tests/case_sensitive/data/db1.increment.sql @@ -0,0 +1,22 @@ +use Upper_DB; +insert into Do_Table (id, name, dt, ts) values (100, 'Eddard Stark', now(), '2021-05-11 12:01:05'); +alter table Do_Table drop column `dt`; +insert into Do_Table (id, name, ts) values (101, 'Spike', '2021-05-31 12:01:05'); +alter table Do_Table add index `idx_name_ts` (`name`, `ts`); +insert into Do_Table (id, name, ts) values (102, 'Jet', '2021-05-31 13:01:00'); + +use Upper_DB1; +insert into Do_Table1 (id, name) values (3, 'third'); +alter table Do_Table1 add column info json; +alter table Do_Table1 add column gen_id int as (info->'$.id'); +alter table Do_Table1 add index multi_col(`name`, `gen_id`); +insert into Do_Table1 (id, name, info) values (4, 'gentest', '{"id": 123}'); +insert into Do_Table1 (id, name, info) values (5, 'gentest', '{"id": 124}'); +update Do_Table1 set info = '{"id": 120}' where id = 1; +update Do_Table1 set info = '{"id": 121}' where id = 2; +update Do_Table1 set info = '{"id": 122}' where id = 3; + +use `Upper_Db_IGNORE`; +insert into ignore_table (id) values (0); +alter table ignore_table add column description varchar(32); +insert into ignore_table (id, description) values (0, 'ignored'); diff --git a/tests/case_sensitive/data/db1.prepare.sql b/tests/case_sensitive/data/db1.prepare.sql new file mode 100644 index 0000000000..05e34769d1 --- /dev/null +++ b/tests/case_sensitive/data/db1.prepare.sql @@ -0,0 +1,31 @@ +drop database if exists `Upper_DB`; +create database `Upper_DB`; +use `Upper_DB`; +create table Do_Table ( + id int NOT NULL AUTO_INCREMENT, + name varchar(20), + dt datetime, + ts timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (id)); +-- test ANSI_QUOTES works with quote in string +insert into Do_Table (id, name, dt, ts) values (1, 'ar"ya', now(), now()), (2, 'catelyn', 
'2021-05-11 10:01:05', '2021-05-11 10:01:05'); + +create table Do_table_ignore(id int PRIMARY KEY); +insert into Do_table_ignore values (1); + +create table lower_table (id int NOT NULL, val varchar(12), PRIMARY KEY (id)); +insert into lower_table (id, val) values (0, 'lalala'); + +-- should be ignored +create table lower_Table1 (id int NOT NULL PRIMARY KEY); + +drop database if exists `Upper_DB1`; +create database `Upper_DB1`; +use `Upper_DB1`; +create table Do_Table1 (id int NOT NULL AUTO_INCREMENT, name varchar(20), PRIMARY KEY (id)); +insert into Do_Table1 (id, name) values (1, 'test'), (2, 'test2'); + +drop database IF EXISTS `Upper_Db_IGNORE`; +create database `Upper_Db_IGNORE`; +use `Upper_Db_IGNORE`; +create table `ignore_table`(id int); \ No newline at end of file diff --git a/tests/case_sensitive/data/db2.increment.sql b/tests/case_sensitive/data/db2.increment.sql new file mode 100644 index 0000000000..c48c0f4780 --- /dev/null +++ b/tests/case_sensitive/data/db2.increment.sql @@ -0,0 +1,8 @@ +use lower_db; +delete from Upper_Table where name = 'Sansa'; + +-- test sql_mode=NO_AUTO_VALUE_ON_ZERO +insert into Upper_Table (id, name) values (0,'haha'); + +use `lower_db_ignore`; +insert into `Upper_Table` (id) values (1); \ No newline at end of file diff --git a/tests/case_sensitive/data/db2.prepare.sql b/tests/case_sensitive/data/db2.prepare.sql new file mode 100644 index 0000000000..98efbb4b44 --- /dev/null +++ b/tests/case_sensitive/data/db2.prepare.sql @@ -0,0 +1,17 @@ +drop database if exists `lower_db`; +create database `lower_db`; +use `lower_db`; +create table Upper_Table ( + id int NOT NULL AUTO_INCREMENT, + name varchar(20), + ts timestamp, + PRIMARY KEY (id)); +insert into Upper_Table (name, ts) values ('Arya', now()), ('Bran', '2021-05-11 10:01:05'), ('Sansa', NULL); + +create table upper_table(id int NOT NULL PRIMARY KEY); + +-- test block-allow-list +drop database if exists `lower_db_ignore`; +create database `lower_db_ignore`; +use `lower_db_ignore`; +create table `Upper_Table`(id int); diff --git a/tests/case_sensitive/run.sh b/tests/case_sensitive/run.sh new file mode 100755 index 0000000000..3ad86e6821 --- /dev/null +++ b/tests/case_sensitive/run.sh @@ -0,0 +1,119 @@ +#!/bin/bash + +set -eu + +cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME +API_VERSION="v1alpha1" + +function run() { + run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" + inject_points=( + "github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")" + "github.com/pingcap/dm/relay/NewUpstreamServer=return(true)" + ) + export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" + + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_contains 'Query OK, 2 rows affected' + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_contains 'Query OK, 3 rows affected' + # manually create the route table + run_sql 'CREATE DATABASE IF NOT EXISTS `UPPER_DB_ROUTE`' $TIDB_PORT $TIDB_PASSWORD + + # start DM worker and master + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + check_metric $MASTER_PORT 'start_leader_counter' 3 0 2 + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + # operate mysql config to worker + cp $cur/conf/source1.yaml 
$WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + # make sure source1 is bound to worker1 + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + + # start DM task only + cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml + dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta" + # check task has started + check_metric $WORKER1_PORT "dm_worker_task_state{source_id=\"mysql-replica-01\",task=\"test\"}" 3 1 3 + check_metric $WORKER2_PORT "dm_worker_task_state{source_id=\"mysql-replica-02\",task=\"test\"}" 3 1 3 + + # use sync_diff_inspector to check full dump loader + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # restart dm-worker1 + pkill -hup -f dm-worker1.toml 2>/dev/null || true + wait_pattern_exit dm-worker1.toml + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + # make sure worker1 have bound a source, and the source should same with bound before + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "worker1" 1 + + # restart dm-worker2 + pkill -hup -f dm-worker2.toml 2>/dev/null || true + wait_pattern_exit dm-worker2.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + check_metric $WORKER1_PORT "dm_worker_task_state{source_id=\"mysql-replica-01\",task=\"test\"}" 3 1 3 + check_metric $WORKER2_PORT "dm_worker_task_state{source_id=\"mysql-replica-02\",task=\"test\"}" 3 1 3 + + sleep 10 + echo "after restart dm-worker, task should resume automatically" + + # wait for task running + check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"stage": "Running"' 10 + sleep 2 # still wait for subtask running on other dm-workers + + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + + # use sync_diff_inspector to check data now! 
+ check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # test block-allow-list by the way + run_sql "show databases;" $TIDB_PORT $TIDB_PASSWORD + check_not_contains "Upper_Db_IGNORE" + check_contains "Upper_DB1" + check_contains "lower_db" + # test route-rule + check_contains "UPPER_DB_ROUTE" + + run_sql "show tables from UPPER_DB_ROUTE" $TIDB_PORT $TIDB_PASSWORD + check_contains "do_table_route" + check_not_contains "Do_table_ignore" + + run_sql "select count(*) from UPPER_DB_ROUTE.do_table_route" $TIDB_PORT $TIDB_PASSWORD + # check the row count of the routed table after the data above is synced + check_contains "count(*): 5" + # test binlog event filter + run_sql "truncate table Upper_DB.Do_Table" $MYSQL_PORT1 $MYSQL_PASSWORD1 + # insert another row + run_sql "INSERT INTO Upper_DB.Do_Table (id, name) values (103, 'new');" $MYSQL_PORT1 $MYSQL_PASSWORD1 + sleep 2 + run_sql "select count(*) from UPPER_DB_ROUTE.do_table_route" $TIDB_PORT $TIDB_PASSWORD + # ensure the truncate is ignored and the new row is inserted + check_contains "count(*): 6" + + export GO_FAILPOINTS='' +} + +trap "cleanup_process; cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE sync_diff_inspector" EXIT + +# also cleanup dm processes in case of last run failed +cleanup_process $* +cleanup_data Upper_DB Upper_DB1 lower_db UPPER_DB_ROUTE +run $* + +echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" diff --git a/tests/others_integration.txt b/tests/others_integration.txt index 9ece51e2b2..a85703c683 100644 --- a/tests/others_integration.txt +++ b/tests/others_integration.txt @@ -11,3 +11,4 @@ downstream_more_column expression_filter fake_rotate_event metrics +case_sensitive diff --git a/tests/start_task/run.sh b/tests/start_task/run.sh index f96d33164a..b713b7d2d1 100644 --- a/tests/start_task/run.sh +++ b/tests/start_task/run.sh @@ -103,12 +103,14 @@ function run() { "ERROR" 1 echo "reset go failpoints, and need restart dm-worker, then start task again" + kill_dm_worker kill_dm_master export GO_FAILPOINTS='' - run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT sleep 5 dmctl_start_task_standalone $task_conf diff --git a/tests/tiup/conf/diff_config.toml b/tests/tiup/conf/diff_config.toml index 881528c575..6005b3b7b8 100644 --- a/tests/tiup/conf/diff_config.toml +++ b/tests/tiup/conf/diff_config.toml @@ -12,13 +12,13 @@ is-sharding = true [[table-config.source-tables]] instance-id = "source-1" -schema = "sharding1" -table = "~t.*" +schema = "Sharding1" +table = "~[Tt].*" [[table-config.source-tables]] instance-id = "source-2" schema = "sharding2" -table = "~t.*" +table = "~[Tt].*" [[source-db]] host = "mysql1" diff --git a/tests/tiup/conf/diff_config_pessimistic.toml b/tests/tiup/conf/diff_config_pessimistic.toml index 7044266457..7126be9d27 100644 --- a/tests/tiup/conf/diff_config_pessimistic.toml +++ b/tests/tiup/conf/diff_config_pessimistic.toml @@ -12,13 +12,13 @@ is-sharding = true [[table-config.source-tables]] instance-id = "source-1" -schema = "pes_sharding1" -table = "~t.*" +schema = "pes_Sharding1" +table = "~[Tt].*" [[table-config.source-tables]] instance-id = "source-2" schema = "pes_sharding2" -table = "~t.*" +table = "~[Tt].*" [[source-db]] host = "mysql1" diff --git a/tests/tiup/conf/task.yaml b/tests/tiup/conf/task.yaml index
89b7c8e2fb..4d23e46e29 100644 --- a/tests/tiup/conf/task.yaml +++ b/tests/tiup/conf/task.yaml @@ -3,6 +3,7 @@ name: upgrade_via_tiup task-mode: all is-sharding: true enable-heartbeat: true +case-sensitive: true target-database: host: tidb @@ -21,18 +22,18 @@ mysql-instances: black-white-list: instance: - do-dbs: ["~^sharding[\\d]+"] + do-dbs: ["~^[sS]harding[\\d]+"] do-tables: - - db-name: "~^sharding[\\d]+" - tbl-name: "~^t[\\d]+" + - db-name: "~^[sS]harding[\\d]+" + tbl-name: "~^[tT][\\d]+" routes: sharding-route-rules-table: - schema-pattern: sharding* - table-pattern: t* + schema-pattern: "[sS]harding*" + table-pattern: "[tT]*" target-schema: db_target target-table: t_target sharding-route-rules-schema: - schema-pattern: sharding* + schema-pattern: "[sS]harding*" target-schema: db_target diff --git a/tests/tiup/conf/task_optimistic.yaml b/tests/tiup/conf/task_optimistic.yaml index c3d8d7d20a..11d6230394 100644 --- a/tests/tiup/conf/task_optimistic.yaml +++ b/tests/tiup/conf/task_optimistic.yaml @@ -4,6 +4,7 @@ task-mode: all is-sharding: true shard-mode: optimistic enable-heartbeat: true +case-sensitive: true target-database: host: tidb diff --git a/tests/tiup/conf/task_pessimistic.yaml b/tests/tiup/conf/task_pessimistic.yaml index 69386748a2..43ab1b8548 100644 --- a/tests/tiup/conf/task_pessimistic.yaml +++ b/tests/tiup/conf/task_pessimistic.yaml @@ -4,6 +4,7 @@ task-mode: all is-sharding: true shard-mode: pessimistic enable-heartbeat: true +case-sensitive: true target-database: host: tidb @@ -22,18 +23,18 @@ mysql-instances: black-white-list: instance: - do-dbs: ["~^pes_sharding[\\d]+"] + do-dbs: ["~^pes_[Ss]harding[\\d]+"] do-tables: - - db-name: "~^pes_sharding[\\d]+" - tbl-name: "~^t[\\d]+" + - db-name: "~^pes_[sS]harding[\\d]+" + tbl-name: "~^[Tt][\\d]+" routes: sharding-route-rules-table: - schema-pattern: pes_sharding* - table-pattern: t* + schema-pattern: "pes_[sS]harding*" + table-pattern: "[tT]*" target-schema: pes_db_target target-table: t_target sharding-route-rules-schema: - schema-pattern: pes_sharding* + schema-pattern: "pes_[sS]harding*" target-schema: pes_db_target diff --git a/tests/tiup/docker/docker-compose.yml b/tests/tiup/docker/docker-compose.yml index e31d7ad438..e1360d2129 100644 --- a/tests/tiup/docker/docker-compose.yml +++ b/tests/tiup/docker/docker-compose.yml @@ -31,7 +31,7 @@ services: dm-tiup: ipv4_address: 172.28.0.201 image: mysql:5.7.32 - command: --default-authentication-plugin=mysql_native_password --log-bin=/var/lib/mysql/mysql-bin --server-id=1 --binlog-format=ROW --gtid_mode=ON --enforce-gtid-consistency=true + command: --default-authentication-plugin=mysql_native_password --log-bin=/var/lib/mysql/mysql-bin --server-id=1 --binlog-format=ROW --gtid_mode=ON --enforce-gtid-consistency=true --lower_case_table_names=0 restart: always environment: MYSQL_ROOT_PASSWORD: "" @@ -44,7 +44,7 @@ services: dm-tiup: ipv4_address: 172.28.0.202 image: mariadb:10.5.8 - command: --log-bin --server-id=1 --binlog-format=ROW + command: --log-bin --server-id=1 --binlog-format=ROW --lower_case_table_names=0 restart: always environment: MYSQL_ROOT_PASSWORD: "" diff --git a/tests/tiup/lib.sh b/tests/tiup/lib.sh index ad9eb4134b..1f5fca6d17 100755 --- a/tests/tiup/lib.sh +++ b/tests/tiup/lib.sh @@ -12,15 +12,18 @@ TASK_NAME="upgrade_via_tiup" TASK_PESS_NAME="upgrade_via_tiup_pessimistic" TASK_OPTI_NAME="upgrade_via_tiup_optimistic" -DB1=sharding1 +DB1=Sharding1 DB2=sharding2 +# can't run the upgrade test with an upper-case schema name in optimistic mode DB3=opt_sharding1
DB4=opt_sharding2 -DB5=pes_sharding1 +DB5=pes_Sharding1 DB6=pes_sharding2 -TBL1=t1 +TBL1=T1 TBL2=t2 -TBL3=t3 +TBL3=T3 +TBL_LOWER1=t1 +TBL_LOWER3=t3 function exec_sql() { echo $3 | mysql -h $1 -P $2 @@ -61,15 +64,15 @@ function exec_full_stage() { # prepare optimsitic full data exec_sql mysql1 3306 "CREATE DATABASE $DB3 CHARACTER SET UTF8MB4 COLLATE utf8mb4_bin;" exec_sql mariadb2 3306 "CREATE DATABASE $DB4 CHARACTER SET UTF8MB4 COLLATE utf8mb4_bin;" - exec_sql mysql1 3306 "CREATE TABLE $DB3.$TBL1 (c1 INT PRIMARY KEY, c2 TEXT);" + exec_sql mysql1 3306 "CREATE TABLE $DB3.$TBL_LOWER1 (c1 INT PRIMARY KEY, c2 TEXT);" exec_sql mysql1 3306 "CREATE TABLE $DB3.$TBL2 (c1 INT PRIMARY KEY, c2 TEXT);" exec_sql mariadb2 3306 "CREATE TABLE $DB4.$TBL2 (c1 INT PRIMARY KEY, c2 TEXT);" - exec_sql mariadb2 3306 "CREATE TABLE $DB4.$TBL3 (c1 INT PRIMARY KEY, c2 TEXT);" + exec_sql mariadb2 3306 "CREATE TABLE $DB4.$TBL_LOWER3 (c1 INT PRIMARY KEY, c2 TEXT);" - exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL1 (c1, c2) VALUES (1, '1');" + exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL_LOWER1 (c1, c2) VALUES (1, '1');" exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL2 (c1, c2) VALUES (2, '2');" exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL2 (c1, c2) VALUES (11, '11');" - exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL3 (c1, c2) VALUES (12, '12');" + exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL_LOWER3 (c1, c2) VALUES (12, '12');" # prepare pessimistic full data exec_sql mysql1 3306 "CREATE DATABASE $DB5;" @@ -93,22 +96,22 @@ function exec_incremental_stage1() { exec_sql mariadb2 3306 "INSERT INTO $DB2.$TBL3 (c1, c2) VALUES (112, '112');" # prepare optimistic incremental data - exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL1 (c1, c2) VALUES (101, '101');" + exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL_LOWER1 (c1, c2) VALUES (101, '101');" exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL2 (c1, c2) VALUES (102, '102');" exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL2 (c1, c2) VALUES (111, '111');" - exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL3 (c1, c2) VALUES (112, '112');" + exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL_LOWER3 (c1, c2) VALUES (112, '112');" # optimistic shard ddls - exec_sql mysql1 3306 "ALTER TABLE $DB3.$TBL1 ADD COLUMN c3 INT;" + exec_sql mysql1 3306 "ALTER TABLE $DB3.$TBL_LOWER1 ADD COLUMN c3 INT;" exec_sql mysql1 3306 "ALTER TABLE $DB3.$TBL2 ADD COLUMN c4 INT;" exec_sql mariadb2 3306 "ALTER TABLE $DB4.$TBL2 ADD COLUMN c3 INT;" - exec_sql mariadb2 3306 "ALTER TABLE $DB4.$TBL3 ADD COLUMN c4 INT;" + exec_sql mariadb2 3306 "ALTER TABLE $DB4.$TBL_LOWER3 ADD COLUMN c4 INT;" # prepare optimistic incremental data - exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL1 (c1, c2, c3) VALUES (103, '103', 103);" + exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL_LOWER1 (c1, c2, c3) VALUES (103, '103', 103);" exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL2 (c1, c2, c4) VALUES (104, '104', 104);" exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL2 (c1, c2, c3) VALUES (113, '113', 113);" - exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL3 (c1, c2, c4) VALUES (114, '114', 114);" + exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL_LOWER3 (c1, c2, c4) VALUES (114, '114', 114);" # prepare pessimistic incremental data exec_sql mysql1 3306 "INSERT INTO $DB5.$TBL1 (c1, c2) VALUES (101, '101');" @@ -135,22 +138,22 @@ function exec_incremental_stage2() { exec_sql mariadb2 3306 "INSERT INTO $DB2.$TBL3 (c1, c2) VALUES (212, '212');" # prepare optimistic incremental data - exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL1 (c1, c2, c3) VALUES (201, '201', 201);" + exec_sql mysql1 
3306 "INSERT INTO $DB3.$TBL_LOWER1 (c1, c2, c3) VALUES (201, '201', 201);" exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL2 (c1, c2, c4) VALUES (202, '202', 202);" exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL2 (c1, c2, c3) VALUES (211, '211', 211);" - exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL3 (c1, c2, c4) VALUES (212, '212', 212);" + exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL_LOWER3 (c1, c2, c4) VALUES (212, '212', 212);" # optimistic shard ddls - exec_sql mysql1 3306 "ALTER TABLE $DB3.$TBL1 ADD COLUMN c4 INT;" + exec_sql mysql1 3306 "ALTER TABLE $DB3.$TBL_LOWER1 ADD COLUMN c4 INT;" exec_sql mysql1 3306 "ALTER TABLE $DB3.$TBL2 ADD COLUMN c3 INT AFTER c2;" exec_sql mariadb2 3306 "ALTER TABLE $DB4.$TBL2 ADD COLUMN c4 INT;" - exec_sql mariadb2 3306 "ALTER TABLE $DB4.$TBL3 ADD COLUMN c3 INT AFTER c2;" + exec_sql mariadb2 3306 "ALTER TABLE $DB4.$TBL_LOWER3 ADD COLUMN c3 INT AFTER c2;" # prepare optimistic incremental data - exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL1 (c1, c2, c3, c4) VALUES (203, '203', 203, 203);" + exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL_LOWER1 (c1, c2, c3, c4) VALUES (203, '203', 203, 203);" exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL2 (c1, c2, c3, c4) VALUES (204, '204', 204, 204);" exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL2 (c1, c2, c3, c4) VALUES (213, '213', 213, 213);" - exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL3 (c1, c2, c3, c4) VALUES (214, '214', 214, 214);" + exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL_LOWER3 (c1, c2, c3, c4) VALUES (214, '214', 214, 214);" # prepare pessimistic incremental data exec_sql mysql1 3306 "INSERT INTO $DB5.$TBL1 (c1, c2, c3) VALUES (201, '201', 201);"