Skip to content

Commit

Permalink
block-listener: identify invalid transactions
Browse files Browse the repository at this point in the history
Added check for invalid transactions in block. Removed
rejection event processing as that is no longer needed.

Change-Id: I24aeef3cd18d788af8eb0ea1e2674228ec782947
Signed-off-by: Patrick Mullaney <pm.mullaney@gmail.com>
  • Loading branch information
pmullaney committed Feb 7, 2017
1 parent bc93489 commit 49d0854
Showing 1 changed file with 44 additions and 32 deletions.
76 changes: 44 additions & 32 deletions examples/events/block-listener/block-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,20 @@ import (
"fmt"
"os"

"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/events/consumer"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
)

type adapter struct {
notfy chan *pb.Event_Block
rejected chan *pb.Event_Rejection
listenToRejections bool
notfy chan *pb.Event_Block
}

//GetInterestedEvents implements consumer.EventAdapter interface for registering interested events
func (a *adapter) GetInterestedEvents() ([]*pb.Interest, error) {
return []*pb.Interest{{EventType: pb.EventType_BLOCK}, {EventType: pb.EventType_REJECTION}}, nil
return []*pb.Interest{{EventType: pb.EventType_BLOCK}}, nil
}

//Recv implements consumer.EventAdapter interface for receiving events
Expand All @@ -44,12 +43,6 @@ func (a *adapter) Recv(msg *pb.Event) (bool, error) {
a.notfy <- o
return true, nil
}
if o, e := msg.Event.(*pb.Event_Rejection); e {
if a.listenToRejections {
a.rejected <- o
}
return true, nil
}
return false, fmt.Errorf("Receive unkown type event: %v", msg)
}

Expand All @@ -59,12 +52,11 @@ func (a *adapter) Disconnected(err error) {
os.Exit(1)
}

func createEventClient(eventAddress string, listenToRejections bool, cid string) *adapter {
func createEventClient(eventAddress string, cid string) *adapter {
var obcEHClient *consumer.EventsClient

done := make(chan *pb.Event_Block)
reject := make(chan *pb.Event_Rejection)
adapter := &adapter{notfy: done, rejected: reject, listenToRejections: listenToRejections}
adapter := &adapter{notfy: done}
obcEHClient, _ = consumer.NewEventsClient(eventAddress, 5, adapter)
if err := obcEHClient.Start(); err != nil {
fmt.Printf("could not start chat %s\n", err)
Expand All @@ -74,6 +66,23 @@ func createEventClient(eventAddress string, listenToRejections bool, cid string)

return adapter
}
func getTxPayload(tdata []byte) (*common.Payload, error) {
if tdata == nil {
return nil, fmt.Errorf("Cannot extract payload from nil transaction")
}

if env, err := utils.GetEnvelopeFromBlock(tdata); err != nil {
return nil, fmt.Errorf("Error getting tx from block(%s)", err)
} else if env != nil {
// get the payload from the envelope
payload, err := utils.GetPayload(env)
if err != nil {
return nil, fmt.Errorf("Could not extract payload from envelope, err %s", err)
}
return payload, nil
}
return nil, nil
}

// getChainCodeEvents parses block events for chaincode events associated with individual transactions
func getChainCodeEvents(tdata []byte) (*pb.ChaincodeEvent, error) {
Expand All @@ -82,7 +91,7 @@ func getChainCodeEvents(tdata []byte) (*pb.ChaincodeEvent, error) {
}

if env, err := utils.GetEnvelopeFromBlock(tdata); err != nil {
return nil, fmt.Errorf("Error getting tx from block(%s)\n", err)
return nil, fmt.Errorf("Error getting tx from block(%s)", err)
} else if env != nil {
// get the payload from the envelope
payload, err := utils.GetPayload(env)
Expand Down Expand Up @@ -119,16 +128,14 @@ func getChainCodeEvents(tdata []byte) (*pb.ChaincodeEvent, error) {

func main() {
var eventAddress string
var listenToRejections bool
var chaincodeID string
flag.StringVar(&eventAddress, "events-address", "0.0.0.0:7053", "address of events server")
flag.BoolVar(&listenToRejections, "listen-to-rejections", false, "whether to listen to rejection events")
flag.StringVar(&chaincodeID, "events-from-chaincode", "", "listen to events from given chaincode")
flag.Parse()

fmt.Printf("Event Address: %s\n", eventAddress)

a := createEventClient(eventAddress, listenToRejections, chaincodeID)
a := createEventClient(eventAddress, chaincodeID)
if a == nil {
fmt.Printf("Error creating event client\n")
return
Expand All @@ -141,24 +148,29 @@ func main() {
fmt.Printf("\n")
fmt.Printf("Received block\n")
fmt.Printf("--------------\n")
for _, r := range b.Block.Data.Data {
fmt.Printf("Transaction:\n\t[%v]\n", r)
if event, err := getChainCodeEvents(r); err == nil {
if event.ChaincodeID == chaincodeID {
fmt.Printf("Received chaincode event\n")
fmt.Printf("------------------------\n")
fmt.Printf("Chaincode Event:%+v\n", event)
txsFltr := util.NewFilterBitArrayFromBytes(b.Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])

for i, r := range b.Block.Data.Data {
if txsFltr.IsSet(uint(i)) {
tx, _ := getTxPayload(r)
if tx != nil {
fmt.Printf("\n")
fmt.Printf("\n")
fmt.Printf("Received invalid transaction\n")
fmt.Printf("--------------\n")
fmt.Printf("Transaction invalid: TxID: %s\n", tx.Header.ChainHeader.TxID)
}
} else {
fmt.Printf("Transaction:\n\t[%v]\n", r)
if event, err := getChainCodeEvents(r); err == nil {
if len(chaincodeID) != 0 && event.ChaincodeID == chaincodeID {
fmt.Printf("Received chaincode event\n")
fmt.Printf("------------------------\n")
fmt.Printf("Chaincode Event:%+v\n", event)
}
}
}
}
case r := <-a.rejected:
fmt.Printf("\n")
fmt.Printf("\n")
fmt.Printf("Received rejected transaction\n")
fmt.Printf("--------------\n")
//TODO get TxID from pb.ChaincodeHeader from TransactionAction's Header
//fmt.Printf("Transaction error:\n%s\t%s\n", r.Rejection.Tx.Txid, r.Rejection.ErrorMsg)
fmt.Printf("Transaction error:\n%s\n", r.Rejection.ErrorMsg)
}
}
}

0 comments on commit 49d0854

Please sign in to comment.