-
Notifications
You must be signed in to change notification settings - Fork 8.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FAB-6779] Allow rebuilding block storage indexes
This CR allows building of block storage indexes. For rebuilding the indexes, existing index folder would need to be dropped. However, please note that this would drop (and rebuild) the indexes for all the chains because they share the underlying leveldb. Also, enabled the flush/synch of batch writting to leveldb (statedb, block indexes, and historydb). Change-Id: I6a926ab765df4bbb6543d6a3960359d95d60fd68 Signed-off-by: manish <manish.sethi@gmail.com>
- Loading branch information
1 parent
c8efd6a
commit 1daabff
Showing
7 changed files
with
259 additions
and
21 deletions.
There are no files selected for viewing
117 changes: 117 additions & 0 deletions
117
common/ledger/blkstorage/fsblkstorage/blockfile_helper.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/* | ||
Copyright IBM Corp. All Rights Reserved. | ||
SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package fsblkstorage | ||
|
||
import ( | ||
"fmt" | ||
"io/ioutil" | ||
"os" | ||
"strconv" | ||
"strings" | ||
|
||
"github.com/davecgh/go-spew/spew" | ||
"github.com/hyperledger/fabric/protos/common" | ||
) | ||
|
||
// constructCheckpointInfoFromBlockFiles scans the last blockfile (if any) and construct the checkpoint info | ||
// if the last file contains no block or only a partially written block (potentially because of a crash while writing block to the file), | ||
// this scans the second last file (if any) | ||
func constructCheckpointInfoFromBlockFiles(rootDir string) (*checkpointInfo, error) { | ||
logger.Debugf("Retrieving checkpoint info from block files") | ||
var lastFileNum int | ||
var numBlocksInFile int | ||
var endOffsetLastBlock int64 | ||
var lastBlockNumber uint64 | ||
|
||
var lastBlockBytes []byte | ||
var lastBlock *common.Block | ||
var err error | ||
|
||
if lastFileNum, err = retrieveLastFileSuffix(rootDir); err != nil { | ||
return nil, err | ||
} | ||
logger.Debugf("Last file number found = %d", lastFileNum) | ||
|
||
if lastFileNum == -1 { | ||
cpInfo := &checkpointInfo{0, 0, true, 0} | ||
logger.Info("No block file found") | ||
return cpInfo, nil | ||
} | ||
|
||
fileInfo := getFileInfoOrPanic(rootDir, lastFileNum) | ||
logger.Infof("Last Block file info: FileName=[%s], FileSize=[%d]", fileInfo.Name(), fileInfo.Size()) | ||
if lastBlockBytes, endOffsetLastBlock, numBlocksInFile, err = scanForLastCompleteBlock(rootDir, lastFileNum, 0); err != nil { | ||
logger.Errorf("Error while scanning last file [file num=%d]: %s", lastFileNum, err) | ||
return nil, err | ||
} | ||
|
||
if numBlocksInFile == 0 && lastFileNum > 0 { | ||
secondLastFileNum := lastFileNum - 1 | ||
fileInfo := getFileInfoOrPanic(rootDir, secondLastFileNum) | ||
logger.Infof("Second last Block file info: FileName=[%s], FileSize=[%d]", fileInfo.Name(), fileInfo.Size()) | ||
if lastBlockBytes, _, _, err = scanForLastCompleteBlock(rootDir, secondLastFileNum, 0); err != nil { | ||
logger.Errorf("Error while scanning second last file [file num=%d]: %s", secondLastFileNum, err) | ||
return nil, err | ||
} | ||
} | ||
|
||
if lastBlockBytes != nil { | ||
if lastBlock, err = deserializeBlock(lastBlockBytes); err != nil { | ||
logger.Errorf("Error deserializing last block: %s. Block bytes length = %d", err, len(lastBlockBytes)) | ||
return nil, err | ||
} | ||
lastBlockNumber = lastBlock.Header.Number | ||
} | ||
|
||
cpInfo := &checkpointInfo{ | ||
lastBlockNumber: lastBlockNumber, | ||
latestFileChunksize: int(endOffsetLastBlock), | ||
latestFileChunkSuffixNum: lastFileNum, | ||
isChainEmpty: lastFileNum == 0 && numBlocksInFile == 0, | ||
} | ||
logger.Debugf("Checkpoint info constructed from file system = %s", spew.Sdump(cpInfo)) | ||
return cpInfo, nil | ||
} | ||
|
||
func retrieveLastFileSuffix(rootDir string) (int, error) { | ||
logger.Debugf("retrieveLastFileSuffix()") | ||
biggestFileNum := -1 | ||
filesInfo, err := ioutil.ReadDir(rootDir) | ||
if err != nil { | ||
return -1, err | ||
} | ||
for _, fileInfo := range filesInfo { | ||
name := fileInfo.Name() | ||
if fileInfo.IsDir() || !isBlockFileName(name) { | ||
logger.Debugf("Skipping File name = %s", name) | ||
continue | ||
} | ||
fileSuffix := strings.TrimPrefix(name, blockfilePrefix) | ||
fileNum, err := strconv.Atoi(fileSuffix) | ||
if err != nil { | ||
return -1, err | ||
} | ||
if fileNum > biggestFileNum { | ||
biggestFileNum = fileNum | ||
} | ||
} | ||
logger.Debugf("retrieveLastFileSuffix() - biggestFileNum = %d", biggestFileNum) | ||
return biggestFileNum, err | ||
} | ||
|
||
func isBlockFileName(name string) bool { | ||
return strings.HasPrefix(name, blockfilePrefix) | ||
} | ||
|
||
func getFileInfoOrPanic(rootDir string, fileNum int) os.FileInfo { | ||
filePath := deriveBlockfilePath(rootDir, fileNum) | ||
fileInfo, err := os.Lstat(filePath) | ||
if err != nil { | ||
panic(fmt.Errorf("Error in retrieving file info for file num = %d", fileNum)) | ||
} | ||
return fileInfo | ||
} |
87 changes: 87 additions & 0 deletions
87
common/ledger/blkstorage/fsblkstorage/blockfile_helper_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
Copyright IBM Corp. All Rights Reserved. | ||
SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package fsblkstorage | ||
|
||
import ( | ||
"os" | ||
"testing" | ||
|
||
"github.com/golang/protobuf/proto" | ||
"github.com/hyperledger/fabric/common/ledger/testutil" | ||
"github.com/hyperledger/fabric/common/ledger/util" | ||
) | ||
|
||
func TestConstructCheckpointInfoFromBlockFiles(t *testing.T) { | ||
testPath := "/tmp/tests/fabric/common/ledger/blkstorage/fsblkstorage" | ||
ledgerid := "testLedger" | ||
conf := NewConf(testPath, 0) | ||
blkStoreDir := conf.getLedgerBlockDir(ledgerid) | ||
env := newTestEnv(t, conf) | ||
util.CreateDirIfMissing(blkStoreDir) | ||
defer env.Cleanup() | ||
|
||
// checkpoint constructed on an empty block folder should return CPInfo with isChainEmpty: true | ||
cpInfo, err := constructCheckpointInfoFromBlockFiles(blkStoreDir) | ||
testutil.AssertNoError(t, err, "") | ||
testutil.AssertEquals(t, cpInfo, &checkpointInfo{isChainEmpty: true, lastBlockNumber: 0, latestFileChunksize: 0, latestFileChunkSuffixNum: 0}) | ||
|
||
w := newTestBlockfileWrapper(env, ledgerid) | ||
defer w.close() | ||
blockfileMgr := w.blockfileMgr | ||
bg, gb := testutil.NewBlockGenerator(t, ledgerid, false) | ||
|
||
// Add a few blocks and verify that cpinfo derived from filesystem should be same as from the blockfile manager | ||
blockfileMgr.addBlock(gb) | ||
for _, blk := range bg.NextTestBlocks(3) { | ||
blockfileMgr.addBlock(blk) | ||
} | ||
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo) | ||
|
||
// Move the chain to new file and check cpinfo derived from file system | ||
blockfileMgr.moveToNextFile() | ||
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo) | ||
|
||
// Add a few blocks that would go to new file and verify that cpinfo derived from filesystem should be same as from the blockfile manager | ||
for _, blk := range bg.NextTestBlocks(3) { | ||
blockfileMgr.addBlock(blk) | ||
} | ||
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo) | ||
|
||
// Write a partial block (to simulate a crash) and verify that cpinfo derived from filesystem should be same as from the blockfile manager | ||
lastTestBlk := bg.NextTestBlocks(1)[0] | ||
blockBytes, _, err := serializeBlock(lastTestBlk) | ||
testutil.AssertNoError(t, err, "") | ||
partialByte := append(proto.EncodeVarint(uint64(len(blockBytes))), blockBytes[len(blockBytes)/2:]...) | ||
blockfileMgr.currentFileWriter.append(partialByte, true) | ||
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo) | ||
|
||
// Close the block storage, drop the index and restart and verify | ||
cpInfoBeforeClose := blockfileMgr.cpInfo | ||
w.close() | ||
env.provider.Close() | ||
indexFolder := conf.getIndexDir() | ||
testutil.AssertNoError(t, os.RemoveAll(indexFolder), "") | ||
|
||
env = newTestEnv(t, conf) | ||
w = newTestBlockfileWrapper(env, ledgerid) | ||
blockfileMgr = w.blockfileMgr | ||
testutil.AssertEquals(t, blockfileMgr.cpInfo, cpInfoBeforeClose) | ||
|
||
lastBlkIndexed, err := blockfileMgr.index.getLastBlockIndexed() | ||
testutil.AssertNoError(t, err, "") | ||
testutil.AssertEquals(t, lastBlkIndexed, uint64(6)) | ||
|
||
// Add the last block again after start and check cpinfo again | ||
testutil.AssertNoError(t, blockfileMgr.addBlock(lastTestBlk), "") | ||
checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo) | ||
} | ||
|
||
func checkCPInfoFromFile(t *testing.T, blkStoreDir string, expectedCPInfo *checkpointInfo) { | ||
cpInfo, err := constructCheckpointInfoFromBlockFiles(blkStoreDir) | ||
testutil.AssertNoError(t, err, "") | ||
testutil.AssertEquals(t, cpInfo, expectedCPInfo) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.