From 186d2535f1fcdbd132b44ff180e4f2adfdff9956 Mon Sep 17 00:00:00 2001 From: agautam478 <72432016+agautam478@users.noreply.github.com> Date: Wed, 6 Dec 2023 10:11:52 -0800 Subject: [PATCH] Add metrics to monitor task validation. (#5466) * resolved merge conflicts * Added metrics to monitor the number of workflows going for validation --- common/metrics/defs.go | 7 +++-- .../fallbackdynamicratelimiterfactory_test.go | 1 + common/resource/resource.go | 1 - common/resource/resourceImpl.go | 2 +- common/resource/resourceTest.go | 2 +- common/taskvalidator/validateworkflow.go | 21 +++++++++----- common/taskvalidator/validateworkflow_test.go | 29 ++++++++++++++----- 7 files changed, 42 insertions(+), 21 deletions(-) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 9a1ead49d3b..716584ab2e9 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -782,7 +782,8 @@ const ( ClusterMetadataScope // GetAvailableIsolationGroupsScope is the metric for the default partitioner's getIsolationGroups operation GetAvailableIsolationGroupsScope - + // TaskValidatorScope is the metric for the taskvalidator's workflow check operation. + TaskValidatorScope NumCommonScopes ) @@ -1623,6 +1624,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ GetAvailableIsolationGroupsScope: {operation: "GetAvailableIsolationGroups"}, DomainFailoverScope: {operation: "DomainFailover"}, + TaskValidatorScope: {operation: "TaskValidation"}, DomainReplicationQueueScope: {operation: "DomainReplicationQueue"}, ClusterMetadataScope: {operation: "ClusterMetadata"}, }, @@ -2085,6 +2087,7 @@ const ( IsolationGroupStatePollerUnavailable IsolationGroupStateDrained IsolationGroupStateHealthy + ValidatedWorkflowCount NumCommonMetrics // Needs to be last on this list for iota numbering ) @@ -2360,7 +2363,6 @@ const ( LargeHistoryEventCount LargeHistorySizeCount UpdateWorkflowExecutionCount - NumHistoryMetrics ) @@ -2716,6 +2718,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ IsolationGroupStatePollerUnavailable: {metricName: "isolation_group_poller_unavailable", metricType: Counter}, IsolationGroupStateDrained: {metricName: "isolation_group_drained", metricType: Counter}, IsolationGroupStateHealthy: {metricName: "isolation_group_healthy", metricType: Counter}, + ValidatedWorkflowCount: {metricName: "task_validator_count", metricType: Counter}, }, History: { TaskRequests: {metricName: "task_requests", metricType: Counter}, diff --git a/common/quotas/fallbackdynamicratelimiterfactory_test.go b/common/quotas/fallbackdynamicratelimiterfactory_test.go index c87f5eb5dca..c05e77a011f 100644 --- a/common/quotas/fallbackdynamicratelimiterfactory_test.go +++ b/common/quotas/fallbackdynamicratelimiterfactory_test.go @@ -26,6 +26,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/uber/cadence/common/dynamicconfig" ) diff --git a/common/resource/resource.go b/common/resource/resource.go index cc8eeb0e505..22ddb4f619e 100644 --- a/common/resource/resource.go +++ b/common/resource/resource.go @@ -117,7 +117,6 @@ type ( GetIsolationGroupState() isolationgroup.State GetPartitioner() partition.Partitioner GetIsolationGroupStore() configstore.Client - //GetTaskValidator returns the taskValidator GetTaskValidator() taskvalidator.Checker } diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index 8ad0d1d5c27..10f7ef781f6 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -340,7 +340,7 @@ func New( isolationGroups: isolationGroupState, isolationGroupConfigStore: isolationGroupStore, // can be nil where persistence is not available partitioner: partitioner, - taskvalidator: taskvalidator.NewWfChecker(logger), + taskvalidator: taskvalidator.NewWfChecker(logger, params.MetricsClient), } return impl, nil } diff --git a/common/resource/resourceTest.go b/common/resource/resourceTest.go index 9d019fef982..517fbb1ed34 100644 --- a/common/resource/resourceTest.go +++ b/common/resource/resourceTest.go @@ -213,7 +213,7 @@ func NewTest( // logger Logger: logger, - taskvalidator: taskvalidator.NewWfChecker(logger), + taskvalidator: taskvalidator.NewWfChecker(logger, metrics.NewClient(scope, serviceMetricsIndex)), } } diff --git a/common/taskvalidator/validateworkflow.go b/common/taskvalidator/validateworkflow.go index a66bfd3bb9b..4a562c2ab14 100644 --- a/common/taskvalidator/validateworkflow.go +++ b/common/taskvalidator/validateworkflow.go @@ -24,7 +24,8 @@ package taskvalidator import ( - "fmt" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/log" ) @@ -36,21 +37,25 @@ type Checker interface { // checkerImpl is the implementation of the Checker interface. type checkerImpl struct { - logger log.Logger + logger log.Logger + metricsClient metrics.Client } // NewWfChecker creates a new instance of Checker. -func NewWfChecker(logger log.Logger) Checker { - return &checkerImpl{logger: logger} +func NewWfChecker(logger log.Logger, metrics metrics.Client) Checker { + return &checkerImpl{logger: logger, + metricsClient: metrics} } // WorkflowCheckforValidation is a dummy implementation of workflow validation. func (w *checkerImpl) WorkflowCheckforValidation(workflowID string, domainID string, runID string) error { // Emitting just the log to ensure that the workflow is called for now. // TODO: add some validations to check the wf for corruptions. - w.logger.Info(fmt.Sprintf("WorkflowCheckforValidation. DomainID: %v, WorkflowID: %v, RunID: %v", - domainID, - workflowID, - runID)) + w.logger.Info("WorkflowCheckforValidation", + tag.WorkflowID(workflowID), + tag.WorkflowRunID(runID), + tag.WorkflowDomainID(domainID)) + // Emit the number of workflows that have come in for the validation. + w.metricsClient.Scope(metrics.TaskValidatorScope).IncCounter(metrics.ValidatedWorkflowCount) return nil } diff --git a/common/taskvalidator/validateworkflow_test.go b/common/taskvalidator/validateworkflow_test.go index a183f44ce21..304f25ef6c2 100644 --- a/common/taskvalidator/validateworkflow_test.go +++ b/common/taskvalidator/validateworkflow_test.go @@ -26,20 +26,33 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/metrics" ) +// MockMetricsScope implements the metrics.Scope interface for testing purposes. +type MockMetricsScope struct{} + +func (s *MockMetricsScope) IncCounter(counter int) {} + func TestWorkflowCheckforValidation(t *testing.T) { - // Create a logger for testing + // Create a mock logger and metrics client logger := log.NewNoop() + metricsClient := metrics.NewNoopMetricsClient() + + // Create an instance of checkerImpl with the mock logger and metrics client + checker := NewWfChecker(logger, metricsClient) + + // Define test inputs + workflowID := "testWorkflowID" + domainID := "testDomainID" + runID := "testRunID" - // Create a new Checker - checker := NewWfChecker(logger) + // Call the method being tested + err := checker.WorkflowCheckforValidation(workflowID, domainID, runID) - // Test with sample data - err := checker.WorkflowCheckforValidation("workflow123", "domain456", "run789") + // Assert that the method returned no error + assert.NoError(t, err) - // Assert that there is no error - assert.Nil(t, err) + // Add additional assertions as needed based on the expected behavior of the method }