Skip to content

Commit

Permalink
Drop dynamic config for gRPC message size (#5002)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored and Groxx committed Oct 6, 2022
1 parent c2ffb71 commit cb79876
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 27 deletions.
12 changes: 1 addition & 11 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package client

import (
"fmt"
"time"

"go.uber.org/yarpc/api/transport"
Expand Down Expand Up @@ -116,18 +115,9 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (

peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, namedPort)

supportedMessageSize := cf.rpcFactory.GetMaxMessageSize()
maxSizeConfig := cf.dynConfig.GetIntProperty(dynamicconfig.GRPCMaxSizeInByte)
if maxSizeConfig() > supportedMessageSize {
return nil, fmt.Errorf(
"GRPCMaxSizeInByte dynamic config value %v is larger than supported value %v",
maxSizeConfig(),
supportedMessageSize,
)
}
client := history.NewClient(
cf.numberOfHistoryShards,
maxSizeConfig,
cf.rpcFactory.GetMaxMessageSize(),
timeout,
rawClient,
peerResolver,
Expand Down
7 changes: 3 additions & 4 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"golang.org/x/sync/errgroup"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/future"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand All @@ -48,7 +47,7 @@ const (
type (
clientImpl struct {
numberOfShards int
rpcMaxSizeInBytes dynamicconfig.IntPropertyFn // This value currently only used in GetReplicationMessage API
rpcMaxSizeInBytes int // This value currently only used in GetReplicationMessage API
tokenSerializer common.TaskTokenSerializer
timeout time.Duration
client Client
Expand All @@ -66,7 +65,7 @@ type (
// NewClient creates a new history service TChannel client
func NewClient(
numberOfShards int,
rpcMaxSizeInBytes dynamicconfig.IntPropertyFn,
rpcMaxSizeInBytes int,
timeout time.Duration,
client Client,
peerResolver PeerResolver,
Expand Down Expand Up @@ -859,7 +858,7 @@ func (c *clientImpl) GetReplicationMessages(

response := &types.GetReplicationMessagesResponse{MessagesByShard: make(map[int32]*types.ReplicationMessages)}
responseTotalSize := 0
rpcMaxResponseSize := c.rpcMaxSizeInBytes()
rpcMaxResponseSize := c.rpcMaxSizeInBytes
for _, resp := range peerResponses {
if (responseTotalSize + resp.size) >= rpcMaxResponseSize {
// Log shards that did not fit for debugging purposes
Expand Down
9 changes: 2 additions & 7 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ func (k MapKey) DefaultMap() map[string]interface{} {
//
// Since our ratelimiters do int/float conversions, and zero or negative values
// result in not allowing any requests, math.MaxInt is unsafe:
// int(float64(math.MaxInt)) // -9223372036854775808
//
// int(float64(math.MaxInt)) // -9223372036854775808
//
// Much higher values are possible, but we can't handle 2 billion RPS, this is good enough.
const UnlimitedRPS = math.MaxInt32
Expand All @@ -270,7 +271,6 @@ const (
MaxRetentionDays
MinRetentionDays
MaxDecisionStartToCloseSeconds
GRPCMaxSizeInByte
BlobSizeLimitError
// BlobSizeLimitWarn is the per event blob size limit for warning
// KeyName: limit.blobSize.warn
Expand Down Expand Up @@ -2512,11 +2512,6 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "MaxDecisionStartToCloseSeconds is the maximum allowed value for decision start to close timeout in seconds",
DefaultValue: 240,
},
GRPCMaxSizeInByte: DynamicInt{
KeyName: "system.grpcMaxSizeInByte",
Description: "GRPCMaxSizeInByte is the key for config GRPC response size",
DefaultValue: 4 * 1024 * 1024,
},
BlobSizeLimitError: DynamicInt{
KeyName: "limit.blobSize.error",
Description: "BlobSizeLimitError is the per event blob size limit",
Expand Down
8 changes: 4 additions & 4 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Config struct {
EnableStickyQuery dynamicconfig.BoolPropertyFnWithDomainFilter
ShutdownDrainDuration dynamicconfig.DurationPropertyFn
WorkflowDeletionJitterRange dynamicconfig.IntPropertyFnWithDomainFilter
MaxResponseSize dynamicconfig.IntPropertyFn
MaxResponseSize int

// HistoryCache settings
// Change of these configs require shard restart
Expand Down Expand Up @@ -318,7 +318,7 @@ type Config struct {
}

// New returns new service config with default values
func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isAdvancedVisConfigExist bool) *Config {
func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, storeType string, isAdvancedVisConfigExist bool) *Config {
cfg := &Config{
NumberOfShards: numberOfShards,
IsAdvancedVisConfigExist: isAdvancedVisConfigExist,
Expand Down Expand Up @@ -363,7 +363,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
StandbyTaskMissingEventsResendDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsResendDelay),
StandbyTaskMissingEventsDiscardDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsDiscardDelay),
WorkflowDeletionJitterRange: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowDeletionJitterRange),
MaxResponseSize: dc.GetIntProperty(dynamicconfig.GRPCMaxSizeInByte),
MaxResponseSize: maxMessageSize,

TaskProcessRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskProcessRPS),
TaskSchedulerType: dc.GetIntProperty(dynamicconfig.TaskSchedulerType),
Expand Down Expand Up @@ -580,7 +580,7 @@ func NewForTestByShardNumber(shardNumber int) *Config {
panicIfErr(inMem.UpdateValue(dynamicconfig.NormalDecisionScheduleToStartMaxAttempts, 3))
panicIfErr(inMem.UpdateValue(dynamicconfig.EnablePendingActivityValidation, true))
dc := dynamicconfig.NewCollection(inMem, log.NewNoop())
config := New(dc, shardNumber, config.StoreTypeCassandra, false)
config := New(dc, shardNumber, 1024*1024, config.StoreTypeCassandra, false)
// reduce the duration of long poll to increase test speed
config.LongPollExpirationInterval = dc.GetDurationPropertyFilteredByDomain(dynamicconfig.HistoryLongPollExpirationInterval)
config.EnableConsistentQueryByDomain = dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain)
Expand Down
2 changes: 1 addition & 1 deletion service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ func (h *handlerImpl) GetReplicationMessages(
wg.Wait()

responseSize := 0
maxResponseSize := h.config.MaxResponseSize()
maxResponseSize := h.config.MaxResponseSize

messagesByShard := make(map[int32]*types.ReplicationMessages)
result.Range(func(key, value interface{}) bool {
Expand Down
1 change: 1 addition & 0 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewService(
dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()),
),
params.PersistenceConfig.NumHistoryShards,
params.RPCFactory.GetMaxMessageSize(),
params.PersistenceConfig.DefaultStoreType(),
params.PersistenceConfig.IsAdvancedVisibilityConfigExist())

Expand Down

0 comments on commit cb79876

Please sign in to comment.