Skip to content

Commit

Permalink
Avoid keeping txpool lock during block creation
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 committed Jun 7, 2024
1 parent bfada7d commit 33f1aa1
Show file tree
Hide file tree
Showing 14 changed files with 221 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private List<AbstractTransactionSelector> createTransactionSelectors(
public TransactionSelectionResults buildTransactionListForBlock() {
LOG.atDebug()
.setMessage("Transaction pool stats {}")
.addArgument(blockSelectionContext.transactionPool().logStats())
.addArgument(blockSelectionContext.transactionPool()::logStats)
.log();
timeLimitedSelection();
LOG.atTrace()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public abstract class AbstractIsolationTests {
txPoolMetrics,
transactionReplacementTester,
new BlobCache(),
MiningParameters.newDefault()));
MiningParameters.newDefault()),
ethScheduler);

protected final List<GenesisAllocation> accounts =
GenesisConfigFile.fromResource("/dev.json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ private static PendingTransactions createLayeredPendingTransactions(
miningParameters);
}

return new LayeredPendingTransactions(transactionPoolConfiguration, pendingTransactionsSorter);
return new LayeredPendingTransactions(
transactionPoolConfiguration, pendingTransactionsSorter, ethScheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
* Holds the current set of executable pending transactions, that are candidate for inclusion on
Expand Down Expand Up @@ -167,9 +167,25 @@ protected int[] getRemainingPromotionsPerType() {
return remainingPromotionsPerType;
}

/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender the collection pending txs is ordered by nonce asc.
*
* <p>Returned sender list order detail: first the sender of the most profitable tx.
*
* @return a list of sender pending txs
*/
@Override
public Stream<PendingTransaction> stream() {
return orderByFee.descendingSet().stream();
public List<SenderPendingTransactions> getBySender() {
final var sendersToAdd = new HashSet<>(txsBySender.keySet());
return orderByFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.filter(sendersToAdd::remove)
.map(
sender ->
new SenderPendingTransactions(
sender, List.copyOf(txsBySender.get(sender).values())))
.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static java.util.Collections.unmodifiableList;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
Expand Down Expand Up @@ -54,7 +55,6 @@
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -138,6 +138,14 @@ public boolean contains(final Transaction transaction) {
|| nextLayer.contains(transaction);
}

/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender the collection pending txs is ordered by nonce asc.
*
* @return a list of sender pending txs
*/
public abstract List<SenderPendingTransactions> getBySender();

@Override
public List<PendingTransaction> getAll() {
final List<PendingTransaction> allNextLayers = nextLayer.getAll();
Expand Down Expand Up @@ -548,17 +556,17 @@ public List<Transaction> getAllPriority() {
return priorityTxs;
}

Stream<PendingTransaction> stream(final Address sender) {
return txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values().stream();
}

@Override
public List<PendingTransaction> getAllFor(final Address sender) {
return Stream.concat(stream(sender), nextLayer.getAllFor(sender).stream()).toList();
public synchronized List<PendingTransaction> getAllFor(final Address sender) {
final var fromNextLayers = nextLayer.getAllFor(sender);
final var fromThisLayer = txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values();
final var concatLayers =
new ArrayList<PendingTransaction>(fromThisLayer.size() + fromNextLayers.size());
concatLayers.addAll(fromThisLayer);
concatLayers.addAll(fromNextLayers);
return unmodifiableList(concatLayers);
}

abstract Stream<PendingTransaction> stream();

@Override
public int count() {
return pendingTransactions.size() + nextLayer.count();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
Expand All @@ -41,13 +42,10 @@

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collector;
import java.util.stream.Collectors;

Expand All @@ -63,12 +61,15 @@ public class LayeredPendingTransactions implements PendingTransactions {
private static final Marker INVALID_TX_REMOVED = MarkerFactory.getMarker("INVALID_TX_REMOVED");
private final TransactionPoolConfiguration poolConfig;
private final AbstractPrioritizedTransactions prioritizedTransactions;
private final EthScheduler ethScheduler;

public LayeredPendingTransactions(
final TransactionPoolConfiguration poolConfig,
final AbstractPrioritizedTransactions prioritizedTransactions) {
final AbstractPrioritizedTransactions prioritizedTransactions,
final EthScheduler ethScheduler) {
this.poolConfig = poolConfig;
this.prioritizedTransactions = prioritizedTransactions;
this.ethScheduler = ethScheduler;
}

@Override
Expand Down Expand Up @@ -311,79 +312,57 @@ public synchronized List<Transaction> getPriorityTransactions() {
}

@Override
// There's a small edge case here we could encounter.
// When we pass an upgrade block that has a new transaction type, we start allowing transactions
// of that new type into our pool.
// If we then reorg to a block lower than the upgrade block height _and_ we create a block, that
// block could end up with transactions of the new type.
// This seems like it would be very rare but worth it to document that we don't handle that case
// right now.
public synchronized void selectTransactions(
final PendingTransactions.TransactionSelector selector) {
public void selectTransactions(final PendingTransactions.TransactionSelector selector) {
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
final Set<Hash> alreadyChecked = new HashSet<>();
final Set<Address> skipSenders = new HashSet<>();
final AtomicBoolean completed = new AtomicBoolean(false);

prioritizedTransactions.stream()
.takeWhile(unused -> !completed.get())
.filter(highPrioPendingTx -> !skipSenders.contains(highPrioPendingTx.getSender()))
.peek(this::logSenderTxs)
.forEach(
highPrioPendingTx ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.takeWhile(
candidatePendingTx ->
!skipSenders.contains(candidatePendingTx.getSender())
&& !completed.get())
.filter(
candidatePendingTx ->
!alreadyChecked.contains(candidatePendingTx.getHash())
&& Long.compareUnsigned(
candidatePendingTx.getNonce(), highPrioPendingTx.getNonce())
<= 0)
.forEach(
candidatePendingTx -> {
alreadyChecked.add(candidatePendingTx.getHash());
final var res = selector.evaluateTransaction(candidatePendingTx);

LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(res)
.addArgument(candidatePendingTx::toTraceLog)
.log();

if (res.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, res);
}

if (res.stop()) {
completed.set(true);
}

if (!res.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
skipSenders.add(candidatePendingTx.getSender());
LOG.trace("Skipping tx from sender {}", candidatePendingTx.getSender());
}
}));

invalidTransactions.forEach(
invalidTx -> prioritizedTransactions.remove(invalidTx, INVALIDATED));
}

private void logSenderTxs(final PendingTransaction highPrioPendingTx) {
LOG.atTrace()
.setMessage("highPrioPendingTx {}, senderTxs {}")
.addArgument(highPrioPendingTx::toTraceLog)
.addArgument(
() ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(", ")))
.log();
final List<SenderPendingTransactions> candidateTxsBySender;
synchronized (this) {
// since selecting transactions for block creation is a potential long operation
// we want to avoid to keep the lock for all the process, but we just lock to get
// the candidate transactions
candidateTxsBySender = prioritizedTransactions.getBySender();
}

selection:
for (final var senderTxs : candidateTxsBySender) {
LOG.trace("highPrioSenderTxs {}", senderTxs);

for (final var candidatePendingTx : senderTxs.pendingTransactions()) {
final var selectionResult = selector.evaluateTransaction(candidatePendingTx);

LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(selectionResult)
.addArgument(candidatePendingTx::toTraceLog)
.log();

if (selectionResult.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, selectionResult);
}

if (selectionResult.stop()) {
LOG.trace("Stopping selection");
break selection;
}

if (!selectionResult.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender());
break;
}
}
}

ethScheduler.scheduleTxWorkerTask(
() ->
invalidTransactions.forEach(
invalidTx -> {
synchronized (this) {
prioritizedTransactions.remove(invalidTx, INVALIDATED);
}
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReadyTransactions extends AbstractSequentialTransactionsLayer {

Expand Down Expand Up @@ -137,11 +136,24 @@ protected boolean promotionFilter(final PendingTransaction pendingTransaction) {
return true;
}

/**
* Return the full content of this layer, organized as a list of sender pending txs. For each
* sender the collection pending txs is ordered by nonce asc.
*
* <p>Returned sender list order detail: first the sender of the tx with the highest max gas
* price.
*
* @return a list of sender pending txs
*/
@Override
public Stream<PendingTransaction> stream() {
public synchronized List<SenderPendingTransactions> getBySender() {
return orderByMaxFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.flatMap(sender -> txsBySender.get(sender).values().stream());
.map(
sender ->
new SenderPendingTransactions(
sender, List.copyOf(txsBySender.get(sender).values())))
.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;

import java.util.List;
import java.util.stream.Collectors;

/**
* A list of pending transactions of a specific sender, ordered by nonce asc
*
* @param sender the sender
* @param pendingTransactions the list of pending transactions order by nonce asc
*/
public record SenderPendingTransactions(
Address sender, List<PendingTransaction> pendingTransactions) {

@Override
public String toString() {
return "Sender "
+ sender
+ " has "
+ pendingTransactions.size()
+ " pending transactions "
+ pendingTransactions.stream()
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(",", "[", "]"));
}
}
Loading

0 comments on commit 33f1aa1

Please sign in to comment.