Skip to content

Commit

Permalink
Added deprecated domain check to the taskvalidator (#5580)
Browse files Browse the repository at this point in the history
* Added deprecated domain check to the taskvalidator

* Added TODOs

* fixed lint

* Resolved code comments

* Resolved code comments

* Minor fixes
  • Loading branch information
agautam478 authored Jan 6, 2024
1 parent a184c39 commit 0cdd8d0
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 31 deletions.
8 changes: 8 additions & 0 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,3 +936,11 @@ func GetActiveDomainByID(cache DomainCache, currentCluster string, domainID stri

return domain, nil
}

// This function checks the domain status to see if the domain has been deprecated or deleted.
func (entry *DomainCacheEntry) IsDeprecatedOrDeleted() bool {
if entry.info.Status == common.DeprecatedDomainStatus || entry.info.Status == common.DeletedDomainStatus {
return true
}
return false
}
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,8 @@ const (
// DefaultHistoryMaxAutoResetPoints is the default maximum number for auto reset points
DefaultHistoryMaxAutoResetPoints = 20
)

const (
DeprecatedDomainStatus = 1
DeletedDomainStatus = 2
)
8 changes: 2 additions & 6 deletions common/reconciliation/invariant/inactiveDomainExists.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,12 @@ package invariant
import (
"context"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/reconciliation/entity"
)

const (
deprecatedDomainStatus = 1
deletedDomainStatus = 2
)

type (
inactiveDomainExists struct {
pr persistence.Retryer
Expand Down Expand Up @@ -77,7 +73,7 @@ func (idc *inactiveDomainExists) Check(
InfoDetails: err.Error(),
}
}
if domain.GetInfo().Status == deprecatedDomainStatus || domain.GetInfo().Status == deletedDomainStatus {
if domain.GetInfo().Status == common.DeprecatedDomainStatus || domain.GetInfo().Status == common.DeletedDomainStatus {
return CheckResult{
CheckResultType: CheckResultTypeCorrupted,
InvariantName: idc.Name(),
Expand Down
2 changes: 1 addition & 1 deletion common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func New(
isolationGroups: isolationGroupState,
isolationGroupConfigStore: isolationGroupStore, // can be nil where persistence is not available
partitioner: partitioner,
taskvalidator: taskvalidator.NewWfChecker(logger, params.MetricsClient),
taskvalidator: taskvalidator.NewWfChecker(logger, params.MetricsClient, domainCache),
}
return impl, nil
}
Expand Down
2 changes: 1 addition & 1 deletion common/resource/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func NewTest(
// logger

Logger: logger,
taskvalidator: taskvalidator.NewWfChecker(logger, metrics.NewClient(scope, serviceMetricsIndex)),
taskvalidator: taskvalidator.NewWfChecker(logger, metrics.NewClient(scope, serviceMetricsIndex), cache.NewMockDomainCache(controller)),
}
}

Expand Down
31 changes: 27 additions & 4 deletions common/taskvalidator/validateworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package taskvalidator

import (
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand All @@ -38,25 +39,47 @@ type Checker interface {
type checkerImpl struct {
logger log.Logger
metricsClient metrics.Client
dc cache.DomainCache
}

// NewWfChecker creates a new instance of Checker.
func NewWfChecker(logger log.Logger, metrics metrics.Client) Checker {
return &checkerImpl{logger: logger,
metricsClient: metrics}
func NewWfChecker(logger log.Logger, metrics metrics.Client, domainCache cache.DomainCache) Checker {
return &checkerImpl{
logger: logger,
metricsClient: metrics,
dc: domainCache,
}
}

// WorkflowCheckforValidation is a dummy implementation of workflow validation.
func (w *checkerImpl) WorkflowCheckforValidation(workflowID string, domainID string, domainName string, runID string) error {
// Emitting just the log to ensure that the workflow is called for now.
// TODO: add some validations to check the wf for corruptions.
// TODO: Add stale workflow check validation.
w.logger.Info("WorkflowCheckforValidation",
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.WorkflowDomainID(domainID),
tag.WorkflowDomainName(domainName))
// Emit the number of workflows that have come in for the validation. Including the domain tag.
// The domain name will be useful when I introduce a flipr switch to turn on validation.
// TODO: Add this as a first validation before the stale workflow check. Add a deleteworkflow call if true.
err := w.deprecatedDomainCheck(domainID, domainName)
if err != nil {
return err
}
w.metricsClient.Scope(metrics.TaskValidatorScope, metrics.DomainTag(domainName)).IncCounter(metrics.ValidatedWorkflowCount)
return nil
}

func (w *checkerImpl) deprecatedDomainCheck(domainID string, domainName string) error {
domain, err := w.dc.GetDomainByID(domainID)
if err != nil {
return err
}
status := domain.IsDeprecatedOrDeleted()
if status {
w.logger.Info("The workflow domain doesn't exist", tag.WorkflowDomainID(domainID),
tag.WorkflowDomainName(domainName))
}
return nil
}
47 changes: 28 additions & 19 deletions common/taskvalidator/validateworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,43 @@ package taskvalidator
import (
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/service/history/constants"
)

// MockMetricsScope implements the metrics.Scope interface for testing purposes.
type MockMetricsScope struct{}

func (s *MockMetricsScope) IncCounter(counter int) {}

func TestWorkflowCheckforValidation(t *testing.T) {
// Create a mock logger and metrics client
logger := log.NewNoop()
metricsClient := metrics.NewNoopMetricsClient()
testCases := []struct {
name string
domainID string
runID string
}{
{"DomainFetchSuccess", "domain-id-success", "run-id-success"},
{"DomainFetchFailure", "domain-id-failure", "run-id-failure"},
}

// Create an instance of checkerImpl with the mock logger and metrics client
checker := NewWfChecker(logger, metricsClient)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

// Define test inputs
workflowID := "testWorkflowID"
domainID := "testDomainID"
runID := "testRunID"
domainName := "testDomainName"
mockDomainCache := cache.NewMockDomainCache(mockCtrl)
logger := log.NewNoop()
metricsClient := metrics.NewNoopMetricsClient()
checker := NewWfChecker(logger, metricsClient, mockDomainCache)

// Call the method being tested
err := checker.WorkflowCheckforValidation(workflowID, domainID, domainName, runID)
// Set up the mock behavior for GetDomainByID
mockDomainCache.EXPECT().
GetDomainByID(tc.domainID).
Return(constants.TestGlobalDomainEntry, nil).
AnyTimes()

// Assert that the method returned no error
assert.NoError(t, err)
err := checker.WorkflowCheckforValidation("workflowID", tc.domainID, "domainName", tc.runID)
assert.NoError(t, err)
})
}
}

0 comments on commit 0cdd8d0

Please sign in to comment.