Skip to content

Commit

Permalink
[FAB-8537] Make event hub and deliver opts consistent
Browse files Browse the repository at this point in the history
- Removed NewFiltered from EventHubClient
- Removed unnecessary private functions
- Also removed mutex that was causing
  timeout errors in CI

Change-Id: I03646463c1c7d1bdaa5743149c729efdfe46027e
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Feb 26, 2018
1 parent 5e51983 commit 081c0ff
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 140 deletions.
4 changes: 0 additions & 4 deletions pkg/fab/events/deliverclient/deliverclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ type Client struct {

// New returns a new deliver event client
func New(context fabcontext.Context, channelID string, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) {
return newClient(context, channelID, discoveryService, opts...)
}

func newClient(context fabcontext.Context, channelID string, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) {
if channelID == "" {
return nil, errors.New("expecting channel ID")
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/fab/events/deliverclient/deliverclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ func TestOptionsInNewClient(t *testing.T) {
WithBlockEvents(),
)
if err != nil {
t.Fatalf("expecting error with no channel ID but got none")
t.Fatalf("error creating deliver client: %s", err)
}
client.Close()
}

func TestClientConnect(t *testing.T) {
eventClient, err := newClient(
eventClient, err := New(
newMockContext(), "mychannel",
clientmocks.NewDiscoveryService(peer1, peer2),
withConnectionProvider(
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestReconnectRegistration(t *testing.T) {
func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmocks.Outcome, connAttemptResult clientmocks.ConnectAttemptResults) {
cp := clientmocks.NewProviderFactory()

eventClient, err := newClient(
eventClient, err := New(
newMockContext(), "mychannel",
clientmocks.NewDiscoveryService(peer1, peer2),
withConnectionProvider(
Expand Down Expand Up @@ -218,7 +218,7 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe
connectch := make(chan *fab.ConnectionEvent)
ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory)

eventClient, err := newClient(
eventClient, err := New(
newMockContext(), "mychannel",
clientmocks.NewDiscoveryService(peer1, peer2),
withConnectionProvider(
Expand Down Expand Up @@ -288,7 +288,7 @@ func testReconnectRegistration(t *testing.T, connectResults clientmocks.ConnectA

cp := clientmocks.NewProviderFactory()

eventClient, err := newClient(
eventClient, err := New(
newMockContext(), channelID,
clientmocks.NewDiscoveryService(peer1, peer2),
withConnectionProvider(
Expand Down
6 changes: 3 additions & 3 deletions pkg/fab/events/deliverclient/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ type fromBlockSetter interface {
SetFromBlock(value uint64)
}

func (p *params) SetConnectionProvider(value api.ConnectionProvider, permitBlockEvents bool) {
logger.Debugf("ConnectionProvider: %#v", value)
p.connProvider = value
func (p *params) SetConnectionProvider(connProvider api.ConnectionProvider, permitBlockEvents bool) {
logger.Debugf("ConnectionProvider: %#v, PermitBlockEvents: %t", connProvider, permitBlockEvents)
p.connProvider = connProvider
p.permitBlockEvents = permitBlockEvents
}

Expand Down
20 changes: 4 additions & 16 deletions pkg/fab/events/eventhubclient/eventhubclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/eventhubclient/dispatcher"
"github.com/hyperledger/fabric-sdk-go/pkg/logging"
"github.com/hyperledger/fabric-sdk-go/pkg/options"
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
)

Expand All @@ -40,20 +39,10 @@ var ehConnProvider = func(channelID string, context context.Context, peer fab.Pe
type Client struct {
client.Client
params
interests []*pb.Interest
}

// New returns a new block event event hub client
// New returns a new event hub client
func New(context context.Context, channelID string, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) {
return newClient(context, channelID, ehConnProvider, discoveryService, []*pb.Interest{&pb.Interest{EventType: pb.EventType_BLOCK}}, true, opts...)
}

// NewFiltered returns a new filtered block event hub client
func NewFiltered(context context.Context, channelID string, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) {
return newClient(context, channelID, ehConnProvider, discoveryService, []*pb.Interest{&pb.Interest{EventType: pb.EventType_FILTEREDBLOCK}}, false, opts...)
}

func newClient(context context.Context, channelID string, connProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, interests []*pb.Interest, permitBlockEvents bool, opts ...options.Opt) (*Client, error) {
if channelID == "" {
return nil, errors.New("expecting channel ID")
}
Expand All @@ -63,12 +52,11 @@ func newClient(context context.Context, channelID string, connProvider api.Conne

client := &Client{
Client: *client.New(
permitBlockEvents,
dispatcher.New(context, channelID, connProvider, discoveryService, opts...),
params.permitBlockEvents,
dispatcher.New(context, channelID, params.connProvider, discoveryService, opts...),
opts...,
),
params: *params,
interests: interests,
params: *params,
}
client.SetAfterConnectHandler(client.registerInterests)

Expand Down
178 changes: 73 additions & 105 deletions pkg/fab/events/eventhubclient/eventhubclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,31 @@ var (
endpoint2 = newMockEventEndpoint("grpcs://peer2.example.com:7053")
)

func TestInvalidOptionsInNewClient(t *testing.T) {
// Filtered Client
if _, err := NewFiltered(newMockContext(), "", clientmocks.NewDiscoveryService(endpoint1, endpoint2)); err == nil {
t.Fatalf("expecting error with no channel ID but got none")
}
// Client
func TestOptionsInNewClient(t *testing.T) {
if _, err := New(newMockContext(), "", clientmocks.NewDiscoveryService(endpoint1, endpoint2)); err == nil {
t.Fatalf("expecting error with no channel ID but got none")
}

client, err := New(newMockContext(), "mychannel", clientmocks.NewDiscoveryService(endpoint1, endpoint2),
WithBlockEvents(),
)
if err != nil {
t.Fatalf("error creating new event hub client: %s", err)
}
client.Close()
}

func TestClientConnect(t *testing.T) {
eventClient, _, err := newClientWithMockConnAndOpts(
eventClient, err := New(
newMockContext(), "mychannel",
clientmocks.NewProviderFactory().Provider(
ehclientmocks.NewConnection(
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
)),
filteredClientProvider,
clientmocks.NewDiscoveryService(endpoint1, endpoint2),
defaultOpts,
withConnectionProviderAndInterests(
clientmocks.NewProviderFactory().Provider(
ehclientmocks.NewConnection(
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
)),
filteredBlockInterests, false,
),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand All @@ -84,18 +88,19 @@ func TestClientConnect(t *testing.T) {
}

func TestTimeoutClientConnect(t *testing.T) {
eventClient, _, err := newClientWithMockConnAndOpts(
eventClient, err := New(
newMockContext(), "mychannel",
clientmocks.NewProviderFactory().Provider(
ehclientmocks.NewConnection(
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
clientmocks.WithResults(
clientmocks.NewResult(ehmocks.RegInterests, clientmocks.NoOpResult),
),
)),
filteredClientProvider,
clientmocks.NewDiscoveryService(endpoint1, endpoint2),
defaultOpts,
withConnectionProviderAndInterests(
clientmocks.NewProviderFactory().Provider(
ehclientmocks.NewConnection(
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
clientmocks.WithResults(
clientmocks.NewResult(ehmocks.RegInterests, clientmocks.NoOpResult),
),
)),
filteredBlockInterests, false,
),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand Down Expand Up @@ -196,21 +201,21 @@ func TestReconnectRegistration(t *testing.T) {
}

func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmocks.Outcome, connAttemptResult clientmocks.ConnectAttemptResults) {
eventClient, _, err := newClientWithMockConnAndOpts(
eventClient, err := New(
newMockContext(), "mychannel",
clientmocks.NewProviderFactory().FlakeyProvider(
connAttemptResult,
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection {
return ehclientmocks.NewConnection(opts...)
}),
),
clientProvider,
clientmocks.NewDiscoveryService(endpoint1, endpoint2),
newOpts(
esdispatcher.WithEventConsumerTimeout(time.Second),
client.WithMaxConnectAttempts(maxConnectAttempts),
withConnectionProviderAndInterests(
clientmocks.NewProviderFactory().FlakeyProvider(
connAttemptResult,
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection {
return ehclientmocks.NewConnection(opts...)
}),
),
blockInterests, true,
),
esdispatcher.WithEventConsumerTimeout(time.Second),
client.WithMaxConnectAttempts(maxConnectAttempts),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand All @@ -235,27 +240,27 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe
connectch := make(chan *fab.ConnectionEvent)
ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory)

eventClient, _, err := newClientWithMockConnAndOpts(
eventClient, err := New(
newMockContext(), "mychannel",
cp.FlakeyProvider(
connAttemptResult,
clientmocks.WithLedger(ledger),
clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection {
return ehclientmocks.NewConnection(opts...)
}),
),
clientProvider,
clientmocks.NewDiscoveryService(endpoint1, endpoint2),
newOpts(
esdispatcher.WithEventConsumerTimeout(3*time.Second),
client.WithReconnect(reconnect),
client.WithReconnectInitialDelay(0),
client.WithMaxConnectAttempts(1),
client.WithMaxReconnectAttempts(maxReconnectAttempts),
client.WithTimeBetweenConnectAttempts(time.Millisecond),
client.WithConnectionEvent(connectch),
client.WithResponseTimeout(2*time.Second),
withConnectionProviderAndInterests(
cp.FlakeyProvider(
connAttemptResult,
clientmocks.WithLedger(ledger),
clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection {
return ehclientmocks.NewConnection(opts...)
}),
),
blockInterests, true,
),
esdispatcher.WithEventConsumerTimeout(3*time.Second),
client.WithReconnect(reconnect),
client.WithReconnectInitialDelay(0),
client.WithMaxConnectAttempts(1),
client.WithMaxReconnectAttempts(maxReconnectAttempts),
client.WithTimeBetweenConnectAttempts(time.Millisecond),
client.WithConnectionEvent(connectch),
client.WithResponseTimeout(2*time.Second),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand Down Expand Up @@ -294,24 +299,24 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents clientmocks.Num
ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory)
cp := clientmocks.NewProviderFactory()

eventClient, _, err := newClientWithMockConnAndOpts(
eventClient, err := New(
newMockContext(), channelID,
cp.FlakeyProvider(
connectResults,
clientmocks.WithLedger(ledger),
clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection {
return ehclientmocks.NewConnection(opts...)
}),
),
clientProvider,
clientmocks.NewDiscoveryService(endpoint1, endpoint2),
newOpts(
esdispatcher.WithEventConsumerTimeout(3*time.Second),
client.WithReconnectInitialDelay(0),
client.WithMaxConnectAttempts(1),
client.WithMaxReconnectAttempts(1),
client.WithTimeBetweenConnectAttempts(time.Millisecond),
withConnectionProviderAndInterests(
cp.FlakeyProvider(
connectResults,
clientmocks.WithLedger(ledger),
clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection {
return ehclientmocks.NewConnection(opts...)
}),
),
blockInterests, true,
),
esdispatcher.WithEventConsumerTimeout(3*time.Second),
client.WithReconnectInitialDelay(0),
client.WithMaxConnectAttempts(1),
client.WithMaxReconnectAttempts(1),
client.WithTimeBetweenConnectAttempts(time.Millisecond),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand Down Expand Up @@ -451,39 +456,6 @@ func listenEvents(blockch <-chan *fab.BlockEvent, ccch <-chan *fab.CCEvent, wait
}
}

type ClientProvider func(context context.Context, channelID string, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error)

var clientProvider = func(context context.Context, channelID string, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) {
return newClient(context, channelID, connectionProvider, discoveryService, []*pb.Interest{&pb.Interest{EventType: pb.EventType_BLOCK}}, true, opts...)
}

var filteredClientProvider = func(context context.Context, channelID string, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) {
return newClient(context, channelID, connectionProvider, discoveryService, []*pb.Interest{&pb.Interest{EventType: pb.EventType_FILTEREDBLOCK}}, false, opts...)
}

func newClientWithMockConn(context context.Context, channelID string, clientProvider ClientProvider, connOpts ...clientmocks.Opt) (*Client, clientmocks.Connection, error) {
conn := ehclientmocks.NewConnection(connOpts...)
client, _, err := newClientWithMockConnAndOpts(
context, channelID,
clientmocks.NewProviderFactory().Provider(conn),
clientProvider,
clientmocks.NewDiscoveryService(endpoint1, endpoint2),
defaultOpts,
)
return client, conn, err
}

func newClientWithMockConnAndOpts(context context.Context, channelID string, connectionProvider api.ConnectionProvider, clientProvider ClientProvider, discoveryService fab.DiscoveryService, opts []options.Opt, connOpts ...clientmocks.Opt) (*Client, clientmocks.Connection, error) {
var conn *ehclientmocks.MockConnection
if connectionProvider == nil {
conn = ehclientmocks.NewConnection(connOpts...)
connectionProvider = clientmocks.NewProviderFactory().Provider(conn)
}

client, err := clientProvider(context, channelID, connectionProvider, discoveryService, opts...)
return client, conn, err
}

func newMockContext() context.Context {
return fabmocks.NewMockContext(fabmocks.NewMockUser("user1"))
}
Expand All @@ -493,7 +465,3 @@ func newMockEventEndpoint(url string) api.EventEndpoint {
EvtURL: url,
}
}

func newOpts(opts ...options.Opt) []options.Opt {
return opts
}
Loading

0 comments on commit 081c0ff

Please sign in to comment.