From ebb3cb9ee3abc946c9607aaa6d8e3ec17dd86684 Mon Sep 17 00:00:00 2001 From: Patrick Mullaney Date: Thu, 8 Dec 2016 13:13:13 -0500 Subject: [PATCH] Enable block event generation This changeset enables block event generation support for 1.0 Change-Id: Id50c5e8fdd63c1e4d693a90b5438d9c8fa26de3c Signed-off-by: Patrick Mullaney --- core/committer/noopssinglechain/client.go | 4 + events/events_test.go | 11 ++- events/producer/eventhelper.go | 99 +++++++++++++++++++++-- 3 files changed, 101 insertions(+), 13 deletions(-) diff --git a/core/committer/noopssinglechain/client.go b/core/committer/noopssinglechain/client.go index 09c4eb73228..d715d33ee98 100644 --- a/core/committer/noopssinglechain/client.go +++ b/core/committer/noopssinglechain/client.go @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/fabric/core/committer" "github.com/hyperledger/fabric/core/ledger/kvledger" "github.com/hyperledger/fabric/core/util" + "github.com/hyperledger/fabric/events/producer" "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/orderer" putils "github.com/hyperledger/fabric/protos/utils" @@ -327,6 +328,9 @@ func (d *DeliverService) readUntilClose() { // Gossip messages with other nodes logger.Debugf("Gossiping block [%d], peers number [%d]", seqNum, numberOfPeers) d.gossip.Gossip(gossipMsg) + if err = producer.SendProducerBlockEvent(block); err != nil { + logger.Errorf("Error sending block event %s", err) + } d.unAcknowledged++ if d.unAcknowledged >= d.windowSize/2 { diff --git a/events/events_test.go b/events/events_test.go index f79de901baa..77516833451 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -81,12 +81,11 @@ func (a *Adapter) Disconnected(err error) { } } -func createTestBlock() *ehpb.Event { +func createTestBlock() *common.Block { block := common.NewBlock(1, []byte{}) block.Data.Data = [][]byte{[]byte("tx1"), []byte("tx2")} block.Header.DataHash = block.Data.Hash() - emsg := producer.CreateBlockEvent(block) - return emsg + return block } func createTestChaincodeEvent(tid string, typ string) *ehpb.Event { @@ -122,13 +121,13 @@ func TestReceiveAnyMessage(t *testing.T) { var err error adapter.count = 1 - emsg := createTestBlock() - if err = producer.Send(emsg); err != nil { + block := createTestBlock() + if err = producer.SendProducerBlockEvent(block); err != nil { t.Fail() t.Logf("Error sending message %s", err) } - emsg = createTestChaincodeEvent("0xffffffff", "event2") + emsg := createTestChaincodeEvent("0xffffffff", "event2") if err = producer.Send(emsg); err != nil { t.Fail() t.Logf("Error sending message %s", err) diff --git a/events/producer/eventhelper.go b/events/producer/eventhelper.go index bc66f4f798c..902eb4b6650 100644 --- a/events/producer/eventhelper.go +++ b/events/producer/eventhelper.go @@ -17,21 +17,106 @@ limitations under the License. package producer import ( + "fmt" + + "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/protos/common" - ehpb "github.com/hyperledger/fabric/protos/peer" + pb "github.com/hyperledger/fabric/protos/peer" + "github.com/hyperledger/fabric/protos/utils" + "github.com/op/go-logging" ) +var logger *logging.Logger // package-level logger + +func init() { + logger = logging.MustGetLogger("eventhub_producer") +} + +// SendProducerBlockEvent sends block event to clients +func SendProducerBlockEvent(block *common.Block) error { + bevent := &common.Block{} + bevent.Header = block.Header + bevent.Metadata = block.Metadata + bevent.Data = &common.BlockData{} + for _, d := range block.Data.Data { + if d != nil { + if env, err := utils.GetEnvelopeFromBlock(d); err != nil { + logger.Errorf("Error getting tx from block(%s)\n", err) + } else if env != nil { + // get the payload from the envelope + payload, err := utils.GetPayload(env) + if err != nil { + return fmt.Errorf("Could not extract payload from envelope, err %s", err) + } + + if common.HeaderType(payload.Header.ChainHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION { + tx, err := utils.GetTransaction(payload.Data) + if err != nil { + logger.Errorf("Error unmarshalling transaction payload for block event: %s", err) + continue + } + chaincodeActionPayload := &pb.ChaincodeActionPayload{} + err = proto.Unmarshal(tx.Actions[0].Payload, chaincodeActionPayload) + if err != nil { + logger.Errorf("Error unmarshalling transaction action payload for block event: %s", err) + continue + } + + propRespPayload := &pb.ProposalResponsePayload{} + err = proto.Unmarshal(chaincodeActionPayload.Action.ProposalResponsePayload, propRespPayload) + if err != nil { + logger.Errorf("Error unmarshalling proposal response payload for block event: %s", err) + continue + } + //ENDORSER_ACTION, ProposalResponsePayload.Extension field contains ChaincodeAction + caPayload := &pb.ChaincodeAction{} + err = proto.Unmarshal(propRespPayload.Extension, caPayload) + if err != nil { + logger.Errorf("Error unmarshalling chaincode action for block event: %s", err) + continue + } + // Drop read write set from transaction before sending block event + caPayload.Results = nil + propRespPayload.Extension, err = proto.Marshal(caPayload) + if err != nil { + logger.Errorf("Error marshalling tx proposal extension payload for block event: %s", err) + continue + } + // Marshal Transaction again and append to block to be sent + chaincodeActionPayload.Action.ProposalResponsePayload, err = proto.Marshal(propRespPayload) + if err != nil { + logger.Errorf("Error marshalling tx proposal payload for block event: %s", err) + continue + } + tx.Actions[0].Payload, err = proto.Marshal(chaincodeActionPayload) + if err != nil { + logger.Errorf("Error marshalling tx action payload for block event: %s", err) + continue + } + if t, err := proto.Marshal(tx); err == nil { + bevent.Data.Data = append(bevent.Data.Data, t) + logger.Infof("calling sendProducerBlockEvent\n") + } else { + logger.Infof("Cannot marshal transaction %s\n", err) + } + } + } + } + } + return Send(CreateBlockEvent(bevent)) +} + //CreateBlockEvent creates a Event from a Block -func CreateBlockEvent(te *common.Block) *ehpb.Event { - return &ehpb.Event{Event: &ehpb.Event_Block{Block: te}} +func CreateBlockEvent(te *common.Block) *pb.Event { + return &pb.Event{Event: &pb.Event_Block{Block: te}} } //CreateChaincodeEvent creates a Event from a ChaincodeEvent -func CreateChaincodeEvent(te *ehpb.ChaincodeEvent) *ehpb.Event { - return &ehpb.Event{Event: &ehpb.Event_ChaincodeEvent{ChaincodeEvent: te}} +func CreateChaincodeEvent(te *pb.ChaincodeEvent) *pb.Event { + return &pb.Event{Event: &pb.Event_ChaincodeEvent{ChaincodeEvent: te}} } //CreateRejectionEvent creates an Event from TxResults -func CreateRejectionEvent(tx *ehpb.Transaction, errorMsg string) *ehpb.Event { - return &ehpb.Event{Event: &ehpb.Event_Rejection{Rejection: &ehpb.Rejection{Tx: tx, ErrorMsg: errorMsg}}} +func CreateRejectionEvent(tx *pb.Transaction, errorMsg string) *pb.Event { + return &pb.Event{Event: &pb.Event_Rejection{Rejection: &pb.Rejection{Tx: tx, ErrorMsg: errorMsg}}} }