Skip to content

Commit

Permalink
[ FAB-3432] ChaincodeEvent should contain channel ID
Browse files Browse the repository at this point in the history
Change-Id: I06f3ef63c34bf0ad1c52196e7572dce1881ecf60
Signed-off-by: Bob Stasyszyn <bob.stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Apr 27, 2017
1 parent f550a1d commit b76434a
Show file tree
Hide file tree
Showing 3 changed files with 282 additions and 37 deletions.
34 changes: 19 additions & 15 deletions fabric-client/events/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type ChaincodeEvent struct {
TxId string
EventName string
Payload []byte
ChannelID string
}

// ChainCodeCBE ...
Expand Down Expand Up @@ -294,10 +295,10 @@ func (eventHub *eventHub) Recv(msg *pb.Event) (bool, error) {
}

for _, tdata := range blockEvent.Block.Data.Data {
if ccEvent, err := getChainCodeEvent(tdata); err != nil {
if ccEvent, channelID, err := getChainCodeEvent(tdata); err != nil {
logger.Warningf("getChainCodeEvent return error: %v\n", err)
} else if ccEvent != nil {
eventHub.notifyChaincodeRegistrants(ccEvent, true)
eventHub.notifyChaincodeRegistrants(channelID, ccEvent, true)
}
}
return true, nil
Expand All @@ -306,7 +307,7 @@ func (eventHub *eventHub) Recv(msg *pb.Event) (bool, error) {
logger.Debugf("Recv ccEvent:%v\n", ccEvent)

if ccEvent != nil {
eventHub.notifyChaincodeRegistrants(ccEvent.ChaincodeEvent, false)
eventHub.notifyChaincodeRegistrants("", ccEvent.ChaincodeEvent, false)
}
return true, nil
default:
Expand Down Expand Up @@ -493,58 +494,60 @@ func (eventHub *eventHub) getTXRegistrant(txID string) func(string, error) {
}

// getChainCodeEvents parses block events for chaincode events associated with individual transactions
func getChainCodeEvent(tdata []byte) (*pb.ChaincodeEvent, error) {
func getChainCodeEvent(tdata []byte) (event *pb.ChaincodeEvent, channelID string, err error) {

if tdata == nil {
return nil, errors.New("Cannot extract payload from nil transaction")
return nil, "", errors.New("Cannot extract payload from nil transaction")
}

if env, err := utils.GetEnvelopeFromBlock(tdata); err != nil {
return nil, fmt.Errorf("Error getting tx from block(%s)", err)
return nil, "", fmt.Errorf("Error getting tx from block(%s)", err)
} else if env != nil {
// get the payload from the envelope
payload, err := utils.GetPayload(env)
if err != nil {
return nil, fmt.Errorf("Could not extract payload from envelope, err %s", err)
return nil, "", fmt.Errorf("Could not extract payload from envelope, err %s", err)
}

channelHeaderBytes := payload.Header.ChannelHeader
channelHeader := &common.ChannelHeader{}
err = proto.Unmarshal(channelHeaderBytes, channelHeader)
if err != nil {
return nil, fmt.Errorf("Could not extract channel header from envelope, err %s", err)
return nil, "", fmt.Errorf("Could not extract channel header from envelope, err %s", err)
}

channelID := channelHeader.ChannelId

// Chaincode events apply to endorser transaction only
if common.HeaderType(channelHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION {
tx, err := utils.GetTransaction(payload.Data)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling transaction payload for block event: %s", err)
return nil, "", fmt.Errorf("Error unmarshalling transaction payload for block event: %s", err)
}
chaincodeActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling transaction action payload for block event: %s", err)
return nil, "", fmt.Errorf("Error unmarshalling transaction action payload for block event: %s", err)
}
propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling proposal response payload for block event: %s", err)
return nil, "", fmt.Errorf("Error unmarshalling proposal response payload for block event: %s", err)
}
caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling chaincode action for block event: %s", err)
return nil, "", fmt.Errorf("Error unmarshalling chaincode action for block event: %s", err)
}
ccEvent, err := utils.GetChaincodeEvents(caPayload.Events)

if ccEvent != nil {
return ccEvent, nil
return ccEvent, channelID, nil
}
}
}
return nil, nil
return nil, "", nil
}

// Utility function to fire callbacks for chaincode registrants
func (eventHub *eventHub) notifyChaincodeRegistrants(ccEvent *pb.ChaincodeEvent, patternMatch bool) {
func (eventHub *eventHub) notifyChaincodeRegistrants(channelID string, ccEvent *pb.ChaincodeEvent, patternMatch bool) {

cbeArray := eventHub.getChaincodeRegistrants(ccEvent.ChaincodeId)
if len(cbeArray) <= 0 {
Expand All @@ -564,6 +567,7 @@ func (eventHub *eventHub) notifyChaincodeRegistrants(ccEvent *pb.ChaincodeEvent,
TxId: ccEvent.TxId,
EventName: ccEvent.EventName,
Payload: ccEvent.Payload,
ChannelID: channelID,
})
}
}
Expand Down
121 changes: 118 additions & 3 deletions fabric-client/events/eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import (
)

func TestDeadlock(t *testing.T) {
channelID := "testchannel"
ccID := "testccid"

eventHub, clientFactory := createMockedEventHub(t)
if t.Failed() {
return
Expand Down Expand Up @@ -65,7 +68,10 @@ func TestDeadlock(t *testing.T) {
})

go client.MockEvent(&pb.Event{
Event: buildMockTxEvent(transactionID),
Event: (&MockTxEventBuilder{
TxID: transactionID,
ChannelID: channelID,
}).Build(),
})

// Wait for the TX event and then unregister
Expand All @@ -78,13 +84,16 @@ func TestDeadlock(t *testing.T) {
go flood(eventsPerThread, threads, func() {
eventName := generateTxID()
received := newCompletionHandler(timeout)
registration := eventHub.RegisterChaincodeEvent("testccid", eventName, func(event *ChaincodeEvent) {
registration := eventHub.RegisterChaincodeEvent(ccID, eventName, func(event *ChaincodeEvent) {
ccCompletion.done()
received.done()
})

go client.MockEvent(&pb.Event{
Event: buildMockCCEvent("testccid", eventName),
Event: (&MockCCEventBuilder{
CCID: ccID,
EventName: eventName,
}).Build(),
})

// Wait for the CC event and then unregister
Expand All @@ -109,6 +118,112 @@ func TestDeadlock(t *testing.T) {
}
}

func TestChaincodeEvent(t *testing.T) {
ccID := "someccid"
eventName := "someevent"

eventHub, clientFactory := createMockedEventHub(t)
if t.Failed() {
return
}

fmt.Printf("EventHub Chaincode event test\n")

client := clientFactory.clients[0]
if client == nil {
t.Fatalf("No client")
}

eventReceived := make(chan *ChaincodeEvent)

// Register for CC event
registration := eventHub.RegisterChaincodeEvent(ccID, eventName, func(event *ChaincodeEvent) {
eventReceived <- event
})

// Publish CC event
go client.MockEvent(&pb.Event{
Event: (&MockCCEventBuilder{
CCID: ccID,
EventName: eventName,
}).Build(),
})

// Wait for the CC event
var event *ChaincodeEvent
select {
case event = <-eventReceived:
eventHub.UnregisterChaincodeEvent(registration)
case <-time.After(time.Second * 5):
t.Fatalf("Timed out waiting for CC event")
}

// Check CC event
if event.ChaincodeId != ccID {
t.Fatalf("Expecting chaincode ID [%s] but got [%s]", ccID, event.ChaincodeId)
}
if event.EventName != eventName {
t.Fatalf("Expecting event name [%s] but got [%s]", eventName, event.EventName)
}
}

func TestChaincodeBlockEvent(t *testing.T) {
channelID := "somechannelid"
ccID := "someccid"
eventName := "someevent"
txID := generateTxID()

eventHub, clientFactory := createMockedEventHub(t)
if t.Failed() {
return
}

client := clientFactory.clients[0]
if client == nil {
t.Fatalf("No client")
}

eventReceived := make(chan *ChaincodeEvent)

// Register for CC event
registration := eventHub.RegisterChaincodeEvent(ccID, eventName, func(event *ChaincodeEvent) {
eventReceived <- event
})

// Publish CC event
go client.MockEvent(&pb.Event{
Event: (&MockCCBlockEventBuilder{
CCID: ccID,
EventName: eventName,
ChannelID: channelID,
TxID: txID,
}).Build(),
})

// Wait for CC event
var event *ChaincodeEvent
select {
case event = <-eventReceived:
eventHub.UnregisterChaincodeEvent(registration)
case <-time.After(time.Second * 5):
t.Fatalf("Timed out waiting for CC event")
}

// Check CC event
if event.ChannelID != channelID {
t.Fatalf("Expecting channel ID [%s] but got [%s]", channelID, event.ChannelID)
}
if event.ChaincodeId != ccID {
t.Fatalf("Expecting chaincode ID [%s] but got [%s]", ccID, event.ChaincodeId)
}
if event.EventName != eventName {
t.Fatalf("Expecting event name [%s] but got [%s]", eventName, event.EventName)
}
if event.TxId == "" {
t.Fatalf("Expecting TxID [%s] but got [%s]", txID, event.TxId)
}
}

// completionHandler waits for a single event with a timeout
type completionHandler struct {
completed chan bool
Expand Down
Loading

0 comments on commit b76434a

Please sign in to comment.