Skip to content

Commit

Permalink
Change blockchainIndexer interface
Browse files Browse the repository at this point in the history
It would be cleaner to have a single interface method createIndexes(...)
and let the two implementations (Sync and Async) deal with the inherent differences
in implementation than require that the caller choose the correct method.

Change-Id: Ia040c5aeee2c5fdd1b05cc6f7729931aa6d7d548
Signed-off-by: grapebaba <281165273@qq.com>
  • Loading branch information
GrapeBaBa committed Aug 22, 2016
1 parent 8ea25a9 commit 5502704
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 21 deletions.
10 changes: 6 additions & 4 deletions core/ledger/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (blockchain *blockchain) addPersistenceChangesForNewBlock(ctx context.Conte
writeBatch.PutCF(db.GetDBHandle().BlockchainCF, encodeBlockNumberDBKey(blockNumber), blockBytes)
writeBatch.PutCF(db.GetDBHandle().BlockchainCF, blockCountKey, encodeUint64(blockNumber+1))
if blockchain.indexer.isSynchronous() {
blockchain.indexer.createIndexesSync(block, blockNumber, blockHash, writeBatch)
blockchain.indexer.createIndexes(block, blockNumber, blockHash, writeBatch)
}
blockchain.lastProcessedBlock = &lastProcessedBlock{block, blockNumber, blockHash}
return blockNumber, nil
Expand All @@ -218,8 +218,10 @@ func (blockchain *blockchain) blockPersistenceStatus(success bool) {
blockchain.size++
blockchain.previousBlockHash = blockchain.lastProcessedBlock.blockHash
if !blockchain.indexer.isSynchronous() {
blockchain.indexer.createIndexesAsync(blockchain.lastProcessedBlock.block,
blockchain.lastProcessedBlock.blockNumber, blockchain.lastProcessedBlock.blockHash)
writeBatch := gorocksdb.NewWriteBatch()
defer writeBatch.Destroy()
blockchain.indexer.createIndexes(blockchain.lastProcessedBlock.block,
blockchain.lastProcessedBlock.blockNumber, blockchain.lastProcessedBlock.blockHash, writeBatch)
}
}
blockchain.lastProcessedBlock = nil
Expand Down Expand Up @@ -249,7 +251,7 @@ func (blockchain *blockchain) persistRawBlock(block *protos.Block, blockNumber u
}

if blockchain.indexer.isSynchronous() {
blockchain.indexer.createIndexesSync(block, blockNumber, blockHash, writeBatch)
blockchain.indexer.createIndexes(block, blockNumber, blockHash, writeBatch)
}

opt := gorocksdb.NewDefaultWriteOptions()
Expand Down
9 changes: 2 additions & 7 deletions core/ledger/blockchain_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ var prefixAddressBlockNumCompositeKey = byte(3)
type blockchainIndexer interface {
isSynchronous() bool
start(blockchain *blockchain) error
createIndexesSync(block *protos.Block, blockNumber uint64, blockHash []byte, writeBatch *gorocksdb.WriteBatch) error
createIndexesAsync(block *protos.Block, blockNumber uint64, blockHash []byte) error
createIndexes(block *protos.Block, blockNumber uint64, blockHash []byte, writeBatch *gorocksdb.WriteBatch) error
fetchBlockNumberByBlockHash(blockHash []byte) (uint64, error)
fetchTransactionIndexByID(txID string) (uint64, uint64, error)
stop()
Expand All @@ -57,15 +56,11 @@ func (indexer *blockchainIndexerSync) start(blockchain *blockchain) error {
return nil
}

func (indexer *blockchainIndexerSync) createIndexesSync(
func (indexer *blockchainIndexerSync) createIndexes(
block *protos.Block, blockNumber uint64, blockHash []byte, writeBatch *gorocksdb.WriteBatch) error {
return addIndexDataForPersistence(block, blockNumber, blockHash, writeBatch)
}

func (indexer *blockchainIndexerSync) createIndexesAsync(block *protos.Block, blockNumber uint64, blockHash []byte) error {
return fmt.Errorf("Method not applicable")
}

func (indexer *blockchainIndexerSync) fetchBlockNumberByBlockHash(blockHash []byte) (uint64, error) {
return fetchBlockNumberByBlockHashFromDB(blockHash)
}
Expand Down
7 changes: 1 addition & 6 deletions core/ledger/blockchain_indexes_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,7 @@ func (indexer *blockchainIndexerAsync) start(blockchain *blockchain) error {
return nil
}

func (indexer *blockchainIndexerAsync) createIndexesSync(
block *protos.Block, blockNumber uint64, blockHash []byte, writeBatch *gorocksdb.WriteBatch) error {
return fmt.Errorf("Method not applicable")
}

func (indexer *blockchainIndexerAsync) createIndexesAsync(block *protos.Block, blockNumber uint64, blockHash []byte) error {
func (indexer *blockchainIndexerAsync) createIndexes(block *protos.Block, blockNumber uint64, blockHash []byte, writeBatch *gorocksdb.WriteBatch) error {
indexer.blockChan <- blockWrapper{block, blockNumber, blockHash, false}
return nil
}
Expand Down
5 changes: 1 addition & 4 deletions core/ledger/blockchain_indexes_async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,7 @@ func (noop *NoopIndexer) isSynchronous() bool {
func (noop *NoopIndexer) start(blockchain *blockchain) error {
return nil
}
func (noop *NoopIndexer) createIndexesSync(block *protos.Block, blockNumber uint64, blockHash []byte, writeBatch *gorocksdb.WriteBatch) error {
return nil
}
func (noop *NoopIndexer) createIndexesAsync(block *protos.Block, blockNumber uint64, blockHash []byte) error {
func (noop *NoopIndexer) createIndexes(block *protos.Block, blockNumber uint64, blockHash []byte, writeBatch *gorocksdb.WriteBatch) error {
return nil
}
func (noop *NoopIndexer) fetchBlockNumberByBlockHash(blockHash []byte) (uint64, error) {
Expand Down

0 comments on commit 5502704

Please sign in to comment.