Skip to content

Commit

Permalink
Fix status check for visibility and archival
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Jun 10, 2022
1 parent af932bd commit 8799985
Show file tree
Hide file tree
Showing 15 changed files with 88 additions and 66 deletions.
2 changes: 1 addition & 1 deletion cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (s *server) startService() common.Daemon {
dynamicconfig.AdvancedVisibilityWritingMode,
common.GetDefaultAdvancedVisibilityWritingMode(params.PersistenceConfig.IsAdvancedVisibilityConfigExist()),
)()
isAdvancedVisEnabled := advancedVisMode != common.AdvancedVisibilityWritingModeOff
isAdvancedVisEnabled := common.IsAdvancedVisibilityWritingEnabled(advancedVisMode, params.PersistenceConfig.IsAdvancedVisibilityConfigExist())
if isAdvancedVisEnabled {
params.MessagingClient = kafka.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, params.Logger, params.MetricScope, isAdvancedVisEnabled)
} else {
Expand Down
16 changes: 11 additions & 5 deletions common/archiver/archivalMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type (
archivalConfig struct {
staticClusterStatus ArchivalStatus
dynamicClusterStatus dynamicconfig.StringPropertyFn
enableRead dynamicconfig.BoolPropertyFn
staticEnableRead bool
dynamicEnableRead dynamicconfig.BoolPropertyFn
domainDefaultStatus types.ArchivalStatus
domainDefaultURI string
}
Expand Down Expand Up @@ -86,6 +87,7 @@ func NewArchivalMetadata(
historyConfig := NewArchivalConfig(
historyStatus,
dc.GetStringProperty(dynamicconfig.HistoryArchivalStatus, historyStatus),
historyReadEnabled,
dc.GetBoolProperty(dynamicconfig.EnableReadFromHistoryArchival, historyReadEnabled),
domainDefaults.History.Status,
domainDefaults.History.URI,
Expand All @@ -94,6 +96,7 @@ func NewArchivalMetadata(
visibilityConfig := NewArchivalConfig(
visibilityStatus,
dc.GetStringProperty(dynamicconfig.VisibilityArchivalStatus, visibilityStatus),
visibilityReadEnabled,
dc.GetBoolProperty(dynamicconfig.EnableReadFromVisibilityArchival, visibilityReadEnabled),
domainDefaults.Visibility.Status,
domainDefaults.Visibility.URI,
Expand All @@ -117,7 +120,8 @@ func (metadata *archivalMetadata) GetVisibilityConfig() ArchivalConfig {
func NewArchivalConfig(
staticClusterStatusStr string,
dynamicClusterStatus dynamicconfig.StringPropertyFn,
enableRead dynamicconfig.BoolPropertyFn,
staticEnableRead bool,
dynamicEnableRead dynamicconfig.BoolPropertyFn,
domainDefaultStatusStr string,
domainDefaultURI string,
) ArchivalConfig {
Expand All @@ -133,7 +137,8 @@ func NewArchivalConfig(
return &archivalConfig{
staticClusterStatus: staticClusterStatus,
dynamicClusterStatus: dynamicClusterStatus,
enableRead: enableRead,
staticEnableRead: staticEnableRead,
dynamicEnableRead: dynamicEnableRead,
domainDefaultStatus: domainDefaultStatus,
domainDefaultURI: domainDefaultURI,
}
Expand All @@ -144,7 +149,8 @@ func NewDisabledArchvialConfig() ArchivalConfig {
return &archivalConfig{
staticClusterStatus: ArchivalDisabled,
dynamicClusterStatus: nil,
enableRead: nil,
staticEnableRead: false,
dynamicEnableRead: nil,
domainDefaultStatus: types.ArchivalStatusDisabled,
domainDefaultURI: "",
}
Expand Down Expand Up @@ -177,7 +183,7 @@ func (a *archivalConfig) ReadEnabled() bool {
if !a.ClusterConfiguredForArchival() {
return false
}
return a.enableRead()
return a.staticEnableRead && a.dynamicEnableRead()
}

func (a *archivalConfig) GetDomainDefaultStatus() types.ArchivalStatus {
Expand Down
41 changes: 22 additions & 19 deletions common/persistence/visibilityDualManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,31 +166,34 @@ func (v *visibilityDualManager) chooseVisibilityManagerForWrite(ctx context.Cont

switch writeMode {
case common.AdvancedVisibilityWritingModeOff:
if v.dbVisibilityManager == nil {
return &types.InternalServiceError{
Message: fmt.Sprintf("visibility writing mode: %s is misconfigured", writeMode),
}
if v.dbVisibilityManager != nil {
return dbVisFunc()
} else {
v.logger.Warn("basic visibility is not available to write, fall back to advanced visibility")
return esVisFunc()
}
return dbVisFunc()
case common.AdvancedVisibilityWritingModeOn:
if v.esVisibilityManager == nil {
return &types.InternalServiceError{
Message: fmt.Sprintf("visibility writing mode: %s is misconfigured", writeMode),
}
if v.esVisibilityManager != nil {
return esVisFunc()
} else {
v.logger.Warn("advanced visibility is not available to write, fall back to basic visibility")
return dbVisFunc()
}

return esVisFunc()
case common.AdvancedVisibilityWritingModeDual:
if v.dbVisibilityManager == nil || v.esVisibilityManager == nil {
return &types.InternalServiceError{
Message: fmt.Sprintf("visibility writing mode: %s is misconfigured", writeMode),
if v.esVisibilityManager != nil {
if err := esVisFunc(); err != nil {
return err
}
if v.dbVisibilityManager != nil {
return dbVisFunc()
} else {
v.logger.Warn("basic visibility is not available to write")
return nil
}
} else {
v.logger.Warn("advanced visibility is not available to write")
return dbVisFunc()
}

if err := esVisFunc(); err != nil {
return err
}
return dbVisFunc()
default:
return &types.InternalServiceError{
Message: fmt.Sprintf("Unknown visibility writing mode: %s", writeMode),
Expand Down
10 changes: 10 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,16 @@ func GetDefaultAdvancedVisibilityWritingMode(isAdvancedVisConfigExist bool) stri
return AdvancedVisibilityWritingModeOff
}

// IsAdvancedVisibilityWritingEnabled returns true if we should write to advanced visibility
func IsAdvancedVisibilityWritingEnabled(advancedVisibilityWritingMode string, isAdvancedVisConfigExist bool) bool {
return advancedVisibilityWritingMode != AdvancedVisibilityWritingModeOff && isAdvancedVisConfigExist
}

// IsAdvancedVisibilityReadingEnabled returns true if we should read from advanced visibility
func IsAdvancedVisibilityReadingEnabled(isAdvancedVisReadEnabled, isAdvancedVisConfigExist bool) bool {
return isAdvancedVisReadEnabled && isAdvancedVisConfigExist
}

// ConvertIntMapToDynamicConfigMapProperty converts a map whose key value type are both int to
// a map value that is compatible with dynamic config's map property
func ConvertIntMapToDynamicConfigMapProperty(
Expand Down
5 changes: 3 additions & 2 deletions host/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,18 @@ func NewCluster(options *TestClusterConfig, logger log.Logger, params persistenc
setupShards(testBase, options.HistoryConfig.NumHistoryShards, logger)
archiverBase := newArchiverBase(options.EnableArchival, logger)
messagingClient := getMessagingClient(options.MessagingClientConfig, logger)
pConfig := testBase.Config()
pConfig.NumHistoryShards = options.HistoryConfig.NumHistoryShards
var esClient elasticsearch.GenericClient
if options.WorkerConfig.EnableIndexer {
var err error
esClient, err = elasticsearch.NewGenericClient(options.ESConfig, logger)
if err != nil {
return nil, err
}
pConfig.AdvancedVisibilityStore = "es-visibility"
}

pConfig := testBase.Config()
pConfig.NumHistoryShards = options.HistoryConfig.NumHistoryShards
scope := tally.NewTestScope("integration-test", nil)
metricsClient := metrics.NewClient(scope, metrics.ServiceIdx(0))
domainReplicationQueue := domain.NewReplicationQueue(
Expand Down
1 change: 0 additions & 1 deletion service/frontend/clusterRedirectionHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func (s *clusterRedirectionHandlerSuite) SetupTest() {
),
0,
false,
false,
)
frontendHandler := NewWorkflowHandler(s.mockResource, s.config, nil, client.NewVersionChecker())

Expand Down
1 change: 0 additions & 1 deletion service/frontend/clusterRedirectionPolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) SetupTest() {
),
0,
false,
false,
)
s.policy = newSelectedOrAllAPIsForwardingPolicy(
s.currentClusterName,
Expand Down
9 changes: 5 additions & 4 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
// Config represents configuration for cadence-frontend service
type Config struct {
NumHistoryShards int
IsAdvancedVisConfigExist bool
domainConfig domain.Config
PersistenceMaxQPS dynamicconfig.IntPropertyFn
PersistenceGlobalMaxQPS dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -111,17 +112,18 @@ type Config struct {
}

// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFromES bool, sendRawWorkflowHistory bool) *Config {
func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, isAdvancedVisConfigExist bool) *Config {
return &Config{
NumHistoryShards: numHistoryShards,
IsAdvancedVisConfigExist: isAdvancedVisConfigExist,
PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.FrontendPersistenceMaxQPS, 2000),
PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.FrontendPersistenceGlobalMaxQPS, 0),
VisibilityMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityMaxPageSize, 1000),
EnableVisibilitySampling: dc.GetBoolProperty(dynamicconfig.EnableVisibilitySampling, true),
EnableReadFromClosedExecutionV2: dc.GetBoolProperty(dynamicconfig.EnableReadFromClosedExecutionV2, false),
VisibilityListMaxQPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityListMaxQPS, defaultVisibilityListMaxQPS()),
ESVisibilityListMaxQPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendESVisibilityListMaxQPS, 30),
EnableReadVisibilityFromES: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableReadVisibilityFromES, enableReadFromES),
EnableReadVisibilityFromES: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableReadVisibilityFromES, isAdvancedVisConfigExist),
ESIndexMaxResultWindow: dc.GetIntProperty(dynamicconfig.FrontendESIndexMaxResultWindow, 10000),
HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize),
UserRPS: dc.GetIntProperty(dynamicconfig.FrontendUserRPS, 1200),
Expand Down Expand Up @@ -157,7 +159,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024),
VisibilityArchivalQueryMaxPageSize: dc.GetIntProperty(dynamicconfig.VisibilityArchivalQueryMaxPageSize, 10000),
DisallowQuery: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.DisallowQuery, false),
SendRawWorkflowHistory: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.SendRawWorkflowHistory, sendRawWorkflowHistory),
SendRawWorkflowHistory: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.SendRawWorkflowHistory, false),
DecisionResultCountLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendDecisionResultCountLimit, 0),
EmitSignalNameMetricsTag: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.FrontendEmitSignalNameMetricsTag, false),
Lockdown: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.Lockdown, false),
Expand Down Expand Up @@ -208,7 +210,6 @@ func NewService(
),
params.PersistenceConfig.NumHistoryShards,
isAdvancedVisExistInConfig,
false,
)
params.PersistenceConfig.HistoryMaxConns = serviceConfig.HistoryMgrNumConns()

Expand Down
2 changes: 1 addition & 1 deletion service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4362,7 +4362,7 @@ func (wh *WorkflowHandler) convertIndexedKeyToThrift(keys map[string]interface{}
}

func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain string) bool {
return wh.config.EnableReadVisibilityFromES(domain) &&
return common.IsAdvancedVisibilityReadingEnabled(wh.config.EnableReadVisibilityFromES(domain), wh.config.IsAdvancedVisConfigExist) &&
pageSize > int32(wh.config.ESIndexMaxResultWindow())
}

Expand Down
Loading

0 comments on commit 8799985

Please sign in to comment.