From b76434a184fcb983f7605a1b613c1ba7ac98d8c4 Mon Sep 17 00:00:00 2001 From: Bob Stasyszyn Date: Thu, 27 Apr 2017 09:24:44 -0400 Subject: [PATCH] [ FAB-3432] ChaincodeEvent should contain channel ID Change-Id: I06f3ef63c34bf0ad1c52196e7572dce1881ecf60 Signed-off-by: Bob Stasyszyn --- fabric-client/events/eventhub.go | 34 +++--- fabric-client/events/eventhub_test.go | 121 ++++++++++++++++++- fabric-client/events/eventmocks.go | 164 +++++++++++++++++++++++--- 3 files changed, 282 insertions(+), 37 deletions(-) diff --git a/fabric-client/events/eventhub.go b/fabric-client/events/eventhub.go index 749bf73ece..cbffbbca06 100644 --- a/fabric-client/events/eventhub.go +++ b/fabric-client/events/eventhub.go @@ -97,6 +97,7 @@ type ChaincodeEvent struct { TxId string EventName string Payload []byte + ChannelID string } // ChainCodeCBE ... @@ -294,10 +295,10 @@ func (eventHub *eventHub) Recv(msg *pb.Event) (bool, error) { } for _, tdata := range blockEvent.Block.Data.Data { - if ccEvent, err := getChainCodeEvent(tdata); err != nil { + if ccEvent, channelID, err := getChainCodeEvent(tdata); err != nil { logger.Warningf("getChainCodeEvent return error: %v\n", err) } else if ccEvent != nil { - eventHub.notifyChaincodeRegistrants(ccEvent, true) + eventHub.notifyChaincodeRegistrants(channelID, ccEvent, true) } } return true, nil @@ -306,7 +307,7 @@ func (eventHub *eventHub) Recv(msg *pb.Event) (bool, error) { logger.Debugf("Recv ccEvent:%v\n", ccEvent) if ccEvent != nil { - eventHub.notifyChaincodeRegistrants(ccEvent.ChaincodeEvent, false) + eventHub.notifyChaincodeRegistrants("", ccEvent.ChaincodeEvent, false) } return true, nil default: @@ -493,58 +494,60 @@ func (eventHub *eventHub) getTXRegistrant(txID string) func(string, error) { } // getChainCodeEvents parses block events for chaincode events associated with individual transactions -func getChainCodeEvent(tdata []byte) (*pb.ChaincodeEvent, error) { +func getChainCodeEvent(tdata []byte) (event *pb.ChaincodeEvent, channelID string, err error) { if tdata == nil { - return nil, errors.New("Cannot extract payload from nil transaction") + return nil, "", errors.New("Cannot extract payload from nil transaction") } if env, err := utils.GetEnvelopeFromBlock(tdata); err != nil { - return nil, fmt.Errorf("Error getting tx from block(%s)", err) + return nil, "", fmt.Errorf("Error getting tx from block(%s)", err) } else if env != nil { // get the payload from the envelope payload, err := utils.GetPayload(env) if err != nil { - return nil, fmt.Errorf("Could not extract payload from envelope, err %s", err) + return nil, "", fmt.Errorf("Could not extract payload from envelope, err %s", err) } channelHeaderBytes := payload.Header.ChannelHeader channelHeader := &common.ChannelHeader{} err = proto.Unmarshal(channelHeaderBytes, channelHeader) if err != nil { - return nil, fmt.Errorf("Could not extract channel header from envelope, err %s", err) + return nil, "", fmt.Errorf("Could not extract channel header from envelope, err %s", err) } + channelID := channelHeader.ChannelId + // Chaincode events apply to endorser transaction only if common.HeaderType(channelHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION { tx, err := utils.GetTransaction(payload.Data) if err != nil { - return nil, fmt.Errorf("Error unmarshalling transaction payload for block event: %s", err) + return nil, "", fmt.Errorf("Error unmarshalling transaction payload for block event: %s", err) } chaincodeActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload) if err != nil { - return nil, fmt.Errorf("Error unmarshalling transaction action payload for block event: %s", err) + return nil, "", fmt.Errorf("Error unmarshalling transaction action payload for block event: %s", err) } propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload) if err != nil { - return nil, fmt.Errorf("Error unmarshalling proposal response payload for block event: %s", err) + return nil, "", fmt.Errorf("Error unmarshalling proposal response payload for block event: %s", err) } caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension) if err != nil { - return nil, fmt.Errorf("Error unmarshalling chaincode action for block event: %s", err) + return nil, "", fmt.Errorf("Error unmarshalling chaincode action for block event: %s", err) } ccEvent, err := utils.GetChaincodeEvents(caPayload.Events) if ccEvent != nil { - return ccEvent, nil + return ccEvent, channelID, nil } } } - return nil, nil + return nil, "", nil } // Utility function to fire callbacks for chaincode registrants -func (eventHub *eventHub) notifyChaincodeRegistrants(ccEvent *pb.ChaincodeEvent, patternMatch bool) { +func (eventHub *eventHub) notifyChaincodeRegistrants(channelID string, ccEvent *pb.ChaincodeEvent, patternMatch bool) { cbeArray := eventHub.getChaincodeRegistrants(ccEvent.ChaincodeId) if len(cbeArray) <= 0 { @@ -564,6 +567,7 @@ func (eventHub *eventHub) notifyChaincodeRegistrants(ccEvent *pb.ChaincodeEvent, TxId: ccEvent.TxId, EventName: ccEvent.EventName, Payload: ccEvent.Payload, + ChannelID: channelID, }) } } diff --git a/fabric-client/events/eventhub_test.go b/fabric-client/events/eventhub_test.go index 8ddd89bf35..19266c7723 100644 --- a/fabric-client/events/eventhub_test.go +++ b/fabric-client/events/eventhub_test.go @@ -34,6 +34,9 @@ import ( ) func TestDeadlock(t *testing.T) { + channelID := "testchannel" + ccID := "testccid" + eventHub, clientFactory := createMockedEventHub(t) if t.Failed() { return @@ -65,7 +68,10 @@ func TestDeadlock(t *testing.T) { }) go client.MockEvent(&pb.Event{ - Event: buildMockTxEvent(transactionID), + Event: (&MockTxEventBuilder{ + TxID: transactionID, + ChannelID: channelID, + }).Build(), }) // Wait for the TX event and then unregister @@ -78,13 +84,16 @@ func TestDeadlock(t *testing.T) { go flood(eventsPerThread, threads, func() { eventName := generateTxID() received := newCompletionHandler(timeout) - registration := eventHub.RegisterChaincodeEvent("testccid", eventName, func(event *ChaincodeEvent) { + registration := eventHub.RegisterChaincodeEvent(ccID, eventName, func(event *ChaincodeEvent) { ccCompletion.done() received.done() }) go client.MockEvent(&pb.Event{ - Event: buildMockCCEvent("testccid", eventName), + Event: (&MockCCEventBuilder{ + CCID: ccID, + EventName: eventName, + }).Build(), }) // Wait for the CC event and then unregister @@ -109,6 +118,112 @@ func TestDeadlock(t *testing.T) { } } +func TestChaincodeEvent(t *testing.T) { + ccID := "someccid" + eventName := "someevent" + + eventHub, clientFactory := createMockedEventHub(t) + if t.Failed() { + return + } + + fmt.Printf("EventHub Chaincode event test\n") + + client := clientFactory.clients[0] + if client == nil { + t.Fatalf("No client") + } + + eventReceived := make(chan *ChaincodeEvent) + + // Register for CC event + registration := eventHub.RegisterChaincodeEvent(ccID, eventName, func(event *ChaincodeEvent) { + eventReceived <- event + }) + + // Publish CC event + go client.MockEvent(&pb.Event{ + Event: (&MockCCEventBuilder{ + CCID: ccID, + EventName: eventName, + }).Build(), + }) + + // Wait for the CC event + var event *ChaincodeEvent + select { + case event = <-eventReceived: + eventHub.UnregisterChaincodeEvent(registration) + case <-time.After(time.Second * 5): + t.Fatalf("Timed out waiting for CC event") + } + + // Check CC event + if event.ChaincodeId != ccID { + t.Fatalf("Expecting chaincode ID [%s] but got [%s]", ccID, event.ChaincodeId) + } + if event.EventName != eventName { + t.Fatalf("Expecting event name [%s] but got [%s]", eventName, event.EventName) + } +} + +func TestChaincodeBlockEvent(t *testing.T) { + channelID := "somechannelid" + ccID := "someccid" + eventName := "someevent" + txID := generateTxID() + + eventHub, clientFactory := createMockedEventHub(t) + if t.Failed() { + return + } + + client := clientFactory.clients[0] + if client == nil { + t.Fatalf("No client") + } + + eventReceived := make(chan *ChaincodeEvent) + + // Register for CC event + registration := eventHub.RegisterChaincodeEvent(ccID, eventName, func(event *ChaincodeEvent) { + eventReceived <- event + }) + + // Publish CC event + go client.MockEvent(&pb.Event{ + Event: (&MockCCBlockEventBuilder{ + CCID: ccID, + EventName: eventName, + ChannelID: channelID, + TxID: txID, + }).Build(), + }) + + // Wait for CC event + var event *ChaincodeEvent + select { + case event = <-eventReceived: + eventHub.UnregisterChaincodeEvent(registration) + case <-time.After(time.Second * 5): + t.Fatalf("Timed out waiting for CC event") + } + + // Check CC event + if event.ChannelID != channelID { + t.Fatalf("Expecting channel ID [%s] but got [%s]", channelID, event.ChannelID) + } + if event.ChaincodeId != ccID { + t.Fatalf("Expecting chaincode ID [%s] but got [%s]", ccID, event.ChaincodeId) + } + if event.EventName != eventName { + t.Fatalf("Expecting event name [%s] but got [%s]", eventName, event.EventName) + } + if event.TxId == "" { + t.Fatalf("Expecting TxID [%s] but got [%s]", txID, event.TxId) + } +} + // completionHandler waits for a single event with a timeout type completionHandler struct { completed chan bool diff --git a/fabric-client/events/eventmocks.go b/fabric-client/events/eventmocks.go index 81f77d2543..4d2cc5c184 100644 --- a/fabric-client/events/eventmocks.go +++ b/fabric-client/events/eventmocks.go @@ -134,60 +134,186 @@ func createMockedEventHub(t *testing.T) (*eventHub, *mockEventClientFactory) { return eventHub, &clientFactory } -func buildMockTxEvent(txID string) *pb.Event_Block { +// MockTxEventBuilder builds a mock TX event block +type MockTxEventBuilder struct { + ChannelID string + TxID string +} + +// MockCCEventBuilder builds a mock chaincode event +type MockCCEventBuilder struct { + CCID string + EventName string + Payload []byte +} + +// MockCCBlockEventBuilder builds a mock CC event block +type MockCCBlockEventBuilder struct { + CCID string + EventName string + ChannelID string + TxID string + Payload []byte +} + +// Build builds a mock TX event block +func (b *MockTxEventBuilder) Build() *pb.Event_Block { return &pb.Event_Block{ Block: &common.Block{ Header: &common.BlockHeader{}, - Metadata: buildBlockMetadata(), + Metadata: b.buildBlockMetadata(), Data: &common.BlockData{ - Data: [][]byte{util.MarshalOrPanic(buildEnvelope(txID))}, + Data: [][]byte{util.MarshalOrPanic(b.buildEnvelope())}, }, }, } } -func buildMockCCEvent(ccID string, eventName string) *pb.Event_ChaincodeEvent { +func (b *MockTxEventBuilder) buildBlockMetadata() *common.BlockMetadata { + return &common.BlockMetadata{ + Metadata: [][]byte{ + []byte{}, + []byte{}, + b.buildTransactionsFilterMetaDataBytes(), + []byte{}, + }, + } +} + +func (b *MockTxEventBuilder) buildTransactionsFilterMetaDataBytes() []byte { + return []byte(ledger_util.TxValidationFlags{uint8(pb.TxValidationCode_VALID)}) +} + +// Build builds a mock chaincode event +func (b *MockCCEventBuilder) Build() *pb.Event_ChaincodeEvent { return &pb.Event_ChaincodeEvent{ ChaincodeEvent: &pb.ChaincodeEvent{ - ChaincodeId: ccID, - EventName: eventName, + ChaincodeId: b.CCID, + EventName: b.EventName, + Payload: b.Payload, + }, + } +} + +func (b *MockTxEventBuilder) buildEnvelope() *common.Envelope { + return &common.Envelope{ + Payload: util.MarshalOrPanic(b.buildPayload()), + } +} + +func (b *MockTxEventBuilder) buildPayload() *common.Payload { + return &common.Payload{ + Header: &common.Header{ + ChannelHeader: util.MarshalOrPanic(b.buildChannelHeader()), + }, + } +} + +func (b *MockTxEventBuilder) buildChannelHeader() *common.ChannelHeader { + return &common.ChannelHeader{ + TxId: b.TxID, + ChannelId: b.ChannelID, + } +} + +// Build builds a mock chaincode event block +func (b *MockCCBlockEventBuilder) Build() *pb.Event_Block { + return &pb.Event_Block{ + Block: &common.Block{ + Header: &common.BlockHeader{}, + Metadata: b.buildBlockMetadata(), + Data: &common.BlockData{ + Data: [][]byte{util.MarshalOrPanic(b.buildEnvelope())}, + }, }, } } -func buildBlockMetadata() *common.BlockMetadata { +func (b *MockCCBlockEventBuilder) buildBlockMetadata() *common.BlockMetadata { return &common.BlockMetadata{ Metadata: [][]byte{ []byte{}, []byte{}, - buildTransactionsFilterMetaDataBytes(), + b.buildTransactionsFilterMetaDataBytes(), []byte{}, }, } } -func buildTransactionsFilterMetaDataBytes() []byte { - return []byte(ledger_util.TxValidationFlags{uint8(pb.TxValidationCode_VALID)}) -} - -func buildEnvelope(txID string) *common.Envelope { +func (b *MockCCBlockEventBuilder) buildEnvelope() *common.Envelope { return &common.Envelope{ - Payload: util.MarshalOrPanic(buildPayload(txID)), + Payload: util.MarshalOrPanic(b.buildPayload()), } } -func buildPayload(txID string) *common.Payload { +func (b *MockCCBlockEventBuilder) buildTransactionsFilterMetaDataBytes() []byte { + return []byte(ledger_util.TxValidationFlags{uint8(pb.TxValidationCode_VALID)}) +} + +func (b *MockCCBlockEventBuilder) buildPayload() *common.Payload { + fmt.Printf("MockCCBlockEventBuilder.buildPayload\n") return &common.Payload{ Header: &common.Header{ - ChannelHeader: util.MarshalOrPanic(buildChannelHeader(txID)), + ChannelHeader: util.MarshalOrPanic(b.buildChannelHeader()), }, + Data: util.MarshalOrPanic(b.buildTransaction()), } } -func buildChannelHeader(txID string) *common.ChannelHeader { +func (b *MockCCBlockEventBuilder) buildChannelHeader() *common.ChannelHeader { return &common.ChannelHeader{ - TxId: txID, - ChannelId: "testchannel", + Type: int32(common.HeaderType_ENDORSER_TRANSACTION), + TxId: b.TxID, + ChannelId: b.ChannelID, + } +} + +func (b *MockCCBlockEventBuilder) buildTransaction() *pb.Transaction { + return &pb.Transaction{ + Actions: []*pb.TransactionAction{b.buildTransactionAction()}, + } +} + +func (b *MockCCBlockEventBuilder) buildTransactionAction() *pb.TransactionAction { + return &pb.TransactionAction{ + Header: []byte{}, + Payload: util.MarshalOrPanic(b.buildChaincodeActionPayload()), + } +} + +func (b *MockCCBlockEventBuilder) buildChaincodeActionPayload() *pb.ChaincodeActionPayload { + return &pb.ChaincodeActionPayload{ + Action: b.buildChaincodeEndorsedAction(), + ChaincodeProposalPayload: []byte{}, + } +} + +func (b *MockCCBlockEventBuilder) buildChaincodeEndorsedAction() *pb.ChaincodeEndorsedAction { + return &pb.ChaincodeEndorsedAction{ + ProposalResponsePayload: util.MarshalOrPanic(b.buildProposalResponsePayload()), + Endorsements: []*pb.Endorsement{}, + } +} + +func (b *MockCCBlockEventBuilder) buildProposalResponsePayload() *pb.ProposalResponsePayload { + return &pb.ProposalResponsePayload{ + ProposalHash: []byte("somehash"), + Extension: util.MarshalOrPanic(b.buildChaincodeAction()), + } +} + +func (b *MockCCBlockEventBuilder) buildChaincodeAction() *pb.ChaincodeAction { + return &pb.ChaincodeAction{ + Events: util.MarshalOrPanic(b.buildChaincodeEvent()), + } +} + +func (b *MockCCBlockEventBuilder) buildChaincodeEvent() *pb.ChaincodeEvent { + return &pb.ChaincodeEvent{ + ChaincodeId: b.CCID, + EventName: b.EventName, + TxId: b.TxID, + Payload: b.Payload, } }