Skip to content

Commit

Permalink
Emit task list lag metric in matching
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 12, 2021
1 parent 1de8cd4 commit 9b8d3a2
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1961,6 +1961,7 @@ const (
RemoteToRemoteMatchPerTaskListCounter
PollerPerTaskListCounter
TaskListManagersGauge
TaskLagPerTaskListGauge

NumMatchingMetrics
)
Expand Down Expand Up @@ -2443,6 +2444,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
RemoteToRemoteMatchPerTaskListCounter: {metricName: "remote_to_remote_matches_per_tl", metricRollupName: "remote_to_remote_matches"},
PollerPerTaskListCounter: {metricName: "poller_count_per_tl", metricRollupName: "poller_count"},
TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge},
TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge},
},
Worker: {
ReplicatorMessages: {metricName: "replicator_messages"},
Expand Down
4 changes: 3 additions & 1 deletion common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

package metrics

import "strconv"
import (
"strconv"
)

const (
revisionTag = "revision"
Expand Down
26 changes: 18 additions & 8 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ const (
returnEmptyTaskTimeBudget time.Duration = time.Second
)

var taskListActivityTypeTag = metrics.TaskListTypeTag("activity")
var taskListDecisionTypeTag = metrics.TaskListTypeTag("decision")
var (
taskListActivityTypeTag = metrics.TaskListTypeTag("activity")
taskListDecisionTypeTag = metrics.TaskListTypeTag("decision")
)

type (
addTaskParams struct {
Expand Down Expand Up @@ -168,12 +170,9 @@ func newTaskListManager(
metrics.MatchingTaskListMgrScope,
))
}
var taskListTypeMetricScope metrics.Scope
if taskList.taskType == persistence.TaskListTypeActivity {
taskListTypeMetricScope = tlMgr.metricScope().Tagged(taskListActivityTypeTag)
} else {
taskListTypeMetricScope = tlMgr.metricScope().Tagged(taskListDecisionTypeTag)
}
taskListTypeMetricScope := tlMgr.metricScope().Tagged(
getTaskListTypeTag(taskList.taskType),
)
tlMgr.pollerHistory = newPollerHistory(func() {
taskListTypeMetricScope.UpdateGauge(metrics.PollerPerTaskListCounter,
float64(len(tlMgr.pollerHistory.getAllPollerInfo())))
Expand Down Expand Up @@ -581,6 +580,17 @@ func (c *taskListManagerImpl) tryInitDomainNameAndScope() {
c.domainNameValue.Store(domainName)
}

func getTaskListTypeTag(taskListType int) metrics.Tag {
switch taskListType {
case persistence.TaskListTypeActivity:
return taskListActivityTypeTag
case persistence.TaskListTypeDecision:
return taskListDecisionTypeTag
default:
return metrics.TaskListTypeTag("")
}
}

func createServiceBusyError(msg string) *types.ServiceBusyError {
return &types.ServiceBusyError{Message: msg}
}
9 changes: 8 additions & 1 deletion service/matching/taskReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,14 @@ func (tr *taskReader) addSingleTaskToBuffer(
}

func (tr *taskReader) persistAckLevel() error {
return tr.tlMgr.db.UpdateState(tr.tlMgr.taskAckManager.GetAckLevel())
ackLevel := tr.tlMgr.taskAckManager.GetAckLevel()
maxReadLevel := tr.tlMgr.taskWriter.GetMaxReadLevel()
scope := tr.scope().Tagged(getTaskListTypeTag(tr.tlMgr.taskListID.taskType))
// note: this metrics is only an estimation for the lag. taskID in DB may not be continuous,
// especially when task list ownership changes.
scope.UpdateGauge(metrics.TaskLagPerTaskListGauge, float64(maxReadLevel-ackLevel))

return tr.tlMgr.db.UpdateState(ackLevel)
}

func (tr *taskReader) isTaskAddedRecently(lastAddTime time.Time) bool {
Expand Down

0 comments on commit 9b8d3a2

Please sign in to comment.