Skip to content

Commit

Permalink
[FAB-17992] Add remove channel func to blockstorage provider
Browse files Browse the repository at this point in the history
Add a Remove function to block storage provider in oder to
remove ledger data for a channel. It creates a temporary file
to indicate the channel is to be removed and start a goroutine
to remove channel ledger data in background. If remove fails or
orderer is stopped before remove is done, upon ledger restart,
it checks the existence of the temporary file and complete remove
as needed.

Signed-off-by: Wenjian Qiao <wenjianq@gmail.com>
  • Loading branch information
wenjianqiao committed Jun 15, 2020
1 parent 0d5e664 commit 3866768
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 4 deletions.
97 changes: 94 additions & 3 deletions common/ledger/blkstorage/blockstore_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ SPDX-License-Identifier: Apache-2.0
package blkstorage

import (
"io/ioutil"
"os"
"strings"
"sync"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/dataformat"
Expand Down Expand Up @@ -60,6 +63,7 @@ type BlockStoreProvider struct {
indexConfig *IndexConfig
leveldbProvider *leveldbhelper.Provider
stats *stats
mutex sync.RWMutex
}

// NewProvider constructs a filesystem based block store provider
Expand Down Expand Up @@ -87,7 +91,15 @@ func NewProvider(conf *Conf, indexConfig *IndexConfig, metricsProvider metrics.P
}

stats := newStats(metricsProvider)
return &BlockStoreProvider{conf, indexConfig, p, stats}, nil
provider := &BlockStoreProvider{
conf: conf,
indexConfig: indexConfig,
leveldbProvider: p,
stats: stats,
}

go provider.completePendingRemoves()
return provider, nil
}

// Open opens a block store for given ledgerid.
Expand All @@ -101,12 +113,91 @@ func (p *BlockStoreProvider) Open(ledgerid string) (*BlockStore, error) {
// Exists tells whether the BlockStore with given id exists
func (p *BlockStoreProvider) Exists(ledgerid string) (bool, error) {
exists, _, err := util.FileExists(p.conf.getLedgerBlockDir(ledgerid))
return exists, err
if !exists || err != nil {
return false, err
}
toBeRemoved, _, err := util.FileExists(p.conf.getToBeRemovedFilePath(ledgerid))
if err != nil {
return false, err
}
return !toBeRemoved, nil
}

// List lists the ids of the existing ledgers
// A channel is filtered out if it has a temporary __toBeRemoved_ file.
func (p *BlockStoreProvider) List() ([]string, error) {
return util.ListSubdirs(p.conf.getChainsDir())
subdirs, err := util.ListSubdirs(p.conf.getChainsDir())
if err != nil {
return nil, err
}
channelNames := []string{}
for _, subdir := range subdirs {
toBeRemoved, _, err := util.FileExists(p.conf.getToBeRemovedFilePath(subdir))
if err != nil {
return nil, err
}
if !toBeRemoved {
channelNames = append(channelNames, subdir)
}
}
return channelNames, nil
}

// Remove block index and blocks for the given ledgerid (channelID).
// It creates a temporary file to indicate the channel is to be removed and deletes the ledger data in a separate goroutine.
// If the channel does not exist (or the channel is already marked to be removed), it is not an error.
func (p *BlockStoreProvider) Remove(ledgerid string) error {
p.mutex.Lock()
defer p.mutex.Unlock()
exists, err := p.Exists(ledgerid)
if !exists || err != nil {
return err
}

f, err := os.Create(p.conf.getToBeRemovedFilePath(ledgerid))
if err != nil {
return err
}
f.Close()

go p.removeLedgerData(ledgerid)

return nil
}

// completePendingRemoves checks __toBeRemoved_xxx files and removes the corresponding channel ledger data
// if any temporary file(s) is found. This function should only be called upon ledger init.
func (p *BlockStoreProvider) completePendingRemoves() {
files, err := ioutil.ReadDir(p.conf.blockStorageDir)
if err != nil {
logger.Errorf("Error reading dir %s, error: %s", p.conf.blockStorageDir, err)
return
}
for _, f := range files {
fileName := f.Name()
if !f.IsDir() && strings.HasPrefix(fileName, toBeRemovedFilePrefix) {
p.removeLedgerData(fileName[len(toBeRemovedFilePrefix):])
}
}
}

func (p *BlockStoreProvider) removeLedgerData(ledgerid string) error {
logger.Infof("Removing block data for channel %s", ledgerid)
if err := p.leveldbProvider.Remove(ledgerid); err != nil {
logger.Errorf("Failed to remove block index for channel %s, error: %s", ledgerid, err)
return err
}
if err := os.RemoveAll(p.conf.getLedgerBlockDir(ledgerid)); err != nil {
logger.Errorf("Failed to remove blocks for channel %s, error: %s", ledgerid, err)
return err
}
tempFile := p.conf.getToBeRemovedFilePath(ledgerid)
if err := os.Remove(tempFile); err != nil {
logger.Errorf("Failed to remove temporary file %s for channel %s", tempFile, ledgerid)
return err
}
logger.Infof("Successfully removed block data for channel %s", ledgerid)
return nil
}

// Close closes the BlockStoreProvider
Expand Down
55 changes: 55 additions & 0 deletions common/ledger/blkstorage/blockstore_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"os"
"testing"
"time"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
Expand Down Expand Up @@ -191,6 +192,60 @@ func TestBlockStoreProvider(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, false, exists)

ledgerid := constructLedgerid(0)
provider.Remove(ledgerid)
numStores--
verifyLedgerAfterRemove(t, []string{ledgerid}, provider, numStores)

// manually create toBeRemoved files and then verify Exist/List functions
ledgerids := []string{constructLedgerid(1), constructLedgerid(5)}
for _, ledgerid := range ledgerids {
f, err := os.Create(provider.conf.getToBeRemovedFilePath(ledgerid))
require.NoError(t, err)
f.Close()
}
numStores = numStores - len(ledgerids)
for _, ledgerid := range ledgerids {
exists, err := provider.Exists(ledgerid)
require.NoError(t, err)
require.Equal(t, false, exists)
}
storeNames, err = provider.List()
require.NoError(t, err)
require.Equal(t, numStores, len(storeNames))

go provider.completePendingRemoves()
verifyLedgerAfterRemove(t, ledgerids, provider, numStores)
}

func verifyLedgerAfterRemove(t *testing.T, ledgerids []string, provider *BlockStoreProvider, expectedNumStores int) {
storeNames, err := provider.List()
require.NoError(t, err)
require.Equal(t, expectedNumStores, len(storeNames))

for _, ledgerid := range ledgerids {
exists, err := provider.Exists(ledgerid)
require.NoError(t, err)
require.Equal(t, false, exists)
blockDirRemoved := func() bool {
_, err := os.Stat(provider.conf.getLedgerBlockDir(ledgerid))
if os.IsNotExist(err) {
return true
}
return false
}
toBeRemovedFileNotExisted := func() bool {
_, err := os.Stat(provider.conf.getToBeRemovedFilePath(ledgerid))
if os.IsNotExist(err) {
return true
}
return false
}
require.Eventually(t, blockDirRemoved, 2*time.Second, 100*time.Millisecond)
require.Eventually(t, toBeRemovedFileNotExisted, 2*time.Second, 100*time.Millisecond)
iter := provider.leveldbProvider.GetDBHandle(ledgerid).GetIterator(nil, nil)
require.False(t, iter.Next())
}
}

func constructLedgerid(id int) string {
Expand Down
5 changes: 5 additions & 0 deletions common/ledger/blkstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
// IndexDir is the name of the directory containing all block indexes across ledgers.
IndexDir = "index"
defaultMaxBlockfileSize = 64 * 1024 * 1024 // bytes
toBeRemovedFilePrefix = "__toBeRemoved_"
)

// Conf encapsulates all the configurations for `BlockStore`
Expand Down Expand Up @@ -42,3 +43,7 @@ func (conf *Conf) getChainsDir() string {
func (conf *Conf) getLedgerBlockDir(ledgerid string) string {
return filepath.Join(conf.getChainsDir(), ledgerid)
}

func (conf *Conf) getToBeRemovedFilePath(ledgerid string) string {
return filepath.Join(conf.blockStorageDir, toBeRemovedFilePrefix+ledgerid)
}
6 changes: 6 additions & 0 deletions common/ledger/blockledger/fileledger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
type blockStoreProvider interface {
Open(ledgerid string) (*blkstorage.BlockStore, error)
List() ([]string, error)
Remove(ledgerid string) error
Close()
}

Expand Down Expand Up @@ -56,6 +57,11 @@ func (flf *fileLedgerFactory) ChannelIDs() []string {
return channelIDs
}

// Remove removes block indexes and blocks for the given channelID
func (flf *fileLedgerFactory) Remove(channelID string) error {
return flf.blkstorageProvider.Remove(channelID)
}

// Close releases all resources acquired by the factory
func (flf *fileLedgerFactory) Close() {
flf.blkstorageProvider.Close()
Expand Down
4 changes: 4 additions & 0 deletions common/ledger/blockledger/fileledger/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (mbsp *mockBlockStoreProvider) List() ([]string, error) {
return mbsp.list, mbsp.error
}

func (mbsp *mockBlockStoreProvider) Remove(ledgerid string) error {
return mbsp.error
}

func (mbsp *mockBlockStoreProvider) Close() {
}

Expand Down
3 changes: 3 additions & 0 deletions common/ledger/blockledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type Factory interface {
// ChannelIDs returns the channel IDs the Factory is aware of
ChannelIDs() []string

// Remove removes block indexes and blocks for the given channelID
Remove(channelID string) error

// Close releases all resources acquired by the factory
Close()
}
Expand Down
45 changes: 44 additions & 1 deletion common/ledger/util/leveldbhelper/leveldb_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"bytes"
"fmt"
"sync"
"time"

"github.com/hyperledger/fabric/common/ledger/dataformat"
"github.com/syndtr/goleveldb/leveldb"
Expand All @@ -18,7 +19,11 @@ import (

// internalDBName is used to keep track of data related to internals such as data format
// _ is used as name because this is not allowed as a channelname
const internalDBName = "_"
const (
internalDBName = "_"
maxBatchSize = 5000
batchesInterval = 1000
)

var (
dbNameKeySep = []byte{0x00}
Expand Down Expand Up @@ -123,6 +128,18 @@ func (p *Provider) GetDBHandle(dbName string) *DBHandle {
return dbHandle
}

// Remove removes all the keys for the given dbName from the leveldb
func (p *Provider) Remove(dbName string) error {
dbHandle := p.GetDBHandle(dbName)
if err := dbHandle.DeleteAll(); err != nil {
return err
}
p.mux.Lock()
defer p.mux.Unlock()
delete(p.dbHandles, dbName)
return nil
}

// Close closes the underlying leveldb
func (p *Provider) Close() {
p.db.Close()
Expand All @@ -149,6 +166,32 @@ func (h *DBHandle) Delete(key []byte, sync bool) error {
return h.db.Delete(constructLevelKey(h.dbName, key), sync)
}

// DeleteAll deletes all the keys that belong to the channel (dbName).DeleteAll
func (h *DBHandle) DeleteAll() error {
iter := h.GetIterator(nil, nil)
defer iter.Release()

numKeys := 0
batch := NewUpdateBatch()
for iter.Next() {
key := iter.Key()
batch.Delete(key)
numKeys++
if batch.Len() >= maxBatchSize {
if err := h.WriteBatch(batch, true); err != nil {
return err
}
batch = NewUpdateBatch()
sleepTime := time.Duration(batchesInterval)
logger.Infof("Sleep for %d milliseconds between batches of deletion. Entries have been removed for channel %s: %d", sleepTime, h.dbName, numKeys)
}
}
if batch.Len() > 0 {
return h.WriteBatch(batch, true)
}
return nil
}

// WriteBatch writes a batch in an atomic way
func (h *DBHandle) WriteBatch(batch *UpdateBatch, sync bool) error {
if len(batch.KVs) == 0 {
Expand Down
33 changes: 33 additions & 0 deletions common/ledger/util/leveldbhelper/leveldb_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,39 @@ func TestBatchedUpdates(t *testing.T) {
}
}

func TestRemove(t *testing.T) {
env := newTestProviderEnv(t, testDBPath)
defer env.cleanup()
p := env.provider

db1 := p.GetDBHandle("db1")
db2 := p.GetDBHandle("db2")
db3 := p.GetDBHandle("db3")
for i := 0; i < 20; i++ {
db1.Put([]byte(createTestKey(i)), []byte(createTestValue("db1", i)), false)
db2.Put([]byte(createTestKey(i)), []byte(createTestValue("db2", i)), false)
db3.Put([]byte(createTestKey(i)), []byte(createTestValue("db3", i)), false)
}

itr := db1.GetIterator(nil, nil)
defer itr.Release()
require.True(t, itr.Next())

err := p.Remove("db1")
require.NoError(t, err)
itr1 := db1.GetIterator(nil, nil)
defer itr1.Release()
require.False(t, itr1.Next())

itr2 := db2.GetIterator(nil, nil)
defer itr2.Release()
checkItrResults(t, itr2, createTestKeys(0, 19), createTestValues("db2", 0, 19))

itr3 := db3.GetIterator(nil, nil)
defer itr3.Release()
checkItrResults(t, itr3, createTestKeys(0, 19), createTestValues("db3", 0, 19))
}

func TestFormatCheck(t *testing.T) {
testCases := []struct {
dataFormat string
Expand Down
14 changes: 14 additions & 0 deletions orderer/common/server/mocks/factory.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions orderer/common/server/onboarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ type Factory interface {
// ChannelIDs returns the channel IDs the Factory is aware of
ChannelIDs() []string

// Remove removes block indexes and block files for the channel
Remove(channelID string) error

// Close releases all resources acquired by the factory
Close()
}
Expand Down

0 comments on commit 3866768

Please sign in to comment.