Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics to monitor task validation. #5466

Merged
merged 3 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,8 @@ const (
ClusterMetadataScope
// GetAvailableIsolationGroupsScope is the metric for the default partitioner's getIsolationGroups operation
GetAvailableIsolationGroupsScope

// TaskValidatorScope is the metric for the taskvalidator's workflow check operation.
TaskValidatorScope
NumCommonScopes
)

Expand Down Expand Up @@ -1623,6 +1624,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
GetAvailableIsolationGroupsScope: {operation: "GetAvailableIsolationGroups"},

DomainFailoverScope: {operation: "DomainFailover"},
TaskValidatorScope: {operation: "TaskValidation"},
DomainReplicationQueueScope: {operation: "DomainReplicationQueue"},
ClusterMetadataScope: {operation: "ClusterMetadata"},
},
Expand Down Expand Up @@ -2085,6 +2087,7 @@ const (
IsolationGroupStatePollerUnavailable
IsolationGroupStateDrained
IsolationGroupStateHealthy
ValidatedWorkflowCount

NumCommonMetrics // Needs to be last on this list for iota numbering
)
Expand Down Expand Up @@ -2360,7 +2363,6 @@ const (
LargeHistoryEventCount
LargeHistorySizeCount
UpdateWorkflowExecutionCount

NumHistoryMetrics
)

Expand Down Expand Up @@ -2716,6 +2718,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
IsolationGroupStatePollerUnavailable: {metricName: "isolation_group_poller_unavailable", metricType: Counter},
IsolationGroupStateDrained: {metricName: "isolation_group_drained", metricType: Counter},
IsolationGroupStateHealthy: {metricName: "isolation_group_healthy", metricType: Counter},
ValidatedWorkflowCount: {metricName: "task_validator_count", metricType: Counter},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
Expand Down
1 change: 1 addition & 0 deletions common/quotas/fallbackdynamicratelimiterfactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/dynamicconfig"
)

Expand Down
1 change: 0 additions & 1 deletion common/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ type (
GetIsolationGroupState() isolationgroup.State
GetPartitioner() partition.Partitioner
GetIsolationGroupStore() configstore.Client

//GetTaskValidator returns the taskValidator
GetTaskValidator() taskvalidator.Checker
}
Expand Down
2 changes: 1 addition & 1 deletion common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func New(
isolationGroups: isolationGroupState,
isolationGroupConfigStore: isolationGroupStore, // can be nil where persistence is not available
partitioner: partitioner,
taskvalidator: taskvalidator.NewWfChecker(logger),
taskvalidator: taskvalidator.NewWfChecker(logger, params.MetricsClient),
}
return impl, nil
}
Expand Down
2 changes: 1 addition & 1 deletion common/resource/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func NewTest(
// logger

Logger: logger,
taskvalidator: taskvalidator.NewWfChecker(logger),
taskvalidator: taskvalidator.NewWfChecker(logger, metrics.NewClient(scope, serviceMetricsIndex)),
}
}

Expand Down
21 changes: 13 additions & 8 deletions common/taskvalidator/validateworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
package taskvalidator

import (
"fmt"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"

"github.com/uber/cadence/common/log"
)
Expand All @@ -36,21 +37,25 @@ type Checker interface {

// checkerImpl is the implementation of the Checker interface.
type checkerImpl struct {
logger log.Logger
logger log.Logger
metricsClient metrics.Client
}

// NewWfChecker creates a new instance of Checker.
func NewWfChecker(logger log.Logger) Checker {
return &checkerImpl{logger: logger}
func NewWfChecker(logger log.Logger, metrics metrics.Client) Checker {
return &checkerImpl{logger: logger,
metricsClient: metrics}
}

// WorkflowCheckforValidation is a dummy implementation of workflow validation.
func (w *checkerImpl) WorkflowCheckforValidation(workflowID string, domainID string, runID string) error {
// Emitting just the log to ensure that the workflow is called for now.
// TODO: add some validations to check the wf for corruptions.
w.logger.Info(fmt.Sprintf("WorkflowCheckforValidation. DomainID: %v, WorkflowID: %v, RunID: %v",
domainID,
workflowID,
runID))
w.logger.Info("WorkflowCheckforValidation",
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.WorkflowDomainID(domainID))
// Emit the number of workflows that have come in for the validation.
w.metricsClient.Scope(metrics.TaskValidatorScope).IncCounter(metrics.ValidatedWorkflowCount)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which tags will this metric have? can you share example from local run?

return nil
}
29 changes: 21 additions & 8 deletions common/taskvalidator/validateworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,33 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
)

// MockMetricsScope implements the metrics.Scope interface for testing purposes.
type MockMetricsScope struct{}

func (s *MockMetricsScope) IncCounter(counter int) {}

func TestWorkflowCheckforValidation(t *testing.T) {
// Create a logger for testing
// Create a mock logger and metrics client
logger := log.NewNoop()
metricsClient := metrics.NewNoopMetricsClient()

// Create an instance of checkerImpl with the mock logger and metrics client
checker := NewWfChecker(logger, metricsClient)

// Define test inputs
workflowID := "testWorkflowID"
domainID := "testDomainID"
runID := "testRunID"

// Create a new Checker
checker := NewWfChecker(logger)
// Call the method being tested
err := checker.WorkflowCheckforValidation(workflowID, domainID, runID)

// Test with sample data
err := checker.WorkflowCheckforValidation("workflow123", "domain456", "run789")
// Assert that the method returned no error
assert.NoError(t, err)

// Assert that there is no error
assert.Nil(t, err)
// Add additional assertions as needed based on the expected behavior of the method
}