Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
worker: add metrics (#772)
Browse files Browse the repository at this point in the history
* worker: add metrics

* worker: fix typo

* worker: pass make check

* worker: fix broken code

* worker: address comments
  • Loading branch information
lance6716 authored Jul 3, 2020
1 parent 41790a9 commit 381f202
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 36 deletions.
21 changes: 18 additions & 3 deletions dm/worker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,28 @@ import (
"github.com/pingcap/dm/syncer"
)

const (
opErrTypeBeforeOp = "BeforeAnyOp"
opErrTypeSourceBound = "SourceBound"
)

var (
taskState = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "worker",
Name: "task_state",
Help: "state of task, 0 - invalidStage, 1 - New, 2 - Running, 3 - Paused, 4 - Stopped, 5 - Finished",
}, []string{"task"})
}, []string{"task", "source_id"})

// opErrCounter is only cleaned up when the worker closes, which happens at the same time dm-worker exits, so no explicit cleanup is needed
opErrCounter = metricsproxy.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "worker",
Name: "operate_error",
Help: "number of different operate error",
}, []string{"worker", "type"})

cpuUsageGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -93,6 +107,7 @@ func RegistryMetrics() {
registry.MustRegister(prometheus.NewGoCollector())

registry.MustRegister(taskState)
registry.MustRegister(opErrCounter)
registry.MustRegister(cpuUsageGauge)

relay.RegisterMetrics(registry)
Expand Down Expand Up @@ -123,6 +138,6 @@ func InitStatus(lis net.Listener) {
}
}

func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string) {
taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task})
func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string, source string) {
taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task, "source_id": source})
}
4 changes: 2 additions & 2 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBo
s.setSourceStatus(bound.Source, err, true)
if err != nil {
// record the reason for operating source bound
// TODO: add better metrics
opErrCounter.WithLabelValues(s.cfg.Name, opErrTypeSourceBound).Inc()
log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name),
zap.Stringer("bound", bound), zap.Error(err))
if etcdutil.IsRetryableError(err) {
Expand Down Expand Up @@ -743,7 +743,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {

log.L().Info("start worker", zap.String("sourceCfg", cfg.String()), zap.Reflect("subTasks", subTaskCfgs))

w, err := NewWorker(cfg, s.etcdClient)
w, err := NewWorker(cfg, s.etcdClient, s.cfg.Name)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (t *testServer) TestQueryError(c *C) {

sourceCfg := loadSourceConfigWithoutPassword(c)
sourceCfg.EnableRelay = false
w, err := NewWorker(&sourceCfg, nil)
w, err := NewWorker(&sourceCfg, nil, "")
c.Assert(err, IsNil)
w.closed.Set(closedFalse)

Expand Down
10 changes: 5 additions & 5 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient *
cancel: cancel,
etcdClient: etcdClient,
}
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
return &st
}

Expand Down Expand Up @@ -371,7 +371,7 @@ func (st *SubTask) setStage(stage pb.Stage) {
st.Lock()
defer st.Unlock()
st.stage = stage
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
}

// stageCAS sets stage to newStage if its current value is oldStage
Expand All @@ -381,7 +381,7 @@ func (st *SubTask) stageCAS(oldStage, newStage pb.Stage) bool {

if st.stage == oldStage {
st.stage = newStage
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
return true
}
return false
Expand All @@ -393,7 +393,7 @@ func (st *SubTask) setStageIfNot(oldStage, newStage pb.Stage) bool {
defer st.Unlock()
if st.stage != oldStage {
st.stage = newStage
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
return true
}
return false
Expand Down Expand Up @@ -429,7 +429,7 @@ func (st *SubTask) Close() {

st.cancel()
st.closeUnits() // close all un-closed units
st.removeLabelValuesWithTaskInMetrics(st.cfg.Name)
st.removeLabelValuesWithTaskInMetrics(st.cfg.Name, st.cfg.SourceID)
st.wg.Wait()
st.setStageIfNot(pb.Stage_Finished, pb.Stage_Stopped)
}
Expand Down
4 changes: 2 additions & 2 deletions dm/worker/task_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) {
cfg := loadSourceConfigWithoutPassword(c)
cfg.RelayDir = dir
cfg.MetaDir = dir
w, err := NewWorker(&cfg, nil)
w, err := NewWorker(&cfg, nil, "")
c.Assert(err, check.IsNil)
w.closed.Set(closedFalse)

Expand Down Expand Up @@ -207,7 +207,7 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) {
cfg := loadSourceConfigWithoutPassword(c)
cfg.RelayDir = dir
cfg.MetaDir = dir
w, err := NewWorker(&cfg, nil)
w, err := NewWorker(&cfg, nil, "")
c.Assert(err, check.IsNil)
w.closed.Set(closedFalse)

Expand Down
47 changes: 28 additions & 19 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,19 @@ type Worker struct {
configFile string

etcdClient *clientv3.Client

name string
}

// NewWorker creates a new Worker
func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client) (w *Worker, err error) {
func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name string) (w *Worker, err error) {
w = &Worker{
cfg: cfg,
tracer: tracing.InitTracerHub(cfg.Tracer),
subTaskHolder: newSubTaskHolder(),
l: log.With(zap.String("component", "worker controller")),
etcdClient: etcdClient,
name: name,
}
w.ctx, w.cancel = context.WithCancel(context.Background())
w.closed.Set(closedTrue)
Expand Down Expand Up @@ -330,11 +333,12 @@ func (w *Worker) resetSubtaskStage(etcdCli *clientv3.Client) (int64, error) {
// TODO: right operation sequences may get error when we get etcdErrCompact, need to handle it later
// For example, Expect: Running -(pause)-> Paused -(resume)-> Running
// we get an etcd compact error at the first running. If we try to "resume" it now, we will get an error
err = w.operateSubTaskStage(stage, subtaskCfg)
opType, err := w.operateSubTaskStage(stage, subtaskCfg)
if err != nil {
// TODO: add better metrics
opErrCounter.WithLabelValues(w.name, opType).Inc()
log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage),
zap.String("task", subtaskCfg.Name), zap.Error(err))

}
delete(sts, name)
}
Expand All @@ -343,7 +347,7 @@ func (w *Worker) resetSubtaskStage(etcdCli *clientv3.Client) (int64, error) {
for name := range sts {
err = w.OperateSubTask(name, pb.TaskOp_Stop)
if err != nil {
// TODO: add better metrics
opErrCounter.WithLabelValues(w.name, pb.TaskOp_Stop.String()).Inc()
log.L().Error("fail to stop subtask", zap.String("task", name), zap.Error(err))
}
}
Expand Down Expand Up @@ -404,9 +408,9 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
log.L().Info("worker is closed, handleSubTaskStage will quit now")
return nil
case stage := <-stageCh:
err := w.operateSubTaskStageWithoutConfig(stage)
opType, err := w.operateSubTaskStageWithoutConfig(stage)
if err != nil {
// TODO: add better metrics
opErrCounter.WithLabelValues(w.name, opType).Inc()
log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage), zap.Error(err))
if etcdutil.IsRetryableError(err) {
return err
Expand All @@ -422,36 +426,39 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
}
}

func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) error {
// operateSubTaskStage returns TaskOp.String() additionally to record metrics
func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) (string, error) {
var op pb.TaskOp
switch {
case stage.Expect == pb.Stage_Running:
if st := w.subTaskHolder.findSubTask(stage.Task); st == nil {
w.StartSubTask(&subTaskCfg)
log.L().Info("load subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
return nil
// error is nil, opErrTypeBeforeOp will be ignored
return opErrTypeBeforeOp, nil
}
op = pb.TaskOp_Resume
case stage.Expect == pb.Stage_Paused:
op = pb.TaskOp_Pause
case stage.IsDeleted:
op = pb.TaskOp_Stop
}
return w.OperateSubTask(stage.Task, op)
return op.String(), w.OperateSubTask(stage.Task, op)
}

func (w *Worker) operateSubTaskStageWithoutConfig(stage ha.Stage) error {
// operateSubTaskStageWithoutConfig additionally returns the operation type as a string so that failures can be recorded in metrics
func (w *Worker) operateSubTaskStageWithoutConfig(stage ha.Stage) (string, error) {
var subTaskCfg config.SubTaskConfig
if stage.Expect == pb.Stage_Running {
if st := w.subTaskHolder.findSubTask(stage.Task); st == nil {
tsm, _, err := ha.GetSubTaskCfg(w.etcdClient, stage.Source, stage.Task, stage.Revision)
if err != nil {
// TODO: need retry
return terror.Annotate(err, "fail to get subtask config from etcd")
return opErrTypeBeforeOp, terror.Annotate(err, "fail to get subtask config from etcd")
}
var ok bool
if subTaskCfg, ok = tsm[stage.Task]; !ok {
return terror.ErrWorkerFailToGetSubtaskConfigFromEtcd.Generate(stage.Task)
return opErrTypeBeforeOp, terror.ErrWorkerFailToGetSubtaskConfigFromEtcd.Generate(stage.Task)
}
}
}
Expand Down Expand Up @@ -495,9 +502,9 @@ func (w *Worker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client
if stage.IsEmpty() {
stage.IsDeleted = true
}
err1 = w.operateRelayStage(ctx, stage)
opType, err1 := w.operateRelayStage(ctx, stage)
if err1 != nil {
// TODO: add better metrics
opErrCounter.WithLabelValues(w.name, opType).Inc()
log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err1))
}
}
Expand All @@ -521,9 +528,9 @@ func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, er
log.L().Info("worker is closed, handleRelayStage will quit now")
return nil
case stage := <-stageCh:
err := w.operateRelayStage(ctx, stage)
opType, err := w.operateRelayStage(ctx, stage)
if err != nil {
// TODO: add better metrics
opErrCounter.WithLabelValues(w.name, opType).Inc()
log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err))
}
case err := <-errCh:
Expand All @@ -535,22 +542,24 @@ func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, er
}
}

func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) error {
// operateRelayStage returns RelayOp.String() additionally to record metrics
// *RelayOp is nil only when error is nil, so recording on error will not hit a nil-pointer dereference
func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) (string, error) {
var op pb.RelayOp
switch {
case stage.Expect == pb.Stage_Running:
if w.relayHolder.Stage() == pb.Stage_New {
w.relayHolder.Start()
w.relayPurger.Start()
return nil
return opErrTypeBeforeOp, nil
}
op = pb.RelayOp_ResumeRelay
case stage.Expect == pb.Stage_Paused:
op = pb.RelayOp_PauseRelay
case stage.IsDeleted:
op = pb.RelayOp_StopRelay
}
return w.OperateRelay(ctx, &pb.OperateRelayRequest{Op: op})
return op.String(), w.OperateRelay(ctx, &pb.OperateRelayRequest{Op: op})
}

// HandleSQLs implements Handler.HandleSQLs.
Expand Down
8 changes: 4 additions & 4 deletions dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func (t *testServer) testWorker(c *C) {
NewRelayHolder = NewRealRelayHolder
}()

_, err := NewWorker(&cfg, nil)
_, err := NewWorker(&cfg, nil, "")
c.Assert(err, ErrorMatches, "init error")

NewRelayHolder = NewDummyRelayHolder
w, err := NewWorker(&cfg, nil)
w, err := NewWorker(&cfg, nil, "")
c.Assert(err, IsNil)
c.Assert(w.StatusJSON(""), HasLen, emptyWorkerStatusInfoJSONLength)
//c.Assert(w.closed.Get(), Equals, closedFalse)
Expand Down Expand Up @@ -226,7 +226,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
sourceCfg.EnableRelay = false

// step 1: start worker
w, err := NewWorker(&sourceCfg, etcdCli)
w, err := NewWorker(&sourceCfg, etcdCli, "")
c.Assert(err, IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -339,7 +339,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) {
sourceCfg.MetaDir = c.MkDir()

// step 1: start worker
w, err := NewWorker(&sourceCfg, etcdCli)
w, err := NewWorker(&sourceCfg, etcdCli, "")
c.Assert(err, IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit 381f202

Please sign in to comment.