From 0cdd8d0836a9afcdc05435ab9925acfb0707721a Mon Sep 17 00:00:00 2001 From: agautam478 <72432016+agautam478@users.noreply.github.com> Date: Fri, 5 Jan 2024 18:56:28 -0800 Subject: [PATCH] Added deprecated domain check to the taskvalidator (#5580) * Added deprecated domain check to the taskvalidator * Added TODOs * fixed lint * Resolved code comments * Resolved code comments * Minor fixes --- common/cache/domainCache.go | 8 ++++ common/constants.go | 5 ++ .../invariant/inactiveDomainExists.go | 8 +--- common/resource/resourceImpl.go | 2 +- common/resource/resourceTest.go | 2 +- common/taskvalidator/validateworkflow.go | 31 ++++++++++-- common/taskvalidator/validateworkflow_test.go | 47 +++++++++++-------- 7 files changed, 72 insertions(+), 31 deletions(-) diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go index 462cb089012..6b0e4bf958e 100644 --- a/common/cache/domainCache.go +++ b/common/cache/domainCache.go @@ -936,3 +936,11 @@ func GetActiveDomainByID(cache DomainCache, currentCluster string, domainID stri return domain, nil } + +// This function checks the domain status to see if the domain has been deprecated or deleted. +func (entry *DomainCacheEntry) IsDeprecatedOrDeleted() bool { + if entry.info.Status == common.DeprecatedDomainStatus || entry.info.Status == common.DeletedDomainStatus { + return true + } + return false +} diff --git a/common/constants.go b/common/constants.go index 360d12d0bf6..67d281029e2 100644 --- a/common/constants.go +++ b/common/constants.go @@ -264,3 +264,8 @@ const ( // DefaultHistoryMaxAutoResetPoints is the default maximum number for auto reset points DefaultHistoryMaxAutoResetPoints = 20 ) + +const ( + DeprecatedDomainStatus = 1 + DeletedDomainStatus = 2 +) diff --git a/common/reconciliation/invariant/inactiveDomainExists.go b/common/reconciliation/invariant/inactiveDomainExists.go index 796992f5c16..71ffff001ef 100644 --- a/common/reconciliation/invariant/inactiveDomainExists.go +++ b/common/reconciliation/invariant/inactiveDomainExists.go @@ -25,16 +25,12 @@ package invariant import ( "context" + "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/reconciliation/entity" ) -const ( - deprecatedDomainStatus = 1 - deletedDomainStatus = 2 -) - type ( inactiveDomainExists struct { pr persistence.Retryer @@ -77,7 +73,7 @@ func (idc *inactiveDomainExists) Check( InfoDetails: err.Error(), } } - if domain.GetInfo().Status == deprecatedDomainStatus || domain.GetInfo().Status == deletedDomainStatus { + if domain.GetInfo().Status == common.DeprecatedDomainStatus || domain.GetInfo().Status == common.DeletedDomainStatus { return CheckResult{ CheckResultType: CheckResultTypeCorrupted, InvariantName: idc.Name(), diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index e2a968418ae..e2c376c4291 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -338,7 +338,7 @@ func New( isolationGroups: isolationGroupState, isolationGroupConfigStore: isolationGroupStore, // can be nil where persistence is not available partitioner: partitioner, - taskvalidator: taskvalidator.NewWfChecker(logger, params.MetricsClient), + taskvalidator: taskvalidator.NewWfChecker(logger, params.MetricsClient, domainCache), } return impl, nil } diff --git a/common/resource/resourceTest.go b/common/resource/resourceTest.go index d0724e5743c..99065d30cee 100644 --- a/common/resource/resourceTest.go +++ b/common/resource/resourceTest.go @@ -208,7 +208,7 @@ func NewTest( // logger Logger: logger, - taskvalidator: taskvalidator.NewWfChecker(logger, metrics.NewClient(scope, serviceMetricsIndex)), + taskvalidator: taskvalidator.NewWfChecker(logger, metrics.NewClient(scope, serviceMetricsIndex), cache.NewMockDomainCache(controller)), } } diff --git a/common/taskvalidator/validateworkflow.go b/common/taskvalidator/validateworkflow.go index 032fbde79c7..4a7672489a8 100644 --- a/common/taskvalidator/validateworkflow.go +++ b/common/taskvalidator/validateworkflow.go @@ -24,6 +24,7 @@ package taskvalidator import ( + "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" @@ -38,18 +39,22 @@ type Checker interface { type checkerImpl struct { logger log.Logger metricsClient metrics.Client + dc cache.DomainCache } // NewWfChecker creates a new instance of Checker. -func NewWfChecker(logger log.Logger, metrics metrics.Client) Checker { - return &checkerImpl{logger: logger, - metricsClient: metrics} +func NewWfChecker(logger log.Logger, metrics metrics.Client, domainCache cache.DomainCache) Checker { + return &checkerImpl{ + logger: logger, + metricsClient: metrics, + dc: domainCache, + } } // WorkflowCheckforValidation is a dummy implementation of workflow validation. func (w *checkerImpl) WorkflowCheckforValidation(workflowID string, domainID string, domainName string, runID string) error { // Emitting just the log to ensure that the workflow is called for now. - // TODO: add some validations to check the wf for corruptions. + // TODO: Add stale workflow check validation. w.logger.Info("WorkflowCheckforValidation", tag.WorkflowID(workflowID), tag.WorkflowRunID(runID), @@ -57,6 +62,24 @@ func (w *checkerImpl) WorkflowCheckforValidation(workflowID string, domainID str tag.WorkflowDomainName(domainName)) // Emit the number of workflows that have come in for the validation. Including the domain tag. // The domain name will be useful when I introduce a flipr switch to turn on validation. + // TODO: Add this as a first validation before the stale workflow check. Add a deleteworkflow call if true. + err := w.deprecatedDomainCheck(domainID, domainName) + if err != nil { + return err + } w.metricsClient.Scope(metrics.TaskValidatorScope, metrics.DomainTag(domainName)).IncCounter(metrics.ValidatedWorkflowCount) return nil } + +func (w *checkerImpl) deprecatedDomainCheck(domainID string, domainName string) error { + domain, err := w.dc.GetDomainByID(domainID) + if err != nil { + return err + } + status := domain.IsDeprecatedOrDeleted() + if status { + w.logger.Info("The workflow domain doesn't exist", tag.WorkflowDomainID(domainID), + tag.WorkflowDomainName(domainName)) + } + return nil +} diff --git a/common/taskvalidator/validateworkflow_test.go b/common/taskvalidator/validateworkflow_test.go index 9a0d27f62a3..d7807859e2b 100644 --- a/common/taskvalidator/validateworkflow_test.go +++ b/common/taskvalidator/validateworkflow_test.go @@ -25,34 +25,43 @@ package taskvalidator import ( "testing" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/service/history/constants" ) -// MockMetricsScope implements the metrics.Scope interface for testing purposes. -type MockMetricsScope struct{} - -func (s *MockMetricsScope) IncCounter(counter int) {} - func TestWorkflowCheckforValidation(t *testing.T) { - // Create a mock logger and metrics client - logger := log.NewNoop() - metricsClient := metrics.NewNoopMetricsClient() + testCases := []struct { + name string + domainID string + runID string + }{ + {"DomainFetchSuccess", "domain-id-success", "run-id-success"}, + {"DomainFetchFailure", "domain-id-failure", "run-id-failure"}, + } - // Create an instance of checkerImpl with the mock logger and metrics client - checker := NewWfChecker(logger, metricsClient) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() - // Define test inputs - workflowID := "testWorkflowID" - domainID := "testDomainID" - runID := "testRunID" - domainName := "testDomainName" + mockDomainCache := cache.NewMockDomainCache(mockCtrl) + logger := log.NewNoop() + metricsClient := metrics.NewNoopMetricsClient() + checker := NewWfChecker(logger, metricsClient, mockDomainCache) - // Call the method being tested - err := checker.WorkflowCheckforValidation(workflowID, domainID, domainName, runID) + // Set up the mock behavior for GetDomainByID + mockDomainCache.EXPECT(). + GetDomainByID(tc.domainID). + Return(constants.TestGlobalDomainEntry, nil). + AnyTimes() - // Assert that the method returned no error - assert.NoError(t, err) + err := checker.WorkflowCheckforValidation("workflowID", tc.domainID, "domainName", tc.runID) + assert.NoError(t, err) + }) + } }