Skip to content

Commit

Permalink
[FAB-4976] Sidedb - pvtdata storage
Browse files Browse the repository at this point in the history
This CR implements a store for persisting the writesets produced
over the private data. From data perspective, this storage is
analogous to the block storage for the block data

Change-Id: I43b5349d3671bffa67f7975794e6f1937f99dde5
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Aug 8, 2017
1 parent ee12505 commit 5d47989
Show file tree
Hide file tree
Showing 8 changed files with 710 additions and 0 deletions.
61 changes: 61 additions & 0 deletions core/ledger/ledger_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ledger
import (
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/hyperledger/fabric/protos/peer"
)

Expand Down Expand Up @@ -125,3 +126,63 @@ type TxSimulator interface {
// of information in different way in order to support different data-models or optimize the information representations.
GetTxSimulationResults() ([]byte, error)
}

// TxPvtData encapsulates the transaction number and pvt write-set for a transaction
type TxPvtData struct {
SeqInBlock uint64
WriteSet *rwset.TxPvtReadWriteSet
}

// BlockAndPvtData encapsultes the block and a map that contains the tuples <seqInBlock, *TxPvtData>
// The map is expected to contain the entries only for the transactions that has associated pvt data
type BlockAndPvtData struct {
Block *common.Block
BlockPvtData map[uint64]*TxPvtData
}

// PvtCollFilter represents the set of the collection names (as keys of the map with value 'true')
type PvtCollFilter map[string]bool

// PvtNsCollFilter specifies the tuple <namespace, PvtCollFilter>
type PvtNsCollFilter map[string]PvtCollFilter

// NewPvtNsCollFilter constructs an empty PvtNsCollFilter
func NewPvtNsCollFilter() PvtNsCollFilter {
return make(map[string]PvtCollFilter)
}

// Has returns true if the pvtdata includes the data for collection <ns,coll>
func (pvtdata *TxPvtData) Has(ns string, coll string) bool {
if pvtdata.WriteSet == nil {
return false
}
for _, nsdata := range pvtdata.WriteSet.NsPvtRwset {
if nsdata.Namespace == ns {
for _, colldata := range nsdata.CollectionPvtRwset {
if colldata.CollectionName == coll {
return true
}
}
}
}
return false
}

// Add adds a namespace-collection tuple to the filter
func (filter PvtNsCollFilter) Add(ns string, coll string) {
collFilter, ok := filter[ns]
if !ok {
collFilter = make(map[string]bool)
filter[ns] = collFilter
}
collFilter[coll] = true
}

// Has returns true if the filter has the entry for tuple namespace-collection
func (filter PvtNsCollFilter) Has(ns string, coll string) bool {
collFilter, ok := filter[ns]
if !ok {
return false
}
return collFilter[coll]
}
5 changes: 5 additions & 0 deletions core/ledger/ledgerconfig/ledger_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func GetBlockStorePath() string {
return filepath.Join(GetRootPath(), "chains")
}

// GetPvtdataStorePath returns the filesystem path that is used for permanent storage of private write-sets
func GetPvtdataStorePath() string {
return filepath.Join(GetRootPath(), "pvtdataStore")
}

// GetMaxBlockfileSize returns maximum size of the block file
func GetMaxBlockfileSize() int {
return 64 * 1024 * 1024
Expand Down
52 changes: 52 additions & 0 deletions core/ledger/pkg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package ledger

import (
"testing"

"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/stretchr/testify/assert"
)

func TestTxPvtData(t *testing.T) {
txPvtData := &TxPvtData{}
assert.False(t, txPvtData.Has("ns", "coll"))

txPvtData.WriteSet = &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
&rwset.NsPvtReadWriteSet{
Namespace: "ns",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
&rwset.CollectionPvtReadWriteSet{
CollectionName: "coll-1",
Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll1"),
},
&rwset.CollectionPvtReadWriteSet{
CollectionName: "coll-2",
Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll2"),
},
},
},
},
}

assert.True(t, txPvtData.Has("ns", "coll-1"))
assert.True(t, txPvtData.Has("ns", "coll-2"))
assert.False(t, txPvtData.Has("ns", "coll-3"))
assert.False(t, txPvtData.Has("ns1", "coll-1"))
}

func TestPvtNsCollFilter(t *testing.T) {
filter := NewPvtNsCollFilter()
filter.Add("ns", "coll-1")
filter.Add("ns", "coll-2")
assert.True(t, filter.Has("ns", "coll-1"))
assert.True(t, filter.Has("ns", "coll-2"))
assert.False(t, filter.Has("ns", "coll-3"))
assert.False(t, filter.Has("ns1", "coll-3"))
}
56 changes: 56 additions & 0 deletions core/ledger/pvtdatastorage/kv_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package pvtdatastorage

import (
"math"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/protos/ledger/rwset"
)

var (
pendingCommitKey = []byte{0}
lastCommittedBlkkey = []byte{1}
pvtDataKeyPrefix = []byte{2}

emptyValue = []byte{}
)

func encodePK(blockNum uint64, tranNum uint64) blkTranNumKey {
return append(pvtDataKeyPrefix, version.NewHeight(blockNum, tranNum).ToBytes()...)
}

func decodePK(key blkTranNumKey) (blockNum uint64, tranNum uint64) {
height, _ := version.NewHeightFromBytes(key[1:])
return height.BlockNum, height.TxNum
}

func getKeysForRangeScanByBlockNum(blockNum uint64) (startKey []byte, endKey []byte) {
startKey = encodePK(blockNum, 0)
endKey = encodePK(blockNum, math.MaxUint64)
return
}

func encodePvtRwSet(txPvtRwSet *rwset.TxPvtReadWriteSet) ([]byte, error) {
return proto.Marshal(txPvtRwSet)
}

func decodePvtRwSet(encodedBytes []byte) (*rwset.TxPvtReadWriteSet, error) {
writeset := &rwset.TxPvtReadWriteSet{}
return writeset, proto.Unmarshal(encodedBytes, writeset)
}

func encodeBlockNum(blockNum uint64) []byte {
return proto.EncodeVarint(blockNum)
}

func decodeBlockNum(blockNumBytes []byte) uint64 {
s, _ := proto.DecodeVarint(blockNumBytes)
return s
}
80 changes: 80 additions & 0 deletions core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package pvtdatastorage

import (
"github.com/hyperledger/fabric/core/ledger"
)

// Provider provides handle to specific 'Store' that in turn manages
// private write sets for a ledger
type Provider interface {
OpenStore(id string) (Store, error)
Close()
}

// Store manages the permanent storage of private write sets for a ledger
// Beacsue the pvt data is supposed to be in sync with the blocks in the
// ledger, both should logically happen in an atomic operation. In order
// to accomplish this, an implementation of this store should provide
// support for a two-phase like commit/rollback capability.
// The expected use is such that - first the private data will be given to
// this store (via `Prepare` funtion) and then the block is appended to the block storage.
// Finally, one of the functions `Commit` or `Rollback` is invoked on this store based
// on whether the block was written successfully or not. The store implementation
// is expected to survive a server crash between the call to `Prepare` and `Commit`/`Rollback`
type Store interface {
// GetPvtDataByBlockNum returns only the pvt data corresponding to the given block number
// The pvt data is filtered by the list of 'ns/collections' supplied in the filter
// A nil filter does not filter any results
GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error)
// Prepare prepares the Store for commiting the pvt data. This call does not commit the pvt data.
// Subsequently, the caller is expected to call either `Commit` or `Rollback` function.
// Return from this should ensure that enough preparation is done such that `Commit` function invoked afterwards
// can commit the data and the store is capable of surviving a crash between this function call and the next
// invoke to the `Commit`
Prepare(blockNum uint64, pvtData []*ledger.TxPvtData) error
// Commit commits the pvt data passed in the previous invoke to the `Prepare` function
Commit() error
// Rollback rolls back the pvt data passed in the previous invoke to the `Prepare` function
Rollback() error
// IsEmpty returns true if the store does not have any block committed yet
IsEmpty() (bool, error)
// LastCommittedBlock returns the last committed blocknum
LastCommittedBlock() (uint64, error)
// HasPendingBatch returns if the store has a pending batch
HasPendingBatch() (bool, error)
// Shutdown stops the store
Shutdown()
}

// ErrIllegalCall is to be thrown by a store impl if the store does not expect a call to Prepare/Commit/Rollback
type ErrIllegalCall struct {
msg string
}

func (err *ErrIllegalCall) Error() string {
return err.msg
}

// ErrIllegalArgs is to be thrown by a store impl if the args passed are not allowed
type ErrIllegalArgs struct {
msg string
}

func (err *ErrIllegalArgs) Error() string {
return err.msg
}

// ErrOutOfRange is to be thrown for the request for the data that is not yet committed
type ErrOutOfRange struct {
msg string
}

func (err *ErrOutOfRange) Error() string {
return err.msg
}
Loading

0 comments on commit 5d47989

Please sign in to comment.