Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bootstrap collection config history store from snapshot #1460

Merged
merged 1 commit into from
Jun 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/ledger/confighistory/db_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ func (d *db) getNamespaceIterator(ns string) (*leveldbhelper.Iterator, error) {
return d.GetIterator(nsStartKey, nsEndKey)
}

func (d *db) isEmpty() (bool, error) {
itr, err := d.GetIterator(nil, nil)
if err != nil {
return false, err
}
defer itr.Release()
entryExist := itr.Next()
if err := itr.Error(); err != nil {
return false, errors.WithMessagef(err, "internal leveldb error while obtaining next entry from iterator")
}
return !entryExist, nil
}

func encodeCompositeKey(ns, key string, blockNum uint64) []byte {
b := []byte(keyPrefix + ns)
b = append(b, separatorByte)
Expand Down
39 changes: 39 additions & 0 deletions core/ledger/confighistory/db_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,46 @@ func TestGetNamespaceIterator(t *testing.T) {
require.EqualError(t, err, "internal leveldb error while obtaining db iterator: leveldb: closed")
require.Nil(t, itr)
})
}

func TestIsNotEmpty(t *testing.T) {
testDBPath := "/tmp/fabric/core/ledger/confighistory"
deleteTestPath(t, testDBPath)
provider, err := newDBProvider(testDBPath)
require.NoError(t, err)
defer deleteTestPath(t, testDBPath)

db := provider.getDB("ledger1")

t.Run("db is empty", func(t *testing.T) {
empty, err := db.isEmpty()
require.NoError(t, err)
require.True(t, empty)
})

t.Run("db is not empty", func(t *testing.T) {
sampleData := []*compositeKV{
{
&compositeKey{
ns: "ns1",
key: "key1",
blockNum: 40,
},
[]byte("val1_40"),
},
}
populateDBWithSampleData(t, db, sampleData)
empty, err := db.isEmpty()
require.NoError(t, err)
require.False(t, empty)
})

t.Run("iter error", func(t *testing.T) {
provider.Close()
empty, err := db.isEmpty()
require.EqualError(t, err, "internal leveldb error while obtaining db iterator: leveldb: closed")
require.False(t, empty)
})
}

func verifyNsEntries(t *testing.T, nsItr *leveldbhelper.Iterator, expectedEntries []*compositeKV) {
Expand Down
56 changes: 55 additions & 1 deletion core/ledger/confighistory/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ import (
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/snapshot"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger"
"github.com/pkg/errors"
)

var logger = flogging.MustGetLogger("confighistory")
var (
logger = flogging.MustGetLogger("confighistory")
importConfigsBatchSize = 1024 * 1024
)

const (
collectionConfigNamespace = "lscc" // lscc namespace was introduced in version 1.2 and we continue to use this in order to be compatible with existing data
Expand Down Expand Up @@ -104,6 +108,56 @@ func (m *Mgr) HandleStateUpdates(trigger *ledger.StateUpdateTrigger) error {
return dbHandle.writeBatch(batch, true)
}

func (m *Mgr) ImportConfigHistory(ledgerID string, dir string) error {
db := m.dbProvider.getDB(ledgerID)
empty, err := db.isEmpty()
if err != nil {
return err
}
if !empty {
return errors.New(fmt.Sprintf(
"config history for ledger [%s] exists. Incremental import is not supported. "+
"Remove the existing ledger data before retry",
ledgerID,
))
}

configMetadata, err := snapshot.OpenFile(filepath.Join(dir, snapshotMetadataFileName), snapshotFileFormat)
if err != nil {
return err
}
numCollectionConfigs, err := configMetadata.DecodeUVarInt()
if err != nil {
return err
}
collectionConfigData, err := snapshot.OpenFile(filepath.Join(dir, snapshotDataFileName), snapshotFileFormat)
if err != nil {
return err
}

batch := leveldbhelper.NewUpdateBatch()
currentBatchSize := 0
for i := uint64(0); i < numCollectionConfigs; i++ {
key, err := collectionConfigData.DecodeBytes()
if err != nil {
return err
}
val, err := collectionConfigData.DecodeBytes()
if err != nil {
return err
}
batch.Put(key, val)
currentBatchSize += len(key) + len(val)
if currentBatchSize >= importConfigsBatchSize {
if err := db.WriteBatch(batch, true); err != nil {
return err
}
batch = leveldbhelper.NewUpdateBatch()
}
}
return db.WriteBatch(batch, true)
}

// GetRetriever returns an implementation of `ledger.ConfigHistoryRetriever` for the given ledger id.
func (m *Mgr) GetRetriever(ledgerID string, ledgerInfoRetriever LedgerInfoRetriever) *Retriever {
return &Retriever{
Expand Down
Loading