Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-worker/: refine query error(#512) #519

Merged
merged 4 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"google.golang.org/grpc"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

const subtaskSampleFile = "./subtask.toml"

func TestServer(t *testing.T) {
TestingT(t)
}
Expand Down Expand Up @@ -80,7 +84,7 @@ func (t *testServer) TestServer(c *C) {
c.Assert(s.worker.meta.PeekLog(), IsNil)

// start task
subtaskCfgBytes, err := ioutil.ReadFile("./subtask.toml")
subtaskCfgBytes, err := ioutil.ReadFile(subtaskSampleFile)
c.Assert(err, IsNil)

resp1, err := cli.StartSubTask(context.Background(), &pb.StartSubTaskRequest{
Expand Down Expand Up @@ -162,3 +166,39 @@ func (t *testServer) createClient(c *C, addr string) pb.WorkerClient {
c.Assert(err, IsNil)
return pb.NewWorkerClient(conn)
}

func (t *testServer) TestQueryError(c *C) {
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)

s := NewServer(cfg)
s.closed.Set(false)

w := &Worker{
cfg: cfg,
relayHolder: NewRelayHolder(cfg),
// initial relay holder, the cfg's password will be decrypted in NewRelayHolder
subTaskHolder: newSubTaskHolder(),
}
w.closed.Set(closedFalse)

subtaskCfg := config.SubTaskConfig{}
err := subtaskCfg.DecodeFile(subtaskSampleFile, true)
c.Assert(err, IsNil)

// subtask failed just after it is started
st := NewSubTask(&subtaskCfg)
st.fail(errors.New("mockSubtaskFail"))
w.subTaskHolder.recordSubTask(st)
s.worker = w

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
resp, err := s.QueryError(ctx, &pb.QueryErrorRequest{})
c.Assert(err, IsNil)
c.Assert(resp, NotNil)
c.Assert(resp.Result, IsTrue)
c.Assert(resp.Msg, HasLen, 0)
c.Assert(resp.SubTaskError, HasLen, 1)
c.Assert(resp.SubTaskError[0].String(), Matches, `[\s\S]*mockSubtaskFail[\s\S]*`)
}
48 changes: 31 additions & 17 deletions dm/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"

"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/utils"

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
Expand All @@ -27,12 +28,18 @@ import (

// Status returns the status of the current sub task
func (st *SubTask) Status() interface{} {
return st.CurrUnit().Status()
if cu := st.CurrUnit(); cu != nil {
return cu.Status()
}
return nil
}

// Error returns the error of the current sub task
func (st *SubTask) Error() interface{} {
return st.CurrUnit().Error()
if cu := st.CurrUnit(); cu != nil {
return cu.Error()
}
return nil
}

// StatusJSON returns the status of the current sub task as json string
Expand Down Expand Up @@ -133,7 +140,7 @@ func (w *Worker) Error(stName string) []*pb.SubTaskError {
return nil // no sub task started
}

error := make([]*pb.SubTaskError, 0, len(sts))
errs := make([]*pb.SubTaskError, 0, len(sts))

// return error order by name
names := make([]string, 0, len(sts))
Expand All @@ -158,26 +165,33 @@ func (w *Worker) Error(stName string) []*pb.SubTaskError {
stError = pb.SubTaskError{
Name: name,
Stage: st.Stage(),
Unit: cu.Type(),
}

// oneof error
us := cu.Error()
switch cu.Type() {
case pb.UnitType_Check:
stError.Error = &pb.SubTaskError_Check{Check: us.(*pb.CheckError)}
case pb.UnitType_Dump:
stError.Error = &pb.SubTaskError_Dump{Dump: us.(*pb.DumpError)}
case pb.UnitType_Load:
stError.Error = &pb.SubTaskError_Load{Load: us.(*pb.LoadError)}
case pb.UnitType_Sync:
stError.Error = &pb.SubTaskError_Sync{Sync: us.(*pb.SyncError)}
if cu != nil {
// one of error
stError.Unit = cu.Type()
us := cu.Error()
switch cu.Type() {
case pb.UnitType_Check:
stError.Error = &pb.SubTaskError_Check{Check: us.(*pb.CheckError)}
case pb.UnitType_Dump:
stError.Error = &pb.SubTaskError_Dump{Dump: us.(*pb.DumpError)}
case pb.UnitType_Load:
stError.Error = &pb.SubTaskError_Load{Load: us.(*pb.LoadError)}
case pb.UnitType_Sync:
stError.Error = &pb.SubTaskError_Sync{Sync: us.(*pb.SyncError)}
}
} else if result := st.Result(); result != nil {
processErrorMsg := utils.JoinProcessErrors(result.Errors)
if len(processErrorMsg) > 0 {
stError.Error = &pb.SubTaskError_Msg{Msg: processErrorMsg}
}
}
}
error = append(error, &stError)
errs = append(errs, &stError)
}

return error
return errs
}

// statusProcessResult returns a clone of *pb.ProcessResult, but omit the `Error` field, so no duplicated
Expand Down
16 changes: 15 additions & 1 deletion pkg/utils/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@

package utils

import "fmt"
import (
"fmt"
"strings"

"github.com/pingcap/dm/dm/pb"
)

const (
defaultStringLenLimit = 1024
Expand All @@ -38,3 +43,12 @@ func TruncateString(s string, n int) string {
func TruncateInterface(v interface{}, n int) string {
return TruncateString(fmt.Sprintf("%+v", v), n)
}

// JoinProcessErrors return the string of pb.ProcessErrors joined by ", "
func JoinProcessErrors(errors []*pb.ProcessError) string {
serrs := make([]string, 0, len(errors))
for _, serr := range errors {
serrs = append(serrs, serr.String())
}
return strings.Join(serrs, ", ")
}