diff --git a/examples/events/block-listener/block-listener.go b/examples/events/block-listener/block-listener.go index 495eae20b8d..f9a07c33df3 100644 --- a/examples/events/block-listener/block-listener.go +++ b/examples/events/block-listener/block-listener.go @@ -21,6 +21,7 @@ import ( "fmt" "os" + "github.com/hyperledger/fabric/core/ledger/util" "github.com/hyperledger/fabric/events/consumer" "github.com/hyperledger/fabric/protos/common" pb "github.com/hyperledger/fabric/protos/peer" @@ -28,14 +29,12 @@ import ( ) type adapter struct { - notfy chan *pb.Event_Block - rejected chan *pb.Event_Rejection - listenToRejections bool + notfy chan *pb.Event_Block } //GetInterestedEvents implements consumer.EventAdapter interface for registering interested events func (a *adapter) GetInterestedEvents() ([]*pb.Interest, error) { - return []*pb.Interest{{EventType: pb.EventType_BLOCK}, {EventType: pb.EventType_REJECTION}}, nil + return []*pb.Interest{{EventType: pb.EventType_BLOCK}}, nil } //Recv implements consumer.EventAdapter interface for receiving events @@ -44,12 +43,6 @@ func (a *adapter) Recv(msg *pb.Event) (bool, error) { a.notfy <- o return true, nil } - if o, e := msg.Event.(*pb.Event_Rejection); e { - if a.listenToRejections { - a.rejected <- o - } - return true, nil - } return false, fmt.Errorf("Receive unkown type event: %v", msg) } @@ -59,12 +52,11 @@ func (a *adapter) Disconnected(err error) { os.Exit(1) } -func createEventClient(eventAddress string, listenToRejections bool, cid string) *adapter { +func createEventClient(eventAddress string, cid string) *adapter { var obcEHClient *consumer.EventsClient done := make(chan *pb.Event_Block) - reject := make(chan *pb.Event_Rejection) - adapter := &adapter{notfy: done, rejected: reject, listenToRejections: listenToRejections} + adapter := &adapter{notfy: done} obcEHClient, _ = consumer.NewEventsClient(eventAddress, 5, adapter) if err := obcEHClient.Start(); err != nil { fmt.Printf("could not start chat %s\n", err) @@ -74,6 +66,23 @@ func createEventClient(eventAddress string, listenToRejections bool, cid string) return adapter } +func getTxPayload(tdata []byte) (*common.Payload, error) { + if tdata == nil { + return nil, fmt.Errorf("Cannot extract payload from nil transaction") + } + + if env, err := utils.GetEnvelopeFromBlock(tdata); err != nil { + return nil, fmt.Errorf("Error getting tx from block(%s)", err) + } else if env != nil { + // get the payload from the envelope + payload, err := utils.GetPayload(env) + if err != nil { + return nil, fmt.Errorf("Could not extract payload from envelope, err %s", err) + } + return payload, nil + } + return nil, nil +} // getChainCodeEvents parses block events for chaincode events associated with individual transactions func getChainCodeEvents(tdata []byte) (*pb.ChaincodeEvent, error) { @@ -82,7 +91,7 @@ func getChainCodeEvents(tdata []byte) (*pb.ChaincodeEvent, error) { } if env, err := utils.GetEnvelopeFromBlock(tdata); err != nil { - return nil, fmt.Errorf("Error getting tx from block(%s)\n", err) + return nil, fmt.Errorf("Error getting tx from block(%s)", err) } else if env != nil { // get the payload from the envelope payload, err := utils.GetPayload(env) @@ -119,16 +128,14 @@ func getChainCodeEvents(tdata []byte) (*pb.ChaincodeEvent, error) { func main() { var eventAddress string - var listenToRejections bool var chaincodeID string flag.StringVar(&eventAddress, "events-address", "0.0.0.0:7053", "address of events server") - flag.BoolVar(&listenToRejections, "listen-to-rejections", false, "whether to listen to rejection events") flag.StringVar(&chaincodeID, "events-from-chaincode", "", "listen to events from given chaincode") flag.Parse() fmt.Printf("Event Address: %s\n", eventAddress) - a := createEventClient(eventAddress, listenToRejections, chaincodeID) + a := createEventClient(eventAddress, chaincodeID) if a == nil { fmt.Printf("Error creating event client\n") return @@ -141,24 +148,29 @@ func main() { fmt.Printf("\n") fmt.Printf("Received block\n") fmt.Printf("--------------\n") - for _, r := range b.Block.Data.Data { - fmt.Printf("Transaction:\n\t[%v]\n", r) - if event, err := getChainCodeEvents(r); err == nil { - if event.ChaincodeID == chaincodeID { - fmt.Printf("Received chaincode event\n") - fmt.Printf("------------------------\n") - fmt.Printf("Chaincode Event:%+v\n", event) + txsFltr := util.NewFilterBitArrayFromBytes(b.Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) + + for i, r := range b.Block.Data.Data { + if txsFltr.IsSet(uint(i)) { + tx, _ := getTxPayload(r) + if tx != nil { + fmt.Printf("\n") + fmt.Printf("\n") + fmt.Printf("Received invalid transaction\n") + fmt.Printf("--------------\n") + fmt.Printf("Transaction invalid: TxID: %s\n", tx.Header.ChainHeader.TxID) + } + } else { + fmt.Printf("Transaction:\n\t[%v]\n", r) + if event, err := getChainCodeEvents(r); err == nil { + if len(chaincodeID) != 0 && event.ChaincodeID == chaincodeID { + fmt.Printf("Received chaincode event\n") + fmt.Printf("------------------------\n") + fmt.Printf("Chaincode Event:%+v\n", event) + } } } } - case r := <-a.rejected: - fmt.Printf("\n") - fmt.Printf("\n") - fmt.Printf("Received rejected transaction\n") - fmt.Printf("--------------\n") - //TODO get TxID from pb.ChaincodeHeader from TransactionAction's Header - //fmt.Printf("Transaction error:\n%s\t%s\n", r.Rejection.Tx.Txid, r.Rejection.ErrorMsg) - fmt.Printf("Transaction error:\n%s\n", r.Rejection.ErrorMsg) } } }