Skip to content

Commit

Permalink
import collection configs from snapshot (hyperledger#1460)
Browse files Browse the repository at this point in the history
This PR builds the collection config store
for a ledger using the snapshot files.

FAB-18003

Signed-off-by: senthil <cendhu@gmail.com>
  • Loading branch information
cendhu authored and Sijo Cherian committed Jun 28, 2020
1 parent 625e69c commit 325b704
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 89 deletions.
13 changes: 13 additions & 0 deletions core/ledger/confighistory/db_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ func (d *db) getNamespaceIterator(ns string) (*leveldbhelper.Iterator, error) {
return d.GetIterator(nsStartKey, nsEndKey)
}

func (d *db) isEmpty() (bool, error) {
itr, err := d.GetIterator(nil, nil)
if err != nil {
return false, err
}
defer itr.Release()
entryExist := itr.Next()
if err := itr.Error(); err != nil {
return false, errors.WithMessagef(err, "internal leveldb error while obtaining next entry from iterator")
}
return !entryExist, nil
}

func encodeCompositeKey(ns, key string, blockNum uint64) []byte {
b := []byte(keyPrefix + ns)
b = append(b, separatorByte)
Expand Down
39 changes: 39 additions & 0 deletions core/ledger/confighistory/db_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,46 @@ func TestGetNamespaceIterator(t *testing.T) {
require.EqualError(t, err, "internal leveldb error while obtaining db iterator: leveldb: closed")
require.Nil(t, itr)
})
}

func TestIsNotEmpty(t *testing.T) {
testDBPath := "/tmp/fabric/core/ledger/confighistory"
deleteTestPath(t, testDBPath)
provider, err := newDBProvider(testDBPath)
require.NoError(t, err)
defer deleteTestPath(t, testDBPath)

db := provider.getDB("ledger1")

t.Run("db is empty", func(t *testing.T) {
empty, err := db.isEmpty()
require.NoError(t, err)
require.True(t, empty)
})

t.Run("db is not empty", func(t *testing.T) {
sampleData := []*compositeKV{
{
&compositeKey{
ns: "ns1",
key: "key1",
blockNum: 40,
},
[]byte("val1_40"),
},
}
populateDBWithSampleData(t, db, sampleData)
empty, err := db.isEmpty()
require.NoError(t, err)
require.False(t, empty)
})

t.Run("iter error", func(t *testing.T) {
provider.Close()
empty, err := db.isEmpty()
require.EqualError(t, err, "internal leveldb error while obtaining db iterator: leveldb: closed")
require.False(t, empty)
})
}

func verifyNsEntries(t *testing.T, nsItr *leveldbhelper.Iterator, expectedEntries []*compositeKV) {
Expand Down
56 changes: 55 additions & 1 deletion core/ledger/confighistory/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ import (
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/snapshot"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger"
"github.com/pkg/errors"
)

var logger = flogging.MustGetLogger("confighistory")
var (
logger = flogging.MustGetLogger("confighistory")
importConfigsBatchSize = 1024 * 1024
)

const (
collectionConfigNamespace = "lscc" // lscc namespace was introduced in version 1.2 and we continue to use this in order to be compatible with existing data
Expand Down Expand Up @@ -104,6 +108,56 @@ func (m *Mgr) HandleStateUpdates(trigger *ledger.StateUpdateTrigger) error {
return dbHandle.writeBatch(batch, true)
}

func (m *Mgr) ImportConfigHistory(ledgerID string, dir string) error {
db := m.dbProvider.getDB(ledgerID)
empty, err := db.isEmpty()
if err != nil {
return err
}
if !empty {
return errors.New(fmt.Sprintf(
"config history for ledger [%s] exists. Incremental import is not supported. "+
"Remove the existing ledger data before retry",
ledgerID,
))
}

configMetadata, err := snapshot.OpenFile(filepath.Join(dir, snapshotMetadataFileName), snapshotFileFormat)
if err != nil {
return err
}
numCollectionConfigs, err := configMetadata.DecodeUVarInt()
if err != nil {
return err
}
collectionConfigData, err := snapshot.OpenFile(filepath.Join(dir, snapshotDataFileName), snapshotFileFormat)
if err != nil {
return err
}

batch := leveldbhelper.NewUpdateBatch()
currentBatchSize := 0
for i := uint64(0); i < numCollectionConfigs; i++ {
key, err := collectionConfigData.DecodeBytes()
if err != nil {
return err
}
val, err := collectionConfigData.DecodeBytes()
if err != nil {
return err
}
batch.Put(key, val)
currentBatchSize += len(key) + len(val)
if currentBatchSize >= importConfigsBatchSize {
if err := db.WriteBatch(batch, true); err != nil {
return err
}
batch = leveldbhelper.NewUpdateBatch()
}
}
return db.WriteBatch(batch, true)
}

// GetRetriever returns an implementation of `ledger.ConfigHistoryRetriever` for the given ledger id.
func (m *Mgr) GetRetriever(ledgerID string, ledgerInfoRetriever LedgerInfoRetriever) *Retriever {
return &Retriever{
Expand Down
Loading

0 comments on commit 325b704

Please sign in to comment.