Skip to content

Commit

Permalink
[ibft] Get txpool transactions all at once when building block (#176)
Browse files Browse the repository at this point in the history
# Description

The `txpool` prepare, push and pop transactions to hundreds of thousands
heaps, it would be bad performance when the active accounts are huge
amount (e.g. 100 kilo).

Besides, it is a bad pattern that the `ibft` module knew too much
details of the `txpool`.

The idea was heavily inspired by the
[go-ethereum](https://github.com/ethereum/go-ethereum) and
[harmony](https://github.com/harmony-one/harmony).

The PR :
* Removes the 'demote' flags of the `server` command.
  * Whose function causes transactions to be discarded irreversibly.
* Reduces the interface exposure of `txpool`.
  * Supply more capsuled interface.
* Get transactions only once when building block in `ibft` module.
    * Use only one heap, maximum its performance.
    * The heap is sorted in price, and received time.
    * The transaction queues are nonce sorted in slice.
* Would not insert new transactions, which is unnecessary when the block
generation time is very short.

# Changes include

- [x] Bugfix (non-breaking change that solves an issue)
- [ ] Hotfix (change that solves an urgent issue, and requires immediate
attention)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (change that is not backwards-compatible and/or
changes current functionality)

# Checklist

- [x] I have assigned this PR to myself
- [x] I have added at least 1 reviewer
- [x] I have added the relevant labels
- [ ] I have updated the official documentation
- [x] I have added sufficient documentation in code

## Testing

- [x] I have tested this code with the official test suite
- [ ] I have tested this code manually

### Manual tests

#### 1 node pprof test

* Set up a pos mode server with `pprof` flag set.
* Send huge amount transactions of different accounts using some tool.
* Check cpu usage and heatmap.

The node should perform OK in this branch, and yields bad performance in
the target branch.

#### 4 nodes network test

Not done yet.

# Documentation update

Will update documentation once it merged.
  • Loading branch information
DarianShawn authored Oct 24, 2022
1 parent 999ba6f commit 5e07351
Show file tree
Hide file tree
Showing 9 changed files with 449 additions and 206 deletions.
53 changes: 33 additions & 20 deletions consensus/dev/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,61 +108,74 @@ type transitionInterface interface {
}

func (d *Dev) writeTransactions(gasLimit uint64, transition transitionInterface) []*types.Transaction {
var successful []*types.Transaction
var includedTxs []*types.Transaction

d.txpool.Prepare()
// get all pending transactions once and for all
pendingTxs := d.txpool.Pending()
// get highest price transaction queue
priceTxs := types.NewTransactionsByPriceAndNonce(pendingTxs)

for {
tx := d.txpool.Pop()
tx := priceTxs.Peek()
if tx == nil {
d.logger.Debug("no more transactions")

break
}

if tx.ExceedsBlockGasLimit(gasLimit) {
// The address is punished. For current loop, it would not include its transactions any more.
d.txpool.Drop(tx)
priceTxs.Pop()

continue
}

if err := transition.Write(tx); err != nil {
d.logger.Debug("write transaction failed", "hash", tx.Hash, "from", tx.From,
"nonce", tx.Nonce, "err", err)

//nolint:errorlint
if _, ok := err.(*state.GasLimitReachedTransitionApplicationError); ok {
// Ignore those out-of-gas transaction whose gas limit too large
} else if _, ok := err.(*state.AllGasUsedError); ok {
if _, ok := err.(*state.AllGasUsedError); ok {
// no more transaction could be packed
d.logger.Debug("Not enough gas for further transactions")

break
} else if _, ok := err.(*state.GasLimitReachedTransitionApplicationError); ok {
// Ignore transaction when the free gas not enough
d.logger.Debug("Gas limit exceeded for current block", "from", tx.From)
priceTxs.Pop()
} else if nonceErr, ok := err.(*state.NonceTooLowError); ok {
// lower nonce tx, demote all promotable transactions
// low nonce tx, should reset accounts once done
d.logger.Warn("write transaction nonce too low",
"hash", tx.Hash, "from", tx.From, "nonce", tx.Nonce)
// skip the address, whose txs should be reset first.
d.txpool.DemoteAllPromoted(tx, nonceErr.CorrectNonce)
d.logger.Error("write transaction nonce too low", "hash", tx.Hash, "from", tx.From,
"nonce", tx.Nonce, "err", err)
priceTxs.Pop()
} else if nonceErr, ok := err.(*state.NonceTooHighError); ok {
// higher nonce tx, demote all promotable transactions
// high nonce tx, should reset accounts once done
d.logger.Error("write miss some transactions with higher nonce",
tx.Hash, "from", tx.From, "nonce", tx.Nonce)
d.txpool.DemoteAllPromoted(tx, nonceErr.CorrectNonce)
d.logger.Error("write miss some transactions with higher nonce", tx.Hash, "from", tx.From,
"nonce", tx.Nonce, "err", err)
priceTxs.Pop()
} else {
// no matter what kind of failure, drop is reasonable for not executed it yet
d.logger.Debug("write not executed transaction failed",
"hash", tx.Hash, "from", tx.From,
"nonce", tx.Nonce, "err", err)
d.txpool.Drop(tx)
priceTxs.Pop()
}

continue
}

// no errors, pop the tx from the pool
d.txpool.RemoveExecuted(tx)
// no errors, go on
priceTxs.Shift()

successful = append(successful, tx)
includedTxs = append(includedTxs, tx)
}

d.logger.Info("picked out txns from pool", "num", len(successful), "remaining", d.txpool.Length())
d.logger.Info("picked out txns from pool", "num", len(includedTxs))

return successful
return includedTxs
}

// writeNewBLock generates a new block based on transactions from the pool,
Expand Down
130 changes: 81 additions & 49 deletions consensus/ibft/ibft.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,10 @@ type blockchainInterface interface {
}

type txPoolInterface interface {
Prepare()
Length() uint64
Pop() *types.Transaction
RemoveExecuted(tx *types.Transaction)
Drop(tx *types.Transaction)
DemoteAllPromoted(tx *types.Transaction, correctNonce uint64)
ResetWithHeaders(headers ...*types.Header)
Pending() map[types.Address][]*types.Transaction
}

type syncerInterface interface {
Expand Down Expand Up @@ -627,9 +624,14 @@ func (i *Ibft) buildBlock(snap *Snapshot, parent *types.Header) (*types.Block, e
}
// If the mechanism is PoS -> build a regular block if it's not an end-of-epoch block
// If the mechanism is PoA -> always build a regular block, regardless of epoch
txns := []*types.Transaction{}
var (
txs []*types.Transaction
dropTxs []*types.Transaction
resetTxs []*demoteTransaction
)

if i.shouldWriteTransactions(header.Number) {
txns = i.writeTransactions(gasLimit, transition)
txs, dropTxs, resetTxs = i.writeTransactions(gasLimit, transition)
}

if err := i.PreStateCommit(header, transition); err != nil {
Expand All @@ -652,7 +654,7 @@ func (i *Ibft) buildBlock(snap *Snapshot, parent *types.Header) (*types.Block, e
// build the block
block := consensus.BuildBlock(consensus.BuildBlockParams{
Header: header,
Txns: txns,
Txns: txs,
Receipts: transition.Receipts(),
})

Expand All @@ -668,7 +670,22 @@ func (i *Ibft) buildBlock(snap *Snapshot, parent *types.Header) (*types.Block, e
// is sealed after all the committed seals
block.Header.ComputeHash()

i.logger.Info("build block", "number", header.Number, "txns", len(txns))
// TODO: remove these logic. ibft should not manipulate the txpool status.
// drop account txs first
for _, tx := range dropTxs {
i.txpool.Drop(tx)
}
// demote account txs
for _, tx := range resetTxs {
i.txpool.DemoteAllPromoted(tx.Tx, tx.CorrectNonce)
}

i.logger.Info("build block",
"number", header.Number,
"txs", len(txs),
"dropTxs", len(dropTxs),
"resetTxs", len(resetTxs),
)

return block, nil
}
Expand All @@ -678,82 +695,97 @@ type transitionInterface interface {
WriteFailedReceipt(txn *types.Transaction) error
}

type demoteTransaction struct {
Tx *types.Transaction
CorrectNonce uint64
}

// writeTransactions writes transactions from the txpool to the transition object
// and returns transactions that were included in the transition (new block)
func (i *Ibft) writeTransactions(gasLimit uint64, transition transitionInterface) []*types.Transaction {
var transactions []*types.Transaction

successTxCount := 0
failedTxCount := 0

i.txpool.Prepare()
func (i *Ibft) writeTransactions(
gasLimit uint64,
transition transitionInterface,
) (
includedTransactions []*types.Transaction,
shouldDropTxs []*types.Transaction,
shouldDemoteTxs []*demoteTransaction,
) {
// get all pending transactions once and for all
pendingTxs := i.txpool.Pending()
// get highest price transaction queue
priceTxs := types.NewTransactionsByPriceAndNonce(pendingTxs)

for {
tx := i.txpool.Pop()
tx := priceTxs.Peek()
if tx == nil {
i.logger.Debug("no more transactions")

break
}

if tx.ExceedsBlockGasLimit(gasLimit) {
copyTx := tx.Copy()
failedTxCount++
// The address is punished. For current validator, it would not include its transactions any more.
i.txpool.Drop(tx)
// the account transactions should be dropped
shouldDropTxs = append(shouldDropTxs, tx)
// The address is punished. For current loop, it would not include its transactions any more.
priceTxs.Pop()
// write failed receipts
if err := transition.WriteFailedReceipt(tx); err != nil {
continue
i.logger.Error("write receipt failed", "err", err)
}

transactions = append(transactions, copyTx)

continue
}

if err := transition.Write(tx); err != nil {
i.logger.Debug("write transaction failed", "hash", tx.Hash, "from", tx.From,
"nonce", tx.Nonce, "err", err)

//nolint:errorlint
if _, ok := err.(*state.GasLimitReachedTransitionApplicationError); ok {
// Ignore transaction when the free gas not enough
} else if _, ok := err.(*state.AllGasUsedError); ok {
if _, ok := err.(*state.AllGasUsedError); ok {
// no more transaction could be packed
i.logger.Debug("Not enough gas for further transactions")

break
} else if _, ok := err.(*state.GasLimitReachedTransitionApplicationError); ok {
// Ignore transaction when the free gas not enough
i.logger.Debug("Gas limit exceeded for current block", "from", tx.From)
priceTxs.Pop()
} else if nonceErr, ok := err.(*state.NonceTooLowError); ok {
// low nonce tx, demote all promotable transactions
failedTxCount++
i.txpool.DemoteAllPromoted(tx, nonceErr.CorrectNonce)
i.logger.Error("write transaction nonce too low", "hash", tx.Hash, "from", tx.From,
"nonce", tx.Nonce, "err", err)
// low nonce tx, should reset accounts once done
i.logger.Warn("write transaction nonce too low",
"hash", tx.Hash, "from", tx.From, "nonce", tx.Nonce)
// skip the address, whose txs should be reset first.
shouldDemoteTxs = append(shouldDemoteTxs, &demoteTransaction{tx, nonceErr.CorrectNonce})
// priceTxs.Shift()
priceTxs.Pop()
} else if nonceErr, ok := err.(*state.NonceTooHighError); ok {
// high nonce tx, demote all promotable transactions
failedTxCount++
i.txpool.DemoteAllPromoted(tx, nonceErr.CorrectNonce)
i.logger.Error("write miss some transactions with higher nonce", tx.Hash, "from", tx.From,
"nonce", tx.Nonce, "err", err)
// high nonce tx, should reset accounts once done
i.logger.Error("write miss some transactions with higher nonce",
tx.Hash, "from", tx.From, "nonce", tx.Nonce)
shouldDemoteTxs = append(shouldDemoteTxs, &demoteTransaction{tx, nonceErr.CorrectNonce})
priceTxs.Pop()
} else {
failedTxCount++
// no matter what kind of failure, drop is reasonable for not executed it yet
i.txpool.Drop(tx)
i.logger.Debug("write not executed transaction failed",
"hash", tx.Hash, "from", tx.From,
"nonce", tx.Nonce, "err", err)
shouldDropTxs = append(shouldDropTxs, tx)
priceTxs.Pop()
}

continue
}

// no errors, remove the tx from the pool
i.txpool.RemoveExecuted(tx)
// no errors, go on
priceTxs.Shift()

successTxCount++

transactions = append(transactions, tx)
includedTransactions = append(includedTransactions, tx)
}

//nolint:lll
i.logger.Info("executed txns", "failed ", failedTxCount, "successful", successTxCount, "remaining in pool", i.txpool.Length())
i.logger.Info("executed txns",
"successful", len(includedTransactions),
"shouldDropTxs", len(shouldDropTxs),
"shouldDemoteTxs", len(shouldDemoteTxs),
)

return transactions
return
}

// runAcceptState runs the Accept state loop
Expand Down
Loading

0 comments on commit 5e07351

Please sign in to comment.