Add EVM txs eviction logic #204

Merged
merged 1 commit on Feb 26, 2024
config/toml.go: 2 changes (1 addition, 1 deletion)
@@ -409,7 +409,7 @@ pending-size = {{ .Mempool.PendingSize }}

max-pending-txs-bytes = {{ .Mempool.MaxPendingTxsBytes }}

-pending-ttl-duration = {{ .Mempool.PendingTTLDuration }}
+pending-ttl-duration = "{{ .Mempool.PendingTTLDuration }}"

pending-ttl-num-blocks = {{ .Mempool.PendingTTLNumBlocks }}

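A note on the quoting fix above: Go's time.Duration renders through text/template as a bare token such as 30s, which is not a valid TOML value, so the template now wraps it in quotes to emit a valid TOML string. A minimal, self-contained sketch of the difference (the template text here is illustrative, not the repo's full config template):

	package main

	import (
		"os"
		"text/template"
		"time"
	)

	func main() {
		cfg := struct{ PendingTTLDuration time.Duration }{30 * time.Second}
		tmpl := template.Must(template.New("toml").Parse(
			"pending-ttl-duration = {{ .PendingTTLDuration }}\n" +
				"pending-ttl-duration = \"{{ .PendingTTLDuration }}\"\n"))
		// Output:
		//   pending-ttl-duration = 30s     <- invalid TOML (bare token)
		//   pending-ttl-duration = "30s"   <- valid TOML string
		_ = tmpl.Execute(os.Stdout, cfg)
	}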
internal/mempool/mempool.go: 36 changes (26 additions, 10 deletions)
@@ -127,7 +127,7 @@ func NewTxMempool(
timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
}),
-pendingTxs: NewPendingTxs(),
+pendingTxs: NewPendingTxs(cfg),
failedCheckTxCounts: map[types.NodeID]uint64{},
peerManager: peerManager,
}
@@ -340,7 +340,9 @@ func (txmp *TxMempool) CheckTx(
return err
}
atomic.AddInt64(&txmp.pendingSizeBytes, int64(wtx.Size()))
-txmp.pendingTxs.Insert(wtx, res, txInfo)
+if err := txmp.pendingTxs.Insert(wtx, res, txInfo); err != nil {
+return err
+}
}
}

@@ -362,7 +364,7 @@ func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
-txmp.removeTx(wtx, false)
+txmp.removeTx(wtx, false, true)
return nil
}

@@ -401,7 +403,7 @@ func (txmp *TxMempool) Flush() {
txmp.timestampIndex.Reset()

for _, wtx := range txmp.txStore.GetAllTxs() {
-txmp.removeTx(wtx, false)
+txmp.removeTx(wtx, false, false)
}

atomic.SwapInt64(&txmp.sizeBytes, 0)
@@ -513,7 +515,7 @@ func (txmp *TxMempool) Update(

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
-txmp.removeTx(wtx, false)
+txmp.removeTx(wtx, false, false)
}
}

@@ -634,7 +636,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
// - The transaction, toEvict, can be removed while a concurrent
// reCheckTx callback is being executed for the same transaction.
for _, toEvict := range evictTxs {
-txmp.removeTx(toEvict, true)
+txmp.removeTx(toEvict, true, true)
txmp.logger.Debug(
"evicted existing good transaction; mempool full",
"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
@@ -745,7 +747,7 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
panic("corrupted reCheckTx cursor")
}

-txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
+txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache, true)
}
}

@@ -871,13 +873,13 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
return true
}

-func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
+func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReenqueue bool) {
if txmp.txStore.IsTxRemoved(wtx.hash) {
return
}

txmp.txStore.RemoveTx(wtx)
txmp.priorityIndex.RemoveTx(wtx)
toBeReenqueued := txmp.priorityIndex.RemoveTx(wtx, shouldReenqueue)
txmp.heightIndex.Remove(wtx)
txmp.timestampIndex.Remove(wtx)

@@ -889,6 +891,20 @@
atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))

wtx.removeHandler(removeFromCache)

+if shouldReenqueue {
+for _, reenqueue := range toBeReenqueued {
+txmp.removeTx(reenqueue, removeFromCache, false)
+}
+for _, reenqueue := range toBeReenqueued {
+rtx := reenqueue.tx
+go func() {
+if err := txmp.CheckTx(context.Background(), rtx, nil, TxInfo{}); err != nil {
+txmp.logger.Error(fmt.Sprintf("failed to reenqueue transaction %X due to %s", rtx.Hash(), err))
+}
+}()
+}
+}
}

func (txmp *TxMempool) expire(blockHeight int64, wtx *WrappedTx) {
@@ -967,7 +983,7 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
}

// remove pending txs that have expired
-txmp.pendingTxs.PurgeExpired(txmp.config.PendingTTLNumBlocks, blockHeight, txmp.config.PendingTTLDuration, now, func(wtx *WrappedTx) {
+txmp.pendingTxs.PurgeExpired(blockHeight, now, func(wtx *WrappedTx) {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-wtx.Size()))
txmp.expire(blockHeight, wtx)
})
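Taken together, the mempool.go changes wire eviction into the per-sender EVM nonce queues: removeTx now learns from the priority index which same-sender transactions were queued behind the removed one, strips them from the other indexes without triggering another reenqueue, and resubmits each through CheckTx on its own goroutine (asynchronously, presumably because CheckTx re-acquires the mempool's locks). A minimal sketch of that control flow with simplified stand-in types, not the repo's actual structs:

	package main

	import (
		"context"
		"fmt"
	)

	type wrappedTx struct {
		evmAddress string
		nonce      uint64
	}

	type mempool struct {
		evmQueue map[string][]*wrappedTx // per-sender queues, ordered by nonce
	}

	// removeTx mirrors the shape of the change above: when reenqueueing is
	// requested, the same-sender transactions queued behind the removed tx are
	// handed back, dropped, and re-submitted asynchronously so each can be
	// re-classified (it may now sit behind a nonce gap and belong in the
	// pending store instead of the priority queue).
	func (mp *mempool) removeTx(wtx *wrappedTx, shouldReenqueue bool) {
		behind := mp.dequeue(wtx, shouldReenqueue)
		for _, tx := range behind {
			mp.removeTx(tx, false) // cleanup without another reenqueue
		}
		for _, tx := range behind {
			tx := tx
			go func() { // async resubmission, as in the diff
				if err := mp.checkTx(context.Background(), tx); err != nil {
					fmt.Printf("failed to reenqueue %s/%d: %v\n", tx.evmAddress, tx.nonce, err)
				}
			}()
		}
	}

	// dequeue removes wtx from its sender's queue and, when requested, returns
	// the transactions that were queued behind it.
	func (mp *mempool) dequeue(wtx *wrappedTx, withSuffix bool) []*wrappedTx {
		q := mp.evmQueue[wtx.evmAddress]
		for i, t := range q {
			if t.nonce != wtx.nonce {
				continue
			}
			if withSuffix {
				suffix := q[i+1:]
				mp.evmQueue[wtx.evmAddress] = q[:i]
				return suffix
			}
			mp.evmQueue[wtx.evmAddress] = append(q[:i], q[i+1:]...)
			return nil
		}
		return nil
	}

	func (mp *mempool) checkTx(ctx context.Context, tx *wrappedTx) error { return nil } // stand-in

	func main() {
		mp := &mempool{evmQueue: map[string][]*wrappedTx{
			"0xabc": {{"0xabc", 0}, {"0xabc", 1}, {"0xabc", 2}},
		}}
		mp.removeTx(&wrappedTx{"0xabc", 0}, true) // nonces 1 and 2 get re-checked
	}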
internal/mempool/mempool_test.go: 127 changes (121 additions, 6 deletions)
@@ -30,6 +30,8 @@ import (
// transaction priority based on the value in the key/value pair.
type application struct {
*kvstore.Application

+occupiedNonces map[string][]uint64
}

type testTx struct {
@@ -38,6 +40,7 @@ type testTx struct {
}

func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {

var (
priority int64
sender string
@@ -58,7 +61,7 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
GasWanted: 1,
}}, nil
}
-nonce, err := strconv.ParseInt(string(parts[3]), 10, 64)
+nonce, err := strconv.ParseUint(string(parts[3]), 10, 64)
if err != nil {
// could not parse
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
@@ -67,15 +70,50 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
GasWanted: 1,
}}, nil
}
+if app.occupiedNonces == nil {
+app.occupiedNonces = make(map[string][]uint64)
+}
+if _, exists := app.occupiedNonces[account]; !exists {
+app.occupiedNonces[account] = []uint64{}
+}
+active := true
+for i := uint64(0); i < nonce; i++ {
+found := false
+for _, occ := range app.occupiedNonces[account] {
+if occ == i {
+found = true
+break
+}
+}
+if !found {
+active = false
+break
+}
+}
+app.occupiedNonces[account] = append(app.occupiedNonces[account], nonce)
return &abci.ResponseCheckTxV2{
ResponseCheckTx: &abci.ResponseCheckTx{
Priority: v,
Code: code.CodeTypeOK,
GasWanted: 1,
},
-EVMNonce: uint64(nonce),
-EVMSenderAddress: account,
-IsEVM: true,
+EVMNonce: nonce,
+EVMSenderAddress: account,
+IsEVM: true,
+IsPendingTransaction: !active,
+Checker: func() abci.PendingTxCheckerResponse { return abci.Pending },
+ExpireTxHandler: func() {
+idx := -1
+for i, n := range app.occupiedNonces[account] {
+if n == nonce {
+idx = i
+break
+}
+}
+if idx >= 0 {
+app.occupiedNonces[account] = append(app.occupiedNonces[account][:idx], app.occupiedNonces[account][idx+1:]...)
+}
+},
}, nil
}
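The test application above tracks which nonces each account currently occupies and classifies an EVM transaction as pending whenever there is a gap below its nonce; the ExpireTxHandler releases the nonce again when the transaction expires. The nested scan over occupiedNonces is equivalent to this set-based check (a sketch for clarity, not the test's actual code):

	// isActive reports whether every nonce below the given one has already
	// been seen for the account; any gap means the tx must wait as pending.
	func isActive(seen map[string]map[uint64]bool, account string, nonce uint64) bool {
		for i := uint64(0); i < nonce; i++ {
			if !seen[account][i] {
				return false
			}
		}
		return true
	}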

@@ -470,12 +508,14 @@ func TestTxMempool_Prioritization(t *testing.T) {
txs := [][]byte{
[]byte(fmt.Sprintf("sender-0-1=peer=%d", 9)),
[]byte(fmt.Sprintf("sender-1-1=peer=%d", 8)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)),
[]byte(fmt.Sprintf("sender-2-1=peer=%d", 5)),
[]byte(fmt.Sprintf("sender-3-1=peer=%d", 4)),
}
+evmTxs := [][]byte{
+[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
+[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
+}

// copy the slice of txs and shuffle the order randomly
txsCopy := make([][]byte, len(txs))
@@ -484,6 +524,16 @@
rng.Shuffle(len(txsCopy), func(i, j int) {
txsCopy[i], txsCopy[j] = txsCopy[j], txsCopy[i]
})
+txs = [][]byte{
+[]byte(fmt.Sprintf("sender-0-1=peer=%d", 9)),
+[]byte(fmt.Sprintf("sender-1-1=peer=%d", 8)),
+[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
+[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
+[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)),
+[]byte(fmt.Sprintf("sender-2-1=peer=%d", 5)),
+[]byte(fmt.Sprintf("sender-3-1=peer=%d", 4)),
+}
+txsCopy = append(txsCopy, evmTxs...)

for i := range txsCopy {
require.NoError(t, txmp.CheckTx(ctx, txsCopy[i], nil, TxInfo{SenderID: peerID}))
@@ -504,6 +554,71 @@
}
}

+func TestTxMempool_PendingStoreSize(t *testing.T) {
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+
+client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
+if err := client.Start(ctx); err != nil {
+t.Fatal(err)
+}
+t.Cleanup(client.Wait)
+
+txmp := setup(t, client, 100)
+txmp.config.PendingSize = 1
+peerID := uint16(1)
+
+address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
+
+require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 1)), nil, TxInfo{SenderID: peerID}))
+err := txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 2)), nil, TxInfo{SenderID: peerID})
+require.Error(t, err)
+require.Contains(t, err.Error(), "mempool pending set is full")
+}
+
+func TestTxMempool_EVMEviction(t *testing.T) {
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+
+client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
+if err := client.Start(ctx); err != nil {
+t.Fatal(err)
+}
+t.Cleanup(client.Wait)
+
+txmp := setup(t, client, 100)
+txmp.config.Size = 1
+peerID := uint16(1)
+
+address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
+address2 := "0xfD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
+
+require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 0)), nil, TxInfo{SenderID: peerID}))
+// this should evict the previous tx
+require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 2, 0)), nil, TxInfo{SenderID: peerID}))
+require.Equal(t, 1, txmp.priorityIndex.NumTxs())
+require.Equal(t, int64(2), txmp.priorityIndex.txs[0].priority)
+
+txmp.config.Size = 2
+require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 3, 1)), nil, TxInfo{SenderID: peerID}))
+require.Equal(t, 0, txmp.pendingTxs.Size())
+require.Equal(t, 2, txmp.priorityIndex.NumTxs())
+// this would evict the tx with priority 2 and cause the tx with priority 3 to go pending
+require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 4, 0)), nil, TxInfo{SenderID: peerID}))
+time.Sleep(1 * time.Second) // reenqueue is async
+require.Equal(t, 1, txmp.priorityIndex.NumTxs())
+tx := txmp.priorityIndex.txs[0]
+require.Equal(t, 1, txmp.pendingTxs.Size())
+
+require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 5, 1)), nil, TxInfo{SenderID: peerID}))
+require.Equal(t, 2, txmp.priorityIndex.NumTxs())
+txmp.removeTx(tx, true, false)
+// should not reenqueue
+require.Equal(t, 1, txmp.priorityIndex.NumTxs())
+time.Sleep(1 * time.Second) // pendingTxs should still be one even after sleeping for a sec
+require.Equal(t, 1, txmp.pendingTxs.Size())
+}

func TestTxMempool_CheckTxSamePeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
internal/mempool/priority_queue.go: 26 changes (18 additions, 8 deletions)
@@ -60,8 +60,11 @@ func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int6
pq.mtx.RLock()
defer pq.mtx.RUnlock()

-txs := make([]*WrappedTx, len(pq.txs))
-copy(txs, pq.txs)
+txs := []*WrappedTx{}
+txs = append(txs, pq.txs...)
+for _, queue := range pq.evmQueue {
+txs = append(txs, queue[1:]...)
+}

sort.Slice(txs, func(i, j int) bool {
return txs[i].priority < txs[j].priority
@@ -111,18 +114,19 @@ func (pq *TxPriorityQueue) NumTxs() int {
return len(pq.txs) + pq.numQueuedUnsafe()
}

-func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) {
+func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) (removedIdx int) {
if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
for i, t := range queue {
if t.tx.Key() == tx.tx.Key() {
pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...)
if len(pq.evmQueue[tx.evmAddress]) == 0 {
delete(pq.evmQueue, tx.evmAddress)
}
-break
+return i
}
}
}
+return -1
}

func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
@@ -135,21 +139,27 @@ func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
}

// RemoveTx removes a specific transaction from the priority queue.
-func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
+func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx, shouldReenqueue bool) (toBeReenqueued []*WrappedTx) {
pq.mtx.Lock()
defer pq.mtx.Unlock()

+var removedIdx int

if idx, ok := pq.findTxIndexUnsafe(tx); ok {
heap.Remove(pq, idx)
if tx.isEVM {
-pq.removeQueuedEvmTxUnsafe(tx)
-if len(pq.evmQueue[tx.evmAddress]) > 0 {
+removedIdx = pq.removeQueuedEvmTxUnsafe(tx)
+if !shouldReenqueue && len(pq.evmQueue[tx.evmAddress]) > 0 {
heap.Push(pq, pq.evmQueue[tx.evmAddress][0])
}
}
} else if tx.isEVM {
-pq.removeQueuedEvmTxUnsafe(tx)
+removedIdx = pq.removeQueuedEvmTxUnsafe(tx)
}
+if tx.isEVM && shouldReenqueue && len(pq.evmQueue[tx.evmAddress]) > 0 && removedIdx >= 0 {
+toBeReenqueued = pq.evmQueue[tx.evmAddress][removedIdx:]
+}
+return
}

func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
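These priority_queue.go changes are the queue-side half of the eviction logic. GetEvictableTxs now also considers queued EVM transactions behind each sender's head (queue[1:]), so a full mempool can evict them; removeQueuedEvmTxUnsafe reports the index the transaction held in its sender's nonce-ordered queue; and RemoveTx, when asked to reenqueue, hands back the suffix of the queue from that index onward so the mempool can re-check those transactions. A small illustration of the suffix semantics on a plain slice (a sketch, not the repo's types):

	// removeAt drops queue[i] and returns the remaining queue plus the suffix
	// that followed the removed element, which is what gets re-checked.
	func removeAt(queue []uint64, i int) (remaining, toReenqueue []uint64) {
		if i < 0 || i >= len(queue) {
			return queue, nil
		}
		remaining = append(append([]uint64{}, queue[:i]...), queue[i+1:]...)
		toReenqueue = remaining[i:] // dropping nonce 3 from [3 4 5] re-checks [4 5]
		return remaining, toReenqueue
	}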