Skip to content

Commit

Permalink
[FAB-7695]: Restructure filtered events proto msg.
Browse files Browse the repository at this point in the history
Currently FilteredBlock proto message inlcude the common.HeaderType
information, while this information should be available per transaction
base. E.g. instead of having:

message FilteredBlock {
    string channel_id = 1;
    uint64 number = 2; // The position in the blockchain
    common.HeaderType type = 3;
    repeated FilteredTransaction filtered_tx = 4;
}

this commit moves transaction header type into FilteredTransaction and
introduces a new extensible field to accomodate possible future
extenstion for more transaction types:

message FilteredTransaction {
    string txid = 1;
    common.HeaderType type = 2;
    TxValidationCode tx_validation_code = 3;
    oneof Data {
        FilteredProposalResponse proposal_response = 4;
    }
}

Change-Id: I92315ba391a35714282ed390c2c3d825673dc240
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Jan 13, 2018
1 parent 3367d59 commit 967b5ef
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 120 deletions.
29 changes: 17 additions & 12 deletions core/peer/deliverevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,32 +181,34 @@ func (block *blockEvent) toFilteredBlock() (*peer.FilteredBlock, error) {
return nil, err
}

filteredBlock.Type = common.HeaderType(chdr.Type)
filteredBlock.ChannelId = chdr.ChannelId

if filteredBlock.Type == common.HeaderType_ENDORSER_TRANSACTION {
filteredTransaction := &peer.FilteredTransaction{
Txid: chdr.TxId,
Type: common.HeaderType(chdr.Type),
TxValidationCode: txsFltr.Flag(txIndex)}

if filteredTransaction.Type == common.HeaderType_ENDORSER_TRANSACTION {
tx, err := utils.GetTransaction(payload.Data)
if err != nil {
return nil, errors.WithMessage(err, "error unmarshal transaction payload for block event")
}

filteredActionArray, err := transactionActions(tx.Actions).toFilteredActions()
filteredTransaction.Data, err = transactionActions(tx.Actions).toFilteredActions()
if err != nil {
logger.Errorf(err.Error())
return nil, err
}
filteredBlock.FilteredTx = append(filteredBlock.FilteredTx, &peer.FilteredTransaction{
Txid: chdr.TxId,
TxValidationCode: txsFltr.Flag(txIndex),
FilteredAction: filteredActionArray})
}

filteredBlock.FilteredTx = append(filteredBlock.FilteredTx, filteredTransaction)
}

return filteredBlock, nil
}

func (ta transactionActions) toFilteredActions() ([]*peer.FilteredAction, error) {
var res []*peer.FilteredAction
func (ta transactionActions) toFilteredActions() (*peer.FilteredTransaction_ProposalResponse, error) {
proposalResponse := &peer.FilteredProposalResponse{}
for _, action := range ta {
chaincodeActionPayload, err := utils.GetChaincodeActionPayload(action.Payload)
if err != nil {
Expand All @@ -229,16 +231,19 @@ func (ta transactionActions) toFilteredActions() ([]*peer.FilteredAction, error)
}

if ccEvent.GetChaincodeId() != "" {
res = append(res, &peer.FilteredAction{
filteredAction := &peer.FilteredChaincodeAction{
CcEvent: &peer.ChaincodeEvent{
TxId: ccEvent.TxId,
ChaincodeId: ccEvent.ChaincodeId,
EventName: ccEvent.EventName,
},
})
}
proposalResponse.ChaincodeActions = append(proposalResponse.ChaincodeActions, filteredAction)
}
}
return res, nil
return &peer.FilteredTransaction_ProposalResponse{
ProposalResponse: proposalResponse,
}, nil
}

func dumpStacktraceOnPanic() {
Expand Down
18 changes: 11 additions & 7 deletions core/peer/deliverevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,17 @@ func TestEventsServer_DeliverFiltered(t *testing.T) {
test.Equal(uint64(0), block.Number)
test.Equal(test.channelID, block.ChannelId)
test.Equal(1, len(block.FilteredTx))
test.Equal(test.txID, block.FilteredTx[0].Txid)
test.Equal(peer.TxValidationCode_VALID, block.FilteredTx[0].TxValidationCode)
filteredActions := block.FilteredTx[0].FilteredAction
test.Equal(1, len(filteredActions))
test.Equal(test.eventName, filteredActions[0].CcEvent.EventName)
test.Equal(test.txID, filteredActions[0].CcEvent.TxId)
test.Equal(test.chaincodeName, filteredActions[0].CcEvent.ChaincodeId)
tx := block.FilteredTx[0]
test.Equal(test.txID, tx.Txid)
test.Equal(peer.TxValidationCode_VALID, tx.TxValidationCode)
test.Equal(common.HeaderType_ENDORSER_TRANSACTION, tx.Type)
proposalResponse := tx.GetProposalResponse()
test.NotNil(proposalResponse)
chaincodeActions := proposalResponse.ChaincodeActions
test.Equal(1, len(chaincodeActions))
test.Equal(test.eventName, chaincodeActions[0].CcEvent.EventName)
test.Equal(test.txID, chaincodeActions[0].CcEvent.TxId)
test.Equal(test.chaincodeName, chaincodeActions[0].CcEvent.ChaincodeId)
default:
test.FailNow("Unexpected response type")
}
Expand Down
13 changes: 6 additions & 7 deletions events/producer/eventhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func CreateBlockEvents(block *common.Block) (bevent *pb.Event, fbevent *pb.Event
return nil, nil, "", fmt.Errorf("error unmarshalling transaction payload for block event: %s", err)
}

filteredTx := &pb.FilteredTransaction{Txid: chdr.TxId, TxValidationCode: txsFltr.Flag(txIndex)}
filteredActionArray := []*pb.FilteredAction{}
filteredTx := &pb.FilteredTransaction{Txid: chdr.TxId, TxValidationCode: txsFltr.Flag(txIndex), Type: headerType}
proposalResponse := &pb.FilteredProposalResponse{}
for _, action := range tx.Actions {
chaincodeActionPayload, err := utils.GetChaincodeActionPayload(action.Payload)
if err != nil {
Expand All @@ -90,14 +90,14 @@ func CreateBlockEvents(block *common.Block) (bevent *pb.Event, fbevent *pb.Event
return nil, nil, "", fmt.Errorf("error unmarshalling chaincode event for block event: %s", err)
}

filteredAction := &pb.FilteredAction{}
chaincodeAction := &pb.FilteredChaincodeAction{}
if ccEvent.GetChaincodeId() != "" {
filteredCcEvent := ccEvent
// nil out ccevent payload
filteredCcEvent.Payload = nil
filteredAction.CcEvent = filteredCcEvent
chaincodeAction.CcEvent = filteredCcEvent
}
filteredActionArray = append(filteredActionArray, filteredAction)
proposalResponse.ChaincodeActions = append(proposalResponse.ChaincodeActions, chaincodeAction)

// Drop read write set from transaction before sending block event
// Performance issue with chaincode deploy txs and causes nodejs grpc
Expand All @@ -114,7 +114,7 @@ func CreateBlockEvents(block *common.Block) (bevent *pb.Event, fbevent *pb.Event
return nil, nil, "", fmt.Errorf("error marshalling tx action payload for block event: %s", err)
}
}
filteredTx.FilteredAction = filteredActionArray
filteredTx.Data = &pb.FilteredTransaction_ProposalResponse{ProposalResponse: proposalResponse}
filteredTxArray = append(filteredTxArray, filteredTx)

payload.Data, err = utils.GetBytesTransaction(tx)
Expand All @@ -136,7 +136,6 @@ func CreateBlockEvents(block *common.Block) (bevent *pb.Event, fbevent *pb.Event
}
filteredBlockForEvent.ChannelId = channelID
filteredBlockForEvent.Number = block.Header.Number
filteredBlockForEvent.Type = headerType
filteredBlockForEvent.FilteredTx = filteredTxArray

return CreateBlockEvent(blockForEvent), CreateFilteredBlockEvent(filteredBlockForEvent), channelID, nil
Expand Down
3 changes: 2 additions & 1 deletion protos/peer/admin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 967b5ef

Please sign in to comment.