Skip to content

Commit

Permalink
Persist complete collection in the private blockstore (#2010)
Browse files Browse the repository at this point in the history
- In the current code, we trim the collection to match the key-values
present in the booting snapshot. As per the current design
assumption, this suffices for transferring the private data to another
peer that may bootstrap from the same or a newer snapshot at a later point
in time. However, the possibility of booting a peer from an older snapshot
(or from the genesis block) cannot be ruled out, and it is good to be able
to support this. This commit allows for the above by not removing the unused
key-values (if any) from the collection data for a transaction.

- Also fixes a bug by skipping a check wherein we checked the
transaction validation flag before applying the missing private data.
This check is unnecessary because, in the snapshot state, the data
appears only from valid transactions.

Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi authored Oct 14, 2020
1 parent 4e2ac6f commit 0ee3295
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 46 deletions.
80 changes: 40 additions & 40 deletions core/ledger/kvledger/hashcheck_pvtdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"bytes"

"github.com/hyperledger/fabric-protos-go/ledger/rwset"
"github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/ledger"
Expand Down Expand Up @@ -47,7 +46,7 @@ func constructValidAndInvalidPvtData(
var err error

if pvtdata.BlockNum <= lastBlockInBootSnapshot {
validData, invalidData, err = verifyHashesAndTrimWriteSetViaBootKVHashes(pvtdata, pvtdataStore)
validData, invalidData, err = verifyHashesViaBootKVHashes(pvtdata, pvtdataStore)
} else {
validData, invalidData, err = verifyHashesFromBlockStore(pvtdata, blockStore)
}
Expand Down Expand Up @@ -193,7 +192,7 @@ func findInvalidNsPvtData(nsRwset *rwset.NsPvtReadWriteSet, txRWSet *rwsetutil.T
return invalidPvtData, invalidNsColl
}

func verifyHashesAndTrimWriteSetViaBootKVHashes(reconciledPvtdata *ledger.ReconciledPvtdata, pvtdataStore *pvtdatastorage.Store) (
func verifyHashesViaBootKVHashes(reconciledPvtdata *ledger.ReconciledPvtdata, pvtdataStore *pvtdatastorage.Store) (
[]*ledger.TxPvtData, []*ledger.PvtdataHashMismatch, error,
) {
var validPvtData []*ledger.TxPvtData
Expand All @@ -202,20 +201,22 @@ func verifyHashesAndTrimWriteSetViaBootKVHashes(reconciledPvtdata *ledger.Reconc
blkNum := reconciledPvtdata.BlockNum

for txNum, txData := range reconciledPvtdata.WriteSets { // Tx loop
var toDeleteNsColl []*nsColl

reconTx, err := rwsetutil.TxPvtRwSetFromProtoMsg(txData.WriteSet)
if err != nil {
continue
}

trimmedTx := &rwsetutil.TxPvtRwSet{}

for _, reconNS := range reconTx.NsPvtRwSet { // Ns Loop
trimmedNs := &rwsetutil.NsPvtRwSet{
NameSpace: reconNS.NameSpace,
}

for _, reconColl := range reconNS.CollPvtRwSets { // coll loop
if reconColl.KvRwSet == nil || len(reconColl.KvRwSet.Writes) == 0 {
toDeleteNsColl = append(toDeleteNsColl,
&nsColl{
ns: reconNS.NameSpace,
coll: reconColl.CollectionName,
},
)
continue
}

Expand All @@ -224,30 +225,41 @@ func verifyHashesAndTrimWriteSetViaBootKVHashes(reconciledPvtdata *ledger.Reconc
return nil, nil, err
}
if len(expectedKVHashes) == 0 {
toDeleteNsColl = append(toDeleteNsColl,
&nsColl{
ns: reconNS.NameSpace,
coll: reconColl.CollectionName,
},
)
continue
}

filteredInKVs := []*kvrwset.KVWrite{}
anyKVHashMismatch := false
anyKVMismatch := false
numKVsRecieved := 0
keysVisited := map[string]struct{}{}

for _, reconKV := range reconColl.KvRwSet.Writes {
_, ok := keysVisited[reconKV.Key]
if ok {
anyKVMismatch = true
break
}
keysVisited[reconKV.Key] = struct{}{}

reconKeyHash := util.ComputeSHA256([]byte(reconKV.Key))
reconValHash := util.ComputeSHA256(reconKV.Value)

expectedValHash, ok := expectedKVHashes[string(reconKeyHash)]
if ok {
numKVsRecieved++
if bytes.Equal(expectedValHash, reconValHash) {
filteredInKVs = append(filteredInKVs, reconKV)
} else {
anyKVHashMismatch = true
if !bytes.Equal(expectedValHash, reconValHash) {
anyKVMismatch = true
break
}
}
}

if anyKVHashMismatch || numKVsRecieved < len(expectedKVHashes) {
if anyKVMismatch || numKVsRecieved < len(expectedKVHashes) {
invalidPvtData = append(invalidPvtData,
&ledger.PvtdataHashMismatch{
BlockNum: blkNum,
Expand All @@ -256,35 +268,23 @@ func verifyHashesAndTrimWriteSetViaBootKVHashes(reconciledPvtdata *ledger.Reconc
Collection: reconColl.CollectionName,
},
)
toDeleteNsColl = append(toDeleteNsColl,
&nsColl{
ns: reconNS.NameSpace,
coll: reconColl.CollectionName,
},
)
continue
}

trimmedNs.CollPvtRwSets = append(trimmedNs.CollPvtRwSets,
&rwsetutil.CollPvtRwSet{
CollectionName: reconColl.CollectionName,
KvRwSet: &kvrwset.KVRWSet{
Writes: filteredInKVs,
},
},
)
} // end coll loop

if len(trimmedNs.CollPvtRwSets) > 0 {
trimmedTx.NsPvtRwSet = append(trimmedTx.NsPvtRwSet, trimmedNs)
}
} // end Ns loop

if len(trimmedTx.NsPvtRwSet) > 0 {
trimmedTxProto, err := trimmedTx.ToProtoMsg()
if err != nil {
return nil, nil, err
}
validPvtData = append(validPvtData,
&ledger.TxPvtData{
SeqInBlock: txNum,
WriteSet: trimmedTxProto,
},
)
for _, nsColl := range toDeleteNsColl {
removeCollFromTxPvtReadWriteSet(txData.WriteSet, nsColl.ns, nsColl.coll)
}

if len(txData.WriteSet.NsPvtRwset) > 0 {
validPvtData = append(validPvtData, txData)
}
} // end Tx loop
return validPvtData, invalidPvtData, nil
Expand Down
70 changes: 67 additions & 3 deletions core/ledger/kvledger/hashcheck_pvtdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/ledger/rwset"
"github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger"
Expand Down Expand Up @@ -242,7 +243,7 @@ func TestConstructValidInvalidBlocksPvtData(t *testing.T) {
require.Len(t, hashMismatches, 0)
})

t.Run("for-data-before-snapshot:trims-the-extra-keys", func(t *testing.T) {
t.Run("for-data-before-snapshot:does-not-trim-the-extra-keys", func(t *testing.T) {
lgr := bootstrappedLedger
pvtdata := pvtdataCopy()
pvtdataWithExtraKey := pvtdataCopy()
Expand Down Expand Up @@ -275,7 +276,7 @@ func TestConstructValidInvalidBlocksPvtData(t *testing.T) {
verifyBlocksPvtdata(t,
map[uint64][]*ledger.TxPvtData{
2: {
pvtdata[0],
pvtdataWithExtraKey[0],
pvtdata[1],
},
},
Expand Down Expand Up @@ -331,6 +332,69 @@ func TestConstructValidInvalidBlocksPvtData(t *testing.T) {
)
})

t.Run("for-data-before-snapshot:reports-repeated-key", func(t *testing.T) {
lgr := bootstrappedLedger
pvtdata := pvtdataCopy()
repeatedKeyInTx0Ns1Coll1 := pvtdataCopy()
repeatedKeyWS := &kvrwset.KVRWSet{
Writes: []*kvrwset.KVWrite{
{
Key: "tx0-key-1",
Value: []byte("tx0-val-1"),
},
{
Key: "tx0-key-1",
Value: []byte("tx0-val-1-tempered"),
},
},
}

repeatedKeyWSBytes, err := proto.Marshal(repeatedKeyWS)
require.NoError(t, err)
repeatedKeyInTx0Ns1Coll1[0].WriteSet.NsPvtRwset[0].CollectionPvtRwset[0].Rwset = repeatedKeyWSBytes

expectedPartOfTx0, _ := produceSamplePvtdata(t, 0,
[][4]string{
{"ns-1", "coll-2", "tx0-key-2", "tx0-val-2"},
},
)

blocksValidPvtData, hashMismatches, err := constructValidAndInvalidPvtData(
[]*ledger.ReconciledPvtdata{
{
BlockNum: 2,
WriteSets: repeatedKeyInTx0Ns1Coll1,
},
},
lgr.blockStore,
lgr.pvtdataStore,
2,
)
require.NoError(t, err)

verifyBlocksPvtdata(t,
map[uint64][]*ledger.TxPvtData{
2: {
expectedPartOfTx0,
pvtdata[1],
},
},
blocksValidPvtData,
)

require.ElementsMatch(t,
[]*ledger.PvtdataHashMismatch{
{
BlockNum: 2,
TxNum: 0,
Namespace: "ns-1",
Collection: "coll-1",
},
},
hashMismatches,
)
})

t.Run("for-data-before-snapshot:ignores-bad-data-corrupted-writeset", func(t *testing.T) {
lgr := bootstrappedLedger
pvtdata := pvtdataCopy()
Expand Down Expand Up @@ -411,7 +475,7 @@ func verifyBlocksPvtdata(t *testing.T, expected, actual map[uint64][]*ledger.TxP

for _, e := range expectedTx {
require.NotNil(t, m[e.SeqInBlock])
require.True(t, proto.Equal(e.WriteSet, m[e.SeqInBlock]))
require.Equal(t, e.WriteSet, m[e.SeqInBlock])
}
}
}
Expand Down
18 changes: 15 additions & 3 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,12 @@ func (l *kvLedger) CommitPvtDataOfOldBlocks(reconciledPvtdata []*ledger.Reconcil

func (l *kvLedger) applyValidTxPvtDataOfOldBlocks(hashVerifiedPvtData map[uint64][]*ledger.TxPvtData) error {
logger.Debugf("[%s:] Filtering pvtData of invalidation transactions", l.ledgerID)
committedPvtData, err := filterPvtDataOfInvalidTx(hashVerifiedPvtData, l.blockStore)

lastBlockInBootstrapSnapshot := uint64(0)
if l.bootSnapshotMetadata != nil {
lastBlockInBootstrapSnapshot = l.bootSnapshotMetadata.LastBlockNumber
}
committedPvtData, err := filterPvtDataOfInvalidTx(hashVerifiedPvtData, l.blockStore, lastBlockInBootstrapSnapshot)
if err != nil {
return err
}
Expand Down Expand Up @@ -984,10 +989,17 @@ func (a *ccEventListenerAdaptor) ChaincodeDeployDone(succeeded bool) {
a.legacyEventListener.ChaincodeDeployDone(succeeded)
}

func filterPvtDataOfInvalidTx(hashVerifiedPvtData map[uint64][]*ledger.TxPvtData, blockStore *blkstorage.BlockStore) (map[uint64][]*ledger.TxPvtData, error) {
func filterPvtDataOfInvalidTx(
hashVerifiedPvtData map[uint64][]*ledger.TxPvtData,
blockStore *blkstorage.BlockStore,
lastBlockInBootstrapSnapshot uint64,
) (map[uint64][]*ledger.TxPvtData, error) {
committedPvtData := make(map[uint64][]*ledger.TxPvtData)
for blkNum, txsPvtData := range hashVerifiedPvtData {

if blkNum <= lastBlockInBootstrapSnapshot {
committedPvtData[blkNum] = txsPvtData
continue
}
// TODO: Instead of retrieving the whole block, we need to retrieve only
// the TxValidationFlags from the block metadata. For that, we would need
// to add a new index for the block metadata - FAB-15808
Expand Down

0 comments on commit 0ee3295

Please sign in to comment.