Skip to content

Commit

Permalink
*: store application long along with tx/block
Browse files Browse the repository at this point in the history
Two times less keys inserted into the DB per tx leads to ~13% TPS
improvement. We also drop one goroutine with it which isn't bad as well.
  • Loading branch information
roman-khimov committed Dec 8, 2021
1 parent f87c595 commit 0fd95e1
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 159 deletions.
5 changes: 2 additions & 3 deletions pkg/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,18 @@ func (b *Block) RebuildMerkleRoot() {
b.MerkleRoot = b.ComputeMerkleRoot()
}

// NewBlockFromTrimmedBytes returns a new block from trimmed data.
// NewTrimmedFromReader returns a new block from trimmed data.
// This is commonly used to create a block from stored data.
// Blocks created from trimmed data will have their Trimmed field
// set to true.
func NewBlockFromTrimmedBytes(stateRootEnabled bool, b []byte) (*Block, error) {
func NewTrimmedFromReader(stateRootEnabled bool, br *io.BinReader) (*Block, error) {
block := &Block{
Header: Header{
StateRootEnabled: stateRootEnabled,
},
Trimmed: true,
}

br := io.NewBinReaderFromBuf(b)
block.Header.DecodeBinary(br)
lenHashes := br.ReadVarUint()
if lenHashes > MaxTransactionsPerBlock {
Expand Down
3 changes: 2 additions & 1 deletion pkg/core/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func TestTrimmedBlock(t *testing.T) {
b, err := block.Trim()
require.NoError(t, err)

trimmedBlock, err := NewBlockFromTrimmedBytes(false, b)
r := io.NewBinReaderFromBuf(b)
trimmedBlock, err := NewTrimmedFromReader(false, r)
require.NoError(t, err)

assert.True(t, trimmedBlock.Trimmed)
Expand Down
74 changes: 22 additions & 52 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,13 +883,14 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error {
continue
}
bc.headerHashes = append(bc.headerHashes, h.Hash())
buf.WriteB(storage.ExecBlock)
h.EncodeBinary(buf.BinWriter)
buf.BinWriter.WriteB(0)
if buf.Err != nil {
return buf.Err
}

key := storage.AppendPrefix(storage.DataBlock, h.Hash().BytesBE())
key := storage.AppendPrefix(storage.DataExecutable, h.Hash().BytesBE())
batch.Put(key, buf.Bytes())
buf.Reset()
lastHeader = h
Expand Down Expand Up @@ -936,38 +937,25 @@ func (bc *Blockchain) GetStateSyncModule() blockchainer.StateSync {
func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error {
var (
cache = bc.dao.GetWrapped()
blockCache = bc.dao.GetWrapped()
aerCache = bc.dao.GetWrapped()
appExecResults = make([]*state.AppExecResult, 0, 2+len(block.Transactions))
aerchan = make(chan *state.AppExecResult, len(block.Transactions)/8) // Tested 8 and 4 with no practical difference, but feel free to test more and tune.
aerdone = make(chan error)
blockdone = make(chan error)
)
go func() {
var (
kvcache = blockCache
writeBuf = io.NewBufBinWriter()
kvcache = aerCache
writeBuf = io.NewBufBinWriter()
err error
txCnt int
baer1, baer2 *state.AppExecResult
transCache = make(map[util.Uint160]transferData)
)
if err := kvcache.StoreAsBlock(block, writeBuf); err != nil {
blockdone <- err
return
}
writeBuf.Reset()

if err := kvcache.StoreAsCurrentBlock(block, writeBuf); err != nil {
blockdone <- err
aerdone <- err
return
}
writeBuf.Reset()

for _, tx := range block.Transactions {
if err := kvcache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil {
blockdone <- err
return
}

writeBuf.Reset()
}
if bc.config.RemoveUntraceableBlocks {
var start, stop uint32
if bc.config.P2PStateExchangeExtensions {
Expand All @@ -994,24 +982,16 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
writeBuf.Reset()
}
}
close(blockdone)
}()
go func() {
var (
kvcache = aerCache
writeBuf = io.NewBufBinWriter()
err error
appendBlock bool
transCache = make(map[util.Uint160]transferData)
)
for aer := range aerchan {
if aer.Container == block.Hash() && appendBlock {
err = kvcache.AppendAppExecResult(aer, writeBuf)
} else {
err = kvcache.PutAppExecResult(aer, writeBuf)
if aer.Container == block.Hash() {
appendBlock = true
if aer.Container == block.Hash() {
if baer1 == nil {
baer1 = aer
} else {
baer2 = aer
}
} else {
err = kvcache.StoreAsTransaction(block.Transactions[txCnt], block.Index, aer, writeBuf)
txCnt++
}
if err != nil {
err = fmt.Errorf("failed to store exec result: %w", err)
Expand All @@ -1028,6 +1008,11 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
aerdone <- err
return
}
if err := kvcache.StoreAsBlock(block, baer1, baer2, writeBuf); err != nil {
aerdone <- err
return
}
writeBuf.Reset()
for acc, trData := range transCache {
err = kvcache.PutTokenTransferInfo(acc, &trData.Info)
if err != nil {
Expand Down Expand Up @@ -1055,7 +1040,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
if err != nil {
// Release goroutines, don't care about errors, we already have one.
close(aerchan)
<-blockdone
<-aerdone
return fmt.Errorf("onPersist failed: %w", err)
}
Expand All @@ -1077,7 +1061,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
if err != nil {
// Release goroutines, don't care about errors, we already have one.
close(aerchan)
<-blockdone
<-aerdone
return fmt.Errorf("failed to persist invocation results: %w", err)
}
Expand Down Expand Up @@ -1107,7 +1090,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
if err != nil {
// Release goroutines, don't care about errors, we already have one.
close(aerchan)
<-blockdone
<-aerdone
return fmt.Errorf("postPersist failed: %w", err)
}
Expand All @@ -1119,7 +1101,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
mpt, sr, err := bc.stateRoot.AddMPTBatch(block.Index, b, d.Store)
if err != nil {
// Release goroutines, don't care about errors, we already have one.
<-blockdone
<-aerdone
// Here MPT can be left in a half-applied state.
// However if this error occurs, this is a bug somewhere in code
Expand All @@ -1135,7 +1116,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
}
if err != nil {
// Release goroutines, don't care about errors, we already have one.
<-blockdone
<-aerdone
return err
}
Expand All @@ -1152,22 +1132,12 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
mpt.Collapse(10)
}

// Wait for _both_ goroutines to finish.
blockerr := <-blockdone
aererr := <-aerdone
if blockerr != nil {
return blockerr
}
if aererr != nil {
return aererr
}

bc.lock.Lock()
_, err = blockCache.Persist()
if err != nil {
bc.lock.Unlock()
return err
}
_, err = aerCache.Persist()
if err != nil {
bc.lock.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestAddBlock(t *testing.T) {
require.NoError(t, err)

for _, block := range blocks {
key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesBE())
key := storage.AppendPrefix(storage.DataExecutable, block.Hash().BytesBE())
_, err := bc.dao.Store.Get(key)
require.NoErrorf(t, err, "block %s not persisted", block.Hash())
}
Expand Down Expand Up @@ -805,7 +805,7 @@ func TestVerifyTx(t *testing.T) {
},
},
}
require.NoError(t, bc.dao.StoreAsTransaction(conflicting, bc.blockHeight, nil))
require.NoError(t, bc.dao.StoreAsTransaction(conflicting, bc.blockHeight, nil, nil))
require.True(t, errors.Is(bc.VerifyTx(tx), ErrHasConflicts))
})
t.Run("attribute on-chain conflict", func(t *testing.T) {
Expand Down
Loading

0 comments on commit 0fd95e1

Please sign in to comment.