SingleLevelDB for block index
https://jira.hyperledger.org/browse/FAB-1664

This changeset:
- Renames package ledger/util/db to ledger/util/leveldbhelper
- Implements a leveldb provider in the util package that allows the same
  leveldb instance to be used as multiple logical dbs, so it can be reused
  across statedb, the block index, and later historydb
  (a rough sketch of the idea follows the file summary below)
- Implements a block storage provider as a single point of invocation
  for managing the block stores of multiple ledgers
- Uses a single leveldb instance for the block storage index
- Makes the structures other than the providers private
  to their respective packages

Change-Id: I5f0b3b9aa8ef3ac1ccdce4f3c6fa6d842b5318c1
Signed-off-by: manish <manish.sethi@gmail.com>
manish-sethi committed Jan 17, 2017
1 parent 1642e88 commit 8cdd0f4
Showing 32 changed files with 842 additions and 425 deletions.
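The leveldbhelper provider described in the commit message multiplexes several logical databases onto one physical LevelDB instance. Below is a minimal, hedged sketch of that idea, assuming a key-prefix scheme of the form `<dbName>\x00<key>`; the provider, handle, and function names are invented for illustration and are not the actual leveldbhelper API.

```go
// Hypothetical sketch, not the actual leveldbhelper API: one physical
// goleveldb instance shared by several logical DBs, separated by a key
// prefix of the form "<dbName>\x00<key>".
package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
)

type provider struct{ db *leveldb.DB }

type dbHandle struct {
	dbName string
	db     *leveldb.DB
}

func openProvider(path string) (*provider, error) {
	db, err := leveldb.OpenFile(path, nil)
	if err != nil {
		return nil, err
	}
	return &provider{db: db}, nil
}

// getDBHandle returns a logical DB that shares the underlying instance.
func (p *provider) getDBHandle(dbName string) *dbHandle {
	return &dbHandle{dbName: dbName, db: p.db}
}

func (p *provider) close() { p.db.Close() }

// constructKey namespaces a key under the logical DB's name.
func (h *dbHandle) constructKey(key []byte) []byte {
	return append(append([]byte(h.dbName), 0x00), key...)
}

func (h *dbHandle) put(key, value []byte) error {
	return h.db.Put(h.constructKey(key), value, nil)
}

func (h *dbHandle) get(key []byte) ([]byte, error) {
	return h.db.Get(h.constructKey(key), nil)
}

func main() {
	p, err := openProvider("/tmp/leveldbhelper-sketch")
	if err != nil {
		panic(err)
	}
	defer p.close()

	index := p.getDBHandle("blockindex")
	state := p.getDBHandle("statedb")

	// Same key in different logical DBs: no collision in the shared instance.
	_ = index.put([]byte("k"), []byte("index-value"))
	_ = state.put([]byte("k"), []byte("state-value"))

	v, _ := index.get([]byte("k"))
	fmt.Printf("blockindex k=%s\n", v) // prints "index-value"
}
```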
9 changes: 9 additions & 0 deletions core/ledger/blkstorage/blockstorage.go
@@ -48,6 +48,15 @@ var (
ErrAttrNotIndexed = errors.New("Attribute not indexed")
)

// BlockStoreProvider provides a handle to a BlockStore
type BlockStoreProvider interface {
CreateBlockStore(ledgerid string) (BlockStore, error)
OpenBlockStore(ledgerid string) (BlockStore, error)
Exists(ledgerid string) (bool, error)
List() ([]string, error)
Close()
}
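
A hedged sketch of the intended call pattern for the BlockStoreProvider interface above: a single provider instance manages the block stores of all ledgers. Only methods declared in the interface are used; the package name is arbitrary and constructing a concrete provider (e.g. the fsblkstorage one) is out of scope for this snippet.

```go
// Hedged sketch of the intended call pattern; not part of the commit.
package blkstoragedemo

import "github.com/hyperledger/fabric/core/ledger/blkstorage"

// openOrCreateBlockStore opens the ledger's block store if it already exists,
// otherwise creates it.
func openOrCreateBlockStore(p blkstorage.BlockStoreProvider, ledgerID string) (blkstorage.BlockStore, error) {
	exists, err := p.Exists(ledgerID)
	if err != nil {
		return nil, err
	}
	if exists {
		return p.OpenBlockStore(ledgerID)
	}
	return p.CreateBlockStore(ledgerID)
}
```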

// BlockStore - an interface for persisting and retrieving blocks
// An implementation of this interface is expected to take an argument
// of type `IndexConfig` which configures the block store on what items should be indexed
16 changes: 8 additions & 8 deletions core/ledger/blkstorage/fsblkstorage/block_stream_test.go
@@ -30,15 +30,15 @@ func TestBlockfileStream(t *testing.T) {
}

func testBlockfileStream(t *testing.T, numBlocks int) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
w := newTestBlockfileWrapper(t, env)
blockfileMgr := w.blockfileMgr
ledgerid := "testledger"
w := newTestBlockfileWrapper(env, ledgerid)
blocks := testutil.ConstructTestBlocks(t, numBlocks)
w.addBlocks(blocks)
w.close()

s, err := newBlockfileStream(blockfileMgr.rootDir, 0, 0)
s, err := newBlockfileStream(w.blockfileMgr.rootDir, 0, 0)
defer s.close()
testutil.AssertNoError(t, err, "Error in constructing blockfile stream")

@@ -71,9 +71,9 @@ func TestBlockFileStreamUnexpectedEOF(t *testing.T) {
}

func testBlockFileStreamUnexpectedEOF(t *testing.T, numBlocks int, partialBlockBytes []byte) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
w := newTestBlockfileWrapper(t, env)
w := newTestBlockfileWrapper(env, "testLedger")
blockfileMgr := w.blockfileMgr
blocks := testutil.ConstructTestBlocks(t, numBlocks)
w.addBlocks(blocks)
@@ -100,9 +100,9 @@ func TestBlockStream(t *testing.T) {
}

func testBlockStream(t *testing.T, numFiles int) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
w := newTestBlockfileWrapper(t, env)
w := newTestBlockfileWrapper(env, "testLedger")
defer w.close()
blockfileMgr := w.blockfileMgr

34 changes: 14 additions & 20 deletions core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
@@ -25,7 +25,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/core/ledger/util/db"
"github.com/hyperledger/fabric/core/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
putil "github.com/hyperledger/fabric/protos/utils"
@@ -43,10 +43,15 @@ var (
blkMgrInfoKey = []byte("blkMgrInfo")
)

type conf struct {
blockfilesDir string
maxBlockfileSize int
}

type blockfileMgr struct {
rootDir string
conf *Conf
db *db.DB
db *leveldbhelper.DBHandle
index index
cpInfo *checkpointInfo
cpInfoCond *sync.Cond
@@ -95,17 +100,15 @@ At start up a new manager:
-- If index and file system are not in sync, syncs index from the FS
*) Updates blockchain info used by the APIs
*/
func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfileMgr {
func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, indexStore *leveldbhelper.DBHandle) *blockfileMgr {
//Determine the root directory for the blockfile storage, if it does not exist create it
rootDir := conf.blockfilesDir
rootDir := conf.getLedgerBlockDir(id)
_, err := util.CreateDirIfMissing(rootDir)
if err != nil {
panic(fmt.Sprintf("Error: %s", err))
}
//Determine the key value db instance, if it does not exist, create the directory and instantiate the database.
db := initDB(conf)
// Instantiate the manager, i.e. blockFileMgr structure
mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: db}
mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore}

// cp = checkpointInfo, retrieve from the database the file suffix or number of where blocks were stored.
// It also retrieves the current size of that file and the last block number that was written to that file.
@@ -123,7 +126,7 @@ func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfile
}
//Verify that the checkpoint stored in db is accurate with what is actually stored in block file system
// If not the same, sync the cpInfo and the file system
syncCPInfoFromFS(conf, cpInfo)
syncCPInfoFromFS(rootDir, cpInfo)
//Open a writer to the file identified by the number and truncate it to only contain the latest block
// that was completely saved (file system, index, cpinfo, etc)
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
@@ -137,7 +140,7 @@ func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfile
}

// Create a new KeyValue store database handler for the blocks index in the keyvalue database
mgr.index = newBlockIndex(indexConfig, db)
mgr.index = newBlockIndex(indexConfig, indexStore)

// Update the manager with the checkpoint info and the file writer
mgr.cpInfo = cpInfo
@@ -174,21 +177,13 @@ func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfile
return mgr
}

func initDB(conf *Conf) *db.DB {
dbInst := db.CreateDB(&db.Conf{
DBPath: conf.dbPath})
dbInst.Open()
return dbInst
}

//cp = checkpointInfo, from the database gets the file suffix and the size of
// the file where the last block was written. It also retrieves the
// last block number that was written. At init
//checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
func syncCPInfoFromFS(conf *Conf, cpInfo *checkpointInfo) {
func syncCPInfoFromFS(rootDir string, cpInfo *checkpointInfo) {
logger.Debugf("Starting checkpoint=%s", cpInfo)
//Checks if the file suffix of where the last block was written exists
rootDir := conf.blockfilesDir
filePath := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
exists, size, err := util.FileExists(filePath)
if err != nil {
@@ -224,7 +219,6 @@ func (mgr *blockfileMgr) open() error {

func (mgr *blockfileMgr) close() {
mgr.currentFileWriter.close()
mgr.db.Close()
}

func (mgr *blockfileMgr) moveToNextFile() {
@@ -443,7 +437,7 @@ func (mgr *blockfileMgr) retrieveBlockHeaderByNumber(blockNum uint64) (*common.B
return info.blockHeader, nil
}

func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*BlocksItr, error) {
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*blocksItr, error) {
return newBlockItr(mgr, startNum), nil
}
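
The syncCPInfoFromFS change in this file now takes the ledger's block directory directly instead of the whole Conf. The recovery idea it implements: the checkpoint persisted in the index DB can lag the block file on disk after a crash, so on startup the checkpoint is reconciled against the actual file size. A simplified, hedged sketch follows; the types and file naming are stand-ins, and the real implementation also rescans the file for the last complete block.

```go
// Simplified, hedged sketch of the crash-recovery idea behind
// syncCPInfoFromFS; not the fsblkstorage implementation.
package fsblkstoragedemo

import (
	"fmt"
	"os"
	"path/filepath"
)

type checkpointInfo struct {
	latestFileChunkSuffixNum int
	latestFileChunksize      int
}

func blockfilePath(rootDir string, suffix int) string {
	return filepath.Join(rootDir, fmt.Sprintf("blockfile_%06d", suffix))
}

// syncCheckpointFromFS reconciles the persisted checkpoint with what is
// actually on disk for the latest block file.
func syncCheckpointFromFS(rootDir string, cpInfo *checkpointInfo) error {
	fi, err := os.Stat(blockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
	if os.IsNotExist(err) {
		return nil // nothing on disk yet; keep the checkpoint as-is
	}
	if err != nil {
		return err
	}
	if int(fi.Size()) > cpInfo.latestFileChunksize {
		// Blocks were appended that the checkpoint does not know about.
		cpInfo.latestFileChunksize = int(fi.Size())
	}
	return nil
}
```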

46 changes: 23 additions & 23 deletions core/ledger/blkstorage/fsblkstorage/blockfile_mgr_test.go
@@ -28,9 +28,9 @@ import (
)

func TestBlockfileMgrBlockReadWrite(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
@@ -47,9 +47,10 @@ func TestBlockfileMgrCrashDuringWriting(t *testing.T) {

func testBlockfileMgrCrashDuringWriting(t *testing.T, numBlocksBeforeCheckpoint int,
numBlocksAfterCheckpoint int, numLastBlockBytes int, numPartialBytesToWrite int) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
bg := testutil.NewBlockGenerator(t)
blocksBeforeCP := bg.NextTestBlocks(numBlocksBeforeCheckpoint)
blkfileMgrWrapper.addBlocks(blocksBeforeCP)
@@ -75,7 +76,7 @@ func testBlockfileMgrCrashDuringWriting(t *testing.T, numBlocksBeforeCheckpoint
blkfileMgrWrapper.close()

// simulate a start after a crash
blkfileMgrWrapper = newTestBlockfileWrapper(t, env)
blkfileMgrWrapper = newTestBlockfileWrapper(env, ledgerid)
defer blkfileMgrWrapper.close()
cpInfo3 := blkfileMgrWrapper.blockfileMgr.cpInfo
testutil.AssertEquals(t, cpInfo3, cpInfo2)
@@ -91,9 +92,9 @@ func testBlockfileMgrCrashDuringWriting(t *testing.T, numBlocksBeforeCheckpoint
}

func TestBlockfileMgrBlockIterator(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
@@ -109,7 +110,7 @@ func testBlockfileMgrBlockIterator(t *testing.T, blockfileMgr *blockfileMgr,
for {
block, err := itr.Next()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while getting block number [%d] from iterator", numBlocksItrated))
testutil.AssertEquals(t, block.(*BlockHolder).GetBlock(), expectedBlocks[numBlocksItrated])
testutil.AssertEquals(t, block.(*blockHolder).GetBlock(), expectedBlocks[numBlocksItrated])
numBlocksItrated++
if numBlocksItrated == lastBlockNum-firstBlockNum+1 {
break
@@ -119,9 +120,9 @@ }
}

func TestBlockfileMgrBlockchainInfo(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()

bcInfo := blkfileMgrWrapper.blockfileMgr.getBlockchainInfo()
@@ -134,9 +135,9 @@ func TestBlockfileMgrBlockchainInfo(t *testing.T) {
}

func TestBlockfileMgrGetTxById(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
@@ -155,21 +156,21 @@ func TestBlockfileMgrGetTxById(t *testing.T) {
}

func TestBlockfileMgrRestart(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
blkfileMgrWrapper.close()

blkfileMgrWrapper = newTestBlockfileWrapper(t, env)
blkfileMgrWrapper = newTestBlockfileWrapper(env, ledgerid)
defer blkfileMgrWrapper.close()
testutil.AssertEquals(t, int(blkfileMgrWrapper.blockfileMgr.cpInfo.lastBlockNumber), 10)
blkfileMgrWrapper.testGetBlockByHash(blocks)
}

func TestBlockfileMgrFileRolling(t *testing.T) {
env := newTestEnv(t)
blocks := testutil.ConstructTestBlocks(t, 100)
size := 0
for _, block := range blocks {
@@ -180,18 +181,17 @@ func TestBlockfileMgrFileRolling(t *testing.T) {
size += blockBytesSize + len(encodedLen)
}

env.conf.maxBlockfileSize = int(0.75 * float64(size))
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
maxFileSie := int(0.75 * float64(size))
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", maxFileSie))
defer env.Cleanup()
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
blkfileMgrWrapper.addBlocks(blocks)
testutil.AssertEquals(t, blkfileMgrWrapper.blockfileMgr.cpInfo.latestFileChunkSuffixNum, 1)
blkfileMgrWrapper.testGetBlockByHash(blocks)
blkfileMgrWrapper.close()
env.Cleanup()

env = newTestEnv(t)
defer env.Cleanup()
env.conf.maxBlockfileSize = int(0.40 * float64(size))
blkfileMgrWrapper = newTestBlockfileWrapper(t, env)
blkfileMgrWrapper = newTestBlockfileWrapper(env, ledgerid)
defer blkfileMgrWrapper.close()
blkfileMgrWrapper.addBlocks(blocks)
testutil.AssertEquals(t, blkfileMgrWrapper.blockfileMgr.cpInfo.latestFileChunkSuffixNum, 2)
18 changes: 10 additions & 8 deletions core/ledger/blkstorage/fsblkstorage/blockfile_scan_test.go
@@ -27,9 +27,10 @@ import (
)

func TestBlockFileScanSmallTxOnly(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
bg := testutil.NewBlockGenerator(t)
blocks := []*common.Block{}
blocks = append(blocks, bg.NextTestBlock(0, 0))
@@ -38,20 +39,21 @@ func TestBlockFileScanSmallTxOnly(t *testing.T) {
blkfileMgrWrapper.addBlocks(blocks)
blkfileMgrWrapper.close()

filePath := deriveBlockfilePath(env.conf.blockfilesDir, 0)
filePath := deriveBlockfilePath(env.provider.conf.getLedgerBlockDir(ledgerid), 0)
_, fileSize, err := util.FileExists(filePath)
testutil.AssertNoError(t, err, "")

endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(env.conf.blockfilesDir, 0, 0)
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, numBlocks, len(blocks))
testutil.AssertEquals(t, endOffsetLastBlock, fileSize)
}

func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
bg := testutil.NewBlockGenerator(t)
blocks := []*common.Block{}
blocks = append(blocks, bg.NextTestBlock(0, 0))
@@ -60,7 +62,7 @@ func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
blkfileMgrWrapper.addBlocks(blocks)
blkfileMgrWrapper.close()

filePath := deriveBlockfilePath(env.conf.blockfilesDir, 0)
filePath := deriveBlockfilePath(env.provider.conf.getLedgerBlockDir(ledgerid), 0)
_, fileSize, err := util.FileExists(filePath)
testutil.AssertNoError(t, err, "")

@@ -70,7 +72,7 @@ func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
err = file.Truncate(fileSize - 1)
testutil.AssertNoError(t, err, "")

_, numBlocks, err := scanForLastCompleteBlock(env.conf.blockfilesDir, 0, 0)
_, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, numBlocks, len(blocks)-1)
}
9 changes: 4 additions & 5 deletions core/ledger/blkstorage/fsblkstorage/blockindex.go
@@ -22,8 +22,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/core/ledger/util/db"
"github.com/syndtr/goleveldb/leveldb"
"github.com/hyperledger/fabric/core/ledger/util/leveldbhelper"
)

const (
@@ -54,10 +53,10 @@ type blockIdxInfo struct {

type blockIndex struct {
indexItemsMap map[blkstorage.IndexableAttr]bool
db *db.DB
db *leveldbhelper.DBHandle
}

func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *db.DB) *blockIndex {
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) *blockIndex {
indexItems := indexConfig.AttrsToIndex
logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
@@ -85,7 +84,7 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
logger.Debugf("Indexing block [%s]", blockIdxInfo)
flp := blockIdxInfo.flp
txOffsets := blockIdxInfo.txOffsets
batch := &leveldb.Batch{}
batch := leveldbhelper.NewUpdateBatch()
flpBytes, err := flp.marshal()
if err != nil {
return err

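The indexBlock change above switches from a raw goleveldb batch to leveldbhelper.NewUpdateBatch() so that index writes go through the shared DB handle. As a rough illustration of the underlying pattern — all index entries for one block committed atomically in a single batch — here is a hedged sketch using goleveldb's batch API directly; the key formats and the single location value per transaction are invented simplifications, not the actual index layout.

```go
// Hedged sketch of the batched index-write pattern; the leveldbhelper
// UpdateBatch introduced by this commit plays the analogous role over a
// shared DB handle.
package blockindexdemo

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
)

// indexBlockLocations writes the block's file-location pointer under the
// block number and under every transaction ID, as one atomic batch.
func indexBlockLocations(db *leveldb.DB, blockNum uint64, txIDs []string, flpBytes []byte) error {
	batch := new(leveldb.Batch)
	batch.Put([]byte(fmt.Sprintf("blockNum:%d", blockNum)), flpBytes)
	for _, txID := range txIDs {
		batch.Put([]byte("txID:"+txID), flpBytes)
	}
	// Either every entry becomes visible or none does.
	return db.Write(batch, nil)
}
```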