From ce43e287b485ffacb59fa7331913f5b1dbf6b060 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Mon, 7 Jun 2021 23:54:28 +0800
Subject: [PATCH] syncer: load table structure from dump files (#1699)

---
 _utils/terror_gen/errors_release.txt |   4 +-
 errors.toml                          |   4 +-
 loader/loader.go                     |  45 ++++------
 loader/util.go                       |  25 ------
 pkg/terror/error_list.go             |   4 +-
 pkg/utils/file.go                    |  51 ++++++++++++
 syncer/syncer.go                     | 118 +++++++++++++++++++++++----
 syncer/syncer_test.go                |  36 ++++++++
 tests/all_mode/run.sh                |   1 +
 tests/run.sh                         |   9 ++
 tests/safe_mode/run.sh               |   4 +-
 tests/shardddl1/run.sh               |   6 +-
 tests/sharding/run.sh                |  13 +--
 13 files changed, 234 insertions(+), 86 deletions(-)

diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt
index 3dcb820060..75d9662432 100644
--- a/_utils/terror_gen/errors_release.txt
+++ b/_utils/terror_gen/errors_release.txt
@@ -277,10 +277,10 @@ ErrSyncerUnitInjectDDLOnly,[code=36023:class=sync-unit:scope=internal:level=low]
 ErrSyncerUnitInjectDDLWithoutSchema,[code=36024:class=sync-unit:scope=internal:level=low], "Message: injected DDL %s without schema name not valid"
 ErrSyncerUnitNotSupportedOperate,[code=36025:class=sync-unit:scope=internal:level=medium], "Message: op %s not supported"
 ErrSyncerUnitNilOperatorReq,[code=36026:class=sync-unit:scope=internal:level=medium], "Message: nil request not valid"
-ErrSyncerUnitDMLColumnNotMatch,[code=36027:class=sync-unit:scope=internal:level=high], "Message: Column count doesn't match value count: %d (columns) vs %d (values)"
+ErrSyncerUnitDMLColumnNotMatch,[code=36027:class=sync-unit:scope=internal:level=high], "Message: Column count doesn't match value count: %d (columns) vs %d (values), Workaround: Please check the log files to see if a related DDL has been skipped, and you could use `operate-schema` to get and set the table structure."
 ErrSyncerUnitDMLOldNewValueMismatch,[code=36028:class=sync-unit:scope=internal:level=high], "Message: Old value count doesn't match new value count: %d (old) vs %d (new)"
 ErrSyncerUnitDMLPruneColumnMismatch,[code=36029:class=sync-unit:scope=internal:level=high], "Message: prune DML columns and data mismatch in length: %d (columns) %d (data)"
-ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Pleass check the `filters` config in source and task configuration files."
+ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high], "Message: generate binlog event filter, Workaround: Please check the `filters` config in source and task configuration files."
 ErrSyncerUnitGenTableRouter,[code=36031:class=sync-unit:scope=internal:level=high], "Message: generate table router, Workaround: Please check `routes` config in task configuration file."
 ErrSyncerUnitGenColumnMapping,[code=36032:class=sync-unit:scope=internal:level=high], "Message: generate column mapping, Workaround: Please check the `column-mappings` config in task configuration file."
 ErrSyncerUnitDoColumnMapping,[code=36033:class=sync-unit:scope=internal:level=high], "Message: mapping row data %v for table `%s`.`%s`"
diff --git a/errors.toml b/errors.toml
index e0dc58b8ad..a7c351ae91 100644
--- a/errors.toml
+++ b/errors.toml
@@ -1675,7 +1675,7 @@ tags = ["internal", "medium"]
 [error.DM-sync-unit-36027]
 message = "Column count doesn't match value count: %d (columns) vs %d (values)"
 description = ""
-workaround = ""
+workaround = "Please check the log files to see if a related DDL has been skipped, and you could use `operate-schema` to get and set the table structure."
 tags = ["internal", "high"]
 
 [error.DM-sync-unit-36028]
@@ -1693,7 +1693,7 @@ tags = ["internal", "high"]
 [error.DM-sync-unit-36030]
 message = "generate binlog event filter"
 description = ""
-workaround = "Pleass check the `filters` config in source and task configuration files."
+workaround = "Please check the `filters` config in source and task configuration files."
 tags = ["internal", "high"]
 
 [error.DM-sync-unit-36031]
diff --git a/loader/loader.go b/loader/loader.go
index da571e4d8f..4d771661e2 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -888,22 +888,18 @@ func (l *Loader) prepareDBFiles(files map[string]struct{}) error {
     l.totalFileCount.Store(0) // reset
     schemaFileCount := 0
     for file := range files {
-        if !strings.HasSuffix(file, "-schema-create.sql") {
+        db, ok := utils.GetDBFromDumpFilename(file)
+        if !ok {
             continue
         }
-
-        idx := strings.LastIndex(file, "-schema-create.sql")
-        if idx > 0 {
-            schemaFileCount++
-            db := file[:idx]
-            if l.skipSchemaAndTable(&filter.Table{Schema: db}) {
-                l.logger.Warn("ignore schema file", zap.String("schema file", file))
-                continue
-            }
-
-            l.db2Tables[db] = make(Tables2DataFiles)
-            l.totalFileCount.Add(1) // for schema
+        schemaFileCount++
+        if l.skipSchemaAndTable(&filter.Table{Schema: db}) {
+            l.logger.Warn("ignore schema file", zap.String("schema file", file))
+            continue
         }
+
+        l.db2Tables[db] = make(Tables2DataFiles)
+        l.totalFileCount.Add(1) // for schema
     }
 
     if schemaFileCount == 0 {
@@ -918,21 +914,11 @@ func (l *Loader) prepareTableFiles(files map[string]struct{}) error {
     var tablesNumber float64
-
     for file := range files {
-        if !strings.HasSuffix(file, "-schema.sql") {
-            continue
-        }
-
-        idx := strings.LastIndex(file, "-schema.sql")
-        name := file[:idx]
-        fields := strings.Split(name, ".")
-        if len(fields) != 2 {
-            l.logger.Warn("invalid table schema file", zap.String("file", file))
+        db, table, ok := utils.GetTableFromDumpFilename(file)
+        if !ok {
             continue
         }
-
-        db, table := fields[0], fields[1]
         if l.skipSchemaAndTable(&filter.Table{Schema: db, Name: table}) {
             l.logger.Warn("ignore table file", zap.String("table file", file))
             continue
         }
@@ -1037,7 +1023,7 @@ func (l *Loader) prepare() error {
     }
 
     // collect dir files.
-    files, err := CollectDirFiles(l.cfg.Dir)
+    files, err := utils.CollectDirFiles(l.cfg.Dir)
     if err != nil {
         return err
     }
@@ -1484,14 +1470,17 @@ func (l *Loader) cleanDumpFiles() {
             l.logger.Warn("error when remove loaded dump folder", zap.String("data folder", l.cfg.Dir), zap.Error(err))
         }
     } else {
-        // leave metadata file, only delete sql files
-        files, err := CollectDirFiles(l.cfg.Dir)
+        // leave metadata file and table structure files, only delete data files
+        files, err := utils.CollectDirFiles(l.cfg.Dir)
         if err != nil {
             l.logger.Warn("fail to collect files", zap.String("data folder", l.cfg.Dir), zap.Error(err))
         }
         var lastErr error
         for f := range files {
             if strings.HasSuffix(f, ".sql") {
+                if strings.HasSuffix(f, "-schema-create.sql") || strings.HasSuffix(f, "-schema.sql") {
+                    continue
+                }
                 lastErr = os.Remove(filepath.Join(l.cfg.Dir, f))
             }
         }
diff --git a/loader/util.go b/loader/util.go
index 9c5d136025..309bfa0971 100644
--- a/loader/util.go
+++ b/loader/util.go
@@ -18,36 +18,11 @@ import (
     "fmt"
     "os"
     "path"
-    "path/filepath"
     "strings"
 
     "github.com/pingcap/dm/pkg/terror"
 )
 
-// CollectDirFiles gets files in path.
-func CollectDirFiles(path string) (map[string]struct{}, error) {
-    files := make(map[string]struct{})
-    err := filepath.Walk(path, func(_ string, f os.FileInfo, err error) error {
-        if err != nil {
-            return err
-        }
-
-        if f == nil {
-            return nil
-        }
-
-        if f.IsDir() {
-            return nil
-        }
-
-        name := strings.TrimSpace(f.Name())
-        files[name] = struct{}{}
-        return nil
-    })
-
-    return files, err
-}
-
 // SQLReplace works like strings.Replace but only supports one replacement.
 // It uses backquote pairs to quote the old and new word.
 func SQLReplace(s, oldStr, newStr string, ansiquote bool) string {
diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go
index 4bf5374ebd..b1ab0625c5 100644
--- a/pkg/terror/error_list.go
+++ b/pkg/terror/error_list.go
@@ -964,10 +964,10 @@ var (
     ErrSyncerUnitInjectDDLWithoutSchema = New(codeSyncerUnitInjectDDLWithoutSchema, ClassSyncUnit, ScopeInternal, LevelLow, "injected DDL %s without schema name not valid", "")
     ErrSyncerUnitNotSupportedOperate    = New(codeSyncerUnitNotSupportedOperate, ClassSyncUnit, ScopeInternal, LevelMedium, "op %s not supported", "")
     ErrSyncerUnitNilOperatorReq         = New(codeSyncerUnitNilOperatorReq, ClassSyncUnit, ScopeInternal, LevelMedium, "nil request not valid", "")
-    ErrSyncerUnitDMLColumnNotMatch      = New(codeSyncerUnitDMLColumnNotMatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Column count doesn't match value count: %d (columns) vs %d (values)", "")
+    ErrSyncerUnitDMLColumnNotMatch      = New(codeSyncerUnitDMLColumnNotMatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Column count doesn't match value count: %d (columns) vs %d (values)", "Please check the log files to see if a related DDL has been skipped, and you could use `operate-schema` to get and set the table structure.")
     ErrSyncerUnitDMLOldNewValueMismatch = New(codeSyncerUnitDMLOldNewValueMismatch, ClassSyncUnit, ScopeInternal, LevelHigh, "Old value count doesn't match new value count: %d (old) vs %d (new)", "")
     ErrSyncerUnitDMLPruneColumnMismatch = New(codeSyncerUnitDMLPruneColumnMismatch, ClassSyncUnit, ScopeInternal, LevelHigh, "prune DML columns and data mismatch in length: %d (columns) %d (data)", "")
-    ErrSyncerUnitGenBinlogEventFilter   = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "Pleass check the `filters` config in source and task configuration files.")
+    ErrSyncerUnitGenBinlogEventFilter   = New(codeSyncerUnitGenBinlogEventFilter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate binlog event filter", "Please check the `filters` config in source and task configuration files.")
     ErrSyncerUnitGenTableRouter         = New(codeSyncerUnitGenTableRouter, ClassSyncUnit, ScopeInternal, LevelHigh, "generate table router", "Please check `routes` config in task configuration file.")
     ErrSyncerUnitGenColumnMapping       = New(codeSyncerUnitGenColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "generate column mapping", "Please check the `column-mappings` config in task configuration file.")
     ErrSyncerUnitDoColumnMapping        = New(codeSyncerUnitDoColumnMapping, ClassSyncUnit, ScopeInternal, LevelHigh, "mapping row data %v for table `%s`.`%s`", "")
diff --git a/pkg/utils/file.go b/pkg/utils/file.go
index 4a4edef948..2e8cc07263 100644
--- a/pkg/utils/file.go
+++ b/pkg/utils/file.go
@@ -18,6 +18,8 @@ import (
     "io/ioutil"
     "os"
     "path"
+    "path/filepath"
+    "strings"
 
     "go.uber.org/zap"
 
@@ -94,3 +96,52 @@ func WriteFileAtomic(filename string, data []byte, perm os.FileMode) error {
     }
     return os.Rename(f.Name(), filename)
 }
+
+// CollectDirFiles gets files in path.
+func CollectDirFiles(path string) (map[string]struct{}, error) {
+    files := make(map[string]struct{})
+    err := filepath.Walk(path, func(_ string, f os.FileInfo, err error) error {
+        if err != nil {
+            return err
+        }
+
+        if f == nil {
+            return nil
+        }
+
+        if f.IsDir() {
+            return nil
+        }
+
+        name := strings.TrimSpace(f.Name())
+        files[name] = struct{}{}
+        return nil
+    })
+
+    return files, err
+}
+
+// GetDBFromDumpFilename extracts db name from dump filename.
+func GetDBFromDumpFilename(filename string) (db string, ok bool) {
+    if !strings.HasSuffix(filename, "-schema-create.sql") {
+        return "", false
+    }
+
+    idx := strings.LastIndex(filename, "-schema-create.sql")
+    return filename[:idx], true
+}
+
+// GetTableFromDumpFilename extracts db and table name from dump filename.
+func GetTableFromDumpFilename(filename string) (db, table string, ok bool) {
+    if !strings.HasSuffix(filename, "-schema.sql") {
+        return "", "", false
+    }
+
+    idx := strings.LastIndex(filename, "-schema.sql")
+    name := filename[:idx]
+    fields := strings.Split(name, ".")
+    if len(fields) != 2 {
+        return "", "", false
+    }
+    return fields[0], fields[1], true
+}
diff --git a/syncer/syncer.go b/syncer/syncer.go
index 3924d20342..755c2b3ab7 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -14,6 +14,7 @@
 package syncer
 
 import (
+    "bytes"
     "context"
     "crypto/tls"
     "fmt"
@@ -44,6 +45,7 @@ import (
     "go.uber.org/zap"
 
     "github.com/pingcap/dm/dm/config"
+    common2 "github.com/pingcap/dm/dm/ctl/common"
     "github.com/pingcap/dm/dm/pb"
     "github.com/pingcap/dm/dm/unit"
     "github.com/pingcap/dm/pkg/binlog"
@@ -634,14 +636,13 @@ func (s *Syncer) getTable(tctx *tcontext.Context, origSchema, origTable, renamed
     return ti, nil
 }
 
-// trackTableInfoFromDownstream tries to track the table info from the downstream.
+// trackTableInfoFromDownstream tries to track the table info from the downstream. It will not overwrite an existing table.
 func (s *Syncer) trackTableInfoFromDownstream(tctx *tcontext.Context, origSchema, origTable, renamedSchema, renamedTable string) error {
-    // TODO: Switch to use the HTTP interface to retrieve the TableInfo directly
-    // (and get rid of ddlDBConn).
+    // TODO: Switch to use the HTTP interface to retrieve the TableInfo directly if the HTTP port is available
     // use parser for downstream.
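+    // (the downstream parser must match the downstream SQL mode, since settings
+    // such as ANSI_QUOTES change how SHOW CREATE TABLE output is parsed)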
     parser2, err := utils.GetParserForConn(tctx.Ctx, s.ddlDBConn.baseConn.DBConn)
     if err != nil {
-        return terror.ErrSchemaTrackerCannotFetchDownstreamTable.Delegate(err, renamedSchema, renamedTable, origSchema, origTable)
+        return terror.ErrSchemaTrackerCannotParseDownstreamTable.Delegate(err, renamedSchema, renamedTable, origSchema, origTable)
     }
 
     rows, err := s.ddlDBConn.querySQL(tctx, "SHOW CREATE TABLE "+dbutil.TableName(renamedSchema, renamedTable))
@@ -1121,6 +1122,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
         }
     }()
 
+    // some initialization that can't be put in Syncer.Init
     fresh, err := s.IsFreshTask(ctx)
     if err != nil {
         return err
@@ -1131,24 +1133,36 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
             return err
         }
     }
-    needFlushCheckpoint, err := s.adjustGlobalPointGTID(tctx)
+
+    var (
+        flushCheckpoint bool
+        cleanDumpFile   = s.cfg.CleanDumpFile
+    )
+    flushCheckpoint, err = s.adjustGlobalPointGTID(tctx)
     if err != nil {
         return err
     }
-    if needFlushCheckpoint || s.cfg.Mode == config.ModeAll {
+    if s.cfg.Mode == config.ModeAll {
+        flushCheckpoint = true
+        err = s.loadTableStructureFromDump(ctx)
+        if err != nil {
+            tctx.L().Warn("error happened when loading table structure from dump files", zap.Error(err))
+            cleanDumpFile = false
+        }
+    }
+
+    if flushCheckpoint {
         if err = s.flushCheckPoints(); err != nil {
             tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err))
-        } else if s.cfg.Mode == config.ModeAll && s.cfg.CleanDumpFile {
-            tctx.L().Info("try to remove loaded files")
-            metadataFile := path.Join(s.cfg.Dir, "metadata")
-            if err = os.Remove(metadataFile); err != nil {
-                tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err))
-            }
-            if err = os.Remove(s.cfg.Dir); err != nil {
-                tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err))
-            }
         }
     }
+    if cleanDumpFile {
+        tctx.L().Info("try to remove all dump files")
+        if err = os.RemoveAll(s.cfg.Dir); err != nil {
+            tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err))
+        }
+    }
+
     failpoint.Inject("AdjustGTIDExit", func() {
         tctx.L().Warn("exit triggered", zap.String("failpoint", "AdjustGTIDExit"))
         s.streamerController.Close(tctx)
@@ -1330,7 +1344,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
         }
 
         if err != nil {
-            // TODO: wrap the error with terror, and attach binlog position
             tctx.L().Error("fail to fetch binlog", log.ShortError(err))
 
             if isConnectionRefusedError(err) {
@@ -2186,13 +2199,14 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.
     srcTables, targetTables := tableNames[0], tableNames[1]
     srcTable := srcTables[0]
 
-    // Make sure the tables are all loaded into the schema tracker.
+    // Make sure the needed tables are all loaded into the schema tracker.
     var (
         shouldExecDDLOnSchemaTracker bool
         shouldSchemaExist            bool
         shouldTableExistNum          int // tableNames[:shouldTableExistNum] should exist
         shouldRefTableExistNum       int // tableNames[1:shouldTableExistNum] should exist, since first one is "caller table"
     )
+
     switch node := stmt.(type) {
     case *ast.CreateDatabaseStmt:
         shouldExecDDLOnSchemaTracker = true
         shouldSchemaExist = true
@@ -2212,7 +2226,7 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.
     case *ast.CreateTableStmt, *ast.CreateViewStmt:
         shouldExecDDLOnSchemaTracker = true
         shouldSchemaExist = true
-        // for CREATE TABLE LIKE/AS, there should be reference tables which should exist
+        // for CREATE TABLE LIKE/AS, the reference tables should exist
         shouldRefTableExistNum = len(srcTables)
     case *ast.DropTableStmt:
         shouldExecDDLOnSchemaTracker = true
@@ -2327,6 +2341,74 @@ func (s *Syncer) genRouter() error {
     return nil
 }
 
+func (s *Syncer) loadTableStructureFromDump(ctx context.Context) error {
+    logger := s.tctx.L()
+
+    files, err := utils.CollectDirFiles(s.cfg.Dir)
+    if err != nil {
+        logger.Warn("fail to get dump files", zap.Error(err))
+        return err
+    }
+    var dbs, tables []string
+    var tableFiles [][2]string // [db, filename]
+    for f := range files {
+        if db, ok := utils.GetDBFromDumpFilename(f); ok {
+            dbs = append(dbs, db)
+            continue
+        }
+        if db, table, ok := utils.GetTableFromDumpFilename(f); ok {
+            tables = append(tables, dbutil.TableName(db, table))
+            tableFiles = append(tableFiles, [2]string{db, f})
+            continue
+        }
+    }
+    logger.Info("fetch table structure from dump files",
+        zap.Strings("database", dbs),
+        zap.Any("tables", tables))
+    for _, db := range dbs {
+        if err = s.schemaTracker.CreateSchemaIfNotExists(db); err != nil {
+            return err
+        }
+    }
+
+    var firstErr error
+    setFirstErr := func(err error) {
+        if firstErr == nil {
+            firstErr = err
+        }
+    }
+
+    for _, dbAndFile := range tableFiles {
+        db, file := dbAndFile[0], dbAndFile[1]
+        filepath := path.Join(s.cfg.Dir, file)
+        content, err2 := common2.GetFileContent(filepath)
+        if err2 != nil {
+            logger.Warn("fail to read file for creating table in schema tracker",
+                zap.String("db", db),
+                zap.String("file", filepath),
+                zap.Error(err2))
+            setFirstErr(err2)
+            continue
+        }
+        stmts := bytes.Split(content, []byte(";"))
+        for _, stmt := range stmts {
+            stmt = bytes.TrimSpace(stmt)
+            if len(stmt) == 0 || bytes.HasPrefix(stmt, []byte("/*")) {
+                continue
+            }
+            err = s.schemaTracker.Exec(ctx, db, string(stmt))
+            if err != nil {
+                logger.Warn("fail to create table for dump files",
+                    zap.Any("file", filepath),
+                    zap.ByteString("statement", stmt),
+                    zap.Error(err))
+                setFirstErr(err)
+            }
+        }
+    }
+    return firstErr
+}
+
 func (s *Syncer) printStatus(ctx context.Context) {
     defer s.wg.Done()
 
diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go
index a6276b5232..81637f3918 100644
--- a/syncer/syncer_test.go
+++ b/syncer/syncer_test.go
@@ -1693,3 +1693,39 @@ func (s *Syncer) setupMockCheckpoint(c *C, checkPointDBConn *sql.Conn, checkPoin
     s.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})}
     c.Assert(s.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background()), IsNil)
 }
+
+func (s *testSyncerSuite) TestTrackDownstreamTableWontOverwrite(c *C) {
+    syncer := Syncer{}
+    ctx := context.Background()
+    tctx := tcontext.Background()
+
+    db, mock, err := sqlmock.New()
+    c.Assert(err, IsNil)
+    dbConn, err := db.Conn(ctx)
+    c.Assert(err, IsNil)
+    baseConn := conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})
+    syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: baseConn}
+    syncer.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, defaultTestSessionCfg, baseConn)
+    c.Assert(err, IsNil)
+
+    upTable, downTable := "up", "down"
+    schema := "test"
+    createTableSQL := "CREATE TABLE up (c1 int, c2 int);"
+
+    mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
+        sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
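+    // trackTableInfoFromDownstream fetches the downstream structure via SHOW CREATE TABLE
+    // and replays it in the schema tracker under the upstream table name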
mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow(downTable, " CREATE TABLE `"+downTable+"` (\n `c` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + + c.Assert(syncer.schemaTracker.CreateSchemaIfNotExists(schema), IsNil) + c.Assert(syncer.schemaTracker.Exec(ctx, "test", createTableSQL), IsNil) + ti, err := syncer.getTable(tctx, schema, upTable, schema, downTable) + c.Assert(err, IsNil) + c.Assert(ti.Columns, HasLen, 2) + c.Assert(syncer.trackTableInfoFromDownstream(tctx, schema, upTable, schema, downTable), IsNil) + newTi, err := syncer.getTable(tctx, schema, upTable, schema, downTable) + c.Assert(err, IsNil) + c.Assert(newTi, DeepEquals, ti) + c.Assert(mock.ExpectationsWereMet(), IsNil) +} diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index c5d6cc6221..fa27fc92cb 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -43,6 +43,7 @@ function test_session_config() { sed -i 's/tidb_retry_limit: "fjs"/tidb_retry_limit: "10"/g' $WORK_DIR/dm-task.yaml dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta" + run_sql_source1 "create table if not exists all_mode.t1 (c int); insert into all_mode.t1 (id, name) values (9, 'haha');" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ diff --git a/tests/run.sh b/tests/run.sh index 3e92747cea..4ac65ace00 100755 --- a/tests/run.sh +++ b/tests/run.sh @@ -25,6 +25,13 @@ check_mysql() { done } +set_default_variables() { + host=$1 + port=$2 + password=$3 + mysql -u root -h ${host} -P ${port} -p${password} -e "set global character_set_server='utf8mb4';set global collation_server='utf8mb4_bin';" +} + start_services() { stop_services @@ -37,6 +44,8 @@ start_services() { check_mysql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_mysql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + set_default_variables $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + set_default_variables $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 } if [ "$#" -ge 1 ]; then diff --git a/tests/safe_mode/run.sh b/tests/safe_mode/run.sh index 16d1d49186..dea665a50b 100755 --- a/tests/safe_mode/run.sh +++ b/tests/safe_mode/run.sh @@ -47,11 +47,11 @@ function consistency_none() { name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.test/metadata | tail -1 | awk -F: '{print $2}' | tr -d ' ') pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.test/metadata | tail -1 | awk -F: '{print $2}' | tr -d ' ') gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.test/metadata | tail -1 | awk -F: '{print $2,":",$3}' | tr -d ' ') - check_log_contains $WORK_DIR/worker1/log/dm-worker.log "\[\"enable safe-mode because of inconsistent dump, will exit at\"\] \[task=test\] \[unit=\"binlog replication\"\] \[location=\"position: ($name1, $pos1), gtid-set: $gtid1\"\]" + check_log_contain_with_retry "\[\"enable safe-mode because of inconsistent dump, will exit at\"\] \[task=test\] \[unit=\"binlog replication\"\] \[location=\"position: ($name1, $pos1), gtid-set: $gtid1\"\]" $WORK_DIR/worker1/log/dm-worker.log name2=$(grep "Log: " $WORK_DIR/worker2/dumped_data.test/metadata | tail -1 | awk -F: '{print $2}' | tr -d ' ') pos2=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.test/metadata | tail -1 | awk -F: '{print $2}' | tr -d ' ') gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.test/metadata | tail -1 | awk -F: '{print $2,":",$3}' | tr -d ' ') - check_log_contains $WORK_DIR/worker2/log/dm-worker.log "\[\"enable safe-mode because of inconsistent dump, will 
exit at\"\] \[task=test\] \[unit=\"binlog replication\"\] \[location=\"position: ($name2, $pos2), gtid-set: $gtid2\"\]" + check_log_contain_with_retry "\[\"enable safe-mode because of inconsistent dump, will exit at\"\] \[task=test\] \[unit=\"binlog replication\"\] \[location=\"position: ($name2, $pos2), gtid-set: $gtid2\"\]" $WORK_DIR/worker2/log/dm-worker.log run_sql_source2 "SET @@GLOBAL.SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'" cleanup_data safe_mode_target diff --git a/tests/shardddl1/run.sh b/tests/shardddl1/run.sh index 52adc7dc32..322ed0ec8d 100644 --- a/tests/shardddl1/run.sh +++ b/tests/shardddl1/run.sh @@ -10,9 +10,13 @@ source $cur/../_utils/shardddl_lib.sh function DM_001_CASE() { run_sql_source1 "alter table ${shardddl1}.${tb1} add column new_col1 int;" run_sql_source1 "alter table ${shardddl1}.${tb2} add column new_col1 int;" + # schema tracker could track per table without error run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "Duplicate column name 'new_col1'" 1 + "\"result\": true" 2 \ + "\"synced\": true" 1 + # only downstream sees a duplicate error, but currently ignored by DM + check_log_contain_with_retry "Duplicate column name 'new_col1'" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log } function DM_001() { diff --git a/tests/sharding/run.sh b/tests/sharding/run.sh index 68ce31ee29..d172ae3060 100755 --- a/tests/sharding/run.sh +++ b/tests/sharding/run.sh @@ -29,6 +29,12 @@ function run() { check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT check_metric $MASTER_PORT 'start_leader_counter' 3 0 2 + # operate mysql config to worker + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + # now, for pessimistic shard DDL, if interrupted after executed DDL but before flush checkpoint, # re-sync this DDL will cause the source try to sync the DDL of the previous lock again, # this will need to recover the replication manually, @@ -39,16 +45,11 @@ function run() { export GO_FAILPOINTS="github.com/pingcap/dm/syncer/FlushCheckpointStage=return(2)" run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 export GO_FAILPOINTS='' run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT - # operate mysql config to worker - cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml - cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml - sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml - sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml - dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 # start DM task only