From ea2601d13d9a2ef82ad764398ebfe82330000c64 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 6 Mar 2020 12:09:34 +0800 Subject: [PATCH 1/3] fix worker query error bug --- dm/worker/server_test.go | 38 ++++++++++++++++++++++++++++++- dm/worker/status.go | 48 ++++++++++++++++++++++++++-------------- pkg/utils/string.go | 16 +++++++++++++- 3 files changed, 83 insertions(+), 19 deletions(-) diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index 16313aa601..fb9512248e 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -21,13 +21,17 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/errors" "google.golang.org/grpc" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" ) +const subtaskSampleFile = "./subtask.toml" + func TestServer(t *testing.T) { TestingT(t) } @@ -80,7 +84,7 @@ func (t *testServer) TestServer(c *C) { c.Assert(s.worker.meta.PeekLog(), IsNil) // start task - subtaskCfgBytes, err := ioutil.ReadFile("./subtask.toml") + subtaskCfgBytes, err := ioutil.ReadFile(subtaskSampleFile) c.Assert(err, IsNil) resp1, err := cli.StartSubTask(context.Background(), &pb.StartSubTaskRequest{ @@ -162,3 +166,35 @@ func (t *testServer) createClient(c *C, addr string) pb.WorkerClient { c.Assert(err, IsNil) return pb.NewWorkerClient(conn) } + +func (t *testServer) TestQueryError(c *C) { + cfg := NewConfig() + c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil) + + s := NewServer(cfg) + s.closed.Set(false) + + w, err := NewWorker(cfg) + c.Assert(err, IsNil) + w.closed.Set(closedFalse) + + subtaskCfg := config.SubTaskConfig{} + err = subtaskCfg.DecodeFile(subtaskSampleFile) + c.Assert(err, IsNil) + + // subtask failed just after it is started + st := NewSubTask(&subtaskCfg) + st.fail(errors.New("mockSubtaskFail")) + w.subTaskHolder.recordSubTask(st) + s.worker = w + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + resp, err := s.QueryError(ctx, &pb.QueryErrorRequest{}) + c.Assert(err, IsNil) + c.Assert(resp, NotNil) + c.Assert(resp.Result, IsTrue) + c.Assert(resp.Msg, HasLen, 0) + c.Assert(resp.SubTaskError, HasLen, 1) + c.Assert(resp.SubTaskError[0].String(), Matches, `[\s\S]*mockSubtaskFail[\s\S]*`) +} diff --git a/dm/worker/status.go b/dm/worker/status.go index 069f1757d9..643bba3642 100644 --- a/dm/worker/status.go +++ b/dm/worker/status.go @@ -19,6 +19,7 @@ import ( "sort" "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/utils" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" @@ -27,12 +28,18 @@ import ( // Status returns the status of the current sub task func (st *SubTask) Status() interface{} { - return st.CurrUnit().Status() + if cu := st.CurrUnit(); cu != nil { + return cu.Status() + } + return nil } // Error returns the error of the current sub task func (st *SubTask) Error() interface{} { - return st.CurrUnit().Error() + if cu := st.CurrUnit(); cu != nil { + return cu.Error() + } + return nil } // StatusJSON returns the status of the current sub task as json string @@ -133,7 +140,7 @@ func (w *Worker) Error(stName string) []*pb.SubTaskError { return nil // no sub task started } - error := make([]*pb.SubTaskError, 0, len(sts)) + errs := make([]*pb.SubTaskError, 0, len(sts)) // return error order by name names := make([]string, 0, len(sts)) @@ -158,26 +165,33 @@ func (w *Worker) Error(stName string) []*pb.SubTaskError { stError = pb.SubTaskError{ Name: name, Stage: st.Stage(), - Unit: cu.Type(), } - // oneof error - us := cu.Error() - switch cu.Type() { - case pb.UnitType_Check: - stError.Error = &pb.SubTaskError_Check{Check: us.(*pb.CheckError)} - case pb.UnitType_Dump: - stError.Error = &pb.SubTaskError_Dump{Dump: us.(*pb.DumpError)} - case pb.UnitType_Load: - stError.Error = &pb.SubTaskError_Load{Load: us.(*pb.LoadError)} - case pb.UnitType_Sync: - stError.Error = &pb.SubTaskError_Sync{Sync: us.(*pb.SyncError)} + if cu != nil { + // one of error + stError.Unit = cu.Type() + us := cu.Error() + switch cu.Type() { + case pb.UnitType_Check: + stError.Error = &pb.SubTaskError_Check{Check: us.(*pb.CheckError)} + case pb.UnitType_Dump: + stError.Error = &pb.SubTaskError_Dump{Dump: us.(*pb.DumpError)} + case pb.UnitType_Load: + stError.Error = &pb.SubTaskError_Load{Load: us.(*pb.LoadError)} + case pb.UnitType_Sync: + stError.Error = &pb.SubTaskError_Sync{Sync: us.(*pb.SyncError)} + } + } else if result := st.Result(); result != nil { + processErrorMsg := utils.JoinProcessErrors(result.Errors) + if len(processErrorMsg) > 0 { + stError.Error = &pb.SubTaskError_Msg{Msg: processErrorMsg} + } } } - error = append(error, &stError) + errs = append(errs, &stError) } - return error + return errs } // statusProcessResult returns a clone of *pb.ProcessResult, but omit the `Error` field, so no duplicated diff --git a/pkg/utils/string.go b/pkg/utils/string.go index ed9136fb2a..34f6c95b1f 100644 --- a/pkg/utils/string.go +++ b/pkg/utils/string.go @@ -13,7 +13,12 @@ package utils -import "fmt" +import ( + "fmt" + "strings" + + "github.com/pingcap/dm/dm/pb" +) const ( defaultStringLenLimit = 1024 @@ -38,3 +43,12 @@ func TruncateString(s string, n int) string { func TruncateInterface(v interface{}, n int) string { return TruncateString(fmt.Sprintf("%+v", v), n) } + +// JoinProcessErrors return the string of pb.ProcessErrors joined by ", " +func JoinProcessErrors(errors []*pb.ProcessError) string { + serrs := make([]string, 0, len(errors)) + for _, serr := range errors { + serrs = append(serrs, serr.String()) + } + return strings.Join(serrs, ", ") +} From 31ce76b6ed77e3c565046817209d134f8379ac01 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 7 Mar 2020 18:27:15 +0800 Subject: [PATCH 2/3] fix --- dm/worker/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index fb9512248e..a8b3557d21 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -179,7 +179,7 @@ func (t *testServer) TestQueryError(c *C) { w.closed.Set(closedFalse) subtaskCfg := config.SubTaskConfig{} - err = subtaskCfg.DecodeFile(subtaskSampleFile) + err = subtaskCfg.DecodeFile(subtaskSampleFile, true) c.Assert(err, IsNil) // subtask failed just after it is started From 2f1ac81dfddb598de5adc4f5163070e6e890486c Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sat, 7 Mar 2020 19:28:01 +0800 Subject: [PATCH 3/3] update test --- dm/worker/server_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index a8b3557d21..931727a4e7 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -174,12 +174,16 @@ func (t *testServer) TestQueryError(c *C) { s := NewServer(cfg) s.closed.Set(false) - w, err := NewWorker(cfg) - c.Assert(err, IsNil) + w := &Worker{ + cfg: cfg, + relayHolder: NewRelayHolder(cfg), + // initial relay holder, the cfg's password will be decrypted in NewRelayHolder + subTaskHolder: newSubTaskHolder(), + } w.closed.Set(closedFalse) subtaskCfg := config.SubTaskConfig{} - err = subtaskCfg.DecodeFile(subtaskSampleFile, true) + err := subtaskCfg.DecodeFile(subtaskSampleFile, true) c.Assert(err, IsNil) // subtask failed just after it is started