Skip to content

Commit

Permalink
Fix the wrong task num metrics when there are some exception tasks
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Dec 21, 2023
1 parent 76e4076 commit caf61d9
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 54 deletions.
4 changes: 2 additions & 2 deletions server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (e *MetaCDC) ReloadTask() {
e.cdcTasks.data[taskInfo.TaskID] = taskInfo
e.cdcTasks.Unlock()

metrics.TaskNumVec.Add(taskInfo.State)
metrics.TaskNumVec.Add(taskInfo.TaskID, taskInfo.State)
metrics.TaskStateVec.WithLabelValues(taskInfo.TaskID).Set(float64(taskInfo.State))
if err := e.startInternal(taskInfo, taskInfo.State == meta.TaskStateRunning); err != nil {
log.Warn("fail to start the task", zap.Any("task_info", taskInfo), zap.Error(err))
Expand Down Expand Up @@ -279,7 +279,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
revertCollectionNames()
return nil, servererror.NewServerError(errors.WithMessage(err, "fail to put the task info to etcd"))
}
metrics.TaskNumVec.Add(info.State)
metrics.TaskNumVec.Add(info.TaskID, info.State)
metrics.TaskStateVec.WithLabelValues(info.TaskID).Set(float64(info.State))
e.cdcTasks.Lock()
e.cdcTasks.data[info.TaskID] = info
Expand Down
10 changes: 8 additions & 2 deletions server/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,14 @@ var (
registry *prometheus.Registry

TaskNumVec = &TaskNumMetric{
metricDesc: prometheus.NewDesc(prometheus.BuildFQName(milvusNamespace, systemName, "task_num"),
"cdc task number", []string{taskStateLabelName}, nil),
metricDesc: prometheus.NewDesc(
prometheus.BuildFQName(milvusNamespace, systemName, "task_num"),
"cdc task number",
[]string{taskStateLabelName}, nil,
),
initialTaskMap: make(map[string]struct{}),
runningTaskMap: make(map[string]struct{}),
pauseTaskMap: make(map[string]struct{}),
}

TaskStateVec = prometheus.NewGaugeVec(
Expand Down
97 changes: 55 additions & 42 deletions server/metrics/metrics_task_num.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package metrics

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

Expand All @@ -27,63 +29,66 @@ import (
)

type TaskNumMetric struct {
metricDesc *prometheus.Desc
initialNum int
runningNum int
pauseNum int
metricDesc *prometheus.Desc
numLock sync.RWMutex
initialTaskMap map[string]struct{}
runningTaskMap map[string]struct{}
pauseTaskMap map[string]struct{}
}

// add it should be adapted if you modify the meta.MinTaskState or the meta.MaxTaskState
func (t *TaskNumMetric) add(s meta.TaskState, errTip string) {
innerLog := log.With(zap.String("tip", errTip), zap.Int("state", int(s)))
if !s.IsValidTaskState() {
innerLog.Warn("invalid task state")
return
}
switch s {
func (t *TaskNumMetric) getStateMap(state meta.TaskState) map[string]struct{} {
switch state {
case meta.TaskStateInitial:
t.initialNum++
return t.initialTaskMap
case meta.TaskStateRunning:
t.runningNum++
return t.runningTaskMap
case meta.TaskStatePaused:
t.pauseNum++
return t.pauseTaskMap
default:
innerLog.Warn("add, not handle the state")
return
return nil
}
}

// reduce it should be adapted if you modify the meta.MinTaskState or the meta.MaxTaskState
func (t *TaskNumMetric) reduce(s meta.TaskState, errTip string) {
innerLog := log.With(zap.String("tip", errTip), zap.Int("state", int(s)))
if !s.IsValidTaskState() {
innerLog.Warn("invalid task state")
func (t *TaskNumMetric) UpdateState(taskID string, newState meta.TaskState, oldStates meta.TaskState) {
t.numLock.Lock()
defer t.numLock.Unlock()

oldStatesMap := t.getStateMap(oldStates)
newStatesMap := t.getStateMap(newState)
if oldStatesMap == nil || newStatesMap == nil {
log.Warn("update state, not handle the state", zap.String("task", taskID),
zap.Any("old_state", oldStates), zap.Any("new_state", newState))
return
}
switch s {
case meta.TaskStateInitial:
t.initialNum--
case meta.TaskStateRunning:
t.runningNum--
case meta.TaskStatePaused:
t.pauseNum--
default:
innerLog.Warn("reduce, not handle the state", zap.Int("state", int(s)))

if _, ok := oldStatesMap[taskID]; !ok {
return
}
}

func (t *TaskNumMetric) UpdateState(newState meta.TaskState, oldStates meta.TaskState) {
t.add(newState, "update state, new state")
t.reduce(oldStates, "update state, old state")
delete(oldStatesMap, taskID)
newStatesMap[taskID] = struct{}{}
}

func (t *TaskNumMetric) Delete(state meta.TaskState) {
t.reduce(state, "delete")
func (t *TaskNumMetric) Delete(taskID string, state meta.TaskState) {
t.numLock.Lock()
defer t.numLock.Unlock()

stateMap := t.getStateMap(state)
if stateMap == nil {
return
}
delete(stateMap, taskID)
}

func (t *TaskNumMetric) Add(state meta.TaskState) {
t.add(state, "add state")
func (t *TaskNumMetric) Add(taskID string, state meta.TaskState) {
t.numLock.Lock()
defer t.numLock.Unlock()

stateMap := t.getStateMap(state)
if stateMap == nil {
return
}
stateMap[taskID] = struct{}{}
}

// Describe it should be adapted if you modify the meta.MinTaskState or the meta.MaxTaskState
Expand All @@ -93,12 +98,20 @@ func (t *TaskNumMetric) Describe(descs chan<- *prometheus.Desc) {
descs <- t.metricDesc
}

func (t *TaskNumMetric) getStateNum() (float64, float64, float64) {
t.numLock.RLock()
defer t.numLock.RUnlock()

return float64(len(t.initialTaskMap)), float64(len(t.runningTaskMap)), float64(len(t.pauseTaskMap))
}

// Collect it should be adapted if you modify the meta.MinTaskState or the meta.MaxTaskState
func (t *TaskNumMetric) Collect(metrics chan<- prometheus.Metric) {
initialNum, runningNum, pauseNum := t.getStateNum()
metrics <- prometheus.MustNewConstMetric(t.metricDesc, prometheus.GaugeValue,
float64(t.initialNum), meta.TaskStateInitial.String())
initialNum, meta.TaskStateInitial.String())
metrics <- prometheus.MustNewConstMetric(t.metricDesc, prometheus.GaugeValue,
float64(t.runningNum), meta.TaskStateRunning.String())
runningNum, meta.TaskStateRunning.String())
metrics <- prometheus.MustNewConstMetric(t.metricDesc, prometheus.GaugeValue,
float64(t.pauseNum), meta.TaskStatePaused.String())
pauseNum, meta.TaskStatePaused.String())
}
12 changes: 6 additions & 6 deletions server/metrics/metrics_task_num_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ import (

func TestMetricTaskNum(t *testing.T) {
m := &TaskNumMetric{}
m.Add(meta.TaskStateInitial)
m.Add("0", meta.TaskStateInitial)
assertNum(t, m, 1, 0, 0)

m.Add(meta.TaskStateRunning)
m.Add("1", meta.TaskStateRunning)
assertNum(t, m, 1, 1, 0)

m.UpdateState(meta.TaskStatePaused, meta.TaskStateRunning)
m.UpdateState("1", meta.TaskStatePaused, meta.TaskStateRunning)
assertNum(t, m, 1, 0, 1)

m.UpdateState(meta.TaskStateRunning, meta.TaskStateInitial)
m.UpdateState("0", meta.TaskStateRunning, meta.TaskStateInitial)
assertNum(t, m, 0, 1, 1)

m.Delete(meta.TaskStatePaused)
m.Delete("1", meta.TaskStatePaused)
assertNum(t, m, 0, 1, 0)

m.UpdateState(meta.MinTaskState-1, meta.MaxTaskState+1)
m.UpdateState("2", meta.MinTaskState-1, meta.MaxTaskState+1)
assertNum(t, m, 0, 1, 0)
}

Expand Down
4 changes: 2 additions & 2 deletions server/store/meta_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func UpdateTaskState(taskInfoStore api.MetaStore[*meta.TaskInfo], taskID string,
return err
}
metrics.TaskStateVec.WithLabelValues(taskID).Set(float64(newState))
metrics.TaskNumVec.UpdateState(newState, oldState)
metrics.TaskNumVec.UpdateState(taskID, newState, oldState)
return nil
}

Expand Down Expand Up @@ -198,7 +198,7 @@ func DeleteTask(factory api.MetaStoreFactory, taskID string) (*meta.TaskInfo, er
if err == nil {
commitErr := commitFunc(err)
if commitErr == nil {
metrics.TaskNumVec.Delete(info.State)
metrics.TaskNumVec.Delete(info.TaskID, info.State)
metrics.TaskStateVec.WithLabelValues(info.TaskID).Set(-1)
return info, nil
}
Expand Down

0 comments on commit caf61d9

Please sign in to comment.