Skip to content

Commit

Permalink
[FAB-5638] SideDB - ledger storage
Browse files Browse the repository at this point in the history
This CR introduces ledger storage which maintains consistency
in the underlying block storage and pvtdata store by ensuring
an atomic operaton for writing blocks (to block storage) and
writing the pvtdata (to pvtdata store)

Change-Id: Ifb21f14d401eb10233db72c1473ba56f44bc119d
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Aug 8, 2017
1 parent 5d47989 commit 6e9e042
Show file tree
Hide file tree
Showing 5 changed files with 367 additions and 5 deletions.
39 changes: 39 additions & 0 deletions core/ledger/ledgerstorage/pkg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ledgerstorage

import (
"os"
"testing"

"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
)

type testEnv struct {
t testing.TB
}

func newTestEnv(t *testing.T) *testEnv {
testEnv := &testEnv{t}
testEnv.cleanup()
return testEnv
}

func (env *testEnv) cleanup() {
path := ledgerconfig.GetRootPath()
os.RemoveAll(path)
}
185 changes: 185 additions & 0 deletions core/ledger/ledgerstorage/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ledgerstorage

import (
"fmt"
"sync"

"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
"github.com/hyperledger/fabric/protos/common"
)

// Provider encapusaltes two providers 1) block store provider and 2) and pvt data store provider
type Provider struct {
blkStoreProvider blkstorage.BlockStoreProvider
pvtdataStoreProvider pvtdatastorage.Provider
}

// Store encapsulates two stores 1) block store and pvt data store
type Store struct {
blkstorage.BlockStore
pvtdataStore pvtdatastorage.Store
rwlock *sync.RWMutex
}

// NewProvider returns the handle to the provider
func NewProvider() *Provider {
// Initialize the block storage
attrsToIndex := []blkstorage.IndexableAttr{
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
blkstorage.IndexableAttrTxID,
blkstorage.IndexableAttrBlockNumTranNum,
blkstorage.IndexableAttrBlockTxID,
blkstorage.IndexableAttrTxValidationCode,
}
indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
blockStoreProvider := fsblkstorage.NewProvider(
fsblkstorage.NewConf(ledgerconfig.GetBlockStorePath(), ledgerconfig.GetMaxBlockfileSize()),
indexConfig)

pvtStoreProvider := pvtdatastorage.NewProvider()
return &Provider{blockStoreProvider, pvtStoreProvider}
}

// Open opens the store
func (p *Provider) Open(ledgerid string) (*Store, error) {
var blockStore blkstorage.BlockStore
var pvtdataStore pvtdatastorage.Store
var err error
if blockStore, err = p.blkStoreProvider.OpenBlockStore(ledgerid); err != nil {
return nil, err
}
if pvtdataStore, err = p.pvtdataStoreProvider.OpenStore(ledgerid); err != nil {
return nil, err
}
store := &Store{blockStore, pvtdataStore, &sync.RWMutex{}}
if err := store.init(); err != nil {
return nil, err
}
return store, nil
}

// Close closes the provider
func (p *Provider) Close() {
p.blkStoreProvider.Close()
p.pvtdataStoreProvider.Close()
}

// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation
func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error {
s.rwlock.Lock()
defer s.rwlock.Unlock()
var pvtdata []*ledger.TxPvtData
for _, v := range blockAndPvtdata.BlockPvtData {
pvtdata = append(pvtdata, v)
}
if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, pvtdata); err != nil {
return err
}
if err := s.AddBlock(blockAndPvtdata.Block); err != nil {
s.pvtdataStore.Rollback()
return err
}
return s.pvtdataStore.Commit()
}

// GetPvtDataAndBlockByNum returns the block and the corresponding pvt data.
// The pvt data is filtered by the list of 'collections' supplied
func (s *Store) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsCollFilter) (*ledger.BlockAndPvtData, error) {
s.rwlock.RLock()
defer s.rwlock.RUnlock()

var block *common.Block
var pvtdata []*ledger.TxPvtData
var err error
if block, err = s.RetrieveBlockByNumber(blockNum); err != nil {
return nil, err
}
if pvtdata, err = s.GetPvtDataByNum(blockNum, filter); err != nil {
return nil, err
}
return &ledger.BlockAndPvtData{Block: block, BlockPvtData: constructPvtdataMap(pvtdata)}, nil
}

// GetPvtDataByNum returns only the pvt data corresponding to the given block number
// The pvt data is filtered by the list of 'ns/collections' supplied in the filter
// A nil filter does not filter any results
func (s *Store) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
s.rwlock.RLock()
defer s.rwlock.RUnlock()

var pvtdata []*ledger.TxPvtData
var err error
if pvtdata, err = s.pvtdataStore.GetPvtDataByBlockNum(blockNum, filter); err != nil {
return nil, err
}
return pvtdata, nil
}

// init checks whether the block storage and pvt data store are in sync
// this is called when the store instance is constructed and handed over for the use.
// this check whether there is a pending batch (possibly from a previous system crash)
// of pvt data that was not committed. If a pending batch exists, the check is made
// whether the associated block was successfully committed in the block storage (before the crash)
// or not. If the block was committed, the private data batch is committed
// otherwise, the pvt data batch is rolledback
func (s *Store) init() error {
var pendingPvtbatch bool
var err error
if pendingPvtbatch, err = s.pvtdataStore.HasPendingBatch(); err != nil {
return err
}
if !pendingPvtbatch {
return nil
}
var bcInfo *common.BlockchainInfo
var pvtdataStoreHt uint64

if bcInfo, err = s.GetBlockchainInfo(); err != nil {
return err
}
if pvtdataStoreHt, err = s.pvtdataStore.LastCommittedBlockHeight(); err != nil {
return err
}

if bcInfo.Height == pvtdataStoreHt {
return s.pvtdataStore.Rollback()
}

if bcInfo.Height == pvtdataStoreHt+1 {
return s.pvtdataStore.Commit()
}

return fmt.Errorf("This is not expected. blockStoreHeight=%d, pvtdataStoreHeight=%d", bcInfo.Height, pvtdataStoreHt)
}

func constructPvtdataMap(pvtdata []*ledger.TxPvtData) map[uint64]*ledger.TxPvtData {
if pvtdata == nil {
return nil
}
m := make(map[uint64]*ledger.TxPvtData)
for _, pvtdatum := range pvtdata {
m[pvtdatum.SeqInBlock] = pvtdatum
}
return m
}
135 changes: 135 additions & 0 deletions core/ledger/ledgerstorage/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ledgerstorage

import (
"os"
"testing"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)

func TestMain(m *testing.M) {
flogging.SetModuleLevel("ledgerstorage", "debug")
flogging.SetModuleLevel("pvtdatastorage", "debug")
viper.Set("peer.fileSystemPath", "/tmp/fabric/core/ledger/ledgerstorage")
os.Exit(m.Run())
}

func TestStore(t *testing.T) {
testEnv := newTestEnv(t)
defer testEnv.cleanup()
provider := NewProvider()
defer provider.Close()
store, err := provider.Open("testLedger")
defer store.Shutdown()

assert.NoError(t, err)
sampleData := sampleData(t)
for _, sampleDatum := range sampleData {
assert.NoError(t, store.CommitWithPvtData(sampleDatum))
}

// block 1 has no pvt data
pvtdata, err := store.GetPvtDataByNum(1, nil)
assert.NoError(t, err)
assert.Nil(t, pvtdata)

// block 4 has no pvt data
pvtdata, err = store.GetPvtDataByNum(4, nil)
assert.NoError(t, err)
assert.Nil(t, pvtdata)

// block 2 has pvt data for tx 3 and 5 only
pvtdata, err = store.GetPvtDataByNum(2, nil)
assert.NoError(t, err)
assert.Equal(t, 2, len(pvtdata))
assert.Equal(t, uint64(3), pvtdata[0].SeqInBlock)
assert.Equal(t, uint64(5), pvtdata[1].SeqInBlock)

// block 3 has pvt data for tx 4 and 6 only
pvtdata, err = store.GetPvtDataByNum(3, nil)
assert.NoError(t, err)
assert.Equal(t, 2, len(pvtdata))
assert.Equal(t, uint64(4), pvtdata[0].SeqInBlock)
assert.Equal(t, uint64(6), pvtdata[1].SeqInBlock)

blockAndPvtdata, err := store.GetPvtDataAndBlockByNum(2, nil)
assert.NoError(t, err)
assert.Equal(t, sampleData[2], blockAndPvtdata)

blockAndPvtdata, err = store.GetPvtDataAndBlockByNum(3, nil)
assert.NoError(t, err)
assert.Equal(t, sampleData[3], blockAndPvtdata)

// pvt data retrieval for block 3 with filter should return filtered pvtdata
filter := ledger.NewPvtNsCollFilter()
filter.Add("ns-1", "coll-1")
blockAndPvtdata, err = store.GetPvtDataAndBlockByNum(3, filter)
assert.NoError(t, err)
assert.Equal(t, sampleData[3].Block, blockAndPvtdata.Block)
// two transactions should be present
assert.Equal(t, 2, len(blockAndPvtdata.BlockPvtData))
// both tran number 4 and 6 should have only one collection because of filter
assert.Equal(t, 1, len(blockAndPvtdata.BlockPvtData[4].WriteSet.NsPvtRwset))
assert.Equal(t, 1, len(blockAndPvtdata.BlockPvtData[6].WriteSet.NsPvtRwset))
// any other transaction entry should be nil
assert.Nil(t, blockAndPvtdata.BlockPvtData[2])
}

func sampleData(t *testing.T) []*ledger.BlockAndPvtData {
var blockAndpvtdata []*ledger.BlockAndPvtData
blocks := testutil.ConstructTestBlocks(t, 10)
for i := 0; i < 10; i++ {
blockAndpvtdata = append(blockAndpvtdata, &ledger.BlockAndPvtData{Block: blocks[i]})
}
// txNum 3, 5 in block 2 has pvtdata
blockAndpvtdata[2].BlockPvtData = samplePvtData(t, []uint64{3, 5})
// txNum 4, 6 in block 3 has pvtdata
blockAndpvtdata[3].BlockPvtData = samplePvtData(t, []uint64{4, 6})

return blockAndpvtdata
}

func samplePvtData(t *testing.T, txNums []uint64) map[uint64]*ledger.TxPvtData {
pvtWriteSet := &rwset.TxPvtReadWriteSet{DataModel: rwset.TxReadWriteSet_KV}
pvtWriteSet.NsPvtRwset = []*rwset.NsPvtReadWriteSet{
&rwset.NsPvtReadWriteSet{
Namespace: "ns-1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
&rwset.CollectionPvtReadWriteSet{
CollectionName: "coll-1",
Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll1"),
},
&rwset.CollectionPvtReadWriteSet{
CollectionName: "coll-2",
Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll2"),
},
},
},
}
var pvtData []*ledger.TxPvtData
for _, txNum := range txNums {
pvtData = append(pvtData, &ledger.TxPvtData{SeqInBlock: txNum, WriteSet: pvtWriteSet})
}
return constructPvtdataMap(pvtData)
}
4 changes: 2 additions & 2 deletions core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type Store interface {
Rollback() error
// IsEmpty returns true if the store does not have any block committed yet
IsEmpty() (bool, error)
// LastCommittedBlock returns the last committed blocknum
LastCommittedBlock() (uint64, error)
// LastCommittedBlockHeight returns the height of the last committed block
LastCommittedBlockHeight() (uint64, error)
// HasPendingBatch returns if the store has a pending batch
HasPendingBatch() (bool, error)
// Shutdown stops the store
Expand Down
9 changes: 6 additions & 3 deletions core/ledger/pvtdatastorage/store_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,12 @@ func (s *store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil
return pvtData, nil
}

// LastCommittedBlock implements the function in the interface `Store`
func (s *store) LastCommittedBlock() (uint64, error) {
return s.lastCommittedBlock, nil
// LastCommittedBlockHeight implements the function in the interface `Store`
func (s *store) LastCommittedBlockHeight() (uint64, error) {
if s.isEmpty {
return 0, nil
}
return s.lastCommittedBlock + 1, nil
}

// HasPendingBatch implements the function in the interface `Store`
Expand Down

0 comments on commit 6e9e042

Please sign in to comment.