Skip to content

Commit

Permalink
Enable block event generation
Browse files Browse the repository at this point in the history
This changeset enables block event generation support for 1.0

Change-Id: Id50c5e8fdd63c1e4d693a90b5438d9c8fa26de3c
Signed-off-by: Patrick Mullaney <pm.mullaney@gmail.com>
  • Loading branch information
pmullaney committed Dec 8, 2016
1 parent 85456c0 commit ebb3cb9
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 13 deletions.
4 changes: 4 additions & 0 deletions core/committer/noopssinglechain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/util"
"github.com/hyperledger/fabric/events/producer"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
putils "github.com/hyperledger/fabric/protos/utils"
Expand Down Expand Up @@ -327,6 +328,9 @@ func (d *DeliverService) readUntilClose() {
// Gossip messages with other nodes
logger.Debugf("Gossiping block [%d], peers number [%d]", seqNum, numberOfPeers)
d.gossip.Gossip(gossipMsg)
if err = producer.SendProducerBlockEvent(block); err != nil {
logger.Errorf("Error sending block event %s", err)
}

d.unAcknowledged++
if d.unAcknowledged >= d.windowSize/2 {
Expand Down
11 changes: 5 additions & 6 deletions events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,11 @@ func (a *Adapter) Disconnected(err error) {
}
}

func createTestBlock() *ehpb.Event {
func createTestBlock() *common.Block {
block := common.NewBlock(1, []byte{})
block.Data.Data = [][]byte{[]byte("tx1"), []byte("tx2")}
block.Header.DataHash = block.Data.Hash()
emsg := producer.CreateBlockEvent(block)
return emsg
return block
}

func createTestChaincodeEvent(tid string, typ string) *ehpb.Event {
Expand Down Expand Up @@ -122,13 +121,13 @@ func TestReceiveAnyMessage(t *testing.T) {
var err error

adapter.count = 1
emsg := createTestBlock()
if err = producer.Send(emsg); err != nil {
block := createTestBlock()
if err = producer.SendProducerBlockEvent(block); err != nil {
t.Fail()
t.Logf("Error sending message %s", err)
}

emsg = createTestChaincodeEvent("0xffffffff", "event2")
emsg := createTestChaincodeEvent("0xffffffff", "event2")
if err = producer.Send(emsg); err != nil {
t.Fail()
t.Logf("Error sending message %s", err)
Expand Down
99 changes: 92 additions & 7 deletions events/producer/eventhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,106 @@ limitations under the License.
package producer

import (
"fmt"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/protos/common"
ehpb "github.com/hyperledger/fabric/protos/peer"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
)

var logger *logging.Logger // package-level logger

func init() {
logger = logging.MustGetLogger("eventhub_producer")
}

// SendProducerBlockEvent sends block event to clients
func SendProducerBlockEvent(block *common.Block) error {
bevent := &common.Block{}
bevent.Header = block.Header
bevent.Metadata = block.Metadata
bevent.Data = &common.BlockData{}
for _, d := range block.Data.Data {
if d != nil {
if env, err := utils.GetEnvelopeFromBlock(d); err != nil {
logger.Errorf("Error getting tx from block(%s)\n", err)
} else if env != nil {
// get the payload from the envelope
payload, err := utils.GetPayload(env)
if err != nil {
return fmt.Errorf("Could not extract payload from envelope, err %s", err)
}

if common.HeaderType(payload.Header.ChainHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION {
tx, err := utils.GetTransaction(payload.Data)
if err != nil {
logger.Errorf("Error unmarshalling transaction payload for block event: %s", err)
continue
}
chaincodeActionPayload := &pb.ChaincodeActionPayload{}
err = proto.Unmarshal(tx.Actions[0].Payload, chaincodeActionPayload)
if err != nil {
logger.Errorf("Error unmarshalling transaction action payload for block event: %s", err)
continue
}

propRespPayload := &pb.ProposalResponsePayload{}
err = proto.Unmarshal(chaincodeActionPayload.Action.ProposalResponsePayload, propRespPayload)
if err != nil {
logger.Errorf("Error unmarshalling proposal response payload for block event: %s", err)
continue
}
//ENDORSER_ACTION, ProposalResponsePayload.Extension field contains ChaincodeAction
caPayload := &pb.ChaincodeAction{}
err = proto.Unmarshal(propRespPayload.Extension, caPayload)
if err != nil {
logger.Errorf("Error unmarshalling chaincode action for block event: %s", err)
continue
}
// Drop read write set from transaction before sending block event
caPayload.Results = nil
propRespPayload.Extension, err = proto.Marshal(caPayload)
if err != nil {
logger.Errorf("Error marshalling tx proposal extension payload for block event: %s", err)
continue
}
// Marshal Transaction again and append to block to be sent
chaincodeActionPayload.Action.ProposalResponsePayload, err = proto.Marshal(propRespPayload)
if err != nil {
logger.Errorf("Error marshalling tx proposal payload for block event: %s", err)
continue
}
tx.Actions[0].Payload, err = proto.Marshal(chaincodeActionPayload)
if err != nil {
logger.Errorf("Error marshalling tx action payload for block event: %s", err)
continue
}
if t, err := proto.Marshal(tx); err == nil {
bevent.Data.Data = append(bevent.Data.Data, t)
logger.Infof("calling sendProducerBlockEvent\n")
} else {
logger.Infof("Cannot marshal transaction %s\n", err)
}
}
}
}
}
return Send(CreateBlockEvent(bevent))
}

//CreateBlockEvent creates a Event from a Block
func CreateBlockEvent(te *common.Block) *ehpb.Event {
return &ehpb.Event{Event: &ehpb.Event_Block{Block: te}}
func CreateBlockEvent(te *common.Block) *pb.Event {
return &pb.Event{Event: &pb.Event_Block{Block: te}}
}

//CreateChaincodeEvent creates a Event from a ChaincodeEvent
func CreateChaincodeEvent(te *ehpb.ChaincodeEvent) *ehpb.Event {
return &ehpb.Event{Event: &ehpb.Event_ChaincodeEvent{ChaincodeEvent: te}}
func CreateChaincodeEvent(te *pb.ChaincodeEvent) *pb.Event {
return &pb.Event{Event: &pb.Event_ChaincodeEvent{ChaincodeEvent: te}}
}

//CreateRejectionEvent creates an Event from TxResults
func CreateRejectionEvent(tx *ehpb.Transaction, errorMsg string) *ehpb.Event {
return &ehpb.Event{Event: &ehpb.Event_Rejection{Rejection: &ehpb.Rejection{Tx: tx, ErrorMsg: errorMsg}}}
func CreateRejectionEvent(tx *pb.Transaction, errorMsg string) *pb.Event {
return &pb.Event{Event: &pb.Event_Rejection{Rejection: &pb.Rejection{Tx: tx, ErrorMsg: errorMsg}}}
}

0 comments on commit ebb3cb9

Please sign in to comment.