Skip to content

Commit

Permalink
Added WorkflowTerminationReasonTag to add another dimension to the …
Browse files Browse the repository at this point in the history
…metrics and allow filtering by termination reason
  • Loading branch information
fimanishi committed Jul 5, 2024
1 parent dc05a78 commit 460c452
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 3 deletions.
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ func IsWorkflowOpen(isOpen bool) Tag {
return newBoolTag("is-workflow-open", isOpen)
}

// WorkflowTerminationReason builds a string log tag, keyed
// "wf-termination-reason", that carries the reason a workflow was terminated.
func WorkflowTerminationReason(reason string) Tag {
	const key = "wf-termination-reason"
	return newStringTag(key, reason)
}

// domain related

// WorkflowDomainID returns tag for WorkflowDomainID
Expand Down
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,7 @@ const (
TaskQueueLatencyPerDomain
TransferTaskMissingEventCounterPerDomain
ReplicationTasksAppliedPerDomain
WorkflowTerminateCounterPerDomain

TaskRedispatchQueuePendingTasksTimer

Expand Down Expand Up @@ -2898,6 +2899,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskQueueLatencyPerDomain: {metricName: "task_latency_queue_per_domain", metricRollupName: "task_latency_queue", metricType: Timer},
TransferTaskMissingEventCounterPerDomain: {metricName: "transfer_task_missing_event_counter_per_domain", metricRollupName: "transfer_task_missing_event_counter", metricType: Counter},
ReplicationTasksAppliedPerDomain: {metricName: "replication_tasks_applied_per_domain", metricRollupName: "replication_tasks_applied", metricType: Counter},
WorkflowTerminateCounterPerDomain: {metricName: "workflow_terminate_counter_per_domain", metricRollupName: "workflow_terminate_counter", metricType: Counter},

TaskBatchCompleteCounter: {metricName: "task_batch_complete_counter", metricType: Counter},
TaskBatchCompleteFailure: {metricName: "task_batch_complete_error", metricType: Counter},
Expand Down
6 changes: 6 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
globalRatelimitKey = "global_ratelimit_key"
globalRatelimitType = "global_ratelimit_type"
globalRatelimitCollectionName = "global_ratelimit_collection"
workflowTerminationReason = "workflow_termination_reason"

allValue = "all"
unknownValue = "_unknown_"
Expand Down Expand Up @@ -262,6 +263,11 @@ func GlobalRatelimiterCollectionName(value string) Tag {
return simpleMetric{key: globalRatelimitCollectionName, value: value}
}

// WorkflowTerminationReasonTag builds a metric Tag whose key is
// workflowTerminationReason and whose value is the supplied termination reason.
//
// NOTE(review): value is used verbatim as the tag value; presumably callers
// should pass a bounded set of reasons to keep metric cardinality in check —
// confirm against call sites.
func WorkflowTerminationReasonTag(value string) Tag {
	reasonTag := simpleMetric{
		key:   workflowTerminationReason,
		value: value,
	}
	return reasonTag
}

// PartitionConfigTags returns a list of partition config tags
func PartitionConfigTags(partitionConfig map[string]string) []Tag {
tags := make([]Tag, 0, len(partitionConfig))
Expand Down
16 changes: 16 additions & 0 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,22 @@ func (e *mutableStateBuilder) AddWorkflowExecutionTerminatedEvent(
if err := e.ReplicateWorkflowExecutionTerminatedEvent(firstEventID, event); err != nil {
return nil, err
}

domainName := e.GetDomainEntry().GetInfo().Name

e.logger.Info(
"Workflow execution terminated.",
tag.WorkflowDomainName(domainName),
tag.WorkflowID(e.GetExecutionInfo().WorkflowID),
tag.WorkflowRunID(e.GetExecutionInfo().RunID),
tag.WorkflowTerminationReason(reason),
)

scopeWithDomainTag := e.metricsClient.Scope(metrics.HistoryTerminateWorkflowExecutionScope).
Tagged(metrics.DomainTag(domainName)).
Tagged(metrics.WorkflowTerminationReasonTag(reason))
scopeWithDomainTag.IncCounter(metrics.WorkflowTerminateCounterPerDomain)

return event, nil
}

Expand Down
7 changes: 4 additions & 3 deletions service/history/execution/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
IdentityHistoryService = "history-service"
// WorkflowTerminationIdentity is the component which decides to terminate the workflow
WorkflowTerminationIdentity = "worker-service"
// WorkflowTerminationReason is the reason for terminating workflow due to version conflit
// WorkflowTerminationReason is the reason for terminating workflow due to version conflict
WorkflowTerminationReason = "Terminate Workflow Due To Version Conflict."
)

Expand Down Expand Up @@ -189,7 +189,7 @@ func (r *workflowImpl) SuppressBy(
currentCluster := r.clusterMetadata.GetCurrentClusterName()

if currentCluster == lastWriteCluster {
return TransactionPolicyActive, r.terminateWorkflow(lastWriteVersion, incomingLastWriteVersion)
return TransactionPolicyActive, r.terminateWorkflow(lastWriteVersion, incomingLastWriteVersion, WorkflowTerminationReason)
}
return TransactionPolicyPassive, r.zombiefyWorkflow()
}
Expand Down Expand Up @@ -251,6 +251,7 @@ func (r *workflowImpl) failDecision(
func (r *workflowImpl) terminateWorkflow(
lastWriteVersion int64,
incomingLastWriteVersion int64,
terminationReason string,
) error {

eventBatchFirstEventID := r.GetMutableState().GetNextEventID()
Expand All @@ -265,7 +266,7 @@ func (r *workflowImpl) terminateWorkflow(

_, err := r.mutableState.AddWorkflowExecutionTerminatedEvent(
eventBatchFirstEventID,
WorkflowTerminationReason,
terminationReason,
[]byte(fmt.Sprintf("terminated by version: %v", incomingLastWriteVersion)),
WorkflowTerminationIdentity,
)
Expand Down

0 comments on commit 460c452

Please sign in to comment.