Skip to content

Commit

Permalink
[FABG-718] integration test seektype default vs newest
Browse files Browse the repository at this point in the history
Change-Id: Ib8839d7fccad48294e64092663ad07c022415604
Signed-off-by: Sudesh Shetty <sudesh.shetty@securekey.com>
  • Loading branch information
sudeshrshetty committed Aug 21, 2018
1 parent 7a85759 commit be3009e
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/fab/events/deliverclient/deliverclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func New(context fabcontext.Client, chConfig fab.ChannelCfg, discoveryService fa
params.seekType = seek.Newest
//discard (do not publish) next BlockEvent/FilteredBlockEvent in dispatcher, since default seek type 'newest' is
// only needed for block height calculations
dispatcher.DiscardNextEvent()
dispatcher.UpdateLastBlockInfoOnly()
}

client := &Client{
Expand Down
2 changes: 0 additions & 2 deletions pkg/fab/events/deliverclient/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ func (p *params) SetSeekType(value seek.Type) {
logger.Debugf("SeekType: %s", value)
if value != "" {
p.seekType = value
} else {
logger.Warnf("SeekType must not be empty. Defaulting to %s", p.seekType)
}
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/fab/events/service/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type HandlerRegistry map[reflect.Type]Handler
type Dispatcher struct {
params
lastBlockNum uint64
discardNextEvent bool
updateLastBlockInfoOnly bool
state int32
eventch chan interface{}
blockRegistrations []*BlockReg
Expand Down Expand Up @@ -307,8 +307,8 @@ func (ed *Dispatcher) HandleBlock(block *cb.Block, sourceURL string) {
return
}

if ed.discardNextEvent {
ed.discardNextEvent = false
if ed.updateLastBlockInfoOnly {
ed.updateLastBlockInfoOnly = false
return
}

Expand All @@ -325,8 +325,8 @@ func (ed *Dispatcher) HandleFilteredBlock(fblock *pb.FilteredBlock, sourceURL st
return
}

if ed.discardNextEvent {
ed.discardNextEvent = false
if ed.updateLastBlockInfoOnly {
ed.updateLastBlockInfoOnly = false
return
}

Expand Down Expand Up @@ -517,9 +517,9 @@ func (ed *Dispatcher) RegisterHandler(t interface{}, h Handler) {
}
}

//DiscardNextEvent sets if next event needs to be published or not
func (ed *Dispatcher) DiscardNextEvent() {
ed.discardNextEvent = true
//UpdateLastBlockInfoOnly sets is next event should only be used for updating last block info.
func (ed *Dispatcher) UpdateLastBlockInfoOnly() {
ed.updateLastBlockInfoOnly = true
}

func getCCKey(ccID, eventFilter string) string {
Expand Down
105 changes: 102 additions & 3 deletions test/integration/pkg/fab/eventclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ import (

"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient"
"github.com/hyperledger/fabric-sdk-go/pkg/fabsdk"
"github.com/hyperledger/fabric-sdk-go/pkg/util/test"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
"github.com/hyperledger/fabric-sdk-go/test/integration"
)

const eventTimeWindow = 60 * time.Second // the maximum amount of time to watch for events.
const eventTimeWindow = 120 * time.Second // the maximum amount of time to watch for events.

func TestEventClient(t *testing.T) {
chainCodeID := mainChaincodeID
Expand Down Expand Up @@ -61,8 +64,6 @@ func testEventService(t *testing.T, testSetup *integration.BaseSetupImpl, sdk *f
}
defer cancel()

tpResponses, prop, txID := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID)

var wg sync.WaitGroup
var numExpected uint32

Expand Down Expand Up @@ -94,6 +95,7 @@ func testEventService(t *testing.T, testSetup *integration.BaseSetupImpl, sdk *f
numExpected++
wg.Add(1)

tpResponses, prop, txID := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID)
txReg, txstatusch, err := eventService.RegisterTxStatusEvent(txID)
if err != nil {
t.Fatalf("Error registering for Tx Status event: %s", err)
Expand Down Expand Up @@ -259,3 +261,100 @@ func createAndSendTransaction(transactor fab.Sender, proposal *fab.TransactionPr

return transactionResponse, nil
}

func TestMultipleEventsBySeekTypes(t *testing.T) {

chainCodeID := mainChaincodeID
testSetup := mainTestSetup

//Run with seek type default and test behaviour

for i := 0; i < 4; i++ {
//create new sdk
sdk, err := fabsdk.New(integration.ConfigBackend)
require.NoError(t, err, "failed to get new sdk instance")

//create new channel context
chContextProvider := sdk.ChannelContext(testSetup.ChannelID, fabsdk.WithUser(org1User), fabsdk.WithOrg(org1Name))
chContext, err := chContextProvider()
require.NoError(t, err, "error getting channel context")

//create new event service with default opts
eventService, err := chContext.ChannelService().EventService()
require.NoError(t, err, "error getting event service")

testChannelEventsSeekOptions(t, testSetup, sdk, chContext, chainCodeID, false, eventService, "")
}

//Run with seek type newest and test behaviour
for i := 0; i < 4; i++ {
//create new sdk
sdk, err := fabsdk.New(integration.ConfigBackend)
require.NoError(t, err, "failed to get new sdk instance")

//create new channel context
chContextProvider := sdk.ChannelContext(testSetup.ChannelID, fabsdk.WithUser(org1User), fabsdk.WithOrg(org1Name))
chContext, err := chContextProvider()
require.NoError(t, err, "error getting channel context")

//create new event service with deliver client opts
eventService, err := chContext.ChannelService().EventService(deliverclient.WithSeekType(seek.Newest))
require.NoError(t, err, "error getting event service")

testChannelEventsSeekOptions(t, testSetup, sdk, chContext, chainCodeID, false, eventService, seek.Newest)
}

}

func testChannelEventsSeekOptions(t *testing.T, testSetup *integration.BaseSetupImpl, sdk *fabsdk.FabricSDK, chContext context.Channel, chainCodeID string, blockEvents bool, eventService fab.EventService, seekType seek.Type) {

defer sdk.Close()

//get transactor
_, cancel, transactor, err := getTransactor(sdk, testSetup.ChannelID, "Admin", testSetup.OrgID)
require.NoError(t, err, "Failed to get channel transactor")
defer cancel()

//register chanicode event
ccreg, cceventch, err := eventService.RegisterChaincodeEvent(chainCodeID, ".*")
require.NoError(t, err, "Error registering for filtered block events")
defer eventService.Unregister(ccreg)

// prepare and commit the transaction to generate events
tpResponses, prop, txID := sendTxProposal(sdk, testSetup, t, transactor, chainCodeID)
_, err = createAndSendTransaction(transactor, prop, tpResponses)
require.NoError(t, err, "First invoke failed err")

var event *fab.CCEvent
var ok bool
select {
case event, ok = <-cceventch:
require.True(t, ok, "unexpected closed channel while waiting for Tx Status event")
require.Equal(t, event.ChaincodeID, chainCodeID, "Expecting event for CC ID [%s] but got event for CC ID [%s]", chainCodeID, event.ChaincodeID)

if blockEvents {
expectedPayload := []byte("Test Payload")
if !bytes.Equal(event.Payload, expectedPayload) {
test.Failf(t, "Expecting payload [%s] but got [%s]", []byte("Test Payload"), event.Payload)
}
} else if event.Payload != nil {
test.Failf(t, "Expecting nil payload for filtered events but got [%s]", event.Payload)
}
if event.SourceURL == "" {
test.Failf(t, "Expecting event source URL but got none")
}
if event.BlockNumber == 0 {
test.Failf(t, "Expecting non-zero block number")
}
case <-time.After(eventTimeWindow):
return
}

//If seek type is newest then the first event we get from event channel is not related to the first transaction happened after registration, it is
//actually latest block from the chain
require.Equal(t, seekType == seek.Newest, txID != event.TxID, "for seek type[%s], txID [%s], event.txID[%s] ,condition didn't match", seekType, txID, event.TxID)

//If seek type is default, then event dispatcher uses first block only for block height calculations, it doesn't publish anything
//to event channel, and first event we get from event channel actually belongs to first transaction after registration.
require.Equal(t, seekType == "", txID == event.TxID, "for seek type[%s], txID [%s], event.txID[%s] ,condition didn't match", seekType, txID, event.TxID)
}

0 comments on commit be3009e

Please sign in to comment.