Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

fix(load): stop goroutines after restore returned (#744) #747

Merged
merged 5 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions dm/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,18 @@ func (w *Worker) Error(stName string) []*pb.SubTaskError {
return errs
}

// statusProcessResult returns a clone of *pb.ProcessResult, but omit the `Error` field, so no duplicated
// error message will be displayed in `query-status`, because the `Msg` field contains enough error information.
// statusProcessResult returns a clone of *pb.ProcessResult, but omit the `Msg` field, so no duplicated
// error message will be displayed in `query-status`, because the `Error` field contains enough error information.
func statusProcessResult(pr *pb.ProcessResult) *pb.ProcessResult {
if pr == nil {
return nil
}
result := proto.Clone(pr).(*pb.ProcessResult)
if result != nil {
for i := range result.Errors {
result.Errors[i].Error = nil
if result.Errors[i].Error != nil {
result.Errors[i].Msg = ""
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, our `Msg` field does not include the error code, class, etc.

Updating this line will change the response from

                    "errors": [
                        {
                            "Type": "UnknownError",
                            "msg": "TCPReader get relay event with error: ERROR 1236 (HY000): Could not find first log file name in binary log index file",
                            "error": null
                        }
                    ],

to

                        "errors": [
                            {
                                "Type": "UnknownError",
                                "msg": "",
                                "error": {
                                    "ErrCode": 10006,
                                    "ErrClass": 1,
                                    "ErrScope": 2,
                                    "ErrLevel": 3,
                                    "Message": "run table schema failed - dbfile ./dumped_data.task_single/db_single.tbl-schema.sql: execute statement failed: CREATE TABLE `tbl` (`c1` int(11) NOT NULL,`ct` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',PRIMARY KEY (`c1`)) ENGINE=InnoDB DEFAULT CHARSET=latin1;: Error 1067: Invalid default value for 'ct'",
                                    "RawCause": "Error 1067: Invalid default value for 'ct'"
                                }
                            }
                        ],

}
}
}
return result
Expand Down
2 changes: 1 addition & 1 deletion dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {
st.units = nil
st.Run()
c.Assert(st.Stage(), Equals, pb.Stage_Paused)
c.Assert(strings.Contains(st.Result().Errors[0].Msg, "has no dm units for mode"), IsTrue)
c.Assert(strings.Contains(st.Result().Errors[0].Error.String(), "has no dm units for mode"), IsTrue)

mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
Expand Down
30 changes: 13 additions & 17 deletions dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,25 +263,21 @@ func isResumableError(err *pb.ProcessError) bool {
return true
}

switch err.Type {
case pb.ErrorType_ExecSQL:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove `ProcessError.Type` later, because the error code, class, etc. now give us better information. cc @GMHDBJD

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do it in #746 later

// not elegant code, because TiDB doesn't expose some error
for _, msg := range retry.UnsupportedDDLMsgs {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
// not elegant code, because TiDB doesn't expose some error
for _, msg := range retry.UnsupportedDDLMsgs {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
for _, msg := range retry.UnsupportedDMLMsgs {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
}
for _, msg := range retry.UnsupportedDMLMsgs {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
case pb.ErrorType_UnknownError:
if err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
for _, msg := range retry.ParseRelayLogErrMsgs {
if strings.Contains(strings.ToLower(err.Error.Message), strings.ToLower(msg)) {
return false
}
}
if err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
for _, msg := range retry.ParseRelayLogErrMsgs {
if strings.Contains(strings.ToLower(err.Error.Message), strings.ToLower(msg)) {
return false
}
}
}
Expand Down
31 changes: 15 additions & 16 deletions dm/worker/task_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,32 +292,31 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) {

func (s *testTaskCheckerSuite) TestIsResumableError(c *check.C) {
testCases := []struct {
errorType pb.ErrorType
err error
resumable bool
}{
// only DM new error is checked
{pb.ErrorType_ExecSQL, &tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, true},
{pb.ErrorType_ExecSQL, &tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, true},
{pb.ErrorType_ExecSQL, nil, true},
{pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Generate("file test.t3.sql: execute statement failed: USE `test_abc`;: context canceled"), true},
{pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, "alter table t modify col varchar(20)"), false},
{pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, "alter table t drop column id"), false},
{pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(errors.New("Error 1062: Duplicate entry '5' for key 'PRIMARY'")), false},
{pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(errors.New("INSERT INTO `db`.`tbl` (`c1`,`c2`) VALUES (?,?);: Error 1406: Data too long for column 'c2' at row 1")), false},
{&tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, true},
{&tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, true},
{terror.ErrDBExecuteFailed.Generate("file test.t3.sql: execute statement failed: USE `test_abc`;: context canceled"), true},
{terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, "alter table t modify col varchar(20)"), false},
{terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, "alter table t drop column id"), false},
{terror.ErrDBExecuteFailed.Delegate(errors.New("Error 1062: Duplicate entry '5' for key 'PRIMARY'")), false},
{terror.ErrDBExecuteFailed.Delegate(errors.New("INSERT INTO `db`.`tbl` (`c1`,`c2`) VALUES (?,?);: Error 1406: Data too long for column 'c2' at row 1")), false},
{terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1067, "Invalid default value for 'ct'", tmysql.DefaultMySQLState}, "CREATE TABLE `tbl` (`c1` int(11) NOT NULL,`ct` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',PRIMARY KEY (`c1`)) ENGINE=InnoDB DEFAULT CHARSET=latin1"), false},
// real error is generated by `Delegate` and multiple `Annotatef`, we use `New` to simplify it
{pb.ErrorType_UnknownError, terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 555 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: binlog checksum mismatch, data may be corrupted"), false},
{pb.ErrorType_UnknownError, terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 500 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: get event err EOF, need 1567488104 but got 316323"), false},
{terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 555 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: binlog checksum mismatch, data may be corrupted"), false},
{terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 500 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: get event err EOF, need 1567488104 but got 316323"), false},
// unresumable terror codes
{pb.ErrorType_UnknownError, terror.ErrSyncUnitDDLWrongSequence.Generate("wrong sequence", "right sequence"), false},
{pb.ErrorType_UnknownError, terror.ErrSyncerShardDDLConflict.Generate("conflict DDL"), false},
{terror.ErrSyncUnitDDLWrongSequence.Generate("wrong sequence", "right sequence"), false},
{terror.ErrSyncerShardDDLConflict.Generate("conflict DDL"), false},
// others
{pb.ErrorType_UnknownError, nil, true},
{pb.ErrorType_UnknownError, errors.New("unknown error"), true},
{nil, true},
{errors.New("unknown error"), true},
}

for _, tc := range testCases {
err := unit.NewProcessError(tc.errorType, tc.err)
err := unit.NewProcessError(pb.ErrorType_UnknownError, tc.err)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This `pb.ErrorType_UnknownError` should be removed later.

fmt.Printf("error: %v\n", err)
c.Assert(isResumableError(err), check.Equals, tc.resumable)
}
Expand Down
1 change: 1 addition & 0 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {

err := l.Restore(newCtx)
close(l.runFatalChan) // Restore returned, all potential fatal sent to l.runFatalChan
cancel() // cancel the goroutines created in `Restore`.

failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) {
l.logCtx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit"))
Expand Down
1 change: 1 addition & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
"unsupported modify collate",
"unsupported drop integer primary key",
"Unsupported collation",
"Invalid default value for",
}

// UnsupportedDMLMsgs list the error messages of some un-recoverable DML, which is used in task auto recovery
Expand Down
2 changes: 1 addition & 1 deletion tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ function test_session_config(){
dmctl_start_task "$WORK_DIR/dm-task.yaml"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"'tidb_retry_limit' can't be set to the value" 2
"'tidb_retry_limit' can't be set to the value" 4
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test" \
"\"result\": true" 3
Expand Down
4 changes: 2 additions & 2 deletions tests/relay_interrupt/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ function run() {
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -w 127.0.0.1:$WORKER1_PORT" \
"no sub task started" 1 \
"ERROR" 1
"ERROR" 2

echo "start task and query status, task have error message"
task_conf="$cur/conf/dm-task.yaml"
Expand All @@ -58,7 +58,7 @@ function run() {
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -w 127.0.0.1:$WORKER1_PORT" \
"there aren't any data under relay log directory" 1 \
"ERROR" 1
"ERROR" 2
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status" \
"\"taskName\": \"test\"" 1 \
Expand Down