From a55b4f4d52c74dd3feece2c64f696617bca7afd1 Mon Sep 17 00:00:00 2001 From: Wenjian Qiao Date: Thu, 18 Jun 2020 22:53:25 -0400 Subject: [PATCH] [FAB-17992] Remove ledger blockstore data for a channel Add a Remove function to block store provider to remove block index and blocks directory for a channel. It will be called by the orderer and peer as part of channel deletion. Signed-off-by: Wenjian Qiao --- .../ledger/blkstorage/blockstore_provider.go | 23 +++ .../blkstorage/blockstore_provider_test.go | 73 ++++++++-- .../util/leveldbhelper/leveldb_provider.go | 74 +++++++++- .../leveldbhelper/leveldb_provider_test.go | 131 ++++++++++++++++++ 4 files changed, 285 insertions(+), 16 deletions(-) diff --git a/common/ledger/blkstorage/blockstore_provider.go b/common/ledger/blkstorage/blockstore_provider.go index e542fd4c6de..ef6a6cee00d 100644 --- a/common/ledger/blkstorage/blockstore_provider.go +++ b/common/ledger/blkstorage/blockstore_provider.go @@ -125,6 +125,29 @@ func (p *BlockStoreProvider) Exists(ledgerid string) (bool, error) { return exists, err } +// Remove block index and blocks for the given ledgerid (channelID). It is not an error if the channel does not exist. +// This function is not error safe. If this function returns an error or a crash takes place, it is highly likely +// that the data for this ledger is left in an inconsistent state. Opening the ledger again or reusing the previously +// opened ledger can show unknown behavior. 
+func (p *BlockStoreProvider) Remove(ledgerid string) error { + exists, err := p.Exists(ledgerid) + if err != nil { + return err + } + if !exists { + return nil + } + dbHandle := p.leveldbProvider.GetDBHandle(ledgerid) + if err := dbHandle.DeleteAll(); err != nil { + return err + } + dbHandle.Close() + if err := os.RemoveAll(p.conf.getLedgerBlockDir(ledgerid)); err != nil { + return err + } + return syncDir(p.conf.getChainsDir()) +} + // List lists the ids of the existing ledgers func (p *BlockStoreProvider) List() ([]string, error) { return util.ListSubdirs(p.conf.getChainsDir()) diff --git a/common/ledger/blkstorage/blockstore_provider_test.go b/common/ledger/blkstorage/blockstore_provider_test.go index cd446b80fc6..d9b237eabd0 100644 --- a/common/ledger/blkstorage/blockstore_provider_test.go +++ b/common/ledger/blkstorage/blockstore_provider_test.go @@ -52,17 +52,9 @@ func TestMultipleBlockStores(t *testing.T) { require.NoError(t, err) defer store2.Shutdown() - blocks1 := testutil.ConstructTestBlocks(t, 5) - for _, b := range blocks1 { - err := store1.AddBlock(b) - require.NoError(t, err) - } + blocks1 := addBlocksToStore(t, store1, 5) + blocks2 := addBlocksToStore(t, store2, 10) - blocks2 := testutil.ConstructTestBlocks(t, 10) - for _, b := range blocks2 { - err := store2.AddBlock(b) - require.NoError(t, err) - } checkBlocks(t, blocks1, store1) checkBlocks(t, blocks2, store2) checkWithWrongInputs(t, store1, 5) @@ -94,6 +86,15 @@ func TestMultipleBlockStores(t *testing.T) { checkWithWrongInputs(t, newstore2, 10) } +func addBlocksToStore(t *testing.T, store *BlockStore, numBlocks int) []*common.Block { + blocks := testutil.ConstructTestBlocks(t, numBlocks) + for _, b := range blocks { + err := store.AddBlock(b) + require.NoError(t, err) + } + return blocks +} + func checkBlocks(t *testing.T, expectedBlocks []*common.Block, store *BlockStore) { bcInfo, _ := store.GetBlockchainInfo() require.Equal(t, uint64(len(expectedBlocks)), bcInfo.Height) @@ -192,6 +193,58 @@ 
func TestBlockStoreProvider(t *testing.T) { } +func TestRemove(t *testing.T) { + env := newTestEnv(t, NewConf(testPath(), 0)) + defer env.Cleanup() + + provider := env.provider + store1, err := provider.Open("ledger1") + require.NoError(t, err) + defer store1.Shutdown() + store2, err := provider.Open("ledger2") + require.NoError(t, err) + defer store2.Shutdown() + + blocks1 := addBlocksToStore(t, store1, 5) + blocks2 := addBlocksToStore(t, store2, 10) + + checkBlocks(t, blocks1, store1) + checkBlocks(t, blocks2, store2) + storeNames, err := provider.List() + require.NoError(t, err) + require.ElementsMatch(t, storeNames, []string{"ledger1", "ledger2"}) + + require.NoError(t, provider.Remove("ledger1")) + + // verify ledger1 block dir and block indexes are deleted + exists, err := provider.Exists("ledger1") + require.NoError(t, err) + require.False(t, exists) + itr := provider.leveldbProvider.GetDBHandle("ledger1").GetIterator(nil, nil) + defer itr.Release() + require.False(t, itr.Next()) + + // verify ledger2 ledger data remain the same + checkBlocks(t, blocks2, store2) + storeNames, err = provider.List() + require.NoError(t, err) + require.ElementsMatch(t, storeNames, []string{"ledger2"}) + + // removing an already-removed ledger should return no error + require.NoError(t, provider.Remove("ledger1")) + + // verify "ledger1" store can be opened again after remove, but it is an empty store + newstore1, err := provider.Open("ledger1") + require.NoError(t, err) + bcInfo, err := newstore1.GetBlockchainInfo() + require.NoError(t, err) + require.Equal(t, &common.BlockchainInfo{}, bcInfo) + + // negative test: Remove must return an error once the provider is closed + provider.Close() + require.EqualError(t, provider.Remove("ledger2"), "internal leveldb error while obtaining db iterator: leveldb: closed") +} + func constructLedgerid(id int) string { return fmt.Sprintf("ledger_%d", id) } diff --git a/common/ledger/util/leveldbhelper/leveldb_provider.go b/common/ledger/util/leveldbhelper/leveldb_provider.go index 838780087b2..8abe71bf078 100644 --- 
a/common/ledger/util/leveldbhelper/leveldb_provider.go +++ b/common/ledger/util/leveldbhelper/leveldb_provider.go @@ -12,13 +12,19 @@ import ( "sync" "github.com/hyperledger/fabric/common/ledger/dataformat" + "github.com/pkg/errors" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" ) -// internalDBName is used to keep track of data related to internals such as data format -// _ is used as name because this is not allowed as a channelname -const internalDBName = "_" +const ( + // internalDBName is used to keep track of data related to internals such as data format + // _ is used as name because this is not allowed as a channelname + internalDBName = "_" + // maxBatchSize limits the memory usage (1MB) for a batch. It is measured by the total number of bytes + // of all the keys in a batch. + maxBatchSize = 1000000 +) var ( dbNameKeySep = []byte{0x00} @@ -26,6 +32,9 @@ var ( formatVersionKey = []byte{'f'} // a single key in db whose value indicates the version of the data format ) +// closeFunc closes the db handle +type closeFunc func() + // Conf configuration for `Provider` // // `ExpectedFormat` is the expected value of the format key in the internal database. 
@@ -117,7 +126,12 @@ func (p *Provider) GetDBHandle(dbName string) *DBHandle { defer p.mux.Unlock() dbHandle := p.dbHandles[dbName] if dbHandle == nil { - dbHandle = &DBHandle{dbName, p.db} + closeFunc := func() { + p.mux.Lock() + defer p.mux.Unlock() + delete(p.dbHandles, dbName) + } + dbHandle = &DBHandle{dbName, p.db, closeFunc} p.dbHandles[dbName] = dbHandle } return dbHandle @@ -130,8 +144,9 @@ func (p *Provider) Close() { // DBHandle is an handle to a named db type DBHandle struct { - dbName string - db *DB + dbName string + db *DB + closeFunc closeFunc } // Get returns the value for the given key @@ -149,6 +164,46 @@ func (h *DBHandle) Delete(key []byte, sync bool) error { return h.db.Delete(constructLevelKey(h.dbName, key), sync) } +// DeleteAll deletes all the keys that belong to the channel (dbName). Deletions are committed in batches, so a returned error can leave the channel only partially deleted. +func (h *DBHandle) DeleteAll() error { + iter := h.GetIterator(nil, nil) + defer iter.Release() + if err := iter.Error(); err != nil { + return errors.Wrap(err, "internal leveldb error while obtaining db iterator") + } + + // use leveldb iterator directly to be more efficient + dbIter := iter.Iterator + + // This is common code shared by all the leveldb instances. Because each leveldb has its own key size pattern, + // each batch is limited by memory usage instead of number of keys. Once the batch memory usage reaches maxBatchSize, + // the batch will be committed. 
+ numKeys := 0 + batchSize := 0 + batch := &leveldb.Batch{} + for dbIter.Next() { + key := dbIter.Key() + numKeys++ + batchSize = batchSize + len(key) + batch.Delete(key) + if batchSize >= maxBatchSize { + if err := h.db.WriteBatch(batch, true); err != nil { + return err + } + logger.Infof("Have removed %d entries for channel %s in leveldb %s", numKeys, h.dbName, h.db.conf.DBPath) + batchSize = 0 + batch = &leveldb.Batch{} + } + } + if err := dbIter.Error(); err != nil { // Next reports an iteration failure by returning false, so the error is only observable after the loop + return errors.Wrap(err, "internal leveldb error while retrieving data from db iterator") + } + if batch.Len() > 0 { + return h.db.WriteBatch(batch, true) + } + return nil +} + // WriteBatch writes a batch in an atomic way func (h *DBHandle) WriteBatch(batch *UpdateBatch, sync bool) error { if len(batch.KVs) == 0 { @@ -183,6 +238,13 @@ func (h *DBHandle) GetIterator(startKey []byte, endKey []byte) *Iterator { return &Iterator{h.dbName, h.db.GetIterator(sKey, eKey)} } +// Close closes the DBHandle after its db data have been deleted +func (h *DBHandle) Close() { + if h.closeFunc != nil { + h.closeFunc() + } +} + // UpdateBatch encloses the details of multiple `updates` type UpdateBatch struct { KVs map[string][]byte diff --git a/common/ledger/util/leveldbhelper/leveldb_provider_test.go b/common/ledger/util/leveldbhelper/leveldb_provider_test.go index 3850ff04b63..d5746672da2 100644 --- a/common/ledger/util/leveldbhelper/leveldb_provider_test.go +++ b/common/ledger/util/leveldbhelper/leveldb_provider_test.go @@ -191,6 +191,99 @@ func TestBatchedUpdates(t *testing.T) { } } +func TestDeleteAll(t *testing.T) { + env := newTestProviderEnv(t, testDBPath) + defer env.cleanup() + p := env.provider + + db1 := p.GetDBHandle("db1") + db2 := p.GetDBHandle("db2") + db3 := p.GetDBHandle("db3") + db4 := p.GetDBHandle("db4") + for i := 0; i < 20; i++ { + db1.Put([]byte(createTestKey(i)), []byte(createTestValue("db1", i)), false) + db2.Put([]byte(createTestKey(i)), []byte(createTestValue("db2", i)), false) 
+ db3.Put([]byte(createTestKey(i)), []byte(createTestValue("db3", i)), false) + } + // db4 is used to test remove when multiple batches are needed (each long key from createTestLongKey is 115 bytes, so 10000 keys exceed maxBatchSize) + for i := 0; i < 10000; i++ { + db4.Put([]byte(createTestLongKey(i)), []byte(createTestValue("db4", i)), false) + } + + expectedSetup := []struct { + db *DBHandle + expectedKeys []string + expectedValues []string + }{ + { + db: db1, + expectedKeys: createTestKeys(0, 19), + expectedValues: createTestValues("db1", 0, 19), + }, + { + db: db2, + expectedKeys: createTestKeys(0, 19), + expectedValues: createTestValues("db2", 0, 19), + }, + { + db: db3, + expectedKeys: createTestKeys(0, 19), + expectedValues: createTestValues("db3", 0, 19), + }, + { + db: db4, + expectedKeys: createTestLongKeys(0, 9999), + expectedValues: createTestValues("db4", 0, 9999), + }, + } + + for _, dbSetup := range expectedSetup { + itr := dbSetup.db.GetIterator(nil, nil) + checkItrResults(t, itr, dbSetup.expectedKeys, dbSetup.expectedValues) + itr.Release() + } + + require.NoError(t, db1.DeleteAll()) + require.NoError(t, db4.DeleteAll()) + + expectedResults := []struct { + db *DBHandle + expectedKeys []string + expectedValues []string + }{ + { + db: db1, + expectedKeys: nil, + expectedValues: nil, + }, + { + db: db2, + expectedKeys: createTestKeys(0, 19), + expectedValues: createTestValues("db2", 0, 19), + }, + { + db: db3, + expectedKeys: createTestKeys(0, 19), + expectedValues: createTestValues("db3", 0, 19), + }, + { + db: db4, + expectedKeys: nil, + expectedValues: nil, + }, + } + + for _, result := range expectedResults { + itr := result.db.GetIterator(nil, nil) + checkItrResults(t, itr, result.expectedKeys, result.expectedValues) + itr.Release() + } + + // negative test: DeleteAll must return an error once the provider is closed + p.Close() + require.EqualError(t, db2.DeleteAll(), "internal leveldb error while obtaining db iterator: leveldb: closed") +} + func TestFormatCheck(t *testing.T) { testCases := []struct { dataFormat string @@ -245,6 +338,30 @@ func 
TestFormatCheck(t *testing.T) { } } +func TestClose(t *testing.T) { + env := newTestProviderEnv(t, testDBPath) + defer env.cleanup() + p := env.provider + + db1 := p.GetDBHandle("db1") + db2 := p.GetDBHandle("db2") + + expectedDBHandles := map[string]*DBHandle{ + "db1": db1, + "db2": db2, + } + require.Equal(t, expectedDBHandles, p.dbHandles) + + db1.Close() + expectedDBHandles = map[string]*DBHandle{ + "db2": db2, + } + require.Equal(t, expectedDBHandles, p.dbHandles) + + db2.Close() + require.Equal(t, map[string]*DBHandle{}, p.dbHandles) +} + func testFormatCheck(t *testing.T, dataFormat, expectedFormat string, dataExists bool, expectedErr *dataformat.ErrFormatMismatch) { assert.NoError(t, os.RemoveAll(testDBPath)) defer func() { @@ -337,6 +454,12 @@ func createTestKey(i int) string { return fmt.Sprintf("key_%06d", i) } +const padding100 = "_0123456789_0123456789_0123456789_0123456789_0123456789_0123456789_0123456789_0123456789_0123456789_" + +func createTestLongKey(i int) string { + return fmt.Sprintf("key_%s_%10d", padding100, i) +} + func createTestValue(dbname string, i int) string { return fmt.Sprintf("value_%s_%06d", dbname, i) } @@ -349,6 +472,14 @@ func createTestKeys(start int, end int) []string { return keys } +func createTestLongKeys(start int, end int) []string { + var keys []string + for i := start; i <= end; i++ { + keys = append(keys, createTestLongKey(i)) + } + return keys +} + func createTestValues(dbname string, start int, end int) []string { var values []string for i := start; i <= end; i++ {