Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logs/metrics of HandleDecisionTaskCompleted #5497

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 50 additions & 33 deletions service/history/decision/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ func (handler *handlerImpl) HandleDecisionTaskCompleted(
ctx context.Context,
req *types.HistoryRespondDecisionTaskCompletedRequest,
) (resp *types.HistoryRespondDecisionTaskCompletedResponse, retError error) {

domainEntry, err := handler.getActiveDomainByID(req.DomainUUID)
if err != nil {
return nil, err
Expand All @@ -304,6 +303,18 @@ func (handler *handlerImpl) HandleDecisionTaskCompleted(
RunID: token.RunID,
}

domainName := domainEntry.GetInfo().Name
logger := handler.logger.WithTags(
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
tag.WorkflowDomainName(domainName),
tag.WorkflowDomainID(domainEntry.GetInfo().ID),
tag.WorkflowID(workflowExecution.GetWorkflowID()),
tag.WorkflowRunID(workflowExecution.GetRunID()),
tag.WorkflowScheduleID(token.ScheduleID),
)
scope := handler.metricsClient.Scope(metrics.HistoryRespondDecisionTaskCompletedScope,
metrics.DomainTag(domainName),
metrics.WorkflowTypeTag(token.WorkflowType))

call := yarpc.CallFromContext(ctx)
clientLibVersion := call.Header(common.LibraryVersionHeaderName)
clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
Expand All @@ -317,6 +328,7 @@ func (handler *handlerImpl) HandleDecisionTaskCompleted(

Update_History_Loop:
for attempt := 0; attempt < workflow.ConditionalRetryCount; attempt++ {
logger.Debug("Update_History_Loop attempt", tag.Attempt(int32(attempt)))
msBuilder, err := wfContext.LoadWorkflowExecution(ctx)
if err != nil {
return nil, err
Expand All @@ -330,60 +342,53 @@ Update_History_Loop:
}

executionInfo := msBuilder.GetExecutionInfo()

scheduleID := token.ScheduleID
currentDecision, isRunning := msBuilder.GetDecisionInfo(scheduleID)
currentDecision, isRunning := msBuilder.GetDecisionInfo(token.ScheduleID)

// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
// some extreme cassandra failure cases.
if !isRunning && scheduleID >= msBuilder.GetNextEventID() {
handler.metricsClient.IncCounter(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.StaleMutableStateCounter)
handler.logger.Error("Encounter stale mutable state in RespondDecisionTaskCompleted",
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
tag.WorkflowID(workflowExecution.GetWorkflowID()),
tag.WorkflowRunID(workflowExecution.GetRunID()),
tag.WorkflowScheduleID(scheduleID),
tag.WorkflowNextEventID(msBuilder.GetNextEventID()),
)
if !isRunning && token.ScheduleID >= msBuilder.GetNextEventID() {
scope.IncCounter(metrics.StaleMutableStateCounter)
logger.Error("Encounter stale mutable state in RespondDecisionTaskCompleted", tag.WorkflowNextEventID(msBuilder.GetNextEventID()))
// Reload workflow execution history
wfContext.Clear()
continue Update_History_Loop
}

if !msBuilder.IsWorkflowExecutionRunning() || !isRunning || currentDecision.Attempt != token.ScheduleAttempt ||
currentDecision.StartedID == common.EmptyEventID {
if !msBuilder.IsWorkflowExecutionRunning() || !isRunning || currentDecision.Attempt != token.ScheduleAttempt || currentDecision.StartedID == common.EmptyEventID {
logger.Debugf("Decision task not found. IsWorkflowExecutionRunning: %v, isRunning: %v, currentDecision.Attempt: %v, token.ScheduleAttempt: %v, currentDecision.StartID: %v",
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
msBuilder.IsWorkflowExecutionRunning(), isRunning, getDecisionInfoAttempt(currentDecision), token.ScheduleAttempt, getDecisionInfoStartedID(currentDecision))
return nil, &types.EntityNotExistsError{Message: "Decision task not found."}
}

startedID := currentDecision.StartedID
maxResetPoints := handler.config.MaxAutoResetPoints(domainEntry.GetInfo().Name)
if msBuilder.GetExecutionInfo().AutoResetPoints != nil && maxResetPoints == len(msBuilder.GetExecutionInfo().AutoResetPoints.Points) {
handler.metricsClient.IncCounter(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.AutoResetPointsLimitExceededCounter)
logger.Debugf("Max reset points %d is exceeded", maxResetPoints)
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
scope.IncCounter(metrics.AutoResetPointsLimitExceededCounter)
}

decisionHeartbeating := request.GetForceCreateNewDecisionTask() && len(request.Decisions) == 0
var decisionHeartbeatTimeout bool
var completedEvent *types.HistoryEvent
if decisionHeartbeating {
domainName := domainEntry.GetInfo().Name
timeout := handler.config.DecisionHeartbeatTimeout(domainName)
if currentDecision.OriginalScheduledTimestamp > 0 && handler.timeSource.Now().After(time.Unix(0, currentDecision.OriginalScheduledTimestamp).Add(timeout)) {
decisionHeartbeatTimeout = true
scope := handler.metricsClient.Scope(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.DomainTag(domainName))
scope.IncCounter(metrics.DecisionHeartbeatTimeoutCounter)
completedEvent, err = msBuilder.AddDecisionTaskTimedOutEvent(currentDecision.ScheduleID, currentDecision.StartedID)
if err != nil {
return nil, &types.InternalServiceError{Message: "Failed to add decision timeout event."}
}
msBuilder.ClearStickyness()
} else {
completedEvent, err = msBuilder.AddDecisionTaskCompletedEvent(scheduleID, startedID, request, maxResetPoints)
logger.Debug("Adding DecisionTaskCompletedEvent to mutable state for heartbeat")
completedEvent, err = msBuilder.AddDecisionTaskCompletedEvent(token.ScheduleID, startedID, request, maxResetPoints)
if err != nil {
return nil, &types.InternalServiceError{Message: "Unable to add DecisionTaskCompleted event to history."}
}
}
} else {
completedEvent, err = msBuilder.AddDecisionTaskCompletedEvent(scheduleID, startedID, request, maxResetPoints)
completedEvent, err = msBuilder.AddDecisionTaskCompletedEvent(token.ScheduleID, startedID, request, maxResetPoints)
if err != nil {
return nil, &types.InternalServiceError{Message: "Unable to add DecisionTaskCompleted event to history."}
}
Expand All @@ -401,11 +406,11 @@ Update_History_Loop:
hasUnhandledEvents = msBuilder.HasBufferedEvents()

if request.StickyAttributes == nil || request.StickyAttributes.WorkerTaskList == nil {
handler.metricsClient.IncCounter(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.CompleteDecisionWithStickyDisabledCounter)
scope.IncCounter(metrics.CompleteDecisionWithStickyDisabledCounter)
executionInfo.StickyTaskList = ""
executionInfo.StickyScheduleToStartTimeout = 0
} else {
handler.metricsClient.IncCounter(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.CompleteDecisionWithStickyEnabledCounter)
scope.IncCounter(metrics.CompleteDecisionWithStickyEnabledCounter)
executionInfo.StickyTaskList = request.StickyAttributes.WorkerTaskList.GetName()
executionInfo.StickyScheduleToStartTimeout = request.StickyAttributes.GetScheduleToStartTimeoutSeconds()
}
Expand All @@ -419,8 +424,6 @@ Update_History_Loop:
failCause = types.DecisionTaskFailedCauseBadBinary
failMessage = fmt.Sprintf("binary %v is already marked as bad deployment", binChecksum)
} else {

domainName := domainEntry.GetInfo().Name
workflowSizeChecker := newWorkflowSizeChecker(
handler.config.BlobSizeLimitWarn(domainName),
handler.config.BlobSizeLimitError(domainName),
Expand Down Expand Up @@ -468,20 +471,15 @@ Update_History_Loop:
// failMessage is not used by decisionTaskHandler
activityNotStartedCancelled = decisionTaskHandler.activityNotStartedCancelled
// continueAsNewTimerTasks is not used by decisionTaskHandler

continueAsNewBuilder = decisionTaskHandler.continueAsNewBuilder

hasUnhandledEvents = decisionTaskHandler.hasUnhandledEventsBeforeDecisions
}

if failDecision {
handler.metricsClient.IncCounter(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.FailedDecisionsCounter)
handler.logger.Info("Failing the decision.", tag.WorkflowDecisionFailCause(int64(failCause)),
tag.WorkflowID(token.WorkflowID),
tag.WorkflowRunID(token.RunID),
tag.WorkflowDomainID(domainID))
scope.IncCounter(metrics.FailedDecisionsCounter)
logger.Info("Failing the decision.", tag.WorkflowDecisionFailCause(int64(failCause)))
msBuilder, err = handler.failDecisionHelper(
ctx, wfContext, scheduleID, startedID, failCause, []byte(failMessage), request, domainEntry)
ctx, wfContext, token.ScheduleID, startedID, failCause, []byte(failMessage), request, domainEntry)
if err != nil {
return nil, err
}
Expand All @@ -490,6 +488,8 @@ Update_History_Loop:
}

createNewDecisionTask := msBuilder.IsWorkflowExecutionRunning() && (hasUnhandledEvents || request.GetForceCreateNewDecisionTask() || activityNotStartedCancelled)
logger.Debugf("createNewDecisionTask: %v, msBuilder.IsWorkflowExecutionRunning: %v, hasUnhandledEvents: %v, request.GetForceCreateNewDecisionTask: %v, activityNotStartedCancelled: %v",
createNewDecisionTask, msBuilder.IsWorkflowExecutionRunning(), hasUnhandledEvents, request.GetForceCreateNewDecisionTask(), activityNotStartedCancelled)
var newDecisionTaskScheduledID int64
if createNewDecisionTask {
var newDecision *execution.DecisionInfo
Expand All @@ -511,6 +511,7 @@ Update_History_Loop:
newDecisionTaskScheduledID = newDecision.ScheduleID
// skip transfer task for decision if request asking to return new decision task
if request.GetReturnNewDecisionTask() {
logger.Debugf("Adding DecisionTaskStartedEvent to mutable state. new decision's ScheduleID: %d, TaskList: %s", newDecisionTaskScheduledID, newDecision.TaskList)
// start the new decision task if request asked to do so
// TODO: replace the poll request
_, _, err := msBuilder.AddDecisionTaskStartedEvent(newDecision.ScheduleID, "request-from-RespondDecisionTaskCompleted", &types.PollForDecisionTaskRequest{
Expand All @@ -528,6 +529,7 @@ Update_History_Loop:
var updateErr error
if continueAsNewBuilder != nil {
continueAsNewExecutionInfo := continueAsNewBuilder.GetExecutionInfo()
logger.Debugf("Updating execution with continue as new info. new wfid: %s, runid: %s", continueAsNewExecutionInfo.WorkflowID, continueAsNewExecutionInfo.RunID)
updateErr = wfContext.UpdateWorkflowExecutionWithNewAsActive(
ctx,
handler.shard.GetTimeSource().Now(),
Expand All @@ -549,7 +551,7 @@ Update_History_Loop:

if updateErr != nil {
if execution.IsConflictError(updateErr) {
handler.metricsClient.IncCounter(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.ConcurrencyUpdateFailureCounter)
scope.IncCounter(metrics.ConcurrencyUpdateFailureCounter)
continue Update_History_Loop
}

Expand Down Expand Up @@ -612,6 +614,7 @@ Update_History_Loop:
activitiesToDispatchLocally[dr.activityDispatchInfo.ActivityID] = dr.activityDispatchInfo
}
}
logger.Debugf("%d activities will be dispatched locally on the client side")
resp.ActivitiesToDispatchLocally = activitiesToDispatchLocally

if request.GetReturnNewDecisionTask() && createNewDecisionTask {
Expand Down Expand Up @@ -891,3 +894,17 @@ func (handler *handlerImpl) failDecisionHelper(
// getActiveDomainByID looks up the domain cache entry for the given domain ID
// by delegating to cache.GetActiveDomainByID, passing this handler's shard
// domain cache and the name of the current cluster.
func (handler *handlerImpl) getActiveDomainByID(id string) (*cache.DomainCacheEntry, error) {
	shard := handler.shard
	domainCache := shard.GetDomainCache()
	currentCluster := shard.GetClusterMetadata().GetCurrentClusterName()
	return cache.GetActiveDomainByID(domainCache, currentCluster, id)
}

func getDecisionInfoAttempt(di *execution.DecisionInfo) int64 {
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
if di == nil {
return 0
}
return di.Attempt
}

// getDecisionInfoStartedID is a nil-safe accessor for DecisionInfo.StartedID.
// It yields di.StartedID for a non-nil di and 0 for a nil di.
func getDecisionInfoStartedID(di *execution.DecisionInfo) int64 {
	if di != nil {
		return di.StartedID
	}
	return 0
}