Skip to content

Commit

Permalink
[FAB-9170] Define one WithBlockEvents option
Browse files Browse the repository at this point in the history
The WithBlockEvents options in deliverclient
and eventhubclient were deleted and a new one
was added to the abstract client, which both
deliver and eventhub use.

Change-Id: I18e85a90e07e7ae102ff7a14b58cbeb5735dc558
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Mar 26, 2018
1 parent 84bff1a commit 6880d84
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 85 deletions.
22 changes: 10 additions & 12 deletions pkg/fab/events/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,25 @@ type Client struct {
eventservice.Service
params
sync.RWMutex
connEvent chan *dispatcher.ConnectionEvent
connectionState int32
stopped int32
registerOnce sync.Once
permitBlockEvents bool
afterConnect handler
beforeReconnect handler
connEvent chan *dispatcher.ConnectionEvent
connectionState int32
stopped int32
registerOnce sync.Once
afterConnect handler
beforeReconnect handler
}

type handler func() error

// New returns a new event client
func New(permitBlockEvents bool, dispatcher eventservice.Dispatcher, opts ...options.Opt) *Client {
func New(dispatcher eventservice.Dispatcher, opts ...options.Opt) *Client {
params := defaultParams()
options.Apply(params, opts)

return &Client{
Service: *eventservice.New(dispatcher, opts...),
params: *params,
connectionState: int32(Disconnected),
permitBlockEvents: permitBlockEvents,
Service: *eventservice.New(dispatcher, opts...),
params: *params,
connectionState: int32(Disconnected),
}
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/fab/events/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1226,7 +1226,8 @@ func listenEvents(blockch <-chan *fab.BlockEvent, ccch <-chan *fab.CCEvent, wait
type ClientProvider func(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts []options.Opt) (*Client, error)

var clientProvider = func(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts []options.Opt) (*Client, error) {
return newClient(context, chConfig, connectionProvider, opts, true,
opts = append(opts, WithBlockEvents())
return newClient(context, chConfig, connectionProvider, opts,
func() error {
fmt.Printf("AfterConnect called")
return nil
Expand All @@ -1238,15 +1239,16 @@ var clientProvider = func(context context.Client, chConfig fab.ChannelCfg, conne
}

var failAfterConnectClientProvider = func(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts []options.Opt) (*Client, error) {
return newClient(context, chConfig, connectionProvider, opts, true,
opts = append(opts, WithBlockEvents())
return newClient(context, chConfig, connectionProvider, opts,
func() error {
return errors.New("simulated failure after connect")
},
nil)
}

var filteredClientProvider = func(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts []options.Opt) (*Client, error) {
return newClient(context, chConfig, connectionProvider, opts, false,
return newClient(context, chConfig, connectionProvider, opts,
func() error {
fmt.Printf("AfterConnect called")
return nil
Expand All @@ -1257,9 +1259,8 @@ var filteredClientProvider = func(context context.Client, chConfig fab.ChannelCf
})
}

func newClient(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts []options.Opt, permitBlockEvents bool, afterConnect handler, beforeReconnect handler) (*Client, error) {
func newClient(context context.Client, chConfig fab.ChannelCfg, connectionProvider api.ConnectionProvider, opts []options.Opt, afterConnect handler, beforeReconnect handler) (*Client, error) {
client := New(
permitBlockEvents,
dispatcher.New(
context, chConfig,
connectionProvider,
Expand Down
20 changes: 20 additions & 0 deletions pkg/fab/events/client/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type params struct {
timeBetweenConnAttempts time.Duration
connEventCh chan *dispatcher.ConnectionEvent
respTimeout time.Duration
permitBlockEvents bool
}

func defaultParams() *params {
Expand All @@ -36,6 +37,16 @@ func defaultParams() *params {
}
}

// WithBlockEvents indicates that block events are to be received.
// Note that the caller must have sufficient privileges for this option.
func WithBlockEvents() options.Opt {
return func(p options.Params) {
if setter, ok := p.(permitBlockEventsSetter); ok {
setter.PermitBlockEvents()
}
}
}

// WithReconnect indicates whether the client should automatically attempt to reconnect
// to the server after a connection has been lost
func WithReconnect(value bool) options.Opt {
Expand Down Expand Up @@ -143,6 +154,11 @@ func (p *params) SetResponseTimeout(value time.Duration) {
p.respTimeout = value
}

func (p *params) PermitBlockEvents() {
logger.Debugf("PermitBlockEvents")
p.permitBlockEvents = true
}

type reconnectSetter interface {
SetReconnect(value bool)
}
Expand Down Expand Up @@ -170,3 +186,7 @@ type timeBetweenConnectAttemptsSetter interface {
type responseTimeoutSetter interface {
SetResponseTimeout(value time.Duration)
}

type permitBlockEventsSetter interface {
PermitBlockEvents()
}
1 change: 0 additions & 1 deletion pkg/fab/events/deliverclient/deliverclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func New(context fabcontext.Client, chConfig fab.ChannelCfg, opts ...options.Opt

client := &Client{
Client: *client.New(
params.permitBlockEvents,
dispatcher.New(deliverCtx, chConfig, params.connProvider, opts...),
opts...,
),
Expand Down
10 changes: 5 additions & 5 deletions pkg/fab/events/deliverclient/deliverclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestOptionsInNewClient(t *testing.T) {
client, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
WithBlockEvents(),
client.WithBlockEvents(),
)
if err != nil {
t.Fatalf("error creating deliver client: %s", err)
Expand All @@ -59,13 +59,13 @@ func TestClientConnect(t *testing.T) {
eventClient, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
client.WithBlockEvents(),
withConnectionProvider(
clientmocks.NewProviderFactory().Provider(
delivermocks.NewConnection(
clientmocks.WithLedger(servicemocks.NewMockLedger(delivermocks.BlockEventFactory, sourceURL)),
),
),
true,
),
WithSeekType(seek.FromBlock),
WithBlockNum(0),
Expand Down Expand Up @@ -187,6 +187,7 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmo
eventClient, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
client.WithBlockEvents(),
withConnectionProvider(
cp.FlakeyProvider(
connAttemptResult,
Expand All @@ -195,7 +196,6 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmo
return delivermocks.NewConnection(opts...)
}),
),
true,
),
esdispatcher.WithEventConsumerTimeout(time.Second),
client.WithMaxConnectAttempts(maxConnectAttempts),
Expand Down Expand Up @@ -227,6 +227,7 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe
eventClient, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
client.WithBlockEvents(),
withConnectionProvider(
cp.FlakeyProvider(
connAttemptResult,
Expand All @@ -235,7 +236,6 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe
return delivermocks.NewConnection(opts...)
}),
),
true,
),
esdispatcher.WithEventConsumerTimeout(3*time.Second),
client.WithReconnect(reconnect),
Expand Down Expand Up @@ -297,6 +297,7 @@ func testReconnectRegistration(t *testing.T, connectResults clientmocks.ConnectA
eventClient, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
client.WithBlockEvents(),
withConnectionProvider(
cp.FlakeyProvider(
connectResults,
Expand All @@ -305,7 +306,6 @@ func testReconnectRegistration(t *testing.T, connectResults clientmocks.ConnectA
return delivermocks.NewConnection(opts...)
}),
),
true,
),
esdispatcher.WithEventConsumerTimeout(3*time.Second),
client.WithReconnect(true),
Expand Down
37 changes: 16 additions & 21 deletions pkg/fab/events/deliverclient/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ import (
)

type params struct {
connProvider api.ConnectionProvider
permitBlockEvents bool
seekType seek.Type
fromBlock uint64
respTimeout time.Duration
connProvider api.ConnectionProvider
seekType seek.Type
fromBlock uint64
respTimeout time.Duration
}

func defaultParams() *params {
Expand All @@ -30,16 +29,6 @@ func defaultParams() *params {
}
}

// WithBlockEvents indicates that block events are to be received.
// Note that the caller must have sufficient privileges for this option.
func WithBlockEvents() options.Opt {
return func(p options.Params) {
if setter, ok := p.(connectionProviderSetter); ok {
setter.SetConnectionProvider(deliverProvider, true)
}
}
}

// WithSeekType specifies the point from which block events are to be received.
func WithSeekType(value seek.Type) options.Opt {
return func(p options.Params) {
Expand All @@ -60,16 +49,17 @@ func WithBlockNum(value uint64) options.Opt {
}

// withConnectionProvider is used only for testing
func withConnectionProvider(connProvider api.ConnectionProvider, permitBlockEvents bool) options.Opt {
func withConnectionProvider(connProvider api.ConnectionProvider) options.Opt {
return func(p options.Params) {
if setter, ok := p.(connectionProviderSetter); ok {
setter.SetConnectionProvider(connProvider, permitBlockEvents)
setter.SetConnectionProvider(connProvider)
}
}
}

// connectionProviderSetter is only used in unit tests
type connectionProviderSetter interface {
SetConnectionProvider(value api.ConnectionProvider, permitBlockEvents bool)
SetConnectionProvider(value api.ConnectionProvider)
}

type seekTypeSetter interface {
Expand All @@ -80,10 +70,15 @@ type fromBlockSetter interface {
SetFromBlock(value uint64)
}

func (p *params) SetConnectionProvider(connProvider api.ConnectionProvider, permitBlockEvents bool) {
logger.Debugf("ConnectionProvider: %#v, PermitBlockEvents: %t", connProvider, permitBlockEvents)
func (p *params) PermitBlockEvents() {
logger.Debugf("PermitBlockEvents")
p.connProvider = deliverProvider
}

// SetConnectionProvider is only used in unit tests
func (p *params) SetConnectionProvider(connProvider api.ConnectionProvider) {
logger.Debugf("ConnectionProvider: %#v", connProvider)
p.connProvider = connProvider
p.permitBlockEvents = permitBlockEvents
}

func (p *params) SetFromBlock(value uint64) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/fab/events/eventhubclient/eventhubclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ type Client struct {
// New returns a new event hub client
func New(context context.Client, chConfig fab.ChannelCfg, opts ...options.Opt) (*Client, error) {
params := defaultParams()

// FIXME: Temporarily set the default to block events since Fabric 1.0 does
// not support filtered block events
opts = append(opts, client.WithBlockEvents())

options.Apply(params, opts)

// Use a context that returns a custom Discovery Provider which
Expand All @@ -51,7 +56,6 @@ func New(context context.Client, chConfig fab.ChannelCfg, opts ...options.Opt) (

client := &Client{
Client: *client.New(
params.permitBlockEvents,
dispatcher.New(ehCtx, chConfig, params.connProvider, opts...),
opts...,
),
Expand Down
20 changes: 9 additions & 11 deletions pkg/fab/events/eventhubclient/eventhubclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestOptionsInNewClient(t *testing.T) {
client, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
WithBlockEvents(),
client.WithBlockEvents(),
)
if err != nil {
t.Fatalf("error creating new event hub client: %s", err)
Expand All @@ -62,12 +62,11 @@ func TestClientConnect(t *testing.T) {
eventClient, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
withConnectionProviderAndInterests(
withConnectionProvider(
clientmocks.NewProviderFactory().Provider(
ehclientmocks.NewConnection(
clientmocks.WithLedger(servicemocks.NewMockLedger(ehmocks.BlockEventFactory, sourceURL)),
)),
filteredBlockInterests, false,
),
)
if err != nil {
Expand Down Expand Up @@ -95,15 +94,14 @@ func TestTimeoutClientConnect(t *testing.T) {
eventClient, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
withConnectionProviderAndInterests(
withConnectionProvider(
clientmocks.NewProviderFactory().Provider(
ehclientmocks.NewConnection(
clientmocks.WithLedger(servicemocks.NewMockLedger(ehmocks.BlockEventFactory, sourceURL)),
clientmocks.WithResults(
clientmocks.NewResult(ehmocks.RegInterests, clientmocks.NoOpResult),
),
)),
filteredBlockInterests, false,
),
)
if err != nil {
Expand Down Expand Up @@ -209,15 +207,15 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmo
eventClient, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
withConnectionProviderAndInterests(
client.WithBlockEvents(),
withConnectionProvider(
clientmocks.NewProviderFactory().FlakeyProvider(
connAttemptResult,
clientmocks.WithLedger(servicemocks.NewMockLedger(ehmocks.BlockEventFactory, sourceURL)),
clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection {
return ehclientmocks.NewConnection(opts...)
}),
),
blockInterests, true,
),
esdispatcher.WithEventConsumerTimeout(time.Second),
client.WithMaxConnectAttempts(maxConnectAttempts),
Expand Down Expand Up @@ -249,15 +247,15 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe
eventClient, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
withConnectionProviderAndInterests(
client.WithBlockEvents(),
withConnectionProvider(
cp.FlakeyProvider(
connAttemptResult,
clientmocks.WithLedger(ledger),
clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection {
return ehclientmocks.NewConnection(opts...)
}),
),
blockInterests, true,
),
esdispatcher.WithEventConsumerTimeout(3*time.Second),
client.WithReconnect(reconnect),
Expand Down Expand Up @@ -308,15 +306,15 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents clientmocks.Num
eventClient, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
withConnectionProviderAndInterests(
client.WithBlockEvents(),
withConnectionProvider(
cp.FlakeyProvider(
connectResults,
clientmocks.WithLedger(ledger),
clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection {
return ehclientmocks.NewConnection(opts...)
}),
),
blockInterests, true,
),
esdispatcher.WithEventConsumerTimeout(3*time.Second),
client.WithReconnectInitialDelay(0),
Expand Down
Loading

0 comments on commit 6880d84

Please sign in to comment.