Skip to content

Commit

Permalink
Merge branch 'master' into pprof
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Jan 10, 2018
2 parents 3335446 + 7ca011c commit 8ae311b
Show file tree
Hide file tree
Showing 10 changed files with 402 additions and 113 deletions.
32 changes: 16 additions & 16 deletions client/history/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *metricClient) StartWorkflowExecution(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientStartWorkflowExecutionScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientStartWorkflowExecutionScope, metrics.HistoryClientFailures)
}

return resp, err
Expand All @@ -72,7 +72,7 @@ func (c *metricClient) GetMutableState(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientGetMutableStateScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientGetMutableStateScope, metrics.HistoryClientFailures)
}

return resp, err
Expand All @@ -89,7 +89,7 @@ func (c *metricClient) DescribeWorkflowExecution(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientDescribeWorkflowExecutionScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientDescribeWorkflowExecutionScope, metrics.HistoryClientFailures)
}

return resp, err
Expand All @@ -106,7 +106,7 @@ func (c *metricClient) RecordDecisionTaskStarted(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRecordDecisionTaskStartedScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientRecordDecisionTaskStartedScope, metrics.HistoryClientFailures)
}

return resp, err
Expand All @@ -123,7 +123,7 @@ func (c *metricClient) RecordActivityTaskStarted(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskStartedScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskStartedScope, metrics.HistoryClientFailures)
}

return resp, err
Expand All @@ -140,7 +140,7 @@ func (c *metricClient) RespondDecisionTaskCompleted(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRespondDecisionTaskCompletedScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientRespondDecisionTaskCompletedScope, metrics.HistoryClientFailures)
}

return err
Expand All @@ -157,7 +157,7 @@ func (c *metricClient) RespondDecisionTaskFailed(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRespondDecisionTaskFailedScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientRespondDecisionTaskFailedScope, metrics.HistoryClientFailures)
}

return err
Expand All @@ -174,7 +174,7 @@ func (c *metricClient) RespondActivityTaskCompleted(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCompletedScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCompletedScope, metrics.HistoryClientFailures)
}

return err
Expand All @@ -191,7 +191,7 @@ func (c *metricClient) RespondActivityTaskFailed(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskFailedScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskFailedScope, metrics.HistoryClientFailures)
}

return err
Expand All @@ -208,7 +208,7 @@ func (c *metricClient) RespondActivityTaskCanceled(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCanceledScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCanceledScope, metrics.HistoryClientFailures)
}

return err
Expand All @@ -225,7 +225,7 @@ func (c *metricClient) RecordActivityTaskHeartbeat(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskHeartbeatScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskHeartbeatScope, metrics.HistoryClientFailures)
}

return resp, err
Expand All @@ -242,7 +242,7 @@ func (c *metricClient) RequestCancelWorkflowExecution(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRequestCancelWorkflowExecutionScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientRequestCancelWorkflowExecutionScope, metrics.HistoryClientFailures)
}

return err
Expand All @@ -259,7 +259,7 @@ func (c *metricClient) SignalWorkflowExecution(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientSignalWorkflowExecutionScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientSignalWorkflowExecutionScope, metrics.HistoryClientFailures)
}

return err
Expand All @@ -276,7 +276,7 @@ func (c *metricClient) TerminateWorkflowExecution(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientTerminateWorkflowExecutionScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientTerminateWorkflowExecutionScope, metrics.HistoryClientFailures)
}

return err
Expand All @@ -293,7 +293,7 @@ func (c *metricClient) ScheduleDecisionTask(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientScheduleDecisionTaskScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientScheduleDecisionTaskScope, metrics.HistoryClientFailures)
}

return err
Expand All @@ -310,7 +310,7 @@ func (c *metricClient) RecordChildExecutionCompleted(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientRecordChildExecutionCompletedScope, metrics.HistoryFailures)
c.metricsClient.IncCounter(metrics.HistoryClientRecordChildExecutionCompletedScope, metrics.HistoryClientFailures)
}

return err
Expand Down
14 changes: 7 additions & 7 deletions client/matching/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *metricClient) AddActivityTask(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.MatchingClientAddActivityTaskScope, metrics.MatchingFailures)
c.metricsClient.IncCounter(metrics.MatchingClientAddActivityTaskScope, metrics.MatchingClientFailures)
}

return err
Expand All @@ -72,7 +72,7 @@ func (c *metricClient) AddDecisionTask(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.MatchingClientAddDecisionTaskScope, metrics.MatchingFailures)
c.metricsClient.IncCounter(metrics.MatchingClientAddDecisionTaskScope, metrics.MatchingClientFailures)
}

return err
Expand All @@ -89,7 +89,7 @@ func (c *metricClient) PollForActivityTask(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.MatchingClientPollForActivityTaskScope, metrics.MatchingFailures)
c.metricsClient.IncCounter(metrics.MatchingClientPollForActivityTaskScope, metrics.MatchingClientFailures)
}

return resp, err
Expand All @@ -106,7 +106,7 @@ func (c *metricClient) PollForDecisionTask(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.MatchingClientPollForDecisionTaskScope, metrics.MatchingFailures)
c.metricsClient.IncCounter(metrics.MatchingClientPollForDecisionTaskScope, metrics.MatchingClientFailures)
}

return resp, err
Expand All @@ -123,7 +123,7 @@ func (c *metricClient) QueryWorkflow(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.MatchingClientQueryWorkflowScope, metrics.MatchingFailures)
c.metricsClient.IncCounter(metrics.MatchingClientQueryWorkflowScope, metrics.MatchingClientFailures)
}

return resp, err
Expand All @@ -140,7 +140,7 @@ func (c *metricClient) RespondQueryTaskCompleted(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.MatchingClientRespondQueryTaskCompletedScope, metrics.MatchingFailures)
c.metricsClient.IncCounter(metrics.MatchingClientRespondQueryTaskCompletedScope, metrics.MatchingClientFailures)
}

return err
Expand All @@ -157,7 +157,7 @@ func (c *metricClient) CancelOutstandingPoll(
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.MatchingClientCancelOutstandingPollScope, metrics.MatchingFailures)
c.metricsClient.IncCounter(metrics.MatchingClientCancelOutstandingPollScope, metrics.MatchingClientFailures)
}

return err
Expand Down
19 changes: 11 additions & 8 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,15 +534,17 @@ const (
PersistenceErrTimeoutCounter
PersistenceErrBusyCounter

NumCommonMetrics
HistoryClientFailures
MatchingClientFailures

NumCommonMetrics // Needs to be last on this list for iota numbering
)

// History Metrics enum
const (
TaskRequests = iota + NumCommonMetrics
TaskFailures
TaskLatency
HistoryFailures
AckLevelUpdateCounter
AckLevelUpdateFailedCounter
DecisionTypeScheduleActivityCounter
Expand Down Expand Up @@ -588,15 +590,15 @@ const (

// Matching metrics enum
const (
MatchingFailures = iota + NumCommonMetrics
PollSuccessCounter
PollSuccessCounter = iota + NumCommonMetrics
PollTimeoutCounter
PollErrorsCounter
PollSuccessWithSyncCounter
LeaseRequestCounter
LeaseFailureCounter
ConditionFailedErrorCounter
RespondQueryTaskFailedCounter
SyncThrottleCounter
BufferThrottleCounter
)

// MetricDefs record the metrics for all services
Expand All @@ -620,13 +622,14 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
PersistenceErrConditionFailedCounter: {metricName: "persistence.errors.condition-failed", metricType: Counter},
PersistenceErrTimeoutCounter: {metricName: "persistence.errors.timeout", metricType: Counter},
PersistenceErrBusyCounter: {metricName: "persistence.errors.busy", metricType: Counter},
HistoryClientFailures: {metricName: "client.history.errors", metricType: Counter},
MatchingClientFailures: {metricName: "client.matching.errors", metricType: Counter},
},
Frontend: {},
History: {
TaskRequests: {metricName: "task.requests", metricType: Counter},
TaskFailures: {metricName: "task.errors", metricType: Counter},
TaskLatency: {metricName: "task.latency", metricType: Counter},
HistoryFailures: {metricName: "history.errors", metricType: Counter},
AckLevelUpdateCounter: {metricName: "ack-level-update", metricType: Counter},
AckLevelUpdateFailedCounter: {metricName: "ack-level-update-failed", metricType: Counter},
DecisionTypeScheduleActivityCounter: {metricName: "schedule-activity-decision", metricType: Counter},
Expand Down Expand Up @@ -670,15 +673,15 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
HistoryEventNotificationFailDeliveryCount: {metricName: "history-event-notification-fail-delivery-count", metricType: Counter},
},
Matching: {
MatchingFailures: {metricName: "matching.errors", metricType: Counter},
PollSuccessCounter: {metricName: "poll.success"},
PollTimeoutCounter: {metricName: "poll.timeouts"},
PollErrorsCounter: {metricName: "poll.errors"},
PollSuccessWithSyncCounter: {metricName: "poll.success.sync"},
LeaseRequestCounter: {metricName: "lease.requests"},
LeaseFailureCounter: {metricName: "lease.failures"},
ConditionFailedErrorCounter: {metricName: "condition-failed-errors"},
RespondQueryTaskFailedCounter: {metricName: "respond-query-failed"},
SyncThrottleCounter: {metricName: "sync.throttle.count"},
BufferThrottleCounter: {metricName: "buffer.throttle.count"},
},
}

Expand Down
1 change: 1 addition & 0 deletions service/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1664,6 +1664,7 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse(ctx context.Context
StartedEventId: matchingResp.StartedEventId,
Query: matchingResp.Query,
BacklogCountHint: matchingResp.BacklogCountHint,
Attempt: matchingResp.Attempt,
History: history,
NextPageToken: continuation,
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ func (e *mutableStateBuilder) AddDecisionTaskCompletedEvent(scheduleEventID, sta

if di.Attempt > 0 {
// Create corresponding DecisionTaskSchedule and DecisionTaskStarted events for decisions we have been retrying
scheduledEvent := e.hBuilder.AddDecisionTaskScheduledEvent(di.Tasklist, di.DecisionTimeout, di.Attempt)
scheduledEvent := e.hBuilder.AddDecisionTaskScheduledEvent(e.executionInfo.TaskList, di.DecisionTimeout, di.Attempt)
startedEvent := e.hBuilder.AddDecisionTaskStartedEvent(scheduledEvent.GetEventId(), di.RequestID,
request.GetIdentity())
startedEventID = startedEvent.GetEventId()
Expand Down
33 changes: 25 additions & 8 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ func (e *matchingEngineImpl) Stop() {
}

func (e *matchingEngineImpl) getTaskLists(maxCount int) (lists []taskListManager) {
e.taskListsLock.Lock()
e.taskListsLock.RLock()
defer e.taskListsLock.RUnlock()
lists = make([]taskListManager, 0, len(e.taskLists))
count := 0
for _, tlMgr := range e.taskLists {
Expand All @@ -139,7 +140,6 @@ func (e *matchingEngineImpl) getTaskLists(maxCount int) (lists []taskListManager
break
}
}
e.taskListsLock.Unlock()
return
}

Expand All @@ -153,20 +153,24 @@ func (e *matchingEngineImpl) String() string {
return r
}

// Returns taskListManager for a task list. If not already cached gets new range from DB and if successful creates one.
// Returns taskListManager for a task list. If not already cached gets new range from DB and
// if successful creates one.
func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID) (taskListManager, error) {
// The first check is an optimization so almost all requests will have a task list manager
// and return avoiding the write lock
e.taskListsLock.RLock()
if result, ok := e.taskLists[*taskList]; ok {
e.taskListsLock.RUnlock()
return result, nil
}
e.taskListsLock.RUnlock()
mgr := newTaskListManager(e, taskList, e.config)
// If it gets here, write lock and check again in case a task list is created between the two locks
e.taskListsLock.Lock()
if result, ok := e.taskLists[*taskList]; ok {
e.taskListsLock.Unlock()
return result, nil
}
mgr := newTaskListManager(e, taskList, e.config)
e.taskLists[*taskList] = mgr
e.taskListsLock.Unlock()
logging.LogTaskListLoadingEvent(e.logger, taskList.taskListName, taskList.taskType)
Expand All @@ -179,6 +183,13 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID) (taskListM
return mgr, nil
}

// For use in tests
func (e *matchingEngineImpl) updateTaskList(taskList *taskListID, mgr taskListManager) {
e.taskListsLock.Lock()
defer e.taskListsLock.Unlock()
e.taskLists[*taskList] = mgr
}

func (e *matchingEngineImpl) removeTaskListManager(id *taskListID) {
e.taskListsLock.Lock()
defer e.taskListsLock.Unlock()
Expand Down Expand Up @@ -247,7 +258,7 @@ pollLoop:
// long-poll when frontend calls CancelOutstandingPoll API
pollerCtx := context.WithValue(ctx, pollerIDKey, pollerID)
taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
tCtx, err := e.getTask(pollerCtx, taskList)
tCtx, err := e.getTask(pollerCtx, taskList, nil)
if err != nil {
// TODO: Is empty poll the best reply for errPumpClosed?
if err == ErrNoTasks || err == errPumpClosed {
Expand Down Expand Up @@ -341,10 +352,14 @@ pollLoop:
}

taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity)
var maxDispatch *float64
if request.TaskListMetadata != nil {
maxDispatch = request.TaskListMetadata.MaxTasksPerSecond
}
// Add frontend generated pollerID to context so tasklistMgr can support cancellation of
// long-poll when frontend calls CancelOutstandingPoll API
pollerCtx := context.WithValue(ctx, pollerIDKey, pollerID)
tCtx, err := e.getTask(pollerCtx, taskList)
tCtx, err := e.getTask(pollerCtx, taskList, maxDispatch)
if err != nil {
// TODO: Is empty poll the best reply for errPumpClosed?
if err == ErrNoTasks || err == errPumpClosed {
Expand Down Expand Up @@ -450,12 +465,14 @@ func (e *matchingEngineImpl) CancelOutstandingPoll(ctx context.Context, request
}

// Loads a task from persistence and wraps it in a task context
func (e *matchingEngineImpl) getTask(ctx context.Context, taskList *taskListID) (*taskContext, error) {
func (e *matchingEngineImpl) getTask(
ctx context.Context, taskList *taskListID, maxDispatchPerSecond *float64,
) (*taskContext, error) {
tlMgr, err := e.getTaskListManager(taskList)
if err != nil {
return nil, err
}
return tlMgr.GetTaskContext(ctx)
return tlMgr.GetTaskContext(ctx, maxDispatchPerSecond)
}

func (e *matchingEngineImpl) unloadTaskList(id *taskListID) {
Expand Down
Loading

0 comments on commit 8ae311b

Please sign in to comment.