Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop dynamic config for gRPC message size #5002

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package client

import (
"fmt"
"time"

"go.uber.org/yarpc/api/transport"
Expand Down Expand Up @@ -116,18 +115,9 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (

peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, namedPort)

supportedMessageSize := cf.rpcFactory.GetMaxMessageSize()
maxSizeConfig := cf.dynConfig.GetIntProperty(dynamicconfig.GRPCMaxSizeInByte)
if maxSizeConfig() > supportedMessageSize {
return nil, fmt.Errorf(
"GRPCMaxSizeInByte dynamic config value %v is larger than supported value %v",
maxSizeConfig(),
supportedMessageSize,
)
}
client := history.NewClient(
cf.numberOfHistoryShards,
maxSizeConfig,
cf.rpcFactory.GetMaxMessageSize(),
timeout,
rawClient,
peerResolver,
Expand Down
7 changes: 3 additions & 4 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"golang.org/x/sync/errgroup"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/future"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand All @@ -48,7 +47,7 @@ const (
type (
clientImpl struct {
numberOfShards int
rpcMaxSizeInBytes dynamicconfig.IntPropertyFn // This value currently only used in GetReplicationMessage API
rpcMaxSizeInBytes int // This value currently only used in GetReplicationMessage API
tokenSerializer common.TaskTokenSerializer
timeout time.Duration
client Client
Expand All @@ -66,7 +65,7 @@ type (
// NewClient creates a new history service TChannel client
func NewClient(
numberOfShards int,
rpcMaxSizeInBytes dynamicconfig.IntPropertyFn,
rpcMaxSizeInBytes int,
timeout time.Duration,
client Client,
peerResolver PeerResolver,
Expand Down Expand Up @@ -859,7 +858,7 @@ func (c *clientImpl) GetReplicationMessages(

response := &types.GetReplicationMessagesResponse{MessagesByShard: make(map[int32]*types.ReplicationMessages)}
responseTotalSize := 0
rpcMaxResponseSize := c.rpcMaxSizeInBytes()
rpcMaxResponseSize := c.rpcMaxSizeInBytes
for _, resp := range peerResponses {
if (responseTotalSize + resp.size) >= rpcMaxResponseSize {
// Log shards that did not fit for debugging purposes
Expand Down
9 changes: 2 additions & 7 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ func (k MapKey) DefaultMap() map[string]interface{} {
//
// Since our ratelimiters do int/float conversions, and zero or negative values
// result in not allowing any requests, math.MaxInt is unsafe:
// int(float64(math.MaxInt)) // -9223372036854775808
//
// int(float64(math.MaxInt)) // -9223372036854775808
//
// Much higher values are possible, but we can't handle 2 billion RPS, this is good enough.
const UnlimitedRPS = math.MaxInt32
Expand All @@ -270,7 +271,6 @@ const (
MaxRetentionDays
MinRetentionDays
MaxDecisionStartToCloseSeconds
GRPCMaxSizeInByte
BlobSizeLimitError
// BlobSizeLimitWarn is the per event blob size limit for warning
// KeyName: limit.blobSize.warn
Expand Down Expand Up @@ -2510,11 +2510,6 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "MaxDecisionStartToCloseSeconds is the maximum allowed value for decision start to close timeout in seconds",
DefaultValue: 240,
},
GRPCMaxSizeInByte: DynamicInt{
KeyName: "system.grpcMaxSizeInByte",
Description: "GRPCMaxSizeInByte is the key for config GRPC response size",
DefaultValue: 4 * 1024 * 1024,
},
BlobSizeLimitError: DynamicInt{
KeyName: "limit.blobSize.error",
Description: "BlobSizeLimitError is the per event blob size limit",
Expand Down
8 changes: 4 additions & 4 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Config struct {
EnableStickyQuery dynamicconfig.BoolPropertyFnWithDomainFilter
ShutdownDrainDuration dynamicconfig.DurationPropertyFn
WorkflowDeletionJitterRange dynamicconfig.IntPropertyFnWithDomainFilter
MaxResponseSize dynamicconfig.IntPropertyFn
MaxResponseSize int

// HistoryCache settings
// Change of these configs require shard restart
Expand Down Expand Up @@ -318,7 +318,7 @@ type Config struct {
}

// New returns new service config with default values
func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isAdvancedVisConfigExist bool) *Config {
func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, storeType string, isAdvancedVisConfigExist bool) *Config {
cfg := &Config{
NumberOfShards: numberOfShards,
IsAdvancedVisConfigExist: isAdvancedVisConfigExist,
Expand Down Expand Up @@ -363,7 +363,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
StandbyTaskMissingEventsResendDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsResendDelay),
StandbyTaskMissingEventsDiscardDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsDiscardDelay),
WorkflowDeletionJitterRange: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowDeletionJitterRange),
MaxResponseSize: dc.GetIntProperty(dynamicconfig.GRPCMaxSizeInByte),
MaxResponseSize: maxMessageSize,

TaskProcessRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskProcessRPS),
TaskSchedulerType: dc.GetIntProperty(dynamicconfig.TaskSchedulerType),
Expand Down Expand Up @@ -580,7 +580,7 @@ func NewForTestByShardNumber(shardNumber int) *Config {
panicIfErr(inMem.UpdateValue(dynamicconfig.NormalDecisionScheduleToStartMaxAttempts, 3))
panicIfErr(inMem.UpdateValue(dynamicconfig.EnablePendingActivityValidation, true))
dc := dynamicconfig.NewCollection(inMem, log.NewNoop())
config := New(dc, shardNumber, config.StoreTypeCassandra, false)
config := New(dc, shardNumber, 1024*1024, config.StoreTypeCassandra, false)
// reduce the duration of long poll to increase test speed
config.LongPollExpirationInterval = dc.GetDurationPropertyFilteredByDomain(dynamicconfig.HistoryLongPollExpirationInterval)
config.EnableConsistentQueryByDomain = dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain)
Expand Down
2 changes: 1 addition & 1 deletion service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ func (h *handlerImpl) GetReplicationMessages(
wg.Wait()

responseSize := 0
maxResponseSize := h.config.MaxResponseSize()
maxResponseSize := h.config.MaxResponseSize

messagesByShard := make(map[int32]*types.ReplicationMessages)
result.Range(func(key, value interface{}) bool {
Expand Down
1 change: 1 addition & 0 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewService(
dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()),
),
params.PersistenceConfig.NumHistoryShards,
params.RPCFactory.GetMaxMessageSize(),
params.PersistenceConfig.DefaultStoreType(),
params.PersistenceConfig.IsAdvancedVisibilityConfigExist())

Expand Down