Skip to content

Commit

Permalink
Generate statedb snapshot files
Browse files Browse the repository at this point in the history
This commit introduces a function in the statedb that exports the
public state and private state hashes from statedb into a separate
set of snapshot files in a deterministic format.

FAB-17902

Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi authored and denyeart committed May 20, 2020
1 parent 6e243ad commit 4118e9f
Show file tree
Hide file tree
Showing 9 changed files with 522 additions and 1 deletion.
5 changes: 4 additions & 1 deletion common/ledger/snapshot/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

// FileWriter creates a new file for ledger snapshot. This is expected to be used by various
// components of ledger, such as blockstorage and statedb for exporting the relevent snapshot data
// components of ledger, such as blockstorage and statedb for exporting the relevant snapshot data
type FileWriter struct {
file *os.File
hasher hash.Hash
Expand Down Expand Up @@ -191,6 +191,9 @@ func (r *FileReader) decodeBytes() ([]byte, error) {
return nil, err
}
size := int(sizeUint)
if size == 0 {
return []byte{}, nil
}
if len(r.reusableByteSlice) < size {
r.reusableByteSlice = make([]byte, size)
}
Expand Down
10 changes: 10 additions & 0 deletions common/ledger/snapshot/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestFileCreateAndRead(t *testing.T) {
require.NoError(t, err)
require.NoError(t, fileCreator.EncodeString("Hi there"))
require.NoError(t, fileCreator.EncodeString("How are you?"))
require.NoError(t, fileCreator.EncodeString("")) // zreo length string
require.NoError(t, fileCreator.EncodeUVarint(uint64(25)))
require.NoError(t, fileCreator.EncodeProtoMessage(
&common.BlockchainInfo{
Expand All @@ -40,6 +41,7 @@ func TestFileCreateAndRead(t *testing.T) {
},
))
require.NoError(t, fileCreator.EncodeBytes([]byte("some junk bytes")))
require.NoError(t, fileCreator.EncodeBytes([]byte{})) // zreo length slice

// Done and verify the returned hash
dataHash, err := fileCreator.Done()
Expand All @@ -62,6 +64,10 @@ func TestFileCreateAndRead(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "How are you?", str)

str, err = fileReader.DecodeString()
require.NoError(t, err)
require.Equal(t, "", str)

number, err := fileReader.DecodeUVarInt()
require.NoError(t, err)
require.Equal(t, uint64(25), number)
Expand All @@ -80,6 +86,10 @@ func TestFileCreateAndRead(t *testing.T) {
b, err := fileReader.DecodeBytes()
require.NoError(t, err)
require.Equal(t, []byte("some junk bytes"), b)

b, err = fileReader.DecodeBytes()
require.NoError(t, err)
require.Equal(t, []byte{}, b)
}

func TestFileCreatorErrorPropagation(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions core/ledger/kvledger/txmgmt/privacyenabledstate/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ func deriveHashedDataNs(namespace, collection string) string {
return namespace + nsJoiner + hashDataPrefix + collection
}

func isPvtdataNs(namespace string) bool {
return strings.Contains(namespace, nsJoiner+pvtDataPrefix)
}

func isHashedDataNs(namespace string) bool {
return strings.Contains(namespace, nsJoiner+hashDataPrefix)
}

func addPvtUpdates(pubUpdateBatch *PubUpdateBatch, pvtUpdateBatch *PvtUpdateBatch) {
for ns, nsBatch := range pvtUpdateBatch.UpdateMap {
for _, coll := range nsBatch.GetCollectionNames() {
Expand Down
184 changes: 184 additions & 0 deletions core/ledger/kvledger/txmgmt/privacyenabledstate/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package privacyenabledstate

import (
"hash"
"path"

"github.com/hyperledger/fabric/common/ledger/snapshot"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
)

const (
snapshotFileFormat = byte(1)
pubStateDataFileName = "public_state.data"
pubStateMetadataFileName = "public_state.metadata"
pvtStateHashesFileName = "private_state_hashes.data"
pvtStateHashesMetadataFileName = "private_state_hashes.metadata"
)

// ExportPubStateAndPvtStateHashes generates four files in the specified dir. The files, public_state.data and public_state.metadata
// contains the exported public state and the files private_state_hashes.data and private_state_hashes.data contain the exported private state hashes.
// The file format for public state and the private state hashes are the same. The data files contains a series of tuple <key,value> and the metadata
// files contains a series of tuple <namespace, num entries for the namespace in the data file>.
func (s *DB) ExportPubStateAndPvtStateHashes(dir string, newHasher func() hash.Hash) (map[string][]byte, error) {
itr, dbValueFormat, err := s.GetFullScanIterator(isPvtdataNs)
if err != nil {
return nil, err
}
defer itr.Close()

pubStateWriter, err := newSnapshotWriter(
path.Join(dir, pubStateDataFileName),
path.Join(dir, pubStateMetadataFileName),
dbValueFormat,
newHasher,
)
if err != nil {
return nil, err
}
defer pubStateWriter.close()

pvtStateHashesWriter, err := newSnapshotWriter(
path.Join(dir, pvtStateHashesFileName),
path.Join(dir, pvtStateHashesMetadataFileName),
dbValueFormat,
newHasher,
)
if err != nil {
return nil, err
}
defer pvtStateHashesWriter.close()

for {
compositeKey, dbValue, err := itr.Next()
if err != nil {
return nil, err
}
if compositeKey == nil {
break
}
switch {
case isHashedDataNs(compositeKey.Namespace):
if err := pvtStateHashesWriter.addData(compositeKey, dbValue); err != nil {
return nil, err
}
default:
if err := pubStateWriter.addData(compositeKey, dbValue); err != nil {
return nil, err
}
}
}
pubStateDataHash, pubStateMetadataHash, err := pubStateWriter.done()
if err != nil {
return nil, err
}
pvtStateHahshesDataHash, pvtStateHashesMetadataHash, err := pvtStateHashesWriter.done()
if err != nil {
return nil, err
}
return map[string][]byte{
pubStateDataFileName: pubStateDataHash,
pubStateMetadataFileName: pubStateMetadataHash,
pvtStateHashesFileName: pvtStateHahshesDataHash,
pvtStateHashesMetadataFileName: pvtStateHashesMetadataHash,
},
nil
}

// snapshotWriter generates two files, a data file and a metadata file. The datafile contains a series of tuples <key, dbValue>
// and the metadata file contains a series of tuples <namesapce, number-of-tuples-in-the-data-file-that-belong-to-this-namespace>
type snapshotWriter struct {
dataFile *snapshot.FileWriter
metadataFile *snapshot.FileWriter
kvCountsPerNamespace map[string]uint64
namespaceInsertionOrder []string
}

func newSnapshotWriter(
dataFilePath, metadataFilePath string,
dbValueFormat byte,
newHasher func() hash.Hash,
) (*snapshotWriter, error) {

var dataFile, metadataFile *snapshot.FileWriter
var err error
defer func() {
if err != nil {
dataFile.Close()
metadataFile.Close()
}
}()

dataFile, err = snapshot.CreateFile(dataFilePath, snapshotFileFormat, newHasher())
if err != nil {
return nil, err
}
if err = dataFile.EncodeBytes([]byte{dbValueFormat}); err != nil {
return nil, err
}

metadataFile, err = snapshot.CreateFile(metadataFilePath, snapshotFileFormat, newHasher())
if err != nil {
return nil, err
}
return &snapshotWriter{
dataFile: dataFile,
metadataFile: metadataFile,
kvCountsPerNamespace: map[string]uint64{},
},
nil
}

func (w *snapshotWriter) addData(ck *statedb.CompositeKey, dbValue []byte) error {
_, ok := w.kvCountsPerNamespace[ck.Namespace]
if !ok {
// new namespace begins
w.namespaceInsertionOrder = append(w.namespaceInsertionOrder, ck.Namespace)
}
w.kvCountsPerNamespace[ck.Namespace]++
if err := w.dataFile.EncodeString(ck.Key); err != nil {
return err
}
if err := w.dataFile.EncodeBytes(dbValue); err != nil {
return err
}
return nil
}

func (w *snapshotWriter) done() ([]byte, []byte, error) {
dataHash, err := w.dataFile.Done()
if err != nil {
return nil, nil, err
}

if err := w.metadataFile.EncodeUVarint(uint64(len(w.kvCountsPerNamespace))); err != nil {
return nil, nil, err
}
for _, ns := range w.namespaceInsertionOrder {
if err := w.metadataFile.EncodeString(ns); err != nil {
return nil, nil, err
}
if err := w.metadataFile.EncodeUVarint(w.kvCountsPerNamespace[ns]); err != nil {
return nil, nil, err
}
}
metadataHash, err := w.metadataFile.Done()
if err != nil {
return nil, nil, err
}
return dataHash, metadataHash, nil
}

func (w *snapshotWriter) close() {
if w == nil {
return
}
w.dataFile.Close()
w.metadataFile.Close()
}
Loading

0 comments on commit 4118e9f

Please sign in to comment.