diff --git a/examples/events/block-listener/block-listener.go b/examples/events/block-listener/block-listener.go index 39e9298a053..495eae20b8d 100644 --- a/examples/events/block-listener/block-listener.go +++ b/examples/events/block-listener/block-listener.go @@ -22,29 +22,19 @@ import ( "os" "github.com/hyperledger/fabric/events/consumer" + "github.com/hyperledger/fabric/protos/common" pb "github.com/hyperledger/fabric/protos/peer" + "github.com/hyperledger/fabric/protos/utils" ) type adapter struct { notfy chan *pb.Event_Block rejected chan *pb.Event_Rejection - cEvent chan *pb.Event_ChaincodeEvent listenToRejections bool - chaincodeID string } //GetInterestedEvents implements consumer.EventAdapter interface for registering interested events func (a *adapter) GetInterestedEvents() ([]*pb.Interest, error) { - if a.chaincodeID != "" { - return []*pb.Interest{ - {EventType: pb.EventType_BLOCK}, - {EventType: pb.EventType_REJECTION}, - {EventType: pb.EventType_CHAINCODE, - RegInfo: &pb.Interest_ChaincodeRegInfo{ - ChaincodeRegInfo: &pb.ChaincodeReg{ - ChaincodeID: a.chaincodeID, - EventName: ""}}}}, nil - } return []*pb.Interest{{EventType: pb.EventType_BLOCK}, {EventType: pb.EventType_REJECTION}}, nil } @@ -60,10 +50,6 @@ func (a *adapter) Recv(msg *pb.Event) (bool, error) { } return true, nil } - if o, e := msg.Event.(*pb.Event_ChaincodeEvent); e { - a.cEvent <- o - return true, nil - } return false, fmt.Errorf("Receive unkown type event: %v", msg) } @@ -78,7 +64,7 @@ func createEventClient(eventAddress string, listenToRejections bool, cid string) done := make(chan *pb.Event_Block) reject := make(chan *pb.Event_Rejection) - adapter := &adapter{notfy: done, rejected: reject, listenToRejections: listenToRejections, chaincodeID: cid, cEvent: make(chan *pb.Event_ChaincodeEvent)} + adapter := &adapter{notfy: done, rejected: reject, listenToRejections: listenToRejections} obcEHClient, _ = consumer.NewEventsClient(eventAddress, 5, adapter) if err := obcEHClient.Start(); err != nil { fmt.Printf("could not start chat %s\n", err) @@ -89,6 +75,48 @@ func createEventClient(eventAddress string, listenToRejections bool, cid string) return adapter } +// getChainCodeEvents parses block events for chaincode events associated with individual transactions +func getChainCodeEvents(tdata []byte) (*pb.ChaincodeEvent, error) { + if tdata == nil { + return nil, fmt.Errorf("Cannot extract payload from nil transaction") + } + + if env, err := utils.GetEnvelopeFromBlock(tdata); err != nil { + return nil, fmt.Errorf("Error getting tx from block(%s)\n", err) + } else if env != nil { + // get the payload from the envelope + payload, err := utils.GetPayload(env) + if err != nil { + return nil, fmt.Errorf("Could not extract payload from envelope, err %s", err) + } + + if common.HeaderType(payload.Header.ChainHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION { + tx, err := utils.GetTransaction(payload.Data) + if err != nil { + return nil, fmt.Errorf("Error unmarshalling transaction payload for block event: %s", err) + } + chaincodeActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload) + if err != nil { + return nil, fmt.Errorf("Error unmarshalling transaction action payload for block event: %s", err) + } + propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload) + if err != nil { + return nil, fmt.Errorf("Error unmarshalling proposal response payload for block event: %s", err) + } + caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension) + if err != nil { + return nil, fmt.Errorf("Error unmarshalling chaincode action for block event: %s", err) + } + ccEvent, err := utils.GetChaincodeEvents(caPayload.Events) + + if ccEvent != nil { + return ccEvent, nil + } + } + } + return nil, fmt.Errorf("No events found") +} + func main() { var eventAddress string var listenToRejections bool @@ -115,6 +143,13 @@ func main() { fmt.Printf("--------------\n") for _, r := range b.Block.Data.Data { fmt.Printf("Transaction:\n\t[%v]\n", r) + if event, err := getChainCodeEvents(r); err == nil { + if event.ChaincodeID == chaincodeID { + fmt.Printf("Received chaincode event\n") + fmt.Printf("------------------------\n") + fmt.Printf("Chaincode Event:%+v\n", event) + } + } } case r := <-a.rejected: fmt.Printf("\n") @@ -124,12 +159,6 @@ func main() { //TODO get TxID from pb.ChaincodeHeader from TransactionAction's Header //fmt.Printf("Transaction error:\n%s\t%s\n", r.Rejection.Tx.Txid, r.Rejection.ErrorMsg) fmt.Printf("Transaction error:\n%s\n", r.Rejection.ErrorMsg) - case ce := <-a.cEvent: - fmt.Printf("\n") - fmt.Printf("\n") - fmt.Printf("Received chaincode event\n") - fmt.Printf("------------------------\n") - fmt.Printf("Chaincode Event:%v\n", ce) } } } diff --git a/protos/utils/proputils.go b/protos/utils/proputils.go index cb54837d629..adc54f70b4b 100644 --- a/protos/utils/proputils.go +++ b/protos/utils/proputils.go @@ -98,6 +98,17 @@ func GetChaincodeAction(caBytes []byte) (*peer.ChaincodeAction, error) { return chaincodeAction, nil } +// GetChaincodeEvents gets the ChaincodeEvents given chaicnode event bytes +func GetChaincodeEvents(eBytes []byte) (*peer.ChaincodeEvent, error) { + chaincodeEvent := &peer.ChaincodeEvent{} + err := proto.Unmarshal(eBytes, chaincodeEvent) + if err != nil { + return nil, err + } + + return chaincodeEvent, nil +} + // GetProposalResponsePayload gets the proposal response payload func GetProposalResponsePayload(prpBytes []byte) (*peer.ProposalResponsePayload, error) { prp := &peer.ProposalResponsePayload{} diff --git a/protos/utils/proputils_test.go b/protos/utils/proputils_test.go index cb0542c104e..76016c39bbd 100644 --- a/protos/utils/proputils_test.go +++ b/protos/utils/proputils_test.go @@ -165,6 +165,18 @@ func TestProposalResponse(t *testing.T) { return } + event, err := GetChaincodeEvents(act.Events) + if err != nil { + t.Fatalf("Failure while unmarshalling the ChainCodeEvents") + return + } + + // sanity check on the event + if string(event.ChaincodeID) != "ccid" { + t.Fatalf("Invalid actions after unmarshalling") + return + } + pr := &pb.ProposalResponse{ Payload: prpBytes, Endorsement: &pb.Endorsement{Endorser: []byte("endorser"), Signature: []byte("signature")},