Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EVM] Allow multiple txs from same account in a block #190

Merged
merged 5 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,4 +256,9 @@ type ResponseCheckTxV2 struct {
IsPendingTransaction bool
Checker PendingTxChecker // must not be nil if IsPendingTransaction is true
ExpireTxHandler ExpireTxHandler

// helper properties for prioritization in mempool
EVMNonce uint64
EVMSenderAddress string
IsEVM bool
}
11 changes: 7 additions & 4 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,13 @@ func (txmp *TxMempool) CheckTx(
}

wtx := &WrappedTx{
tx: tx,
hash: txHash,
timestamp: time.Now().UTC(),
height: txmp.height,
tx: tx,
hash: txHash,
timestamp: time.Now().UTC(),
height: txmp.height,
evmNonce: res.EVMNonce,
evmAddress: res.EVMSenderAddress,
isEVM: res.IsEVM,
expiredCallback: func(removeFromCache bool) {
txmp.metrics.ExpiredTxs.Add(1)
if removeFromCache {
Expand Down
94 changes: 93 additions & 1 deletion internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,42 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
sender string
)

if strings.HasPrefix(string(req.Tx), "evm") {
// format is evm-sender-0=account=priority=nonce
// split into respective vars
parts := bytes.Split(req.Tx, []byte("="))
sender = string(parts[0])
account := string(parts[1])
v, err := strconv.ParseInt(string(parts[2]), 10, 64)
if err != nil {
// could not parse
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Code: 100,
GasWanted: 1,
}}, nil
}
nonce, err := strconv.ParseInt(string(parts[3]), 10, 64)
if err != nil {
// could not parse
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Code: 101,
GasWanted: 1,
}}, nil
}
return &abci.ResponseCheckTxV2{
ResponseCheckTx: &abci.ResponseCheckTx{
Priority: v,
Code: code.CodeTypeOK,
GasWanted: 1,
},
EVMNonce: uint64(nonce),
EVMSenderAddress: account,
IsEVM: true,
}, nil
}

// infer the priority from the raw transaction value (sender=key=value)
parts := bytes.Split(req.Tx, []byte("="))
if len(parts) == 3 {
Expand All @@ -64,7 +100,6 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
GasWanted: 1,
}}, nil
}

return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Sender: sender,
Expand Down Expand Up @@ -412,6 +447,63 @@ func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
require.NoError(t, txmp.CheckTx(ctx, tx, nil, TxInfo{SenderID: 0}))
}

func TestTxMempool_Prioritization(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)

txmp := setup(t, client, 100)
peerID := uint16(1)

address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
address2 := "0xfD23B3A9DE15e92B9ef9540E587B3661E15A12fA"

// Generate transactions with different priorities
// there are two formats to comply with the above mocked CheckTX
// EVM: evm-sender=account=priority=nonce
// Non-EVM: sender=peer=priority
txs := [][]byte{
[]byte(fmt.Sprintf("sender-0-1=peer=%d", 9)),
[]byte(fmt.Sprintf("sender-1-1=peer=%d", 8)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)),
[]byte(fmt.Sprintf("sender-2-1=peer=%d", 5)),
[]byte(fmt.Sprintf("sender-3-1=peer=%d", 4)),
}

// copy the slice of txs and shuffle the order randomly
txsCopy := make([][]byte, len(txs))
copy(txsCopy, txs)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
rng.Shuffle(len(txsCopy), func(i, j int) {
txsCopy[i], txsCopy[j] = txsCopy[j], txsCopy[i]
})

for i := range txsCopy {
require.NoError(t, txmp.CheckTx(ctx, txsCopy[i], nil, TxInfo{SenderID: peerID}))
}

// Reap the transactions
reapedTxs := txmp.ReapMaxTxs(len(txs))
// Check if the reaped transactions are in the correct order of their priorities
for _, tx := range txs {
fmt.Printf("expected: %s\n", string(tx))
}
fmt.Println("**************")
for _, reapedTx := range reapedTxs {
fmt.Printf("received: %s\n", string(reapedTx))
}
for i, reapedTx := range reapedTxs {
require.Equal(t, txs[i], []byte(reapedTx))
}
}

func TestTxMempool_CheckTxSamePeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
116 changes: 98 additions & 18 deletions internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ var _ heap.Interface = (*TxPriorityQueue)(nil)

// TxPriorityQueue defines a thread-safe priority queue for valid transactions.
type TxPriorityQueue struct {
mtx sync.RWMutex
txs []*WrappedTx
mtx sync.RWMutex
txs []*WrappedTx
evmQueue map[string][]*WrappedTx
}

func NewTxPriorityQueue() *TxPriorityQueue {
pq := &TxPriorityQueue{
txs: make([]*WrappedTx, 0),
txs: make([]*WrappedTx, 0),
evmQueue: make(map[string][]*WrappedTx),
}

heap.Init(pq)
Expand Down Expand Up @@ -68,68 +70,146 @@ func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int6
return nil
}

// requires read lock
func (pq *TxPriorityQueue) numQueuedUnsafe() int {
var result int
for _, queue := range pq.evmQueue {
result += len(queue)
}
// first items in queue are also in heap, subtract one
return result - len(pq.evmQueue)
}

// NumTxs returns the number of transactions in the priority queue. It is
// thread safe.
func (pq *TxPriorityQueue) NumTxs() int {
pq.mtx.RLock()
defer pq.mtx.RUnlock()

return len(pq.txs)
return len(pq.txs) + pq.numQueuedUnsafe()
}

func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) {
if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
for i, t := range queue {
if t == tx {
stevenlanders marked this conversation as resolved.
Show resolved Hide resolved
pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...)
if len(pq.evmQueue[tx.evmAddress]) == 0 {
delete(pq.evmQueue, tx.evmAddress)
}
break
}
}
}
}

func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
for i, t := range pq.txs {
if t == tx {
return i, true
}
}
return 0, false
}

// RemoveTx removes a specific transaction from the priority queue.
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
pq.mtx.Lock()
defer pq.mtx.Unlock()

if tx.heapIndex < len(pq.txs) {
heap.Remove(pq, tx.heapIndex)
if idx, ok := pq.findTxIndexUnsafe(tx); ok {
heap.Remove(pq, idx)
}

if tx.isEVM {
pq.removeQueuedEvmTxUnsafe(tx)
}
}

func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
if !tx.isEVM {
heap.Push(pq, tx)
return
}

queue, exists := pq.evmQueue[tx.evmAddress]
if !exists {
pq.evmQueue[tx.evmAddress] = []*WrappedTx{tx}
heap.Push(pq, tx)
return
}

first := queue[0]
if tx.evmNonce < first.evmNonce {
if idx, ok := pq.findTxIndexUnsafe(first); ok {
heap.Remove(pq, idx)
}
heap.Push(pq, tx)
}
queue = append(queue, tx)
sort.Slice(queue, func(i, j int) bool {
stevenlanders marked this conversation as resolved.
Show resolved Hide resolved
return queue[i].evmNonce < queue[j].evmNonce
})
pq.evmQueue[tx.evmAddress] = queue
}

// PushTx adds a valid transaction to the priority queue. It is thread safe.
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
pq.mtx.Lock()
defer pq.mtx.Unlock()
pq.pushTxUnsafe(tx)
}

heap.Push(pq, tx)
func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx {
x := heap.Pop(pq)
if x == nil {
return nil
}

tx := x.(*WrappedTx)

if !tx.isEVM {
return tx
}

pq.removeQueuedEvmTxUnsafe(tx)
if len(pq.evmQueue[tx.evmAddress]) > 0 {
heap.Push(pq, pq.evmQueue[tx.evmAddress][0])
}

return tx
}

// PopTx removes the top priority transaction from the queue. It is thread safe.
func (pq *TxPriorityQueue) PopTx() *WrappedTx {
pq.mtx.Lock()
defer pq.mtx.Unlock()

x := heap.Pop(pq)
if x != nil {
return x.(*WrappedTx)
}

return nil
return pq.popTxUnsafe()
}

// dequeue up to `max` transactions and reenqueue while locked
func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx {
pq.mtx.Lock()
defer pq.mtx.Unlock()

numTxs := len(pq.txs)
numTxs := len(pq.txs) + pq.numQueuedUnsafe()
if max < 0 {
max = numTxs
}

cap := tmmath.MinInt(numTxs, max)
res := make([]*WrappedTx, 0, cap)
for i := 0; i < cap; i++ {
popped := heap.Pop(pq)
popped := pq.popTxUnsafe()
if popped == nil {
break
}
res = append(res, popped.(*WrappedTx))

res = append(res, popped)
}

for _, tx := range res {
heap.Push(pq, tx)
pq.pushTxUnsafe(tx)
}
return res
}
Expand Down
Loading
Loading