diff --git a/events/events_test.go b/events/events_test.go index 77516833451..8d9a109f1ae 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -24,10 +24,12 @@ import ( "testing" "time" + "github.com/golang/protobuf/ptypes/timestamp" "github.com/hyperledger/fabric/events/consumer" "github.com/hyperledger/fabric/events/producer" "github.com/hyperledger/fabric/protos/common" ehpb "github.com/hyperledger/fabric/protos/peer" + "github.com/hyperledger/fabric/protos/utils" "github.com/spf13/viper" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -81,9 +83,60 @@ func (a *Adapter) Disconnected(err error) { } } -func createTestBlock() *common.Block { +func createTestBlock(t *testing.T) *common.Block { + chdr := &common.ChainHeader{ + Type: int32(common.HeaderType_ENDORSER_TRANSACTION), + Version: 1, + Timestamp: ×tamp.Timestamp{ + Seconds: time.Now().Unix(), + Nanos: 0, + }, + ChainID: "test"} + hdr := &common.Header{ChainHeader: chdr} + payload := &common.Payload{Header: hdr} + cea := &ehpb.ChaincodeEndorsedAction{} + ccaPayload := &ehpb.ChaincodeActionPayload{Action: cea} + env := &common.Envelope{} + taa := &ehpb.TransactionAction{} + taas := make([]*ehpb.TransactionAction, 1) + taas[0] = taa + tx := &ehpb.Transaction{Actions: taas} + + events := &ehpb.ChaincodeEvent{ + ChaincodeID: "ccid", + EventName: "EventName", + Payload: []byte("EventPayload"), + TxID: "TxID"} + + pHashBytes := []byte("proposal_hash") + results := []byte("results") + eventBytes, err := utils.GetBytesChaincodeEvent(events) + if err != nil { + t.Fatalf("Failure while marshalling the ProposalResponsePayload") + } + ccaPayload.Action.ProposalResponsePayload, err = utils.GetBytesProposalResponsePayload(pHashBytes, results, eventBytes) + if err != nil { + t.Fatalf("Failure while marshalling the ProposalResponsePayload") + } + tx.Actions[0].Payload, err = utils.GetBytesChaincodeActionPayload(ccaPayload) + if err != nil { + t.Fatalf("Error marshalling tx action payload for block event: %s", err) + } + payload.Data, err = utils.GetBytesTransaction(tx) + if err != nil { + t.Fatalf("Failure while marshalling payload for block event: %s", err) + } + env.Payload, err = utils.GetBytesPayload(payload) + if err != nil { + t.Fatalf("Failure while marshalling tx envelope for block event: %s", err) + } + ebytes, err := utils.GetBytesEnvelope(env) + if err != nil { + t.Fatalf("Failure while marshalling transaction %s", err) + } + block := common.NewBlock(1, []byte{}) - block.Data.Data = [][]byte{[]byte("tx1"), []byte("tx2")} + block.Data.Data = append(block.Data.Data, ebytes) block.Header.DataHash = block.Data.Hash() return block } @@ -121,7 +174,7 @@ func TestReceiveAnyMessage(t *testing.T) { var err error adapter.count = 1 - block := createTestBlock() + block := createTestBlock(t) if err = producer.SendProducerBlockEvent(block); err != nil { t.Fail() t.Logf("Error sending message %s", err) diff --git a/events/producer/eventhelper.go b/events/producer/eventhelper.go index 902eb4b6650..8aa3b1842f3 100644 --- a/events/producer/eventhelper.go +++ b/events/producer/eventhelper.go @@ -19,7 +19,6 @@ package producer import ( "fmt" - "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/protos/common" pb "github.com/hyperledger/fabric/protos/peer" "github.com/hyperledger/fabric/protos/utils" @@ -39,8 +38,9 @@ func SendProducerBlockEvent(block *common.Block) error { bevent.Metadata = block.Metadata bevent.Data = &common.BlockData{} for _, d := range block.Data.Data { - if d != nil { - if env, err := utils.GetEnvelopeFromBlock(d); err != nil { + ebytes := d + if ebytes != nil { + if env, err := utils.GetEnvelopeFromBlock(ebytes); err != nil { logger.Errorf("Error getting tx from block(%s)\n", err) } else if env != nil { // get the payload from the envelope @@ -52,56 +52,51 @@ func SendProducerBlockEvent(block *common.Block) error { if common.HeaderType(payload.Header.ChainHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION { tx, err := utils.GetTransaction(payload.Data) if err != nil { - logger.Errorf("Error unmarshalling transaction payload for block event: %s", err) - continue + return fmt.Errorf("Error unmarshalling transaction payload for block event: %s", err) } - chaincodeActionPayload := &pb.ChaincodeActionPayload{} - err = proto.Unmarshal(tx.Actions[0].Payload, chaincodeActionPayload) + chaincodeActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload) if err != nil { - logger.Errorf("Error unmarshalling transaction action payload for block event: %s", err) - continue + return fmt.Errorf("Error unmarshalling transaction action payload for block event: %s", err) } - - propRespPayload := &pb.ProposalResponsePayload{} - err = proto.Unmarshal(chaincodeActionPayload.Action.ProposalResponsePayload, propRespPayload) + propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload) if err != nil { - logger.Errorf("Error unmarshalling proposal response payload for block event: %s", err) - continue + return fmt.Errorf("Error unmarshalling proposal response payload for block event: %s", err) } //ENDORSER_ACTION, ProposalResponsePayload.Extension field contains ChaincodeAction - caPayload := &pb.ChaincodeAction{} - err = proto.Unmarshal(propRespPayload.Extension, caPayload) + caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension) if err != nil { - logger.Errorf("Error unmarshalling chaincode action for block event: %s", err) - continue + return fmt.Errorf("Error unmarshalling chaincode action for block event: %s", err) } // Drop read write set from transaction before sending block event + // Performance issue with chaincode deploy txs and causes nodejs grpc + // to hit max message size bug + // Dropping the read write set may cause issues for security and + // we will need to revist when event security is addressed caPayload.Results = nil - propRespPayload.Extension, err = proto.Marshal(caPayload) + chaincodeActionPayload.Action.ProposalResponsePayload, err = utils.GetBytesProposalResponsePayload(propRespPayload.ProposalHash, caPayload.Results, caPayload.Events) + if err != nil { + return fmt.Errorf("Error marshalling tx proposal payload for block event: %s", err) + } + tx.Actions[0].Payload, err = utils.GetBytesChaincodeActionPayload(chaincodeActionPayload) if err != nil { - logger.Errorf("Error marshalling tx proposal extension payload for block event: %s", err) - continue + return fmt.Errorf("Error marshalling tx action payload for block event: %s", err) } - // Marshal Transaction again and append to block to be sent - chaincodeActionPayload.Action.ProposalResponsePayload, err = proto.Marshal(propRespPayload) + payload.Data, err = utils.GetBytesTransaction(tx) if err != nil { - logger.Errorf("Error marshalling tx proposal payload for block event: %s", err) - continue + return fmt.Errorf("Error marshalling payload for block event: %s", err) } - tx.Actions[0].Payload, err = proto.Marshal(chaincodeActionPayload) + env.Payload, err = utils.GetBytesPayload(payload) if err != nil { - logger.Errorf("Error marshalling tx action payload for block event: %s", err) - continue + return fmt.Errorf("Error marshalling tx envelope for block event: %s", err) } - if t, err := proto.Marshal(tx); err == nil { - bevent.Data.Data = append(bevent.Data.Data, t) - logger.Infof("calling sendProducerBlockEvent\n") - } else { - logger.Infof("Cannot marshal transaction %s\n", err) + ebytes, err = utils.GetBytesEnvelope(env) + if err != nil { + return fmt.Errorf("Cannot marshal transaction %s", err) } } } } + bevent.Data.Data = append(bevent.Data.Data, ebytes) } return Send(CreateBlockEvent(bevent)) }