Merge "FAB-1336 Add new ledger blockstorage index."
Srinivasan Muralidharan authored and Gerrit Code Review committed Dec 13, 2016
2 parents 4a29b63 + 458c521 commit 4c63856
Showing 9 changed files with 121 additions and 28 deletions.
7 changes: 4 additions & 3 deletions core/ledger/blkstorage/blockstorage.go
@@ -30,9 +30,10 @@ type IndexableAttr string

// constants for indexable attributes
const (
IndexableAttrBlockNum = IndexableAttr("BlockNum")
IndexableAttrBlockHash = IndexableAttr("BlockHash")
IndexableAttrTxID = IndexableAttr("TxID")
IndexableAttrBlockNum = IndexableAttr("BlockNum")
IndexableAttrBlockHash = IndexableAttr("BlockHash")
IndexableAttrTxID = IndexableAttr("TxID")
IndexableAttrBlockNumTranNum = IndexableAttr("BlockNumTranNum")
)

// IndexConfig - a configuration that includes a list of attributes that should be indexed
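The new attribute is opt-in, like the existing three: a block store maintains only the indexes named in its IndexConfig, and lookups against any other attribute fail with ErrAttrNotIndexed (see blockindex.go below). A minimal sketch of enabling all four indexes, mirroring the kv_ledger.go change later in this diff (the blkstorage import path is assumed from the repository layout):

package example

import "github.com/hyperledger/fabric/core/ledger/blkstorage"

// newIndexConfig enables every available block storage index, including
// the new BlockNumTranNum index. Attributes omitted from AttrsToIndex
// are simply never written by indexBlock.
func newIndexConfig() *blkstorage.IndexConfig {
	return &blkstorage.IndexConfig{AttrsToIndex: []blkstorage.IndexableAttr{
		blkstorage.IndexableAttrBlockHash,
		blkstorage.IndexableAttrBlockNum,
		blkstorage.IndexableAttrTxID,
		blkstorage.IndexableAttrBlockNumTranNum, // new in this change
	}}
}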
23 changes: 16 additions & 7 deletions core/ledger/blkstorage/fsblkstorage/block_serialization.go
@@ -27,7 +27,13 @@ import (

type serializedBlockInfo struct {
blockHeader *common.BlockHeader
txOffsets map[string]*locPointer
txOffsets []*txindexInfo
}

//The order of the transactions must be maintained for the history index
type txindexInfo struct {
txID string
loc *locPointer
}

func serializeBlock(block *common.Block) ([]byte, *serializedBlockInfo, error) {
@@ -94,8 +100,9 @@ func addHeaderBytes(blockHeader *common.BlockHeader, buf *proto.Buffer) error {
return nil
}

func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) (map[string]*locPointer, error) {
txOffsets := make(map[string]*locPointer)
func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) ([]*txindexInfo, error) {
var txOffsets []*txindexInfo

if err := buf.EncodeVarint(uint64(len(blockData.Data))); err != nil {
return nil, err
}
@@ -108,7 +115,8 @@ func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) (map[string]*locPointer, error) {
if err := buf.EncodeRawBytes(txEnvelopeBytes); err != nil {
return nil, err
}
txOffsets[txid] = &locPointer{offset, len(buf.Bytes()) - offset}
idxInfo := &txindexInfo{txid, &locPointer{offset, len(buf.Bytes()) - offset}}
txOffsets = append(txOffsets, idxInfo)
}
return txOffsets, nil
}
@@ -147,9 +155,9 @@ func extractHeader(buf *ledgerutil.Buffer) (*common.BlockHeader, error) {
return header, nil
}

func extractData(buf *ledgerutil.Buffer) (*common.BlockData, map[string]*locPointer, error) {
func extractData(buf *ledgerutil.Buffer) (*common.BlockData, []*txindexInfo, error) {
data := &common.BlockData{}
txOffsets := make(map[string]*locPointer)
var txOffsets []*txindexInfo
var numItems uint64
var err error

@@ -167,7 +175,8 @@ func extractData(buf *ledgerutil.Buffer) (*common.BlockData, map[string]*locPointer, error) {
return nil, nil, err
}
data.Data = append(data.Data, txEnvBytes)
txOffsets[txid] = &locPointer{txOffset, buf.GetBytesConsumed() - txOffset}
idxInfo := &txindexInfo{txid, &locPointer{txOffset, buf.GetBytesConsumed() - txOffset}}
txOffsets = append(txOffsets, idxInfo)
}
return data, txOffsets, nil
}
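The signature change from map[string]*locPointer to []*txindexInfo is the heart of this file's diff: Go map iteration order is randomized, so a map of offsets cannot tell which transaction came first in the block, while a slice preserves serialization order for the new history index. A self-contained sketch of the distinction, using stand-in values:

package main

import "fmt"

type locPointer struct{ offset, bytesLength int }

// txindexInfo pairs a transaction ID with its location; a slice of
// these preserves the order in which transactions were serialized
// into the block.
type txindexInfo struct {
	txID string
	loc  *locPointer
}

func main() {
	txOffsets := []*txindexInfo{
		{txID: "tx-a", loc: &locPointer{offset: 0, bytesLength: 100}},
		{txID: "tx-b", loc: &locPointer{offset: 100, bytesLength: 120}},
	}
	// The slice index gives each transaction's position in the block,
	// which is what the new BlockNumTranNum index keys on (1-based).
	for i, info := range txOffsets {
		fmt.Printf("tranNum %d -> %s @ offset %d\n", i+1, info.txID, info.loc.offset)
	}
	// Ranging over a map[string]*locPointer could visit tx-b before
	// tx-a, so no position-based index could be derived from it.
}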
12 changes: 8 additions & 4 deletions core/ledger/blkstorage/fsblkstorage/block_serialization_test.go
@@ -50,12 +50,16 @@ func TestSerializedBlockInfo(t *testing.T) {
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, infoFromBB, info)
testutil.AssertEquals(t, len(info.txOffsets), len(block.Data.Data))
for _, txEnvBytes := range block.Data.Data {
for txIndex, txEnvBytes := range block.Data.Data {
txid, err := extractTxID(txEnvBytes)
testutil.AssertNoError(t, err, "")
offset, ok := info.txOffsets[txid]
testutil.AssertEquals(t, ok, true)
b := bb[offset.offset:]

indexInfo := info.txOffsets[txIndex]
indexTxID := indexInfo.txID
indexOffset := indexInfo.loc

testutil.AssertEquals(t, txid, indexTxID)
b := bb[indexOffset.offset:]
len, num := proto.DecodeVarint(b)
txEnvBytesFromBB := b[num : num+int(len)]
testutil.AssertEquals(t, txEnvBytesFromBB, txEnvBytes)
13 changes: 11 additions & 2 deletions core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
@@ -304,7 +304,7 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
blockFLP.offset = currentOffset
// shift the txoffset because we prepend length of bytes before block bytes
for _, txOffset := range txOffsets {
txOffset.offset += len(blockBytesEncodedLen)
txOffset.loc.offset += len(blockBytesEncodedLen)
}
//save the index in the database
mgr.index.indexBlock(&blockIdxInfo{
@@ -363,7 +363,7 @@ func (mgr *blockfileMgr) syncIndex() error {
return err
}
for _, offset := range info.txOffsets {
offset.offset += int(blockPlacementInfo.blockBytesOffset)
offset.loc.offset += int(blockPlacementInfo.blockBytesOffset)
}
//Update the blockIndexInfo with what was actually stored in file system
blockIdxInfo := &blockIdxInfo{}
@@ -456,6 +456,15 @@ func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*pb.Transaction, error) {
return mgr.fetchTransaction(loc)
}

func (mgr *blockfileMgr) retrieveTransactionForBlockNumTranNum(blockNum uint64, tranNum uint64) (*pb.Transaction, error) {
logger.Debugf("retrieveTransactionForBlockNumTranNum() - blockNum = [%d], tranNum = [%d]", blockNum, tranNum)
loc, err := mgr.index.getTXLocForBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}
return mgr.fetchTransaction(loc)
}

func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) {
blockBytes, err := mgr.fetchBlockBytes(lp)
if err != nil {
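A sketch of calling the new retrieval path; both blockNum and tranNum are 1-based, matching the selective-indexing test below, which fetches blocks[0].Data.Data[0] via retrieveTransactionForBlockNumTranNum(1, 1). The helper and the pb import path are assumptions for illustration, and the code must sit in package fsblkstorage because blockfileMgr is unexported:

package fsblkstorage

import pb "github.com/hyperledger/fabric/protos"

// fetchByPosition retrieves a transaction by its position within a
// block rather than by its transaction ID.
func fetchByPosition(mgr *blockfileMgr, blockNum, tranNum uint64) (*pb.Transaction, error) {
	tx, err := mgr.retrieveTransactionForBlockNumTranNum(blockNum, tranNum)
	if err != nil {
		// Expect blkstorage.ErrAttrNotIndexed when the BlockNumTranNum
		// index is not configured, and blkstorage.ErrNotFoundInIndex
		// when no entry exists for this (blockNum, tranNum) pair.
		return nil, err
	}
	return tx, nil
}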
60 changes: 51 additions & 9 deletions core/ledger/blkstorage/fsblkstorage/blockindex.go
@@ -27,10 +27,11 @@ import (
)

const (
blockNumIdxKeyPrefix = 'n'
blockHashIdxKeyPrefix = 'h'
txIDIdxKeyPrefix = 't'
indexCheckpointKeyStr = "indexCheckpointKey"
blockNumIdxKeyPrefix = 'n'
blockHashIdxKeyPrefix = 'h'
txIDIdxKeyPrefix = 't'
blockNumTranNumIdxKeyPrefix = 'a'
indexCheckpointKeyStr = "indexCheckpointKey"
)

var indexCheckpointKey = []byte(indexCheckpointKeyStr)
@@ -41,13 +42,14 @@ type index interface {
getBlockLocByHash(blockHash []byte) (*fileLocPointer, error)
getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error)
getTxLoc(txID string) (*fileLocPointer, error)
getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error)
}

type blockIdxInfo struct {
blockNum uint64
blockHash []byte
flp *fileLocPointer
txOffsets map[string]*locPointer
txOffsets []*txindexInfo
}

type blockIndex struct {
@@ -89,25 +91,42 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
return err
}

//Index1
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok {
batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
}

//Index2
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok {
batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
}

//Index3 - used to find a transaction by its transaction ID
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {
for txid, txoffset := range txOffsets {
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset)
logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFlp, txid)
for _, txoffset := range txOffsets {
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to index", txFlp, txoffset.txID)
txFlpBytes, marshalErr := txFlp.marshal()
if marshalErr != nil {
return marshalErr
}
batch.Put(constructTxIDKey(txid), txFlpBytes)
batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes)
}
}

//Index4 - stores the transaction location keyed by blockNum and tranNum; used to query history data
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; ok {
for txIterator, txoffset := range txOffsets {
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
logger.Debugf("Adding txLoc [%s] for tx number:[%d] ID: [%s] to blockNumTranNum index", txFlp, txIterator+1, txoffset.txID)
txFlpBytes, marshalErr := txFlp.marshal()
if marshalErr != nil {
return marshalErr
}
batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator+1)), txFlpBytes)
}
}

batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
if err := index.db.WriteBatch(batch, false); err != nil {
return err
@@ -163,6 +182,22 @@ func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
return txFLP, nil
}

func (index *blockIndex) getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(constructBlockNumTranNumKey(blockNum, tranNum))
if err != nil {
return nil, err
}
if b == nil {
return nil, blkstorage.ErrNotFoundInIndex
}
txFLP := &fileLocPointer{}
txFLP.unmarshal(b)
return txFLP, nil
}

func constructBlockNumKey(blockNum uint64) []byte {
blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...)
Expand All @@ -176,6 +211,13 @@ func constructTxIDKey(txID string) []byte {
return append([]byte{txIDIdxKeyPrefix}, []byte(txID)...)
}

func constructBlockNumTranNumKey(blockNum uint64, txNum uint64) []byte {
blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
tranNumBytes := util.EncodeOrderPreservingVarUint64(txNum)
key := append(blkNumBytes, tranNumBytes...)
return append([]byte{blockNumTranNumIdxKeyPrefix}, key...)
}

func constructTxID(blockNum uint64, txNum int) string {
return fmt.Sprintf("%d:%d", blockNum, txNum)
}
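All four indexes share one key-value store and are disambiguated by a single-byte prefix: 'n', 'h', 't', and now 'a'. constructBlockNumTranNumKey appends order-preserving encodings of the block number and transaction number after the prefix, so keys compare bytewise in (blockNum, tranNum) order and one block's transactions sit contiguously in the store. A standalone sketch of the same layout, substituting big-endian fixed-width encoding for util.EncodeOrderPreservingVarUint64 (both preserve numeric order under bytewise comparison):

package main

import (
	"encoding/binary"
	"fmt"
)

const blockNumTranNumIdxKeyPrefix = 'a'

// constructKey mimics constructBlockNumTranNumKey with a simplified
// fixed-width encoding: one prefix byte, then blockNum, then tranNum.
func constructKey(blockNum, tranNum uint64) []byte {
	key := make([]byte, 1+8+8)
	key[0] = blockNumTranNumIdxKeyPrefix
	binary.BigEndian.PutUint64(key[1:9], blockNum)
	binary.BigEndian.PutUint64(key[9:17], tranNum)
	return key
}

func main() {
	fmt.Printf("% x\n", constructKey(1, 1)) // 61 ... -> tran 1 of block 1
	fmt.Printf("% x\n", constructKey(1, 2)) // sorts immediately after (1, 1)
}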
16 changes: 16 additions & 0 deletions core/ledger/blkstorage/fsblkstorage/blockindex_test.go
@@ -42,6 +42,9 @@ func (i *noopIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) {
func (i *noopIndex) getTxLoc(txID string) (*fileLocPointer, error) {
return nil, nil
}
func (i *noopIndex) getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
return nil, nil
}

func TestBlockIndexSync(t *testing.T) {
testBlockIndexSync(t, 10, 5, false)
@@ -103,7 +106,9 @@ func TestBlockIndexSelectiveIndexing(t *testing.T) {
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNum})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNumTranNum})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrBlockNumTranNum})
}

func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.IndexableAttr) {
@@ -149,4 +154,15 @@ func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.IndexableAttr) {
} else {
testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed)
}

//test retrieveTransactionForBlockNumTranNum
tx2, err := blockfileMgr.retrieveTransactionForBlockNumTranNum(1, 1)
if testutil.Contains(indexItems, blkstorage.IndexableAttrBlockNumTranNum) {
testutil.AssertNoError(t, err, "Error while retrieving tx by blockNum and tranNum")
txOrig2, err2 := extractTransaction(blocks[0].Data.Data[0])
testutil.AssertNoError(t, err2, "")
testutil.AssertEquals(t, tx2, txOrig2)
} else {
testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed)
}
}
1 change: 1 addition & 0 deletions core/ledger/blkstorage/fsblkstorage/pkg_test.go
@@ -39,6 +39,7 @@ func newTestEnv(t testing.TB) *testEnv {
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
blkstorage.IndexableAttrTxID,
blkstorage.IndexableAttrBlockNumTranNum,
}
os.RemoveAll(conf.dbPath)
os.RemoveAll(conf.blockfilesDir)
1 change: 1 addition & 0 deletions core/ledger/kvledger/kv_ledger.go
@@ -74,6 +74,7 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
blkstorage.IndexableAttrTxID,
blkstorage.IndexableAttrBlockNumTranNum,
}
indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
blockStorageConf := fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize)
16 changes: 13 additions & 3 deletions core/ledger/kvledger/kv_ledger_test.go
@@ -179,7 +179,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
simulator, _ := ledger.NewTxSimulator()
simulator.SetState("ns1", "key4", []byte("value1"))
simulator.SetState("ns1", "key5", []byte("value2"))
simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091622\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.SetState("ns1", "key7", []byte("{\"shipmentID\":\"161003PKC7600\",\"customsInvoice\":{\"methodOfTransport\":\"AIR MAYBE\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.Done()
simRes, _ := simulator.GetTxSimulationResults()
@@ -195,6 +195,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
Height: 1, CurrentBlockHash: block1Hash, PreviousBlockHash: []byte{}})

//Note: keys 4 and 6 are updates but key 7 is new, i.e. we should see history for keys 4 and 6 if history is enabled
simulationResults := [][]byte{}
simulator, _ = ledger.NewTxSimulator()
simulator.SetState("ns1", "key4", []byte("value3"))
simulator.SetState("ns1", "key5", []byte("{\"shipmentID\":\"161003PKC7500\",\"customsInvoice\":{\"methodOfTransport\":\"AIR FREIGHT\",\"invoiceNumber\":\"00091623\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
@@ -203,7 +204,16 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
simulator.SetState("ns1", "key8", []byte("{\"shipmentID\":\"161003PKC7700\",\"customsInvoice\":{\"methodOfTransport\":\"SHIP\",\"invoiceNumber\":\"00091625\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.Done()
simRes, _ = simulator.GetTxSimulationResults()
block2 := bg.NextBlock([][]byte{simRes}, false)
simulationResults = append(simulationResults, simRes)
//add a 2nd transaction
simulator2, _ := ledger.NewTxSimulator()
simulator2.SetState("ns1", "key9", []byte("value5"))
simulator2.SetState("ns1", "key10", []byte("{\"shipmentID\":\"261003PKC8000\",\"customsInvoice\":{\"methodOfTransport\":\"DONKEY\",\"invoiceNumber\":\"00091626\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator2.Done()
simRes2, _ := simulator2.GetTxSimulationResults()
simulationResults = append(simulationResults, simRes2)

block2 := bg.NextBlock(simulationResults, false)
ledger.RemoveInvalidTransactionsAndPrepare(block2)
ledger.Commit()

@@ -225,6 +235,6 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
testutil.AssertEquals(t, b2, block2)

if ledgerconfig.IsHistoryDBEnabled() == true {
//TODO history specific test
//TODO history-specific test once the query APIs are in and we can validate content
}
}
