From 458c5212fc556f3985a258d775a02ca3fec216dc Mon Sep 17 00:00:00 2001
From: Mari Wade
Date: Fri, 9 Dec 2016 13:52:10 -0500
Subject: [PATCH] FAB-1336 Add new ledger blockstorage index.

Add a new ledger blockstorage index for history that maps
(blocknum, trannum) to the file storage location of that block
transaction. This index will be used by the GetTransactionsForKey()
API for (chaincode1, key1), which does a key range query on
chaincode1~key1 to pick up all chaincode1~key1 records. The results
indicate the set of (blocknum, trannum) transactions that updated
this key.

Change-Id: I81da09e5526d7e2966634c78a03d34011d514442
Signed-off-by: Mari Wade
---
 core/ledger/blkstorage/blockstorage.go        |  7 ++-
 .../fsblkstorage/block_serialization.go       | 23 ++++---
 .../fsblkstorage/block_serialization_test.go  | 12 ++--
 .../blkstorage/fsblkstorage/blockfile_mgr.go  | 13 +++-
 .../blkstorage/fsblkstorage/blockindex.go     | 60 ++++++++++++++++---
 .../fsblkstorage/blockindex_test.go           | 16 +++++
 .../blkstorage/fsblkstorage/pkg_test.go       |  1 +
 core/ledger/kvledger/kv_ledger.go             |  1 +
 core/ledger/kvledger/kv_ledger_test.go        | 16 ++++-
 9 files changed, 121 insertions(+), 28 deletions(-)

diff --git a/core/ledger/blkstorage/blockstorage.go b/core/ledger/blkstorage/blockstorage.go
index 458826f591c..a52f0138952 100644
--- a/core/ledger/blkstorage/blockstorage.go
+++ b/core/ledger/blkstorage/blockstorage.go
@@ -30,9 +30,10 @@ type IndexableAttr string

 // constants for indexable attributes
 const (
-	IndexableAttrBlockNum  = IndexableAttr("BlockNum")
-	IndexableAttrBlockHash = IndexableAttr("BlockHash")
-	IndexableAttrTxID      = IndexableAttr("TxID")
+	IndexableAttrBlockNum        = IndexableAttr("BlockNum")
+	IndexableAttrBlockHash       = IndexableAttr("BlockHash")
+	IndexableAttrTxID            = IndexableAttr("TxID")
+	IndexableAttrBlockNumTranNum = IndexableAttr("BlockNumTranNum")
 )

 // IndexConfig - a configuration that includes a list of attributes that should be indexed
diff --git a/core/ledger/blkstorage/fsblkstorage/block_serialization.go b/core/ledger/blkstorage/fsblkstorage/block_serialization.go
index b53a8824b27..bd95e1d9cd6 100644
--- a/core/ledger/blkstorage/fsblkstorage/block_serialization.go
+++ b/core/ledger/blkstorage/fsblkstorage/block_serialization.go
@@ -27,7 +27,13 @@ import (

 type serializedBlockInfo struct {
 	blockHeader *common.BlockHeader
-	txOffsets   map[string]*locPointer
+	txOffsets   []*txindexInfo
+}
+
+//The order of the transactions must be maintained for history
+type txindexInfo struct {
+	txID string
+	loc  *locPointer
 }

 func serializeBlock(block *common.Block) ([]byte, *serializedBlockInfo, error) {
@@ -94,8 +100,9 @@ func addHeaderBytes(blockHeader *common.BlockHeader, buf *proto.Buffer) error {
 	return nil
 }

-func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) (map[string]*locPointer, error) {
-	txOffsets := make(map[string]*locPointer)
+func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) ([]*txindexInfo, error) {
+	var txOffsets []*txindexInfo
+
 	if err := buf.EncodeVarint(uint64(len(blockData.Data))); err != nil {
 		return nil, err
 	}
@@ -108,7 +115,8 @@ func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) (map[string]*l
 		if err := buf.EncodeRawBytes(txEnvelopeBytes); err != nil {
 			return nil, err
 		}
-		txOffsets[txid] = &locPointer{offset, len(buf.Bytes()) - offset}
+		idxInfo := &txindexInfo{txid, &locPointer{offset, len(buf.Bytes()) - offset}}
+		txOffsets = append(txOffsets, idxInfo)
 	}
 	return txOffsets, nil
 }
@@ -147,9 +155,9 @@ func extractHeader(buf *ledgerutil.Buffer) (*common.BlockHeader, error) {
 	return header, nil
 }

-func extractData(buf *ledgerutil.Buffer) (*common.BlockData, map[string]*locPointer, error) {
+func extractData(buf *ledgerutil.Buffer) (*common.BlockData, []*txindexInfo, error) {
 	data := &common.BlockData{}
-	txOffsets := make(map[string]*locPointer)
+	var txOffsets []*txindexInfo
 	var numItems uint64
 	var err error

@@ -167,7 +175,8 @@ func extractData(buf *ledgerutil.Buffer) (*common.BlockData, map[string]*locPoin
 			return nil, nil, err
 		}
 		data.Data = append(data.Data, txEnvBytes)
-		txOffsets[txid] = &locPointer{txOffset, buf.GetBytesConsumed() - txOffset}
+		idxInfo := &txindexInfo{txid, &locPointer{txOffset, buf.GetBytesConsumed() - txOffset}}
+		txOffsets = append(txOffsets, idxInfo)
 	}
 	return data, txOffsets, nil
 }
diff --git a/core/ledger/blkstorage/fsblkstorage/block_serialization_test.go b/core/ledger/blkstorage/fsblkstorage/block_serialization_test.go
index 759e3ec55b1..4313d52be7a 100644
--- a/core/ledger/blkstorage/fsblkstorage/block_serialization_test.go
+++ b/core/ledger/blkstorage/fsblkstorage/block_serialization_test.go
@@ -50,12 +50,16 @@ func TestSerializedBlockInfo(t *testing.T) {
 	testutil.AssertNoError(t, err, "")
 	testutil.AssertEquals(t, infoFromBB, info)
 	testutil.AssertEquals(t, len(info.txOffsets), len(block.Data.Data))
-	for _, txEnvBytes := range block.Data.Data {
+	for txIndex, txEnvBytes := range block.Data.Data {
 		txid, err := extractTxID(txEnvBytes)
 		testutil.AssertNoError(t, err, "")
-		offset, ok := info.txOffsets[txid]
-		testutil.AssertEquals(t, ok, true)
-		b := bb[offset.offset:]
+
+		indexInfo := info.txOffsets[txIndex]
+		indexTxID := indexInfo.txID
+		indexOffset := indexInfo.loc
+
+		testutil.AssertEquals(t, txid, indexTxID)
+		b := bb[indexOffset.offset:]
 		len, num := proto.DecodeVarint(b)
 		txEnvBytesFromBB := b[num : num+int(len)]
 		testutil.AssertEquals(t, txEnvBytesFromBB, txEnvBytes)
diff --git a/core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go b/core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
index 424ee49bf6e..e23144cd918 100644
--- a/core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
+++ b/core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
@@ -304,7 +304,7 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
 	blockFLP.offset = currentOffset
 	// shift the txoffset because we prepend length of bytes before block bytes
 	for _, txOffset := range txOffsets {
-		txOffset.offset += len(blockBytesEncodedLen)
+		txOffset.loc.offset += len(blockBytesEncodedLen)
 	}
 	//save the index in the database
 	mgr.index.indexBlock(&blockIdxInfo{
@@ -363,7 +363,7 @@ func (mgr *blockfileMgr) syncIndex() error {
 			return err
 		}
 		for _, offset := range info.txOffsets {
-			offset.offset += int(blockPlacementInfo.blockBytesOffset)
+			offset.loc.offset += int(blockPlacementInfo.blockBytesOffset)
 		}
 		//Update the blockIndexInfo with what was actually stored in file system
 		blockIdxInfo := &blockIdxInfo{}
@@ -456,6 +456,15 @@ func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*pb.Transaction,
 	return mgr.fetchTransaction(loc)
 }

+func (mgr *blockfileMgr) retrieveTransactionForBlockNumTranNum(blockNum uint64, tranNum uint64) (*pb.Transaction, error) {
+	logger.Debugf("retrieveTransactionForBlockNumTranNum() - blockNum = [%d], tranNum = [%d]", blockNum, tranNum)
+	loc, err := mgr.index.getTXLocForBlockNumTranNum(blockNum, tranNum)
+	if err != nil {
+		return nil, err
+	}
+	return mgr.fetchTransaction(loc)
+}
+
 func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) {
 	blockBytes, err := mgr.fetchBlockBytes(lp)
 	if err != nil {
diff --git a/core/ledger/blkstorage/fsblkstorage/blockindex.go b/core/ledger/blkstorage/fsblkstorage/blockindex.go
index 70bfad8d4c4..c2c1f01c56a 100644
--- a/core/ledger/blkstorage/fsblkstorage/blockindex.go
+++ b/core/ledger/blkstorage/fsblkstorage/blockindex.go
@@ -27,10 +27,11 @@ import (
 )

 const (
-	blockNumIdxKeyPrefix  = 'n'
-	blockHashIdxKeyPrefix = 'h'
-	txIDIdxKeyPrefix      = 't'
-	indexCheckpointKeyStr = "indexCheckpointKey"
+	blockNumIdxKeyPrefix        = 'n'
+	blockHashIdxKeyPrefix       = 'h'
+	txIDIdxKeyPrefix            = 't'
+	blockNumTranNumIdxKeyPrefix = 'a'
+	indexCheckpointKeyStr       = "indexCheckpointKey"
 )

 var indexCheckpointKey = []byte(indexCheckpointKeyStr)
@@ -41,13 +42,14 @@ type index interface {
 	getBlockLocByHash(blockHash []byte) (*fileLocPointer, error)
 	getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error)
 	getTxLoc(txID string) (*fileLocPointer, error)
+	getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error)
 }

 type blockIdxInfo struct {
 	blockNum  uint64
 	blockHash []byte
 	flp       *fileLocPointer
-	txOffsets map[string]*locPointer
+	txOffsets []*txindexInfo
 }

 type blockIndex struct {
@@ -89,25 +91,42 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
 		return err
 	}

+	//Index1
 	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok {
 		batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
 	}
+
+	//Index2
 	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok {
 		batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
 	}
+
+	//Index3 Used to find a transaction by its transaction id
 	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {
-		for txid, txoffset := range txOffsets {
-			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset)
-			logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFlp, txid)
+		for _, txoffset := range txOffsets {
+			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
+			logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to index", txFlp, txoffset.txID)
 			txFlpBytes, marshalErr := txFlp.marshal()
 			if marshalErr != nil {
 				return marshalErr
 			}
-			batch.Put(constructTxIDKey(txid), txFlpBytes)
+			batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes)
 		}
 	}
+
+	//Index4 - Store BlockNumTranNum, which will be used to query history data
+	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; ok {
+		for txIterator, txoffset := range txOffsets {
+			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
+			logger.Debugf("Adding txLoc [%s] for tx number:[%d] ID: [%s] to blockNumTranNum index", txFlp, txIterator+1, txoffset.txID)
+			txFlpBytes, marshalErr := txFlp.marshal()
+			if marshalErr != nil {
+				return marshalErr
+			}
+			batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator+1)), txFlpBytes)
+		}
+	}
+
 	batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
 	if err := index.db.WriteBatch(batch, false); err != nil {
 		return err
 	}
@@ -163,6 +182,22 @@ func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
 	return txFLP, nil
 }

+func (index *blockIndex) getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
+	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; !ok {
+		return nil, blkstorage.ErrAttrNotIndexed
+	}
+	b, err := index.db.Get(constructBlockNumTranNumKey(blockNum, tranNum))
+	if err != nil {
+		return nil, err
+	}
+	if b == nil {
+		return nil, blkstorage.ErrNotFoundInIndex
blkstorage.ErrNotFoundInIndex + } + txFLP := &fileLocPointer{} + txFLP.unmarshal(b) + return txFLP, nil +} + func constructBlockNumKey(blockNum uint64) []byte { blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum) return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...) @@ -176,6 +211,13 @@ func constructTxIDKey(txID string) []byte { return append([]byte{txIDIdxKeyPrefix}, []byte(txID)...) } +func constructBlockNumTranNumKey(blockNum uint64, txNum uint64) []byte { + blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum) + tranNumBytes := util.EncodeOrderPreservingVarUint64(txNum) + key := append(blkNumBytes, tranNumBytes...) + return append([]byte{blockNumTranNumIdxKeyPrefix}, key...) +} + func constructTxID(blockNum uint64, txNum int) string { return fmt.Sprintf("%d:%d", blockNum, txNum) } diff --git a/core/ledger/blkstorage/fsblkstorage/blockindex_test.go b/core/ledger/blkstorage/fsblkstorage/blockindex_test.go index 8d05f15fe79..65c48a12764 100644 --- a/core/ledger/blkstorage/fsblkstorage/blockindex_test.go +++ b/core/ledger/blkstorage/fsblkstorage/blockindex_test.go @@ -42,6 +42,9 @@ func (i *noopIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, err func (i *noopIndex) getTxLoc(txID string) (*fileLocPointer, error) { return nil, nil } +func (i *noopIndex) getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) { + return nil, nil +} func TestBlockIndexSync(t *testing.T) { testBlockIndexSync(t, 10, 5, false) @@ -103,7 +106,9 @@ func TestBlockIndexSelectiveIndexing(t *testing.T) { testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash}) testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNum}) testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID}) + testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNumTranNum}) testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum}) + testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrBlockNumTranNum}) } func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.IndexableAttr) { @@ -149,4 +154,15 @@ func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.Index } else { testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed) } + + //test 'retrieveTrasnactionsByBlockNumTranNum + tx2, err := blockfileMgr.retrieveTransactionForBlockNumTranNum(1, 1) + if testutil.Contains(indexItems, blkstorage.IndexableAttrBlockNumTranNum) { + testutil.AssertNoError(t, err, "Error while retrieving tx by blockNum and tranNum") + txOrig2, err2 := extractTransaction(blocks[0].Data.Data[0]) + testutil.AssertNoError(t, err2, "") + testutil.AssertEquals(t, tx2, txOrig2) + } else { + testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed) + } } diff --git a/core/ledger/blkstorage/fsblkstorage/pkg_test.go b/core/ledger/blkstorage/fsblkstorage/pkg_test.go index 013e8f6a5b4..2788cc2581a 100644 --- a/core/ledger/blkstorage/fsblkstorage/pkg_test.go +++ b/core/ledger/blkstorage/fsblkstorage/pkg_test.go @@ -39,6 +39,7 @@ func newTestEnv(t testing.TB) *testEnv { blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum, blkstorage.IndexableAttrTxID, + blkstorage.IndexableAttrBlockNumTranNum, } os.RemoveAll(conf.dbPath) os.RemoveAll(conf.blockfilesDir) diff --git 
index a169a02da18..008c1aeb2c2 100644
--- a/core/ledger/kvledger/kv_ledger.go
+++ b/core/ledger/kvledger/kv_ledger.go
@@ -74,6 +74,7 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
 		blkstorage.IndexableAttrBlockHash,
 		blkstorage.IndexableAttrBlockNum,
 		blkstorage.IndexableAttrTxID,
+		blkstorage.IndexableAttrBlockNumTranNum,
 	}
 	indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
 	blockStorageConf := fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize)
diff --git a/core/ledger/kvledger/kv_ledger_test.go b/core/ledger/kvledger/kv_ledger_test.go
index 23dd3a17e7d..9114d5363f1 100644
--- a/core/ledger/kvledger/kv_ledger_test.go
+++ b/core/ledger/kvledger/kv_ledger_test.go
@@ -179,7 +179,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
 	simulator, _ := ledger.NewTxSimulator()
 	simulator.SetState("ns1", "key4", []byte("value1"))
 	simulator.SetState("ns1", "key5", []byte("value2"))
-	simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
+	simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091622\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
 	simulator.SetState("ns1", "key7", []byte("{\"shipmentID\":\"161003PKC7600\",\"customsInvoice\":{\"methodOfTransport\":\"AIR MAYBE\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
 	simulator.Done()
 	simRes, _ := simulator.GetTxSimulationResults()
@@ -195,6 +195,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
 		Height: 1, CurrentBlockHash: block1Hash, PreviousBlockHash: []byte{}})

 	//Note key 4 and 6 are updates but key 7 is new. I.E. should see history for key 4 and 6 if history is enabled
+	simulationResults := [][]byte{}
 	simulator, _ = ledger.NewTxSimulator()
 	simulator.SetState("ns1", "key4", []byte("value3"))
 	simulator.SetState("ns1", "key5", []byte("{\"shipmentID\":\"161003PKC7500\",\"customsInvoice\":{\"methodOfTransport\":\"AIR FREIGHT\",\"invoiceNumber\":\"00091623\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
@@ -203,7 +204,16 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
 	simulator.SetState("ns1", "key8", []byte("{\"shipmentID\":\"161003PKC7700\",\"customsInvoice\":{\"methodOfTransport\":\"SHIP\",\"invoiceNumber\":\"00091625\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
 	simulator.Done()
 	simRes, _ = simulator.GetTxSimulationResults()
-	block2 := bg.NextBlock([][]byte{simRes}, false)
+	simulationResults = append(simulationResults, simRes)
+	//add a 2nd transaction
+	simulator2, _ := ledger.NewTxSimulator()
+	simulator2.SetState("ns1", "key9", []byte("value5"))
+	simulator2.SetState("ns1", "key10", []byte("{\"shipmentID\":\"261003PKC8000\",\"customsInvoice\":{\"methodOfTransport\":\"DONKEY\",\"invoiceNumber\":\"00091626\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
+	simulator2.Done()
+	simRes2, _ := simulator2.GetTxSimulationResults()
+	simulationResults = append(simulationResults, simRes2)
+
+	block2 := bg.NextBlock(simulationResults, false)
 	ledger.RemoveInvalidTransactionsAndPrepare(block2)
 	ledger.Commit()
@@ -225,6 +235,6 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
 	testutil.AssertEquals(t, b2, block2)

 	if ledgerconfig.IsHistoryDBEnabled() == true {
-		//TODO history specific test
+		//TODO history specific test once the query APIs are in and we can validate content
 	}
 }
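
The sketch below (not part of the patch) illustrates, under stated assumptions, why the new 'a'-prefixed (blockNum, tranNum) key keeps index entries sorted by block number and then transaction number, and how a history API such as GetTransactionsForKey() could combine a chaincode1~key1 range scan with lookups through the new index. The fixed-width big-endian encoding is only a stand-in for fabric's util.EncodeOrderPreservingVarUint64, the historyScanner and txFetcher interfaces are hypothetical placeholders, and transactions are modeled as raw bytes rather than *pb.Transaction so the example stays self-contained.

// Illustrative sketch only (not code from the patch).
package main

import (
	"encoding/binary"
	"fmt"
)

const blockNumTranNumIdxKeyPrefix = 'a'

// constructBlockNumTranNumKey mirrors the key layout added in blockindex.go:
// a one-byte prefix followed by order-preserving encodings of blockNum and tranNum.
func constructBlockNumTranNumKey(blockNum, tranNum uint64) []byte {
	key := []byte{blockNumTranNumIdxKeyPrefix}
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], blockNum) // stand-in for EncodeOrderPreservingVarUint64
	key = append(key, buf[:]...)
	binary.BigEndian.PutUint64(buf[:], tranNum)
	return append(key, buf[:]...)
}

// blockNumTranNum is one (blocknum, trannum) hit produced by a chaincode1~key1
// range scan over the history data.
type blockNumTranNum struct {
	blockNum, tranNum uint64
}

// historyScanner and txFetcher are hypothetical stand-ins for the history
// range query and for blockfileMgr.retrieveTransactionForBlockNumTranNum.
type historyScanner interface {
	scanKeyUpdates(chaincode, key string) ([]blockNumTranNum, error)
}

type txFetcher interface {
	retrieveTransactionForBlockNumTranNum(blockNum, tranNum uint64) ([]byte, error)
}

// getTransactionsForKey sketches the intended flow: range-scan chaincode1~key1,
// then resolve each (blocknum, trannum) hit through the new blockstorage index.
func getTransactionsForKey(hs historyScanner, tf txFetcher, chaincode, key string) ([][]byte, error) {
	hits, err := hs.scanKeyUpdates(chaincode, key)
	if err != nil {
		return nil, err
	}
	var txs [][]byte
	for _, h := range hits {
		tx, err := tf.retrieveTransactionForBlockNumTranNum(h.blockNum, h.tranNum)
		if err != nil {
			return nil, err
		}
		txs = append(txs, tx)
	}
	return txs, nil
}

func main() {
	// Keys sort by block number first, then transaction number, so all
	// transactions of one block form a contiguous range in the index.
	fmt.Printf("%x\n", constructBlockNumTranNumKey(1, 1))
	fmt.Printf("%x\n", constructBlockNumTranNumKey(1, 2))
	fmt.Printf("%x\n", constructBlockNumTranNumKey(2, 1))
}

The real index stores a fileLocPointer rather than the transaction bytes and encodes the numbers with EncodeOrderPreservingVarUint64, but the ordering property that makes a per-block scan contiguous is the same.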