Skip to content

Commit

Permalink
Add metrics to monitor task validation. (#5466)
Browse files Browse the repository at this point in the history
* resolved merge conflicts

* Added metrics to monitor the number of workflows going for validation
  • Loading branch information
agautam478 authored Dec 6, 2023
1 parent 558780b commit 186d253
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 21 deletions.
7 changes: 5 additions & 2 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,8 @@ const (
ClusterMetadataScope
// GetAvailableIsolationGroupsScope is the metric for the default partitioner's getIsolationGroups operation
GetAvailableIsolationGroupsScope

// TaskValidatorScope is the metric for the taskvalidator's workflow check operation.
TaskValidatorScope
NumCommonScopes
)

Expand Down Expand Up @@ -1623,6 +1624,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
GetAvailableIsolationGroupsScope: {operation: "GetAvailableIsolationGroups"},

DomainFailoverScope: {operation: "DomainFailover"},
TaskValidatorScope: {operation: "TaskValidation"},
DomainReplicationQueueScope: {operation: "DomainReplicationQueue"},
ClusterMetadataScope: {operation: "ClusterMetadata"},
},
Expand Down Expand Up @@ -2085,6 +2087,7 @@ const (
IsolationGroupStatePollerUnavailable
IsolationGroupStateDrained
IsolationGroupStateHealthy
ValidatedWorkflowCount

NumCommonMetrics // Needs to be last on this list for iota numbering
)
Expand Down Expand Up @@ -2360,7 +2363,6 @@ const (
LargeHistoryEventCount
LargeHistorySizeCount
UpdateWorkflowExecutionCount

NumHistoryMetrics
)

Expand Down Expand Up @@ -2716,6 +2718,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
IsolationGroupStatePollerUnavailable: {metricName: "isolation_group_poller_unavailable", metricType: Counter},
IsolationGroupStateDrained: {metricName: "isolation_group_drained", metricType: Counter},
IsolationGroupStateHealthy: {metricName: "isolation_group_healthy", metricType: Counter},
ValidatedWorkflowCount: {metricName: "task_validator_count", metricType: Counter},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
Expand Down
1 change: 1 addition & 0 deletions common/quotas/fallbackdynamicratelimiterfactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/dynamicconfig"
)

Expand Down
1 change: 0 additions & 1 deletion common/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ type (
GetIsolationGroupState() isolationgroup.State
GetPartitioner() partition.Partitioner
GetIsolationGroupStore() configstore.Client

//GetTaskValidator returns the taskValidator
GetTaskValidator() taskvalidator.Checker
}
Expand Down
2 changes: 1 addition & 1 deletion common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func New(
isolationGroups: isolationGroupState,
isolationGroupConfigStore: isolationGroupStore, // can be nil where persistence is not available
partitioner: partitioner,
taskvalidator: taskvalidator.NewWfChecker(logger),
taskvalidator: taskvalidator.NewWfChecker(logger, params.MetricsClient),
}
return impl, nil
}
Expand Down
2 changes: 1 addition & 1 deletion common/resource/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func NewTest(
// logger

Logger: logger,
taskvalidator: taskvalidator.NewWfChecker(logger),
taskvalidator: taskvalidator.NewWfChecker(logger, metrics.NewClient(scope, serviceMetricsIndex)),
}
}

Expand Down
21 changes: 13 additions & 8 deletions common/taskvalidator/validateworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
package taskvalidator

import (
"fmt"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"

"github.com/uber/cadence/common/log"
)
Expand All @@ -36,21 +37,25 @@ type Checker interface {

// checkerImpl is the implementation of the Checker interface.
type checkerImpl struct {
logger log.Logger
logger log.Logger
metricsClient metrics.Client
}

// NewWfChecker creates a new instance of Checker.
func NewWfChecker(logger log.Logger) Checker {
return &checkerImpl{logger: logger}
func NewWfChecker(logger log.Logger, metrics metrics.Client) Checker {
return &checkerImpl{logger: logger,
metricsClient: metrics}
}

// WorkflowCheckforValidation is a dummy implementation of workflow validation.
func (w *checkerImpl) WorkflowCheckforValidation(workflowID string, domainID string, runID string) error {
// Emitting just the log to ensure that the workflow is called for now.
// TODO: add some validations to check the wf for corruptions.
w.logger.Info(fmt.Sprintf("WorkflowCheckforValidation. DomainID: %v, WorkflowID: %v, RunID: %v",
domainID,
workflowID,
runID))
w.logger.Info("WorkflowCheckforValidation",
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.WorkflowDomainID(domainID))
// Emit the number of workflows that have come in for the validation.
w.metricsClient.Scope(metrics.TaskValidatorScope).IncCounter(metrics.ValidatedWorkflowCount)
return nil
}
29 changes: 21 additions & 8 deletions common/taskvalidator/validateworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,33 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
)

// MockMetricsScope implements the metrics.Scope interface for testing purposes.
type MockMetricsScope struct{}

func (s *MockMetricsScope) IncCounter(counter int) {}

func TestWorkflowCheckforValidation(t *testing.T) {
// Create a logger for testing
// Create a mock logger and metrics client
logger := log.NewNoop()
metricsClient := metrics.NewNoopMetricsClient()

// Create an instance of checkerImpl with the mock logger and metrics client
checker := NewWfChecker(logger, metricsClient)

// Define test inputs
workflowID := "testWorkflowID"
domainID := "testDomainID"
runID := "testRunID"

// Create a new Checker
checker := NewWfChecker(logger)
// Call the method being tested
err := checker.WorkflowCheckforValidation(workflowID, domainID, runID)

// Test with sample data
err := checker.WorkflowCheckforValidation("workflow123", "domain456", "run789")
// Assert that the method returned no error
assert.NoError(t, err)

// Assert that there is no error
assert.Nil(t, err)
// Add additional assertions as needed based on the expected behavior of the method
}

0 comments on commit 186d253

Please sign in to comment.