Skip to content

Commit

Permalink
[FAB-5756]Add transaction number into pvt data msg
Browse files Browse the repository at this point in the history
CR-12177 related to FAB-4976 introduced need to take care of propagating
information about transaction index related to the pvt data inside the
block. This commit extends gossip message to include this information
and adds unit tests to cover functionality.

Change-Id: I96fd786757862d915a34a68190f53ac9c9bbf7f7
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Aug 14, 2017
1 parent f13a36c commit c335208
Show file tree
Hide file tree
Showing 6 changed files with 498 additions and 148 deletions.
66 changes: 54 additions & 12 deletions gossip/state/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,49 +11,77 @@ import (

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/pkg/errors"
)

// PvtData a placeholder to represent private data
type PvtData struct {
Payload *rwset.TxPvtReadWriteSet
Payload *ledger.TxPvtData
}

// PvtDataCollections data type to encapsulate collections
// of private data
type PvtDataCollections []*PvtData

// Marshal encodes private collection into bytes array
func (pvt PvtDataCollections) Marshal() ([][]byte, error) {
func (pvt *PvtDataCollections) Marshal() ([][]byte, error) {
pvtDataBytes := make([][]byte, 0)
for index, each := range pvt {
pvtBytes, err := proto.Marshal(each.Payload)
for index, each := range *pvt {
if each.Payload == nil {
errMsg := fmt.Sprintf("Mallformed private data payload, rwset index %d, payload is nil", index)
logger.Errorf(errMsg)
return nil, errors.New(errMsg)
}
pvtBytes, err := proto.Marshal(each.Payload.WriteSet)
if err != nil {
errMsg := fmt.Sprintf("Could not marshal private rwset index %d, due to %s", index, err)
logger.Errorf(errMsg)
return nil, errors.New(errMsg)
}
pvtDataBytes = append(pvtDataBytes, pvtBytes)
// Compose gossip protobuf message with private rwset + index of transaction in the block
txSeqInBlock := each.Payload.SeqInBlock
pvtDataPayload := &gossip.PvtDataPayload{TxSeqInBlock: txSeqInBlock, Payload: pvtBytes}
payloadBytes, err := proto.Marshal(pvtDataPayload)
if err != nil {
errMsg := fmt.Sprintf("Could not marshal private payload with transaction index %d, due to %s", txSeqInBlock, err)
logger.Errorf(errMsg)
return nil, errors.New(errMsg)
}

pvtDataBytes = append(pvtDataBytes, payloadBytes)
}
return pvtDataBytes, nil
}

// Unmarshal read and unmarshal collection of private data
// from given bytes array
func (pvt PvtDataCollections) Unmarshal(data [][]byte) error {
func (pvt *PvtDataCollections) Unmarshal(data [][]byte) error {
for _, each := range data {
payload := &rwset.TxPvtReadWriteSet{}
payload := &gossip.PvtDataPayload{}
if err := proto.Unmarshal(each, payload); err != nil {
return err
}
pvt = append(pvt, &PvtData{Payload: payload})
pvtRWSet := &rwset.TxPvtReadWriteSet{}
if err := proto.Unmarshal(payload.Payload, pvtRWSet); err != nil {
return err
}
*pvt = append(*pvt, &PvtData{Payload: &ledger.TxPvtData{
SeqInBlock: payload.TxSeqInBlock,
WriteSet: pvtRWSet,
}})
}

return nil
}

// PvtDataFilter predicate which used to filter block
// private data
type PvtDataFilter func(data *PvtData) bool

// Coordinator orchestrates the flow of the new
// blocks arrival and in flight transient data, responsible
// to complete missing parts of transient data for given block.
Expand All @@ -62,8 +90,11 @@ type Coordinator interface {
// returns missing transaction ids
StoreBlock(block *common.Block, data ...PvtDataCollections) ([]string, error)

// GetPvtDataAndBlockByNum returns block and related to the block private data
GetPvtDataAndBlockByNum(seqNum uint64, filter PvtDataFilter) (*common.Block, PvtDataCollections, error)

// GetBlockByNum returns block and related to the block private data
GetBlockByNum(seqNum uint64) (*common.Block, PvtDataCollections, error)
GetBlockByNum(seqNum uint64) (*common.Block, error)

// Get recent block sequence number
LedgerHeight() (uint64, error)
Expand All @@ -83,13 +114,24 @@ func NewCoordinator(committer committer.Committer) Coordinator {

func (c *coordinator) StoreBlock(block *common.Block, data ...PvtDataCollections) ([]string, error) {
// Need to check whenever there are missing private rwset
return nil, c.Committer.Commit(block)
if len(data) == 0 {
return nil, c.Commit(block)
}
return nil, c.Commit(block)
}

func (c *coordinator) GetBlockByNum(seqNum uint64) (*common.Block, PvtDataCollections, error) {
blocks := c.Committer.GetBlocks([]uint64{seqNum})
func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64, filter PvtDataFilter) (*common.Block, PvtDataCollections, error) {
blocks := c.GetBlocks([]uint64{seqNum})
if len(blocks) == 0 {
return nil, nil, fmt.Errorf("Cannot retreive block number %d", seqNum)
}
return blocks[0], nil, nil
}

func (c *coordinator) GetBlockByNum(seqNum uint64) (*common.Block, error) {
blocks := c.GetBlocks([]uint64{seqNum})
if len(blocks) == 0 {
return nil, fmt.Errorf("Cannot retreive block number %d", seqNum)
}
return blocks[0], nil
}
246 changes: 246 additions & 0 deletions gossip/state/coordinator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package state

import (
"fmt"
"testing"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type committerMock struct {
mock.Mock
}

func (mock *committerMock) Commit(block *common.Block) error {
args := mock.Called(block)
return args.Error(0)
}

func (mock *committerMock) LedgerHeight() (uint64, error) {
args := mock.Called()
return args.Get(0).(uint64), args.Error(1)
}

func (mock *committerMock) GetBlocks(blockSeqs []uint64) []*common.Block {
args := mock.Called(blockSeqs)
seqs := args.Get(0)
if seqs == nil {
return nil
}
return seqs.([]*common.Block)
}

func (mock *committerMock) Close() {
mock.Called()
}

func TestPvtDataCollections_FailOnEmptyPayload(t *testing.T) {
collection := &PvtDataCollections{
&PvtData{
Payload: &ledger.TxPvtData{
SeqInBlock: uint64(1),
WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "secretCollection",
Rwset: []byte{1, 2, 3, 4, 5, 6, 7},
},
},
},
},
},
},
},

&PvtData{
Payload: nil,
},
}

_, err := collection.Marshal()
assertion := assert.New(t)
assertion.Error(err, "Expected to fail since second item has nil payload")
assertion.Equal("Mallformed private data payload, rwset index 1, payload is nil", fmt.Sprintf("%s", err))
}

func TestPvtDataCollections_FailMarshalingWriteSet(t *testing.T) {
collection := &PvtDataCollections{
&PvtData{
Payload: &ledger.TxPvtData{
SeqInBlock: uint64(1),
WriteSet: nil,
},
},
}

_, err := collection.Marshal()
assertion := assert.New(t)
assertion.Error(err, "Expected to fail since first item has nil writeset")
assertion.Contains(fmt.Sprintf("%s", err), "Could not marshal private rwset index 0")
}

func TestPvtDataCollections_Marshal(t *testing.T) {
collection := &PvtDataCollections{
&PvtData{
Payload: &ledger.TxPvtData{
SeqInBlock: uint64(1),
WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "secretCollection",
Rwset: []byte{1, 2, 3, 4, 5, 6, 7},
},
},
},
},
},
},
},

&PvtData{
Payload: &ledger.TxPvtData{
SeqInBlock: uint64(2),
WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "secretCollection",
Rwset: []byte{42, 42, 42, 42, 42, 42, 42},
},
},
},
{
Namespace: "ns2",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "otherCollection",
Rwset: []byte{10, 9, 8, 7, 6, 5, 4, 3, 2, 1},
},
},
},
},
},
},
},
}

bytes, err := collection.Marshal()

assertion := assert.New(t)
assertion.NoError(err)
assertion.NotNil(bytes)
assertion.Equal(2, len(bytes))
}

func TestPvtDataCollections_Unmarshal(t *testing.T) {
collection := PvtDataCollections{
&PvtData{
Payload: &ledger.TxPvtData{
SeqInBlock: uint64(1),
WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "secretCollection",
Rwset: []byte{1, 2, 3, 4, 5, 6, 7},
},
},
},
},
},
},
},
}

bytes, err := collection.Marshal()

assertion := assert.New(t)
assertion.NoError(err)
assertion.NotNil(bytes)
assertion.Equal(1, len(bytes))

var newCol PvtDataCollections

err = newCol.Unmarshal(bytes)
assertion.NoError(err)
assertion.Equal(newCol, collection)
}

func TestNewCoordinator(t *testing.T) {
assertion := assert.New(t)

committer := new(committerMock)

block := &common.Block{
Header: &common.BlockHeader{
Number: 1,
PreviousHash: []byte{0, 0, 0},
DataHash: []byte{1, 1, 1},
},
Data: &common.BlockData{
Data: [][]byte{{1, 2, 3, 4, 5, 6}},
},
}

blockToCommit := &common.Block{
Header: &common.BlockHeader{
Number: 2,
PreviousHash: []byte{1, 1, 1},
DataHash: []byte{2, 2, 2},
},
Data: &common.BlockData{
Data: [][]byte{{11, 12, 13, 14, 15, 16}},
},
}

committer.On("GetBlocks", []uint64{1}).Return([]*common.Block{block})
committer.On("GetBlocks", []uint64{2}).Return(nil)

committer.On("LedgerHeight").Return(uint64(1), nil)
committer.On("Commit", blockToCommit).Return(nil)

coord := NewCoordinator(committer)

b, err := coord.GetBlockByNum(1)

assertion.NoError(err)
assertion.Equal(block, b)

b, err = coord.GetBlockByNum(2)

assertion.Error(err)
assertion.Nil(b)

height, err := coord.LedgerHeight()
assertion.NoError(err)
assertion.Equal(uint64(1), height)

missingPvtTx, err := coord.StoreBlock(blockToCommit)

assertion.NoError(err)
assertion.Empty(missingPvtTx)
}
4 changes: 2 additions & 2 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg proto.ReceivedMessage)
response := &proto.RemoteStateResponse{Payloads: make([]*proto.Payload, 0)}
for seqNum := request.StartSeqNum; seqNum <= endSeqNum; seqNum++ {
logger.Debug("Reading block ", seqNum, " with private data from the coordinator service")
block, pvtData, err := s.coordinator.GetBlockByNum(seqNum)
block, pvtData, err := s.coordinator.GetPvtDataAndBlockByNum(seqNum, nil)

if err != nil {
logger.Errorf("Wasn't able to read block with sequence number %d from ledger, "+
Expand Down Expand Up @@ -661,7 +661,7 @@ func (s *GossipStateProviderImpl) hasRequiredHeight(height uint64) func(peer dis
func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block {
// Try to read missing block from the ledger, should return no nil with
// content including at least one block
if block, _, err := s.coordinator.GetBlockByNum(index); block != nil && err != nil {
if block, err := s.coordinator.GetBlockByNum(index); block != nil && err != nil {
return block
}

Expand Down
Loading

0 comments on commit c335208

Please sign in to comment.