From 2c05e6f708867e82a314b7f37970531ff7e25d4f Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Mon, 3 Jun 2024 19:43:51 +0200 Subject: [PATCH] Avoid keeping txpool lock during block creation Signed-off-by: Fabio Di Fabio --- .../txselection/BlockTransactionSelector.java | 2 +- .../bonsai/AbstractIsolationTests.java | 3 +- .../transactions/TransactionPoolFactory.java | 3 +- .../AbstractPrioritizedTransactions.java | 14 +- .../layered/AbstractTransactionsLayer.java | 20 +-- .../layered/LayeredPendingTransactions.java | 128 ++++++++---------- .../layered/ReadyTransactions.java | 9 +- .../layered/SenderPendingTransactions.java | 37 +++++ .../layered/SparseTransactions.java | 31 ++++- .../layered/TransactionsLayer.java | 8 +- .../AbstractLayeredTransactionPoolTest.java | 3 +- .../LayeredPendingTransactionsTest.java | 8 +- .../eth/transactions/layered/LayersTest.java | 54 ++++---- .../eth/transactions/layered/ReplayTest.java | 2 +- 14 files changed, 191 insertions(+), 131 deletions(-) create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SenderPendingTransactions.java diff --git a/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java b/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java index 22d75e27aaa..ab311c3fb38 100644 --- a/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java +++ b/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/txselection/BlockTransactionSelector.java @@ -164,7 +164,7 @@ private List createTransactionSelectors( public TransactionSelectionResults buildTransactionListForBlock() { LOG.atDebug() .setMessage("Transaction pool stats {}") - .addArgument(blockSelectionContext.transactionPool().logStats()) + .addArgument(blockSelectionContext.transactionPool()::logStats) .log(); timeLimitedSelection(); LOG.atTrace() diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/AbstractIsolationTests.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/AbstractIsolationTests.java index d9b9d5d1955..215fd60e378 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/AbstractIsolationTests.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/AbstractIsolationTests.java @@ -137,7 +137,8 @@ public abstract class AbstractIsolationTests { txPoolMetrics, transactionReplacementTester, new BlobCache(), - MiningParameters.newDefault())); + MiningParameters.newDefault()), + ethScheduler); protected final List accounts = GenesisConfigFile.fromResource("/dev.json") diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java index 4c0462f2d72..d32b7bafe8e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolFactory.java @@ -357,6 +357,7 @@ private static PendingTransactions createLayeredPendingTransactions( miningParameters); } - return new LayeredPendingTransactions(transactionPoolConfiguration, pendingTransactionsSorter); + return new LayeredPendingTransactions( + transactionPoolConfiguration, pendingTransactionsSorter, ethScheduler); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java index 11654be53cb..b763ffe7bdf 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java @@ -24,13 +24,13 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeSet; import java.util.function.BiFunction; import java.util.function.Predicate; -import java.util.stream.Stream; /** * Holds the current set of executable pending transactions, that are candidate for inclusion on @@ -168,8 +168,16 @@ protected int[] getRemainingPromotionsPerType() { } @Override - public Stream stream() { - return orderByFee.descendingSet().stream(); + public List getBySender() { + final var sendersToAdd = new HashSet<>(txsBySender.keySet()); + return orderByFee.descendingSet().stream() + .map(PendingTransaction::getSender) + .filter(sendersToAdd::remove) + .map( + sender -> + new SenderPendingTransactions( + sender, List.copyOf(txsBySender.get(sender).values()))) + .toList(); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java index 9cc8b44296a..31ba72a693e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java @@ -14,6 +14,7 @@ */ package org.hyperledger.besu.ethereum.eth.transactions.layered; +import static java.util.Collections.unmodifiableList; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT; @@ -54,7 +55,6 @@ import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,6 +138,8 @@ public boolean contains(final Transaction transaction) { || nextLayer.contains(transaction); } + public abstract List getBySender(); + @Override public List getAll() { final List allNextLayers = nextLayer.getAll(); @@ -548,17 +550,17 @@ public List getAllPriority() { return priorityTxs; } - Stream stream(final Address sender) { - return txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values().stream(); - } - @Override - public List getAllFor(final Address sender) { - return Stream.concat(stream(sender), nextLayer.getAllFor(sender).stream()).toList(); + public synchronized List getAllFor(final Address sender) { + final var fromNextLayers = nextLayer.getAllFor(sender); + final var fromThisLayer = txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values(); + final var concatLayers = + new ArrayList(fromThisLayer.size() + fromNextLayers.size()); + concatLayers.addAll(fromThisLayer); + concatLayers.addAll(fromNextLayers); + return unmodifiableList(concatLayers); } - abstract Stream stream(); - @Override public int count() { return pendingTransactions.size() + nextLayer.count(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java index 9e304a4f0f5..77e3a28707b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactions.java @@ -27,6 +27,7 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Transaction; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction; import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener; import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener; @@ -41,13 +42,10 @@ import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collector; import java.util.stream.Collectors; @@ -63,12 +61,15 @@ public class LayeredPendingTransactions implements PendingTransactions { private static final Marker INVALID_TX_REMOVED = MarkerFactory.getMarker("INVALID_TX_REMOVED"); private final TransactionPoolConfiguration poolConfig; private final AbstractPrioritizedTransactions prioritizedTransactions; + private final EthScheduler ethScheduler; public LayeredPendingTransactions( final TransactionPoolConfiguration poolConfig, - final AbstractPrioritizedTransactions prioritizedTransactions) { + final AbstractPrioritizedTransactions prioritizedTransactions, + final EthScheduler ethScheduler) { this.poolConfig = poolConfig; this.prioritizedTransactions = prioritizedTransactions; + this.ethScheduler = ethScheduler; } @Override @@ -311,79 +312,56 @@ public synchronized List getPriorityTransactions() { } @Override - // There's a small edge case here we could encounter. - // When we pass an upgrade block that has a new transaction type, we start allowing transactions - // of that new type into our pool. - // If we then reorg to a block lower than the upgrade block height _and_ we create a block, that - // block could end up with transactions of the new type. - // This seems like it would be very rare but worth it to document that we don't handle that case - // right now. - public synchronized void selectTransactions( - final PendingTransactions.TransactionSelector selector) { + public void selectTransactions(final PendingTransactions.TransactionSelector selector) { final List invalidTransactions = new ArrayList<>(); - final Set alreadyChecked = new HashSet<>(); - final Set
skipSenders = new HashSet<>(); - final AtomicBoolean completed = new AtomicBoolean(false); - - prioritizedTransactions.stream() - .takeWhile(unused -> !completed.get()) - .filter(highPrioPendingTx -> !skipSenders.contains(highPrioPendingTx.getSender())) - .peek(this::logSenderTxs) - .forEach( - highPrioPendingTx -> - prioritizedTransactions.stream(highPrioPendingTx.getSender()) - .takeWhile( - candidatePendingTx -> - !skipSenders.contains(candidatePendingTx.getSender()) - && !completed.get()) - .filter( - candidatePendingTx -> - !alreadyChecked.contains(candidatePendingTx.getHash()) - && Long.compareUnsigned( - candidatePendingTx.getNonce(), highPrioPendingTx.getNonce()) - <= 0) - .forEach( - candidatePendingTx -> { - alreadyChecked.add(candidatePendingTx.getHash()); - final var res = selector.evaluateTransaction(candidatePendingTx); - - LOG.atTrace() - .setMessage("Selection result {} for transaction {}") - .addArgument(res) - .addArgument(candidatePendingTx::toTraceLog) - .log(); - - if (res.discard()) { - invalidTransactions.add(candidatePendingTx); - logDiscardedTransaction(candidatePendingTx, res); - } - - if (res.stop()) { - completed.set(true); - } - - if (!res.selected()) { - // avoid processing other txs from this sender if this one is skipped - // since the following will not be selected due to the nonce gap - skipSenders.add(candidatePendingTx.getSender()); - LOG.trace("Skipping tx from sender {}", candidatePendingTx.getSender()); - } - })); - - invalidTransactions.forEach( - invalidTx -> prioritizedTransactions.remove(invalidTx, INVALIDATED)); - } - private void logSenderTxs(final PendingTransaction highPrioPendingTx) { - LOG.atTrace() - .setMessage("highPrioPendingTx {}, senderTxs {}") - .addArgument(highPrioPendingTx::toTraceLog) - .addArgument( - () -> - prioritizedTransactions.stream(highPrioPendingTx.getSender()) - .map(PendingTransaction::toTraceLog) - .collect(Collectors.joining(", "))) - .log(); + final List candidateTransactions; + synchronized (this) { + // since selecting transactions for block creation is a potential long operation + // we want to avoid to keep the log for all the process, so we just lock to get + // the candidate transactions + candidateTransactions = prioritizedTransactions.getBySender(); + } + + selection: + for (final var senderTxs : candidateTransactions) { + LOG.trace("highPrioSenderTxs {}", senderTxs); + + for (final var candidatePendingTx : senderTxs.pendingTransactions()) { + final var selectionResult = selector.evaluateTransaction(candidatePendingTx); + + LOG.atTrace() + .setMessage("Selection result {} for transaction {}") + .addArgument(selectionResult) + .addArgument(candidatePendingTx::toTraceLog) + .log(); + + if (selectionResult.discard()) { + invalidTransactions.add(candidatePendingTx); + logDiscardedTransaction(candidatePendingTx, selectionResult); + } + + if (selectionResult.stop()) { + break selection; + } + + if (!selectionResult.selected()) { + // avoid processing other txs from this sender if this one is skipped + // since the following will not be selected due to the nonce gap + LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender()); + break; + } + } + } + + ethScheduler.scheduleTxWorkerTask( + () -> + invalidTransactions.forEach( + invalidTx -> { + synchronized (this) { + prioritizedTransactions.remove(invalidTx, INVALIDATED); + } + })); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java index 3cd07ecab9c..ef6f50e59be 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java @@ -38,7 +38,6 @@ import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.Stream; public class ReadyTransactions extends AbstractSequentialTransactionsLayer { @@ -138,10 +137,14 @@ protected boolean promotionFilter(final PendingTransaction pendingTransaction) { } @Override - public Stream stream() { + public synchronized List getBySender() { return orderByMaxFee.descendingSet().stream() .map(PendingTransaction::getSender) - .flatMap(sender -> txsBySender.get(sender).values().stream()); + .map( + sender -> + new SenderPendingTransactions( + sender, List.copyOf(txsBySender.get(sender).values()))) + .toList(); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SenderPendingTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SenderPendingTransactions.java new file mode 100644 index 00000000000..cbcf3b4153f --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SenderPendingTransactions.java @@ -0,0 +1,37 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.transactions.layered; + +import org.hyperledger.besu.datatypes.Address; +import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction; + +import java.util.List; +import java.util.stream.Collectors; + +public record SenderPendingTransactions( + Address sender, List pendingTransactions) { + + @Override + public String toString() { + return "Sender " + + sender + + " has " + + pendingTransactions.size() + + " pending transactions " + + pendingTransactions.stream() + .map(PendingTransaction::toTraceLog) + .collect(Collectors.joining(",", "[", "]")); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java index 1a4f7b67d8c..0590b7b1282 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java @@ -44,15 +44,19 @@ import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.IntStream; -import java.util.stream.Stream; import com.google.common.collect.Iterables; public class SparseTransactions extends AbstractTransactionsLayer { + /** + * Order sparse tx by priority flag and sequence asc, so that we pick for eviction txs that have + * no priority and with the lowest sequence number (oldest) first. + */ private final NavigableSet sparseEvictionOrder = new TreeSet<>( Comparator.comparing(PendingTransaction::hasPriority) .thenComparing(PendingTransaction::getSequence)); + private final Map gapBySender = new HashMap<>(); private final List orderByGap; @@ -220,7 +224,8 @@ private NavigableMap getSequentialSubset( } @Override - public void remove(final PendingTransaction invalidatedTx, final RemovalReason reason) { + public synchronized void remove( + final PendingTransaction invalidatedTx, final RemovalReason reason) { final var senderTxs = txsBySender.get(invalidatedTx.getSender()); if (senderTxs != null && senderTxs.containsKey(invalidatedTx.getNonce())) { @@ -312,9 +317,27 @@ protected boolean promotionFilter(final PendingTransaction pendingTransaction) { return false; } + /** + * Return the full content of this layer, organized as a list of sender pending txs. For each + * sender the collection pending txs is ordered by nonce asc. + * + *

Return sender list order detail: first the sender of the tx that will be evicted as last. So + * for example if the same sender has the first and the last txs in the eviction order, it will be + * the first in the returned list, since we give precedence to tx that will be evicted later. + * + * @return a list of sender pending txs + */ @Override - public Stream stream() { - return sparseEvictionOrder.descendingSet().stream(); + public synchronized List getBySender() { + final var sendersToAdd = new HashSet<>(txsBySender.keySet()); + return sparseEvictionOrder.descendingSet().stream() + .map(PendingTransaction::getSender) + .filter(sendersToAdd::remove) + .map( + sender -> + new SenderPendingTransactions( + sender, List.copyOf(txsBySender.get(sender).values()))) + .toList(); } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java index 46c756226d2..d3c22aeef10 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java @@ -41,8 +41,6 @@ public interface TransactionsLayer { boolean contains(Transaction transaction); - List getAll(); - TransactionAddedResult add(PendingTransaction pendingTransaction, int gap); void remove(PendingTransaction pendingTransaction, RemovalReason reason); @@ -52,6 +50,10 @@ void blockAdded( BlockHeader blockHeader, final Map maxConfirmedNonceBySender); + List getAll(); + + List getAllFor(Address sender); + List getAllLocal(); List getAllPriority(); @@ -93,8 +95,6 @@ List promote( String logSender(Address sender); - List getAllFor(Address sender); - enum RemovalReason { CONFIRMED, CROSS_LAYER_REPLACED, diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractLayeredTransactionPoolTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractLayeredTransactionPoolTest.java index c43f7d6cb30..1e2c4101929 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractLayeredTransactionPoolTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractLayeredTransactionPoolTest.java @@ -58,7 +58,8 @@ protected PendingTransactions createPendingTransactions( return new LayeredPendingTransactions( poolConfig, createPrioritizedTransactions( - poolConfig, readyLayer, txPoolMetrics, transactionReplacementTester)); + poolConfig, readyLayer, txPoolMetrics, transactionReplacementTester), + ethScheduler); } protected abstract AbstractPrioritizedTransactions createPrioritizedTransactions( diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java index 3b79b463687..b2f03ce175d 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java @@ -170,14 +170,16 @@ public void setup() { senderLimitedLayers = createLayers(senderLimitedConfig); smallLayers = createLayers(smallPoolConfig); - pendingTransactions = new LayeredPendingTransactions(poolConf, layers.prioritizedTransactions); + pendingTransactions = + new LayeredPendingTransactions(poolConf, layers.prioritizedTransactions, ethScheduler); senderLimitedTransactions = new LayeredPendingTransactions( - senderLimitedConfig, senderLimitedLayers.prioritizedTransactions); + senderLimitedConfig, senderLimitedLayers.prioritizedTransactions, ethScheduler); smallPendingTransactions = - new LayeredPendingTransactions(smallPoolConfig, smallLayers.prioritizedTransactions); + new LayeredPendingTransactions( + smallPoolConfig, smallLayers.prioritizedTransactions, ethScheduler); } @Test diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java index dab3cce6810..6946111e5ce 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java @@ -199,7 +199,7 @@ private void assertScenario( MiningParameters.newDefault().setMinTransactionGasPrice(MIN_GAS_PRICE)); final LayeredPendingTransactions pendingTransactions = - new LayeredPendingTransactions(poolConfig, prioritizedTransactions); + new LayeredPendingTransactions(poolConfig, prioritizedTransactions, ethScheduler); scenario.execute( pendingTransactions, @@ -306,7 +306,7 @@ static Stream providerAddTransactions() { Arguments.of( new Scenario("fill sparse 2") .addForSender(S1, 5, 3, 2) - .expectedSparseForSender(S1, 5, 3, 2)), + .expectedSparseForSender(S1, 2, 3, 5)), Arguments.of( new Scenario("overflow sparse 1") .addForSender(S1, 1, 2, 3, 4) @@ -315,13 +315,13 @@ static Stream providerAddTransactions() { Arguments.of( new Scenario("overflow sparse 2") .addForSender(S1, 4, 2, 3, 1) - .expectedSparseForSender(S1, 2, 3, 1) + .expectedSparseForSender(S1, 1, 2, 3) .expectedDroppedForSender(S1, 4)), Arguments.of( new Scenario("overflow sparse 3") .addForSender(S1, 0, 4, 2, 3, 5) .expectedPrioritizedForSender(S1, 0) - .expectedSparseForSender(S1, 4, 2, 3) + .expectedSparseForSender(S1, 2, 3, 4) .expectedDroppedForSender(S1, 5))); } @@ -334,7 +334,7 @@ static Stream providerAddTransactionsMultipleSenders() { Arguments.of( new Scenario("add first sparse") .addForSenders(S1, 1, S2, 2) - .expectedSparseForSenders(S1, 1, S2, 2)), + .expectedSparseForSenders(S2, 2, S1, 1)), Arguments.of( new Scenario("fill prioritized 1") .addForSender(S1, 0, 1, 2) @@ -357,11 +357,11 @@ static Stream providerAddTransactionsMultipleSenders() { .addForSenders(S1, 2, S2, 1) .expectedPrioritizedForSenders() .expectedReadyForSenders() - .expectedSparseForSenders(S1, 2, S2, 1) + .expectedSparseForSenders(S2, 1, S1, 2) .addForSenders(S2, 2, S1, 0) .expectedPrioritizedForSender(S1, 0) .expectedReadyForSenders() - .expectedSparseForSenders(S1, 2, S2, 1, S2, 2) + .expectedSparseForSenders(S2, 1, S2, 2, S1, 2) .addForSenders(S1, 1) .expectedPrioritizedForSenders(S1, 0, S1, 1, S1, 2) .expectedReadyForSenders() @@ -431,15 +431,15 @@ static Stream providerAddTransactionsMultipleSenders() { .addForSenders(S2, 0, S3, 2, S1, 1) .expectedPrioritizedForSender(S2, 0) .expectedReadyForSenders() - .expectedSparseForSenders(S3, 2, S1, 1) + .expectedSparseForSenders(S1, 1, S3, 2) .addForSenders(S2, 1) .expectedPrioritizedForSenders(S2, 0, S2, 1) .expectedReadyForSenders() - .expectedSparseForSenders(S3, 2, S1, 1) + .expectedSparseForSenders(S1, 1, S3, 2) .addForSenders(S3, 0) .expectedPrioritizedForSenders(S3, 0, S2, 0, S2, 1) .expectedReadyForSenders() - .expectedSparseForSenders(S3, 2, S1, 1) + .expectedSparseForSenders(S1, 1, S3, 2) .addForSenders(S1, 0) .expectedPrioritizedForSenders(S3, 0, S2, 0, S2, 1) .expectedReadyForSenders(S1, 0, S1, 1) @@ -452,7 +452,7 @@ static Stream providerAddTransactionsMultipleSenders() { .addForSenders(S4, 0, S4, 1, S3, 3) .expectedPrioritizedForSenders(S4, 0, S4, 1, S3, 0) .expectedReadyForSenders(S3, 1, S2, 0, S2, 1) - .expectedSparseForSenders(S3, 2, S1, 1, S1, 0) + .expectedSparseForSenders(S1, 0, S1, 1, S3, 2) // ToDo: non optimal discard, worth to improve? .expectedDroppedForSender(S3, 3)), Arguments.of( @@ -813,7 +813,7 @@ static Stream providerNextNonceForSender() { Arguments.of( new Scenario("out of order sequence with gap 1") .addForSender(S1, 2, 1) - .expectedSparseForSender(S1, 2, 1) + .expectedSparseForSender(S1, 1, 2) .expectedNextNonceForSenders(S1, null)), Arguments.of( new Scenario("out of order sequence with gap 2") @@ -969,7 +969,7 @@ static Stream providerSelectTransactions() { Arguments.of( new Scenario("out of order sequence with gap 1") .addForSender(S1, 2, 1) - .expectedSparseForSender(S1, 2, 1) + .expectedSparseForSender(S1, 1, 2) .expectedSelectedTransactions()), Arguments.of( new Scenario("out of order sequence with gap 2") @@ -1073,8 +1073,7 @@ static Stream providerAsyncWorldStateUpdates() { .setAccountNonce(S1, 5) .addForSender(S1, 7) .expectedPrioritizedForSenders() - // remember that sparse are checked by oldest first - .expectedSparseForSender(S1, 8, 9, 7))); + .expectedSparseForSender(S1, 7, 8, 9))); } static Stream providerPrioritySenders() { @@ -1195,7 +1194,7 @@ static Stream providerPrioritySenders() { .addForSender(S3, 0) .expectedSparseForSender(S3, 0) .addForSender(SP1, 0) - .expectedSparseForSenders(S3, 0, SP1, 0) + .expectedSparseForSenders(SP1, 0, S3, 0) .confirmedForSenders(SP2, 0) .expectedPrioritizedForSender(SP2, 1, 2, 3) .expectedReadyForSenders(SP2, 4, SP2, 5, SP1, 0) @@ -1510,23 +1509,26 @@ public Scenario expectedDroppedForSenders() { private void assertExpectedPrioritized( final AbstractPrioritizedTransactions prioLayer, final List expected) { - assertThat(prioLayer.stream()).describedAs("Prioritized").containsExactlyElementsOf(expected); + assertThat(prioLayer.getBySender()) + .describedAs("Prioritized") + .flatExtracting(SenderPendingTransactions::pendingTransactions) + .containsExactlyElementsOf(expected); } private void assertExpectedReady( final ReadyTransactions readyLayer, final List expected) { - assertThat(readyLayer.stream()).describedAs("Ready").containsExactlyElementsOf(expected); + assertThat(readyLayer.getBySender()) + .describedAs("Ready") + .flatExtracting(SenderPendingTransactions::pendingTransactions) + .containsExactlyElementsOf(expected); } private void assertExpectedSparse( final SparseTransactions sparseLayer, final List expected) { - // sparse txs are returned from the most recent to the oldest, so reverse it to make writing - // scenarios easier - final var sortedExpected = new ArrayList<>(expected); - Collections.reverse(sortedExpected); - assertThat(sparseLayer.stream()) + assertThat(sparseLayer.getBySender()) .describedAs("Sparse") - .containsExactlyElementsOf(sortedExpected); + .flatExtracting(SenderPendingTransactions::pendingTransactions) + .containsExactlyElementsOf(expected); } private void assertExpectedDropped( @@ -1587,7 +1589,9 @@ public Scenario expectedSelectedTransactions(final Object... args) { } actions.add( (pending, prio, ready, sparse, dropped) -> - assertThat(prio.stream()).containsExactlyElementsOf(expectedSelected)); + assertThat(prio.getBySender()) + .flatExtracting(SenderPendingTransactions::pendingTransactions) + .containsExactlyElementsOf(expectedSelected)); return this; } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java index 8b395a44153..14ed39c2e59 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java @@ -129,7 +129,7 @@ public void replay() throws IOException { final AbstractPrioritizedTransactions prioritizedTransactions = createLayers(poolConfig, txPoolMetrics, baseFeeMarket); final LayeredPendingTransactions pendingTransactions = - new LayeredPendingTransactions(poolConfig, prioritizedTransactions); + new LayeredPendingTransactions(poolConfig, prioritizedTransactions, ethScheduler); br.lines() .forEach( line -> {