Skip to content

Commit

Permalink
Fix for block event generation
Browse files Browse the repository at this point in the history
[FAB-1603]Fix for incorrect marshalling of transaction into block and
changed to using utils for marshalling

Change-Id: I84697b6d2c2a140fb57f6ae45e3994e7ee4c4c20
Signed-off-by: Patrick Mullaney <pm.mullaney@gmail.com>
  • Loading branch information
pmullaney committed Jan 16, 2017
1 parent 28841b9 commit ed01846
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 36 deletions.
59 changes: 56 additions & 3 deletions events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"testing"
"time"

"github.com/golang/protobuf/ptypes/timestamp"
"github.com/hyperledger/fabric/events/consumer"
"github.com/hyperledger/fabric/events/producer"
"github.com/hyperledger/fabric/protos/common"
ehpb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -81,9 +83,60 @@ func (a *Adapter) Disconnected(err error) {
}
}

func createTestBlock() *common.Block {
func createTestBlock(t *testing.T) *common.Block {
chdr := &common.ChainHeader{
Type: int32(common.HeaderType_ENDORSER_TRANSACTION),
Version: 1,
Timestamp: &timestamp.Timestamp{
Seconds: time.Now().Unix(),
Nanos: 0,
},
ChainID: "test"}
hdr := &common.Header{ChainHeader: chdr}
payload := &common.Payload{Header: hdr}
cea := &ehpb.ChaincodeEndorsedAction{}
ccaPayload := &ehpb.ChaincodeActionPayload{Action: cea}
env := &common.Envelope{}
taa := &ehpb.TransactionAction{}
taas := make([]*ehpb.TransactionAction, 1)
taas[0] = taa
tx := &ehpb.Transaction{Actions: taas}

events := &ehpb.ChaincodeEvent{
ChaincodeID: "ccid",
EventName: "EventName",
Payload: []byte("EventPayload"),
TxID: "TxID"}

pHashBytes := []byte("proposal_hash")
results := []byte("results")
eventBytes, err := utils.GetBytesChaincodeEvent(events)
if err != nil {
t.Fatalf("Failure while marshalling the ProposalResponsePayload")
}
ccaPayload.Action.ProposalResponsePayload, err = utils.GetBytesProposalResponsePayload(pHashBytes, results, eventBytes)
if err != nil {
t.Fatalf("Failure while marshalling the ProposalResponsePayload")
}
tx.Actions[0].Payload, err = utils.GetBytesChaincodeActionPayload(ccaPayload)
if err != nil {
t.Fatalf("Error marshalling tx action payload for block event: %s", err)
}
payload.Data, err = utils.GetBytesTransaction(tx)
if err != nil {
t.Fatalf("Failure while marshalling payload for block event: %s", err)
}
env.Payload, err = utils.GetBytesPayload(payload)
if err != nil {
t.Fatalf("Failure while marshalling tx envelope for block event: %s", err)
}
ebytes, err := utils.GetBytesEnvelope(env)
if err != nil {
t.Fatalf("Failure while marshalling transaction %s", err)
}

block := common.NewBlock(1, []byte{})
block.Data.Data = [][]byte{[]byte("tx1"), []byte("tx2")}
block.Data.Data = append(block.Data.Data, ebytes)
block.Header.DataHash = block.Data.Hash()
return block
}
Expand Down Expand Up @@ -121,7 +174,7 @@ func TestReceiveAnyMessage(t *testing.T) {
var err error

adapter.count = 1
block := createTestBlock()
block := createTestBlock(t)
if err = producer.SendProducerBlockEvent(block); err != nil {
t.Fail()
t.Logf("Error sending message %s", err)
Expand Down
61 changes: 28 additions & 33 deletions events/producer/eventhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package producer
import (
"fmt"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
Expand All @@ -39,8 +38,9 @@ func SendProducerBlockEvent(block *common.Block) error {
bevent.Metadata = block.Metadata
bevent.Data = &common.BlockData{}
for _, d := range block.Data.Data {
if d != nil {
if env, err := utils.GetEnvelopeFromBlock(d); err != nil {
ebytes := d
if ebytes != nil {
if env, err := utils.GetEnvelopeFromBlock(ebytes); err != nil {
logger.Errorf("Error getting tx from block(%s)\n", err)
} else if env != nil {
// get the payload from the envelope
Expand All @@ -52,56 +52,51 @@ func SendProducerBlockEvent(block *common.Block) error {
if common.HeaderType(payload.Header.ChainHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION {
tx, err := utils.GetTransaction(payload.Data)
if err != nil {
logger.Errorf("Error unmarshalling transaction payload for block event: %s", err)
continue
return fmt.Errorf("Error unmarshalling transaction payload for block event: %s", err)
}
chaincodeActionPayload := &pb.ChaincodeActionPayload{}
err = proto.Unmarshal(tx.Actions[0].Payload, chaincodeActionPayload)
chaincodeActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload)
if err != nil {
logger.Errorf("Error unmarshalling transaction action payload for block event: %s", err)
continue
return fmt.Errorf("Error unmarshalling transaction action payload for block event: %s", err)
}

propRespPayload := &pb.ProposalResponsePayload{}
err = proto.Unmarshal(chaincodeActionPayload.Action.ProposalResponsePayload, propRespPayload)
propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload)
if err != nil {
logger.Errorf("Error unmarshalling proposal response payload for block event: %s", err)
continue
return fmt.Errorf("Error unmarshalling proposal response payload for block event: %s", err)
}
//ENDORSER_ACTION, ProposalResponsePayload.Extension field contains ChaincodeAction
caPayload := &pb.ChaincodeAction{}
err = proto.Unmarshal(propRespPayload.Extension, caPayload)
caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension)
if err != nil {
logger.Errorf("Error unmarshalling chaincode action for block event: %s", err)
continue
return fmt.Errorf("Error unmarshalling chaincode action for block event: %s", err)
}
// Drop read write set from transaction before sending block event
// Performance issue with chaincode deploy txs and causes nodejs grpc
// to hit max message size bug
// Dropping the read write set may cause issues for security and
// we will need to revist when event security is addressed
caPayload.Results = nil
propRespPayload.Extension, err = proto.Marshal(caPayload)
chaincodeActionPayload.Action.ProposalResponsePayload, err = utils.GetBytesProposalResponsePayload(propRespPayload.ProposalHash, caPayload.Results, caPayload.Events)
if err != nil {
return fmt.Errorf("Error marshalling tx proposal payload for block event: %s", err)
}
tx.Actions[0].Payload, err = utils.GetBytesChaincodeActionPayload(chaincodeActionPayload)
if err != nil {
logger.Errorf("Error marshalling tx proposal extension payload for block event: %s", err)
continue
return fmt.Errorf("Error marshalling tx action payload for block event: %s", err)
}
// Marshal Transaction again and append to block to be sent
chaincodeActionPayload.Action.ProposalResponsePayload, err = proto.Marshal(propRespPayload)
payload.Data, err = utils.GetBytesTransaction(tx)
if err != nil {
logger.Errorf("Error marshalling tx proposal payload for block event: %s", err)
continue
return fmt.Errorf("Error marshalling payload for block event: %s", err)
}
tx.Actions[0].Payload, err = proto.Marshal(chaincodeActionPayload)
env.Payload, err = utils.GetBytesPayload(payload)
if err != nil {
logger.Errorf("Error marshalling tx action payload for block event: %s", err)
continue
return fmt.Errorf("Error marshalling tx envelope for block event: %s", err)
}
if t, err := proto.Marshal(tx); err == nil {
bevent.Data.Data = append(bevent.Data.Data, t)
logger.Infof("calling sendProducerBlockEvent\n")
} else {
logger.Infof("Cannot marshal transaction %s\n", err)
ebytes, err = utils.GetBytesEnvelope(env)
if err != nil {
return fmt.Errorf("Cannot marshal transaction %s", err)
}
}
}
}
bevent.Data.Data = append(bevent.Data.Data, ebytes)
}
return Send(CreateBlockEvent(bevent))
}
Expand Down

0 comments on commit ed01846

Please sign in to comment.