diff --git a/doc.go b/doc.go index f59ef1b19c..b6f7a26227 100644 --- a/doc.go +++ b/doc.go @@ -15,6 +15,9 @@ SPDX-License-Identifier: Apache-2.0 // pkg/client/channel: Provides channel transaction capabilities. // Reference: https://godoc.org/github.com/hyperledger/fabric-sdk-go/pkg/client/channel // +// pkg/client/event: Provides channel event capabilities. +// Reference: https://godoc.org/github.com/hyperledger/fabric-sdk-go/pkg/client/event +// // pkg/client/ledger: Enables queries to a channel's underlying ledger. // Reference: https://godoc.org/github.com/hyperledger/fabric-sdk-go/pkg/client/ledger // diff --git a/pkg/client/event/event.go b/pkg/client/event/event.go new file mode 100644 index 0000000000..656809aca8 --- /dev/null +++ b/pkg/client/event/event.go @@ -0,0 +1,132 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +// Package event enables access to a channel events on a Fabric network. Event client receives events such as block, filtered block, +// chaincode, and transaction status events. +// +// // prepare channel client context +// org1ChannelClientContext := sdk.ChannelContext("mychannel", fabsdk.WithUser("User1"), fabsdk.WithOrg("Org1")) +// +// // create default event client (with filtered block events) +// eventClient, _ := event.New(org1ChannelClientContext) +// if err != nil { +// t.Fatalf("Failed to create new events client: %s", err) +// } +// +// // Register chaincode event (returns channel which receives event details when the event is complete) +// reg, notifier, err := eventClient.RegisterChaincodeEvent("eventcc", "event123") +// if err != nil { +// t.Fatalf("Failed to register cc event: %s", err) +// } +// defer eventClient.Unregister(reg) +// +// select { +// case ccEvent := <-notifier: +// t.Logf("Received cc event: %#v", ccEvent) +// case <-time.After(time.Second * 20): +// t.Fatalf("Did NOT receive CC event for 'event123'") +// } +package event + +import ( + "github.com/hyperledger/fabric-sdk-go/pkg/common/logging" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client" + "github.com/pkg/errors" +) + +var logger = logging.NewLogger("fabsdk/client") + +// Client enables access to a channel events on a Fabric network. +type Client struct { + eventService fab.EventService + permitBlockEvents bool +} + +// New returns a Client instance. Client receives events such as block, filtered block, +// chaincode, and transaction status events. +func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client, error) { + + channelContext, err := channelProvider() + if err != nil { + return nil, errors.WithMessage(err, "failed to create channel context") + } + + eventClient := Client{} + + for _, param := range opts { + err := param(&eventClient) + if err != nil { + return nil, errors.WithMessage(err, "option failed") + } + } + + if channelContext.ChannelService() == nil { + return nil, errors.New("channel service not initialized") + } + + var es fab.EventService + if eventClient.permitBlockEvents { + es, err = channelContext.ChannelService().EventService(client.WithBlockEvents()) + } else { + es, err = channelContext.ChannelService().EventService() + } + + if err != nil { + return nil, errors.WithMessage(err, "event service creation failed") + } + + eventClient.eventService = es + + return &eventClient, nil +} + +// RegisterBlockEvent registers for block events. If the caller does not have permission +// to register for block events then an error is returned. Unregister must be called when the registration is no longer needed. +// Parameters: +// filter is an optional filter that filters out unwanted events. (Note: Only one filter may be specified.) +// +// Returns: +// the registration and a channel that is used to receive events. The channel is closed when Unregister is called. +func (c *Client) RegisterBlockEvent(filter ...fab.BlockFilter) (fab.Registration, <-chan *fab.BlockEvent, error) { + return c.eventService.RegisterBlockEvent(filter...) +} + +// RegisterFilteredBlockEvent registers for filtered block events. Unregister must be called when the registration is no longer needed. +// Returns: +// the registration and a channel that is used to receive events. The channel is closed when Unregister is called. +func (c *Client) RegisterFilteredBlockEvent() (fab.Registration, <-chan *fab.FilteredBlockEvent, error) { + return c.eventService.RegisterFilteredBlockEvent() +} + +// RegisterChaincodeEvent registers for chaincode events. Unregister must be called when the registration is no longer needed. +// Parameters: +// ccID is the chaincode ID for which events are to be received +// eventFilter is the chaincode event filter (regular expression) for which events are to be received +// +// Returns: +// the registration and a channel that is used to receive events. The channel is closed when Unregister is called. +func (c *Client) RegisterChaincodeEvent(ccID, eventFilter string) (fab.Registration, <-chan *fab.CCEvent, error) { + return c.eventService.RegisterChaincodeEvent(ccID, eventFilter) +} + +// RegisterTxStatusEvent registers for transaction status events. Unregister must be called when the registration is no longer needed. +// Parameters: +// txID is the transaction ID for which events are to be received +// +// Returns: +// the registration and a channel that is used to receive events. The channel is closed when Unregister is called. +func (c *Client) RegisterTxStatusEvent(txID string) (fab.Registration, <-chan *fab.TxStatusEvent, error) { + return c.eventService.RegisterTxStatusEvent(txID) +} + +// Unregister removes the given registration and closes the event channel. +// Parameters: +// reg is the registration handle that was returned from one of the Register functions +func (c *Client) Unregister(reg fab.Registration) { + c.eventService.Unregister(reg) +} diff --git a/pkg/client/event/event_test.go b/pkg/client/event/event_test.go new file mode 100644 index 0000000000..aac809018a --- /dev/null +++ b/pkg/client/event/event_test.go @@ -0,0 +1,441 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ +package event + +import ( + "testing" + "time" + + txnmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + contextImpl "github.com/hyperledger/fabric-sdk-go/pkg/context" + fcmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" + mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher" + servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks" + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" +) + +var ( + channelID = "testChannel" + defaultOpts = []options.Opt{} + sourceURL = "localhost:9051" +) + +func TestNewEventClient(t *testing.T) { + + fabCtx := setupCustomTestContext(t, nil) + ctx := createChannelContext(fabCtx, channelID) + + _, err := New(ctx) + if err != nil { + t.Fatalf("Failed to create new event client: %s", err) + } + + _, err = New(ctx, WithBlockEvents()) + if err != nil { + t.Fatalf("Failed to create new event client: %s", err) + } + + ctxErr := createChannelContextWithError(fabCtx, channelID) + _, err = New(ctxErr) + if err == nil { + t.Fatalf("Should have failed with 'Test Error'") + } +} + +func TestBlockEvents(t *testing.T) { + + eventService, eventProducer, err := newServiceWithMockProducer(defaultOpts, withBlockLedger(sourceURL)) + if err != nil { + t.Fatalf("error creating channel event client: %s", err) + } + defer eventProducer.Close() + defer eventService.Stop() + + fabCtx := setupCustomTestContext(t, nil) + ctx := createChannelContext(fabCtx, channelID) + + client, err := New(ctx) + if err != nil { + t.Fatalf("Failed to create new event client: %s", err) + } + + client.eventService = eventService + + registration, eventch, err := client.RegisterBlockEvent() + if err != nil { + t.Fatalf("error registering for block events: %s", err) + } + defer client.Unregister(registration) + + eventProducer.Ledger().NewBlock(channelID) + + select { + case _, ok := <-eventch: + if !ok { + t.Fatalf("unexpected closed channel") + } + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for block event") + } +} + +func TestFilteredBlockEvents(t *testing.T) { + + eventService, eventProducer, err := newServiceWithMockProducer(defaultOpts, withFilteredBlockLedger(sourceURL)) + if err != nil { + t.Fatalf("error creating channel event client: %s", err) + } + defer eventProducer.Close() + defer eventService.Stop() + + fabCtx := setupCustomTestContext(t, nil) + ctx := createChannelContext(fabCtx, channelID) + + client, err := New(ctx) + if err != nil { + t.Fatalf("Failed to create new event client: %s", err) + } + + client.eventService = eventService + + registration, eventch, err := client.RegisterFilteredBlockEvent() + if err != nil { + t.Fatalf("error registering for filtered block events: %s", err) + } + defer client.Unregister(registration) + + txID1 := "1234" + txCode1 := pb.TxValidationCode_VALID + txID2 := "5678" + txCode2 := pb.TxValidationCode_ENDORSEMENT_POLICY_FAILURE + + eventProducer.Ledger().NewFilteredBlock( + channelID, + servicemocks.NewFilteredTx(txID1, txCode1), + servicemocks.NewFilteredTx(txID2, txCode2), + ) + + select { + case fbevent, ok := <-eventch: + if !ok { + t.Fatalf("unexpected closed channel") + } + if fbevent.FilteredBlock == nil { + t.Fatalf("Expecting filtered block but got nil") + } + if fbevent.FilteredBlock.ChannelId != channelID { + t.Fatalf("Expecting channel [%s] but got [%s]", channelID, fbevent.FilteredBlock.ChannelId) + } + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for filtered block event") + } +} + +func TestTxStatusEvents(t *testing.T) { + channelID := "mychannel" + eventService, eventProducer, err := newServiceWithMockProducer(defaultOpts, withFilteredBlockLedger(sourceURL)) + if err != nil { + t.Fatalf("error creating channel event client: %s", err) + } + defer eventProducer.Close() + defer eventService.Stop() + + fabCtx := setupCustomTestContext(t, nil) + ctx := createChannelContext(fabCtx, channelID) + + client, err := New(ctx) + if err != nil { + t.Fatalf("Failed to create new event client: %s", err) + } + + client.eventService = eventService + + txID1 := "1234" + txCode1 := pb.TxValidationCode_VALID + txID2 := "5678" + txCode2 := pb.TxValidationCode_ENDORSEMENT_POLICY_FAILURE + + if _, _, err := client.RegisterTxStatusEvent(""); err == nil { + t.Fatalf("expecting error registering for TxStatus event without a TX ID but got none") + } + + reg1, eventch1, err := client.RegisterTxStatusEvent(txID1) + if err != nil { + t.Fatalf("error registering for TxStatus events: %s", err) + } + defer client.Unregister(reg1) + + reg2, eventch2, err := client.RegisterTxStatusEvent(txID2) + if err != nil { + t.Fatalf("error registering for TxStatus events: %s", err) + } + defer client.Unregister(reg2) + + eventProducer.Ledger().NewFilteredBlock( + channelID, + servicemocks.NewFilteredTx(txID1, txCode1), + servicemocks.NewFilteredTx(txID2, txCode2), + ) + + numExpected := 2 + numReceived := 0 + done := false + for !done { + select { + case event, ok := <-eventch1: + if !ok { + t.Fatalf("unexpected closed channel") + } else { + checkTxStatusEvent(t, event, txID1, txCode1) + numReceived++ + } + case event, ok := <-eventch2: + if !ok { + t.Fatalf("unexpected closed channel") + } else { + checkTxStatusEvent(t, event, txID2, txCode2) + numReceived++ + } + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for [%d] TxStatus events. Only received [%d]", numExpected, numReceived) + } + + if numReceived == numExpected { + break + } + } +} + +func TestCCEvents(t *testing.T) { + channelID := "mychannel" + eventService, eventProducer, err := newServiceWithMockProducer(defaultOpts, withFilteredBlockLedger(sourceURL)) + if err != nil { + t.Fatalf("error creating channel event client: %s", err) + } + defer eventProducer.Close() + defer eventService.Stop() + + fabCtx := setupCustomTestContext(t, nil) + ctx := createChannelContext(fabCtx, channelID) + + client, err := New(ctx) + if err != nil { + t.Fatalf("Failed to create new event client: %s", err) + } + + client.eventService = eventService + + ccID1 := "mycc1" + ccID2 := "mycc2" + ccFilter1 := "event1" + ccFilter2 := "event.*" + event1 := "event1" + event2 := "event2" + event3 := "event3" + + if _, _, err := client.RegisterChaincodeEvent("", ccFilter1); err == nil { + t.Fatalf("expecting error registering for chaincode events without CC ID but got none") + } + + reg1, eventch1, err := client.RegisterChaincodeEvent(ccID1, ccFilter1) + if err != nil { + t.Fatalf("error registering for block events: %s", err) + } + defer client.Unregister(reg1) + + reg2, eventch2, err := client.RegisterChaincodeEvent(ccID2, ccFilter2) + if err != nil { + t.Fatalf("error registering for chaincode events: %s", err) + } + defer client.Unregister(reg2) + + eventProducer.Ledger().NewFilteredBlock( + channelID, + servicemocks.NewFilteredTxWithCCEvent("txid1", ccID1, event1), + servicemocks.NewFilteredTxWithCCEvent("txid2", ccID2, event2), + servicemocks.NewFilteredTxWithCCEvent("txid3", ccID2, event3), + ) + + numExpected := 3 + numReceived := 0 + done := false + for !done { + select { + case event, ok := <-eventch1: + if !ok { + t.Fatalf("unexpected closed channel") + } else { + checkCCEvent(t, event, ccID1, event1) + numReceived++ + } + case event, ok := <-eventch2: + if !ok { + t.Fatalf("unexpected closed channel") + } else { + checkCCEvent(t, event, ccID2, event2, event3) + numReceived++ + } + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for [%d] CC events. Only received [%d]", numExpected, numReceived) + } + + if numReceived == numExpected { + break + } + } +} + +func checkCCEvent(t *testing.T, event *fab.CCEvent, expectedCCID string, expectedEventNames ...string) { + if event.ChaincodeID != expectedCCID { + t.Fatalf("expecting event for CC [%s] but received event for CC [%s]", expectedCCID, event.ChaincodeID) + } + found := false + for _, eventName := range expectedEventNames { + if event.EventName == eventName { + found = true + break + } + } + if !found { + t.Fatalf("expecting one of [%v] but received [%s]", expectedEventNames, event.EventName) + } +} + +func checkTxStatusEvent(t *testing.T, event *fab.TxStatusEvent, expectedTxID string, expectedCode pb.TxValidationCode) { + if event.TxID != expectedTxID { + t.Fatalf("expecting event for TxID [%s] but received event for TxID [%s]", expectedTxID, event.TxID) + } + if event.TxValidationCode != expectedCode { + t.Fatalf("expecting TxValidationCode [%s] but received [%s]", expectedCode, event.TxValidationCode) + } +} + +func setupCustomTestContext(t *testing.T, orderers []fab.Orderer) context.ClientProvider { + user := mspmocks.NewMockSigningIdentity("test", "test") + ctx := fcmocks.NewMockContext(user) + + if orderers == nil { + orderer := fcmocks.NewMockOrderer("", nil) + orderers = []fab.Orderer{orderer} + } + + transactor := txnmocks.MockTransactor{ + Ctx: ctx, + ChannelID: channelID, + Orderers: orderers, + } + + ctx.InfraProvider().(*fcmocks.MockInfraProvider).SetCustomTransactor(&transactor) + + testChannelSvc, err := setupTestChannelService(ctx, orderers) + assert.Nil(t, err, "Got error %s", err) + + channelProvider := ctx.MockProviderContext.ChannelProvider() + channelProvider.(*fcmocks.MockChannelProvider).SetCustomChannelService(testChannelSvc) + + return createClientContext(ctx) +} + +func setupTestChannelService(ctx context.Client, orderers []fab.Orderer) (fab.ChannelService, error) { + chProvider, err := fcmocks.NewMockChannelProvider(ctx) + if err != nil { + return nil, errors.WithMessage(err, "mock channel provider creation failed") + } + + chService, err := chProvider.ChannelService(ctx, channelID) + if err != nil { + return nil, errors.WithMessage(err, "mock channel service creation failed") + } + + return chService, nil +} + +func createChannelContext(clientContext context.ClientProvider, channelID string) context.ChannelProvider { + + channelProvider := func() (context.Channel, error) { + return contextImpl.NewChannel(clientContext, channelID) + } + + return channelProvider +} + +func createChannelContextWithError(clientContext context.ClientProvider, channelID string) context.ChannelProvider { + + channelProvider := func() (context.Channel, error) { + return nil, errors.New("Test Error") + } + + return channelProvider +} + +func createClientContext(client context.Client) context.ClientProvider { + return func() (context.Client, error) { + return client, nil + } +} + +type producerOpts struct { + ledger *servicemocks.MockLedger +} + +type producerOpt func(opts *producerOpts) + +func withBlockLedger(source string) producerOpt { + return func(opts *producerOpts) { + opts.ledger = servicemocks.NewMockLedger(servicemocks.BlockEventFactory, source) + } +} + +func withFilteredBlockLedger(source string) producerOpt { + return func(opts *producerOpts) { + opts.ledger = servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, source) + } +} + +func newServiceWithMockProducer(opts []options.Opt, pOpts ...producerOpt) (*service.Service, *servicemocks.MockProducer, error) { + service := service.New(dispatcher.New(opts...), opts...) + if err := service.Start(); err != nil { + return nil, nil, err + } + + eventch, err := service.Dispatcher().EventCh() + if err != nil { + return nil, nil, err + } + + popts := producerOpts{} + for _, opt := range pOpts { + opt(&popts) + } + + ledger := popts.ledger + if popts.ledger == nil { + ledger = servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL) + } + + eventProducer := servicemocks.NewMockProducer(ledger) + producerch := eventProducer.Register() + + go func() { + for { + event, ok := <-producerch + if !ok { + return + } + eventch <- event + } + }() + + return service, eventProducer, nil +} diff --git a/pkg/client/event/opts.go b/pkg/client/event/opts.go new file mode 100644 index 0000000000..74c896e2a0 --- /dev/null +++ b/pkg/client/event/opts.go @@ -0,0 +1,18 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package event + +// ClientOption describes a functional parameter for the New constructor +type ClientOption func(*Client) error + +// WithBlockEvents option +func WithBlockEvents() ClientOption { + return func(c *Client) error { + c.permitBlockEvents = true + return nil + } +} diff --git a/pkg/common/providers/fab/context.go b/pkg/common/providers/fab/context.go index 5771d2c173..ba24a9e364 100644 --- a/pkg/common/providers/fab/context.go +++ b/pkg/common/providers/fab/context.go @@ -6,10 +6,12 @@ SPDX-License-Identifier: Apache-2.0 package fab +import "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + // ChannelService supplies services related to a channel. type ChannelService interface { Config() (ChannelConfig, error) - EventService() (EventService, error) + EventService(opts ...options.Opt) (EventService, error) Membership() (ChannelMembership, error) ChannelConfig() (ChannelCfg, error) } diff --git a/pkg/common/providers/fab/provider.go b/pkg/common/providers/fab/provider.go index e0021f7649..66e78deaaa 100644 --- a/pkg/common/providers/fab/provider.go +++ b/pkg/common/providers/fab/provider.go @@ -32,7 +32,7 @@ type InfraProvider interface { CreateChannelCfg(ctx ClientContext, channelID string) (ChannelCfg, error) CreateChannelTransactor(reqCtx reqContext.Context, cfg ChannelCfg) (Transactor, error) CreateChannelMembership(ctx ClientContext, channelID string) (ChannelMembership, error) - CreateEventService(ctx ClientContext, channelID string) (EventService, error) + CreateEventService(ctx ClientContext, channelID string, opts ...options.Opt) (EventService, error) CreatePeerFromConfig(peerCfg *core.NetworkPeer) (Peer, error) CreateOrdererFromConfig(cfg *core.OrdererConfig) (Orderer, error) CommManager() CommManager diff --git a/pkg/fab/mocks/mockchprovider.go b/pkg/fab/mocks/mockchprovider.go index e48b8eaf8d..a4e1464722 100644 --- a/pkg/fab/mocks/mockchprovider.go +++ b/pkg/fab/mocks/mockchprovider.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package mocks import ( + "github.com/hyperledger/fabric-sdk-go/pkg/common/options" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" @@ -67,7 +68,7 @@ func (cs *MockChannelService) SetOrderers(orderers []string) { } // EventService returns a mock event service -func (cs *MockChannelService) EventService() (fab.EventService, error) { +func (cs *MockChannelService) EventService(opts ...options.Opt) (fab.EventService, error) { return NewMockEventService(), nil } diff --git a/pkg/fab/mocks/mockfabricprovider.go b/pkg/fab/mocks/mockfabricprovider.go index 3f040d8aef..a192d21f7e 100644 --- a/pkg/fab/mocks/mockfabricprovider.go +++ b/pkg/fab/mocks/mockfabricprovider.go @@ -11,6 +11,7 @@ import ( reqContext "context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/options" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" @@ -24,7 +25,7 @@ type MockInfraProvider struct { } // CreateEventService creates the event service. -func (f *MockInfraProvider) CreateEventService(ic fab.ClientContext, channelID string) (fab.EventService, error) { +func (f *MockInfraProvider) CreateEventService(ic fab.ClientContext, channelID string, opts ...options.Opt) (fab.EventService, error) { panic("not implemented") } diff --git a/pkg/fabsdk/provider/chpvdr/chprovider.go b/pkg/fabsdk/provider/chpvdr/chprovider.go index 177fb4d6a4..b0ebad6fef 100644 --- a/pkg/fabsdk/provider/chpvdr/chprovider.go +++ b/pkg/fabsdk/provider/chpvdr/chprovider.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package chpvdr import ( + "github.com/hyperledger/fabric-sdk-go/pkg/common/options" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" ) @@ -52,8 +53,8 @@ func (cs *ChannelService) Config() (fab.ChannelConfig, error) { } // EventService returns the EventService. -func (cs *ChannelService) EventService() (fab.EventService, error) { - return cs.infraProvider.CreateEventService(cs.context, cs.channelID) +func (cs *ChannelService) EventService(opts ...options.Opt) (fab.EventService, error) { + return cs.infraProvider.CreateEventService(cs.context, cs.channelID, opts...) } // Membership returns the member identifier for this channel diff --git a/pkg/fabsdk/provider/fabpvdr/cachekey.go b/pkg/fabsdk/provider/fabpvdr/cachekey.go index e1f516afd7..76030abab9 100644 --- a/pkg/fabsdk/provider/fabpvdr/cachekey.go +++ b/pkg/fabsdk/provider/fabpvdr/cachekey.go @@ -8,7 +8,9 @@ package fabpvdr import ( "crypto/sha256" + "strconv" + "github.com/hyperledger/fabric-sdk-go/pkg/common/options" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" ) @@ -17,23 +19,50 @@ type CacheKey struct { key string context fab.ClientContext chConfig fab.ChannelCfg + opts []options.Opt +} + +type params struct { + permitBlockEvents bool +} + +func defaultParams() *params { + return ¶ms{} +} + +func (p *params) PermitBlockEvents() { + p.permitBlockEvents = true +} + +type permitBlockEventsSetter interface { + PermitBlockEvents() +} + +func (p *params) getOptKey() string { + // Construct opts portion + optKey := "blockEvents:" + strconv.FormatBool(p.permitBlockEvents) + return optKey } // NewCacheKey returns a new CacheKey -func NewCacheKey(ctx fab.ClientContext, chConfig fab.ChannelCfg) (*CacheKey, error) { +func NewCacheKey(ctx fab.ClientContext, chConfig fab.ChannelCfg, opts ...options.Opt) (*CacheKey, error) { identity, err := ctx.Serialize() if err != nil { return nil, err } + params := defaultParams() + options.Apply(params, opts) + h := sha256.New() h.Write(identity) // nolint - hash := h.Sum([]byte(chConfig.ID())) + hash := h.Sum([]byte(chConfig.ID() + params.getOptKey())) return &CacheKey{ key: string(hash), context: ctx, chConfig: chConfig, + opts: opts, }, nil } @@ -51,3 +80,8 @@ func (k *CacheKey) Context() fab.ClientContext { func (k *CacheKey) ChannelConfig() fab.ChannelCfg { return k.chConfig } + +// Opts returns the options to use for creating events service +func (k *CacheKey) Opts() []options.Opt { + return k.opts +} diff --git a/pkg/fabsdk/provider/fabpvdr/fabpvdr.go b/pkg/fabsdk/provider/fabpvdr/fabpvdr.go index d30d85f86a..b67ec34842 100644 --- a/pkg/fabsdk/provider/fabpvdr/fabpvdr.go +++ b/pkg/fabsdk/provider/fabpvdr/fabpvdr.go @@ -32,6 +32,7 @@ type cacheKey interface { lazycache.Key Context() fab.ClientContext ChannelConfig() fab.ChannelCfg + Opts() []options.Opt } type cache interface { @@ -49,7 +50,7 @@ type InfraProvider struct { } // New creates a InfraProvider enabling access to core Fabric objects and functionality. -func New(config core.Config, opts ...options.Opt) *InfraProvider { +func New(config core.Config) *InfraProvider { idleTime := config.TimeoutOrDefault(core.ConnectionIdle) sweepTime := config.TimeoutOrDefault(core.CacheSweepInterval) eventIdleTime := config.TimeoutOrDefault(core.EventServiceIdle) @@ -63,7 +64,7 @@ func New(config core.Config, opts ...options.Opt) *InfraProvider { return NewEventClientRef( eventIdleTime, func() (fab.EventClient, error) { - return getEventClient(ck.Context(), ck.ChannelConfig(), opts...) + return getEventClient(ck.Context(), ck.ChannelConfig(), ck.Opts()...) }, ), nil }, @@ -106,12 +107,12 @@ func (f *InfraProvider) CommManager() fab.CommManager { } // CreateEventService creates the event service. -func (f *InfraProvider) CreateEventService(ctx fab.ClientContext, channelID string) (fab.EventService, error) { +func (f *InfraProvider) CreateEventService(ctx fab.ClientContext, channelID string, opts ...options.Opt) (fab.EventService, error) { chnlCfg, err := f.CreateChannelCfg(ctx, channelID) if err != nil { return nil, err } - key, err := NewCacheKey(ctx, chnlCfg) + key, err := NewCacheKey(ctx, chnlCfg, opts...) if err != nil { return nil, err } @@ -119,6 +120,7 @@ func (f *InfraProvider) CreateEventService(ctx fab.ClientContext, channelID stri if err != nil { return nil, err } + return eventService.(fab.EventService), nil } diff --git a/test/fixtures/testdata/src/github.com/example_cc/example_cc.go b/test/fixtures/testdata/src/github.com/example_cc/example_cc.go index 17e3a61d95..079c3fe357 100644 --- a/test/fixtures/testdata/src/github.com/example_cc/example_cc.go +++ b/test/fixtures/testdata/src/github.com/example_cc/example_cc.go @@ -103,7 +103,11 @@ func (t *SimpleChaincode) Invoke(stub shim.ChaincodeStubInterface) pb.Response { return t.query(stub, args) } if args[0] == "move" { - if err := stub.SetEvent("testEvent", []byte("Test Payload")); err != nil { + eventID := "testEvent" + if len(args) >= 5 { + eventID = args[4] + } + if err := stub.SetEvent(eventID, []byte("Test Payload")); err != nil { return shim.Error("Unable to set CC event: testEvent. Aborting transaction ...") } return t.move(stub, args) @@ -117,7 +121,7 @@ func (t *SimpleChaincode) move(stub shim.ChaincodeStubInterface, args []string) var Aval, Bval int // Asset holdings var X int // Transaction value var err error - if len(args) != 4 { + if len(args) < 4 { return shim.Error("Incorrect number of arguments. Expecting 4, function followed by 2 names and 1 value") } diff --git a/test/integration/fab/eventclient_test.go b/test/integration/fab/eventclient_test.go index 86e63dfccf..46955ee311 100644 --- a/test/integration/fab/eventclient_test.go +++ b/test/integration/fab/eventclient_test.go @@ -15,11 +15,8 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" - "github.com/hyperledger/fabric-sdk-go/pkg/core/config" "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client" "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk" - "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk/factory/defcore" - "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk/provider/fabpvdr" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -52,19 +49,7 @@ func TestEventClient(t *testing.T) { testEventService(t, testSetup, sdk, chainCodeID, false, eventService) }) t.Run("Deliver Block Events", func(t *testing.T) { - // Must create a new SDK that enables block events for the deliver event client - sdk, err := fabsdk.New(config.FromFile(testSetup.ConfigFile), fabsdk.WithCorePkg(&DeliverBlocksProviderFactory{})) - if err != nil { - t.Fatalf("Error creating SDK with block events: %s", err) - } - defer sdk.Close() - - chContextProvider := sdk.ChannelContext(testSetup.ChannelID, fabsdk.WithUser(org1User), fabsdk.WithOrg(org1Name)) - chContext, err := chContextProvider() - if err != nil { - t.Fatalf("error getting channel context: %s", err) - } - eventService, err := chContext.ChannelService().EventService() + eventService, err := chContext.ChannelService().EventService(client.WithBlockEvents()) if err != nil { t.Fatalf("error getting event service: %s", err) } @@ -272,12 +257,3 @@ func createAndSendTransaction(transactor fab.Sender, proposal *fab.TransactionPr return transactionResponse, nil } - -type DeliverBlocksProviderFactory struct { - defcore.ProviderFactory -} - -// CreateInfraProvider returns an InfraProvider that uses block events -func (f *DeliverBlocksProviderFactory) CreateInfraProvider(config core.Config) (fab.InfraProvider, error) { - return fabpvdr.New(config, client.WithBlockEvents()), nil -} diff --git a/test/integration/sdk/channel_client_test.go b/test/integration/sdk/channel_client_test.go index 18a4a970fe..ebff093f42 100644 --- a/test/integration/sdk/channel_client_test.go +++ b/test/integration/sdk/channel_client_test.go @@ -282,7 +282,7 @@ func testChaincodeEvent(ccID string, chClient *channel.Client, t *testing.T) { func testChaincodeEventListener(ccID string, chClient *channel.Client, listener *channel.Client, t *testing.T) { - eventID := "test([a-zA-Z]+)" + eventID := integration.GenerateRandomID() // Register chaincode event (pass in channel which receives event details when the event is complete) reg, notifier, err := listener.RegisterChaincodeEvent(ccID, eventID) @@ -291,7 +291,7 @@ func testChaincodeEventListener(ccID string, chClient *channel.Client, listener } defer chClient.UnregisterChaincodeEvent(reg) - response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: integration.ExampleCCTxArgs()}) + response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: append(integration.ExampleCCTxArgs(), []byte(eventID))}) if err != nil { t.Fatalf("Failed to move funds: %s", err) } diff --git a/test/integration/sdk/events_client_test.go b/test/integration/sdk/events_client_test.go new file mode 100644 index 0000000000..7ec7f79027 --- /dev/null +++ b/test/integration/sdk/events_client_test.go @@ -0,0 +1,174 @@ +// +build !prev + +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package sdk + +import ( + "testing" + "time" + + "github.com/hyperledger/fabric-sdk-go/pkg/client/channel" + "github.com/hyperledger/fabric-sdk-go/pkg/client/event" + "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk" + "github.com/hyperledger/fabric-sdk-go/test/integration" +) + +func TestDefaultEventClient(t *testing.T) { + + // using shared SDK instance to increase test speed + sdk := mainSDK + testSetup := mainTestSetup + chaincodeID := mainChaincodeID + + // prepare channel client context + org1ChannelClientContext := sdk.ChannelContext(testSetup.ChannelID, fabsdk.WithUser(org1User), fabsdk.WithOrg(org1Name)) + + // get channel client (used to generate transactions) + chClient, err := channel.New(org1ChannelClientContext) + if err != nil { + t.Fatalf("Failed to create new channel client: %s", err) + } + + // get default event client (with filtered block events) + eventClient, err := event.New(org1ChannelClientContext) + if err != nil { + t.Fatalf("Failed to create new events client: %s", err) + } + + // test register and receive chaincode event (payload is not expected) + testCCEvent(chaincodeID, chClient, eventClient, false, t) + + // test register filter block event + testRegisterFilteredBlockEvent(chaincodeID, chClient, eventClient, t) + + // default event client (with filtered blocks) is not allowed to register for block events + _, _, err = eventClient.RegisterBlockEvent() + if err == nil { + t.Fatalf("Default events client should have failed to register for block events") + } +} + +func TestEventsClientWithBlockEvents(t *testing.T) { + + // using shared SDK instance to increase test speed + sdk := mainSDK + testSetup := mainTestSetup + chaincodeID := mainChaincodeID + + // prepare channel client context + org1ChannelClientContext := sdk.ChannelContext(testSetup.ChannelID, fabsdk.WithUser(org1User), fabsdk.WithOrg(org1Name)) + + // get channel client (used to generate transactions) + chClient, err := channel.New(org1ChannelClientContext) + if err != nil { + t.Fatalf("Failed to create new channel client: %s", err) + } + + // create event client with block events + eventClient, err := event.New(org1ChannelClientContext, event.WithBlockEvents()) + if err != nil { + t.Fatalf("Failed to create new events client with block events: %s", err) + } + + // test register and receive chaincode event (payload is expected since we are set for receiving block events) + testCCEvent(chaincodeID, chClient, eventClient, true, t) + + // test register block and filter block event + testRegisterBlockEvent(chaincodeID, chClient, eventClient, t) + testRegisterFilteredBlockEvent(chaincodeID, chClient, eventClient, t) +} + +func testCCEvent(ccID string, chClient *channel.Client, eventClient *event.Client, expectPayload bool, t *testing.T) { + + eventID := integration.GenerateRandomID() + payload := "Test Payload" + + // Register chaincode event (pass in channel which receives event details when the event is complete) + reg, notifier, err := eventClient.RegisterChaincodeEvent(ccID, eventID) + if err != nil { + t.Fatalf("Failed to register cc event: %s", err) + } + defer eventClient.Unregister(reg) + + response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: append(integration.ExampleCCTxArgs(), []byte(eventID))}) + if err != nil { + t.Fatalf("Failed to move funds: %s", err) + } + + select { + case ccEvent := <-notifier: + t.Logf("Received cc event: %#v", ccEvent) + if expectPayload && string(ccEvent.Payload[:]) != payload { + t.Fatalf("Did not receive 'Test Payload'") + } + + if !expectPayload && string(ccEvent.Payload[:]) != "" { + t.Fatalf("Expected empty payload, got %s", ccEvent.Payload[:]) + } + if ccEvent.TxID != string(response.TransactionID) { + t.Fatalf("CCEvent(%s) and Execute(%s) transaction IDs don't match", ccEvent.TxID, string(response.TransactionID)) + } + case <-time.After(time.Second * 20): + t.Fatalf("Did NOT receive CC for eventId(%s)\n", eventID) + } +} + +func testRegisterBlockEvent(ccID string, chClient *channel.Client, eventClient *event.Client, t *testing.T) { + + breg, beventch, err := eventClient.RegisterBlockEvent() + if err != nil { + t.Fatalf("Error registering for block events: %s", err) + } + defer eventClient.Unregister(breg) + + response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: integration.ExampleCCTxArgs()}) + if err != nil { + t.Fatalf("Failed to move funds: %s", err) + } + + select { + case event, ok := <-beventch: + if !ok { + t.Fatalf("unexpected closed channel while waiting for block event") + } + t.Logf("Received block event: %#v", event) + if event.Block == nil { + t.Fatalf("Expecting block in block event but got nil") + } + case <-time.After(time.Second * 20): + t.Fatalf("Did NOT receive block event for txID(%s)\n", response.TransactionID) + } +} + +func testRegisterFilteredBlockEvent(ccID string, chClient *channel.Client, eventClient *event.Client, t *testing.T) { + + fbreg, fbeventch, err := eventClient.RegisterFilteredBlockEvent() + if err != nil { + t.Fatalf("Error registering for block events: %s", err) + } + defer eventClient.Unregister(fbreg) + + response, err := chClient.Execute(channel.Request{ChaincodeID: ccID, Fcn: "invoke", Args: integration.ExampleCCTxArgs()}) + if err != nil { + t.Fatalf("Failed to move funds: %s", err) + } + + select { + case event, ok := <-fbeventch: + if !ok { + t.Fatalf("unexpected closed channel while waiting for filtered block event") + } + if event.FilteredBlock == nil { + t.Fatalf("Expecting filtered block in filtered block event but got nil") + } + t.Logf("Received filtered block event: %#v", event) + + case <-time.After(time.Second * 20): + t.Fatalf("Did NOT receive filtered block event for txID(%s)\n", response.TransactionID) + } +}