diff --git a/core/ledger/blkstorage/blockstorage.go b/core/ledger/blkstorage/blockstorage.go index 458826f591c..a52f0138952 100644 --- a/core/ledger/blkstorage/blockstorage.go +++ b/core/ledger/blkstorage/blockstorage.go @@ -30,9 +30,10 @@ type IndexableAttr string // constants for indexable attributes const ( - IndexableAttrBlockNum = IndexableAttr("BlockNum") - IndexableAttrBlockHash = IndexableAttr("BlockHash") - IndexableAttrTxID = IndexableAttr("TxID") + IndexableAttrBlockNum = IndexableAttr("BlockNum") + IndexableAttrBlockHash = IndexableAttr("BlockHash") + IndexableAttrTxID = IndexableAttr("TxID") + IndexableAttrBlockNumTranNum = IndexableAttr("BlockNumTranNum") ) // IndexConfig - a configuration that includes a list of attributes that should be indexed diff --git a/core/ledger/blkstorage/fsblkstorage/block_serialization.go b/core/ledger/blkstorage/fsblkstorage/block_serialization.go index b53a8824b27..bd95e1d9cd6 100644 --- a/core/ledger/blkstorage/fsblkstorage/block_serialization.go +++ b/core/ledger/blkstorage/fsblkstorage/block_serialization.go @@ -27,7 +27,13 @@ import ( type serializedBlockInfo struct { blockHeader *common.BlockHeader - txOffsets map[string]*locPointer + txOffsets []*txindexInfo +} + +//The order of the transactions must be maintained for history +type txindexInfo struct { + txID string + loc *locPointer } func serializeBlock(block *common.Block) ([]byte, *serializedBlockInfo, error) { @@ -94,8 +100,9 @@ func addHeaderBytes(blockHeader *common.BlockHeader, buf *proto.Buffer) error { return nil } -func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) (map[string]*locPointer, error) { - txOffsets := make(map[string]*locPointer) +func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) ([]*txindexInfo, error) { + var txOffsets []*txindexInfo + if err := buf.EncodeVarint(uint64(len(blockData.Data))); err != nil { return nil, err } @@ -108,7 +115,8 @@ func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) (map[string]*l if err := buf.EncodeRawBytes(txEnvelopeBytes); err != nil { return nil, err } - txOffsets[txid] = &locPointer{offset, len(buf.Bytes()) - offset} + idxInfo := &txindexInfo{txid, &locPointer{offset, len(buf.Bytes()) - offset}} + txOffsets = append(txOffsets, idxInfo) } return txOffsets, nil } @@ -147,9 +155,9 @@ func extractHeader(buf *ledgerutil.Buffer) (*common.BlockHeader, error) { return header, nil } -func extractData(buf *ledgerutil.Buffer) (*common.BlockData, map[string]*locPointer, error) { +func extractData(buf *ledgerutil.Buffer) (*common.BlockData, []*txindexInfo, error) { data := &common.BlockData{} - txOffsets := make(map[string]*locPointer) + var txOffsets []*txindexInfo var numItems uint64 var err error @@ -167,7 +175,8 @@ func extractData(buf *ledgerutil.Buffer) (*common.BlockData, map[string]*locPoin return nil, nil, err } data.Data = append(data.Data, txEnvBytes) - txOffsets[txid] = &locPointer{txOffset, buf.GetBytesConsumed() - txOffset} + idxInfo := &txindexInfo{txid, &locPointer{txOffset, buf.GetBytesConsumed() - txOffset}} + txOffsets = append(txOffsets, idxInfo) } return data, txOffsets, nil } diff --git a/core/ledger/blkstorage/fsblkstorage/block_serialization_test.go b/core/ledger/blkstorage/fsblkstorage/block_serialization_test.go index 759e3ec55b1..4313d52be7a 100644 --- a/core/ledger/blkstorage/fsblkstorage/block_serialization_test.go +++ b/core/ledger/blkstorage/fsblkstorage/block_serialization_test.go @@ -50,12 +50,16 @@ func TestSerializedBlockInfo(t *testing.T) { testutil.AssertNoError(t, err, "") testutil.AssertEquals(t, infoFromBB, info) testutil.AssertEquals(t, len(info.txOffsets), len(block.Data.Data)) - for _, txEnvBytes := range block.Data.Data { + for txIndex, txEnvBytes := range block.Data.Data { txid, err := extractTxID(txEnvBytes) testutil.AssertNoError(t, err, "") - offset, ok := info.txOffsets[txid] - testutil.AssertEquals(t, ok, true) - b := bb[offset.offset:] + + indexInfo := info.txOffsets[txIndex] + indexTxID := indexInfo.txID + indexOffset := indexInfo.loc + + testutil.AssertEquals(t, txid, indexTxID) + b := bb[indexOffset.offset:] len, num := proto.DecodeVarint(b) txEnvBytesFromBB := b[num : num+int(len)] testutil.AssertEquals(t, txEnvBytesFromBB, txEnvBytes) diff --git a/core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go b/core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go index 424ee49bf6e..e23144cd918 100644 --- a/core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go +++ b/core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go @@ -304,7 +304,7 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error { blockFLP.offset = currentOffset // shift the txoffset because we prepend length of bytes before block bytes for _, txOffset := range txOffsets { - txOffset.offset += len(blockBytesEncodedLen) + txOffset.loc.offset += len(blockBytesEncodedLen) } //save the index in the database mgr.index.indexBlock(&blockIdxInfo{ @@ -363,7 +363,7 @@ func (mgr *blockfileMgr) syncIndex() error { return err } for _, offset := range info.txOffsets { - offset.offset += int(blockPlacementInfo.blockBytesOffset) + offset.loc.offset += int(blockPlacementInfo.blockBytesOffset) } //Update the blockIndexInfo with what was actually stored in file system blockIdxInfo := &blockIdxInfo{} @@ -456,6 +456,15 @@ func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*pb.Transaction, return mgr.fetchTransaction(loc) } +func (mgr *blockfileMgr) retrieveTransactionForBlockNumTranNum(blockNum uint64, tranNum uint64) (*pb.Transaction, error) { + logger.Debugf("retrieveTransactionForBlockNumTranNum() - blockNum = [%d], tranNum = [%d]", blockNum, tranNum) + loc, err := mgr.index.getTXLocForBlockNumTranNum(blockNum, tranNum) + if err != nil { + return nil, err + } + return mgr.fetchTransaction(loc) +} + func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) { blockBytes, err := mgr.fetchBlockBytes(lp) if err != nil { diff --git a/core/ledger/blkstorage/fsblkstorage/blockindex.go b/core/ledger/blkstorage/fsblkstorage/blockindex.go index 70bfad8d4c4..c2c1f01c56a 100644 --- a/core/ledger/blkstorage/fsblkstorage/blockindex.go +++ b/core/ledger/blkstorage/fsblkstorage/blockindex.go @@ -27,10 +27,11 @@ import ( ) const ( - blockNumIdxKeyPrefix = 'n' - blockHashIdxKeyPrefix = 'h' - txIDIdxKeyPrefix = 't' - indexCheckpointKeyStr = "indexCheckpointKey" + blockNumIdxKeyPrefix = 'n' + blockHashIdxKeyPrefix = 'h' + txIDIdxKeyPrefix = 't' + blockNumTranNumIdxKeyPrefix = 'a' + indexCheckpointKeyStr = "indexCheckpointKey" ) var indexCheckpointKey = []byte(indexCheckpointKeyStr) @@ -41,13 +42,14 @@ type index interface { getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) getTxLoc(txID string) (*fileLocPointer, error) + getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) } type blockIdxInfo struct { blockNum uint64 blockHash []byte flp *fileLocPointer - txOffsets map[string]*locPointer + txOffsets []*txindexInfo } type blockIndex struct { @@ -89,25 +91,42 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error { return err } + //Index1 if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok { batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes) } + //Index2 if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok { batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes) } + //Index3 Used to find a transactin by it's transaction id if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok { - for txid, txoffset := range txOffsets { - txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset) - logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFlp, txid) + for _, txoffset := range txOffsets { + txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc) + logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to index", txFlp, txoffset.txID) txFlpBytes, marshalErr := txFlp.marshal() if marshalErr != nil { return marshalErr } - batch.Put(constructTxIDKey(txid), txFlpBytes) + batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes) } } + + //Index4 - Store BlockNumTranNum will be used to query history data + if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; ok { + for txIterator, txoffset := range txOffsets { + txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc) + logger.Debugf("Adding txLoc [%s] for tx number:[%d] ID: [%s] to blockNumTranNum index", txFlp, txIterator+1, txoffset.txID) + txFlpBytes, marshalErr := txFlp.marshal() + if marshalErr != nil { + return marshalErr + } + batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator+1)), txFlpBytes) + } + } + batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum)) if err := index.db.WriteBatch(batch, false); err != nil { return err @@ -163,6 +182,22 @@ func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) { return txFLP, nil } +func (index *blockIndex) getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) { + if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; !ok { + return nil, blkstorage.ErrAttrNotIndexed + } + b, err := index.db.Get(constructBlockNumTranNumKey(blockNum, tranNum)) + if err != nil { + return nil, err + } + if b == nil { + return nil, blkstorage.ErrNotFoundInIndex + } + txFLP := &fileLocPointer{} + txFLP.unmarshal(b) + return txFLP, nil +} + func constructBlockNumKey(blockNum uint64) []byte { blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum) return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...) @@ -176,6 +211,13 @@ func constructTxIDKey(txID string) []byte { return append([]byte{txIDIdxKeyPrefix}, []byte(txID)...) } +func constructBlockNumTranNumKey(blockNum uint64, txNum uint64) []byte { + blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum) + tranNumBytes := util.EncodeOrderPreservingVarUint64(txNum) + key := append(blkNumBytes, tranNumBytes...) + return append([]byte{blockNumTranNumIdxKeyPrefix}, key...) +} + func constructTxID(blockNum uint64, txNum int) string { return fmt.Sprintf("%d:%d", blockNum, txNum) } diff --git a/core/ledger/blkstorage/fsblkstorage/blockindex_test.go b/core/ledger/blkstorage/fsblkstorage/blockindex_test.go index 8d05f15fe79..65c48a12764 100644 --- a/core/ledger/blkstorage/fsblkstorage/blockindex_test.go +++ b/core/ledger/blkstorage/fsblkstorage/blockindex_test.go @@ -42,6 +42,9 @@ func (i *noopIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, err func (i *noopIndex) getTxLoc(txID string) (*fileLocPointer, error) { return nil, nil } +func (i *noopIndex) getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) { + return nil, nil +} func TestBlockIndexSync(t *testing.T) { testBlockIndexSync(t, 10, 5, false) @@ -103,7 +106,9 @@ func TestBlockIndexSelectiveIndexing(t *testing.T) { testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash}) testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNum}) testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID}) + testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNumTranNum}) testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum}) + testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrBlockNumTranNum}) } func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.IndexableAttr) { @@ -149,4 +154,15 @@ func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.Index } else { testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed) } + + //test 'retrieveTrasnactionsByBlockNumTranNum + tx2, err := blockfileMgr.retrieveTransactionForBlockNumTranNum(1, 1) + if testutil.Contains(indexItems, blkstorage.IndexableAttrBlockNumTranNum) { + testutil.AssertNoError(t, err, "Error while retrieving tx by blockNum and tranNum") + txOrig2, err2 := extractTransaction(blocks[0].Data.Data[0]) + testutil.AssertNoError(t, err2, "") + testutil.AssertEquals(t, tx2, txOrig2) + } else { + testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed) + } } diff --git a/core/ledger/blkstorage/fsblkstorage/pkg_test.go b/core/ledger/blkstorage/fsblkstorage/pkg_test.go index 013e8f6a5b4..2788cc2581a 100644 --- a/core/ledger/blkstorage/fsblkstorage/pkg_test.go +++ b/core/ledger/blkstorage/fsblkstorage/pkg_test.go @@ -39,6 +39,7 @@ func newTestEnv(t testing.TB) *testEnv { blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum, blkstorage.IndexableAttrTxID, + blkstorage.IndexableAttrBlockNumTranNum, } os.RemoveAll(conf.dbPath) os.RemoveAll(conf.blockfilesDir) diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index a169a02da18..008c1aeb2c2 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -74,6 +74,7 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) { blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum, blkstorage.IndexableAttrTxID, + blkstorage.IndexableAttrBlockNumTranNum, } indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex} blockStorageConf := fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize) diff --git a/core/ledger/kvledger/kv_ledger_test.go b/core/ledger/kvledger/kv_ledger_test.go index 23dd3a17e7d..9114d5363f1 100644 --- a/core/ledger/kvledger/kv_ledger_test.go +++ b/core/ledger/kvledger/kv_ledger_test.go @@ -179,7 +179,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { simulator, _ := ledger.NewTxSimulator() simulator.SetState("ns1", "key4", []byte("value1")) simulator.SetState("ns1", "key5", []byte("value2")) - simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091622\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) simulator.SetState("ns1", "key7", []byte("{\"shipmentID\":\"161003PKC7600\",\"customsInvoice\":{\"methodOfTransport\":\"AIR MAYBE\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) simulator.Done() simRes, _ := simulator.GetTxSimulationResults() @@ -195,6 +195,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { Height: 1, CurrentBlockHash: block1Hash, PreviousBlockHash: []byte{}}) //Note key 4 and 6 are updates but key 7 is new. I.E. should see history for key 4 and 6 if history is enabled + simulationResults := [][]byte{} simulator, _ = ledger.NewTxSimulator() simulator.SetState("ns1", "key4", []byte("value3")) simulator.SetState("ns1", "key5", []byte("{\"shipmentID\":\"161003PKC7500\",\"customsInvoice\":{\"methodOfTransport\":\"AIR FREIGHT\",\"invoiceNumber\":\"00091623\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) @@ -203,7 +204,16 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { simulator.SetState("ns1", "key8", []byte("{\"shipmentID\":\"161003PKC7700\",\"customsInvoice\":{\"methodOfTransport\":\"SHIP\",\"invoiceNumber\":\"00091625\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) simulator.Done() simRes, _ = simulator.GetTxSimulationResults() - block2 := bg.NextBlock([][]byte{simRes}, false) + simulationResults = append(simulationResults, simRes) + //add a 2nd transaction + simulator2, _ := ledger.NewTxSimulator() + simulator2.SetState("ns1", "key9", []byte("value5")) + simulator2.SetState("ns1", "key10", []byte("{\"shipmentID\":\"261003PKC8000\",\"customsInvoice\":{\"methodOfTransport\":\"DONKEY\",\"invoiceNumber\":\"00091626\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator2.Done() + simRes2, _ := simulator2.GetTxSimulationResults() + simulationResults = append(simulationResults, simRes2) + + block2 := bg.NextBlock(simulationResults, false) ledger.RemoveInvalidTransactionsAndPrepare(block2) ledger.Commit() @@ -225,6 +235,6 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { testutil.AssertEquals(t, b2, block2) if ledgerconfig.IsHistoryDBEnabled() == true { - //TODO history specific test + //TODO history specific test once the query api's are in and we can validate content } }