Skip to content

Commit

Permalink
[FABG-718] fix for ChainCodeEvent getting older events
Browse files Browse the repository at this point in the history
- default to 'Newest' and discard first event
when no seek type is provided for a new client

Change-Id: Id5e8b05e340b2c7afe99ef681c383ddde62b6f82
Signed-off-by: Sudesh Shetty <sudesh.shetty@securekey.com>
  • Loading branch information
sudeshrshetty committed Aug 15, 2018
1 parent b25b359 commit beccd9c
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 8 deletions.
16 changes: 12 additions & 4 deletions pkg/fab/events/deliverclient/deliverclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,21 @@ func New(context fabcontext.Client, chConfig fab.ChannelCfg, discoveryService fa
return nil, err
}

dispatcher := dispatcher.New(context, chConfig, discoveryWrapper, params.connProvider, opts...)

//default seek type is `Newest`
if params.seekType == "" {
params.seekType = seek.Newest
//discard (do not publish) next BlockEvent/FilteredBlockEvent in dispatcher, since default seek type 'newest' is
// only needed for block height calculations
dispatcher.DiscardNextEvent()
}

client := &Client{
Client: *client.New(
dispatcher.New(context, chConfig, discoveryWrapper, params.connProvider, opts...),
opts...,
),
Client: *client.New(dispatcher, opts...),
params: *params,
}

client.SetAfterConnectHandler(client.seek)
client.SetBeforeReconnectHandler(client.setSeekFromLastBlockReceived)

Expand Down
1 change: 0 additions & 1 deletion pkg/fab/events/deliverclient/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type params struct {
func defaultParams() *params {
return &params{
connProvider: deliverFilteredProvider,
seekType: seek.Newest,
respTimeout: 5 * time.Second,
}
}
Expand Down
22 changes: 19 additions & 3 deletions pkg/fab/events/service/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ type HandlerRegistry map[reflect.Type]Handler
// This also avoids the need for synchronization.
type Dispatcher struct {
params
handlers map[reflect.Type]Handler
lastBlockNum uint64
discardNextEvent bool
state int32
eventch chan interface{}
blockRegistrations []*BlockReg
filteredBlockRegistrations []*FilteredBlockReg
handlers map[reflect.Type]Handler
txRegistrations map[string]*TxStatusReg
ccRegistrations map[string]*ChaincodeReg
state int32
lastBlockNum uint64
}

// New creates a new Dispatcher.
Expand Down Expand Up @@ -306,6 +307,11 @@ func (ed *Dispatcher) HandleBlock(block *cb.Block, sourceURL string) {
return
}

if ed.discardNextEvent {
ed.discardNextEvent = false
return
}

ed.publishBlockEvents(block, sourceURL)
ed.publishFilteredBlockEvents(toFilteredBlock(block), sourceURL)
}
Expand All @@ -319,6 +325,11 @@ func (ed *Dispatcher) HandleFilteredBlock(fblock *pb.FilteredBlock, sourceURL st
return
}

if ed.discardNextEvent {
ed.discardNextEvent = false
return
}

logger.Debug("Publishing filtered block event...")
ed.publishFilteredBlockEvents(fblock, sourceURL)
}
Expand Down Expand Up @@ -506,6 +517,11 @@ func (ed *Dispatcher) RegisterHandler(t interface{}, h Handler) {
}
}

//DiscardNextEvent sets if next event needs to be published or not
func (ed *Dispatcher) DiscardNextEvent() {
ed.discardNextEvent = true
}

func getCCKey(ccID, eventFilter string) string {
return ccID + "/" + eventFilter
}
Expand Down
52 changes: 52 additions & 0 deletions test/integration/pkg/client/channel/channel_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func TestChannelClient(t *testing.T) {
testChaincodeEventListener(chaincodeID, chClient, listener, t)

testDuplicateTargets(chaincodeID, chClient, t)

//test if CCEvents for chaincode events are in sync when new channel client are created
// for each transaction
testMultipleClientChaincodeEvent(chaincodeID, t)
}

func testDuplicateTargets(chaincodeID string, chClient *channel.Client, t *testing.T) {
Expand Down Expand Up @@ -390,6 +394,54 @@ func testChaincodeEvent(ccID string, chClient *channel.Client, t *testing.T) {
}
}

//TestMultipleEventClient tests if CCEvents for chaincode events are in sync when new channel client are created
// for each transaction
func testMultipleClientChaincodeEvent(chainCodeID string, t *testing.T) {

channelID := mainTestSetup.ChannelID
eventID := "([a-zA-Z]+)"

for i := 0; i < 10; i++ {

sdk, err := fabsdk.New(integration.ConfigBackend)
if err != nil {
t.Fatalf("Failed to create new SDK: %s", err)
}

chContextProvider := sdk.ChannelContext(channelID, fabsdk.WithUser(org1User), fabsdk.WithOrg(org1Name))

chClient, err := channel.New(chContextProvider)
if err != nil {
t.Fatalf("Failed to create new channel client: %s", err)
}

// Register chaincode event (pass in channel which receives event details when the event is complete)
reg, notifier, err := chClient.RegisterChaincodeEvent(chainCodeID, eventID)
if err != nil {
t.Fatalf("Failed to register cc event: %s", err)
}
defer chClient.UnregisterChaincodeEvent(reg)

// Move funds
resp, err := chClient.Execute(channel.Request{ChaincodeID: chainCodeID, Fcn: "invoke",
Args: integration.ExampleCCTxArgs()}, channel.WithRetry(retry.DefaultChannelOpts))
if err != nil {
t.Fatalf("Failed to move funds: %s", err)
}

txID := resp.TransactionID

var ccEvent *fab.CCEvent
select {
case ccEvent = <-notifier:
t.Logf("Received CC eventID: %#v\n", ccEvent.TxID)
case <-time.After(time.Second * 20):
t.Fatalf("Did NOT receive CC event for eventId(%s)\n", eventID)
}
assert.Equal(t, string(txID), ccEvent.TxID, "mismatched ccEvent.TxID")
}
}

func testChaincodeEventListener(ccID string, chClient *channel.Client, listener *channel.Client, t *testing.T) {

eventID := integration.GenerateRandomID()
Expand Down

0 comments on commit beccd9c

Please sign in to comment.