Allow configuring MaxWorkflowRetentionInDays (#3982)
longquanzheng authored Feb 18, 2021
1 parent 2f822ee commit 0a6ed6e
Showing 13 changed files with 70 additions and 50 deletions.
4 changes: 0 additions & 4 deletions common/constants.go
@@ -125,10 +125,6 @@ const (
// CriticalLongPollTimeout is a threshold for the context timeout passed into long poll API,
// below which a warning will be logged
CriticalLongPollTimeout = time.Second * 20
// MaxWorkflowRetentionPeriodInDays is the maximum of workflow retention when registering domain
// !!! Do NOT simply decrease this number, because it is being used by history scavenger to avoid race condition against history archival.
// Check more details in history scanner(scavenger)
MaxWorkflowRetentionPeriodInDays = 30
)

const (
7 changes: 5 additions & 2 deletions common/domain/const.go
@@ -23,8 +23,11 @@ package domain
import "time"

const (
// MinRetentionDays is the minimal retention days for any domain
MinRetentionDays = 1
// DefaultMinWorkflowRetentionInDays is the minimal retention days for any domain
DefaultMinWorkflowRetentionInDays = 1

// DefaultMaxWorkflowRetentionInDays is a config shared by both Frontend and Worker; they need the same default value
DefaultMaxWorkflowRetentionInDays = 30

// MaxBadBinaries is the maximal number of bad client binaries stored in a domain
MaxBadBinaries = 10
1 change: 1 addition & 0 deletions common/domain/handler.go
@@ -86,6 +86,7 @@ type (
// Config is the domain config for domain handler
Config struct {
MinRetentionDays dynamicconfig.IntPropertyFn
MaxRetentionDays dynamicconfig.IntPropertyFn
MaxBadBinaryCount dynamicconfig.IntPropertyFnWithDomainFilter
FailoverCoolDown dynamicconfig.DurationPropertyFnWithDomainFilter
}
2 changes: 1 addition & 1 deletion common/metrics/defs.go
@@ -2337,7 +2337,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ActivityInfoSize: {metricName: "activity_info_size", metricType: Timer},
TimerInfoSize: {metricName: "timer_info_size", metricType: Timer},
ChildInfoSize: {metricName: "child_info_size", metricType: Timer},
SignalInfoSize: {metricName: "signal_info_size", metricType: Timer},
SignalInfoSize: {metricName: "signal_info_size", metricType: Timer},
BufferedEventsSize: {metricName: "buffered_events_size", metricType: Timer},
ActivityInfoCount: {metricName: "activity_info_count", metricType: Timer},
TimerInfoCount: {metricName: "timer_info_count", metricType: Timer},
5 changes: 5 additions & 0 deletions common/service/dynamicconfig/constants.go
@@ -73,6 +73,7 @@ var keys = map[Key]string{
EnableGracefulFailover: "system.enableGracefulFailover",
TransactionSizeLimit: "system.transactionSizeLimit",
PersistenceErrorInjectionRate: "system.persistenceErrorInjectionRate",
MaxRetentionDays: "system.maxRetentionDays",
MinRetentionDays: "system.minRetentionDays",
MaxDecisionStartToCloseSeconds: "system.maxDecisionStartToCloseSeconds",
DisallowQuery: "system.disallowQuery",
@@ -406,6 +407,10 @@ const (
TransactionSizeLimit
// PersistenceErrorInjectionRate is the rate for injecting random error in persistence
PersistenceErrorInjectionRate
// MaxRetentionDays is the maximum retention allowed when registering a domain
// !!! Do NOT simply decrease this number, because it is used by the history scavenger to avoid a race condition with history archival.
// See the history scanner (scavenger) for more details
MaxRetentionDays
// MinRetentionDays is the minimal allowed retention days for domain
MinRetentionDays
// MaxDecisionStartToCloseSeconds is the minimal allowed decision start to close timeout in seconds
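Both the Frontend and the Worker read the new `system.maxRetentionDays` key through a `dynamicconfig.IntPropertyFn`, a function value that is evaluated on every call rather than snapshotted at startup — which is why the scavenger further down stores the property function itself and invokes it inside `Run`, and why tests pin it with `dynamicconfig.GetIntPropertyFn`. A minimal, self-contained sketch of that pattern, using local stand-in types rather than Cadence's real `dynamicconfig` package (whose property functions also accept filter options):

```go
package main

import "fmt"

// IntPropertyFn mimics the shape of a dynamic config property: a function
// returning the current value each time it is called.
type IntPropertyFn func() int

// getIntPropertyFn returns a property pinned to a fixed value, as the tests do.
func getIntPropertyFn(value int) IntPropertyFn {
	return func() int { return value }
}

// dynamicIntProperty reads from a mutable source on every call, so a changed
// config value is picked up without restarting the process.
func dynamicIntProperty(source *int, defaultValue int) IntPropertyFn {
	return func() int {
		if source == nil {
			return defaultValue
		}
		return *source
	}
}

func main() {
	maxRetention := 30
	prop := dynamicIntProperty(&maxRetention, 30)

	fmt.Println(prop()) // 30
	maxRetention = 45   // operator raises system.maxRetentionDays
	fmt.Println(prop()) // 45 — picked up on the next call, no restart needed

	fixed := getIntPropertyFn(30)
	fmt.Println(fixed()) // 30, pinned for tests
}
```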
3 changes: 2 additions & 1 deletion service/frontend/service.go
@@ -137,7 +137,8 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
SendRawWorkflowHistory: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.SendRawWorkflowHistory, sendRawWorkflowHistory),
domainConfig: domain.Config{
MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxBadBinaries, domain.MaxBadBinaries),
MinRetentionDays: dc.GetIntProperty(dynamicconfig.MinRetentionDays, domain.MinRetentionDays),
MinRetentionDays: dc.GetIntProperty(dynamicconfig.MinRetentionDays, domain.DefaultMinWorkflowRetentionInDays),
MaxRetentionDays: dc.GetIntProperty(dynamicconfig.MaxRetentionDays, domain.DefaultMaxWorkflowRetentionInDays),
FailoverCoolDown: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.FrontendFailoverCoolDown, domain.FailoverCoolDown),
},
}
2 changes: 1 addition & 1 deletion service/frontend/workflowHandler.go
@@ -265,7 +265,7 @@ func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *
return errRequestNotSet
}

if registerRequest.GetWorkflowExecutionRetentionPeriodInDays() > common.MaxWorkflowRetentionPeriodInDays {
if registerRequest.GetWorkflowExecutionRetentionPeriodInDays() > int32(wh.config.domainConfig.MaxRetentionDays()) {
return errInvalidRetention
}

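The effect of the `RegisterDomain` change above: the requested retention is now checked against the dynamic config value instead of the removed `common.MaxWorkflowRetentionPeriodInDays` constant, so an operator can raise the ceiling by overriding `system.maxRetentionDays` without a new build. A simplified, hypothetical sketch of that check — `validateRetention` is an illustrative helper, not the real frontend code:

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidRetention stands in for the frontend error of the same name.
var errInvalidRetention = errors.New("requested retention exceeds the configured maximum")

// validateRetention mirrors the shape of the frontend check: maxRetentionDays is
// a dynamic config property function, evaluated at request time.
func validateRetention(requestedDays int32, maxRetentionDays func() int) error {
	if requestedDays > int32(maxRetentionDays()) {
		return errInvalidRetention
	}
	return nil
}

func main() {
	maxRetentionDays := func() int { return 30 } // DefaultMaxWorkflowRetentionInDays

	fmt.Println(validateRetention(7, maxRetentionDays))  // <nil> — accepted
	fmt.Println(validateRetention(45, maxRetentionDays)) // rejected until system.maxRetentionDays is raised
}
```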
51 changes: 29 additions & 22 deletions service/worker/scanner/history/scavenger.go
@@ -33,6 +33,7 @@ import (
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/common/types"
)

@@ -48,14 +49,15 @@ type (

// Scavenger is the type that holds the state for history scavenger daemon
Scavenger struct {
db p.HistoryManager
client history.Client
hbd ScavengerHeartbeatDetails
rps int
limiter *rate.Limiter
metrics metrics.Client
logger log.Logger
isInTest bool
db p.HistoryManager
client history.Client
hbd ScavengerHeartbeatDetails
rps int
limiter *rate.Limiter
maxWorkflowRetentionInDays dynamicconfig.IntPropertyFn
metrics metrics.Client
logger log.Logger
isInTest bool
}

taskDetail struct {
@@ -74,14 +76,17 @@ const (
// used this to decide how many goroutines to process
rpsPerConcurrency = 50
pageSize = 1000
// only clean up history branches that older than this threshold
// we double the MaxWorkflowRetentionPeriodInDays to avoid racing condition with history archival.
// Our history archiver delete mutable state, and then upload history to blob store and then delete history.
// This scanner will face racing condition with archiver because it relys on describe mutable state returning entityNotExist error.
// That's why we need to keep MaxWorkflowRetentionPeriodInDays stable and not decreasing all the time.
cleanUpThreshold = time.Hour * 24 * common.MaxWorkflowRetentionPeriodInDays * 2
)

// only clean up history branches that are older than this threshold.
// We double the max workflow retention to avoid a race condition with history archival:
// our history archiver deletes mutable state, then uploads history to the blob store, and then deletes the history.
// This scanner would otherwise race with the archiver, because it relies on describe-mutable-state returning an entityNotExist error.
// That's why the max workflow retention must stay stable and must not keep decreasing.
func getHistoryCleanupThreshold(maxWorkflowRetentionInDays int) time.Duration {
return time.Hour * 24 * time.Duration(maxWorkflowRetentionInDays) * 2
}

// NewScavenger returns an instance of history scavenger daemon
// The Scavenger can be started by calling the Run() method on the
// returned object. Calling the Run() method will result in one
@@ -96,18 +101,20 @@ func NewScavenger(
hbd ScavengerHeartbeatDetails,
metricsClient metrics.Client,
logger log.Logger,
maxWorkflowRetentionInDays dynamicconfig.IntPropertyFn,
) *Scavenger {

rateLimiter := rate.NewLimiter(rate.Limit(rps), rps)

return &Scavenger{
db: db,
client: client,
hbd: hbd,
rps: rps,
limiter: rateLimiter,
metrics: metricsClient,
logger: logger,
db: db,
client: client,
hbd: hbd,
rps: rps,
limiter: rateLimiter,
maxWorkflowRetentionInDays: maxWorkflowRetentionInDays,
metrics: metricsClient,
logger: logger,
}
}

@@ -135,7 +142,7 @@ func (s *Scavenger) Run(ctx context.Context) (ScavengerHeartbeatDetails, error)
errorsOnSplitting := 0
// send all tasks
for _, br := range resp.Branches {
if time.Now().Add(-cleanUpThreshold).Before(br.ForkTime) {
if time.Now().Add(-1 * getHistoryCleanupThreshold(s.maxWorkflowRetentionInDays())).Before(br.ForkTime) {
batchCount--
skips++
s.metrics.IncCounter(metrics.HistoryScavengerScope, metrics.HistoryScavengerSkipCount)
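The scavenger's skip decision is driven by `getHistoryCleanupThreshold`, which doubles the configured retention exactly as the comment explains: with the default of 30 days, only branches whose fork time is more than 60 days old are touched. A standalone sketch of that arithmetic — the function body matches the diff above, while the surrounding `main` and sample fork times are illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// getHistoryCleanupThreshold doubles the max retention so the scavenger stays
// well clear of branches the history archiver may still be processing.
func getHistoryCleanupThreshold(maxWorkflowRetentionInDays int) time.Duration {
	return time.Hour * 24 * time.Duration(maxWorkflowRetentionInDays) * 2
}

func main() {
	threshold := getHistoryCleanupThreshold(30) // DefaultMaxWorkflowRetentionInDays
	fmt.Println(threshold)                      // 1440h0m0s, i.e. 60 days

	// A branch forked 10 days ago is skipped; one forked 90 days ago is eligible.
	now := time.Now()
	for _, forkTime := range []time.Time{now.AddDate(0, 0, -10), now.AddDate(0, 0, -90)} {
		skip := now.Add(-1 * threshold).Before(forkTime) // same condition as Scavenger.Run
		fmt.Printf("forked %v ago, skip=%v\n", now.Sub(forkTime).Round(time.Hour), skip)
	}
}
```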
38 changes: 21 additions & 17 deletions service/worker/scanner/history/scavenger_test.go
@@ -34,11 +34,13 @@ import (

"github.com/uber/cadence/client/history"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/domain"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/mocks"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -67,7 +69,9 @@ func (s *ScavengerTestSuite) createTestScavenger(rps int) (*mocks.HistoryV2Manag
db := &mocks.HistoryV2Manager{}
controller := gomock.NewController(s.T())
workflowClient := history.NewMockClient(controller)
scvgr := NewScavenger(db, 100, workflowClient, ScavengerHeartbeatDetails{}, s.metric, s.logger)

maxWorkflowRetentionInDays := dynamicconfig.GetIntPropertyFn(domain.DefaultMaxWorkflowRetentionInDays)
scvgr := NewScavenger(db, rps, workflowClient, ScavengerHeartbeatDetails{}, s.metric, s.logger, maxWorkflowRetentionInDays)
scvgr.isInTest = true
return db, workflowClient, scvgr, controller
}
@@ -135,13 +139,13 @@ func (s *ScavengerTestSuite) TestAllErrorSplittingTasksTwoPages() {
{
TreeID: "treeID1",
BranchID: "branchID1",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: "error-info",
},
{
TreeID: "treeID2",
BranchID: "branchID2",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: "error-info",
},
},
@@ -155,13 +159,13 @@
{
TreeID: "treeID3",
BranchID: "branchID3",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: "error-info",
},
{
TreeID: "treeID4",
BranchID: "branchID4",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: "error-info",
},
},
@@ -187,13 +191,13 @@ func (s *ScavengerTestSuite) TestNoGarbageTwoPages() {
{
TreeID: "treeID1",
BranchID: "branchID1",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: p.BuildHistoryGarbageCleanupInfo("domainID1", "workflowID1", "runID1"),
},
{
TreeID: "treeID2",
BranchID: "branchID2",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: p.BuildHistoryGarbageCleanupInfo("domainID2", "workflowID2", "runID2"),
},
},
@@ -207,13 +211,13 @@
{
TreeID: "treeID3",
BranchID: "branchID3",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: p.BuildHistoryGarbageCleanupInfo("domainID3", "workflowID3", "runID3"),
},
{
TreeID: "treeID4",
BranchID: "branchID4",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: p.BuildHistoryGarbageCleanupInfo("domainID4", "workflowID4", "runID4"),
},
},
@@ -268,13 +272,13 @@ func (s *ScavengerTestSuite) TestDeletingBranchesTwoPages() {
{
TreeID: "treeID1",
BranchID: "branchID1",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: p.BuildHistoryGarbageCleanupInfo("domainID1", "workflowID1", "runID1"),
},
{
TreeID: "treeID2",
BranchID: "branchID2",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: p.BuildHistoryGarbageCleanupInfo("domainID2", "workflowID2", "runID2"),
},
},
@@ -287,13 +291,13 @@
{
TreeID: "treeID3",
BranchID: "branchID3",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: p.BuildHistoryGarbageCleanupInfo("domainID3", "workflowID3", "runID3"),
},
{
TreeID: "treeID4",
BranchID: "branchID4",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: p.BuildHistoryGarbageCleanupInfo("domainID4", "workflowID4", "runID4"),
},
},
@@ -381,7 +385,7 @@ func (s *ScavengerTestSuite) TestMixesTwoPages() {
// split error
TreeID: "treeID2",
BranchID: "branchID2",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: "error-info",
},
},
@@ -395,21 +399,21 @@
// delete succ
TreeID: "treeID3",
BranchID: "branchID3",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: p.BuildHistoryGarbageCleanupInfo("domainID3", "workflowID3", "runID3"),
},
{
// delete fail
TreeID: "treeID4",
BranchID: "branchID4",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: p.BuildHistoryGarbageCleanupInfo("domainID4", "workflowID4", "runID4"),
},
{
// not delete
TreeID: "treeID5",
BranchID: "branchID5",
ForkTime: time.Now().Add(-cleanUpThreshold * 2),
ForkTime: time.Now().Add(-getHistoryCleanupThreshold(domain.DefaultMaxWorkflowRetentionInDays) * 2),
Info: p.BuildHistoryGarbageCleanupInfo("domainID5", "workflowID5", "runID5"),
},
},
3 changes: 2 additions & 1 deletion service/worker/scanner/scanner.go
@@ -65,7 +65,8 @@ type (
// HistoryScannerEnabled indicates if history scanner should be started as part of scanner
HistoryScannerEnabled dynamicconfig.BoolPropertyFn
// ShardScanners is a list of shard scanner configs
ShardScanners []*shardscanner.ScannerConfig
ShardScanners []*shardscanner.ScannerConfig
MaxWorkflowRetentionInDays dynamicconfig.IntPropertyFn
}

// BootstrapParams contains the set of params needed to bootstrap
1 change: 1 addition & 0 deletions service/worker/scanner/workflow.go
@@ -146,6 +146,7 @@ func HistoryScavengerActivity(
hbd,
res.GetMetricsClient(),
res.GetLogger(),
ctx.cfg.MaxWorkflowRetentionInDays,
)
return scavenger.Run(activityCtx)
}
1 change: 1 addition & 0 deletions service/worker/service.go
@@ -139,6 +139,7 @@ func NewConfig(params *service.BootstrapParams) *Config {
executions.CurrentExecutionScannerConfig(dc),
timers.ScannerConfig(dc),
},
MaxWorkflowRetentionInDays: dc.GetIntProperty(dynamicconfig.MaxRetentionDays, domain.DefaultMaxWorkflowRetentionInDays),
},
BatcherCfg: &batcher.Config{
AdminOperationToken: dc.GetStringProperty(dynamicconfig.AdminOperationToken, common.DefaultAdminOperationToken),
2 changes: 1 addition & 1 deletion tools/cli/domainUtils.go
@@ -268,7 +268,7 @@ func initializeDomainHandler(
) domain.Handler {

domainConfig := domain.Config{
MinRetentionDays: dynamicconfig.GetIntPropertyFn(domain.MinRetentionDays),
MinRetentionDays: dynamicconfig.GetIntPropertyFn(domain.DefaultMinWorkflowRetentionInDays),
MaxBadBinaryCount: dynamicconfig.GetIntPropertyFilteredByDomain(domain.MaxBadBinaries),
FailoverCoolDown: dynamicconfig.GetDurationPropertyFnFilteredByDomain(domain.FailoverCoolDown),
}