Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Splitting wfCacheEnabled config for internal and external requests #5647

Merged
merged 6 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1716,6 +1716,18 @@ const (
// Default value: false
// Allowed filters: DomainName
WorkflowIDCacheEnabled
// WorkflowIDCacheExternalEnabled is the key to enable/disable caching of workflowID specific information for external requests
// KeyName: history.workflowIDCacheExternalEnabled
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
WorkflowIDCacheExternalEnabled
// WorkflowIDCacheInternalEnabled is the key to enable/disable caching of workflowID specific information for internal requests
// KeyName: history.workflowIDCacheInternalEnabled
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
WorkflowIDCacheInternalEnabled
// AllowArchivingIncompleteHistory will continue on when seeing some error like history mutated (usually caused by database consistency issues)
// KeyName: worker.AllowArchivingIncompleteHistory
// Value type: Bool
Expand Down Expand Up @@ -4183,6 +4195,18 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "WorkflowIDCacheEnabled is the key to enable/disable caching of workflowID specific information",
DefaultValue: false,
},
WorkflowIDCacheExternalEnabled: DynamicBool{
KeyName: "history.workflowIDCacheExternalEnabled",
Filters: []Filter{DomainName},
Description: "WorkflowIDCacheExternalEnabled is the key to enable/disable caching of workflowID specific information for external requests",
DefaultValue: false,
},
WorkflowIDCacheInternalEnabled: DynamicBool{
KeyName: "history.workflowIDCacheInternalEnabled",
Filters: []Filter{DomainName},
Description: "WorkflowIDCacheInternalEnabled is the key to enable/disable caching of workflowID specific information for internal requests",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
16 changes: 10 additions & 6 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,11 @@ type Config struct {
EnableRecordWorkflowExecutionUninitialized dynamicconfig.BoolPropertyFnWithDomainFilter

// The following are used by the history workflowID cache
WorkflowIDCacheEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDExternalRPS dynamicconfig.IntPropertyFnWithDomainFilter
WorkflowIDInternalRPS dynamicconfig.IntPropertyFnWithDomainFilter
WorkflowIDCacheEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDCacheExternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDCacheInternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDExternalRPS dynamicconfig.IntPropertyFnWithDomainFilter
WorkflowIDInternalRPS dynamicconfig.IntPropertyFnWithDomainFilter

// The following are used by consistent query
EnableConsistentQuery dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -552,9 +554,11 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
EnableReplicationTaskGeneration: dc.GetBoolPropertyFilteredByDomainIDAndWorkflowID(dynamicconfig.EnableReplicationTaskGeneration),
EnableRecordWorkflowExecutionUninitialized: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableRecordWorkflowExecutionUninitialized),

WorkflowIDCacheEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.WorkflowIDCacheEnabled),
WorkflowIDExternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDExternalRPS),
WorkflowIDInternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDInternalRPS),
WorkflowIDCacheEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.WorkflowIDCacheEnabled),
WorkflowIDCacheExternalEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.WorkflowIDCacheExternalEnabled),
WorkflowIDCacheInternalEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.WorkflowIDCacheInternalEnabled),
WorkflowIDExternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDExternalRPS),
WorkflowIDInternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDInternalRPS),

EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery),
EnableConsistentQueryByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain),
Expand Down
18 changes: 10 additions & 8 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,16 @@ func NewHandler(
tokenSerializer: common.NewJSONTaskTokenSerializer(),
rateLimiter: quotas.NewDynamicRateLimiter(config.RPS.AsFloat64()),
workflowIDCache: workflowcache.New(workflowcache.Params{
TTL: workflowIDCacheTTL,
ExternalLimiterFactory: quotas.NewSimpleDynamicRateLimiterFactory(config.WorkflowIDExternalRPS),
InternalLimiterFactory: quotas.NewSimpleDynamicRateLimiterFactory(config.WorkflowIDInternalRPS),
WorkflowIDCacheEnabled: config.WorkflowIDCacheEnabled,
MaxCount: workflowIDCacheMaxCount,
DomainCache: resource.GetDomainCache(),
Logger: resource.GetLogger(),
MetricsClient: resource.GetMetricsClient(),
TTL: workflowIDCacheTTL,
ExternalLimiterFactory: quotas.NewSimpleDynamicRateLimiterFactory(config.WorkflowIDExternalRPS),
InternalLimiterFactory: quotas.NewSimpleDynamicRateLimiterFactory(config.WorkflowIDInternalRPS),
WorkflowIDCacheEnabled: config.WorkflowIDCacheEnabled,
WorkflowIDCacheExternalEnabled: config.WorkflowIDCacheExternalEnabled,
WorkflowIDCacheInternalEnabled: config.WorkflowIDCacheInternalEnabled,
MaxCount: workflowIDCacheMaxCount,
DomainCache: resource.GetDomainCache(),
Logger: resource.GetLogger(),
MetricsClient: resource.GetMetricsClient(),
}),
}

Expand Down
63 changes: 36 additions & 27 deletions service/history/workflowcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@ type WFCache interface {
}

// wfCache is the concrete type behind the WFCache returned by New (tests
// type-assert the result to *wfCache).
//
// What the visible code establishes about the fields:
//   - lru: its Size() is reported as the WorkflowIDCacheSizeGauge in allow.
//   - externalLimiterFactory / internalLimiterFactory: copied from Params;
//     the tests expect GetLimiter(domainName) to be called on them.
//   - workflowIDCacheEnabled, workflowIDCacheExternalEnabled,
//     workflowIDCacheInternalEnabled: per-domain enable flags. In allow,
//     an external request is let through without rate limiting only when
//     both workflowIDCacheEnabled and workflowIDCacheExternalEnabled return
//     false; an internal request is let through when
//     workflowIDCacheInternalEnabled returns false.
//   - getCacheItemFn: set to getCacheItem in New so unit tests can
//     replace it.
type wfCache struct {
lru cache.Cache
externalLimiterFactory quotas.LimiterFactory
internalLimiterFactory quotas.LimiterFactory
workflowIDCacheEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
domainCache cache.DomainCache
metricsClient metrics.Client
logger log.Logger
getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error)
lru cache.Cache
externalLimiterFactory quotas.LimiterFactory
internalLimiterFactory quotas.LimiterFactory
workflowIDCacheEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
workflowIDCacheExternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
workflowIDCacheInternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
domainCache cache.DomainCache
metricsClient metrics.Client
logger log.Logger
getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error)
}

type cacheKey struct {
Expand All @@ -69,14 +71,16 @@ type cacheValue struct {

// Params is the parameters for a new WFCache.
//
// New copies the two limiter factories, the three enable flags, DomainCache,
// MetricsClient and Logger onto the cache it builds, and forwards MaxCount
// into the cache options it constructs (alongside ActivelyEvict: true).
// TTL is presumably the entry lifetime of that cache; its use is not
// visible in this excerpt, so confirm against New.
//
// The enable flags are functions keyed by domain name:
//   - WorkflowIDCacheEnabled is the original combined flag; in allow it is
//     still consulted for external requests only.
//   - WorkflowIDCacheExternalEnabled gates rate limiting of external requests.
//   - WorkflowIDCacheInternalEnabled gates rate limiting of internal requests.
type Params struct {
TTL time.Duration
MaxCount int
ExternalLimiterFactory quotas.LimiterFactory
InternalLimiterFactory quotas.LimiterFactory
WorkflowIDCacheEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
DomainCache cache.DomainCache
MetricsClient metrics.Client
Logger log.Logger
TTL time.Duration
MaxCount int
ExternalLimiterFactory quotas.LimiterFactory
InternalLimiterFactory quotas.LimiterFactory
WorkflowIDCacheEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDCacheExternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDCacheInternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
DomainCache cache.DomainCache
MetricsClient metrics.Client
Logger log.Logger
}

// New creates a new WFCache
Expand All @@ -88,12 +92,14 @@ func New(params Params) WFCache {
MaxCount: params.MaxCount,
ActivelyEvict: true,
}),
externalLimiterFactory: params.ExternalLimiterFactory,
internalLimiterFactory: params.InternalLimiterFactory,
workflowIDCacheEnabled: params.WorkflowIDCacheEnabled,
domainCache: params.DomainCache,
metricsClient: params.MetricsClient,
logger: params.Logger,
externalLimiterFactory: params.ExternalLimiterFactory,
internalLimiterFactory: params.InternalLimiterFactory,
workflowIDCacheEnabled: params.WorkflowIDCacheEnabled,
workflowIDCacheExternalEnabled: params.WorkflowIDCacheExternalEnabled,
workflowIDCacheInternalEnabled: params.WorkflowIDCacheInternalEnabled,
domainCache: params.DomainCache,
metricsClient: params.MetricsClient,
logger: params.Logger,
}
// We set getCacheItemFn to cache.getCacheItem so that we can mock it in unit tests
cache.getCacheItemFn = cache.getCacheItem
Expand All @@ -116,11 +122,6 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi
return true
}

if !c.workflowIDCacheEnabled(domainName) {
// The cache is not enabled, so we allow the call through
return true
}

c.metricsClient.
Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName)).
UpdateGauge(metrics.WorkflowIDCacheSizeGauge, float64(c.lru.Size()))
Expand All @@ -135,12 +136,20 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi

switch rateLimitType {
case external:
if !c.workflowIDCacheEnabled(domainName) && !c.workflowIDCacheExternalEnabled(domainName) {
sankari165 marked this conversation as resolved.
Show resolved Hide resolved
// The cache is not enabled, so we allow the call through
return true
}
if !value.externalRateLimiter.Allow() {
c.emitRateLimitMetrics(domainID, workflowID, domainName, "external", metrics.WorkflowIDCacheRequestsExternalRatelimitedCounter)
return false
}
return true
case internal:
if !c.workflowIDCacheInternalEnabled(domainName) {
// The cache is not enabled, so we allow the call through
return true
}
if !value.internalRateLimiter.Allow() {
c.emitRateLimitMetrics(domainID, workflowID, domainName, "internal", metrics.WorkflowIDCacheRequestsInternalRatelimitedCounter)
return false
Expand Down
108 changes: 60 additions & 48 deletions service/history/workflowcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,16 @@ func TestWfCache_AllowSingleWorkflow(t *testing.T) {

wfCache := New(Params{
// The cache TTL is set to 1 minute, so all requests will hit the cache
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheEnabled: func(domain string) bool { return true },
Logger: log.NewNoop(),
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheEnabled: func(domain string) bool { return false },
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: log.NewNoop(),
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
})

assert.True(t, wfCache.AllowExternal(testDomainID, testWorkflowID))
Expand Down Expand Up @@ -115,14 +117,16 @@ func TestWfCache_AllowMultipleWorkflow(t *testing.T) {
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiterWf2).Times(1)

wfCache := New(Params{
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheEnabled: func(domain string) bool { return true },
Logger: log.NewNoop(),
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheEnabled: func(domain string) bool { return false },
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: log.NewNoop(),
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
})

assert.True(t, wfCache.AllowExternal(testDomainID, testWorkflowID))
Expand Down Expand Up @@ -155,14 +159,16 @@ func TestWfCache_AllowError(t *testing.T) {

// Set up the cache; we do not need the factories, as we will mock the getCacheItemFn
wfCache := New(Params{
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheEnabled: func(domain string) bool { return false },
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
}).(*wfCache)

// We set getCacheItemFn to a function that will return an error so that we can test the error logic
Expand Down Expand Up @@ -201,14 +207,16 @@ func TestWfCache_AllowDomainCacheError(t *testing.T) {

// Set up the cache; we do not need the factories, as we will mock the getCacheItemFn
wfCache := New(Params{
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheEnabled: func(domain string) bool { return false },
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
})

// We fail open
Expand All @@ -231,14 +239,16 @@ func TestWfCache_CacheDisabled(t *testing.T) {

// Set up the cache; we do not need the factories, as we will mock the getCacheItemFn
sankari165 marked this conversation as resolved.
Show resolved Hide resolved
wfCache := New(Params{
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheEnabled: func(domain string) bool { return false },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheEnabled: func(domain string) bool { return false },
WorkflowIDCacheExternalEnabled: func(domain string) bool { return false },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return false },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
})

// We fail open
Expand Down Expand Up @@ -276,14 +286,16 @@ func TestWfCache_RejectLog(t *testing.T) {
expectRatelimitLog(logger, "internal")

wfCache := New(Params{
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheEnabled: func(domain string) bool { return false },
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
})

assert.False(t, wfCache.AllowExternal(testDomainID, testWorkflowID))
Expand Down