From 5d4798971ccfab58822e0dfa653655907dc4db6a Mon Sep 17 00:00:00 2001 From: manish Date: Sat, 5 Aug 2017 22:13:25 -0400 Subject: [PATCH] [FAB-4976] Sidedb - pvtdata storage This CR implements a store for persisting the write-sets produced over the private data. From a data perspective, this storage is analogous to the block storage for the block data. Change-Id: I43b5349d3671bffa67f7975794e6f1937f99dde5 Signed-off-by: manish --- core/ledger/ledger_interface.go | 61 ++++ core/ledger/ledgerconfig/ledger_config.go | 5 + core/ledger/pkg_test.go | 52 ++++ core/ledger/pvtdatastorage/kv_encoding.go | 56 ++++ core/ledger/pvtdatastorage/store.go | 80 ++++++ core/ledger/pvtdatastorage/store_impl.go | 260 ++++++++++++++++++ core/ledger/pvtdatastorage/store_impl_test.go | 150 ++++++++++ core/ledger/pvtdatastorage/test_exports.go | 46 ++++ 8 files changed, 710 insertions(+) create mode 100644 core/ledger/pkg_test.go create mode 100644 core/ledger/pvtdatastorage/kv_encoding.go create mode 100644 core/ledger/pvtdatastorage/store.go create mode 100644 core/ledger/pvtdatastorage/store_impl.go create mode 100644 core/ledger/pvtdatastorage/store_impl_test.go create mode 100644 core/ledger/pvtdatastorage/test_exports.go diff --git a/core/ledger/ledger_interface.go b/core/ledger/ledger_interface.go index f5427f751bd..6d22c04a363 100644 --- a/core/ledger/ledger_interface.go +++ b/core/ledger/ledger_interface.go @@ -19,6 +19,7 @@ package ledger import ( commonledger "github.com/hyperledger/fabric/common/ledger" "github.com/hyperledger/fabric/protos/common" + "github.com/hyperledger/fabric/protos/ledger/rwset" "github.com/hyperledger/fabric/protos/peer" ) @@ -125,3 +126,63 @@ type TxSimulator interface { // of information in different way in order to support different data-models or optimize the information representations.
GetTxSimulationResults() ([]byte, error) } + +// TxPvtData encapsulates the transaction number and pvt write-set for a transaction +type TxPvtData struct { + SeqInBlock uint64 + WriteSet *rwset.TxPvtReadWriteSet +} + +// BlockAndPvtData encapsulates the block and a map that contains the tuples <seqInBlock, *TxPvtData> +// The map is expected to contain the entries only for the transactions that have associated pvt data +type BlockAndPvtData struct { + Block *common.Block + BlockPvtData map[uint64]*TxPvtData +} + +// PvtCollFilter represents the set of the collection names (as keys of the map with value 'true') +type PvtCollFilter map[string]bool + +// PvtNsCollFilter specifies the tuple <namespace, PvtCollFilter> +type PvtNsCollFilter map[string]PvtCollFilter + +// NewPvtNsCollFilter constructs an empty PvtNsCollFilter +func NewPvtNsCollFilter() PvtNsCollFilter { + return make(map[string]PvtCollFilter) +} + +// Has returns true if the pvtdata includes the data for the collection <ns, coll> +func (pvtdata *TxPvtData) Has(ns string, coll string) bool { + if pvtdata.WriteSet == nil { + return false + } + for _, nsdata := range pvtdata.WriteSet.NsPvtRwset { + if nsdata.Namespace == ns { + for _, colldata := range nsdata.CollectionPvtRwset { + if colldata.CollectionName == coll { + return true + } + } + } + } + return false +} + +// Add adds a namespace-collection tuple to the filter +func (filter PvtNsCollFilter) Add(ns string, coll string) { + collFilter, ok := filter[ns] + if !ok { + collFilter = make(map[string]bool) + filter[ns] = collFilter + } + collFilter[coll] = true +} + +// Has returns true if the filter has an entry for the tuple <namespace, collection> +func (filter PvtNsCollFilter) Has(ns string, coll string) bool { + collFilter, ok := filter[ns] + if !ok { + return false + } + return collFilter[coll] +} diff --git a/core/ledger/ledgerconfig/ledger_config.go b/core/ledger/ledgerconfig/ledger_config.go index c85021dd57b..45a7d0842b5 100644 --- a/core/ledger/ledgerconfig/ledger_config.go +++ b/core/ledger/ledgerconfig/ledger_config.go @@ -59,6 +59,11 @@ func GetBlockStorePath() string { return filepath.Join(GetRootPath(), "chains") } +// GetPvtdataStorePath returns the filesystem path that is used for permanent storage of private write-sets +func GetPvtdataStorePath() string { + return filepath.Join(GetRootPath(), "pvtdataStore") +} + // GetMaxBlockfileSize returns maximum size of the block file func GetMaxBlockfileSize() int { return 64 * 1024 * 1024 diff --git a/core/ledger/pkg_test.go b/core/ledger/pkg_test.go new file mode 100644 index 00000000000..36df23679fb --- /dev/null +++ b/core/ledger/pkg_test.go @@ -0,0 +1,52 @@ +/* +Copyright IBM Corp. All Rights Reserved.
+ +SPDX-License-Identifier: Apache-2.0 +*/ +package ledger + +import ( + "testing" + + "github.com/hyperledger/fabric/protos/ledger/rwset" + "github.com/stretchr/testify/assert" +) + +func TestTxPvtData(t *testing.T) { + txPvtData := &TxPvtData{} + assert.False(t, txPvtData.Has("ns", "coll")) + + txPvtData.WriteSet = &rwset.TxPvtReadWriteSet{ + DataModel: rwset.TxReadWriteSet_KV, + NsPvtRwset: []*rwset.NsPvtReadWriteSet{ + &rwset.NsPvtReadWriteSet{ + Namespace: "ns", + CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{ + &rwset.CollectionPvtReadWriteSet{ + CollectionName: "coll-1", + Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll1"), + }, + &rwset.CollectionPvtReadWriteSet{ + CollectionName: "coll-2", + Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll2"), + }, + }, + }, + }, + } + + assert.True(t, txPvtData.Has("ns", "coll-1")) + assert.True(t, txPvtData.Has("ns", "coll-2")) + assert.False(t, txPvtData.Has("ns", "coll-3")) + assert.False(t, txPvtData.Has("ns1", "coll-1")) +} + +func TestPvtNsCollFilter(t *testing.T) { + filter := NewPvtNsCollFilter() + filter.Add("ns", "coll-1") + filter.Add("ns", "coll-2") + assert.True(t, filter.Has("ns", "coll-1")) + assert.True(t, filter.Has("ns", "coll-2")) + assert.False(t, filter.Has("ns", "coll-3")) + assert.False(t, filter.Has("ns1", "coll-3")) +} diff --git a/core/ledger/pvtdatastorage/kv_encoding.go b/core/ledger/pvtdatastorage/kv_encoding.go new file mode 100644 index 00000000000..69951a52dcf --- /dev/null +++ b/core/ledger/pvtdatastorage/kv_encoding.go @@ -0,0 +1,56 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtdatastorage + +import ( + "math" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" + "github.com/hyperledger/fabric/protos/ledger/rwset" +) + +var ( + pendingCommitKey = []byte{0} + lastCommittedBlkkey = []byte{1} + pvtDataKeyPrefix = []byte{2} + + emptyValue = []byte{} +) + +func encodePK(blockNum uint64, tranNum uint64) blkTranNumKey { + return append(pvtDataKeyPrefix, version.NewHeight(blockNum, tranNum).ToBytes()...) +} + +func decodePK(key blkTranNumKey) (blockNum uint64, tranNum uint64) { + height, _ := version.NewHeightFromBytes(key[1:]) + return height.BlockNum, height.TxNum +} + +func getKeysForRangeScanByBlockNum(blockNum uint64) (startKey []byte, endKey []byte) { + startKey = encodePK(blockNum, 0) + endKey = encodePK(blockNum, math.MaxUint64) + return +} + +func encodePvtRwSet(txPvtRwSet *rwset.TxPvtReadWriteSet) ([]byte, error) { + return proto.Marshal(txPvtRwSet) +} + +func decodePvtRwSet(encodedBytes []byte) (*rwset.TxPvtReadWriteSet, error) { + writeset := &rwset.TxPvtReadWriteSet{} + return writeset, proto.Unmarshal(encodedBytes, writeset) +} + +func encodeBlockNum(blockNum uint64) []byte { + return proto.EncodeVarint(blockNum) +} + +func decodeBlockNum(blockNumBytes []byte) uint64 { + s, _ := proto.DecodeVarint(blockNumBytes) + return s +} diff --git a/core/ledger/pvtdatastorage/store.go b/core/ledger/pvtdatastorage/store.go new file mode 100644 index 00000000000..c1c04e801ee --- /dev/null +++ b/core/ledger/pvtdatastorage/store.go @@ -0,0 +1,80 @@ +/* +Copyright IBM Corp. All Rights Reserved. 
+ +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtdatastorage + +import ( + "github.com/hyperledger/fabric/core/ledger" +) + +// Provider provides a handle to a specific 'Store' that in turn manages +// private write sets for a ledger +type Provider interface { + OpenStore(id string) (Store, error) + Close() +} + +// Store manages the permanent storage of private write sets for a ledger. +// Because the pvt data is supposed to be in sync with the blocks in the +// ledger, writing both should logically happen in an atomic operation. In order +// to accomplish this, an implementation of this store should provide +// support for a two-phase-like commit/rollback capability. +// The expected use is as follows: first, the private data is given to +// this store (via the `Prepare` function) and then the block is appended to the block storage. +// Finally, one of the functions `Commit` or `Rollback` is invoked on this store based +// on whether the block was written successfully or not. The store implementation +// is expected to survive a server crash between the call to `Prepare` and `Commit`/`Rollback`. +type Store interface { + // GetPvtDataByBlockNum returns only the pvt data corresponding to the given block number + // The pvt data is filtered by the list of 'ns/collections' supplied in the filter + // A nil filter does not filter any results + GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) + // Prepare prepares the Store for committing the pvt data. This call does not commit the pvt data. + // Subsequently, the caller is expected to call either the `Commit` or the `Rollback` function. + // A successful return from this function should ensure that enough preparation is done such that the `Commit` function + // invoked afterwards can commit the data, and that the store is capable of surviving a crash between this + // function call and the subsequent invocation of `Commit` + Prepare(blockNum uint64, pvtData []*ledger.TxPvtData) error + // Commit commits the pvt data passed in the previous invocation of the `Prepare` function + Commit() error + // Rollback rolls back the pvt data passed in the previous invocation of the `Prepare` function + Rollback() error + // IsEmpty returns true if the store does not have any block committed yet + IsEmpty() (bool, error) + // LastCommittedBlock returns the last committed block number + LastCommittedBlock() (uint64, error) + // HasPendingBatch returns true if the store has a pending batch + HasPendingBatch() (bool, error) + // Shutdown stops the store + Shutdown() +} + +// ErrIllegalCall is to be returned by a store impl if the store does not expect a call to Prepare/Commit/Rollback +type ErrIllegalCall struct { + msg string +} + +func (err *ErrIllegalCall) Error() string { + return err.msg +} + +// ErrIllegalArgs is to be returned by a store impl if the args passed are not allowed +type ErrIllegalArgs struct { + msg string +} + +func (err *ErrIllegalArgs) Error() string { + return err.msg +} + +// ErrOutOfRange is to be returned for a request for data that is not yet committed +type ErrOutOfRange struct { + msg string +} + +func (err *ErrOutOfRange) Error() string { + return err.msg +} diff --git a/core/ledger/pvtdatastorage/store_impl.go b/core/ledger/pvtdatastorage/store_impl.go new file mode 100644 index 00000000000..de517da37c0 --- /dev/null +++ b/core/ledger/pvtdatastorage/store_impl.go @@ -0,0 +1,260 @@ +/* +Copyright IBM Corp. All Rights Reserved.
+ +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtdatastorage + +import ( + "fmt" + + "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper" + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" + "github.com/hyperledger/fabric/protos/ledger/rwset" +) + +var logger = flogging.MustGetLogger("pvtdatastorage") + +type provider struct { + dbProvider *leveldbhelper.Provider +} + +type store struct { + db *leveldbhelper.DBHandle + ledgerid string + isEmpty bool + lastCommittedBlock uint64 + batchPending bool +} + +type blkTranNumKey []byte + +// NewProvider instantiates a store Provider +func NewProvider() Provider { + dbPath := ledgerconfig.GetPvtdataStorePath() + dbProvider := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: dbPath}) + return &provider{dbProvider: dbProvider} +} + +// OpenStore returns a handle to a store +func (p *provider) OpenStore(ledgerid string) (Store, error) { + dbHandle := p.dbProvider.GetDBHandle(ledgerid) + s := &store{db: dbHandle, ledgerid: ledgerid} + if err := s.initState(); err != nil { + return nil, err + } + return s, nil } + +// Close closes the store provider +func (p *provider) Close() { + p.dbProvider.Close() +} + +func (s *store) initState() error { + var err error + if s.isEmpty, s.lastCommittedBlock, err = s.getLastCommittedBlockNum(); err != nil { + return err + } + if s.batchPending, err = s.hasPendingCommit(); err != nil { + return err + } + return nil +} + +// Prepare implements the function in the interface `Store` +func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData) error { + if s.batchPending { + return &ErrIllegalCall{`A pending batch exists as a result of the last invocation of the "Prepare" call.
+ Invoke "Commit" or "Rollback" on the pending batch before invoking the "Prepare" function`} + } + expectedBlockNum := s.nextBlockNum() + if expectedBlockNum != blockNum { + return &ErrIllegalArgs{fmt.Sprintf("Expected block number=%d, received block number=%d", expectedBlockNum, blockNum)} + } + + batch := leveldbhelper.NewUpdateBatch() + var key, value []byte + var err error + for _, txPvtData := range pvtData { + key = encodePK(blockNum, txPvtData.SeqInBlock) + if value, err = encodePvtRwSet(txPvtData.WriteSet); err != nil { + return err + } + logger.Debugf("Adding private data to batch blockNum=%d, tranNum=%d", blockNum, txPvtData.SeqInBlock) + batch.Put(key, value) + } + batch.Put(pendingCommitKey, emptyValue) + if err := s.db.WriteBatch(batch, true); err != nil { + return err + } + s.batchPending = true + return nil +} + +// Commit implements the function in the interface `Store` +func (s *store) Commit() error { + if !s.batchPending { + return &ErrIllegalCall{"No pending batch to commit"} + } + committingBlockNum := s.nextBlockNum() + logger.Debugf("Committing pvt data for block = %d", committingBlockNum) + batch := leveldbhelper.NewUpdateBatch() + batch.Delete(pendingCommitKey) + batch.Put(lastCommittedBlkkey, encodeBlockNum(committingBlockNum)) + if err := s.db.WriteBatch(batch, true); err != nil { + return err + } + s.batchPending = false + s.isEmpty = false + s.lastCommittedBlock = committingBlockNum + logger.Debugf("Committed pvt data for block = %d", committingBlockNum) + return nil +} + +// Rollback implements the function in the interface `Store` +func (s *store) Rollback() error { + var pendingBatchKeys []blkTranNumKey + var err error + if !s.batchPending { + return &ErrIllegalCall{"No pending batch to rollback"} + } + rollingbackBlockNum := s.nextBlockNum() + logger.Debugf("Rolling back pvt data for block = %d", rollingbackBlockNum) + + if pendingBatchKeys, err = s.retrievePendingBatchKeys(); err != nil { + return err + } + batch := leveldbhelper.NewUpdateBatch() + for _, key := range pendingBatchKeys { + batch.Delete(key) + } + batch.Delete(pendingCommitKey) + if err := s.db.WriteBatch(batch, true); err != nil { + return err + } + s.batchPending = false + logger.Debugf("Rolled back pvt data for block = %d", rollingbackBlockNum) + return nil +} + +// GetPvtDataByBlockNum implements the function in the interface `Store`.
+// If the store is empty or the last committed block number is smaller than the +// requested block number, an 'ErrOutOfRange' is returned +func (s *store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) { + logger.Debugf("GetPvtDataByBlockNum(): blockNum=%d, filter=%#v", blockNum, filter) + if s.isEmpty { + return nil, &ErrOutOfRange{"The store is empty"} + } + + if blockNum > s.lastCommittedBlock { + return nil, &ErrOutOfRange{fmt.Sprintf("Last committed block=%d, block requested=%d", s.lastCommittedBlock, blockNum)} + } + var pvtData []*ledger.TxPvtData + startKey, endKey := getKeysForRangeScanByBlockNum(blockNum) + logger.Debugf("GetPvtDataByBlockNum(): startKey=%#v, endKey=%#v", startKey, endKey) + itr := s.db.GetIterator(startKey, endKey) + defer itr.Release() + + var pvtWSet *rwset.TxPvtReadWriteSet + var err error + for itr.Next() { + bNum, tNum := decodePK(itr.Key()) + if pvtWSet, err = decodePvtRwSet(itr.Value()); err != nil { + return nil, err + } + logger.Debugf("Retrieving pvtdata for bNum=%d, tNum=%d", bNum, tNum) + filteredWSet := trimPvtWSet(pvtWSet, filter) + pvtData = append(pvtData, &ledger.TxPvtData{SeqInBlock: tNum, WriteSet: filteredWSet}) + } + return pvtData, nil +} + +// LastCommittedBlock implements the function in the interface `Store` +func (s *store) LastCommittedBlock() (uint64, error) { + return s.lastCommittedBlock, nil +} + +// HasPendingBatch implements the function in the interface `Store` +func (s *store) HasPendingBatch() (bool, error) { + return s.batchPending, nil +} + +// IsEmpty implements the function in the interface `Store` +func (s *store) IsEmpty() (bool, error) { + return s.isEmpty, nil +} + +// Shutdown implements the function in the interface `Store` +func (s *store) Shutdown() { + // do nothing +} + +func (s *store) nextBlockNum() uint64 { + if s.isEmpty { + return 0 + } + return s.lastCommittedBlock + 1 +} + +func (s *store) retrievePendingBatchKeys() ([]blkTranNumKey, error) { + var pendingBatchKeys []blkTranNumKey + itr := s.db.GetIterator(encodePK(s.nextBlockNum(), 0), nil) + defer itr.Release() + for itr.Next() { + pendingBatchKeys = append(pendingBatchKeys, itr.Key()) + } + return pendingBatchKeys, nil +} + +func (s *store) hasPendingCommit() (bool, error) { + var v []byte + var err error + if v, err = s.db.Get(pendingCommitKey); err != nil { + return false, err + } + return v != nil, nil +} + +func (s *store) getLastCommittedBlockNum() (bool, uint64, error) { + var v []byte + var err error + if v, err = s.db.Get(lastCommittedBlkkey); v == nil || err != nil { + return true, 0, err + } + return false, decodeBlockNum(v), nil +} + +func trimPvtWSet(pvtWSet *rwset.TxPvtReadWriteSet, filter ledger.PvtNsCollFilter) *rwset.TxPvtReadWriteSet { + if filter == nil { + return pvtWSet + } + + var filteredNsRwSet []*rwset.NsPvtReadWriteSet + for _, ns := range pvtWSet.NsPvtRwset { + var filteredCollRwSet []*rwset.CollectionPvtReadWriteSet + for _, coll := range ns.CollectionPvtRwset { + if filter.Has(ns.Namespace, coll.CollectionName) { + filteredCollRwSet = append(filteredCollRwSet, coll) + } + } + if filteredCollRwSet != nil { + filteredNsRwSet = append(filteredNsRwSet, + &rwset.NsPvtReadWriteSet{ + Namespace: ns.Namespace, + CollectionPvtRwset: filteredCollRwSet, + }, + ) + } + } + var filteredTxPvtRwSet *rwset.TxPvtReadWriteSet + if filteredNsRwSet != nil { + filteredTxPvtRwSet = &rwset.TxPvtReadWriteSet{ + DataModel: pvtWSet.GetDataModel(), + NsPvtRwset: filteredNsRwSet, + } + } + return filteredTxPvtRwSet +} diff
--git a/core/ledger/pvtdatastorage/store_impl_test.go b/core/ledger/pvtdatastorage/store_impl_test.go new file mode 100644 index 00000000000..2aa4848fff0 --- /dev/null +++ b/core/ledger/pvtdatastorage/store_impl_test.go @@ -0,0 +1,150 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtdatastorage + +import ( + "os" + "testing" + + "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/protos/ledger/rwset" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" +) + +func TestMain(m *testing.M) { + flogging.SetModuleLevel("pvtdatastorage", "debug") + viper.Set("peer.fileSystemPath", "/tmp/fabric/core/ledger/pvtdatastorage") + os.Exit(m.Run()) +} + +func TestEmptyStore(t *testing.T) { + env := NewTestStoreEnv(t) + defer env.Cleanup() + assert := assert.New(t) + store := env.TestStore + testEmpty(true, assert, store) + testPendingBatch(false, assert, store) +} + +func TestStoreBasicCommitAndRetrieval(t *testing.T) { + env := NewTestStoreEnv(t) + defer env.Cleanup() + assert := assert.New(t) + store := env.TestStore + testData := samplePvtData(t, []uint64{2, 4}) + + // no pvt data with block 0 + assert.NoError(store.Prepare(0, nil)) + assert.NoError(store.Commit()) + + // pvt data with block 1 - commit + assert.NoError(store.Prepare(1, testData)) + assert.NoError(store.Commit()) + + // pvt data with block 2 - rollback + assert.NoError(store.Prepare(2, testData)) + assert.NoError(store.Rollback()) + + // pvt data retrieval for block 0 should return nil + var nilFilter ledger.PvtNsCollFilter + retrievedData, err := store.GetPvtDataByBlockNum(0, nilFilter) + assert.NoError(err) + assert.Nil(retrievedData) + + // pvt data retrieval for block 1 should return full pvtdata + retrievedData, err = store.GetPvtDataByBlockNum(1, nilFilter) + assert.NoError(err) + assert.Equal(testData, retrievedData) + + // pvt data retrieval for block 1 with filter should return filtered pvtdata + filter := ledger.NewPvtNsCollFilter() + filter.Add("ns-1", "coll-1") + filter.Add("ns-2", "coll-2") + retrievedData, err = store.GetPvtDataByBlockNum(1, filter) + assert.Equal(1, len(retrievedData[0].WriteSet.NsPvtRwset[0].CollectionPvtRwset)) + assert.Equal(1, len(retrievedData[0].WriteSet.NsPvtRwset[1].CollectionPvtRwset)) + assert.True(retrievedData[0].Has("ns-1", "coll-1")) + assert.True(retrievedData[0].Has("ns-2", "coll-2")) + + // pvt data retrieval for block 2 should return ErrOutOfRange + retrievedData, err = store.GetPvtDataByBlockNum(2, nilFilter) + _, ok := err.(*ErrOutOfRange) + assert.True(ok) + assert.Nil(retrievedData) +} + +func TestStoreState(t *testing.T) { + env := NewTestStoreEnv(t) + defer env.Cleanup() + assert := assert.New(t) + store := env.TestStore + testData := samplePvtData(t, []uint64{0}) + + _, ok := store.Prepare(1, testData).(*ErrIllegalArgs) + assert.True(ok) + + assert.Nil(store.Prepare(0, testData)) + assert.NoError(store.Commit()) + + assert.Nil(store.Prepare(1, testData)) + _, ok = store.Prepare(2, testData).(*ErrIllegalCall) + assert.True(ok) +} + +// TODO Add tests for simulating a crash between calls `Prepare` and `Commit`/`Rollback` + +func testEmpty(expectedEmpty bool, assert *assert.Assertions, store Store) { + isEmpty, err := store.IsEmpty() + assert.NoError(err) + assert.Equal(expectedEmpty, isEmpty) +} + +func testPendingBatch(expectedPending bool, assert *assert.Assertions, store Store) { + hasPendingBatch, err := store.HasPendingBatch() + 
assert.NoError(err) + assert.Equal(expectedPending, hasPendingBatch) +} + +func samplePvtData(t *testing.T, txNums []uint64) []*ledger.TxPvtData { + pvtWriteSet := &rwset.TxPvtReadWriteSet{DataModel: rwset.TxReadWriteSet_KV} + pvtWriteSet.NsPvtRwset = []*rwset.NsPvtReadWriteSet{ + &rwset.NsPvtReadWriteSet{ + Namespace: "ns-1", + CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{ + &rwset.CollectionPvtReadWriteSet{ + CollectionName: "coll-1", + Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll1"), + }, + &rwset.CollectionPvtReadWriteSet{ + CollectionName: "coll-2", + Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll2"), + }, + }, + }, + + &rwset.NsPvtReadWriteSet{ + Namespace: "ns-2", + CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{ + &rwset.CollectionPvtReadWriteSet{ + CollectionName: "coll-1", + Rwset: []byte("RandomBytes-PvtRWSet-ns2-coll1"), + }, + &rwset.CollectionPvtReadWriteSet{ + CollectionName: "coll-2", + Rwset: []byte("RandomBytes-PvtRWSet-ns2-coll2"), + }, + }, + }, + } + var pvtData []*ledger.TxPvtData + for _, txNum := range txNums { + pvtData = append(pvtData, &ledger.TxPvtData{SeqInBlock: txNum, WriteSet: pvtWriteSet}) + } + return pvtData +} diff --git a/core/ledger/pvtdatastorage/test_exports.go b/core/ledger/pvtdatastorage/test_exports.go new file mode 100644 index 00000000000..e947de2dee8 --- /dev/null +++ b/core/ledger/pvtdatastorage/test_exports.go @@ -0,0 +1,46 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtdatastorage + +import ( + "os" + "testing" + + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" + "github.com/stretchr/testify/assert" +) + +// StoreEnv provides the store env for testing +type StoreEnv struct { + t testing.TB + TestStoreProvider Provider + TestStore Store +} + +// NewTestStoreEnv constructs a StoreEnv for testing +func NewTestStoreEnv(t *testing.T) *StoreEnv { + removeStorePath(t) + assert := assert.New(t) + testStoreProvider := NewProvider() + testStore, err := testStoreProvider.OpenStore("TestStore") + assert.NoError(err) + return &StoreEnv{t, testStoreProvider, testStore} +} + +// Cleanup cleans up the store env after testing +func (env *StoreEnv) Cleanup() { + env.TestStoreProvider.Close() + removeStorePath(env.t) +} + +func removeStorePath(t testing.TB) { + dbPath := ledgerconfig.GetPvtdataStorePath() + if err := os.RemoveAll(dbPath); err != nil { + t.Fatalf("Err: %s", err) + t.FailNow() + } +}
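
A minimal usage sketch of the two-phase API introduced in this CR, assuming the packages build as in the patch. The helpers commitBlockWithPvtData and appendBlockToBlockStore, the channel name "ch1", and the scratch peer.fileSystemPath are illustrative assumptions and not part of this change; only the pvtdatastorage and ledger APIs shown in the diff are used.

package main

import (
	"fmt"

	"github.com/hyperledger/fabric/core/ledger"
	"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
	"github.com/spf13/viper"
)

// commitBlockWithPvtData drives the two-phase protocol expected by the Store:
// Prepare the pvt write-sets, append the block to the block storage, and then
// Commit (or Rollback if the block write fails).
func commitBlockWithPvtData(s pvtdatastorage.Store, blockNum uint64, pvtData []*ledger.TxPvtData) error {
	if err := s.Prepare(blockNum, pvtData); err != nil {
		return err
	}
	if err := appendBlockToBlockStore(blockNum); err != nil {
		// The block was not written - discard the prepared pvt data batch.
		if rbErr := s.Rollback(); rbErr != nil {
			return rbErr
		}
		return err
	}
	return s.Commit()
}

// appendBlockToBlockStore is a hypothetical stand-in for the existing block
// storage append; it always succeeds in this sketch.
func appendBlockToBlockStore(blockNum uint64) error {
	return nil
}

func main() {
	// The provider derives its path from peer.fileSystemPath (see GetPvtdataStorePath),
	// so point it at a fresh scratch location, as the tests in this CR do.
	viper.Set("peer.fileSystemPath", "/tmp/fabric/pvtdatastorage-example")

	p := pvtdatastorage.NewProvider()
	defer p.Close()
	s, err := p.OpenStore("ch1")
	if err != nil {
		panic(err)
	}
	defer s.Shutdown()

	// Commit an (empty) pvt data batch for block 0 and read it back,
	// optionally trimmed by a namespace/collection filter.
	if err := commitBlockWithPvtData(s, 0, nil); err != nil {
		panic(err)
	}
	filter := ledger.NewPvtNsCollFilter()
	filter.Add("ns-1", "coll-1")
	pvtData, err := s.GetPvtDataByBlockNum(0, filter)
	fmt.Printf("pvtData=%v err=%v\n", pvtData, err)
}

The pendingCommitKey written by Prepare is what allows initState to detect, after a peer crash between Prepare and Commit/Rollback, that a batch is still pending and must be resolved by the caller before the next Prepare.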