diff --git a/core/ledgernext/ReadWriteSet.md b/core/ledgernext/ReadWriteSet.md new file mode 100644 index 00000000000..42651f57648 --- /dev/null +++ b/core/ledgernext/ReadWriteSet.md @@ -0,0 +1,70 @@ +### Read-Write set semantics + +This documents discusses the details of the current implementation about the semantics of read-write sets. + +##### Transaction simulation and read-write set +During simulation of a transaction at an `endorser`, a read-write set is prepared for the transaction. The `read set` contains a list of unique keys and their their committed versions that the transaction reads during simulation. The `write set` contains a list of unique keys (though there can be overlap with the keys present in the read set) and their new values that the transaction writes. A delete marker is set (in the place of new value) for the key if the update performed by the transaction is to delete the key. + +Further, if the transaction writes a value multiple times for a key, only the last written value is retained. Also, if a transaction reads a value for a key that the transaction itself has written before, the last written value is returned instead of the value present in the committed snapshot; one implication of this is that if a transaction writes a value for a key before reading it from the committed snapshot, the key does not appear in the read set of the transaction. + +As noted earlier, the versions of the keys are recorded only in the read set; the write set just contains the list of unique keys and their latest values set by the transaction. + +Following is an illustration of an example read-write set prepared by simulation of an hypothetical transaction. + +``` + + + + + + + + + + +``` + +##### Transaction validation and updating world state using read-write set +A `committer` uses the read set portion of the read-write set for checking the validity of a transaction and the write set portion of the read-write set for updating the versions and the values of the affected keys. + +In the validation phase, a transaction is considered `valid` iff the version of each key present in the read-set of the transaction matches the version for the same key in the world state - assuming all the preceding `valid` transactions (including the preceding transactions in the same block) are committed. + +If a transaction passes the validity check, the committer uses the write set for updating the world state. In the update phase, for each key present in the write set, the value in the world state for the same key is set to the value as specified in the write set. Further, the version of the key in the world state is incremented by one. + +##### Example simulation and validation +This section helps understanding the semantics with the help of an example scenario. +For the purpose of this example, the presence of a key `k` in the world state is represented by a tuple `(k,ver,val)` where `ver` is the latest version of the key `k` having `val` as its value. + +Now, consider a set of file transactions `T1, T2, T3, T4, and T5`, all simulated on the same snapshot of the world state. Following snippet shows the snapshot of the world state against witch the transactions are simulated and the sequence of read and write activities performed by each of these transactions. + +``` +World state: (k1,1,v1), (k2,1,v2), (k3,1,v3), (k4,1,v4), (k5,1,v5) +T1 -> Write(k1, v1'), Write(k2, v2') +T2 -> Read(k1), Write(k3, v3') +T3 -> Write(k2, v2'') +T4 -> Write(k2, v2'''), read(k2) +T5 -> Write(k6, v6'), read(k1) +``` +Now, assume that these transactions are ordered in the sequence of T1,..,T5 (could be contained in a single block or different blocks) + +1. `T1` passes the validation because it does not perform any read. Further, the tuple of keys `k1` and `k2` in the world state are updated to `(k1,2,v1'), (k2,2,v2')` + +2. `T2` fails the validation because it reads a key `k1` which is modified by a preceding transaction `T1` + +3. `T3` passes the validation because it does not perform a read. Further the tuple of the key `k2` in the world state are updated to `(k2,3,v2'')` + +4. `T4` passes the validation because it performs a read the key `k2` after writing the new value (though the key was modified by a preceding transaction `T1`). Further the tuple of the key `k2` in the world state are updated to `(k2,4,v2''')` + +5. `T5` fails the validation because it performs a read for key `k1` which is modified by a preceding transaction `T1` + +#### Transactions with multiple read-write sets +If a transaction contains multiple read-write sets as a result of including different simulations results in a single transaction, the validation also checks for read conflicts between the read-write sets in addition to the read conflicts check with preceding transactions. + +#### Questions +1. In the final block, is there a benefit of persisting read-set portion of the read-write set? The advantage of not storing clearly reduces the storage space requirement. If we chose not to store the read-set, the endorsers should sign only the write set portion of the read-write set which means that the +`actionBytes` field in the `EndorsedAction` would contain only write set and a separate field would be required for the read set. + +2. Is there a benefit of deciding the version for a key in the write-set at simulation time instead of commit time? If we fix the version at the simulation time, then we would have to discard the transactions that have only write conflicts (i.e., some other transaction has written the version). diff --git a/core/ledgernext/blkstorage/blockstorage.go b/core/ledgernext/blkstorage/blockstorage.go new file mode 100644 index 00000000000..8d17c44ef17 --- /dev/null +++ b/core/ledgernext/blkstorage/blockstorage.go @@ -0,0 +1,33 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package blkstorage + +import ( + "github.com/hyperledger/fabric/core/ledgernext" + "github.com/hyperledger/fabric/protos" +) + +// BlockStore - an interface for persisting and retrieving blocks +type BlockStore interface { + AddBlock(block *protos.Block2) error + GetBlockchainInfo() (*protos.BlockchainInfo, error) + RetrieveBlocks(startNum uint64, endNum uint64) (ledger.ResultsIterator, error) + RetrieveBlockByHash(blockHash []byte) (*protos.Block2, error) + RetrieveBlockByNumber(blockNum uint64) (*protos.Block2, error) + RetrieveTxByID(txID string) (*protos.Transaction2, error) + Shutdown() +} diff --git a/core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr.go b/core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr.go new file mode 100644 index 00000000000..8fd26593fa1 --- /dev/null +++ b/core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr.go @@ -0,0 +1,387 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fsblkstorage + +import ( + "fmt" + "sync/atomic" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/core/ledgernext/util" + "github.com/hyperledger/fabric/core/ledgernext/util/db" + "github.com/hyperledger/fabric/protos" + "github.com/op/go-logging" + "github.com/tecbot/gorocksdb" +) + +var logger = logging.MustGetLogger("kvledger") + +const ( + blockIndexCF = "blockIndexCF" + blockfilePrefix = "blockfile_" +) + +var ( + blkMgrInfoKey = []byte("blkMgrInfo") +) + +type blockfileMgr struct { + rootDir string + conf *Conf + db *db.DB + defaultCF *gorocksdb.ColumnFamilyHandle + index *blockIndex + cpInfo *checkpointInfo + currentFileWriter *blockfileWriter + bcInfo atomic.Value +} + +func newBlockfileMgr(conf *Conf) *blockfileMgr { + rootDir := conf.blockfilesDir + _, err := util.CreateDirIfMissing(rootDir) + if err != nil { + panic(fmt.Sprintf("Error: %s", err)) + } + db := initDB(conf) + mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: db, defaultCF: db.GetDefaultCFHandle()} + cpInfo, err := mgr.loadCurrentInfo() + if err != nil { + panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err)) + } + if cpInfo == nil { + cpInfo = &checkpointInfo{latestFileChunkSuffixNum: 0, latestFileChunksize: 0} + err = mgr.saveCurrentInfo(cpInfo) + if err != nil { + panic(fmt.Sprintf("Could not save next block file info to db: %s", err)) + } + } + currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)) + if err != nil { + panic(fmt.Sprintf("Could not open writer to current file: %s", err)) + } + err = currentFileWriter.truncateFile(cpInfo.latestFileChunksize) + if err != nil { + panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err)) + } + + mgr.index = newBlockIndex(db) + mgr.cpInfo = cpInfo + mgr.currentFileWriter = currentFileWriter + + // init BlockchainInfo + bcInfo := &protos.BlockchainInfo{Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil} + if cpInfo.lastBlockNumber > 0 { + lastBlock, err := mgr.retrieveSerBlockByNumber(cpInfo.lastBlockNumber) + if err != nil { + panic(fmt.Sprintf("Could not retrieve last block form file: %s", err)) + } + lastBlockHash := lastBlock.ComputeHash() + previousBlockHash, err := lastBlock.GetPreviousBlockHash() + if err != nil { + panic(fmt.Sprintf("Error in decoding block: %s", err)) + } + bcInfo = &protos.BlockchainInfo{ + Height: cpInfo.lastBlockNumber, + CurrentBlockHash: lastBlockHash, + PreviousBlockHash: previousBlockHash} + } + mgr.bcInfo.Store(bcInfo) + return mgr +} + +func initDB(conf *Conf) *db.DB { + dbInst := db.CreateDB(&db.Conf{DBPath: conf.dbPath, CFNames: []string{blockIndexCF}}) + dbInst.Open() + return dbInst +} + +func deriveBlockfilePath(rootDir string, suffixNum int) string { + return rootDir + "/" + blockfilePrefix + fmt.Sprintf("%06d", suffixNum) +} + +func (mgr *blockfileMgr) open() error { + return mgr.currentFileWriter.open() +} + +func (mgr *blockfileMgr) close() { + mgr.currentFileWriter.close() + mgr.db.Close() +} + +func (mgr *blockfileMgr) moveToNextFile() { + nextFileInfo := &checkpointInfo{latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1, latestFileChunksize: 0} + nextFileWriter, err := newBlockfileWriter(deriveBlockfilePath(mgr.rootDir, nextFileInfo.latestFileChunkSuffixNum)) + if err != nil { + panic(fmt.Sprintf("Could not open writer to next file: %s", err)) + } + mgr.currentFileWriter.close() + err = mgr.saveCurrentInfo(nextFileInfo) + if err != nil { + panic(fmt.Sprintf("Could not save next block file info to db: %s", err)) + } + mgr.cpInfo = nextFileInfo + mgr.currentFileWriter = nextFileWriter +} + +func (mgr *blockfileMgr) addBlock(block *protos.Block2) error { + serBlock, err := protos.ConstructSerBlock2(block) + if err != nil { + return fmt.Errorf("Error while serializing block: %s", err) + } + blockBytes := serBlock.GetBytes() + blockHash := serBlock.ComputeHash() + txOffsets, err := serBlock.GetTxOffsets() + if err != nil { + return fmt.Errorf("Error while serializing block: %s", err) + } + currentOffset := mgr.cpInfo.latestFileChunksize + length := len(blockBytes) + encodedLen := proto.EncodeVarint(uint64(length)) + totalLen := length + len(encodedLen) + if currentOffset+totalLen > mgr.conf.maxBlockfileSize { + mgr.moveToNextFile() + currentOffset = 0 + } + err = mgr.currentFileWriter.append(encodedLen) + if err != nil { + err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize) + if err1 != nil { + panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err)) + } + return fmt.Errorf("Error while appending block to file: %s", err) + } + + err = mgr.currentFileWriter.append(blockBytes) + if err != nil { + err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize) + if err1 != nil { + panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err)) + } + return fmt.Errorf("Error while appending block to file: %s", err) + } + + mgr.cpInfo.latestFileChunksize += totalLen + mgr.cpInfo.lastBlockNumber++ + err = mgr.saveCurrentInfo(mgr.cpInfo) + if err != nil { + mgr.cpInfo.latestFileChunksize -= totalLen + err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize) + if err1 != nil { + panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err)) + } + return fmt.Errorf("Error while saving current file info to db: %s", err) + } + blockFLP := &fileLocPointer{fileSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum} + blockFLP.offset = currentOffset + mgr.index.indexBlock(mgr.cpInfo.lastBlockNumber, blockHash, blockFLP, length, len(encodedLen), txOffsets) + mgr.updateBlockchainInfo(blockHash, block) + return nil +} + +func (mgr *blockfileMgr) getBlockchainInfo() *protos.BlockchainInfo { + return mgr.bcInfo.Load().(*protos.BlockchainInfo) +} + +func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *protos.Block2) { + currentBCInfo := mgr.getBlockchainInfo() + newBCInfo := &protos.BlockchainInfo{Height: currentBCInfo.Height + 1, CurrentBlockHash: latestBlockHash, PreviousBlockHash: latestBlock.PreviousBlockHash} + mgr.bcInfo.Store(newBCInfo) +} + +func (mgr *blockfileMgr) retrieveBlockByHash(blockHash []byte) (*protos.Block2, error) { + logger.Debugf("retrieveBlockByHash() - blockHash = [%#v]", blockHash) + loc, err := mgr.index.getBlockLocByHash(blockHash) + if err != nil { + return nil, err + } + return mgr.fetchBlock(loc) +} + +func (mgr *blockfileMgr) retrieveBlockByNumber(blockNum uint64) (*protos.Block2, error) { + logger.Debugf("retrieveBlockByNumber() - blockNum = [%d]", blockNum) + loc, err := mgr.index.getBlockLocByBlockNum(blockNum) + if err != nil { + return nil, err + } + return mgr.fetchBlock(loc) +} + +func (mgr *blockfileMgr) retrieveSerBlockByNumber(blockNum uint64) (*protos.SerBlock2, error) { + logger.Debugf("retrieveSerBlockByNumber() - blockNum = [%d]", blockNum) + loc, err := mgr.index.getBlockLocByBlockNum(blockNum) + if err != nil { + return nil, err + } + return mgr.fetchSerBlock(loc) +} + +func (mgr *blockfileMgr) retrieveBlocks(startNum uint64, endNum uint64) (*BlocksItr, error) { + var lp *fileLocPointer + var err error + if lp, err = mgr.index.getBlockLocByBlockNum(startNum); err != nil { + return nil, err + } + filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum) + + var stream *blockStream + if stream, err = newBlockStream(filePath, int64(lp.offset)); err != nil { + return nil, err + } + return newBlockItr(stream, int(endNum-startNum)+1), nil +} + +func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*protos.Transaction2, error) { + logger.Debugf("retrieveTransactionByID() - txId = [%s]", txID) + loc, err := mgr.index.getTxLoc(txID) + if err != nil { + return nil, err + } + return mgr.fetchTransaction(loc) +} + +func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*protos.Block2, error) { + serBlock, err := mgr.fetchSerBlock(lp) + if err != nil { + return nil, err + } + block, err := serBlock.ToBlock2() + if err != nil { + return nil, err + } + return block, nil +} + +func (mgr *blockfileMgr) fetchSerBlock(lp *fileLocPointer) (*protos.SerBlock2, error) { + blockBytes, err := mgr.fetchBlockBytes(lp) + if err != nil { + return nil, err + } + return protos.NewSerBlock2(blockBytes), nil +} + +func (mgr *blockfileMgr) fetchTransaction(lp *fileLocPointer) (*protos.Transaction2, error) { + txBytes, err := mgr.fetchRawBytes(lp) + if err != nil { + return nil, err + } + tx := &protos.Transaction2{} + err = proto.Unmarshal(txBytes, tx) + if err != nil { + return nil, err + } + return tx, nil +} + +func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) { + filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum) + stream, err := newBlockStream(filePath, int64(lp.offset)) + if err != nil { + return nil, err + } + defer stream.close() + b, err := stream.nextBlockBytes() + if err != nil { + return nil, err + } + return b, nil +} + +func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) { + filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum) + reader, err := newBlockfileReader(filePath) + if err != nil { + return nil, err + } + defer reader.close() + b, err := reader.read(lp.offset, lp.bytesLength) + if err != nil { + return nil, err + } + return b, nil +} + +func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) { + b, err := mgr.db.Get(mgr.defaultCF, blkMgrInfoKey) + if err != nil { + return nil, err + } + if b == nil { + return nil, err + } + i := &checkpointInfo{} + if err = i.unmarshal(b); err != nil { + return nil, err + } + return i, nil +} + +func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo) error { + b, err := i.marshal() + if err != nil { + return err + } + err = mgr.db.Put(mgr.defaultCF, blkMgrInfoKey, b) + if err != nil { + return err + } + return nil +} + +// blkMgrInfo +type checkpointInfo struct { + latestFileChunkSuffixNum int + latestFileChunksize int + lastBlockNumber uint64 +} + +func (i *checkpointInfo) marshal() ([]byte, error) { + buffer := proto.NewBuffer([]byte{}) + var err error + if err = buffer.EncodeVarint(uint64(i.latestFileChunkSuffixNum)); err != nil { + return nil, err + } + if err = buffer.EncodeVarint(uint64(i.latestFileChunksize)); err != nil { + return nil, err + } + if err = buffer.EncodeVarint(i.lastBlockNumber); err != nil { + return nil, err + } + return buffer.Bytes(), nil +} + +func (i *checkpointInfo) unmarshal(b []byte) error { + buffer := proto.NewBuffer(b) + var val uint64 + var err error + + if val, err = buffer.DecodeVarint(); err != nil { + return err + } + i.latestFileChunkSuffixNum = int(val) + + if val, err = buffer.DecodeVarint(); err != nil { + return err + } + i.latestFileChunksize = int(val) + + if val, err = buffer.DecodeVarint(); err != nil { + return err + } + i.lastBlockNumber = val + + return nil +} diff --git a/core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr_test.go b/core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr_test.go new file mode 100644 index 00000000000..83aee1e702b --- /dev/null +++ b/core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr_test.go @@ -0,0 +1,137 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fsblkstorage + +import ( + "fmt" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/core/ledgernext/testutil" + "github.com/hyperledger/fabric/protos" +) + +func TestBlockfileMgrBlockReadWrite(t *testing.T) { + env := newTestEnv(t) + defer env.Cleanup() + blkfileMgrWrapper := newTestBlockfileWrapper(t, env) + defer blkfileMgrWrapper.close() + blocks := testutil.ConstructTestBlocks(t, 10) + blkfileMgrWrapper.addBlocks(blocks) + blkfileMgrWrapper.testGetBlockByHash(blocks) + blkfileMgrWrapper.testGetBlockByNumber(blocks, 1) +} + +func TestBlockfileMgrBlockIterator(t *testing.T) { + env := newTestEnv(t) + defer env.Cleanup() + blkfileMgrWrapper := newTestBlockfileWrapper(t, env) + defer blkfileMgrWrapper.close() + blocks := testutil.ConstructTestBlocks(t, 10) + blkfileMgrWrapper.addBlocks(blocks) + itr, err := blkfileMgrWrapper.blockfileMgr.retrieveBlocks(1, 8) + defer itr.Close() + testutil.AssertNoError(t, err, "Error while getting blocks iterator") + numBlocksItrated := 0 + for ; itr.Next(); numBlocksItrated++ { + block, err := itr.Get() + testutil.AssertNoError(t, err, fmt.Sprintf("Error while getting block number [%d] from iterator", numBlocksItrated)) + testutil.AssertEquals(t, block.(*BlockHolder).GetBlock(), blocks[numBlocksItrated]) + } + testutil.AssertEquals(t, numBlocksItrated, 8) +} + +func TestBlockfileMgrBlockchainInfo(t *testing.T) { + env := newTestEnv(t) + defer env.Cleanup() + blkfileMgrWrapper := newTestBlockfileWrapper(t, env) + defer blkfileMgrWrapper.close() + + bcInfo := blkfileMgrWrapper.blockfileMgr.getBlockchainInfo() + testutil.AssertEquals(t, bcInfo, &protos.BlockchainInfo{Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil}) + + blocks := testutil.ConstructTestBlocks(t, 10) + blkfileMgrWrapper.addBlocks(blocks) + bcInfo = blkfileMgrWrapper.blockfileMgr.getBlockchainInfo() + testutil.AssertEquals(t, bcInfo.Height, uint64(10)) +} + +func TestBlockfileMgrGetTxById(t *testing.T) { + env := newTestEnv(t) + defer env.Cleanup() + blkfileMgrWrapper := newTestBlockfileWrapper(t, env) + defer blkfileMgrWrapper.close() + blocks := testutil.ConstructTestBlocks(t, 10) + blkfileMgrWrapper.addBlocks(blocks) + for i, blk := range blocks { + for j, txBytes := range blk.Transactions { + // blockNum starts with 1 + txID := constructTxID(uint64(i+1), j) + txFromFileMgr, err := blkfileMgrWrapper.blockfileMgr.retrieveTransactionByID(txID) + testutil.AssertNoError(t, err, "Error while retrieving tx from blkfileMgr") + tx := &protos.Transaction2{} + err = proto.Unmarshal(txBytes, tx) + testutil.AssertNoError(t, err, "Error while unmarshalling tx") + testutil.AssertEquals(t, txFromFileMgr, tx) + } + } +} + +func TestBlockfileMgrRestart(t *testing.T) { + env := newTestEnv(t) + defer env.Cleanup() + blkfileMgrWrapper := newTestBlockfileWrapper(t, env) + blocks := testutil.ConstructTestBlocks(t, 10) + blkfileMgrWrapper.addBlocks(blocks) + blkfileMgrWrapper.close() + + blkfileMgrWrapper = newTestBlockfileWrapper(t, env) + defer blkfileMgrWrapper.close() + testutil.AssertEquals(t, int(blkfileMgrWrapper.blockfileMgr.cpInfo.lastBlockNumber), 10) + blkfileMgrWrapper.testGetBlockByHash(blocks) +} + +func TestBlockfileMgrFileRolling(t *testing.T) { + env := newTestEnv(t) + blocks := testutil.ConstructTestBlocks(t, 100) + size := 0 + for _, block := range blocks { + serBlock, err := protos.ConstructSerBlock2(block) + testutil.AssertNoError(t, err, "Error while getting bytes from block") + by := serBlock.GetBytes() + blockBytesSize := len(by) + encodedLen := proto.EncodeVarint(uint64(blockBytesSize)) + size += blockBytesSize + len(encodedLen) + } + + env.conf.maxBlockfileSize = int(0.75 * float64(size)) + blkfileMgrWrapper := newTestBlockfileWrapper(t, env) + blkfileMgrWrapper.addBlocks(blocks) + testutil.AssertEquals(t, blkfileMgrWrapper.blockfileMgr.cpInfo.latestFileChunkSuffixNum, 1) + blkfileMgrWrapper.testGetBlockByHash(blocks) + blkfileMgrWrapper.close() + env.Cleanup() + + env = newTestEnv(t) + defer env.Cleanup() + env.conf.maxBlockfileSize = int(0.40 * float64(size)) + blkfileMgrWrapper = newTestBlockfileWrapper(t, env) + defer blkfileMgrWrapper.close() + blkfileMgrWrapper.addBlocks(blocks) + testutil.AssertEquals(t, blkfileMgrWrapper.blockfileMgr.cpInfo.latestFileChunkSuffixNum, 2) + blkfileMgrWrapper.testGetBlockByHash(blocks) +} diff --git a/core/ledgernext/blkstorage/fsblkstorage/blockfile_rw.go b/core/ledgernext/blkstorage/fsblkstorage/blockfile_rw.go new file mode 100644 index 00000000000..70453ff4969 --- /dev/null +++ b/core/ledgernext/blkstorage/fsblkstorage/blockfile_rw.go @@ -0,0 +1,141 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fsblkstorage + +import ( + "bufio" + "fmt" + "io" + "os" + + "github.com/golang/protobuf/proto" +) + +//// WRITER //// +type blockfileWriter struct { + filePath string + file *os.File +} + +func newBlockfileWriter(filePath string) (*blockfileWriter, error) { + writer := &blockfileWriter{filePath: filePath} + return writer, writer.open() +} + +func (w *blockfileWriter) truncateFile(targetSize int) error { + fileStat, err := w.file.Stat() + if err != nil { + return err + } + if fileStat.Size() > int64(targetSize) { + w.file.Truncate(int64(targetSize)) + } + return nil +} + +func (w *blockfileWriter) append(b []byte) error { + _, err := w.file.Write(b) + if err != nil { + return err + } + w.file.Sync() + return nil +} + +func (w *blockfileWriter) open() error { + file, err := os.OpenFile(w.filePath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660) + if err != nil { + return err + } + w.file = file + return nil +} + +func (w *blockfileWriter) close() error { + return w.file.Close() +} + +//// READER //// +type blockfileReader struct { + file *os.File +} + +func newBlockfileReader(filePath string) (*blockfileReader, error) { + file, err := os.OpenFile(filePath, os.O_RDONLY, 0600) + if err != nil { + return nil, err + } + reader := &blockfileReader{file} + return reader, nil +} + +func (r *blockfileReader) read(offset int, length int) ([]byte, error) { + b := make([]byte, length) + _, err := r.file.ReadAt(b, int64(offset)) + if err != nil { + return nil, err + } + return b, nil +} + +func (r *blockfileReader) close() error { + return r.file.Close() +} + +type blockStream struct { + file *os.File + reader *bufio.Reader +} + +func newBlockStream(filePath string, offset int64) (*blockStream, error) { + var file *os.File + var err error + if file, err = os.OpenFile(filePath, os.O_RDONLY, 0600); err != nil { + return nil, err + } + + var newPosition int64 + if newPosition, err = file.Seek(offset, 0); err != nil { + return nil, err + } + if newPosition != offset { + panic(fmt.Sprintf("Could not seek file [%s] to given offset [%d]. New position = [%d]", filePath, offset, newPosition)) + } + s := &blockStream{file, bufio.NewReader(file)} + return s, nil +} + +func (s *blockStream) nextBlockBytes() ([]byte, error) { + lenBytes, err := s.reader.Peek(8) + if err == io.EOF { + logger.Debugf("block stream reached end of file. Returning next block as nil") + return nil, nil + } + len, n := proto.DecodeVarint(lenBytes) + if _, err = s.reader.Discard(n); err != nil { + return nil, err + } + blockBytes := make([]byte, len) + if _, err = io.ReadAtLeast(s.reader, blockBytes, int(len)); err != nil { + return nil, err + } + return blockBytes, nil +} + +func (s *blockStream) close() error { + return s.file.Close() +} diff --git a/core/ledgernext/blkstorage/fsblkstorage/blockindex.go b/core/ledgernext/blkstorage/fsblkstorage/blockindex.go new file mode 100644 index 00000000000..a3d58cd163b --- /dev/null +++ b/core/ledgernext/blkstorage/fsblkstorage/blockindex.go @@ -0,0 +1,192 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fsblkstorage + +import ( + "fmt" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/core/ledgernext/util" + "github.com/hyperledger/fabric/core/ledgernext/util/db" + "github.com/tecbot/gorocksdb" +) + +type blockIndex struct { + db *db.DB + blockIndexCF *gorocksdb.ColumnFamilyHandle +} + +func newBlockIndex(db *db.DB) *blockIndex { + //TODO during init make sure that the index is in sync with block strorage + return &blockIndex{db, db.GetCFHandle(blockIndexCF)} +} + +func (index *blockIndex) indexBlock(blockNum uint64, blockHash []byte, flp *fileLocPointer, blockLen int, skip int, txOffsets []int) error { + logger.Debugf("Adding blockLoc [%s] to index", flp) + batch := gorocksdb.NewWriteBatch() + defer batch.Destroy() + flpBytes, err := flp.marshal() + if err != nil { + return err + } + batch.PutCF(index.blockIndexCF, index.constructBlockHashKey(blockHash), flpBytes) + batch.PutCF(index.blockIndexCF, index.constructBlockNumKey(blockNum), flpBytes) + for i := 0; i < len(txOffsets)-1; i++ { + txID := constructTxID(blockNum, i) + txBytesLength := txOffsets[i+1] - txOffsets[i] + txFLP := newFileLocationPointer(flp.fileSuffixNum, flp.offset+skip, &locPointer{txOffsets[i], txBytesLength}) + logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFLP, txID) + txFLPBytes, marshalErr := txFLP.marshal() + if marshalErr != nil { + return marshalErr + } + batch.PutCF(index.blockIndexCF, index.constructTxIDKey(txID), txFLPBytes) + } + // for txNum, txOffset := range txOffsets { + // txID := constructTxID(blockNum, txNum) + // txBytesLength := 0 + // if txNum < len(txOffsets)-1 { + // txBytesLength = txOffsets[txNum+1] - txOffsets[txNum] + // } else { + // txBytesLength = blockLen - txOffsets[txNum] + // } + // txFLP := newFileLocationPointer(flp.fileSuffixNum, flp.offset+skip, &locPointer{txOffset, txBytesLength}) + // logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFLP, txID) + // txFLPBytes, marshalErr := txFLP.marshal() + // if marshalErr != nil { + // return marshalErr + // } + // batch.PutCF(index.blockIndexCF, index.constructTxIDKey(txID), txFLPBytes) + // } + err = index.db.WriteBatch(batch) + if err != nil { + return err + } + return nil +} + +func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) { + b, err := index.db.Get(index.blockIndexCF, index.constructBlockHashKey(blockHash)) + if err != nil { + return nil, err + } + blkLoc := &fileLocPointer{} + blkLoc.unmarshal(b) + return blkLoc, nil +} + +func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) { + b, err := index.db.Get(index.blockIndexCF, index.constructBlockNumKey(blockNum)) + if err != nil { + return nil, err + } + blkLoc := &fileLocPointer{} + blkLoc.unmarshal(b) + return blkLoc, nil +} + +func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) { + b, err := index.db.Get(index.blockIndexCF, index.constructTxIDKey(txID)) + if err != nil { + return nil, err + } + txFLP := &fileLocPointer{} + txFLP.unmarshal(b) + return txFLP, nil +} + +func (index *blockIndex) constructBlockNumKey(blockNum uint64) []byte { + blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum) + return append([]byte{'n'}, blkNumBytes...) +} + +func (index *blockIndex) constructBlockHashKey(blockHash []byte) []byte { + return append([]byte{'b'}, blockHash...) +} + +func (index *blockIndex) constructTxIDKey(txID string) []byte { + return append([]byte{'t'}, []byte(txID)...) +} + +func constructTxID(blockNum uint64, txNum int) string { + return fmt.Sprintf("%d:%d", blockNum, txNum) +} + +type locPointer struct { + offset int + bytesLength int +} + +func (lp *locPointer) String() string { + return fmt.Sprintf("offset=%d, bytesLength=%d", + lp.offset, lp.bytesLength) +} + +// locPointer +type fileLocPointer struct { + fileSuffixNum int + locPointer +} + +func newFileLocationPointer(fileSuffixNum int, begginingOffset int, relativeLP *locPointer) *fileLocPointer { + flp := &fileLocPointer{fileSuffixNum: fileSuffixNum} + flp.offset = begginingOffset + relativeLP.offset + flp.bytesLength = relativeLP.bytesLength + return flp +} + +func (flp *fileLocPointer) marshal() ([]byte, error) { + buffer := proto.NewBuffer([]byte{}) + e := buffer.EncodeVarint(uint64(flp.fileSuffixNum)) + if e != nil { + return nil, e + } + e = buffer.EncodeVarint(uint64(flp.offset)) + if e != nil { + return nil, e + } + e = buffer.EncodeVarint(uint64(flp.bytesLength)) + if e != nil { + return nil, e + } + return buffer.Bytes(), nil +} + +func (flp *fileLocPointer) unmarshal(b []byte) error { + buffer := proto.NewBuffer(b) + i, e := buffer.DecodeVarint() + if e != nil { + return e + } + flp.fileSuffixNum = int(i) + + i, e = buffer.DecodeVarint() + if e != nil { + return e + } + flp.offset = int(i) + i, e = buffer.DecodeVarint() + if e != nil { + return e + } + flp.bytesLength = int(i) + return nil +} + +func (flp *fileLocPointer) String() string { + return fmt.Sprintf("fileSuffixNum=%d, %s", flp.fileSuffixNum, flp.locPointer.String()) +} diff --git a/core/ledgernext/blkstorage/fsblkstorage/blocks_itr.go b/core/ledgernext/blkstorage/fsblkstorage/blocks_itr.go new file mode 100644 index 00000000000..a7c38867303 --- /dev/null +++ b/core/ledgernext/blkstorage/fsblkstorage/blocks_itr.go @@ -0,0 +1,80 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fsblkstorage + +import ( + "fmt" + + "github.com/hyperledger/fabric/core/ledgernext" + "github.com/hyperledger/fabric/protos" +) + +// BlockHolder holds block bytes +type BlockHolder struct { + blockBytes []byte +} + +// GetBlock serializes Block from block bytes +func (bh *BlockHolder) GetBlock() *protos.Block2 { + serBlock := protos.NewSerBlock2(bh.blockBytes) + block, err := serBlock.ToBlock2() + if err != nil { + panic(fmt.Errorf("Problem in deserialzing block: %s", err)) + } + return block +} + +// GetBlockBytes returns block bytes +func (bh *BlockHolder) GetBlockBytes() []byte { + return bh.blockBytes +} + +// BlocksItr - an iterator for iterating over a sequence of blocks +type BlocksItr struct { + stream *blockStream + nextBlockBytes []byte + err error + numTotalBlocks int + numIteratedBlocks int +} + +func newBlockItr(stream *blockStream, numTotalBlocks int) *BlocksItr { + return &BlocksItr{stream, nil, nil, numTotalBlocks, 0} +} + +// Next moves the cursor to next block and returns true iff the iterator is not exhausted +func (itr *BlocksItr) Next() bool { + if itr.err != nil || itr.numIteratedBlocks == itr.numTotalBlocks { + return false + } + itr.nextBlockBytes, itr.err = itr.stream.nextBlockBytes() + itr.numIteratedBlocks++ + return itr.nextBlockBytes != nil +} + +// Get returns the block at current cursor +func (itr *BlocksItr) Get() (ledger.QueryResult, error) { + if itr.err != nil { + return nil, itr.err + } + return &BlockHolder{itr.nextBlockBytes}, nil +} + +// Close releases any resources held by the iterator +func (itr *BlocksItr) Close() { + itr.stream.close() +} diff --git a/core/ledgernext/blkstorage/fsblkstorage/config.go b/core/ledgernext/blkstorage/fsblkstorage/config.go new file mode 100644 index 00000000000..3f4fbef94ed --- /dev/null +++ b/core/ledgernext/blkstorage/fsblkstorage/config.go @@ -0,0 +1,42 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fsblkstorage + +import "strings" + +const ( + defaultMaxBlockfileSize = 64 * 1024 * 1024 +) + +// Conf encapsulates all the configurations for `FsBlockStore` +type Conf struct { + blockfilesDir string + dbPath string + maxBlockfileSize int +} + +// NewConf constructs new `Conf`. +// filesystemPath is the top level folder under which `FsBlockStore` manages its data +func NewConf(filesystemPath string, maxBlockfileSize int) *Conf { + if !strings.HasSuffix(filesystemPath, "/") { + filesystemPath = filesystemPath + "/" + } + if maxBlockfileSize <= 0 { + maxBlockfileSize = defaultMaxBlockfileSize + } + return &Conf{filesystemPath + "blocks", filesystemPath + "db", maxBlockfileSize} +} diff --git a/core/ledgernext/blkstorage/fsblkstorage/fs_blockstore.go b/core/ledgernext/blkstorage/fsblkstorage/fs_blockstore.go new file mode 100644 index 00000000000..33013bb160b --- /dev/null +++ b/core/ledgernext/blkstorage/fsblkstorage/fs_blockstore.go @@ -0,0 +1,72 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fsblkstorage + +import ( + "github.com/hyperledger/fabric/core/ledgernext" + "github.com/hyperledger/fabric/protos" +) + +// FsBlockStore - filesystem based implementation for `BlockStore` +type FsBlockStore struct { + fileMgr *blockfileMgr +} + +// NewFsBlockStore constructs a `FsBlockStore` +func NewFsBlockStore(conf *Conf) *FsBlockStore { + return &FsBlockStore{newBlockfileMgr(conf)} +} + +// AddBlock adds a new block +func (store *FsBlockStore) AddBlock(block *protos.Block2) error { + return store.fileMgr.addBlock(block) +} + +// GetBlockchainInfo returns the current info about blockchain +func (store *FsBlockStore) GetBlockchainInfo() (*protos.BlockchainInfo, error) { + return store.fileMgr.getBlockchainInfo(), nil +} + +// RetrieveBlocks returns an iterator that can be used for iterating over a range of blocks +func (store *FsBlockStore) RetrieveBlocks(startNum uint64, endNum uint64) (ledger.ResultsIterator, error) { + var itr *BlocksItr + var err error + if itr, err = store.fileMgr.retrieveBlocks(startNum, endNum); err != nil { + return nil, err + } + return itr, nil +} + +// RetrieveBlockByHash returns the block for given block-hash +func (store *FsBlockStore) RetrieveBlockByHash(blockHash []byte) (*protos.Block2, error) { + return store.fileMgr.retrieveBlockByHash(blockHash) +} + +// RetrieveBlockByNumber returns the block at a given blockchain height +func (store *FsBlockStore) RetrieveBlockByNumber(blockNum uint64) (*protos.Block2, error) { + return store.fileMgr.retrieveBlockByNumber(blockNum) +} + +// RetrieveTxByID returns a transaction for given transaction id +func (store *FsBlockStore) RetrieveTxByID(txID string) (*protos.Transaction2, error) { + return store.fileMgr.retrieveTransactionByID(txID) +} + +// Shutdown shuts down the block store +func (store *FsBlockStore) Shutdown() { + store.fileMgr.close() +} diff --git a/core/ledgernext/blkstorage/fsblkstorage/pkg_test.go b/core/ledgernext/blkstorage/fsblkstorage/pkg_test.go new file mode 100644 index 00000000000..c6d68612349 --- /dev/null +++ b/core/ledgernext/blkstorage/fsblkstorage/pkg_test.go @@ -0,0 +1,81 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fsblkstorage + +import ( + "fmt" + "os" + "testing" + + "github.com/hyperledger/fabric/core/ledgernext/testutil" + "github.com/hyperledger/fabric/protos" +) + +type testEnv struct { + conf *Conf +} + +func newTestEnv(t testing.TB) *testEnv { + conf := NewConf("/tmp/tests/ledgernext/blkstorage/fsblkstorage", 0) + os.RemoveAll(conf.dbPath) + os.RemoveAll(conf.blockfilesDir) + return &testEnv{conf} +} + +func (env *testEnv) Cleanup() { + os.RemoveAll(env.conf.dbPath) + os.RemoveAll(env.conf.blockfilesDir) +} + +type testBlockfileMgrWrapper struct { + t testing.TB + blockfileMgr *blockfileMgr +} + +func newTestBlockfileWrapper(t testing.TB, env *testEnv) *testBlockfileMgrWrapper { + return &testBlockfileMgrWrapper{t, newBlockfileMgr(env.conf)} +} + +func (w *testBlockfileMgrWrapper) addBlocks(blocks []*protos.Block2) { + for _, blk := range blocks { + err := w.blockfileMgr.addBlock(blk) + testutil.AssertNoError(w.t, err, "Error while adding block to blockfileMgr") + } +} + +func (w *testBlockfileMgrWrapper) testGetBlockByHash(blocks []*protos.Block2) { + for i, block := range blocks { + serBlock, err := protos.ConstructSerBlock2(block) + testutil.AssertNoError(w.t, err, "Error while getting hash from block") + b, err := w.blockfileMgr.retrieveBlockByHash(serBlock.ComputeHash()) + testutil.AssertNoError(w.t, err, fmt.Sprintf("Error while retrieving [%d]th block from blockfileMgr", i)) + testutil.AssertEquals(w.t, b, block) + } +} + +func (w *testBlockfileMgrWrapper) testGetBlockByNumber(blocks []*protos.Block2, startingNum uint64) { + for i := 0; i < len(blocks); i++ { + b, err := w.blockfileMgr.retrieveBlockByNumber(startingNum + uint64(i)) + testutil.AssertNoError(w.t, err, fmt.Sprintf("Error while retrieving [%d]th block from blockfileMgr", i)) + testutil.AssertEquals(w.t, b, blocks[i]) + } +} + +func (w *testBlockfileMgrWrapper) close() { + w.blockfileMgr.close() + w.blockfileMgr.db.Close() +} diff --git a/core/ledgernext/kvledger/example/app.go b/core/ledgernext/kvledger/example/app.go new file mode 100644 index 00000000000..adbc7ec3e57 --- /dev/null +++ b/core/ledgernext/kvledger/example/app.go @@ -0,0 +1,122 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package example + +import ( + "fmt" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/core/ledgernext" + "github.com/hyperledger/fabric/protos" +) + +// App - a sample fund transfer app +type App struct { + name string + ledger ledger.ValidatedLedger +} + +// ConstructAppInstance constructs an instance of an app +func ConstructAppInstance(ledger ledger.ValidatedLedger) *App { + return &App{"PaymentApp", ledger} +} + +// Init simulates init transaction +func (app *App) Init(initialBalances map[string]int) (*protos.Transaction2, error) { + var txSimulator ledger.TxSimulator + var err error + if txSimulator, err = app.ledger.NewTxSimulator(); err != nil { + return nil, err + } + defer txSimulator.Done() + for accountID, bal := range initialBalances { + txSimulator.SetState(app.name, accountID, toBytes(bal)) + } + var txSimulationResults []byte + if txSimulationResults, err = txSimulator.GetTxSimulationResults(); err != nil { + return nil, err + } + tx := constructTransaction(txSimulationResults) + return tx, nil +} + +// TransferFunds simulates a transaction for transferring fund from fromAccount to toAccount +func (app *App) TransferFunds(fromAccount string, toAccount string, transferAmt int) (*protos.Transaction2, error) { + var txSimulator ledger.TxSimulator + var err error + if txSimulator, err = app.ledger.NewTxSimulator(); err != nil { + return nil, err + } + defer txSimulator.Done() + var balFromBytes []byte + if balFromBytes, err = txSimulator.GetState(app.name, fromAccount); err != nil { + return nil, err + } + balFrom := toInt(balFromBytes) + if balFrom-transferAmt < 0 { + return nil, fmt.Errorf("Not enough balance in account [%s]. Balance = [%d], transfer request = [%d]", + fromAccount, balFrom, transferAmt) + } + + var balToBytes []byte + if balToBytes, err = txSimulator.GetState(app.name, toAccount); err != nil { + return nil, err + } + balTo := toInt(balToBytes) + txSimulator.SetState(app.name, fromAccount, toBytes(balFrom-transferAmt)) + txSimulator.SetState(app.name, toAccount, toBytes(balTo+transferAmt)) + var txSimulationResults []byte + if txSimulationResults, err = txSimulator.GetTxSimulationResults(); err != nil { + return nil, err + } + tx := constructTransaction(txSimulationResults) + return tx, nil +} + +// QueryBalances queries the balance funds +func (app *App) QueryBalances(accounts []string) ([]int, error) { + var queryExecutor ledger.QueryExecutor + var err error + if queryExecutor, err = app.ledger.NewQueryExecutor(); err != nil { + return nil, err + } + balances := make([]int, len(accounts)) + for i := 0; i < len(accounts); i++ { + var balBytes []byte + if balBytes, err = queryExecutor.GetState(app.name, accounts[i]); err != nil { + return nil, err + } + balances[i] = toInt(balBytes) + } + return balances, nil +} + +func constructTransaction(simulationResults []byte) *protos.Transaction2 { + tx := &protos.Transaction2{} + tx.EndorsedActions = []*protos.EndorsedAction{ + &protos.EndorsedAction{ActionBytes: simulationResults, Endorsements: []*protos.Endorsement{}, ProposalBytes: []byte{}}} + return tx +} + +func toBytes(balance int) []byte { + return proto.EncodeVarint(uint64(balance)) +} + +func toInt(balanceBytes []byte) int { + v, _ := proto.DecodeVarint(balanceBytes) + return int(v) +} diff --git a/core/ledgernext/kvledger/example/committer.go b/core/ledgernext/kvledger/example/committer.go new file mode 100644 index 00000000000..7cf4b1b7a05 --- /dev/null +++ b/core/ledgernext/kvledger/example/committer.go @@ -0,0 +1,46 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package example + +import ( + "github.com/hyperledger/fabric/core/ledgernext" + "github.com/hyperledger/fabric/protos" +) + +// Committer a toy committer +type Committer struct { + ledger ledger.ValidatedLedger +} + +// ConstructCommitter constructs a committer for the example +func ConstructCommitter(ledger ledger.ValidatedLedger) *Committer { + return &Committer{ledger} +} + +// CommitBlock commits the block +func (c *Committer) CommitBlock(rawBlock *protos.Block2) (*protos.Block2, []*protos.InvalidTransaction, error) { + var validBlock *protos.Block2 + var invalidTxs []*protos.InvalidTransaction + var err error + if validBlock, invalidTxs, err = c.ledger.RemoveInvalidTransactionsAndPrepare(rawBlock); err != nil { + return nil, nil, err + } + if err = c.ledger.Commit(); err != nil { + return nil, nil, err + } + return validBlock, invalidTxs, err +} diff --git a/core/ledgernext/kvledger/example/consenter.go b/core/ledgernext/kvledger/example/consenter.go new file mode 100644 index 00000000000..019b8fb9db5 --- /dev/null +++ b/core/ledgernext/kvledger/example/consenter.go @@ -0,0 +1,41 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package example + +import ( + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/protos" +) + +// Consenter - a toy Consenter +type Consenter struct { +} + +// ConstructConsenter constructs a consenter for example +func ConstructConsenter() *Consenter { + return &Consenter{} +} + +// ConstructBlock constructs a block from a list of transactions +func (c *Consenter) ConstructBlock(transactions ...*protos.Transaction2) *protos.Block2 { + block := &protos.Block2{} + for _, tx := range transactions { + txBytes, _ := proto.Marshal(tx) + block.Transactions = append(block.Transactions, txBytes) + } + return block +} diff --git a/core/ledgernext/kvledger/example/main/example.go b/core/ledgernext/kvledger/example/main/example.go new file mode 100644 index 00000000000..9b042a1c45a --- /dev/null +++ b/core/ledgernext/kvledger/example/main/example.go @@ -0,0 +1,125 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "os" + + "github.com/hyperledger/fabric/core/ledgernext" + "github.com/hyperledger/fabric/core/ledgernext/kvledger" + "github.com/hyperledger/fabric/core/ledgernext/kvledger/example" + "github.com/hyperledger/fabric/protos" +) + +const ( + ledgerPath = "/tmp/test/ledgernext/kvledger/example" +) + +var finalLedger ledger.ValidatedLedger +var app *example.App +var committer *example.Committer +var consenter *example.Consenter + +var accounts = []string{"account1", "account2", "account3", "account4"} + +func init() { + os.RemoveAll(ledgerPath) + ledgerConf := kvledger.NewConf(ledgerPath, 0) + var err error + finalLedger, err = kvledger.NewKVLedger(ledgerConf) + if err != nil { + panic(fmt.Errorf("Error in NewKVLedger(): %s", err)) + } + app = example.ConstructAppInstance(finalLedger) + committer = example.ConstructCommitter(finalLedger) + consenter = example.ConstructConsenter() +} + +func main() { + defer finalLedger.Close() + initApp() + printBalances() + transferFunds() + printBalances() + tryInvalidTransfer() + tryDoubleSpend() + printBalances() +} + +func initApp() { + tx, err := app.Init(map[string]int{ + accounts[0]: 100, + accounts[1]: 100, + accounts[2]: 100, + accounts[3]: 100}) + handleError(err, true) + rawBlock := consenter.ConstructBlock(tx) + finalBlock, invalidTx, err := committer.CommitBlock(rawBlock) + handleError(err, true) + printBlocksInfo(rawBlock, finalBlock, invalidTx) +} + +func transferFunds() { + tx1, err := app.TransferFunds("account1", "account2", 50) + handleError(err, true) + tx2, err := app.TransferFunds("account3", "account4", 50) + handleError(err, true) + rawBlock := consenter.ConstructBlock(tx1, tx2) + finalBlock, invalidTx, err := committer.CommitBlock(rawBlock) + handleError(err, true) + printBlocksInfo(rawBlock, finalBlock, invalidTx) +} + +func tryInvalidTransfer() { + _, err := app.TransferFunds("account1", "account2", 60) + handleError(err, false) +} + +func tryDoubleSpend() { + tx1, err := app.TransferFunds("account1", "account2", 50) + handleError(err, true) + tx2, err := app.TransferFunds("account1", "account4", 50) + handleError(err, true) + rawBlock := consenter.ConstructBlock(tx1, tx2) + finalBlock, invalidTx, err := committer.CommitBlock(rawBlock) + handleError(err, true) + printBlocksInfo(rawBlock, finalBlock, invalidTx) +} + +func printBlocksInfo(rawBlock *protos.Block2, finalBlock *protos.Block2, invalidTxs []*protos.InvalidTransaction) { + fmt.Printf("Num txs in rawBlock = [%d], num txs in final block = [%d], num invalidTxs = [%d]\n", + len(rawBlock.Transactions), len(finalBlock.Transactions), len(invalidTxs)) +} + +func printBalances() { + balances, err := app.QueryBalances(accounts) + handleError(err, true) + for i := 0; i < len(accounts); i++ { + fmt.Printf("[%s] = [%d]\n", accounts[i], balances[i]) + } +} + +func handleError(err error, quit bool) { + if err != nil { + if quit { + panic(fmt.Errorf("Error: %s\n", err)) + } else { + fmt.Printf("Error: %s\n", err) + } + } +} diff --git a/core/ledgernext/kvledger/kv_ledger.go b/core/ledgernext/kvledger/kv_ledger.go new file mode 100644 index 00000000000..2e39a1d17cf --- /dev/null +++ b/core/ledgernext/kvledger/kv_ledger.go @@ -0,0 +1,155 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kvledger + +import ( + "errors" + "fmt" + "strings" + + "github.com/hyperledger/fabric/core/ledgernext" + "github.com/hyperledger/fabric/core/ledgernext/blkstorage" + "github.com/hyperledger/fabric/core/ledgernext/blkstorage/fsblkstorage" + "github.com/hyperledger/fabric/core/ledgernext/kvledger/txmgmt" + "github.com/hyperledger/fabric/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt" + "github.com/hyperledger/fabric/protos" +) + +// Conf captures `KVLedger` configurations +type Conf struct { + blockStorageDir string + maxBlockfileSize int + txMgrDBPath string +} + +// NewConf constructs new `Conf`. +// filesystemPath is the top level directory under which `KVLedger` manages its data +func NewConf(filesystemPath string, maxBlockfileSize int) *Conf { + if !strings.HasSuffix(filesystemPath, "/") { + filesystemPath = filesystemPath + "/" + } + blocksStorageDir := filesystemPath + "blocks" + txMgrDBPath := filesystemPath + "txMgmgt/db" + return &Conf{blocksStorageDir, maxBlockfileSize, txMgrDBPath} +} + +// KVLedger provides an implementation of `ledgernext.ValidatedLedger`. +// This implementation provides a key-value based data model +type KVLedger struct { + blockStore blkstorage.BlockStore + txtmgmt txmgmt.TxMgr + pendingBlockToCommit *protos.Block2 +} + +// NewKVLedger constructs new `KVLedger` +func NewKVLedger(conf *Conf) (*KVLedger, error) { + blockStore := fsblkstorage.NewFsBlockStore(fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize)) + txmgmt := lockbasedtxmgmt.NewLockBasedTxMgr(&lockbasedtxmgmt.Conf{DBPath: conf.txMgrDBPath}) + return &KVLedger{blockStore, txmgmt, nil}, nil +} + +// GetTransactionByID retrieves a transaction by id +func (l *KVLedger) GetTransactionByID(txID string) (*protos.Transaction2, error) { + return l.blockStore.RetrieveTxByID(txID) +} + +// GetBlockchainInfo returns basic info about blockchain +func (l *KVLedger) GetBlockchainInfo() (*protos.BlockchainInfo, error) { + return l.blockStore.GetBlockchainInfo() +} + +// GetBlockByNumber returns block at a given height +func (l *KVLedger) GetBlockByNumber(blockNumber uint64) (*protos.Block2, error) { + return l.blockStore.RetrieveBlockByNumber(blockNumber) + +} + +// GetBlocksByNumber returns all the blocks between given heights (both inclusive). ResultsIterator contains type BlockHolder +func (l *KVLedger) GetBlocksByNumber(startBlockNumber, endBlockNumber uint64) (ledger.ResultsIterator, error) { + return l.blockStore.RetrieveBlocks(startBlockNumber, endBlockNumber) + +} + +// GetBlockByHash returns a block given it's hash +func (l *KVLedger) GetBlockByHash(blockHash []byte) (*protos.Block2, error) { + return l.blockStore.RetrieveBlockByHash(blockHash) +} + +//VerifyChain will verify the integrity of the blockchain. This is accomplished +// by ensuring that the previous block hash stored in each block matches +// the actual hash of the previous block in the chain. The return value is the +// block number of lowest block in the range which can be verified as valid. +func (l *KVLedger) VerifyChain(startBlockNumber, endBlockNumber uint64) (uint64, error) { + return 0, errors.New("Not yet implemented") +} + +//Prune prunes the blocks/transactions that satisfy the given policy +func (l *KVLedger) Prune(policy ledger.PrunePolicy) error { + return errors.New("Not yet implemented") +} + +// NewTxSimulator returns new `ledger.TxSimulator` +func (l *KVLedger) NewTxSimulator() (ledger.TxSimulator, error) { + return l.txtmgmt.NewTxSimulator() +} + +// NewQueryExecutor gives handle to a query executer. +// A client can obtain more than one 'QueryExecutor's for parallel execution. +// Any synchronization should be performed at the implementation level if required +func (l *KVLedger) NewQueryExecutor() (ledger.QueryExecutor, error) { + return l.txtmgmt.NewQueryExecutor() +} + +// RemoveInvalidTransactionsAndPrepare validates all the transactions in the given block +// and returns a block that contains only valid transactions and a list of transactions that are invalid +func (l *KVLedger) RemoveInvalidTransactionsAndPrepare(block *protos.Block2) (*protos.Block2, []*protos.InvalidTransaction, error) { + var validBlock *protos.Block2 + var invalidTxs []*protos.InvalidTransaction + var err error + validBlock, invalidTxs, err = l.txtmgmt.ValidateAndPrepare(block) + if err == nil { + l.pendingBlockToCommit = validBlock + } + return validBlock, invalidTxs, err +} + +// Commit commits the valid block (returned in the method RemoveInvalidTransactionsAndPrepare) and related state changes +func (l *KVLedger) Commit() error { + if l.pendingBlockToCommit == nil { + panic(fmt.Errorf(`Nothing to commit. RemoveInvalidTransactionsAndPrepare() method should have been called and should not have thrown error`)) + } + if err := l.blockStore.AddBlock(l.pendingBlockToCommit); err != nil { + return err + } + if err := l.txtmgmt.Commit(); err != nil { + panic(fmt.Errorf(`Error during commit to txmgr:%s`, err)) + } + l.pendingBlockToCommit = nil + return nil +} + +// Rollback rollbacks the changes caused by the last invocation to method `RemoveInvalidTransactionsAndPrepare` +func (l *KVLedger) Rollback() { + l.txtmgmt.Rollback() + l.pendingBlockToCommit = nil +} + +// Close closes `KVLedger` +func (l *KVLedger) Close() { + l.blockStore.Shutdown() + l.txtmgmt.Shutdown() +} diff --git a/core/ledgernext/kvledger/kv_ledger_test.go b/core/ledgernext/kvledger/kv_ledger_test.go new file mode 100644 index 00000000000..e16f06f9bb2 --- /dev/null +++ b/core/ledgernext/kvledger/kv_ledger_test.go @@ -0,0 +1,79 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kvledger + +import ( + "testing" + + "github.com/hyperledger/fabric/core/ledgernext/testutil" + "github.com/hyperledger/fabric/protos" +) + +func TestKVLedgerBlockStorage(t *testing.T) { + env := newTestEnv(t) + defer env.cleanup() + ledger, _ := NewKVLedger(env.conf) + defer ledger.Close() + + bcInfo, _ := ledger.GetBlockchainInfo() + testutil.AssertEquals(t, bcInfo, &protos.BlockchainInfo{ + Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil}) + + simulator, _ := ledger.NewTxSimulator() + simulator.SetState("ns1", "key1", []byte("value1")) + simulator.SetState("ns1", "key2", []byte("value2")) + simulator.SetState("ns1", "key3", []byte("value3")) + simulator.Done() + simRes, _ := simulator.GetTxSimulationResults() + block1 := testutil.ConstructBlockForSimulationResults(t, [][]byte{simRes}) + ledger.RemoveInvalidTransactionsAndPrepare(block1) + ledger.Commit() + + bcInfo, _ = ledger.GetBlockchainInfo() + serBlock1, _ := protos.ConstructSerBlock2(block1) + block1Hash := serBlock1.ComputeHash() + testutil.AssertEquals(t, bcInfo, &protos.BlockchainInfo{ + Height: 1, CurrentBlockHash: block1Hash, PreviousBlockHash: []byte{}}) + + simulator, _ = ledger.NewTxSimulator() + simulator.SetState("ns1", "key1", []byte("value4")) + simulator.SetState("ns1", "key2", []byte("value5")) + simulator.SetState("ns1", "key3", []byte("value6")) + simulator.Done() + simRes, _ = simulator.GetTxSimulationResults() + block2 := testutil.ConstructBlockForSimulationResults(t, [][]byte{simRes}) + ledger.RemoveInvalidTransactionsAndPrepare(block2) + ledger.Commit() + + bcInfo, _ = ledger.GetBlockchainInfo() + serBlock2, _ := protos.ConstructSerBlock2(block2) + block2Hash := serBlock2.ComputeHash() + testutil.AssertEquals(t, bcInfo, &protos.BlockchainInfo{ + Height: 2, CurrentBlockHash: block2Hash, PreviousBlockHash: []byte{}}) + + b1, _ := ledger.GetBlockByHash(block1Hash) + testutil.AssertEquals(t, b1, block1) + + b2, _ := ledger.GetBlockByHash(block2Hash) + testutil.AssertEquals(t, b2, block2) + + b1, _ = ledger.GetBlockByNumber(1) + testutil.AssertEquals(t, b1, block1) + + b2, _ = ledger.GetBlockByNumber(2) + testutil.AssertEquals(t, b2, block2) +} diff --git a/core/ledgernext/kvledger/pkg_test.go b/core/ledgernext/kvledger/pkg_test.go new file mode 100644 index 00000000000..72aec7401a4 --- /dev/null +++ b/core/ledgernext/kvledger/pkg_test.go @@ -0,0 +1,44 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kvledger + +import ( + "os" + "testing" +) + +type testEnv struct { + conf *Conf + t testing.TB +} + +func newTestEnv(t testing.TB) *testEnv { + conf := NewConf("/tmp/tests/ledgernext/", 0) + os.RemoveAll(conf.blockStorageDir) + os.RemoveAll(conf.txMgrDBPath) + return &testEnv{conf, t} +} + +func (env *testEnv) cleanup() { + os.RemoveAll(env.conf.blockStorageDir) + os.RemoveAll(env.conf.txMgrDBPath) +} + +type testLedgerWrapper struct { + ledger *KVLedger + t *testing.T +} diff --git a/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_query_executer.go b/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_query_executer.go new file mode 100644 index 00000000000..ece3469a5d3 --- /dev/null +++ b/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_query_executer.go @@ -0,0 +1,58 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lockbasedtxmgmt + +import ( + "errors" + + "github.com/hyperledger/fabric/core/ledgernext" +) + +// RWLockQueryExecutor is a query executor used in `LockBasedTxMgr` +type RWLockQueryExecutor struct { + txmgr *LockBasedTxMgr +} + +// GetState implements method in interface `ledger.QueryExecutor` +func (q *RWLockQueryExecutor) GetState(ns string, key string) ([]byte, error) { + var value []byte + var err error + if value, _, err = q.txmgr.getCommittedValueAndVersion(ns, key); err != nil { + return nil, err + } + return value, nil +} + +// GetStateMultipleKeys implements method in interface `ledger.QueryExecutor` +func (q *RWLockQueryExecutor) GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error) { + return nil, errors.New("Not yet implemented") +} + +// GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor` +func (q *RWLockQueryExecutor) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) { + return nil, errors.New("Not yet implemented") +} + +// GetTransactionsForKey - implements method in interface `ledger.QueryExecutor` +func (q *RWLockQueryExecutor) GetTransactionsForKey(namespace string, key string) (ledger.ResultsIterator, error) { + return nil, errors.New("Not yet implemented") +} + +// ExecuteQuery implements method in interface `ledger.QueryExecutor` +func (q *RWLockQueryExecutor) ExecuteQuery(query string) (ledger.ResultsIterator, error) { + return nil, errors.New("Not supported by KV data model") +} diff --git a/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_tx_simulator.go b/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_tx_simulator.go new file mode 100644 index 00000000000..0a230530a66 --- /dev/null +++ b/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_tx_simulator.go @@ -0,0 +1,164 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lockbasedtxmgmt + +import ( + "errors" + "reflect" + + "github.com/hyperledger/fabric/core/ledgernext/kvledger/txmgmt" +) + +type kvReadCache struct { + kvRead *txmgmt.KVRead + cachedValue []byte +} + +type nsRWs struct { + readMap map[string]*kvReadCache + writeMap map[string]*txmgmt.KVWrite +} + +func newNsRWs() *nsRWs { + return &nsRWs{make(map[string]*kvReadCache), make(map[string]*txmgmt.KVWrite)} +} + +// LockBasedTxSimulator is a transaction simulator used in `LockBasedTxMgr` +type LockBasedTxSimulator struct { + RWLockQueryExecutor + rwMap map[string]*nsRWs + done bool +} + +func (s *LockBasedTxSimulator) getOrCreateNsRWHolder(ns string) *nsRWs { + var nsRWs *nsRWs + var ok bool + if nsRWs, ok = s.rwMap[ns]; !ok { + nsRWs = newNsRWs() + s.rwMap[ns] = nsRWs + } + return nsRWs +} + +// GetState implements method in interface `ledger.TxSimulator` +func (s *LockBasedTxSimulator) GetState(ns string, key string) ([]byte, error) { + logger.Debugf("Get state [%s:%s]", ns, key) + nsRWs := s.getOrCreateNsRWHolder(ns) + // check if it was written + kvWrite, ok := nsRWs.writeMap[key] + if ok { + logger.Debugf("Returing value for key=[%s:%s] from write set", ns, key, kvWrite.Value) + return kvWrite.Value, nil + } + // check if it was read + readCache, ok := nsRWs.readMap[key] + if ok { + logger.Debugf("Returing value for key=[%s:%s] from read set", ns, key, readCache.cachedValue) + return readCache.cachedValue, nil + } + + // read from storage + value, version, err := s.txmgr.getCommittedValueAndVersion(ns, key) + logger.Debugf("Read state from storage key=[%s:%s], value=[%#v], version=[%d]", ns, key, value, version) + if err != nil { + return nil, err + } + nsRWs.readMap[key] = &kvReadCache{txmgmt.NewKVRead(key, version), value} + return value, nil +} + +// SetState implements method in interface `ledger.TxSimulator` +func (s *LockBasedTxSimulator) SetState(ns string, key string, value []byte) error { + if s.done { + panic("This method should not be called after calling Done()") + } + nsRWs := s.getOrCreateNsRWHolder(ns) + kvWrite, ok := nsRWs.writeMap[key] + if ok { + kvWrite.SetValue(value) + return nil + } + nsRWs.writeMap[key] = txmgmt.NewKVWrite(key, value) + return nil +} + +// DeleteState implements method in interface `ledger.TxSimulator` +func (s *LockBasedTxSimulator) DeleteState(ns string, key string) error { + return s.SetState(ns, key, nil) +} + +// Done implements method in interface `ledger.TxSimulator` +func (s *LockBasedTxSimulator) Done() { + s.done = true + s.txmgr.commitRWLock.RUnlock() +} + +func (s *LockBasedTxSimulator) getTxReadWriteSet() *txmgmt.TxReadWriteSet { + txRWSet := &txmgmt.TxReadWriteSet{} + sortedNamespaces := getSortedKeys(s.rwMap) + for _, ns := range sortedNamespaces { + //Get namespace specific read-writes + nsReadWriteMap := s.rwMap[ns] + //add read set + reads := []*txmgmt.KVRead{} + sortedReadKeys := getSortedKeys(nsReadWriteMap.readMap) + for _, key := range sortedReadKeys { + reads = append(reads, nsReadWriteMap.readMap[key].kvRead) + } + + //add write set + writes := []*txmgmt.KVWrite{} + sortedWriteKeys := getSortedKeys(nsReadWriteMap.writeMap) + for _, key := range sortedWriteKeys { + writes = append(writes, nsReadWriteMap.writeMap[key]) + } + nsRWs := &txmgmt.NsReadWriteSet{NameSpace: ns, Reads: reads, Writes: writes} + txRWSet.NsRWs = append(txRWSet.NsRWs, nsRWs) + } + logger.Debugf("txRWSet = [%s]", txRWSet) + return txRWSet +} + +func getSortedKeys(m interface{}) []string { + mapVal := reflect.ValueOf(m) + keyVals := mapVal.MapKeys() + keys := []string{} + for _, keyVal := range keyVals { + keys = append(keys, keyVal.String()) + } + return keys +} + +// GetTxSimulationResults implements method in interface `ledger.TxSimulator` +func (s *LockBasedTxSimulator) GetTxSimulationResults() ([]byte, error) { + return s.getTxReadWriteSet().Marshal() +} + +// SetStateMultipleKeys implements method in interface `ledger.TxSimulator` +func (s *LockBasedTxSimulator) SetStateMultipleKeys(namespace string, kvs map[string][]byte) error { + return errors.New("Not yet implemented") +} + +// CopyState implements method in interface `ledger.TxSimulator` +func (s *LockBasedTxSimulator) CopyState(sourceNamespace string, targetNamespace string) error { + return errors.New("Not yet implemented") +} + +// ExecuteUpdate implements method in interface `ledger.TxSimulator` +func (s *LockBasedTxSimulator) ExecuteUpdate(query string) error { + return errors.New("Not supported by KV data model") +} diff --git a/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgmt_test.go b/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgmt_test.go new file mode 100644 index 00000000000..193133bbcbd --- /dev/null +++ b/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgmt_test.go @@ -0,0 +1,194 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lockbasedtxmgmt + +import ( + "fmt" + "testing" + + "github.com/hyperledger/fabric/core/ledgernext/testutil" +) + +func TestTxSimulatorWithNoExistingData(t *testing.T) { + env := newTestEnv(t) + defer env.Cleanup() + txMgr := NewLockBasedTxMgr(env.conf) + defer txMgr.Shutdown() + s, _ := txMgr.NewTxSimulator() + value, err := s.GetState("ns1", "key1") + testutil.AssertNoError(t, err, fmt.Sprintf("Error in GetState(): %s", err)) + testutil.AssertNil(t, value) + + s.SetState("ns1", "key1", []byte("value1")) + s.SetState("ns1", "key2", []byte("value2")) + s.SetState("ns2", "key3", []byte("value3")) + s.SetState("ns2", "key4", []byte("value4")) + + value, _ = s.GetState("ns2", "key3") + testutil.AssertEquals(t, value, []byte("value3")) + + s.DeleteState("ns2", "key3") + value, _ = s.GetState("ns2", "key3") + testutil.AssertNil(t, value) +} + +func TestTxSimulatorWithExistingData(t *testing.T) { + env := newTestEnv(t) + defer env.Cleanup() + txMgr := NewLockBasedTxMgr(env.conf) + + // simulate tx1 + s1, _ := txMgr.NewTxSimulator() + defer txMgr.Shutdown() + s1.SetState("ns1", "key1", []byte("value1")) + s1.SetState("ns1", "key2", []byte("value2")) + s1.SetState("ns2", "key3", []byte("value3")) + s1.SetState("ns2", "key4", []byte("value4")) + s1.Done() + // validate and commit RWset + txRWSet := s1.(*LockBasedTxSimulator).getTxReadWriteSet() + isValid, err := txMgr.validateTx(txRWSet) + testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err)) + testutil.AssertSame(t, isValid, true) + txMgr.addWriteSetToBatch(txRWSet) + err = txMgr.Commit() + testutil.AssertNoError(t, err, fmt.Sprintf("Error while calling commit(): %s", err)) + + // simulate tx2 that make changes to existing data + s2, _ := txMgr.NewTxSimulator() + value, _ := s2.GetState("ns1", "key1") + testutil.AssertEquals(t, value, []byte("value1")) + s2.SetState("ns1", "key1", []byte("value1_1")) + s2.DeleteState("ns2", "key3") + value, _ = s2.GetState("ns1", "key1") + testutil.AssertEquals(t, value, []byte("value1_1")) + s2.Done() + // validate and commit RWset for tx2 + txRWSet = s2.(*LockBasedTxSimulator).getTxReadWriteSet() + isValid, err = txMgr.validateTx(txRWSet) + testutil.AssertSame(t, isValid, true) + txMgr.addWriteSetToBatch(txRWSet) + txMgr.Commit() + + // simulate tx3 + s3, _ := txMgr.NewTxSimulator() + value, _ = s3.GetState("ns1", "key1") + testutil.AssertEquals(t, value, []byte("value1_1")) + value, _ = s3.GetState("ns2", "key3") + testutil.AssertEquals(t, value, nil) + s3.Done() + + // verify the versions of keys in persistence + ver, _ := txMgr.getCommitedVersion("ns1", "key1") + testutil.AssertEquals(t, ver, uint64(2)) + ver, _ = txMgr.getCommitedVersion("ns1", "key2") + testutil.AssertEquals(t, ver, uint64(1)) + ver, _ = txMgr.getCommitedVersion("ns2", "key3") + testutil.AssertEquals(t, ver, uint64(2)) +} + +func TestTxValidation(t *testing.T) { + env := newTestEnv(t) + defer env.Cleanup() + txMgr := NewLockBasedTxMgr(env.conf) + defer txMgr.Shutdown() + + // simulate tx1 + s1, _ := txMgr.NewTxSimulator() + s1.SetState("ns1", "key1", []byte("value1")) + s1.SetState("ns1", "key2", []byte("value2")) + s1.SetState("ns2", "key3", []byte("value3")) + s1.SetState("ns2", "key4", []byte("value4")) + s1.Done() + // validate and commit RWset + txRWSet := s1.(*LockBasedTxSimulator).getTxReadWriteSet() + isValid, err := txMgr.validateTx(txRWSet) + testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err)) + testutil.AssertSame(t, isValid, true) + txMgr.addWriteSetToBatch(txRWSet) + err = txMgr.Commit() + testutil.AssertNoError(t, err, fmt.Sprintf("Error while calling commit(): %s", err)) + + // simulate tx2 that make changes to existing data + s2, _ := txMgr.NewTxSimulator() + value, _ := s2.GetState("ns1", "key1") + testutil.AssertEquals(t, value, []byte("value1")) + + s2.SetState("ns1", "key1", []byte("value1_2")) + s2.DeleteState("ns2", "key3") + s2.Done() + + // simulate tx3 before committing tx2 changes. Reads and modifies the key changed by tx2 + s3, _ := txMgr.NewTxSimulator() + s3.GetState("ns1", "key1") + s3.SetState("ns1", "key1", []byte("value1_3")) + s3.Done() + + // simulate tx4 before committing tx2 changes. Reads and Deletes the key changed by tx2 + s4, _ := txMgr.NewTxSimulator() + s4.GetState("ns2", "key3") + s4.DeleteState("ns2", "key3") + s4.Done() + + // simulate tx5 before committing tx2 changes. Modifies and then Reads the key changed by tx2 and writes a new key + s5, _ := txMgr.NewTxSimulator() + s5.SetState("ns1", "key1", []byte("new_value")) + s5.GetState("ns1", "key1") + s5.Done() + + // simulate tx6 before committing tx2 changes. Only writes a new key, does not reads/writes a key changed by tx2 + s6, _ := txMgr.NewTxSimulator() + s6.SetState("ns1", "new_key", []byte("new_value")) + s6.Done() + + // validate and commit RWset for tx2 + txRWSet = s2.(*LockBasedTxSimulator).getTxReadWriteSet() + isValid, err = txMgr.validateTx(txRWSet) + testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err)) + testutil.AssertSame(t, isValid, true) + txMgr.addWriteSetToBatch(txRWSet) + txMgr.Commit() + + //RWSet for tx3 and tx4 should not be invalid now + isValid, err = txMgr.validateTx(s3.(*LockBasedTxSimulator).getTxReadWriteSet()) + testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err)) + testutil.AssertSame(t, isValid, false) + + isValid, err = txMgr.validateTx(s4.(*LockBasedTxSimulator).getTxReadWriteSet()) + testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err)) + testutil.AssertSame(t, isValid, false) + + //tx5 shold still be valid as it over-writes the key first and then reads + isValid, _ = txMgr.validateTx(s5.(*LockBasedTxSimulator).getTxReadWriteSet()) + testutil.AssertSame(t, isValid, true) + + // tx6 should still be valid as it only writes a new key + isValid, _ = txMgr.validateTx(s6.(*LockBasedTxSimulator).getTxReadWriteSet()) + testutil.AssertSame(t, isValid, true) +} + +func TestEncodeDecodeValueAndVersion(t *testing.T) { + testValueAndVersionEncodeing(t, []byte("value1"), uint64(1)) + testValueAndVersionEncodeing(t, nil, uint64(2)) +} + +func testValueAndVersionEncodeing(t *testing.T, value []byte, version uint64) { + encodedValue := encodeValue(value, version) + val, ver := decodeValue(encodedValue) + testutil.AssertEquals(t, val, value) + testutil.AssertEquals(t, ver, version) +} diff --git a/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgr.go b/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgr.go new file mode 100644 index 00000000000..b74a081a927 --- /dev/null +++ b/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgr.go @@ -0,0 +1,270 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lockbasedtxmgmt + +import ( + "fmt" + "sync" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/core/ledgernext" + "github.com/hyperledger/fabric/core/ledgernext/kvledger/txmgmt" + "github.com/hyperledger/fabric/core/ledgernext/util/db" + "github.com/hyperledger/fabric/protos" + "github.com/op/go-logging" + "github.com/tecbot/gorocksdb" +) + +var logger = logging.MustGetLogger("lockbasedtxmgmt") + +// Conf - configuration for `LockBasedTxMgr` +type Conf struct { + DBPath string +} + +type versionedValue struct { + value []byte + version uint64 +} + +type updateSet struct { + m map[string]*versionedValue +} + +func newUpdateSet() *updateSet { + return &updateSet{make(map[string]*versionedValue)} +} + +func (u *updateSet) add(compositeKey []byte, vv *versionedValue) { + u.m[string(compositeKey)] = vv +} + +func (u *updateSet) exists(compositeKey []byte) bool { + _, ok := u.m[string(compositeKey)] + return ok +} + +func (u *updateSet) get(compositeKey []byte) *versionedValue { + return u.m[string(compositeKey)] +} + +// LockBasedTxMgr a simple implementation of interface `txmgmt.TxMgr`. +// This implementation uses a read-write lock to prevent conflicts between transaction simulation and committing +type LockBasedTxMgr struct { + db *db.DB + stateIndexCF *gorocksdb.ColumnFamilyHandle + updateSet *updateSet + commitRWLock sync.RWMutex +} + +// NewLockBasedTxMgr constructs a `LockBasedTxMgr` +func NewLockBasedTxMgr(conf *Conf) *LockBasedTxMgr { + db := db.CreateDB(&db.Conf{DBPath: conf.DBPath, CFNames: []string{}}) + db.Open() + return &LockBasedTxMgr{db: db, stateIndexCF: db.GetDefaultCFHandle()} +} + +// NewQueryExecutor implements method in interface `txmgmt.TxMgr` +func (txmgr *LockBasedTxMgr) NewQueryExecutor() (ledger.QueryExecutor, error) { + return &RWLockQueryExecutor{txmgr}, nil +} + +// NewTxSimulator implements method in interface `txmgmt.TxMgr` +func (txmgr *LockBasedTxMgr) NewTxSimulator() (ledger.TxSimulator, error) { + s := &LockBasedTxSimulator{RWLockQueryExecutor{txmgr}, make(map[string]*nsRWs), false} + s.txmgr.commitRWLock.RLock() + return s, nil +} + +// ValidateAndPrepare implements method in interface `txmgmt.TxMgr` +func (txmgr *LockBasedTxMgr) ValidateAndPrepare(block *protos.Block2) (*protos.Block2, []*protos.InvalidTransaction, error) { + validatedBlock := &protos.Block2{} + //TODO pull PreviousBlockHash from db + validatedBlock.PreviousBlockHash = block.PreviousBlockHash + invalidTxs := []*protos.InvalidTransaction{} + var valid bool + var err error + txmgr.updateSet = newUpdateSet() + logger.Debugf("Validating a block with [%d] transactions", len(block.Transactions)) + for _, txBytes := range block.Transactions { + tx := &protos.Transaction2{} + err = proto.Unmarshal(txBytes, tx) + if err != nil { + return nil, nil, err + } + txRWSet := &txmgmt.TxReadWriteSet{} + numEndorsements := len(tx.EndorsedActions) + if numEndorsements == 0 { + return nil, nil, fmt.Errorf("Tx contains no EndorsedActions") + } + if numEndorsements > 1 { + return nil, nil, fmt.Errorf("Tx contains more than one [%d] EndorsedActions", numEndorsements) + } + if err = txRWSet.Unmarshal(tx.EndorsedActions[0].ActionBytes); err != nil { + return nil, nil, err + } + logger.Debugf("validating txRWSet:[%s]", txRWSet) + if valid, err = txmgr.validateTx(txRWSet); err != nil { + return nil, nil, err + } + + if valid { + if err := txmgr.addWriteSetToBatch(txRWSet); err != nil { + return nil, nil, err + } + validatedBlock.Transactions = append(validatedBlock.Transactions, txBytes) + } else { + invalidTxs = append(invalidTxs, &protos.InvalidTransaction{ + Transaction: tx, Cause: protos.InvalidTransaction_RWConflictDuringCommit}) + } + } + return validatedBlock, invalidTxs, nil +} + +// Shutdown implements method in interface `txmgmt.TxMgr` +func (txmgr *LockBasedTxMgr) Shutdown() { + txmgr.db.Close() +} + +func (txmgr *LockBasedTxMgr) validateTx(txRWSet *txmgmt.TxReadWriteSet) (bool, error) { + logger.Debugf("Validating txRWSet:%s", txRWSet) + var err error + var currentVersion uint64 + + for _, nsRWSet := range txRWSet.NsRWs { + ns := nsRWSet.NameSpace + for _, kvRead := range nsRWSet.Reads { + compositeKey := constructCompositeKey(ns, kvRead.Key) + if txmgr.updateSet != nil && txmgr.updateSet.exists(compositeKey) { + return false, nil + } + if currentVersion, err = txmgr.getCommitedVersion(ns, kvRead.Key); err != nil { + return false, err + } + if currentVersion != kvRead.Version { + logger.Debugf("Version mismatch for key [%s:%s]. Current version = [%d], Version in readSet [%d]", + ns, kvRead.Key, currentVersion, kvRead.Version) + return false, nil + } + } + } + return true, nil +} + +func (txmgr *LockBasedTxMgr) addWriteSetToBatch(txRWSet *txmgmt.TxReadWriteSet) error { + var err error + var currentVersion uint64 + + if txmgr.updateSet == nil { + txmgr.updateSet = newUpdateSet() + } + for _, nsRWSet := range txRWSet.NsRWs { + ns := nsRWSet.NameSpace + for _, kvWrite := range nsRWSet.Writes { + compositeKey := constructCompositeKey(ns, kvWrite.Key) + versionedVal := txmgr.updateSet.get(compositeKey) + if versionedVal != nil { + currentVersion = versionedVal.version + } else { + currentVersion, err = txmgr.getCommitedVersion(ns, kvWrite.Key) + if err != nil { + return err + } + } + txmgr.updateSet.add(compositeKey, &versionedValue{kvWrite.Value, currentVersion + 1}) + } + } + return nil +} + +// Commit implements method in interface `txmgmt.TxMgr` +func (txmgr *LockBasedTxMgr) Commit() error { + batch := gorocksdb.NewWriteBatch() + if txmgr.updateSet == nil { + panic("validateAndPrepare() method should have been called before calling commit()") + } + for k, v := range txmgr.updateSet.m { + batch.PutCF(txmgr.stateIndexCF, []byte(k), encodeValue(v.value, v.version)) + } + txmgr.commitRWLock.Lock() + defer txmgr.commitRWLock.Unlock() + defer func() { txmgr.updateSet = nil }() + defer batch.Destroy() + if err := txmgr.db.WriteBatch(batch); err != nil { + return err + } + return nil +} + +// Rollback implements method in interface `txmgmt.TxMgr` +func (txmgr *LockBasedTxMgr) Rollback() { + txmgr.updateSet = nil +} + +func (txmgr *LockBasedTxMgr) getCommitedVersion(ns string, key string) (uint64, error) { + var err error + var version uint64 + if _, version, err = txmgr.getCommittedValueAndVersion(ns, key); err != nil { + return 0, err + } + return version, nil +} + +func (txmgr *LockBasedTxMgr) getCommittedValueAndVersion(ns string, key string) ([]byte, uint64, error) { + compositeKey := constructCompositeKey(ns, key) + var encodedValue []byte + var err error + if encodedValue, err = txmgr.db.Get(txmgr.stateIndexCF, compositeKey); err != nil { + return nil, 0, err + } + if encodedValue == nil { + return nil, 0, nil + } + value, version := decodeValue(encodedValue) + return value, version, nil +} + +func encodeValue(value []byte, version uint64) []byte { + versionBytes := proto.EncodeVarint(version) + deleteMarker := 0 + if value == nil { + deleteMarker = 1 + } + deleteMarkerBytes := proto.EncodeVarint(uint64(deleteMarker)) + encodedValue := append(versionBytes, deleteMarkerBytes...) + if value != nil { + encodedValue = append(encodedValue, value...) + } + return encodedValue +} + +func decodeValue(encodedValue []byte) ([]byte, uint64) { + version, len1 := proto.DecodeVarint(encodedValue) + deleteMarker, len2 := proto.DecodeVarint(encodedValue[len1:]) + if deleteMarker == 1 { + return nil, version + } + value := encodedValue[len1+len2:] + return value, version +} + +func constructCompositeKey(ns string, key string) []byte { + compositeKey := []byte(ns) + compositeKey = append(compositeKey, byte(0)) + compositeKey = append(compositeKey, []byte(key)...) + return compositeKey +} diff --git a/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/pkg_test.go b/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/pkg_test.go new file mode 100644 index 00000000000..33cc42e0706 --- /dev/null +++ b/core/ledgernext/kvledger/txmgmt/lockbasedtxmgmt/pkg_test.go @@ -0,0 +1,36 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lockbasedtxmgmt + +import ( + "os" + "testing" +) + +type testEnv struct { + conf *Conf +} + +func newTestEnv(t testing.TB) *testEnv { + conf := &Conf{"/tmp/tests/ledgernext/kvledger/txmgmt/lockbasedtxmgmt"} + os.RemoveAll(conf.DBPath) + return &testEnv{conf} +} + +func (env *testEnv) Cleanup() { + os.RemoveAll(env.conf.DBPath) +} diff --git a/core/ledgernext/kvledger/txmgmt/rwset.go b/core/ledgernext/kvledger/txmgmt/rwset.go new file mode 100644 index 00000000000..ea374e1d632 --- /dev/null +++ b/core/ledgernext/kvledger/txmgmt/rwset.go @@ -0,0 +1,253 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package txmgmt + +import ( + "bytes" + "fmt" + + "github.com/golang/protobuf/proto" +) + +// KVRead - a tuple of key and its version at the time of transaction simulation +type KVRead struct { + Key string + Version uint64 +} + +// NewKVRead constructs a new `KVRead` +func NewKVRead(key string, version uint64) *KVRead { + return &KVRead{key, version} +} + +// KVWrite - a tuple of key and it's value that a transaction wants to set during simulation. +// In addition, IsDelete is set to true iff the operation performed on the key is a delete operation +type KVWrite struct { + Key string + IsDelete bool + Value []byte +} + +// NewKVWrite constructs a new `KVWrite` +func NewKVWrite(key string, value []byte) *KVWrite { + return &KVWrite{key, value == nil, value} +} + +// SetValue sets the new value for the key +func (w *KVWrite) SetValue(value []byte) { + w.Value = value + w.IsDelete = value == nil +} + +// NsReadWriteSet - a collection of all the reads and writes that belong to a common namespace +type NsReadWriteSet struct { + NameSpace string + Reads []*KVRead + Writes []*KVWrite +} + +// TxReadWriteSet - a collection of all the reads and writes collected as a result of a transaction simulation +type TxReadWriteSet struct { + NsRWs []*NsReadWriteSet +} + +// Marshal serializes a `KVRead` +func (r *KVRead) Marshal(buf *proto.Buffer) error { + if err := buf.EncodeStringBytes(r.Key); err != nil { + return err + } + if err := buf.EncodeVarint(r.Version); err != nil { + return err + } + return nil +} + +// Unmarshal deserializes a `KVRead` +func (r *KVRead) Unmarshal(buf *proto.Buffer) error { + var err error + if r.Key, err = buf.DecodeStringBytes(); err != nil { + return err + } + if r.Version, err = buf.DecodeVarint(); err != nil { + return err + } + return nil +} + +// Marshal serializes a `KVWrite` +func (w *KVWrite) Marshal(buf *proto.Buffer) error { + var err error + if err = buf.EncodeStringBytes(w.Key); err != nil { + return err + } + deleteMarker := 0 + if w.IsDelete { + deleteMarker = 1 + } + if err = buf.EncodeVarint(uint64(deleteMarker)); err != nil { + return err + } + if deleteMarker == 0 { + if err = buf.EncodeRawBytes(w.Value); err != nil { + return err + } + } + return nil +} + +// Unmarshal deserializes a `KVWrite` +func (w *KVWrite) Unmarshal(buf *proto.Buffer) error { + var err error + if w.Key, err = buf.DecodeStringBytes(); err != nil { + return err + } + var deleteMarker uint64 + if deleteMarker, err = buf.DecodeVarint(); err != nil { + return err + } + if deleteMarker == 1 { + w.IsDelete = true + return nil + } + if w.Value, err = buf.DecodeRawBytes(false); err != nil { + return err + } + return nil +} + +// Marshal serializes a `NsReadWriteSet` +func (nsRW *NsReadWriteSet) Marshal(buf *proto.Buffer) error { + var err error + if err = buf.EncodeStringBytes(nsRW.NameSpace); err != nil { + return err + } + if err = buf.EncodeVarint(uint64(len(nsRW.Reads))); err != nil { + return err + } + for i := 0; i < len(nsRW.Reads); i++ { + nsRW.Reads[i].Marshal(buf) + } + if err = buf.EncodeVarint(uint64(len(nsRW.Writes))); err != nil { + return err + } + for i := 0; i < len(nsRW.Writes); i++ { + nsRW.Writes[i].Marshal(buf) + } + return nil +} + +// Unmarshal deserializes a `NsReadWriteSet` +func (nsRW *NsReadWriteSet) Unmarshal(buf *proto.Buffer) error { + var err error + if nsRW.NameSpace, err = buf.DecodeStringBytes(); err != nil { + return err + } + var numReads uint64 + if numReads, err = buf.DecodeVarint(); err != nil { + return err + } + for i := 0; i < int(numReads); i++ { + r := &KVRead{} + if err = r.Unmarshal(buf); err != nil { + return err + } + nsRW.Reads = append(nsRW.Reads, r) + } + + var numWrites uint64 + if numWrites, err = buf.DecodeVarint(); err != nil { + return err + } + for i := 0; i < int(numWrites); i++ { + w := &KVWrite{} + if err = w.Unmarshal(buf); err != nil { + return err + } + nsRW.Writes = append(nsRW.Writes, w) + } + return nil +} + +// Marshal serializes a `TxReadWriteSet` +func (txRW *TxReadWriteSet) Marshal() ([]byte, error) { + buf := proto.NewBuffer(nil) + var err error + if err = buf.EncodeVarint(uint64(len(txRW.NsRWs))); err != nil { + return nil, err + } + for i := 0; i < len(txRW.NsRWs); i++ { + if err = txRW.NsRWs[i].Marshal(buf); err != nil { + return nil, err + } + } + return buf.Bytes(), nil +} + +// Unmarshal deserializes a `TxReadWriteSet` +func (txRW *TxReadWriteSet) Unmarshal(b []byte) error { + buf := proto.NewBuffer(b) + var err error + var numEntries uint64 + if numEntries, err = buf.DecodeVarint(); err != nil { + return err + } + for i := 0; i < int(numEntries); i++ { + nsRW := &NsReadWriteSet{} + if err = nsRW.Unmarshal(buf); err != nil { + return err + } + txRW.NsRWs = append(txRW.NsRWs, nsRW) + } + return nil +} + +// String prints a `KVRead` +func (r *KVRead) String() string { + return fmt.Sprintf("%s:%d", r.Key, r.Version) +} + +// String prints a `KVWrite` +func (w *KVWrite) String() string { + return fmt.Sprintf("%s=[%#v]", w.Key, w.Value) +} + +// String prints a `NsReadWriteSet` +func (nsRW *NsReadWriteSet) String() string { + var buffer bytes.Buffer + buffer.WriteString("ReadSet~") + for _, r := range nsRW.Reads { + buffer.WriteString(r.String()) + buffer.WriteString(",") + } + buffer.WriteString("WriteSet~") + for _, w := range nsRW.Writes { + buffer.WriteString(w.String()) + buffer.WriteString(",") + } + return buffer.String() +} + +// String prints a `TxReadWriteSet` +func (txRW *TxReadWriteSet) String() string { + var buffer bytes.Buffer + for _, nsRWSet := range txRW.NsRWs { + buffer.WriteString(nsRWSet.NameSpace) + buffer.WriteString("::") + buffer.WriteString(nsRWSet.String()) + } + return buffer.String() +} diff --git a/core/ledgernext/kvledger/txmgmt/rwset_test.go b/core/ledgernext/kvledger/txmgmt/rwset_test.go new file mode 100644 index 00000000000..151145ba5b4 --- /dev/null +++ b/core/ledgernext/kvledger/txmgmt/rwset_test.go @@ -0,0 +1,50 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package txmgmt + +import ( + "testing" + + "github.com/hyperledger/fabric/core/ledgernext/testutil" +) + +func TestTxRWSetMarshalUnmarshal(t *testing.T) { + txRW := &TxReadWriteSet{} + nsRW1 := &NsReadWriteSet{"ns1", + []*KVRead{&KVRead{"key1", uint64(1)}}, + []*KVWrite{&KVWrite{"key2", false, []byte("value2")}}} + + nsRW2 := &NsReadWriteSet{"ns2", + []*KVRead{&KVRead{"key3", uint64(1)}}, + []*KVWrite{&KVWrite{"key4", true, nil}}} + + nsRW3 := &NsReadWriteSet{"ns3", + []*KVRead{&KVRead{"key5", uint64(1)}}, + []*KVWrite{&KVWrite{"key6", false, []byte("value6")}, &KVWrite{"key7", false, []byte("value7")}}} + + txRW.NsRWs = append(txRW.NsRWs, nsRW1, nsRW2, nsRW3) + + b, err := txRW.Marshal() + testutil.AssertNoError(t, err, "Error while marshalling changeset") + + deserializedRWSet := &TxReadWriteSet{} + err = deserializedRWSet.Unmarshal(b) + testutil.AssertNoError(t, err, "Error while unmarshalling changeset") + t.Logf("Unmarshalled changeset = %#+v", deserializedRWSet.NsRWs[0].Writes[0].IsDelete) + testutil.AssertEquals(t, deserializedRWSet, txRW) + +} diff --git a/core/ledgernext/kvledger/txmgmt/txmgmt.go b/core/ledgernext/kvledger/txmgmt/txmgmt.go new file mode 100644 index 00000000000..cabab8cacc2 --- /dev/null +++ b/core/ledgernext/kvledger/txmgmt/txmgmt.go @@ -0,0 +1,32 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package txmgmt + +import ( + "github.com/hyperledger/fabric/core/ledgernext" + "github.com/hyperledger/fabric/protos" +) + +// TxMgr - an interface that a transaction manager should implement +type TxMgr interface { + NewQueryExecutor() (ledger.QueryExecutor, error) + NewTxSimulator() (ledger.TxSimulator, error) + ValidateAndPrepare(block *protos.Block2) (*protos.Block2, []*protos.InvalidTransaction, error) + Commit() error + Rollback() + Shutdown() +} diff --git a/core/ledgernext/ledger_interface.go b/core/ledgernext/ledger_interface.go new file mode 100644 index 00000000000..26bb6c687e2 --- /dev/null +++ b/core/ledgernext/ledger_interface.go @@ -0,0 +1,150 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ledger + +import ( + "github.com/hyperledger/fabric/protos" +) + +// Ledger captures the methods that are common across the 'raw ledger' and the 'final ledger' +type Ledger interface { + // GetTransactionByID retrieves a transaction by id + GetTransactionByID(txID string) (*protos.Transaction2, error) + // GetBlockchainInfo returns basic info about blockchain + GetBlockchainInfo() (*protos.BlockchainInfo, error) + // GetBlockchainInfo returns block at a given height + GetBlockByNumber(blockNumber uint64) (*protos.Block2, error) + // GetBlocksByNumber returns all the blocks between given heights (both inclusive). ResultsIterator contains type BlockHolder + GetBlocksByNumber(startBlockNumber, endBlockNumber uint64) (ResultsIterator, error) + // GetBlockByHash returns a block given it's hash + GetBlockByHash(blockHash []byte) (*protos.Block2, error) + //VerifyChain will verify the integrity of the blockchain. This is accomplished + // by ensuring that the previous block hash stored in each block matches + // the actual hash of the previous block in the chain. The return value is the + // block number of lowest block in the range which can be verified as valid. + VerifyChain(startBlockNumber, endBlockNumber uint64) (uint64, error) + //Prune prunes the blocks/transactions that satisfy the given policy + Prune(policy PrunePolicy) error + // Close closes the ledger + Close() +} + +// RawLedger implements methods required by 'raw ledger' +type RawLedger interface { + Ledger + // CommitBlock adds a new block + CommitBlock(block *protos.Block2) error +} + +// ValidatedLedger represents the 'final ledger'. In addition to implement the methods inherited from the Ledger, +// it provides the handle to objects for querying the state and executing transactions. +type ValidatedLedger interface { + Ledger + // NewTxSimulator gives handle to a transaction simulator. + // A client can obtain more than one 'TxSimulator's for parallel execution. + // Any snapshoting/synchronization should be performed at the implementation level if required + NewTxSimulator() (TxSimulator, error) + // NewQueryExecuter gives handle to a query executer. + // A client can obtain more than one 'QueryExecutor's for parallel execution. + // Any synchronization should be performed at the implementation level if required + NewQueryExecutor() (QueryExecutor, error) + // RemoveInvalidTransactions validates all the transactions in the given block + // and returns a block that contains only valid transactions and a list of transactions that are invalid + RemoveInvalidTransactionsAndPrepare(block *protos.Block2) (*protos.Block2, []*protos.InvalidTransaction, error) + // Commit commits the changes prepared in the method RemoveInvalidTransactionsAndPrepare. + // Commits both the valid block and related state changes + Commit() error + // Rollback rollbacks the changes prepared in the method RemoveInvalidTransactionsAndPrepare + Rollback() +} + +// QueryExecutor executes the queries +// Get* methods are for supporting KV-based data model. ExecuteQuery method is for supporting a rich datamodel and query support +// +// ExecuteQuery method in the case of a rich data model is expected to support queries on +// latest state, historical state and on the intersection of state and transactions +type QueryExecutor interface { + // GetState gets the value for given namespace and key. For a chaincode, the namespace corresponds to the chaincodeId + GetState(namespace string, key string) ([]byte, error) + // GetStateMultipleKeys gets the values for multiple keys in a single call + GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error) + // GetStateRangeScanIterator returns an iterator that contains all the key-values beteen given key ranges. + // The returned ResultsIterator contains results of type *KV + GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ResultsIterator, error) + // GetTransactionsForKey returns an iterator that contains all the transactions that modified the given key. + // The returned ResultsIterator contains results of type *msgs.Transaction + GetTransactionsForKey(namespace string, key string) (ResultsIterator, error) + // ExecuteQuery executes the given query and returns an iterator that contains results of type specific to the underlying data store. + ExecuteQuery(query string) (ResultsIterator, error) +} + +// TxSimulator simulates a transaction on a consistent snapshot of the 'as recent state as possible' +// Set* methods are for supporting KV-based data model. ExecuteUpdate method is for supporting a rich datamodel and query support +type TxSimulator interface { + QueryExecutor + // SetState sets the given value for the given namespace and key. For a chaincode, the namespace corresponds to the chaincodeId + SetState(namespace string, key string, value []byte) error + // DeleteState deletes the given namespace and key + DeleteState(namespace string, key string) error + // SetMultipleKeys sets the values for multiple keys in a single call + SetStateMultipleKeys(namespace string, kvs map[string][]byte) error + // ExecuteUpdate for supporting rich data model (see comments on QueryExecutor above) + ExecuteUpdate(query string) error + // CopyState copies the entire state in the sourceNamespace to the targetNamespace. This can be a large payload + CopyState(sourceNamespace string, targetNamespace string) error + // Done releases resources occupied by the TxSimulator + Done() + // GetTxSimulationResults encapsulates the results of the transaction simulation. + // This should contain enough detail for + // - The update in the state that would be caused if the transaction is to be committed + // - The environment in which the transaction is executed so as to be able to decide the validity of the enviroment + // (at a later time on a different peer) during committing the transactions + // Different ledger implementation (or configurations of a single implementation) may want to represent the above two pieces + // of information in different way in order to support different data-models or optimize the information representations. + // TODO detailed illustration of a couple of representations. + GetTxSimulationResults() ([]byte, error) +} + +// ResultsIterator - an iterator for query result set +type ResultsIterator interface { + // Next moves to next item in the result set. Returns true if next item exists. + //The first call to this method moves the cursor to the first item in the result set + Next() bool + // Get returns current result item + Get() (QueryResult, error) + // Close releases resources occupied by the iterator + Close() +} + +// QueryResult - a general interface for supporting different types of query results. Actual types differ for different queries +type QueryResult interface{} + +// KV - QueryResult for KV-based datamodel. Holds a key and corresponding value. A nil value indicates a non-existent key. +type KV struct { + Key string + Value []byte +} + +// BlockHolder holds block returned by the iterator in GetBlocksByNumber. +// The sole purpose of this holder is to avoid desrialization if block is desired in raw bytes form (e.g., for transfer) +type BlockHolder interface { + GetBlock() *protos.Block2 + GetBlockBytes() []byte +} + +// PrunePolicy - a general interface for supporting different pruning policies +type PrunePolicy interface{} diff --git a/core/ledgernext/testutil/test_helper.go b/core/ledgernext/testutil/test_helper.go new file mode 100644 index 00000000000..c25932d7902 --- /dev/null +++ b/core/ledgernext/testutil/test_helper.go @@ -0,0 +1,73 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testutil + +import ( + "testing" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/protos" +) + +// ConstructBlockForSimulationResults constructs a block that includes a number of transactions - one per simulationResults +func ConstructBlockForSimulationResults(t *testing.T, simulationResults [][]byte) *protos.Block2 { + txs := []*protos.Transaction2{} + for i := 0; i < len(simulationResults); i++ { + tx := ConstructTestTransaction(t, simulationResults[i]) + txs = append(txs, tx) + } + return newBlock(txs) +} + +// ConstructTestBlocks constructs 'numBlocks' number of blocks for testing +func ConstructTestBlocks(t *testing.T, numBlocks int) []*protos.Block2 { + blocks := []*protos.Block2{} + for i := 0; i < numBlocks; i++ { + blocks = append(blocks, ConstructTestBlock(t, 10, i*10)) + } + return blocks +} + +// ConstructTestBlock constructs a block with 'numTx' number of transactions for testing +func ConstructTestBlock(t *testing.T, numTx int, startingTxID int) *protos.Block2 { + txs := []*protos.Transaction2{} + for i := startingTxID; i < numTx+startingTxID; i++ { + tx := &protos.Transaction2{} + tx.EndorsedActions = []*protos.EndorsedAction{ + &protos.EndorsedAction{ActionBytes: ConstructRandomBytes(t, 100), Endorsements: []*protos.Endorsement{}, ProposalBytes: []byte{}}} + txs = append(txs, tx) + } + return newBlock(txs) +} + +// ConstructTestTransaction constructs a transaction for testing +func ConstructTestTransaction(t *testing.T, simulationResults []byte) *protos.Transaction2 { + tx := &protos.Transaction2{} + tx.EndorsedActions = []*protos.EndorsedAction{ + &protos.EndorsedAction{ActionBytes: simulationResults, Endorsements: []*protos.Endorsement{}, ProposalBytes: []byte{}}} + return tx +} + +func newBlock(txs []*protos.Transaction2) *protos.Block2 { + block := &protos.Block2{} + block.PreviousBlockHash = []byte{} + for i := 0; i < len(txs); i++ { + txBytes, _ := proto.Marshal(txs[i]) + block.Transactions = append(block.Transactions, txBytes) + } + return block +} diff --git a/core/ledgernext/testutil/test_util.go b/core/ledgernext/testutil/test_util.go new file mode 100644 index 00000000000..04f1ae13c14 --- /dev/null +++ b/core/ledgernext/testutil/test_util.go @@ -0,0 +1,236 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testutil + +import ( + "crypto/rand" + "flag" + "fmt" + mathRand "math/rand" + "reflect" + "regexp" + "runtime" + "strings" + "testing" + "time" + + "github.com/hyperledger/fabric/core/util" + "github.com/op/go-logging" + "github.com/spf13/viper" +) + +// TestRandomNumberGenerator a random number generator for testing +type TestRandomNumberGenerator struct { + rand *mathRand.Rand + maxNumber int +} + +// NewTestRandomNumberGenerator constructs a new `TestRandomNumberGenerator` +func NewTestRandomNumberGenerator(maxNumber int) *TestRandomNumberGenerator { + return &TestRandomNumberGenerator{ + mathRand.New(mathRand.NewSource(time.Now().UnixNano())), + maxNumber, + } +} + +// Next generates next random number +func (randNumGenerator *TestRandomNumberGenerator) Next() int { + return randNumGenerator.rand.Intn(randNumGenerator.maxNumber) +} + +// SetupTestConfig sets up configurations for tetsing +func SetupTestConfig() { + viper.AddConfigPath(".") + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + viper.AutomaticEnv() + viper.SetDefault("peer.ledger.test.loadYAML", true) + loadYAML := viper.GetBool("peer.ledger.test.loadYAML") + if loadYAML { + viper.SetConfigName("test") + err := viper.ReadInConfig() + if err != nil { // Handle errors reading the config file + panic(fmt.Errorf("Fatal error config file: %s \n", err)) + } + } + var formatter = logging.MustStringFormatter( + `%{color}%{time:15:04:05.000} [%{module}] %{shortfunc} [%{shortfile}] -> %{level:.4s} %{id:03x}%{color:reset} %{message}`, + ) + logging.SetFormatter(formatter) +} + +// SetLogLevel sets up log level +func SetLogLevel(level logging.Level, module string) { + logging.SetLevel(level, module) +} + +// ParseTestParams parses tests params +func ParseTestParams() []string { + testParams := flag.String("testParams", "", "Test specific parameters") + flag.Parse() + regex, err := regexp.Compile(",(\\s+)?") + if err != nil { + panic(fmt.Errorf("err = %s\n", err)) + } + paramsArray := regex.Split(*testParams, -1) + return paramsArray +} + +// AssertNil varifies that the value is nil +func AssertNil(t testing.TB, value interface{}) { + if !isNil(value) { + t.Fatalf("Value not nil. value=[%#v]\n %s", value, getCallerInfo()) + } +} + +// AssertNotNil varifies that the value is not nil +func AssertNotNil(t testing.TB, value interface{}) { + if isNil(value) { + t.Fatalf("Values is nil. %s", getCallerInfo()) + } +} + +// AssertSame varifies that the two values are same +func AssertSame(t testing.TB, actual interface{}, expected interface{}) { + t.Logf("%s: AssertSame [%#v] and [%#v]", getCallerInfo(), actual, expected) + if actual != expected { + t.Fatalf("Values actual=[%#v] and expected=[%#v] do not point to same object. %s", actual, expected, getCallerInfo()) + } +} + +// AssertEquals varifies that the two values are equal +func AssertEquals(t testing.TB, actual interface{}, expected interface{}) { + t.Logf("%s: AssertEquals [%#v] and [%#v]", getCallerInfo(), actual, expected) + if expected == nil && isNil(actual) { + return + } + if !reflect.DeepEqual(actual, expected) { + t.Fatalf("Values are not equal.\n Actual=[%#v], \n Expected=[%#v]\n %s", actual, expected, getCallerInfo()) + } +} + +// AssertNotEquals varifies that the two values are not equal +func AssertNotEquals(t testing.TB, actual interface{}, expected interface{}) { + if reflect.DeepEqual(actual, expected) { + t.Fatalf("Values are not supposed to be equal. Actual=[%#v], Expected=[%#v]\n %s", actual, expected, getCallerInfo()) + } +} + +// AssertError varifies that the err is not nil +func AssertError(t testing.TB, err error, message string) { + if err == nil { + t.Fatalf("%s\n %s", message, getCallerInfo()) + } +} + +// AssertNoError varifies that the err is nil +func AssertNoError(t testing.TB, err error, message string) { + if err != nil { + t.Fatalf("%s - Error: %s\n %s", message, err, getCallerInfo()) + } +} + +// AssertContains varifies that the slice contains the value +func AssertContains(t testing.TB, slice interface{}, value interface{}) { + if reflect.TypeOf(slice).Kind() != reflect.Slice && reflect.TypeOf(slice).Kind() != reflect.Array { + t.Fatalf("Type of argument 'slice' is expected to be a slice/array, found =[%s]\n %s", reflect.TypeOf(slice), getCallerInfo()) + } + + if !contains(slice, value) { + t.Fatalf("Expected value [%s] not found in slice %s\n %s", value, slice, getCallerInfo()) + } +} + +// AssertContainsAll varifies that sliceActual is a superset of sliceExpected +func AssertContainsAll(t testing.TB, sliceActual interface{}, sliceExpected interface{}) { + if reflect.TypeOf(sliceActual).Kind() != reflect.Slice && reflect.TypeOf(sliceActual).Kind() != reflect.Array { + t.Fatalf("Type of argument 'sliceActual' is expected to be a slice/array, found =[%s]\n %s", reflect.TypeOf(sliceActual), getCallerInfo()) + } + + if reflect.TypeOf(sliceExpected).Kind() != reflect.Slice && reflect.TypeOf(sliceExpected).Kind() != reflect.Array { + t.Fatalf("Type of argument 'sliceExpected' is expected to be a slice/array, found =[%s]\n %s", reflect.TypeOf(sliceExpected), getCallerInfo()) + } + + array := reflect.ValueOf(sliceExpected) + for i := 0; i < array.Len(); i++ { + element := array.Index(i).Interface() + if !contains(sliceActual, element) { + t.Fatalf("Expected value [%s] not found in slice %s\n %s", element, sliceActual, getCallerInfo()) + } + } +} + +// AssertPanic varifies that a panic is raised during a test +func AssertPanic(t testing.TB, msg string) { + x := recover() + if x == nil { + t.Fatal(msg) + } else { + t.Logf("A panic was caught successfully. Actual msg = %s", x) + } +} + +// ComputeCryptoHash computes crypto hash for testing +func ComputeCryptoHash(content ...[]byte) []byte { + return util.ComputeCryptoHash(AppendAll(content...)) +} + +// AppendAll combines the bytes from different []byte into one []byte +func AppendAll(content ...[]byte) []byte { + combinedContent := []byte{} + for _, b := range content { + combinedContent = append(combinedContent, b...) + } + return combinedContent +} + +// GenerateID generates a uuid +func GenerateID(t *testing.T) string { + return util.GenerateUUID() +} + +// ConstructRandomBytes constructs random bytes of given size +func ConstructRandomBytes(t testing.TB, size int) []byte { + value := make([]byte, size) + _, err := rand.Read(value) + if err != nil { + t.Fatalf("Error while generating random bytes: %s", err) + } + return value +} + +func contains(slice interface{}, value interface{}) bool { + array := reflect.ValueOf(slice) + for i := 0; i < array.Len(); i++ { + element := array.Index(i).Interface() + if value == element || reflect.DeepEqual(element, value) { + return true + } + } + return false +} + +func isNil(in interface{}) bool { + return in == nil || reflect.ValueOf(in).IsNil() || (reflect.TypeOf(in).Kind() == reflect.Slice && reflect.ValueOf(in).Len() == 0) +} + +func getCallerInfo() string { + _, file, line, ok := runtime.Caller(2) + if !ok { + return "Could not retrieve caller's info" + } + return fmt.Sprintf("CallerInfo = [%s:%d]", file, line) +} diff --git a/core/ledgernext/testutil/test_util_test.go b/core/ledgernext/testutil/test_util_test.go new file mode 100644 index 00000000000..3cb0a13871f --- /dev/null +++ b/core/ledgernext/testutil/test_util_test.go @@ -0,0 +1,23 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testutil + +import "testing" + +func TestSkipAll(t *testing.T) { + t.Skip(`No tests in this package for now - This package contains only utility functions that are meant to be used by other functional tests`) +} diff --git a/core/ledgernext/util/db/db.go b/core/ledgernext/util/db/db.go new file mode 100644 index 00000000000..5834e7f025c --- /dev/null +++ b/core/ledgernext/util/db/db.go @@ -0,0 +1,190 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package db + +import ( + "fmt" + "sync" + + "github.com/hyperledger/fabric/core/ledgernext/util" + "github.com/op/go-logging" + "github.com/tecbot/gorocksdb" +) + +var logger = logging.MustGetLogger("kvledger.db") + +type dbState int32 + +const ( + defaultCFName = "default" + closed dbState = iota + opened +) + +// Conf configuration for `DB` +type Conf struct { + DBPath string + CFNames []string +} + +// DB - a rocksDB instance +type DB struct { + conf *Conf + rocksDB *gorocksdb.DB + cfHandlesMap map[string]*gorocksdb.ColumnFamilyHandle + dbState dbState + mux sync.Mutex +} + +// CreateDB constructs a `DB` +func CreateDB(conf *Conf) *DB { + conf.CFNames = append(conf.CFNames, defaultCFName) + return &DB{conf: conf, cfHandlesMap: make(map[string]*gorocksdb.ColumnFamilyHandle), dbState: closed} +} + +// Open open underlying rocksdb +func (dbInst *DB) Open() { + dbInst.mux.Lock() + if dbInst.dbState == opened { + dbInst.mux.Unlock() + return + } + + defer dbInst.mux.Unlock() + + dbPath := dbInst.conf.DBPath + dirEmpty, err := util.CreateDirIfMissing(dbPath) + if err != nil { + panic(fmt.Sprintf("Error while trying to open DB: %s", err)) + } + opts := gorocksdb.NewDefaultOptions() + defer opts.Destroy() + + opts.SetCreateIfMissing(dirEmpty) + opts.SetCreateIfMissingColumnFamilies(true) + + var cfOpts []*gorocksdb.Options + for range dbInst.conf.CFNames { + cfOpts = append(cfOpts, opts) + } + db, cfHandlers, err := gorocksdb.OpenDbColumnFamilies(opts, dbPath, dbInst.conf.CFNames, cfOpts) + if err != nil { + panic(fmt.Sprintf("Error opening DB: %s", err)) + } + dbInst.rocksDB = db + for i := 0; i < len(dbInst.conf.CFNames); i++ { + dbInst.cfHandlesMap[dbInst.conf.CFNames[i]] = cfHandlers[i] + } + dbInst.dbState = opened +} + +// Close releases all column family handles and closes rocksdb +func (dbInst *DB) Close() { + dbInst.mux.Lock() + if dbInst.dbState == closed { + dbInst.mux.Unlock() + return + } + + defer dbInst.mux.Unlock() + for _, cfHandler := range dbInst.cfHandlesMap { + cfHandler.Destroy() + } + dbInst.rocksDB.Close() + dbInst.dbState = closed +} + +func (dbInst *DB) isOpen() bool { + dbInst.mux.Lock() + defer dbInst.mux.Unlock() + return dbInst.dbState == opened +} + +// Get returns the value for the given column family and key +func (dbInst *DB) Get(cfHandle *gorocksdb.ColumnFamilyHandle, key []byte) ([]byte, error) { + opt := gorocksdb.NewDefaultReadOptions() + defer opt.Destroy() + slice, err := dbInst.rocksDB.GetCF(opt, cfHandle, key) + if err != nil { + fmt.Println("Error while trying to retrieve key:", key) + return nil, err + } + defer slice.Free() + if slice.Data() == nil { + return nil, nil + } + data := makeCopy(slice.Data()) + return data, nil +} + +// Put saves the key/value in the given column family +func (dbInst *DB) Put(cfHandle *gorocksdb.ColumnFamilyHandle, key []byte, value []byte) error { + opt := gorocksdb.NewDefaultWriteOptions() + defer opt.Destroy() + err := dbInst.rocksDB.PutCF(opt, cfHandle, key, value) + if err != nil { + fmt.Println("Error while trying to write key:", key) + return err + } + return nil +} + +// Delete delets the given key in the specified column family +func (dbInst *DB) Delete(cfHandle *gorocksdb.ColumnFamilyHandle, key []byte) error { + opt := gorocksdb.NewDefaultWriteOptions() + defer opt.Destroy() + err := dbInst.rocksDB.DeleteCF(opt, cfHandle, key) + if err != nil { + fmt.Println("Error while trying to delete key:", key) + return err + } + return nil +} + +// WriteBatch writes a batch +func (dbInst *DB) WriteBatch(batch *gorocksdb.WriteBatch) error { + opts := gorocksdb.NewDefaultWriteOptions() + defer opts.Destroy() + if err := dbInst.rocksDB.Write(opts, batch); err != nil { + return err + } + return nil +} + +// GetIterator returns an iterator for the given column family +func (dbInst *DB) GetIterator(cfName string) *gorocksdb.Iterator { + opt := gorocksdb.NewDefaultReadOptions() + opt.SetFillCache(true) + defer opt.Destroy() + return dbInst.rocksDB.NewIteratorCF(opt, dbInst.GetCFHandle(cfName)) +} + +// GetCFHandle returns handle to a named column family +func (dbInst *DB) GetCFHandle(cfName string) *gorocksdb.ColumnFamilyHandle { + return dbInst.cfHandlesMap[cfName] +} + +// GetDefaultCFHandle returns handle to default column family +func (dbInst *DB) GetDefaultCFHandle() *gorocksdb.ColumnFamilyHandle { + return dbInst.GetCFHandle(defaultCFName) +} + +func makeCopy(src []byte) []byte { + dest := make([]byte, len(src)) + copy(dest, src) + return dest +} diff --git a/core/ledgernext/util/db/db_test.go b/core/ledgernext/util/db/db_test.go new file mode 100644 index 00000000000..d0107149489 --- /dev/null +++ b/core/ledgernext/util/db/db_test.go @@ -0,0 +1,44 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package db + +import ( + "testing" + + "github.com/hyperledger/fabric/core/ledgernext/testutil" +) + +func TestDBBasicWriteAndReads(t *testing.T) { + dbConf := &Conf{"/tmp/v2/test/db", []string{"cf1", "cf2"}} + db := CreateDB(dbConf) + db.Open() + defer db.Close() + db.Put(db.GetCFHandle("cf1"), []byte("key1"), []byte("value1")) + db.Put(db.GetCFHandle("cf2"), []byte("key2"), []byte("value2")) + db.Put(db.GetDefaultCFHandle(), []byte("key3"), []byte("value3")) + val, err := db.Get(db.GetCFHandle("cf1"), []byte("key1")) + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, val, []byte("value1")) + + val, err = db.Get(db.GetCFHandle("cf2"), []byte("key2")) + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, val, []byte("value2")) + + val, err = db.Get(db.GetDefaultCFHandle(), []byte("key3")) + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, val, []byte("value3")) +} diff --git a/core/ledgernext/util/ioutil.go b/core/ledgernext/util/ioutil.go new file mode 100644 index 00000000000..d60f69cebf3 --- /dev/null +++ b/core/ledgernext/util/ioutil.go @@ -0,0 +1,59 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "io" + "os" + "path" + "strings" + + "github.com/op/go-logging" +) + +var logger = logging.MustGetLogger("kvledger.util") + +// CreateDirIfMissing creates a dir for dirPath if not already exists. If the dir is empty it returns true +func CreateDirIfMissing(dirPath string) (bool, error) { + // if dirPath does not end with a path separator, it leaves out the last segment while creating directories + if !strings.HasSuffix(dirPath, "/") { + dirPath = dirPath + "/" + } + logger.Debugf("CreateDirIfMissing [%s]", dirPath) + err := os.MkdirAll(path.Dir(dirPath), 0755) + if err != nil { + logger.Debugf("Error while creating dir [%s]", dirPath) + return false, err + } + return DirEmpty(dirPath) +} + +// DirEmpty returns true if the dir at dirPath is empty +func DirEmpty(dirPath string) (bool, error) { + f, err := os.Open(dirPath) + if err != nil { + logger.Debugf("Error while opening dir [%s]: %s", dirPath, err) + return false, err + } + defer f.Close() + + _, err = f.Readdir(1) + if err == io.EOF { + return true, nil + } + return false, err +} diff --git a/core/ledgernext/util/util.go b/core/ledgernext/util/util.go new file mode 100644 index 00000000000..fdc768a7e2f --- /dev/null +++ b/core/ledgernext/util/util.go @@ -0,0 +1,62 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "encoding/binary" + "fmt" + + "github.com/golang/protobuf/proto" +) + +// EncodeOrderPreservingVarUint64 returns a byte-representation for a uint64 number such that +// all zero-bits starting bytes are trimmed in order to reduce the length of the array +// For preserving the order in a default bytes-comparison, first byte contains the number of remaining bytes. +// The presence of first byte also allows to use the returned bytes as part of other larger byte array such as a +// composite-key representation in db +func EncodeOrderPreservingVarUint64(number uint64) []byte { + bytes := make([]byte, 8) + binary.BigEndian.PutUint64(bytes, number) + startingIndex := 0 + size := 0 + for i, b := range bytes { + if b != 0x00 { + startingIndex = i + size = 8 - i + break + } + } + sizeBytes := proto.EncodeVarint(uint64(size)) + if len(sizeBytes) > 1 { + panic(fmt.Errorf("[]sizeBytes should not be more than one byte because the max number it needs to hold is 8. size=%d", size)) + } + encodedBytes := make([]byte, size+1) + encodedBytes[0] = sizeBytes[0] + copy(encodedBytes[1:], bytes[startingIndex:]) + return encodedBytes +} + +// DecodeOrderPreservingVarUint64 decodes the number from the bytes obtained from method 'EncodeOrderPreservingVarUint64'. +// Also, returns the number of bytes that are consumed in the process +func DecodeOrderPreservingVarUint64(bytes []byte) (uint64, int) { + s, _ := proto.DecodeVarint(bytes) + size := int(s) + decodedBytes := make([]byte, 8) + copy(decodedBytes[8-size:], bytes[1:size+1]) + numBytesConsumed := size + 1 + return binary.BigEndian.Uint64(decodedBytes), numBytesConsumed +} diff --git a/core/ledgernext/util/util_test.go b/core/ledgernext/util/util_test.go new file mode 100644 index 00000000000..ff71b835699 --- /dev/null +++ b/core/ledgernext/util/util_test.go @@ -0,0 +1,54 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "bytes" + "testing" +) + +func TestBasicEncodingDecoding(t *testing.T) { + for i := 0; i < 10000; i++ { + value := EncodeOrderPreservingVarUint64(uint64(i)) + nextValue := EncodeOrderPreservingVarUint64(uint64(i + 1)) + if !(bytes.Compare(value, nextValue) < 0) { + t.Fatalf("A smaller integer should result into smaller bytes. Encoded bytes for [%d] is [%x] and for [%d] is [%x]", + i, i+1, value, nextValue) + } + decodedValue, _ := DecodeOrderPreservingVarUint64(value) + if decodedValue != uint64(i) { + t.Fatalf("Value not same after decoding. Original value = [%d], decode value = [%d]", i, decodedValue) + } + } +} + +func TestDecodingAppendedValues(t *testing.T) { + appendedValues := []byte{} + for i := 0; i < 1000; i++ { + appendedValues = append(appendedValues, EncodeOrderPreservingVarUint64(uint64(i))...) + } + + len := 0 + value := uint64(0) + for i := 0; i < 1000; i++ { + appendedValues = appendedValues[len:] + value, len = DecodeOrderPreservingVarUint64(appendedValues) + if value != uint64(i) { + t.Fatalf("expected value = [%d], decode value = [%d]", i, value) + } + } +} diff --git a/protos/api.pb.go b/protos/api.pb.go index 0358e4d5fac..6fdedbf38a2 100644 --- a/protos/api.pb.go +++ b/protos/api.pb.go @@ -54,6 +54,8 @@ It has these top-level messages: ProposalResponse EndorsedAction Transaction2 + InvalidTransaction + Block2 Transaction TransactionBlock TransactionResult diff --git a/protos/fabric_next.pb.go b/protos/fabric_next.pb.go index 5117a4701f4..6754e4245f6 100644 --- a/protos/fabric_next.pb.go +++ b/protos/fabric_next.pb.go @@ -88,6 +88,26 @@ func (x Proposal_Type) String() string { return proto.EnumName(Proposal_Type_name, int32(x)) } +type InvalidTransaction_Cause int32 + +const ( + InvalidTransaction_TxIdAlreadyExists InvalidTransaction_Cause = 0 + InvalidTransaction_RWConflictDuringCommit InvalidTransaction_Cause = 1 +) + +var InvalidTransaction_Cause_name = map[int32]string{ + 0: "TxIdAlreadyExists", + 1: "RWConflictDuringCommit", +} +var InvalidTransaction_Cause_value = map[string]int32{ + "TxIdAlreadyExists": 0, + "RWConflictDuringCommit": 1, +} + +func (x InvalidTransaction_Cause) String() string { + return proto.EnumName(InvalidTransaction_Cause_name, int32(x)) +} + // Envelope is used to deliver a message type Envelope struct { // Signature of the message. @@ -306,9 +326,38 @@ func (m *Transaction2) GetEndorsedActions() []*EndorsedAction { return nil } +// This is used to wrap an invalid Transaction with the cause +type InvalidTransaction struct { + Transaction *Transaction2 `protobuf:"bytes,1,opt,name=transaction" json:"transaction,omitempty"` + Cause InvalidTransaction_Cause `protobuf:"varint,2,opt,name=cause,enum=protos.InvalidTransaction_Cause" json:"cause,omitempty"` +} + +func (m *InvalidTransaction) Reset() { *m = InvalidTransaction{} } +func (m *InvalidTransaction) String() string { return proto.CompactTextString(m) } +func (*InvalidTransaction) ProtoMessage() {} + +func (m *InvalidTransaction) GetTransaction() *Transaction2 { + if m != nil { + return m.Transaction + } + return nil +} + +// Block contains a list of transactions and the crypto hash of previous block +type Block2 struct { + PreviousBlockHash []byte `protobuf:"bytes,1,opt,name=PreviousBlockHash,proto3" json:"PreviousBlockHash,omitempty"` + // transactions are stored in serialized form so that the concenters can avoid marshaling of transactions + Transactions [][]byte `protobuf:"bytes,2,rep,name=Transactions,proto3" json:"Transactions,omitempty"` +} + +func (m *Block2) Reset() { *m = Block2{} } +func (m *Block2) String() string { return proto.CompactTextString(m) } +func (*Block2) ProtoMessage() {} + func init() { proto.RegisterEnum("protos.Message2_Type", Message2_Type_name, Message2_Type_value) proto.RegisterEnum("protos.Proposal_Type", Proposal_Type_name, Proposal_Type_value) + proto.RegisterEnum("protos.InvalidTransaction_Cause", InvalidTransaction_Cause_name, InvalidTransaction_Cause_value) } // Reference imports to suppress errors if they are not otherwise used. diff --git a/protos/fabric_next.proto b/protos/fabric_next.proto index 67ce845ade4..daec9506cb0 100644 --- a/protos/fabric_next.proto +++ b/protos/fabric_next.proto @@ -262,6 +262,23 @@ message Transaction2 { } +// This is used to wrap an invalid Transaction with the cause +message InvalidTransaction { + enum Cause { + TxIdAlreadyExists = 0; + RWConflictDuringCommit = 1; + } + Transaction2 transaction = 1; + Cause cause = 2; +} + +// Block contains a list of transactions and the crypto hash of previous block +message Block2 { + bytes PreviousBlockHash = 1; + // transactions are stored in serialized form so that the concenters can avoid marshaling of transactions + repeated bytes Transactions = 2; +} + service Endorser { rpc ProcessProposal(Proposal) returns (ProposalResponse) {} -} \ No newline at end of file +} diff --git a/protos/ser_block2.go b/protos/ser_block2.go new file mode 100644 index 00000000000..daede066542 --- /dev/null +++ b/protos/ser_block2.go @@ -0,0 +1,168 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protos + +import ( + "encoding/binary" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/core/util" +) + +// SerBlock2 is responsible for serialized structure of block +type SerBlock2 struct { + txOffsets []int + blockHash []byte + blockBytes []byte +} + +// ConstructSerBlock2 constructs `SerBlock2` from `Block2` +func ConstructSerBlock2(block *Block2) (*SerBlock2, error) { + txOffsets := []int{} + buf := proto.NewBuffer(nil) + if err := buf.EncodeRawBytes(block.PreviousBlockHash); err != nil { + return nil, err + } + blockBytes := buf.Bytes() + numTxs := len(block.Transactions) + for i := 0; i < numTxs; i++ { + nextTxOffset := len(blockBytes) + txOffsets = append(txOffsets, nextTxOffset) + blockBytes = append(blockBytes, block.Transactions[i]...) + } + trailerOffset := len(blockBytes) + txOffsets = append(txOffsets, trailerOffset) + buf = proto.NewBuffer(blockBytes) + if err := buf.EncodeVarint(uint64(numTxs)); err != nil { + return nil, err + } + for i := 0; i < numTxs; i++ { + if err := buf.EncodeVarint(uint64(txOffsets[i])); err != nil { + return nil, err + } + } + logger.Debugf("ConstructSerBlock2():TxOffsets=%#v", txOffsets) + blockBytes = buf.Bytes() + lastBytes := intToBytes(uint32(trailerOffset)) + blockBytes = append(blockBytes, lastBytes...) + return &SerBlock2{txOffsets: txOffsets, blockBytes: blockBytes}, nil +} + +// NewSerBlock2 constructs `SerBlock2` from serialized block bytes +func NewSerBlock2(blockBytes []byte) *SerBlock2 { + return &SerBlock2{blockBytes: blockBytes} +} + +// GetBytes gives the serialized bytes +func (serBlock *SerBlock2) GetBytes() []byte { + return serBlock.blockBytes +} + +// ComputeHash computes the crypto-hash of block +func (serBlock *SerBlock2) ComputeHash() []byte { + if serBlock.blockHash == nil { + serBlock.blockHash = util.ComputeCryptoHash(serBlock.blockBytes) + } + return serBlock.blockHash +} + +// GetPreviousBlockHash retrieves PreviousBlockHash from serialized bytes +func (serBlock *SerBlock2) GetPreviousBlockHash() ([]byte, error) { + buf := proto.NewBuffer(serBlock.blockBytes) + return serBlock.extractPreviousBlockHash(buf) +} + +// GetTxOffsets retrieves transactions offsets from serialized bytes. +// Transactions are serialized in an continuous manner - e.g, +// second transaction starts immediately after first transaction ends. +// The last entry in the return value is an extra entry in order to indicate +// the length of the last transaction +func (serBlock *SerBlock2) GetTxOffsets() ([]int, error) { + if serBlock.txOffsets == nil { + var err error + buf := proto.NewBuffer(serBlock.blockBytes) + if _, err = serBlock.extractPreviousBlockHash(buf); err != nil { + return nil, err + } + serBlock.txOffsets, err = serBlock.extractTxOffsets() + return serBlock.txOffsets, err + } + return serBlock.txOffsets, nil +} + +// ToBlock2 reconstructs `Block2` from `serBlock` +func (serBlock *SerBlock2) ToBlock2() (*Block2, error) { + block := &Block2{} + buf := proto.NewBuffer(serBlock.blockBytes) + var err error + var previousBlockHash []byte + var txOffsets []int + if previousBlockHash, err = serBlock.extractPreviousBlockHash(buf); err != nil { + return nil, err + } + if txOffsets, err = serBlock.extractTxOffsets(); err != nil { + return nil, err + } + block.PreviousBlockHash = previousBlockHash + block.Transactions = make([][]byte, len(txOffsets)-1) + for i := 0; i < len(txOffsets)-1; i++ { + block.Transactions[i] = serBlock.blockBytes[txOffsets[i]:txOffsets[i+1]] + } + return block, nil +} + +func (serBlock *SerBlock2) extractPreviousBlockHash(buf *proto.Buffer) ([]byte, error) { + var err error + var previousBlockHash []byte + if previousBlockHash, err = buf.DecodeRawBytes(false); err != nil { + return nil, err + } + return previousBlockHash, err +} + +func (serBlock *SerBlock2) extractTxOffsets() ([]int, error) { + lastBytes := serBlock.blockBytes[len(serBlock.blockBytes)-4:] + trailerOffset := int(bytesToInt(lastBytes)) + trailerBytes := serBlock.blockBytes[trailerOffset:] + buf := proto.NewBuffer(trailerBytes) + txOffsets := []int{} + var err error + var numTxs uint64 + if numTxs, err = buf.DecodeVarint(); err != nil { + return nil, err + } + var nextTxOffset uint64 + for i := 0; i < int(numTxs); i++ { + if nextTxOffset, err = buf.DecodeVarint(); err != nil { + return nil, err + } + txOffsets = append(txOffsets, int(nextTxOffset)) + } + txOffsets = append(txOffsets, trailerOffset) + logger.Debugf("extractTxOffsets():TxOffsets=%#v", txOffsets) + return txOffsets, nil +} + +func intToBytes(i uint32) []byte { + bs := make([]byte, 4) + binary.LittleEndian.PutUint32(bs, i) + return bs +} + +func bytesToInt(b []byte) uint32 { + return binary.LittleEndian.Uint32(b) +} diff --git a/protos/ser_block2_test.go b/protos/ser_block2_test.go new file mode 100644 index 00000000000..41156247fab --- /dev/null +++ b/protos/ser_block2_test.go @@ -0,0 +1,65 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protos + +import ( + "reflect" + "testing" + + "github.com/golang/protobuf/proto" +) + +func TestSerBlock2(t *testing.T) { + tx1 := &Transaction2{} + tx1.EndorsedActions = []*EndorsedAction{ + &EndorsedAction{[]byte("action1"), []*Endorsement{&Endorsement{[]byte("signature1")}}, []byte("proposal1")}, + &EndorsedAction{[]byte("action2"), []*Endorsement{&Endorsement{[]byte("signature1")}, &Endorsement{[]byte("signature2")}}, []byte("proposal1")}} + + tx1Bytes, err := proto.Marshal(tx1) + if err != nil { + t.Fatalf("Error:%s", err) + } + + tx2 := &Transaction2{} + tx2.EndorsedActions = []*EndorsedAction{ + &EndorsedAction{[]byte("action1"), []*Endorsement{&Endorsement{[]byte("signature1")}}, []byte("proposal1")}, + &EndorsedAction{[]byte("action2"), []*Endorsement{&Endorsement{[]byte("signature2")}}, []byte("proposal1")}} + + tx2Bytes, err := proto.Marshal(tx2) + if err != nil { + t.Fatalf("Error:%s", err) + } + + block := &Block2{} + block.PreviousBlockHash = []byte("PreviousBlockHash") + block.Transactions = [][]byte{tx1Bytes, tx2Bytes} + testSerBlock2(t, block) +} + +func testSerBlock2(t *testing.T, block *Block2) { + serBlock, err := ConstructSerBlock2(block) + if err != nil { + t.Fatalf("Error:%s", err) + } + serDeBlock, err := serBlock.ToBlock2() + if err != nil { + t.Fatalf("Error:%s", err) + } + if !reflect.DeepEqual(block, serDeBlock) { + t.Fatalf("Block is not same after serialization-deserialization. \n\t Expected=%#v, \n\t Actual=%#v", block, serDeBlock) + } +}