From ca9ef66bcef640396753e4af60ff8b55115390e0 Mon Sep 17 00:00:00 2001 From: Bob Stasyszyn Date: Thu, 18 Oct 2018 14:29:16 -0400 Subject: [PATCH] [FABG-782] Fix default event service policy If the eventService policy is not defined for a channel then the one defined under the _default channel is used. This causes a problem with parameters where 0 is a valid value, i.e. blockHeightLagThreshold and reconnectBlockHeightLagThreshold. If 0 is specified then the SDK will assume that the value was not provided and then use the default value. For this reason, -2 is now used for blockHeightLagThreshold to indicate that only up-to-date peers are to be used and -1 is used for reconnectBlockHeightLagThreshold to indicate that the disconnect feature should be disabled. Change-Id: I03d667548a2649bb719ae756bec59bf533548c57 Signed-off-by: Bob Stasyszyn --- pkg/common/providers/fab/network.go | 51 ++++++++++++- pkg/common/providers/fab/provider.go | 18 ----- pkg/core/config/testdata/config_test.yaml | 16 +++-- .../testdata/config_test_embedded_pems.yaml | 24 ++++++- pkg/core/config/testdata/template/config.yaml | 30 ++++++-- pkg/fab/api.go | 2 + pkg/fab/default_channel_test.go | 20 +++++- pkg/fab/endpointconfig.go | 57 ++++++++++----- .../client/dispatcher/dispatcher_test.go | 71 +++++++++++++++++++ pkg/fab/events/client/dispatcher/opts.go | 17 ++++- .../minblockheight/minblockheight_test.go | 42 +++++++++++ .../peerresolver/minblockheight/opts.go | 42 ++++++++++- test/fixtures/config/config_e2e.yaml | 19 ++--- test/fixtures/config/config_test.yaml | 15 ++-- .../endpointconfig_override_test.go | 7 ++ 15 files changed, 360 insertions(+), 71 deletions(-) diff --git a/pkg/common/providers/fab/network.go b/pkg/common/providers/fab/network.go index dec2765163..c3059d7e57 100644 --- a/pkg/common/providers/fab/network.go +++ b/pkg/common/providers/fab/network.go @@ -143,6 +143,46 @@ type CertKeyPair struct { Key []byte } +// ResolverStrategy is the peer resolver type +type ResolverStrategy string + +const ( + // BalancedStrategy is a peer resolver strategy that chooses peers based on a configured load balancer + BalancedStrategy ResolverStrategy = "Balanced" + + // MinBlockHeightStrategy is a peer resolver strategy that chooses the best peer according to a block height lag threshold. + // The maximum block height of all peers is determined and the peers whose block heights are under the maximum height but above + // a provided "lag" threshold are load balanced. The other peers are not considered. + MinBlockHeightStrategy ResolverStrategy = "MinBlockHeight" + + // PreferOrgStrategy is a peer resolver strategy that determines which peers are suitable based on block height lag threshold, + // although will prefer the peers in the current org (as long as their block height is above a configured threshold). + // If none of the peers from the current org are suitable then a peer from another org is chosen. + PreferOrgStrategy ResolverStrategy = "PreferOrg" +) + +// MinBlockHeightResolverMode specifies the behaviour of the MinBlockHeight resolver strategy. +type MinBlockHeightResolverMode string + +const ( + // ResolveByThreshold resolves to peers based on block height lag threshold. + ResolveByThreshold MinBlockHeightResolverMode = "ResolveByThreshold" + + // ResolveLatest resolves to peers with the most up-to-date block height + ResolveLatest MinBlockHeightResolverMode = "ResolveLatest" +) + +// EnabledDisabled specifies whether or not a feature is enabled +type EnabledDisabled string + +const ( + // Enabled indicates that the feature is enabled. + Enabled EnabledDisabled = "Enabled" + + // Disabled indicates that the feature is disabled. + Disabled EnabledDisabled = "Disabled" +) + // EventServicePolicy specifies the policy for the event service type EventServicePolicy struct { // ResolverStrategy returns the peer resolver strategy to use when connecting to a peer @@ -152,13 +192,20 @@ type EventServicePolicy struct { // Balancer is the balancer to use when choosing a peer to connect to Balancer BalancerType + // MinBlockHeightResolverMode specifies the behaviour of the MinBlockHeight resolver. Note that this + // parameter is used when ResolverStrategy is either MinBlockHeightStrategy or PreferOrgStrategy. + // ResolveByThreshold (default): resolves to peers based on block height lag threshold, as specified by BlockHeightLagThreshold. + // MinBlockHeightResolverMode: then only the peers with the latest block heights are chosen. + MinBlockHeightResolverMode MinBlockHeightResolverMode + // BlockHeightLagThreshold returns the block height lag threshold. This value is used for choosing a peer // to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of // blocks then it will be excluded from selection. - // If set to 0 then only the most up-to-date peers are considered. - // If set to -1 then all peers (regardless of block height) are considered for selection. BlockHeightLagThreshold int + // PeerMonitor indicates whether or not to enable the peer monitor. + PeerMonitor EnabledDisabled + // ReconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's // block height falls behind the specified number of blocks and will reconnect to a better performing peer. // If set to 0 (default) then the peer will not disconnect based on block height. diff --git a/pkg/common/providers/fab/provider.go b/pkg/common/providers/fab/provider.go index 606fc80808..a1f5525c52 100644 --- a/pkg/common/providers/fab/provider.go +++ b/pkg/common/providers/fab/provider.go @@ -107,24 +107,6 @@ type EndpointConfig interface { CryptoConfigPath() string } -// ResolverStrategy is the peer resolver type -type ResolverStrategy string - -const ( - // BalancedStrategy is a peer resolver strategy that chooses peers based on a configured load balancer - BalancedStrategy ResolverStrategy = "Balanced" - - // MinBlockHeightStrategy is a peer resolver strategy that chooses the best peer according to a block height lag threshold. - // The maximum block height of all peers is determined and the peers whose block heights are under the maximum height but above - // a provided "lag" threshold are load balanced. The other peers are not considered. - MinBlockHeightStrategy ResolverStrategy = "MinBlockHeight" - - // PreferOrgStrategy is a peer resolver strategy that determines which peers are suitable based on block height lag threshold, - // although will prefer the peers in the current org (as long as their block height is above a configured threshold). - // If none of the peers from the current org are suitable then a peer from another org is chosen. - PreferOrgStrategy ResolverStrategy = "PreferOrg" -) - // TimeoutType enumerates the different types of outgoing connections type TimeoutType int diff --git a/pkg/core/config/testdata/config_test.yaml b/pkg/core/config/testdata/config_test.yaml index 85c2b73ddf..0d514a21d7 100755 --- a/pkg/core/config/testdata/config_test.yaml +++ b/pkg/core/config/testdata/config_test.yaml @@ -137,6 +137,7 @@ channels: #to a lower priority list of peers which will be sorted according to block height. #Note: This property only applies to BlockHeightPriority sorter. BlockHeightLagThreshold: 5 + #[Optional] options for event service eventService: # [Optional] resolverStrategy specifies the peer resolver strategy to use when connecting to a peer @@ -153,28 +154,31 @@ channels: # Balanced: # Chooses peers using the configured balancer. resolverStrategy: MinBlockHeight + # [Optional] balancer is the balancer to use when choosing a peer to connect to # Possible values: [Random (default), RoundRobin] + balancer: RoundRobin + # [Optional] blockHeightLagThreshold sets the block height lag threshold. This value is used for choosing a peer # to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of # blocks then it will be excluded from selection. - # If set to 0 then only the most up-to-date peers are considered. - # If set to -1 then all peers (regardless of block height) are considered for selection. + # Note that this parameter is applicable only when minBlockHeightResolverMode is set to ResolveByThreshold. # Default: 5 blockHeightLagThreshold: 4 - # [Optional] reconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's + + # [Optional] reconnectBlockHeightLagThreshold - the event client will disconnect from the peer if the peer's # block height falls behind the specified number of blocks and will reconnect to a better performing peer. - # If set to 0 then this feature is disabled. + # Note that this parameter is only applicable if peerMonitor is set to Enabled (default). # Default: 10 # NOTES: - # - peerMonitorPeriod must be >0 to enable this feature # - Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby # affecting performance. reconnectBlockHeightLagThreshold: 8 + # [Optional] peerMonitorPeriod is the period in which the connected peer is monitored to see if # the event client should disconnect from it and reconnect to another peer. - # Default: 0 (disabled) + # Default: 5s peerMonitorPeriod: 6s # multi-org test channel diff --git a/pkg/core/config/testdata/config_test_embedded_pems.yaml b/pkg/core/config/testdata/config_test_embedded_pems.yaml index 6e2bea7951..3c0126eb82 100755 --- a/pkg/core/config/testdata/config_test_embedded_pems.yaml +++ b/pkg/core/config/testdata/config_test_embedded_pems.yaml @@ -147,13 +147,13 @@ channels: # [Optional] blockHeightLagThreshold sets the block height lag threshold. This value is used for choosing a peer # to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of # blocks then it will be excluded from selection. - # If set to 0 then only the most up-to-date peers are considered. + # If set to -2 then only the most up-to-date peers are considered. # If set to -1 then all peers (regardless of block height) are considered for selection. # Default: 5 blockHeightLagThreshold: 3 # [Optional] reconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's # block height falls behind the specified number of blocks and will reconnect to a better performing peer. - # If set to 0 then this feature is disabled. + # If set to -1 then this feature is disabled. # Default: 10 # NOTES: # - peerMonitorPeriod must be >0 to enable this feature @@ -225,9 +225,27 @@ channels: attempts: 4 #[Optional] the maximum back off interval for any retry attempt maxBackoff: 8s - #[Optional] he factor by which the initial back off period is exponentially incremented + #[Optional] the factor by which the initial back off period is exponentially incremented backoffFactor: 8.0 + eventService: + # [Optional] minBlockHeightResolverMode specifies the behaviour of the MinBlockHeight resolver strategy. + # Note that this parameter is applicable only when resolverStrategy is set to MinBlockHeight or PreferOrg. + # Possible values: [ResolveByThreshold (default), ResolveLatest] + # + # ResolveByThreshold: + # Chooses peers based on block height lag threshold. + # ResolveLatest: + # Chooses only the peers with the most up-to-date block height. + minBlockHeightResolverMode: ResolveLatest + + # [Optional] peerMonitor indicates whether or not a peer monitor should be enabled in order to monitor + # the block height of the connected peer. In the case of MinBlockHeight and PreferOrg strategy, the event client + # will disconnect from the peer if its block height falls below the specified threshold. + # Possible values: [Enabled, Disabled] + # Default: Enabled for MinBlockHeight and PreferOrg strategy; Disabled for Balanced strategy + peerMonitor: Disabled + # multi-org test channel orgchannel: diff --git a/pkg/core/config/testdata/template/config.yaml b/pkg/core/config/testdata/template/config.yaml index 32e0a5fcab..c2316e2fa9 100755 --- a/pkg/core/config/testdata/template/config.yaml +++ b/pkg/core/config/testdata/template/config.yaml @@ -179,6 +179,7 @@ channels: # #to a lower priority list of peers which will be sorted according to block height. # #Note: This property only applies to BlockHeightPriority sorter. # BlockHeightLagThreshold: 5 + # #[Optional] options for event service # eventService: # # [Optional] resolverStrategy specifies the peer resolver strategy to use when connecting to a peer @@ -195,28 +196,47 @@ channels: # # Balanced: # # Chooses peers using the configured balancer. # resolverStrategy: PreferOrg + +# # [Optional] minBlockHeightResolverMode specifies the behaviour of the MinBlockHeight resolver strategy. +# # Note that this parameter is applicable only when resolverStrategy is set to MinBlockHeight or PreferOrg. +# # Possible values: [ResolveByThreshold (default), ResolveLatest] +# # +# # ResolveByThreshold: +# # Chooses peers based on block height lag threshold. +# # ResolveLatest: +# # Chooses only the peers with the most up-to-date block height. +# minBlockHeightResolverMode: ResolveByThreshold +# # # [Optional] balancer is the balancer to use when choosing a peer to connect to # # Possible values: [Random (default), RoundRobin] # balancer: Random # # [Optional] blockHeightLagThreshold sets the block height lag threshold. This value is used for choosing a peer # # to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of # # blocks then it will be excluded from selection. -# # If set to 0 then only the most up-to-date peers are considered. -# # If set to -1 then all peers (regardless of block height) are considered for selection. +# # Note that this parameter is applicable only when minBlockHeightResolverMode is set to ResolveByThreshold. # # Default: 5 # blockHeightLagThreshold: 5 -# # [Optional] reconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's + +# # [Optional] reconnectBlockHeightLagThreshold - the event client will disconnect from the peer if the peer's # # block height falls behind the specified number of blocks and will reconnect to a better performing peer. -# # If set to 0 then this feature is disabled. +# # Note that this parameter is only applicable if peerMonitor is set to Enabled (default). # # Default: 10 # # NOTES: # # - peerMonitorPeriod must be >0 to enable this feature # # - Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby # # affecting performance. # reconnectBlockHeightLagThreshold: 10 + +# # [Optional] peerMonitor indicates whether or not a peer monitor should be enabled in order to monitor +# # the block height of the connected peer. In the case of MinBlockHeight and PreferOrg strategy, the event client +# # will disconnect from the peer if its block height falls below the specified threshold. +# # Possible values: [Enabled, Disabled] +# # Default: Enabled for MinBlockHeight and PreferOrg strategy; Disabled for Balanced strategy +# peerMonitor: Enabled + # # [Optional] peerMonitorPeriod is the period in which the connected peer is monitored to see if # # the event client should disconnect from it and reconnect to another peer. -# # Default: 0 (disabled) +# # Default: 5s # peerMonitorPeriod: 5s # sample channel with channel matcher (sample*channel will return ch1 config where * can be any word or '') diff --git a/pkg/fab/api.go b/pkg/fab/api.go index 435fe5ccab..ba466a3727 100644 --- a/pkg/fab/api.go +++ b/pkg/fab/api.go @@ -127,8 +127,10 @@ type SelectionPolicy struct { // EventServicePolicy specifies the policy for the event service type EventServicePolicy struct { ResolverStrategy string + MinBlockHeightResolverMode string Balancer BalancerType BlockHeightLagThreshold int + PeerMonitor string ReconnectBlockHeightLagThreshold int PeerMonitorPeriod time.Duration } diff --git a/pkg/fab/default_channel_test.go b/pkg/fab/default_channel_test.go index 7ab367e3a8..b5f5656599 100644 --- a/pkg/fab/default_channel_test.go +++ b/pkg/fab/default_channel_test.go @@ -135,7 +135,7 @@ func TestDefaultChannelWithNoDefaultChannelConfiguredAndWithMatchers(t *testing. } -func TestMissingDiscoveryPolicesInfo(t *testing.T) { +func TestMissingPolicesInfo(t *testing.T) { // Default channel and no channel matchers test defaultChannelBackend, err := config.FromFile(configEmbeddedUsersTestFilePath)() @@ -166,6 +166,24 @@ func TestMissingDiscoveryPolicesInfo(t *testing.T) { assert.Equal(t, defChConfig.Policies.QueryChannelConfig.RetryOpts.InitialBackoff, chConfig.Policies.QueryChannelConfig.RetryOpts.InitialBackoff) assert.Equal(t, defChConfig.Policies.QueryChannelConfig.RetryOpts.BackoffFactor, chConfig.Policies.QueryChannelConfig.RetryOpts.BackoffFactor) assert.Equal(t, defChConfig.Policies.QueryChannelConfig.RetryOpts.MaxBackoff, chConfig.Policies.QueryChannelConfig.RetryOpts.MaxBackoff) + + assert.Equal(t, defChConfig.Policies.EventService.Balancer, chConfig.Policies.EventService.Balancer) + assert.Equal(t, defChConfig.Policies.EventService.MinBlockHeightResolverMode, chConfig.Policies.EventService.MinBlockHeightResolverMode) + assert.Equal(t, defChConfig.Policies.EventService.BlockHeightLagThreshold, chConfig.Policies.EventService.BlockHeightLagThreshold) + assert.Equal(t, defChConfig.Policies.EventService.PeerMonitorPeriod, chConfig.Policies.EventService.PeerMonitorPeriod) + assert.Equal(t, defChConfig.Policies.EventService.PeerMonitor, chConfig.Policies.EventService.PeerMonitor) + assert.Equal(t, defChConfig.Policies.EventService.ReconnectBlockHeightLagThreshold, chConfig.Policies.EventService.ReconnectBlockHeightLagThreshold) + assert.Equal(t, defChConfig.Policies.EventService.ResolverStrategy, chConfig.Policies.EventService.ResolverStrategy) + + chConfig = endpointConfig.ChannelConfig("mychannel") + assert.NotNil(t, chConfig) + + assert.Equal(t, fab.ResolveLatest, chConfig.Policies.EventService.MinBlockHeightResolverMode) + assert.Equal(t, fab.Disabled, chConfig.Policies.EventService.PeerMonitor) + assert.Equal(t, defChConfig.Policies.EventService.Balancer, chConfig.Policies.EventService.Balancer) + assert.Equal(t, defChConfig.Policies.EventService.PeerMonitorPeriod, chConfig.Policies.EventService.PeerMonitorPeriod) + assert.Equal(t, defChConfig.Policies.EventService.ReconnectBlockHeightLagThreshold, chConfig.Policies.EventService.ReconnectBlockHeightLagThreshold) + assert.Equal(t, defChConfig.Policies.EventService.ResolverStrategy, chConfig.Policies.EventService.ResolverStrategy) } func TestMissingPartialChannelPoliciesInfo(t *testing.T) { diff --git a/pkg/fab/endpointconfig.go b/pkg/fab/endpointconfig.go index 3938719a63..7db3120d5f 100644 --- a/pkg/fab/endpointconfig.go +++ b/pkg/fab/endpointconfig.go @@ -52,10 +52,12 @@ const ( defaultCacheSweepInterval = time.Second * 15 defaultResolverStrategy = fab.PreferOrgStrategy + defaultMinBlockHeightResolverMode = fab.ResolveByThreshold defaultBalancer = fab.Random defaultBlockHeightLagThreshold = 5 defaultReconnectBlockHeightLagThreshold = 10 - defaultPeerMonitorPeriod = 0 // Disabled + defaultPeerMonitor = "" // The peer monitor will be enabled if necessary + defaultPeerMonitorPeriod = 5 * time.Second //default grpc opts defaultKeepAliveTime = 0 @@ -89,7 +91,9 @@ var ( }, EventService: EventServicePolicy{ ResolverStrategy: string(fab.PreferOrgStrategy), + MinBlockHeightResolverMode: string(defaultMinBlockHeightResolverMode), Balancer: Random, + PeerMonitor: defaultPeerMonitor, PeerMonitorPeriod: defaultPeerMonitorPeriod, BlockHeightLagThreshold: defaultBlockHeightLagThreshold, ReconnectBlockHeightLagThreshold: defaultReconnectBlockHeightLagThreshold, @@ -401,7 +405,6 @@ func (c *EndpointConfig) loadEndpointConfiguration() error { err = c.backend.UnmarshalKey( "channels", &endpointConfigEntity.Channels, lookup.WithUnmarshalHookFunction(peerChannelConfigHookFunc()), - lookup.WithUnmarshalHookFunction(eventChannelConfigHookFunc()), ) logger.Debugf("channels are: %+v", endpointConfigEntity.Channels) if err != nil { @@ -660,8 +663,10 @@ func (c *EndpointConfig) getChannelPolicies(policies *ChannelPolicies) fab.Chann eventServicePolicy := fab.EventServicePolicy{ ResolverStrategy: fab.ResolverStrategy(policies.EventService.ResolverStrategy), + MinBlockHeightResolverMode: fab.MinBlockHeightResolverMode(policies.EventService.MinBlockHeightResolverMode), Balancer: fab.BalancerType(policies.EventService.Balancer), BlockHeightLagThreshold: policies.EventService.BlockHeightLagThreshold, + PeerMonitor: fab.EnabledDisabled(policies.EventService.PeerMonitor), ReconnectBlockHeightLagThreshold: policies.EventService.ReconnectBlockHeightLagThreshold, PeerMonitorPeriod: policies.EventService.PeerMonitorPeriod, } @@ -681,6 +686,7 @@ func (c *EndpointConfig) addMissingChannelPoliciesItems(chNwCfg ChannelEndpointC policies.Discovery = c.addMissingDiscoveryPolicyInfo(policies.Discovery) policies.Selection = c.addMissingSelectionPolicyInfo(policies.Selection) policies.QueryChannelConfig = c.addMissingQueryChannelConfigPolicyInfo(policies.QueryChannelConfig) + policies.EventService = c.addMissingEventServicePolicyInfo(policies.EventService) return policies } @@ -740,6 +746,32 @@ func (c *EndpointConfig) addMissingQueryChannelConfigPolicyInfo(policy fab.Query return policy } +func (c *EndpointConfig) addMissingEventServicePolicyInfo(policy fab.EventServicePolicy) fab.EventServicePolicy { + if policy.Balancer == "" { + policy.Balancer = c.defaultChannelPolicies.EventService.Balancer + } + if policy.BlockHeightLagThreshold == 0 { + policy.BlockHeightLagThreshold = c.defaultChannelPolicies.EventService.BlockHeightLagThreshold + } + if policy.ResolverStrategy == "" { + policy.ResolverStrategy = c.defaultChannelPolicies.EventService.ResolverStrategy + } + if policy.MinBlockHeightResolverMode == "" { + policy.MinBlockHeightResolverMode = c.defaultChannelPolicies.EventService.MinBlockHeightResolverMode + } + if policy.PeerMonitor == "" { + policy.PeerMonitor = c.defaultChannelPolicies.EventService.PeerMonitor + } + if policy.ReconnectBlockHeightLagThreshold == 0 { + policy.ReconnectBlockHeightLagThreshold = c.defaultChannelPolicies.EventService.ReconnectBlockHeightLagThreshold + } + if policy.PeerMonitorPeriod == 0 { + policy.PeerMonitorPeriod = c.defaultChannelPolicies.EventService.PeerMonitorPeriod + } + + return policy +} + func addMissingRetryOpts(opts retry.Opts, defaultOpts retry.Opts) retry.Opts { // If retry opts are defined then Attempts must be defined, otherwise // we cannot distinguish between default 0 and intentional 0 to disable retries for that channel @@ -989,6 +1021,10 @@ func (c *EndpointConfig) loadDefaultEventServicePolicy(policy *fab.EventServiceP policy.ResolverStrategy = defaultResolverStrategy } + if policy.MinBlockHeightResolverMode == "" { + policy.MinBlockHeightResolverMode = defaultMinBlockHeightResolverMode + } + if policy.Balancer == "" { policy.Balancer = defaultBalancer } @@ -1730,23 +1766,6 @@ func peerChannelConfigHookFunc() mapstructure.DecodeHookFunc { } } -func eventChannelConfigHookFunc() mapstructure.DecodeHookFunc { - return func(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { - if t == reflect.TypeOf(EventServicePolicy{}) { - dataMap, ok := data.(map[string]interface{}) - if ok { - setDefault(dataMap, "blockheightlagthreshold", defaultBlockHeightLagThreshold) - setDefault(dataMap, "reconnectblockheightlagthreshold", defaultReconnectBlockHeightLagThreshold) - setDefault(dataMap, "peermonitorperiod", defaultPeerMonitorPeriod) - setDefault(dataMap, "resolverstrategy", string(fab.PreferOrgStrategy)) - setDefault(dataMap, "balancer", Random) - return dataMap, nil - } - } - return data, nil - } -} - //setDefault sets default value provided to map if given key not found func setDefault(dataMap map[string]interface{}, key string, defaultVal interface{}) { _, ok := dataMap[key] diff --git a/pkg/fab/events/client/dispatcher/dispatcher_test.go b/pkg/fab/events/client/dispatcher/dispatcher_test.go index 9d19125d25..8aa000c201 100755 --- a/pkg/fab/events/client/dispatcher/dispatcher_test.go +++ b/pkg/fab/events/client/dispatcher/dispatcher_test.go @@ -21,6 +21,7 @@ import ( mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp" "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var ( @@ -435,3 +436,73 @@ func disconnect(state string, errch chan error) { errch <- nil } } + +func TestOpts(t *testing.T) { + channelID := "testchannel" + + config := &fabmocks.MockConfig{} + context := fabmocks.NewMockContext( + mspmocks.NewMockSigningIdentity("user1", "Org1MSP"), + ) + context.SetEndpointConfig(config) + + t.Run("Default", func(t *testing.T) { + config.SetCustomChannelConfig(channelID, &fab.ChannelEndpointConfig{ + Policies: fab.ChannelPolicies{ + EventService: fab.EventServicePolicy{}, + }, + }) + + params := defaultParams(context, channelID) + require.NotNil(t, params) + assert.Equal(t, defaultPeerMonitorPeriod, params.peerMonitorPeriod) + require.NotNil(t, params.peerResolverProvider) + }) + + t.Run("MinBlockStrategy", func(t *testing.T) { + config.SetCustomChannelConfig(channelID, &fab.ChannelEndpointConfig{ + Policies: fab.ChannelPolicies{ + EventService: fab.EventServicePolicy{ + ResolverStrategy: fab.MinBlockHeightStrategy, + PeerMonitorPeriod: 7 * time.Second, + }, + }, + }) + + params := defaultParams(context, channelID) + require.NotNil(t, params) + assert.Equal(t, 7*time.Second, params.peerMonitorPeriod) + require.NotNil(t, params.peerResolverProvider) + }) + + t.Run("PeerMonitor Off", func(t *testing.T) { + config.SetCustomChannelConfig(channelID, &fab.ChannelEndpointConfig{ + Policies: fab.ChannelPolicies{ + EventService: fab.EventServicePolicy{ + ResolverStrategy: fab.PreferOrgStrategy, + PeerMonitor: fab.Disabled, + }, + }, + }) + + params := defaultParams(context, channelID) + require.NotNil(t, params) + assert.Equal(t, 0*time.Second, params.peerMonitorPeriod, "Expecting peer monitor to be disabled") + require.NotNil(t, params.peerResolverProvider) + }) + + t.Run("Balanced Strategy", func(t *testing.T) { + config.SetCustomChannelConfig(channelID, &fab.ChannelEndpointConfig{ + Policies: fab.ChannelPolicies{ + EventService: fab.EventServicePolicy{ + ResolverStrategy: fab.BalancedStrategy, + }, + }, + }) + + params := defaultParams(context, channelID) + require.NotNil(t, params) + assert.Equalf(t, 0*time.Second, params.peerMonitorPeriod, "Expecting peer monitor to be disabled for Balance strategy") + require.NotNil(t, params.peerResolverProvider) + }) +} diff --git a/pkg/fab/events/client/dispatcher/opts.go b/pkg/fab/events/client/dispatcher/opts.go index 8b781522d6..0e21b5c841 100755 --- a/pkg/fab/events/client/dispatcher/opts.go +++ b/pkg/fab/events/client/dispatcher/opts.go @@ -19,6 +19,10 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver/preferorg" ) +const ( + defaultPeerMonitorPeriod = 5 * time.Second +) + type params struct { peerMonitorPeriod time.Duration peerResolverProvider peerresolver.Provider @@ -27,8 +31,19 @@ type params struct { func defaultParams(context context.Client, channelID string) *params { policy := context.EndpointConfig().ChannelConfig(channelID).Policies.EventService + peerMonitorPeriod := policy.PeerMonitorPeriod + + // Set the peer monitor period to 0 (disabled) if explicitly configured to be disabled or + // if the resolver is Balanced (since there's no need for a peer monitor for Balanced strategy) + if policy.PeerMonitor == fab.Disabled || policy.ResolverStrategy == fab.BalancedStrategy { + peerMonitorPeriod = 0 + } else if peerMonitorPeriod <= 0 { + logger.Warnf("Invalid PeerMonitorPeriod: %s. Using default: %s.", peerMonitorPeriod, defaultPeerMonitorPeriod) + peerMonitorPeriod = defaultPeerMonitorPeriod + } + return ¶ms{ - peerMonitorPeriod: policy.PeerMonitorPeriod, + peerMonitorPeriod: peerMonitorPeriod, peerResolverProvider: getPeerResolver(policy), } } diff --git a/pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go b/pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go index 644d4f4dd7..3893b9577f 100644 --- a/pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go +++ b/pkg/fab/events/client/peerresolver/minblockheight/minblockheight_test.go @@ -128,3 +128,45 @@ func TestShouldDisconnect(t *testing.T) { disconnect = resolver.ShouldDisconnect(peers, p3) assert.Falsef(t, disconnect, "expecting peer NOT to be disconnected since the peer's block height is under the reconnectBlockHeightThreshold") } + +func TestOpts(t *testing.T) { + channelID := "testchannel" + + config := &mocks.MockConfig{} + context := mocks.NewMockContext( + mockmsp.NewMockSigningIdentity("user1", "Org1MSP"), + ) + context.SetEndpointConfig(config) + + t.Run("Default", func(t *testing.T) { + config.SetCustomChannelConfig(channelID, &fab.ChannelEndpointConfig{ + Policies: fab.ChannelPolicies{ + EventService: fab.EventServicePolicy{}, + }, + }) + + params := defaultParams(context, channelID) + require.NotNil(t, params) + require.NotNil(t, params.loadBalancePolicy) + assert.Equal(t, defaultBlockHeightLagThreshold, params.blockHeightLagThreshold) + assert.Equal(t, defaultReconnectBlockHeightLagThreshold, params.reconnectBlockHeightLagThreshold) + }) + + t.Run("ResolveLatest", func(t *testing.T) { + config.SetCustomChannelConfig(channelID, &fab.ChannelEndpointConfig{ + Policies: fab.ChannelPolicies{ + EventService: fab.EventServicePolicy{ + MinBlockHeightResolverMode: fab.ResolveLatest, + ReconnectBlockHeightLagThreshold: 9, + }, + }, + }) + + params := defaultParams(context, channelID) + require.NotNil(t, params) + require.NotNil(t, params.loadBalancePolicy) + assert.Equal(t, 0, params.blockHeightLagThreshold) + assert.Equal(t, 9, params.reconnectBlockHeightLagThreshold) + }) + +} diff --git a/pkg/fab/events/client/peerresolver/minblockheight/opts.go b/pkg/fab/events/client/peerresolver/minblockheight/opts.go index 819b9502aa..6de30a4fa9 100644 --- a/pkg/fab/events/client/peerresolver/minblockheight/opts.go +++ b/pkg/fab/events/client/peerresolver/minblockheight/opts.go @@ -14,6 +14,11 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/peerresolver" ) +const ( + defaultBlockHeightLagThreshold = 5 + defaultReconnectBlockHeightLagThreshold = 10 +) + type params struct { blockHeightLagThreshold int reconnectBlockHeightLagThreshold int @@ -25,8 +30,8 @@ func defaultParams(context context.Client, channelID string) *params { policy := context.EndpointConfig().ChannelConfig(channelID).Policies.EventService return ¶ms{ - blockHeightLagThreshold: policy.BlockHeightLagThreshold, - reconnectBlockHeightLagThreshold: policy.ReconnectBlockHeightLagThreshold, + blockHeightLagThreshold: getBlockHeightLagThreshold(policy), + reconnectBlockHeightLagThreshold: getReconnectBlockHeightLagThreshold(policy), loadBalancePolicy: peerresolver.GetBalancer(policy), } } @@ -90,3 +95,36 @@ func (p *params) SetSnapshot(value fab.EventSnapshot) error { p.minBlockHeight = value.LastBlockReceived() + 1 return nil } + +func getBlockHeightLagThreshold(policy fab.EventServicePolicy) int { + var threshold int + + switch policy.MinBlockHeightResolverMode { + case fab.ResolveLatest: + threshold = 0 + case fab.ResolveByThreshold: + threshold = policy.BlockHeightLagThreshold + if threshold <= 0 { + logger.Warnf("Invalid BlockHeightLagThreshold: %d. Using default: %d", threshold, defaultBlockHeightLagThreshold) + threshold = defaultBlockHeightLagThreshold + } + default: + logger.Warnf("Invalid MinBlockHeightResolverMode: [%s]. Using default: [%s]", policy.MinBlockHeightResolverMode, fab.ResolveByThreshold) + threshold = policy.BlockHeightLagThreshold + if threshold <= 0 { + logger.Warnf("Invalid BlockHeightLagThreshold: %d. Using default: %d", threshold, defaultBlockHeightLagThreshold) + threshold = defaultBlockHeightLagThreshold + } + } + + return threshold +} + +func getReconnectBlockHeightLagThreshold(policy fab.EventServicePolicy) int { + threshold := policy.ReconnectBlockHeightLagThreshold + if threshold <= 0 { + logger.Warnf("Invalid ReconnectBlockHeightLagThreshold: %d. Using default: %d", threshold, defaultReconnectBlockHeightLagThreshold) + threshold = defaultReconnectBlockHeightLagThreshold + } + return threshold +} diff --git a/test/fixtures/config/config_e2e.yaml b/test/fixtures/config/config_e2e.yaml index fe1070679d..7564d14421 100755 --- a/test/fixtures/config/config_e2e.yaml +++ b/test/fixtures/config/config_e2e.yaml @@ -170,6 +170,7 @@ channels: maxBackoff: 5s #[Optional] he factor by which the initial back off period is exponentially incremented backoffFactor: 2.0 + #[Optional] options for the event service eventService: # [Optional] resolverStrategy specifies the peer resolver strategy to use when connecting to a peer @@ -186,29 +187,31 @@ channels: # Balanced: # Chooses peers using the configured balancer. resolverStrategy: PreferOrg + # [Optional] balancer is the balancer to use when choosing a peer to connect to # Possible values: [Random (default), RoundRobin] balancer: Random + # [Optional] blockHeightLagThreshold sets the block height lag threshold. This value is used for choosing a peer # to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of # blocks then it will be excluded from selection. - # If set to 0 then only the most up-to-date peers are considered. - # If set to -1 then all peers (regardless of block height) are considered for selection. + # Note that this parameter is applicable only when minBlockHeightResolverMode is set to ResolveByThreshold. # Default: 5 blockHeightLagThreshold: 5 - # [Optional] reconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's + + # [Optional] reconnectBlockHeightLagThreshold - the event client will disconnect from the peer if the peer's # block height falls behind the specified number of blocks and will reconnect to a better performing peer. - # If set to 0 then this feature is disabled. + # Note that this parameter is only applicable if peerMonitor is set to Enabled (default). # Default: 10 # NOTES: - # - peerMonitorPeriod must be >0 to enable this feature # - Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby # affecting performance. - reconnectBlockHeightLagThreshold: 10 + reconnectBlockHeightLagThreshold: 8 + # [Optional] peerMonitorPeriod is the period in which the connected peer is monitored to see if # the event client should disconnect from it and reconnect to another peer. - # Default: 0 (disabled) - peerMonitorPeriod: 5s + # Default: 0 (disabled) for Balanced resolverStrategy; 5s for PreferOrg and MinBlockHeight strategy + peerMonitorPeriod: 6s #[Required if _default not defined; Optional if _default defined]. diff --git a/test/fixtures/config/config_test.yaml b/test/fixtures/config/config_test.yaml index cc4959282f..429e4102ec 100755 --- a/test/fixtures/config/config_test.yaml +++ b/test/fixtures/config/config_test.yaml @@ -183,6 +183,7 @@ channels: maxBackoff: 5s #[Optional] he factor by which the initial back off period is exponentially incremented backoffFactor: 2.0 + #[Optional] options for the event service eventService: # [Optional] resolverStrategy specifies the peer resolver strategy to use when connecting to a peer @@ -199,28 +200,30 @@ channels: # Balanced: # Chooses peers using the configured balancer. resolverStrategy: PreferOrg + # [Optional] balancer is the balancer to use when choosing a peer to connect to # Possible values: [Random (default), RoundRobin] balancer: RoundRobin + # [Optional] blockHeightLagThreshold sets the block height lag threshold. This value is used for choosing a peer # to connect to. If a peer is lagging behind the most up-to-date peer by more than the given number of # blocks then it will be excluded from selection. - # If set to 0 then only the most up-to-date peers are considered. - # If set to -1 then all peers (regardless of block height) are considered for selection. + # Note that this parameter is applicable only when minBlockHeightResolverMode is set to ResolveByThreshold. # Default: 5 blockHeightLagThreshold: 2 - # [Optional] reconnectBlockHeightLagThreshold - if >0 then the event client will disconnect from the peer if the peer's + + # [Optional] reconnectBlockHeightLagThreshold - the event client will disconnect from the peer if the peer's # block height falls behind the specified number of blocks and will reconnect to a better performing peer. - # If set to 0 then this feature is disabled. + # Note that this parameter is only applicable if peerMonitor is set to Enabled (default). # Default: 10 # NOTES: - # - peerMonitorPeriod must be >0 to enable this feature # - Setting this value too low may cause the event client to disconnect/reconnect too frequently, thereby # affecting performance. reconnectBlockHeightLagThreshold: 5 + # [Optional] peerMonitorPeriod is the period in which the connected peer is monitored to see if # the event client should disconnect from it and reconnect to another peer. - # Default: 0 (disabled) + # Default: 0 (disabled) for Balanced resolverStrategy; 5s for PreferOrg and MinBlockHeight strategy peerMonitorPeriod: 3s # Mychannel overrides initialBackoff for discovery diff --git a/test/integration/e2e/configless/endpointconfig_override_test.go b/test/integration/e2e/configless/endpointconfig_override_test.go index fe0aca2139..8f242542fe 100644 --- a/test/integration/e2e/configless/endpointconfig_override_test.go +++ b/test/integration/e2e/configless/endpointconfig_override_test.go @@ -100,6 +100,13 @@ var ( BackoffFactor: 2.0, }, }, + EventService: fab.EventServicePolicy{ + ResolverStrategy: fab.MinBlockHeightStrategy, + MinBlockHeightResolverMode: fab.ResolveByThreshold, + BlockHeightLagThreshold: 5, + ReconnectBlockHeightLagThreshold: 10, + PeerMonitorPeriod: 5 * time.Second, + }, }, }, "orgchannel": {