Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FAB-17992] Remove ledger blockstore data for a channel #1423

Merged
merged 1 commit into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions common/ledger/blkstorage/blockstore_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,29 @@ func (p *BlockStoreProvider) Exists(ledgerid string) (bool, error) {
return exists, err
}

// Remove block index and blocks for the given ledgerid (channelID). It is not an error if the channel does not exist.
// This function is not error safe. If this function returns an error or a crash takes place, it is highly likely
// that the data for this ledger is left in an inconsistent state. Opening the ledger again or reusing the previously
// opened ledger can show unknown behavior.
func (p *BlockStoreProvider) Remove(ledgerid string) error {
wenjianqiao marked this conversation as resolved.
Show resolved Hide resolved
exists, err := p.Exists(ledgerid)
if err != nil {
return err
}
if !exists {
return nil
}
dbHandle := p.leveldbProvider.GetDBHandle(ledgerid)
if err := dbHandle.DeleteAll(); err != nil {
return err
}
dbHandle.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Can be deferred after getting the handle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dbHandle.Close() removes dbHandle from provider's dbHandles map, so it is only called after DeleteAll is successful.

if err := os.RemoveAll(p.conf.getLedgerBlockDir(ledgerid)); err != nil {
return err
}
return syncDir(p.conf.getChainsDir())
}

// List lists the ids of the existing ledgers
func (p *BlockStoreProvider) List() ([]string, error) {
return util.ListSubdirs(p.conf.getChainsDir())
Expand Down
73 changes: 63 additions & 10 deletions common/ledger/blkstorage/blockstore_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,9 @@ func TestMultipleBlockStores(t *testing.T) {
require.NoError(t, err)
defer store2.Shutdown()

blocks1 := testutil.ConstructTestBlocks(t, 5)
for _, b := range blocks1 {
err := store1.AddBlock(b)
require.NoError(t, err)
}
blocks1 := addBlocksToStore(t, store1, 5)
blocks2 := addBlocksToStore(t, store2, 10)

blocks2 := testutil.ConstructTestBlocks(t, 10)
for _, b := range blocks2 {
err := store2.AddBlock(b)
require.NoError(t, err)
}
checkBlocks(t, blocks1, store1)
checkBlocks(t, blocks2, store2)
checkWithWrongInputs(t, store1, 5)
Expand Down Expand Up @@ -94,6 +86,15 @@ func TestMultipleBlockStores(t *testing.T) {
checkWithWrongInputs(t, newstore2, 10)
}

func addBlocksToStore(t *testing.T, store *BlockStore, numBlocks int) []*common.Block {
blocks := testutil.ConstructTestBlocks(t, numBlocks)
for _, b := range blocks {
err := store.AddBlock(b)
require.NoError(t, err)
}
return blocks
}

func checkBlocks(t *testing.T, expectedBlocks []*common.Block, store *BlockStore) {
bcInfo, _ := store.GetBlockchainInfo()
require.Equal(t, uint64(len(expectedBlocks)), bcInfo.Height)
Expand Down Expand Up @@ -192,6 +193,58 @@ func TestBlockStoreProvider(t *testing.T) {

}

func TestRemove(t *testing.T) {
wenjianqiao marked this conversation as resolved.
Show resolved Hide resolved
env := newTestEnv(t, NewConf(testPath(), 0))
defer env.Cleanup()

provider := env.provider
store1, err := provider.Open("ledger1")
require.NoError(t, err)
defer store1.Shutdown()
store2, err := provider.Open("ledger2")
require.NoError(t, err)
defer store2.Shutdown()

blocks1 := addBlocksToStore(t, store1, 5)
blocks2 := addBlocksToStore(t, store2, 10)

checkBlocks(t, blocks1, store1)
checkBlocks(t, blocks2, store2)
storeNames, err := provider.List()
require.NoError(t, err)
require.ElementsMatch(t, storeNames, []string{"ledger1", "ledger2"})

require.NoError(t, provider.Remove("ledger1"))

// verify ledger1 block dir and block indexes are deleted
exists, err := provider.Exists("ledger1")
require.NoError(t, err)
require.False(t, exists)
itr := provider.leveldbProvider.GetDBHandle("ledger1").GetIterator(nil, nil)
defer itr.Release()
require.False(t, itr.Next())

// verify ledger2 ledger data are remained same
checkBlocks(t, blocks2, store2)
storeNames, err = provider.List()
require.NoError(t, err)
require.ElementsMatch(t, storeNames, []string{"ledger2"})

// remove again should return no error
require.NoError(t, provider.Remove("ledger1"))

// verify "ledger1" store can be opened again after remove, but it is an empty store
newstore1, err := provider.Open("ledger1")
require.NoError(t, err)
bcInfo, err := newstore1.GetBlockchainInfo()
require.NoError(t, err)
require.Equal(t, &common.BlockchainInfo{}, bcInfo)

// negative test
provider.Close()
require.EqualError(t, provider.Remove("ledger2"), "internal leveldb error while obtaining db iterator: leveldb: closed")
}

func constructLedgerid(id int) string {
return fmt.Sprintf("ledger_%d", id)
}
74 changes: 68 additions & 6 deletions common/ledger/util/leveldbhelper/leveldb_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,29 @@ import (
"sync"

"github.com/hyperledger/fabric/common/ledger/dataformat"
"github.com/pkg/errors"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
)

// internalDBName is used to keep track of data related to internals such as data format
// _ is used as name because this is not allowed as a channelname
const internalDBName = "_"
const (
// internalDBName is used to keep track of data related to internals such as data format
// _ is used as name because this is not allowed as a channelname
internalDBName = "_"
// maxBatchSize limits the memory usage (1MB) for a batch. It is measured by the total number of bytes
// of all the keys in a batch.
maxBatchSize = 1000000
)

var (
dbNameKeySep = []byte{0x00}
lastKeyIndicator = byte(0x01)
formatVersionKey = []byte{'f'} // a single key in db whose value indicates the version of the data format
)

// closeFunc closes the db handle
type closeFunc func()

// Conf configuration for `Provider`
//
// `ExpectedFormat` is the expected value of the format key in the internal database.
Expand Down Expand Up @@ -117,7 +126,12 @@ func (p *Provider) GetDBHandle(dbName string) *DBHandle {
defer p.mux.Unlock()
dbHandle := p.dbHandles[dbName]
if dbHandle == nil {
dbHandle = &DBHandle{dbName, p.db}
closeFunc := func() {
p.mux.Lock()
defer p.mux.Unlock()
delete(p.dbHandles, dbName)
}
dbHandle = &DBHandle{dbName, p.db, closeFunc}
p.dbHandles[dbName] = dbHandle
}
return dbHandle
Expand All @@ -130,8 +144,9 @@ func (p *Provider) Close() {

// DBHandle is an handle to a named db
type DBHandle struct {
dbName string
db *DB
dbName string
db *DB
closeFunc closeFunc
}

// Get returns the value for the given key
Expand All @@ -149,6 +164,46 @@ func (h *DBHandle) Delete(key []byte, sync bool) error {
return h.db.Delete(constructLevelKey(h.dbName, key), sync)
}

// DeleteAll deletes all the keys that belong to the channel (dbName).
func (h *DBHandle) DeleteAll() error {
iter := h.GetIterator(nil, nil)
defer iter.Release()
if err := iter.Error(); err != nil {
return errors.Wrap(err, "internal leveldb error while obtaining db iterator")
}

// use leveldb iterator directly to be more efficient
dbIter := iter.Iterator

// This is common code shared by all the leveldb instances. Because each leveldb has its own key size pattern,
// each batch is limited by memory usage instead of number of keys. Once the batch memory usage reaches maxBatchSize,
// the batch will be committed.
numKeys := 0
batchSize := 0
batch := &leveldb.Batch{}
for dbIter.Next() {
if err := dbIter.Error(); err != nil {
return errors.Wrap(err, "internal leveldb error while retrieving data from db iterator")
}
key := dbIter.Key()
numKeys++
batchSize = batchSize + len(key)
batch.Delete(key)
if batchSize >= maxBatchSize {
if err := h.db.WriteBatch(batch, true); err != nil {
return err
}
logger.Infof("Have removed %d entries for channel %s in leveldb %s", numKeys, h.dbName, h.db.conf.DBPath)
wenjianqiao marked this conversation as resolved.
Show resolved Hide resolved
batchSize = 0
batch = &leveldb.Batch{}
}
}
if batch.Len() > 0 {
return h.db.WriteBatch(batch, true)
}
return nil
}

// WriteBatch writes a batch in an atomic way
func (h *DBHandle) WriteBatch(batch *UpdateBatch, sync bool) error {
if len(batch.KVs) == 0 {
Expand Down Expand Up @@ -183,6 +238,13 @@ func (h *DBHandle) GetIterator(startKey []byte, endKey []byte) *Iterator {
return &Iterator{h.dbName, h.db.GetIterator(sKey, eKey)}
}

// Close closes the DBHandle after its db data have been deleted
func (h *DBHandle) Close() {
if h.closeFunc != nil {
h.closeFunc()
}
}

// UpdateBatch encloses the details of multiple `updates`
type UpdateBatch struct {
KVs map[string][]byte
Expand Down
Loading