Skip to content

Commit

Permalink
block-listener: parse chaincode events from block event
Browse files Browse the repository at this point in the history
Client and their associated support libraries should now parse
block events for chaincode events associated with individual
transactions.

Change-Id: I9b12deef293cfe50e7e634301c990d4c95ec7cf2
Signed-off-by: Patrick Mullaney <pm.mullaney@gmail.com>
  • Loading branch information
pmullaney committed Jan 16, 2017
1 parent ed01846 commit 606cc4f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 23 deletions.
75 changes: 52 additions & 23 deletions examples/events/block-listener/block-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,19 @@ import (
"os"

"github.com/hyperledger/fabric/events/consumer"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
)

type adapter struct {
notfy chan *pb.Event_Block
rejected chan *pb.Event_Rejection
cEvent chan *pb.Event_ChaincodeEvent
listenToRejections bool
chaincodeID string
}

//GetInterestedEvents implements consumer.EventAdapter interface for registering interested events
func (a *adapter) GetInterestedEvents() ([]*pb.Interest, error) {
if a.chaincodeID != "" {
return []*pb.Interest{
{EventType: pb.EventType_BLOCK},
{EventType: pb.EventType_REJECTION},
{EventType: pb.EventType_CHAINCODE,
RegInfo: &pb.Interest_ChaincodeRegInfo{
ChaincodeRegInfo: &pb.ChaincodeReg{
ChaincodeID: a.chaincodeID,
EventName: ""}}}}, nil
}
return []*pb.Interest{{EventType: pb.EventType_BLOCK}, {EventType: pb.EventType_REJECTION}}, nil
}

Expand All @@ -60,10 +50,6 @@ func (a *adapter) Recv(msg *pb.Event) (bool, error) {
}
return true, nil
}
if o, e := msg.Event.(*pb.Event_ChaincodeEvent); e {
a.cEvent <- o
return true, nil
}
return false, fmt.Errorf("Receive unkown type event: %v", msg)
}

Expand All @@ -78,7 +64,7 @@ func createEventClient(eventAddress string, listenToRejections bool, cid string)

done := make(chan *pb.Event_Block)
reject := make(chan *pb.Event_Rejection)
adapter := &adapter{notfy: done, rejected: reject, listenToRejections: listenToRejections, chaincodeID: cid, cEvent: make(chan *pb.Event_ChaincodeEvent)}
adapter := &adapter{notfy: done, rejected: reject, listenToRejections: listenToRejections}
obcEHClient, _ = consumer.NewEventsClient(eventAddress, 5, adapter)
if err := obcEHClient.Start(); err != nil {
fmt.Printf("could not start chat %s\n", err)
Expand All @@ -89,6 +75,48 @@ func createEventClient(eventAddress string, listenToRejections bool, cid string)
return adapter
}

// getChainCodeEvents parses block events for chaincode events associated with individual transactions
func getChainCodeEvents(tdata []byte) (*pb.ChaincodeEvent, error) {
if tdata == nil {
return nil, fmt.Errorf("Cannot extract payload from nil transaction")
}

if env, err := utils.GetEnvelopeFromBlock(tdata); err != nil {
return nil, fmt.Errorf("Error getting tx from block(%s)\n", err)
} else if env != nil {
// get the payload from the envelope
payload, err := utils.GetPayload(env)
if err != nil {
return nil, fmt.Errorf("Could not extract payload from envelope, err %s", err)
}

if common.HeaderType(payload.Header.ChainHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION {
tx, err := utils.GetTransaction(payload.Data)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling transaction payload for block event: %s", err)
}
chaincodeActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling transaction action payload for block event: %s", err)
}
propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling proposal response payload for block event: %s", err)
}
caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling chaincode action for block event: %s", err)
}
ccEvent, err := utils.GetChaincodeEvents(caPayload.Events)

if ccEvent != nil {
return ccEvent, nil
}
}
}
return nil, fmt.Errorf("No events found")
}

func main() {
var eventAddress string
var listenToRejections bool
Expand All @@ -115,6 +143,13 @@ func main() {
fmt.Printf("--------------\n")
for _, r := range b.Block.Data.Data {
fmt.Printf("Transaction:\n\t[%v]\n", r)
if event, err := getChainCodeEvents(r); err == nil {
if event.ChaincodeID == chaincodeID {
fmt.Printf("Received chaincode event\n")
fmt.Printf("------------------------\n")
fmt.Printf("Chaincode Event:%+v\n", event)
}
}
}
case r := <-a.rejected:
fmt.Printf("\n")
Expand All @@ -124,12 +159,6 @@ func main() {
//TODO get TxID from pb.ChaincodeHeader from TransactionAction's Header
//fmt.Printf("Transaction error:\n%s\t%s\n", r.Rejection.Tx.Txid, r.Rejection.ErrorMsg)
fmt.Printf("Transaction error:\n%s\n", r.Rejection.ErrorMsg)
case ce := <-a.cEvent:
fmt.Printf("\n")
fmt.Printf("\n")
fmt.Printf("Received chaincode event\n")
fmt.Printf("------------------------\n")
fmt.Printf("Chaincode Event:%v\n", ce)
}
}
}
11 changes: 11 additions & 0 deletions protos/utils/proputils.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ func GetChaincodeAction(caBytes []byte) (*peer.ChaincodeAction, error) {
return chaincodeAction, nil
}

// GetChaincodeEvents gets the ChaincodeEvents given chaicnode event bytes
func GetChaincodeEvents(eBytes []byte) (*peer.ChaincodeEvent, error) {
chaincodeEvent := &peer.ChaincodeEvent{}
err := proto.Unmarshal(eBytes, chaincodeEvent)
if err != nil {
return nil, err
}

return chaincodeEvent, nil
}

// GetProposalResponsePayload gets the proposal response payload
func GetProposalResponsePayload(prpBytes []byte) (*peer.ProposalResponsePayload, error) {
prp := &peer.ProposalResponsePayload{}
Expand Down
12 changes: 12 additions & 0 deletions protos/utils/proputils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ func TestProposalResponse(t *testing.T) {
return
}

event, err := GetChaincodeEvents(act.Events)
if err != nil {
t.Fatalf("Failure while unmarshalling the ChainCodeEvents")
return
}

// sanity check on the event
if string(event.ChaincodeID) != "ccid" {
t.Fatalf("Invalid actions after unmarshalling")
return
}

pr := &pb.ProposalResponse{
Payload: prpBytes,
Endorsement: &pb.Endorsement{Endorser: []byte("endorser"), Signature: []byte("signature")},
Expand Down

0 comments on commit 606cc4f

Please sign in to comment.