Skip to content

Commit

Permalink
Merge "[FAB-6779] Allow rebuilding block storage indexes"
Browse files Browse the repository at this point in the history
  • Loading branch information
denyeart authored and Gerrit Code Review committed Oct 31, 2017
2 parents b8bc66c + 1daabff commit 4fb3abd
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 21 deletions.
117 changes: 117 additions & 0 deletions common/ledger/blkstorage/fsblkstorage/blockfile_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package fsblkstorage

import (
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"

"github.com/davecgh/go-spew/spew"
"github.com/hyperledger/fabric/protos/common"
)

// constructCheckpointInfoFromBlockFiles scans the last blockfile (if any) and construct the checkpoint info
// if the last file contains no block or only a partially written block (potentially because of a crash while writing block to the file),
// this scans the second last file (if any)
func constructCheckpointInfoFromBlockFiles(rootDir string) (*checkpointInfo, error) {
logger.Debugf("Retrieving checkpoint info from block files")
var lastFileNum int
var numBlocksInFile int
var endOffsetLastBlock int64
var lastBlockNumber uint64

var lastBlockBytes []byte
var lastBlock *common.Block
var err error

if lastFileNum, err = retrieveLastFileSuffix(rootDir); err != nil {
return nil, err
}
logger.Debugf("Last file number found = %d", lastFileNum)

if lastFileNum == -1 {
cpInfo := &checkpointInfo{0, 0, true, 0}
logger.Info("No block file found")
return cpInfo, nil
}

fileInfo := getFileInfoOrPanic(rootDir, lastFileNum)
logger.Infof("Last Block file info: FileName=[%s], FileSize=[%d]", fileInfo.Name(), fileInfo.Size())
if lastBlockBytes, endOffsetLastBlock, numBlocksInFile, err = scanForLastCompleteBlock(rootDir, lastFileNum, 0); err != nil {
logger.Errorf("Error while scanning last file [file num=%d]: %s", lastFileNum, err)
return nil, err
}

if numBlocksInFile == 0 && lastFileNum > 0 {
secondLastFileNum := lastFileNum - 1
fileInfo := getFileInfoOrPanic(rootDir, secondLastFileNum)
logger.Infof("Second last Block file info: FileName=[%s], FileSize=[%d]", fileInfo.Name(), fileInfo.Size())
if lastBlockBytes, _, _, err = scanForLastCompleteBlock(rootDir, secondLastFileNum, 0); err != nil {
logger.Errorf("Error while scanning second last file [file num=%d]: %s", secondLastFileNum, err)
return nil, err
}
}

if lastBlockBytes != nil {
if lastBlock, err = deserializeBlock(lastBlockBytes); err != nil {
logger.Errorf("Error deserializing last block: %s. Block bytes length = %d", err, len(lastBlockBytes))
return nil, err
}
lastBlockNumber = lastBlock.Header.Number
}

cpInfo := &checkpointInfo{
lastBlockNumber: lastBlockNumber,
latestFileChunksize: int(endOffsetLastBlock),
latestFileChunkSuffixNum: lastFileNum,
isChainEmpty: lastFileNum == 0 && numBlocksInFile == 0,
}
logger.Debugf("Checkpoint info constructed from file system = %s", spew.Sdump(cpInfo))
return cpInfo, nil
}

func retrieveLastFileSuffix(rootDir string) (int, error) {
logger.Debugf("retrieveLastFileSuffix()")
biggestFileNum := -1
filesInfo, err := ioutil.ReadDir(rootDir)
if err != nil {
return -1, err
}
for _, fileInfo := range filesInfo {
name := fileInfo.Name()
if fileInfo.IsDir() || !isBlockFileName(name) {
logger.Debugf("Skipping File name = %s", name)
continue
}
fileSuffix := strings.TrimPrefix(name, blockfilePrefix)
fileNum, err := strconv.Atoi(fileSuffix)
if err != nil {
return -1, err
}
if fileNum > biggestFileNum {
biggestFileNum = fileNum
}
}
logger.Debugf("retrieveLastFileSuffix() - biggestFileNum = %d", biggestFileNum)
return biggestFileNum, err
}

func isBlockFileName(name string) bool {
return strings.HasPrefix(name, blockfilePrefix)
}

func getFileInfoOrPanic(rootDir string, fileNum int) os.FileInfo {
filePath := deriveBlockfilePath(rootDir, fileNum)
fileInfo, err := os.Lstat(filePath)
if err != nil {
panic(fmt.Errorf("Error in retrieving file info for file num = %d", fileNum))
}
return fileInfo
}
87 changes: 87 additions & 0 deletions common/ledger/blkstorage/fsblkstorage/blockfile_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package fsblkstorage

import (
"os"
"testing"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/common/ledger/util"
)

func TestConstructCheckpointInfoFromBlockFiles(t *testing.T) {
testPath := "/tmp/tests/fabric/common/ledger/blkstorage/fsblkstorage"
ledgerid := "testLedger"
conf := NewConf(testPath, 0)
blkStoreDir := conf.getLedgerBlockDir(ledgerid)
env := newTestEnv(t, conf)
util.CreateDirIfMissing(blkStoreDir)
defer env.Cleanup()

// checkpoint constructed on an empty block folder should return CPInfo with isChainEmpty: true
cpInfo, err := constructCheckpointInfoFromBlockFiles(blkStoreDir)
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, cpInfo, &checkpointInfo{isChainEmpty: true, lastBlockNumber: 0, latestFileChunksize: 0, latestFileChunkSuffixNum: 0})

w := newTestBlockfileWrapper(env, ledgerid)
defer w.close()
blockfileMgr := w.blockfileMgr
bg, gb := testutil.NewBlockGenerator(t, ledgerid, false)

// Add a few blocks and verify that cpinfo derived from filesystem should be same as from the blockfile manager
blockfileMgr.addBlock(gb)
for _, blk := range bg.NextTestBlocks(3) {
blockfileMgr.addBlock(blk)
}
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)

// Move the chain to new file and check cpinfo derived from file system
blockfileMgr.moveToNextFile()
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)

// Add a few blocks that would go to new file and verify that cpinfo derived from filesystem should be same as from the blockfile manager
for _, blk := range bg.NextTestBlocks(3) {
blockfileMgr.addBlock(blk)
}
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)

// Write a partial block (to simulate a crash) and verify that cpinfo derived from filesystem should be same as from the blockfile manager
lastTestBlk := bg.NextTestBlocks(1)[0]
blockBytes, _, err := serializeBlock(lastTestBlk)
testutil.AssertNoError(t, err, "")
partialByte := append(proto.EncodeVarint(uint64(len(blockBytes))), blockBytes[len(blockBytes)/2:]...)
blockfileMgr.currentFileWriter.append(partialByte, true)
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)

// Close the block storage, drop the index and restart and verify
cpInfoBeforeClose := blockfileMgr.cpInfo
w.close()
env.provider.Close()
indexFolder := conf.getIndexDir()
testutil.AssertNoError(t, os.RemoveAll(indexFolder), "")

env = newTestEnv(t, conf)
w = newTestBlockfileWrapper(env, ledgerid)
blockfileMgr = w.blockfileMgr
testutil.AssertEquals(t, blockfileMgr.cpInfo, cpInfoBeforeClose)

lastBlkIndexed, err := blockfileMgr.index.getLastBlockIndexed()
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, lastBlkIndexed, uint64(6))

// Add the last block again after start and check cpinfo again
testutil.AssertNoError(t, blockfileMgr.addBlock(lastTestBlk), "")
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)
}

func checkCPInfoFromFile(t *testing.T, blkStoreDir string, expectedCPInfo *checkpointInfo) {
cpInfo, err := constructCheckpointInfoFromBlockFiles(blkStoreDir)
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, cpInfo, expectedCPInfo)
}
57 changes: 41 additions & 16 deletions common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"sync/atomic"

"github.com/davecgh/go-spew/spew"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
Expand Down Expand Up @@ -112,16 +114,23 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
if err != nil {
panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
}
if cpInfo == nil { //if no cpInfo stored in db initiate to zero
cpInfo = &checkpointInfo{0, 0, true, 0}
err = mgr.saveCurrentInfo(cpInfo, true)
if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
if cpInfo == nil {
logger.Info(`No info about blocks file found in the db.
This could happen if this is the first time the ledger is constructed or the index is dropped.
Scanning blocks dir for the latest info`)
if cpInfo, err = constructCheckpointInfoFromBlockFiles(rootDir); err != nil {
panic(fmt.Sprintf("Could not build checkpoint info from block files: %s", err))
}
logger.Infof("Info constructed by scanning the blocks dir = %s", spew.Sdump(cpInfo))
} else {
logger.Info(`Synching the info about block files`)
syncCPInfoFromFS(rootDir, cpInfo)
}
err = mgr.saveCurrentInfo(cpInfo, true)
if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
//Verify that the checkpoint stored in db is accurate with what is actually stored in block file system
// If not the same, sync the cpInfo and the file system
syncCPInfoFromFS(rootDir, cpInfo)

//Open a writer to the file identified by the number and truncate it to only contain the latest block
// that was completely saved (file system, index, cpinfo, etc)
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
Expand Down Expand Up @@ -193,7 +202,7 @@ func syncCPInfoFromFS(rootDir string, cpInfo *checkpointInfo) {
return
}
//Scan the file system to verify that the checkpoint info stored in db is correct
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(
_, endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(
rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize))
if err != nil {
panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
Expand Down Expand Up @@ -325,25 +334,36 @@ func (mgr *blockfileMgr) syncIndex() error {
}
indexEmpty = true
}

//initialize index to file number:zero, offset:zero and blockNum:0
startFileNum := 0
startOffset := 0
blockNum := uint64(0)
skipFirstBlock := false
//get the last file that blocks were added to using the checkpoint info
endFileNum := mgr.cpInfo.latestFileChunkSuffixNum
startingBlockNum := uint64(0)

//if the index stored in the db has value, update the index information with those values
if !indexEmpty {
if lastBlockIndexed == mgr.cpInfo.lastBlockNumber {
logger.Infof("Both the block files and indices are in sync.")
return nil
}
logger.Infof("Last block indexed [%d], Last block present in block files=[%d]", lastBlockIndexed, mgr.cpInfo.lastBlockNumber)
var flp *fileLocPointer
if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil {
return err
}
startFileNum = flp.fileSuffixNum
startOffset = flp.locPointer.offset
blockNum = lastBlockIndexed
skipFirstBlock = true
startingBlockNum = lastBlockIndexed + 1
} else {
logger.Infof("No block indexed, Last block present in block files=[%d]", mgr.cpInfo.lastBlockNumber)
}

logger.Infof("Start building index from block [%d]", startingBlockNum)

//open a blockstream to the file location that was stored in the index
var stream *blockStream
if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil {
Expand All @@ -365,6 +385,7 @@ func (mgr *blockfileMgr) syncIndex() error {
//Should be at the last block already, but go ahead and loop looking for next blockBytes.
//If there is another block, add it to the index.
//This will ensure block indexes are correct, for example if peer had crashed before indexes got updated.
blockIdxInfo := &blockIdxInfo{}
for {
if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
return err
Expand All @@ -385,7 +406,6 @@ func (mgr *blockfileMgr) syncIndex() error {
}

//Update the blockIndexInfo with what was actually stored in file system
blockIdxInfo := &blockIdxInfo{}
blockIdxInfo.blockHash = info.blockHeader.Hash()
blockIdxInfo.blockNum = info.blockHeader.Number
blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum,
Expand All @@ -397,8 +417,11 @@ func (mgr *blockfileMgr) syncIndex() error {
if err = mgr.index.indexBlock(blockIdxInfo); err != nil {
return err
}
blockNum++
if blockIdxInfo.blockNum%10000 == 0 {
logger.Infof("Indexed block number [%d]", blockIdxInfo.blockNum)
}
}
logger.Infof("Finished building index. Last block indexed [%d]", blockIdxInfo.blockNum)
return nil
}

Expand Down Expand Up @@ -581,12 +604,13 @@ func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, sync bool) error {

// scanForLastCompleteBlock scan a given block file and detects the last offset in the file
// after which there may lie a block partially written (towards the end of the file in a crash scenario).
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) (int64, int, error) {
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) ([]byte, int64, int, error) {
//scan the passed file number suffix starting from the passed offset to find the last completed block
numBlocks := 0
var lastBlockBytes []byte
blockStream, errOpen := newBlockfileStream(rootDir, fileNum, startingOffset)
if errOpen != nil {
return 0, 0, errOpen
return nil, 0, 0, errOpen
}
defer blockStream.close()
var errRead error
Expand All @@ -596,6 +620,7 @@ func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64)
if blockBytes == nil || errRead != nil {
break
}
lastBlockBytes = blockBytes
numBlocks++
}
if errRead == ErrUnexpectedEndOfBlockfile {
Expand All @@ -605,7 +630,7 @@ func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64)
errRead = nil
}
logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentOffset)
return blockStream.currentOffset, numBlocks, errRead
return lastBlockBytes, blockStream.currentOffset, numBlocks, errRead
}

// checkpointInfo
Expand Down
10 changes: 8 additions & 2 deletions common/ledger/blkstorage/fsblkstorage/blockfile_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ func TestBlockFileScanSmallTxOnly(t *testing.T) {
_, fileSize, err := util.FileExists(filePath)
testutil.AssertNoError(t, err, "")

endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
lastBlockBytes, endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, numBlocks, len(blocks))
testutil.AssertEquals(t, endOffsetLastBlock, fileSize)

expectedLastBlockBytes, _, err := serializeBlock(blocks[len(blocks)-1])
testutil.AssertEquals(t, lastBlockBytes, expectedLastBlockBytes)
}

func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
Expand All @@ -72,7 +75,10 @@ func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
err = file.Truncate(fileSize - 1)
testutil.AssertNoError(t, err, "")

_, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
lastBlockBytes, _, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, numBlocks, len(blocks)-1)

expectedLastBlockBytes, _, err := serializeBlock(blocks[len(blocks)-2])
testutil.AssertEquals(t, lastBlockBytes, expectedLastBlockBytes)
}
3 changes: 2 additions & 1 deletion common/ledger/blkstorage/fsblkstorage/blockindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
}

batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
if err := index.db.WriteBatch(batch, false); err != nil {
// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
if err := index.db.WriteBatch(batch, true); err != nil {
return err
}
return nil
Expand Down
Loading

0 comments on commit 4fb3abd

Please sign in to comment.