From 8dffefc35870fb1a4d5cd91b13314c76f42f5d7c Mon Sep 17 00:00:00 2001 From: Xuecheng Zhang Date: Wed, 17 Jun 2020 13:48:25 +0800 Subject: [PATCH 1/4] fix(load): stop goroutines after restore returned (#744) --- dm/worker/task_checker_test.go | 1 + loader/loader.go | 1 + pkg/retry/errors.go | 1 + 3 files changed, 3 insertions(+) diff --git a/dm/worker/task_checker_test.go b/dm/worker/task_checker_test.go index 33992b67ed..29c68ded01 100644 --- a/dm/worker/task_checker_test.go +++ b/dm/worker/task_checker_test.go @@ -305,6 +305,7 @@ func (s *testTaskCheckerSuite) TestIsResumableError(c *check.C) { {pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, "alter table t drop column id"), false}, {pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(errors.New("Error 1062: Duplicate entry '5' for key 'PRIMARY'")), false}, {pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(errors.New("INSERT INTO `db`.`tbl` (`c1`,`c2`) VALUES (?,?);: Error 1406: Data too long for column 'c2' at row 1")), false}, + {pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1067, "Invalid default value for 'ct'", tmysql.DefaultMySQLState}, "CREATE TABLE `tbl` (`c1` int(11) NOT NULL,`ct` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',PRIMARY KEY (`c1`)) ENGINE=InnoDB DEFAULT CHARSET=latin1"), false}, // real error is generated by `Delegate` and multiple `Annotatef`, we use `New` to simplify it {pb.ErrorType_UnknownError, terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 555 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: binlog checksum mismatch, data may be corrupted"), false}, {pb.ErrorType_UnknownError, terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 500 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: get event err EOF, need 1567488104 but got 316323"), false}, diff --git a/loader/loader.go b/loader/loader.go index b06f035434..6b7409b2cc 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -487,6 +487,7 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) { err := l.Restore(newCtx) close(l.runFatalChan) // Restore returned, all potential fatal sent to l.runFatalChan + cancel() // cancel the goroutines created in `Restore`. failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) { l.logCtx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit")) diff --git a/pkg/retry/errors.go b/pkg/retry/errors.go index 4b3caf4bf7..e4190c6109 100644 --- a/pkg/retry/errors.go +++ b/pkg/retry/errors.go @@ -30,6 +30,7 @@ var ( "unsupported modify collate", "unsupported drop integer primary key", "Unsupported collation", + "Invalid default value for", } // UnsupportedDMLMsgs list the error messages of some un-recoverable DML, which is used in task auto recovery From 25654da657b2dcc373af22a10839116eaaf3c0a2 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 17 Jun 2020 14:42:01 +0800 Subject: [PATCH 2/4] *: including error codes in the response of `query-status`; refine task checker --- dm/worker/status.go | 6 +++--- dm/worker/task_checker.go | 30 +++++++++++++----------------- dm/worker/task_checker_test.go | 32 +++++++++++++++----------------- 3 files changed, 31 insertions(+), 37 deletions(-) diff --git a/dm/worker/status.go b/dm/worker/status.go index 643bba3642..14c58de120 100644 --- a/dm/worker/status.go +++ b/dm/worker/status.go @@ -194,8 +194,8 @@ func (w *Worker) Error(stName string) []*pb.SubTaskError { return errs } -// statusProcessResult returns a clone of *pb.ProcessResult, but omit the `Error` field, so no duplicated -// error message will be displayed in `query-status`, because the `Msg` field contains enough error information. +// statusProcessResult returns a clone of *pb.ProcessResult, but omit the `Msg` field, so no duplicated +// error message will be displayed in `query-status`, because the `Error` field contains enough error information. func statusProcessResult(pr *pb.ProcessResult) *pb.ProcessResult { if pr == nil { return nil @@ -203,7 +203,7 @@ func statusProcessResult(pr *pb.ProcessResult) *pb.ProcessResult { result := proto.Clone(pr).(*pb.ProcessResult) if result != nil { for i := range result.Errors { - result.Errors[i].Error = nil + result.Errors[i].Msg = "" } } return result diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index d5fa5f0ae1..39d476ee84 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -263,25 +263,21 @@ func isResumableError(err *pb.ProcessError) bool { return true } - switch err.Type { - case pb.ErrorType_ExecSQL: - // not elegant code, because TiDB doesn't expose some error - for _, msg := range retry.UnsupportedDDLMsgs { - if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) { - return false - } + // not elegant code, because TiDB doesn't expose some error + for _, msg := range retry.UnsupportedDDLMsgs { + if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) { + return false } - for _, msg := range retry.UnsupportedDMLMsgs { - if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) { - return false - } + } + for _, msg := range retry.UnsupportedDMLMsgs { + if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) { + return false } - case pb.ErrorType_UnknownError: - if err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) { - for _, msg := range retry.ParseRelayLogErrMsgs { - if strings.Contains(strings.ToLower(err.Error.Message), strings.ToLower(msg)) { - return false - } + } + if err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) { + for _, msg := range retry.ParseRelayLogErrMsgs { + if strings.Contains(strings.ToLower(err.Error.Message), strings.ToLower(msg)) { + return false } } } diff --git a/dm/worker/task_checker_test.go b/dm/worker/task_checker_test.go index 29c68ded01..742a6c16f6 100644 --- a/dm/worker/task_checker_test.go +++ b/dm/worker/task_checker_test.go @@ -292,33 +292,31 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { func (s *testTaskCheckerSuite) TestIsResumableError(c *check.C) { testCases := []struct { - errorType pb.ErrorType err error resumable bool }{ // only DM new error is checked - {pb.ErrorType_ExecSQL, &tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, true}, - {pb.ErrorType_ExecSQL, &tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, true}, - {pb.ErrorType_ExecSQL, nil, true}, - {pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Generate("file test.t3.sql: execute statement failed: USE `test_abc`;: context canceled"), true}, - {pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, "alter table t modify col varchar(20)"), false}, - {pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, "alter table t drop column id"), false}, - {pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(errors.New("Error 1062: Duplicate entry '5' for key 'PRIMARY'")), false}, - {pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(errors.New("INSERT INTO `db`.`tbl` (`c1`,`c2`) VALUES (?,?);: Error 1406: Data too long for column 'c2' at row 1")), false}, - {pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1067, "Invalid default value for 'ct'", tmysql.DefaultMySQLState}, "CREATE TABLE `tbl` (`c1` int(11) NOT NULL,`ct` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',PRIMARY KEY (`c1`)) ENGINE=InnoDB DEFAULT CHARSET=latin1"), false}, + {&tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, true}, + {&tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, true}, + {terror.ErrDBExecuteFailed.Generate("file test.t3.sql: execute statement failed: USE `test_abc`;: context canceled"), true}, + {terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, "alter table t modify col varchar(20)"), false}, + {terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, "alter table t drop column id"), false}, + {terror.ErrDBExecuteFailed.Delegate(errors.New("Error 1062: Duplicate entry '5' for key 'PRIMARY'")), false}, + {terror.ErrDBExecuteFailed.Delegate(errors.New("INSERT INTO `db`.`tbl` (`c1`,`c2`) VALUES (?,?);: Error 1406: Data too long for column 'c2' at row 1")), false}, + {terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1067, "Invalid default value for 'ct'", tmysql.DefaultMySQLState}, "CREATE TABLE `tbl` (`c1` int(11) NOT NULL,`ct` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',PRIMARY KEY (`c1`)) ENGINE=InnoDB DEFAULT CHARSET=latin1"), false}, // real error is generated by `Delegate` and multiple `Annotatef`, we use `New` to simplify it - {pb.ErrorType_UnknownError, terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 555 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: binlog checksum mismatch, data may be corrupted"), false}, - {pb.ErrorType_UnknownError, terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 500 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: get event err EOF, need 1567488104 but got 316323"), false}, + {terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 555 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: binlog checksum mismatch, data may be corrupted"), false}, + {terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 500 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: get event err EOF, need 1567488104 but got 316323"), false}, // unresumable terror codes - {pb.ErrorType_UnknownError, terror.ErrSyncUnitDDLWrongSequence.Generate("wrong sequence", "right sequence"), false}, - {pb.ErrorType_UnknownError, terror.ErrSyncerShardDDLConflict.Generate("conflict DDL"), false}, + {terror.ErrSyncUnitDDLWrongSequence.Generate("wrong sequence", "right sequence"), false}, + {terror.ErrSyncerShardDDLConflict.Generate("conflict DDL"), false}, // others - {pb.ErrorType_UnknownError, nil, true}, - {pb.ErrorType_UnknownError, errors.New("unknown error"), true}, + {nil, true}, + {errors.New("unknown error"), true}, } for _, tc := range testCases { - err := unit.NewProcessError(tc.errorType, tc.err) + err := unit.NewProcessError(pb.ErrorType_UnknownError, tc.err) fmt.Printf("error: %v\n", err) c.Assert(isResumableError(err), check.Equals, tc.resumable) } From c6d8fab37c8b42c3c3f8e4917c00a0dc6abe4684 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 17 Jun 2020 16:07:52 +0800 Subject: [PATCH 3/4] *: fix result for non-terror --- dm/worker/status.go | 4 +++- dm/worker/subtask_test.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dm/worker/status.go b/dm/worker/status.go index 14c58de120..220cb82bc8 100644 --- a/dm/worker/status.go +++ b/dm/worker/status.go @@ -203,7 +203,9 @@ func statusProcessResult(pr *pb.ProcessResult) *pb.ProcessResult { result := proto.Clone(pr).(*pb.ProcessResult) if result != nil { for i := range result.Errors { - result.Errors[i].Msg = "" + if result.Errors[i].Error != nil { + result.Errors[i].Msg = "" + } } } return result diff --git a/dm/worker/subtask_test.go b/dm/worker/subtask_test.go index aecc134a88..0a43ad407e 100644 --- a/dm/worker/subtask_test.go +++ b/dm/worker/subtask_test.go @@ -174,7 +174,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) { st.units = nil st.Run() c.Assert(st.Stage(), Equals, pb.Stage_Paused) - c.Assert(strings.Contains(st.Result().Errors[0].Msg, "has no dm units for mode"), IsTrue) + c.Assert(strings.Contains(st.Result().Errors[0].Error.String(), "has no dm units for mode"), IsTrue) mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) From 45e13d06668abec4e0af989aa303a9db57fe027d Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 17 Jun 2020 16:27:16 +0800 Subject: [PATCH 4/4] tests: fix results check --- tests/all_mode/run.sh | 2 +- tests/relay_interrupt/run.sh | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index ddfafda4f1..52d13fc0d2 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -29,7 +29,7 @@ function test_session_config(){ dmctl_start_task "$WORK_DIR/dm-task.yaml" run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "'tidb_retry_limit' can't be set to the value" 2 + "'tidb_retry_limit' can't be set to the value" 4 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-task test" \ "\"result\": true" 3 diff --git a/tests/relay_interrupt/run.sh b/tests/relay_interrupt/run.sh index 8227ec6e06..5a64925de3 100644 --- a/tests/relay_interrupt/run.sh +++ b/tests/relay_interrupt/run.sh @@ -47,7 +47,7 @@ function run() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -w 127.0.0.1:$WORKER1_PORT" \ "no sub task started" 1 \ - "ERROR" 1 + "ERROR" 2 echo "start task and query status, task have error message" task_conf="$cur/conf/dm-task.yaml" @@ -58,7 +58,7 @@ function run() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -w 127.0.0.1:$WORKER1_PORT" \ "there aren't any data under relay log directory" 1 \ - "ERROR" 1 + "ERROR" 2 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status" \ "\"taskName\": \"test\"" 1 \