diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 1ae36f6a12..cc240fad62 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -171,6 +171,11 @@ ErrConfigExprFilterManyExpr,[code=20039:class=config:scope=internal:level=high], ErrConfigExprFilterNotFound,[code=20040:class=config:scope=internal:level=high], "Message: mysql-instance(%d)'s expression-filters %s not exist in expression-filter, Workaround: Please check the `expression-filters` config in task configuration file." ErrConfigExprFilterWrongGrammar,[code=20041:class=config:scope=internal:level=high], "Message: expression-filter name(%s) SQL(%s) has wrong grammar: %v, Workaround: Please check the `expression-filters` config in task configuration file." ErrConfigExprFilterEmptyName,[code=20042:class=config:scope=internal:level=high], "Message: expression-filter %s has empty %s, Workaround: Please check the `expression-filters` config in task configuration file." +ErrConfigCheckerMaxTooSmall,[code=20043:class=config:scope=internal:level=high], "Message: `backoff-max` value %v is less than `backoff-min` value %v, Workaround: Please increase `backoff-max` config in task configuration file." +ErrConfigGenBAList,[code=20044:class=config:scope=internal:level=high], "Message: generate block allow list error, Workaround: Please check the `block-allow-list` config in task configuration file." +ErrConfigGenTableRouter,[code=20045:class=config:scope=internal:level=high], "Message: generate table router error, Workaround: Please check the `routes` config in task configuration file." +ErrConfigGenColumnMapping,[code=20046:class=config:scope=internal:level=high], "Message: generate column mapping error, Workaround: Please check the `column-mappings` config in task configuration file." +ErrConfigInvalidChunkFileSize,[code=20047:class=config:scope=internal:level=high], "Message: invalid `chunk-filesize` %v, Workaround: Please check the `chunk-filesize` config in task configuration file." ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high] ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename" ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high] diff --git a/checker/checker.go b/checker/checker.go index fdd0ef4380..1469578707 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -90,7 +90,7 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, e } for _, cfg := range cfgs { - // we have verify it in subtask config + // we have verify it in SubTaskConfig.Adjust replica, _ := cfg.DecryptPassword() c.instances = append(c.instances, &mysqlInstance{ cfg: replica, diff --git a/dm/config/source_config.go b/dm/config/source_config.go index a4303d5c83..db70e5ee83 100644 --- a/dm/config/source_config.go +++ b/dm/config/source_config.go @@ -206,6 +206,10 @@ func (c *SourceConfig) Verify() error { return terror.ErrConfigBinlogEventFilter.Delegate(err) } + if c.Checker.BackoffMax.Duration < c.Checker.BackoffMin.Duration { + return terror.ErrConfigCheckerMaxTooSmall.Generate(c.Checker.BackoffMax.Duration, c.Checker.BackoffMin.Duration) + } + return nil } diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 6122fc46d3..37ed2cac45 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -27,6 +27,7 @@ import ( router "github.com/pingcap/tidb-tools/pkg/table-router" "go.uber.org/zap" + "github.com/pingcap/dm/pkg/dumpling" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" @@ -248,7 +249,7 @@ func (c *SubTaskConfig) Decode(data string, verifyDecryptPassword bool) error { return c.Adjust(verifyDecryptPassword) } -// Adjust adjusts configs. +// Adjust adjusts and verifies configs. func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error { if c.Name == "" { return terror.ErrConfigTaskNameEmpty.Generate() @@ -308,6 +309,24 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error { c.BAList = c.BWList } + if _, err := filter.New(c.CaseSensitive, c.BAList); err != nil { + return terror.ErrConfigGenBAList.Delegate(err) + } + if _, err := router.NewTableRouter(c.CaseSensitive, c.RouteRules); err != nil { + return terror.ErrConfigGenTableRouter.Delegate(err) + } + // NewMapping will fill arguments with the default values. + if _, err := column.NewMapping(c.CaseSensitive, c.ColumnMappingRules); err != nil { + return terror.ErrConfigGenColumnMapping.Delegate(err) + } + if _, err := dumpling.ParseFileSize(c.MydumperConfig.ChunkFilesize, 0); err != nil { + return terror.ErrConfigInvalidChunkFileSize.Generate(c.MydumperConfig.ChunkFilesize) + } + + // TODO: check every member + // TODO: since we checked here, we could remove other terror like ErrSyncerUnitGenBAList + // TODO: or we should check at task config and source config rather than this subtask config, to reduce duplication + return nil } diff --git a/dm/config/task_test.go b/dm/config/task_test.go index f02417f62a..81b8de9d7e 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -571,7 +571,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) { } routeRule1 = router.TableRule{ SchemaPattern: "db*", - TablePattern: "tbl*", + TargetSchema: "db", } routeRule2 = router.TableRule{ SchemaPattern: "db*", @@ -580,8 +580,8 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) { TargetTable: "tbl", } routeRule3 = router.TableRule{ - SchemaPattern: "database*", - TablePattern: "table*", + SchemaPattern: "schema*", + TargetSchema: "schema", } routeRule4 = router.TableRule{ SchemaPattern: "schema*", diff --git a/dm/ctl/master/unlock_ddl_lock.go b/dm/ctl/master/unlock_ddl_lock.go index 9630967aae..fe57648db3 100644 --- a/dm/ctl/master/unlock_ddl_lock.go +++ b/dm/ctl/master/unlock_ddl_lock.go @@ -54,7 +54,7 @@ func unlockDDLLockFunc(cmd *cobra.Command, _ []string) error { sources, _ := common.GetSourceArgs(cmd) if len(sources) > 0 { - fmt.Println("shoud not specify any sources") + fmt.Println("should not specify any sources") return errors.New("please check output to see error") } diff --git a/dm/master/server.go b/dm/master/server.go index 4c4df741af..61a9d0ca94 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -867,8 +867,8 @@ func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayR wg.Add(1) go func(worker *scheduler.Worker, source string) { defer wg.Done() - resp, err3 := worker.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) var workerResp *pb.CommonWorkerResponse + resp, err3 := worker.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) if err3 != nil { workerResp = errorCommonWorkerResponse(err3.Error(), source, worker.BaseInfo().Name) } else { @@ -1019,8 +1019,8 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, sources []string, tas sourceID := args[0].(string) w, _ := args[1].(*scheduler.Worker) - resp, err := w.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) var workerStatus *pb.QueryStatusResponse + resp, err := w.SendRequest(ctx, workerReq, s.cfg.RPCTimeout) if err != nil { workerStatus = &pb.QueryStatusResponse{ Result: false, @@ -1174,6 +1174,9 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest var ( cfgs []*config.SourceConfig err error + resp = &pb.OperateSourceResponse{ + Result: false, + } ) switch req.Op { case pb.SourceOp_StartSource, pb.SourceOp_UpdateSource: @@ -1182,9 +1185,6 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest // don't check the upstream connections, because upstream may be inaccessible cfgs, err = parseSourceConfig(req.Config) } - resp := &pb.OperateSourceResponse{ - Result: false, - } if err != nil { resp.Msg = err.Error() // nolint:nilerr @@ -1536,8 +1536,8 @@ func (s *Server) waitOperationOk(ctx context.Context, cli *scheduler.Worker, tas } } } - resp, err := cli.SendRequest(ctx, req, s.cfg.RPCTimeout) var queryResp *pb.QueryStatusResponse + resp, err := cli.SendRequest(ctx, req, s.cfg.RPCTimeout) if err != nil { log.L().Error("fail to query operation", zap.Int("retryNum", num), zap.String("task", taskName), zap.String("source", sourceID), zap.Stringer("expect", expect), log.ShortError(err)) @@ -1896,8 +1896,8 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateSchemaRequest }, } - resp, err := worker.SendRequest(ctx, &workerReq, s.cfg.RPCTimeout) var workerResp *pb.CommonWorkerResponse + resp, err := worker.SendRequest(ctx, &workerReq, s.cfg.RPCTimeout) if err != nil { workerResp = errorCommonWorkerResponse(err.Error(), source, worker.BaseInfo().Name) } else { @@ -2115,8 +2115,8 @@ func (s *Server) HandleError(ctx context.Context, req *pb.HandleErrorRequest) (* workerRespCh <- errorCommonWorkerResponse(fmt.Sprintf("source %s relevant worker-client not found", source), source, "") return } - resp, err := worker.SendRequest(ctx, &workerReq, s.cfg.RPCTimeout) var workerResp *pb.CommonWorkerResponse + resp, err := worker.SendRequest(ctx, &workerReq, s.cfg.RPCTimeout) if err != nil { workerResp = errorCommonWorkerResponse(err.Error(), source, worker.BaseInfo().Name) } else { diff --git a/dm/master/shardddl/optimist.go b/dm/master/shardddl/optimist.go index 9bad67eec0..a5396ab757 100644 --- a/dm/master/shardddl/optimist.go +++ b/dm/master/shardddl/optimist.go @@ -634,9 +634,9 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism. // handleLock handles a single shard DDL lock. func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, skipDone bool) error { - lockID, newDDLs, cols, err := o.lk.TrySync(o.cli, info, tts) cfStage := optimism.ConflictNone cfMsg := "" + lockID, newDDLs, cols, err := o.lk.TrySync(o.cli, info, tts) switch { case info.IgnoreConflict: o.logger.Warn("error occur when trying to sync for shard DDL info, this often means shard DDL conflict detected", diff --git a/dumpling/dumpling.go b/dumpling/dumpling.go index 975945f1eb..bbd867fa88 100644 --- a/dumpling/dumpling.go +++ b/dumpling/dumpling.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/pkg/conn" + dutils "github.com/pingcap/dm/pkg/dumpling" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" @@ -58,11 +59,13 @@ func NewDumpling(cfg *config.SubTaskConfig) *Dumpling { // Init implements Unit.Init. func (m *Dumpling) Init(ctx context.Context) error { var err error - m.dumpConfig, err = m.constructArgs() + if m.dumpConfig, err = m.constructArgs(); err != nil { + return err + } m.detectSQLMode(ctx) m.dumpConfig.SessionParams["time_zone"] = "+00:00" m.logger.Info("create dumpling", zap.Stringer("config", m.dumpConfig)) - return err + return nil } // Process implements Unit.Process. @@ -229,7 +232,7 @@ func (m *Dumpling) constructArgs() (*export.Config, error) { dumpConfig.Threads = cfg.Threads } if cfg.ChunkFilesize != "" { - dumpConfig.FileSize, err = parseFileSize(cfg.ChunkFilesize) + dumpConfig.FileSize, err = dutils.ParseFileSize(cfg.ChunkFilesize, export.UnspecifiedSize) if err != nil { m.logger.Warn("parsed some unsupported arguments", zap.Error(err)) return nil, err diff --git a/dumpling/util.go b/dumpling/util.go index c7c7a114e2..aeb0174d9e 100644 --- a/dumpling/util.go +++ b/dumpling/util.go @@ -16,14 +16,13 @@ package dumpling import ( "errors" "fmt" - "strconv" "strings" - "github.com/docker/go-units" "github.com/pingcap/dumpling/v4/export" filter "github.com/pingcap/tidb-tools/pkg/table-filter" "github.com/spf13/pflag" + dutils "github.com/pingcap/dm/pkg/dumpling" "github.com/pingcap/dm/pkg/log" ) @@ -96,7 +95,7 @@ func parseExtraArgs(logger *log.Logger, dumpCfg *export.Config, args []string) e } if fileSizeStr != "" { - dumpCfg.FileSize, err = parseFileSize(fileSizeStr) + dumpCfg.FileSize, err = dutils.ParseFileSize(fileSizeStr, export.UnspecifiedSize) if err != nil { return err } @@ -114,20 +113,6 @@ func parseExtraArgs(logger *log.Logger, dumpCfg *export.Config, args []string) e return nil } -func parseFileSize(fileSizeStr string) (uint64, error) { - var fileSize uint64 - if len(fileSizeStr) == 0 { - fileSize = export.UnspecifiedSize - } else if fileSizeMB, err := strconv.ParseUint(fileSizeStr, 10, 64); err == nil { - fileSize = fileSizeMB * units.MiB - } else if size, err := units.RAMInBytes(fileSizeStr); err == nil { - fileSize = uint64(size) - } else { - return 0, err - } - return fileSize, nil -} - // parseTableFilter parses `--tables-list` and `--filter`. // copy (and update) from https://github.com/pingcap/dumpling/blob/6f74c686e54183db7b869775af1c32df46462a6a/cmd/dumpling/main.go#L222. func parseTableFilter(tablesList, filters []string) (filter.Filter, error) { diff --git a/errors.toml b/errors.toml index 493e38fbfd..742cceab55 100644 --- a/errors.toml +++ b/errors.toml @@ -1036,6 +1036,36 @@ description = "" workaround = "Please check the `expression-filters` config in task configuration file." tags = ["internal", "high"] +[error.DM-config-20043] +message = "`backoff-max` value %v is less than `backoff-min` value %v" +description = "" +workaround = "Please increase `backoff-max` config in task configuration file." +tags = ["internal", "high"] + +[error.DM-config-20044] +message = "generate block allow list error" +description = "" +workaround = "Please check the `block-allow-list` config in task configuration file." +tags = ["internal", "high"] + +[error.DM-config-20045] +message = "generate table router error" +description = "" +workaround = "Please check the `routes` config in task configuration file." +tags = ["internal", "high"] + +[error.DM-config-20046] +message = "generate column mapping error" +description = "" +workaround = "Please check the `column-mappings` config in task configuration file." +tags = ["internal", "high"] + +[error.DM-config-20047] +message = "invalid `chunk-filesize` %v" +description = "" +workaround = "Please check the `chunk-filesize` config in task configuration file." +tags = ["internal", "high"] + [error.DM-binlog-op-22001] message = "" description = "" diff --git a/loader/loader.go b/loader/loader.go index 284aa49da0..d9c835ed63 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -337,12 +337,11 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab // do nothing } line, err := br.ReadString('\n') - cur += int64(len(line)) - if err == io.EOF { w.logger.Info("data are scanned finished.", zap.String("data file", file), zap.Int64("offset", offset)) break } + cur += int64(len(line)) realLine := strings.TrimSpace(line[:len(line)-1]) if len(realLine) == 0 { diff --git a/pkg/binlog/event/common.go b/pkg/binlog/event/common.go index 331b45b24c..12de6fd8cd 100644 --- a/pkg/binlog/event/common.go +++ b/pkg/binlog/event/common.go @@ -116,6 +116,9 @@ func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet g return nil, terror.ErrBinlogMariaDBServerIDMismatch.Generate(mariaGTID.ServerID, header.ServerID) } gtidEv, err = GenMariaDBGTIDEvent(header, latestPos, mariaGTID.SequenceNumber, mariaGTID.DomainID) + if err != nil { + return gtidEv, err + } // in go-mysql, set ServerID in parseEvent. we try to set it directly gtidEvBody := gtidEv.Event.(*replication.MariadbGTIDEvent) gtidEvBody.GTID.ServerID = header.ServerID diff --git a/pkg/dumpling/utils.go b/pkg/dumpling/utils.go index dfbddb8c7a..0e1c06c8af 100644 --- a/pkg/dumpling/utils.go +++ b/pkg/dumpling/utils.go @@ -21,6 +21,7 @@ import ( "strconv" "strings" + "github.com/docker/go-units" "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/dm/pkg/binlog" @@ -184,3 +185,18 @@ func readFollowingGTIDs(br *bufio.Reader, flavor string) (string, error) { following.WriteString(line) } } + +// ParseFileSize parses the size in MiB from input. +func ParseFileSize(fileSizeStr string, defaultSize uint64) (uint64, error) { + var fileSize uint64 + if len(fileSizeStr) == 0 { + fileSize = defaultSize + } else if fileSizeMB, err := strconv.ParseUint(fileSizeStr, 10, 64); err == nil { + fileSize = fileSizeMB * units.MiB + } else if size, err := units.RAMInBytes(fileSizeStr); err == nil { + fileSize = uint64(size) + } else { + return 0, err + } + return fileSize, nil +} diff --git a/pkg/ha/relay.go b/pkg/ha/relay.go index 9b041573a6..266e6f52d8 100644 --- a/pkg/ha/relay.go +++ b/pkg/ha/relay.go @@ -137,6 +137,9 @@ func GetRelayConfig(cli *clientv3.Client, worker string) (*config.SourceConfig, var rev2 int64 sourceResp := txnResp.Responses[0].GetResponseRange() newSource, rev2, err = getSourceIDFromResp((*clientv3.GetResponse)(sourceResp)) + if err != nil { + return nil, 0, err + } if newSource != source { log.L().Warn("relay config has been changed, will take a retry", diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go index 25aa57f463..4d63dd009d 100644 --- a/pkg/streamer/reader.go +++ b/pkg/streamer/reader.go @@ -377,10 +377,10 @@ func (r *BinlogReader) parseDirAsPossible(ctx context.Context, s *LocalStreamer, firstParse = true // new relay log file need to parse } needSwitch, latestPos, nextUUID, nextBinlogName, err = r.parseFileAsPossible(ctx, s, relayLogFile, offset, dir, firstParse, currentUUID, i == len(files)-1) - firstParse = false // already parsed if err != nil { return false, "", "", terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, offset, dir) } + firstParse = false // already parsed if needSwitch { // need switch to next relay sub directory return true, nextUUID, nextBinlogName, nil @@ -408,10 +408,10 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer default: } needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast, replaceWithHeartbeat) - firstParse = false // set to false to handle the `continue` below if err != nil { return false, 0, "", "", terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, latestPos, relayLogDir) } + firstParse = false // set to false to handle the `continue` below if needReParse { r.tctx.L().Debug("continue to re-parse relay log file", zap.String("file", relayLogFile), zap.String("directory", relayLogDir)) continue // should continue to parse this file diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index a8955675e8..63915e9586 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -234,6 +234,11 @@ const ( codeConfigExprFilterNotFound codeConfigExprFilterWrongGrammar codeConfigExprFilterEmptyName + codeConfigCheckerMaxTooSmall + codeConfigGenBAList + codeConfigGenTableRouter + codeConfigGenColumnMapping + codeConfigInvalidChunkFileSize ) // Binlog operation error code list. @@ -852,6 +857,11 @@ var ( ErrConfigExprFilterNotFound = New(codeConfigExprFilterNotFound, ClassConfig, ScopeInternal, LevelHigh, "mysql-instance(%d)'s expression-filters %s not exist in expression-filter", "Please check the `expression-filters` config in task configuration file.") ErrConfigExprFilterWrongGrammar = New(codeConfigExprFilterWrongGrammar, ClassConfig, ScopeInternal, LevelHigh, "expression-filter name(%s) SQL(%s) has wrong grammar: %v", "Please check the `expression-filters` config in task configuration file.") ErrConfigExprFilterEmptyName = New(codeConfigExprFilterEmptyName, ClassConfig, ScopeInternal, LevelHigh, "expression-filter %s has empty %s", "Please check the `expression-filters` config in task configuration file.") + ErrConfigCheckerMaxTooSmall = New(codeConfigCheckerMaxTooSmall, ClassConfig, ScopeInternal, LevelHigh, "`backoff-max` value %v is less than `backoff-min` value %v", "Please increase `backoff-max` config in task configuration file.") + ErrConfigGenBAList = New(codeConfigGenBAList, ClassConfig, ScopeInternal, LevelHigh, "generate block allow list error", "Please check the `block-allow-list` config in task configuration file.") + ErrConfigGenTableRouter = New(codeConfigGenTableRouter, ClassConfig, ScopeInternal, LevelHigh, "generate table router error", "Please check the `routes` config in task configuration file.") + ErrConfigGenColumnMapping = New(codeConfigGenColumnMapping, ClassConfig, ScopeInternal, LevelHigh, "generate column mapping error", "Please check the `column-mappings` config in task configuration file.") + ErrConfigInvalidChunkFileSize = New(codeConfigInvalidChunkFileSize, ClassConfig, ScopeInternal, LevelHigh, "invalid `chunk-filesize` %v", "Please check the `chunk-filesize` config in task configuration file.") // Binlog operation error. ErrBinlogExtractPosition = New(codeBinlogExtractPosition, ClassBinlogOp, ScopeInternal, LevelHigh, "", "") diff --git a/relay/meta.go b/relay/meta.go index 0187eb5333..6987f9c399 100644 --- a/relay/meta.go +++ b/relay/meta.go @@ -158,7 +158,10 @@ func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, en // check whether already have meaningful pos if len(lm.currentUUID) > 0 { - _, suffix, _ := utils.ParseSuffixForUUID(lm.currentUUID) + _, suffix, err := utils.ParseSuffixForUUID(lm.currentUUID) + if err != nil { + return false, err + } currPos := mysql.Position{Name: lm.BinLogName, Pos: lm.BinLogPos} if suffix != minUUIDSufix || currPos.Compare(minCheckpoint) > 0 || len(lm.BinlogGTID) > 0 { return false, nil // current pos is meaningful, do nothing diff --git a/tests/dmctl_basic/conf/get_task.yaml b/tests/dmctl_basic/conf/get_task.yaml index b0ea31a5d0..83a32f9570 100644 --- a/tests/dmctl_basic/conf/get_task.yaml +++ b/tests/dmctl_basic/conf/get_task.yaml @@ -100,6 +100,7 @@ column-mappings: - "1" - "" - t_ + - "" create-table-query: "" cm-02: schema-pattern: dmctl @@ -111,6 +112,7 @@ column-mappings: - "2" - "" - t_ + - "" create-table-query: "" expression-filter: {} black-white-list: {} diff --git a/tests/dmctl_command/run.sh b/tests/dmctl_command/run.sh index e77934263f..932fad8d0e 100644 --- a/tests/dmctl_command/run.sh +++ b/tests/dmctl_command/run.sh @@ -67,6 +67,16 @@ function run() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + # check wrong backoff-max + cp $cur/conf/source1.yaml $WORK_DIR/wrong-source.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/wrong-source.yaml + sed -i "s/backoff-max: 5m/backoff-max: 0.1s/g" $WORK_DIR/wrong-source.yaml + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "operate-source create $WORK_DIR/wrong-source.yaml" \ + "\"result\": false" 1 \ + "Please increase \`backoff-max\`" 1 + # operate mysql config to worker cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml @@ -74,6 +84,23 @@ function run() { dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + # check wrong do-tables + cp $cur/conf/dm-task.yaml $WORK_DIR/wrong-dm-task.yaml + sed -i "/do-dbs:/a\ do-tables:\n - db-name: \"dmctl_command\"" $WORK_DIR/wrong-dm-task.yaml + sed -i "/do-dbs:/d" $WORK_DIR/wrong-dm-task.yaml + echo "ignore-checking-items: [\"all\"]" >>$WORK_DIR/wrong-dm-task.yaml + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/wrong-dm-task.yaml" \ + "Table string cannot be empty" 1 + + # check wrong chunk-filesize + cp $cur/conf/dm-task.yaml $WORK_DIR/wrong-dm-task.yaml + sed -i "s/chunk-filesize: 64/chunk-filesize: 6qwe4/g" $WORK_DIR/wrong-dm-task.yaml + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/wrong-dm-task.yaml" \ + "invalid \`chunk-filesize\` 6qwe4" 1 + # start DM task with command mode run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-task $cur/conf/dm-task.yaml" \