Skip to content

Task manager - task cleanup on passive side using task completer #6514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
@@ -1648,6 +1648,12 @@ const (
// Value type: Bool
// Default value: false
MatchingEnableTasklistGuardAgainstOwnershipShardLoss
// MatchingEnableStandbyTaskCompletion is to enable completion of tasks in the domain's passive side
// KeyName: matching.enableStandbyTaskCompletion
// Value type: Bool
// Default value: false
// Allowed filters: DomainName,TasklistName,TasklistType
MatchingEnableStandbyTaskCompletion

MatchingEnableGetNumberOfPartitionsFromCache
MatchingEnableAdaptiveScaler
@@ -4026,6 +4032,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "MatchingEnableGetNumberOfPartitionsFromCache is to enable getting number of partitions from cache instead of dynamic config",
DefaultValue: false,
},
MatchingEnableStandbyTaskCompletion: {
KeyName: "matching.enableStandbyTaskCompletion",
Filters: []Filter{DomainName, TaskListName, TaskType},
Description: "MatchingEnableStandbyTaskCompletion is to enable completion of tasks in the domain's passive side",
DefaultValue: false,
},
MatchingEnableAdaptiveScaler: {
KeyName: "matching.enableAdaptiveScaler",
Filters: []Filter{DomainName, TaskListName, TaskType},
6 changes: 6 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
@@ -2634,6 +2634,9 @@ const (
EstimatedAddTaskQPSGauge
TaskListPartitionUpscaleThresholdGauge
TaskListPartitionDownscaleThresholdGauge
StandbyClusterTasksCompletedCounterPerTaskList
StandbyClusterTasksNotStartedCounterPerTaskList
StandbyClusterTasksCompletionFailurePerTaskList

NumMatchingMetrics
)
@@ -3326,6 +3329,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
EstimatedAddTaskQPSGauge: {metricName: "estimated_add_task_qps_per_tl", metricType: Gauge},
TaskListPartitionUpscaleThresholdGauge: {metricName: "tasklist_partition_upscale_threshold", metricType: Gauge},
TaskListPartitionDownscaleThresholdGauge: {metricName: "tasklist_partition_downscale_threshold", metricType: Gauge},
StandbyClusterTasksCompletedCounterPerTaskList: {metricName: "standby_cluster_tasks_completed_per_tl", metricType: Counter},
StandbyClusterTasksNotStartedCounterPerTaskList: {metricName: "standby_cluster_tasks_not_started_per_tl", metricType: Counter},
StandbyClusterTasksCompletionFailurePerTaskList: {metricName: "standby_cluster_tasks_completion_failure_per_tl", metricType: Counter},
},
Worker: {
ReplicatorMessages: {metricName: "replicator_messages"},
13 changes: 13 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
@@ -80,6 +80,10 @@ const (
replicationServiceBusyMaxInterval = 10 * time.Second
replicationServiceBusyExpirationInterval = 5 * time.Minute

taskCompleterInitialInterval = 1 * time.Second
taskCompleterMaxInterval = 10 * time.Second
taskCompleterExpirationInterval = 5 * time.Minute

contextExpireThreshold = 10 * time.Millisecond

// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
@@ -203,6 +207,15 @@ func CreateReplicationServiceBusyRetryPolicy() backoff.RetryPolicy {
return policy
}

// CreateTaskCompleterRetryPolicy returns the exponential retry policy used to
// handle tasks that have not been started yet. The policy is seeded with
// taskCompleterInitialInterval and configured with taskCompleterMaxInterval as
// its maximum interval and taskCompleterExpirationInterval as its expiration
// interval.
func CreateTaskCompleterRetryPolicy() backoff.RetryPolicy {
	retryPolicy := backoff.NewExponentialRetryPolicy(taskCompleterInitialInterval)
	retryPolicy.SetMaximumInterval(taskCompleterMaxInterval)
	retryPolicy.SetExpirationInterval(taskCompleterExpirationInterval)
	return retryPolicy
}

// IsValidIDLength checks if id is valid according to its length
func IsValidIDLength(
id string,
9 changes: 9 additions & 0 deletions config/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
@@ -4,6 +4,15 @@ frontend.enableClientVersionCheck:
system.minRetentionDays:
- value: 0
constraints: {}
matching.enableStandbyTaskCompletion:
- value: true
constraints: {}
history.standbyClusterDelay:
- value: 30s
constraints: {}
history.standbyTaskMissingEventsResendDelay:
- value: 30s
constraints: {}
history.EnableConsistentQueryByDomain:
- value: true
constraints: {}
18 changes: 18 additions & 0 deletions service/history/constants/test_constants.go
Original file line number Diff line number Diff line change
@@ -134,4 +134,22 @@ var (
},
TestVersion,
)

// TestGlobalStandbyDomainEntry is the global standby domain cache entry for test.
// Its replication config lists both cluster.TestCurrentClusterName and
// cluster.TestAlternativeClusterName, but names the alternative cluster as the
// active one, so the domain is presumably standby (passive) from the current
// cluster's point of view.
TestGlobalStandbyDomainEntry = cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: TestDomainID, Name: TestDomainName},
&persistence.DomainConfig{
Retention: 1,
VisibilityArchivalStatus: types.ArchivalStatusEnabled,
VisibilityArchivalURI: "test:///visibility/archival",
},
&persistence.DomainReplicationConfig{
// The active cluster is the alternative cluster, not the current one.
ActiveClusterName: cluster.TestAlternativeClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
{ClusterName: cluster.TestAlternativeClusterName},
},
},
TestVersion,
)
)
4 changes: 4 additions & 0 deletions service/matching/config/config.go
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@ type (
PartitionDownscaleSustainedDuration dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
AdaptiveScalerUpdateInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
EnableAdaptiveScaler dynamicconfig.BoolPropertyFnWithTaskListInfoFilters
EnableStandbyTaskCompletion dynamicconfig.BoolPropertyFnWithTaskListInfoFilters

// Time to hold a poll request before returning an empty response if there are no tasks
LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
@@ -138,6 +139,8 @@ type (
TaskDispatchRPSTTL time.Duration
// task gc configuration
MaxTimeBetweenTaskDeletes time.Duration
// standby task completion configuration
EnableStandbyTaskCompletion func() bool
}
)

@@ -189,5 +192,6 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string, getIsolationGroups
TaskDispatchRPSTTL: time.Minute,
MaxTimeBetweenTaskDeletes: time.Second,
AllIsolationGroups: getIsolationGroups,
EnableStandbyTaskCompletion: dc.GetBoolPropertyFilteredByTaskListInfo(dynamicconfig.MatchingEnableStandbyTaskCompletion),
}
}
1 change: 1 addition & 0 deletions service/matching/config/config_test.go
Original file line number Diff line number Diff line change
@@ -87,6 +87,7 @@ func TestNewConfig(t *testing.T) {
"PartitionDownscaleSustainedDuration": {dynamicconfig.MatchingPartitionDownscaleSustainedDuration, time.Duration(33)},
"AdaptiveScalerUpdateInterval": {dynamicconfig.MatchingAdaptiveScalerUpdateInterval, time.Duration(34)},
"EnableAdaptiveScaler": {dynamicconfig.MatchingEnableAdaptiveScaler, true},
"EnableStandbyTaskCompletion": {dynamicconfig.MatchingEnableStandbyTaskCompletion, false},
}
client := dynamicconfig.NewInMemoryClient()
for fieldName, expected := range fields {
1 change: 1 addition & 0 deletions service/matching/handler/engine.go
Original file line number Diff line number Diff line change
@@ -245,6 +245,7 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t
e.config,
e.timeSource,
e.timeSource.Now(),
e.historyService,
)
if err != nil {
e.taskListsLock.Unlock()
3 changes: 2 additions & 1 deletion service/matching/handler/engine_integration_test.go
Original file line number Diff line number Diff line change
@@ -225,7 +225,8 @@ func (s *matchingEngineSuite) TestOnlyUnloadMatchingInstance() {
&tlKind,
s.matchingEngine.config,
s.matchingEngine.timeSource,
s.matchingEngine.timeSource.Now())
s.matchingEngine.timeSource.Now(),
s.matchingEngine.historyService)
s.Require().NoError(err)

// try to unload a different tlm instance with the same taskListID
5 changes: 5 additions & 0 deletions service/matching/tasklist/interfaces.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interfaces_mock.go github.com/uber/cadence/service/matching/tasklist Manager
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interfaces_mock.go github.com/uber/cadence/service/matching/tasklist TaskMatcher
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interfaces_mock.go github.com/uber/cadence/service/matching/tasklist Forwarder
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interfaces_mock.go github.com/uber/cadence/service/matching/tasklist TaskCompleter

package tasklist

@@ -83,4 +84,8 @@ type (
AddReqTokenC() <-chan *ForwarderReqToken
PollReqTokenC(isolationGroup string) <-chan *ForwarderReqToken
}

// TaskCompleter exposes a single operation for completing an InternalTask.
// NOTE(review): judging by the method name, implementations are expected to
// complete a task only once it has been started — confirm against the
// concrete implementation.
TaskCompleter interface {
// CompleteTaskIfStarted takes a request context and the task to act on and
// reports failure through the returned error.
// NOTE(review): presumably returns a non-nil error when the task has not
// been started yet, so callers can retry — TODO confirm.
CompleteTaskIfStarted(ctx context.Context, task *InternalTask) error
}
)
37 changes: 37 additions & 0 deletions service/matching/tasklist/interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading