Skip to content

Commit

Permalink
Add more metrics and log, remove some dead code (#893)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Jun 26, 2018
1 parent e5f07f2 commit 9ea16c0
Show file tree
Hide file tree
Showing 17 changed files with 258 additions and 195 deletions.
132 changes: 92 additions & 40 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,32 +364,64 @@ const (
HistoryShardControllerScope
// TransferQueueProcessorScope is the scope used by all metric emitted by transfer queue processor
TransferQueueProcessorScope
// TransferTaskActivityScope is the scope used for activity task processing by transfer queue processor
TransferTaskActivityScope
// TransferTaskDecisionScope is the scope used for decision task processing by transfer queue processor
TransferTaskDecisionScope
// TransferTaskCloseExecutionScope is the scope used for close execution task processing by transfer queue processor
TransferTaskCloseExecutionScope
// TransferTaskCancelExecutionScope is the scope used for cancel execution task processing by transfer queue processor
TransferTaskCancelExecutionScope
// TransferTaskSignalExecutionScope is the scope used for signal execution task processing by transfer queue processor
TransferTaskSignalExecutionScope
// TransferTaskStartChildExecutionScope is the scope used for start child execution task processing by transfer queue processor
TransferTaskStartChildExecutionScope
// TransferActiveQueueProcessorScope is the scope used by all metric emitted by transfer queue processor
TransferActiveQueueProcessorScope
// TransferStandbyQueueProcessorScope is the scope used by all metric emitted by transfer queue processor
TransferStandbyQueueProcessorScope
// TransferActiveTaskActivityScope is the scope used for activity task processing by transfer queue processor
TransferActiveTaskActivityScope
// TransferActiveTaskDecisionScope is the scope used for decision task processing by transfer queue processor
TransferActiveTaskDecisionScope
// TransferActiveTaskCloseExecutionScope is the scope used for close execution task processing by transfer queue processor
TransferActiveTaskCloseExecutionScope
// TransferActiveTaskCancelExecutionScope is the scope used for cancel execution task processing by transfer queue processor
TransferActiveTaskCancelExecutionScope
// TransferActiveTaskSignalExecutionScope is the scope used for signal execution task processing by transfer queue processor
TransferActiveTaskSignalExecutionScope
// TransferActiveTaskStartChildExecutionScope is the scope used for start child execution task processing by transfer queue processor
TransferActiveTaskStartChildExecutionScope
// TransferStandbyTaskActivityScope is the scope used for activity task processing by transfer queue processor
TransferStandbyTaskActivityScope
// TransferStandbyTaskDecisionScope is the scope used for decision task processing by transfer queue processor
TransferStandbyTaskDecisionScope
// TransferStandbyTaskCloseExecutionScope is the scope used for close execution task processing by transfer queue processor
TransferStandbyTaskCloseExecutionScope
// TransferStandbyTaskCancelExecutionScope is the scope used for cancel execution task processing by transfer queue processor
TransferStandbyTaskCancelExecutionScope
// TransferStandbyTaskSignalExecutionScope is the scope used for signal execution task processing by transfer queue processor
TransferStandbyTaskSignalExecutionScope
// TransferStandbyTaskStartChildExecutionScope is the scope used for start child execution task processing by transfer queue processor
TransferStandbyTaskStartChildExecutionScope
// TimerQueueProcessorScope is the scope used by all metric emitted by timer queue processor
TimerQueueProcessorScope
// TimerTaskActivityTimeoutScope is the scope used by metric emitted by timer queue processor for processing activity timeouts
TimerTaskActivityTimeoutScope
// TimerTaskDecisionTimeoutScope is the scope used by metric emitted by timer queue processor for processing decision timeouts
TimerTaskDecisionTimeoutScope
// TimerTaskUserTimerScope is the scope used by metric emitted by timer queue processor for processing user timers
TimerTaskUserTimerScope
// TimerTaskWorkflowTimeoutScope is the scope used by metric emitted by timer queue processor for processing workflow timeouts.
TimerTaskWorkflowTimeoutScope
// TimerTaskRetryTimerScope is the scope used by metric emitted by timer queue processor for processing retry task.
TimerTaskRetryTimerScope
// TimerTaskDeleteHistoryEvent is the scope used by metric emitted by timer queue processor for processing history event cleanup
TimerTaskDeleteHistoryEvent
// TimerActiveQueueProcessorScope is the scope used by all metric emitted by timer queue processor
TimerActiveQueueProcessorScope
// TimerStandbyQueueProcessorScope is the scope used by all metric emitted by timer queue processor
TimerStandbyQueueProcessorScope
// TimerActiveTaskActivityTimeoutScope is the scope used by metric emitted by timer queue processor for processing activity timeouts
TimerActiveTaskActivityTimeoutScope
// TimerActiveTaskDecisionTimeoutScope is the scope used by metric emitted by timer queue processor for processing decision timeouts
TimerActiveTaskDecisionTimeoutScope
// TimerActiveTaskUserTimerScope is the scope used by metric emitted by timer queue processor for processing user timers
TimerActiveTaskUserTimerScope
// TimerActiveTaskWorkflowTimeoutScope is the scope used by metric emitted by timer queue processor for processing workflow timeouts.
TimerActiveTaskWorkflowTimeoutScope
// TimerActiveTaskRetryTimerScope is the scope used by metric emitted by timer queue processor for processing retry task.
TimerActiveTaskRetryTimerScope
// TimerActiveTaskDeleteHistoryEvent is the scope used by metric emitted by timer queue processor for processing history event cleanup
TimerActiveTaskDeleteHistoryEvent
// TimerStandbyTaskActivityTimeoutScope is the scope used by metric emitted by timer queue processor for processing activity timeouts
TimerStandbyTaskActivityTimeoutScope
// TimerStandbyTaskDecisionTimeoutScope is the scope used by metric emitted by timer queue processor for processing decision timeouts
TimerStandbyTaskDecisionTimeoutScope
// TimerStandbyTaskUserTimerScope is the scope used by metric emitted by timer queue processor for processing user timers
TimerStandbyTaskUserTimerScope
// TimerStandbyTaskWorkflowTimeoutScope is the scope used by metric emitted by timer queue processor for processing workflow timeouts.
TimerStandbyTaskWorkflowTimeoutScope
// TimerStandbyTaskRetryTimerScope is the scope used by metric emitted by timer queue processor for processing retry task.
TimerStandbyTaskRetryTimerScope
// TimerStandbyTaskDeleteHistoryEvent is the scope used by metric emitted by timer queue processor for processing history event cleanup
TimerStandbyTaskDeleteHistoryEvent
// HistoryEventNotificationScope is the scope used by shard history event notification
HistoryEventNotificationScope
// ReplicatorQueueProcessorScope is the scope used by all metric emitted by replicator queue processor
Expand Down Expand Up @@ -568,19 +600,35 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HistoryReplicateEventsScope: {operation: "ReplicateEvents"},
HistoryShardControllerScope: {operation: "ShardController"},
TransferQueueProcessorScope: {operation: "TransferQueueProcessor"},
TransferTaskActivityScope: {operation: "TransferTaskActivity"},
TransferTaskDecisionScope: {operation: "TransferTaskDecision"},
TransferTaskCloseExecutionScope: {operation: "TransferTaskCloseExecution"},
TransferTaskCancelExecutionScope: {operation: "TransferTaskCancelExecution"},
TransferTaskSignalExecutionScope: {operation: "TransferTaskSignalExecution"},
TransferTaskStartChildExecutionScope: {operation: "TransferTaskStartChildExecution"},
TransferActiveQueueProcessorScope: {operation: "TransferActiveQueueProcessor"},
TransferStandbyQueueProcessorScope: {operation: "TransferStandbyQueueProcessor"},
TransferActiveTaskActivityScope: {operation: "TransferActiveTaskActivity"},
TransferActiveTaskDecisionScope: {operation: "TransferActiveTaskDecision"},
TransferActiveTaskCloseExecutionScope: {operation: "TransferActiveTaskCloseExecution"},
TransferActiveTaskCancelExecutionScope: {operation: "TransferActiveTaskCancelExecution"},
TransferActiveTaskSignalExecutionScope: {operation: "TransferActiveTaskSignalExecution"},
TransferActiveTaskStartChildExecutionScope: {operation: "TransferActiveTaskStartChildExecution"},
TransferStandbyTaskActivityScope: {operation: "TransferStandbyTaskActivity"},
TransferStandbyTaskDecisionScope: {operation: "TransferStandbyTaskDecision"},
TransferStandbyTaskCloseExecutionScope: {operation: "TransferStandbyTaskCloseExecution"},
TransferStandbyTaskCancelExecutionScope: {operation: "TransferStandbyTaskCancelExecution"},
TransferStandbyTaskSignalExecutionScope: {operation: "TransferStandbyTaskSignalExecution"},
TransferStandbyTaskStartChildExecutionScope: {operation: "TransferStandbyTaskStartChildExecution"},
TimerQueueProcessorScope: {operation: "TimerQueueProcessor"},
TimerTaskActivityTimeoutScope: {operation: "TimerTaskActivityTimeout"},
TimerTaskDecisionTimeoutScope: {operation: "TimerTaskDecisionTimeout"},
TimerTaskUserTimerScope: {operation: "TimerTaskUserTimer"},
TimerTaskWorkflowTimeoutScope: {operation: "TimerTaskWorkflowTimeout"},
TimerTaskRetryTimerScope: {operation: "TimerTaskRetryTimer"},
TimerTaskDeleteHistoryEvent: {operation: "TimerTaskDeleteHistoryEvent"},
TimerActiveQueueProcessorScope: {operation: "TimerActiveQueueProcessor"},
TimerStandbyQueueProcessorScope: {operation: "TimerStandbyQueueProcessor"},
TimerActiveTaskActivityTimeoutScope: {operation: "TimerActiveTaskActivityTimeout"},
TimerActiveTaskDecisionTimeoutScope: {operation: "TimerActiveTaskDecisionTimeout"},
TimerActiveTaskUserTimerScope: {operation: "TimerActiveTaskUserTimer"},
TimerActiveTaskWorkflowTimeoutScope: {operation: "TimerActiveTaskWorkflowTimeout"},
TimerActiveTaskRetryTimerScope: {operation: "TimerActiveTaskRetryTimer"},
TimerActiveTaskDeleteHistoryEvent: {operation: "TimerActiveTaskDeleteHistoryEvent"},
TimerStandbyTaskActivityTimeoutScope: {operation: "TimerStandbyTaskActivityTimeout"},
TimerStandbyTaskDecisionTimeoutScope: {operation: "TimerStandbyTaskDecisionTimeout"},
TimerStandbyTaskUserTimerScope: {operation: "TimerStandbyTaskUserTimer"},
TimerStandbyTaskWorkflowTimeoutScope: {operation: "TimerStandbyTaskWorkflowTimeout"},
TimerStandbyTaskRetryTimerScope: {operation: "TimerStandbyTaskRetryTimer"},
TimerStandbyTaskDeleteHistoryEvent: {operation: "TimerStandbyTaskDeleteHistoryEvent"},
HistoryEventNotificationScope: {operation: "HistoryEventNotification"},
ReplicatorQueueProcessorScope: {operation: "ReplicatorQueueProcessor"},
ReplicatorTaskHistoryScope: {operation: "ReplicatorTaskHistory"},
Expand Down Expand Up @@ -667,8 +715,7 @@ const (
ScheduleToStartTimeoutCounter
StartToCloseTimeoutCounter
ScheduleToCloseTimeoutCounter
NewActiveTimerCounter
NewStandbyTimerCounter
NewTimerCounter
NewTimerNotifyCounter
AcquireShardsCounter
AcquireShardsLatency
Expand All @@ -689,6 +736,9 @@ const (
StaleReplicationEventsCounter
BufferedReplicationTaskCounter
HistoryConflictsCounter
HistoryTaskStandbyRetryCounter
HistoryTaskNotActiveCounter
HistoryTaskBatchCompleteCounter
)

// Matching metrics enum
Expand Down Expand Up @@ -769,8 +819,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ScheduleToStartTimeoutCounter: {metricName: "schedule-to-start-timeout", metricType: Counter},
StartToCloseTimeoutCounter: {metricName: "start-to-close-timeout", metricType: Counter},
ScheduleToCloseTimeoutCounter: {metricName: "schedule-to-close-timeout", metricType: Counter},
NewActiveTimerCounter: {metricName: "new-active-timer", metricType: Counter},
NewStandbyTimerCounter: {metricName: "new-standby-timer", metricType: Counter},
NewTimerCounter: {metricName: "new-timer", metricType: Counter},
NewTimerNotifyCounter: {metricName: "new-timer-notifications", metricType: Counter},
AcquireShardsCounter: {metricName: "acquire-shards-count", metricType: Counter},
AcquireShardsLatency: {metricName: "acquire-shards-latency", metricType: Timer},
Expand All @@ -791,6 +840,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
StaleReplicationEventsCounter: {metricName: "stale-replication-events", metricType: Counter},
BufferedReplicationTaskCounter: {metricName: "buffered-replication-tasks", metricType: Counter},
HistoryConflictsCounter: {metricName: "history-conflicts", metricType: Counter},
HistoryTaskStandbyRetryCounter: {metricName: "history-task-standby-retry-counter", metricType: Counter},
HistoryTaskNotActiveCounter: {metricName: "history-task-not-active-counter", metricType: Counter},
HistoryTaskBatchCompleteCounter: {metricName: "history-task-batch-complete-counter", metricType: Counter},
},
Matching: {
PollSuccessCounter: {metricName: "poll.success"},
Expand Down
17 changes: 7 additions & 10 deletions service/history/queueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ type (
readLevel int64
ackLevel int64
isReadFinished bool
// number of finished and acked tasks, used to reduce # of calls to update shard
finishedTaskCounter int
}
)

Expand Down Expand Up @@ -169,6 +167,8 @@ func (a *queueAckMgrImpl) updateQueueAckLevel() {
a.Lock()
ackLevel := a.ackLevel

a.logger.Debugf("Moving timer ack level from %v, with %v.", ackLevel, a.outstandingTasks)

// task ID is not sequential, meaning there are a ton of missing chunks,
// so to optimize the performance, a sort is required
var taskIDs []int64
Expand All @@ -181,28 +181,25 @@ MoveAckLevelLoop:
for _, current := range taskIDs {
acked := a.outstandingTasks[current]
if acked {
a.logger.Debugf("Updating ack level: %v", current)
ackLevel = current
a.finishedTaskCounter++
delete(a.outstandingTasks, current)
a.logger.Debugf("Moving timer ack level to %v.", ackLevel)
} else {
break MoveAckLevelLoop
}
}
updateShard := a.ackLevel != ackLevel
a.ackLevel = ackLevel

if a.isFailover && a.isReadFinished && len(a.outstandingTasks) == 0 {
// this means in failover mode, all possible failover transfer tasks
// are processed and we are free to shut down
a.logger.Debugf("Queue ack manager shutdoen.")
a.finishedChan <- struct{}{}
}
a.Unlock()

if a.finishedTaskCounter < a.options.UpdateShardTaskCount() {
a.Unlock()
} else {
a.finishedTaskCounter = 0
a.Unlock()

if updateShard {
if !a.isFailover {
if err := a.processor.updateAckLevel(ackLevel); err != nil {
a.metricsClient.IncCounter(a.options.MetricScope, metrics.AckLevelUpdateFailedCounter)
Expand Down
10 changes: 4 additions & 6 deletions service/history/queueAckMgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ func (s *queueAckMgrSuite) SetupTest() {
s.mockShard.config.ShardUpdateMinInterval = dynamicconfig.GetDurationPropertyFn(0 * time.Second)

s.queueAckMgr = newQueueAckMgr(s.mockShard, &QueueProcessorOptions{
UpdateShardTaskCount: dynamicconfig.GetIntPropertyFn(1),
MetricScope: metrics.ReplicatorQueueProcessorScope,
MetricScope: metrics.ReplicatorQueueProcessorScope,
}, s.mockProcessor, 0, s.logger)
}

Expand Down Expand Up @@ -279,7 +278,7 @@ func (s *queueAckMgrSuite) TestReadCompleteUpdateTimerTasks() {
s.Equal(moreOutput, moreInput)
s.Equal(map[int64]bool{taskID1: false, taskID2: false, taskID3: false}, s.queueAckMgr.outstandingTasks)

s.mockProcessor.On("updateAckLevel", taskID1).Return(nil)
s.mockProcessor.On("updateAckLevel", taskID1).Return(nil).Once()
s.mockProcessor.On("completeTask", taskID1).Return(nil).Once()
s.queueAckMgr.completeQueueTask(taskID1)
s.queueAckMgr.updateQueueAckLevel()
Expand All @@ -290,7 +289,7 @@ func (s *queueAckMgrSuite) TestReadCompleteUpdateTimerTasks() {
s.queueAckMgr.updateQueueAckLevel()
s.Equal(taskID1, s.queueAckMgr.getQueueAckLevel())

s.mockProcessor.On("updateAckLevel", taskID3).Return(nil)
s.mockProcessor.On("updateAckLevel", taskID3).Return(nil).Once()
s.mockProcessor.On("completeTask", taskID2).Return(nil).Once()
s.queueAckMgr.completeQueueTask(taskID2)
s.queueAckMgr.updateQueueAckLevel()
Expand Down Expand Up @@ -350,8 +349,7 @@ func (s *queueFailoverAckMgrSuite) SetupTest() {
s.mockShard.config.ShardUpdateMinInterval = dynamicconfig.GetDurationPropertyFn(0 * time.Second)

s.queueFailoverAckMgr = newQueueFailoverAckMgr(s.mockShard, &QueueProcessorOptions{
UpdateShardTaskCount: dynamicconfig.GetIntPropertyFn(1),
MetricScope: metrics.ReplicatorQueueProcessorScope,
MetricScope: metrics.ReplicatorQueueProcessorScope,
}, s.mockProcessor, 0, s.logger)
}

Expand Down
3 changes: 2 additions & 1 deletion service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type (
UpdateAckInterval dynamicconfig.DurationPropertyFn
MaxRetryCount dynamicconfig.IntPropertyFn
MetricScope int
UpdateShardTaskCount dynamicconfig.IntPropertyFn
}

queueProcessorBase struct {
Expand Down Expand Up @@ -283,6 +282,7 @@ ProcessRetryLoop:
err = p.processor.process(task)
if err != nil {
if err == ErrTaskRetry {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskStandbyRetryCounter)
<-notificationChan
} else {
logging.LogTaskProcessingFailedEvent(logger, err)
Expand All @@ -291,6 +291,7 @@ ProcessRetryLoop:
// just keep trying for cache.DomainCacheRefreshInterval
// and then give up
if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskNotActiveCounter)
return
}
backoff := time.Duration(retryCount * 100)
Expand Down
1 change: 0 additions & 1 deletion service/history/replicatorQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func newReplicatorQueueProcessor(shard ShardContext, replicator messaging.Produc
UpdateAckInterval: config.ReplicatorProcessorUpdateAckInterval,
MaxRetryCount: config.ReplicatorTaskMaxRetryCount,
MetricScope: metrics.ReplicatorQueueProcessorScope,
UpdateShardTaskCount: config.ReplicatorProcessorUpdateShardTaskCount,
}

logger = logger.WithFields(bark.Fields{
Expand Down
Loading

0 comments on commit 9ea16c0

Please sign in to comment.