Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Layered txpool: fix for unsent drop notifications on remove #7538

Merged
merged 15 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ sonarqube {
}
}

tasks.register('spotlessCheckFast') {
dependsOn subprojects.collect { it.tasks.withType(com.diffplug.gradle.spotless.SpotlessCheck) }
}

project.tasks["sonarqube"].dependsOn "jacocoRootReport"

if (!JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_21)) {
Expand Down Expand Up @@ -436,12 +432,6 @@ allprojects {
options.addStringOption('Xwerror', '-html5')
options.encoding = 'UTF-8'
}

plugins.withType(JavaPlugin) {
tasks.withType(JavaCompile) {
it.dependsOn(rootProject.tasks.named('spotlessCheckFast'))
}
}
}

task deploy() {}
Expand All @@ -465,7 +455,7 @@ task checkMavenCoordinateCollisions {

tasks.register('checkPluginAPIChanges', DefaultTask) {}
checkPluginAPIChanges.dependsOn(':plugin-api:checkAPIChanges')
check.dependsOn('checkPluginAPIChanges', 'checkMavenCoordinateCollisions', 'spotlessCheckFast')
check.dependsOn('checkPluginAPIChanges', 'checkMavenCoordinateCollisions')

subprojects {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.FOLLOW_INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.FOLLOW_INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;

import java.util.Map;
import java.util.NavigableMap;
Expand All @@ -44,7 +46,7 @@ public AbstractSequentialTransactionsLayer(
}

@Override
public void remove(final PendingTransaction invalidatedTx, final RemovalReason reason) {
public void remove(final PendingTransaction invalidatedTx, final PoolRemovalReason reason) {
nextLayer.remove(invalidatedTx, reason);

final var senderTxs = txsBySender.get(invalidatedTx.getSender());
Expand Down Expand Up @@ -77,7 +79,7 @@ private void pushDown(
senderTxs.remove(txToRemove.getNonce());
processRemove(senderTxs, txToRemove.getTransaction(), FOLLOW_INVALIDATED);
})
.forEach(followingTx -> nextLayer.add(followingTx, gap));
.forEach(followingTx -> nextLayer.add(followingTx, gap, MOVE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.TRY_NEXT_LAYER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.RemovedFrom.POOL;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
Expand Down Expand Up @@ -169,7 +171,8 @@ protected abstract TransactionAddedResult canAdd(
final PendingTransaction pendingTransaction, final int gap);

@Override
public TransactionAddedResult add(final PendingTransaction pendingTransaction, final int gap) {
public TransactionAddedResult add(
final PendingTransaction pendingTransaction, final int gap, final AddReason addReason) {

// is replacing an existing one?
TransactionAddedResult addStatus = maybeReplaceTransaction(pendingTransaction);
Expand All @@ -178,7 +181,7 @@ public TransactionAddedResult add(final PendingTransaction pendingTransaction, f
}

if (addStatus.equals(TRY_NEXT_LAYER)) {
return addToNextLayer(pendingTransaction, gap);
return addToNextLayer(pendingTransaction, gap, addReason);
}

if (addStatus.isSuccess()) {
Expand All @@ -192,7 +195,10 @@ public TransactionAddedResult add(final PendingTransaction pendingTransaction, f
tryFillGap(addStatus, pendingTransaction, getRemainingPromotionsPerType());
}

ethScheduler.scheduleTxWorkerTask(() -> notifyTransactionAdded(pendingTransaction));
if (addReason.sendNotification()) {
ethScheduler.scheduleTxWorkerTask(() -> notifyTransactionAdded(pendingTransaction));
}

} else {
final var rejectReason = addStatus.maybeInvalidReason().orElseThrow();
metrics.incrementRejected(pendingTransaction, rejectReason, name());
Expand Down Expand Up @@ -302,24 +308,26 @@ public PendingTransaction promoteFor(
}

private TransactionAddedResult addToNextLayer(
final PendingTransaction pendingTransaction, final int distance) {
final PendingTransaction pendingTransaction, final int distance, final AddReason addReason) {
return addToNextLayer(
txsBySender.getOrDefault(pendingTransaction.getSender(), EMPTY_SENDER_TXS),
pendingTransaction,
distance);
distance,
addReason);
}

protected TransactionAddedResult addToNextLayer(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction pendingTransaction,
final int distance) {
final int distance,
final AddReason addReason) {
final int nextLayerDistance;
if (senderTxs.isEmpty()) {
nextLayerDistance = distance;
} else {
nextLayerDistance = (int) (pendingTransaction.getNonce() - (senderTxs.lastKey() + 1));
}
return nextLayer.add(pendingTransaction, nextLayerDistance);
return nextLayer.add(pendingTransaction, nextLayerDistance, addReason);
}

private void processAdded(final PendingTransaction addedTx) {
Expand Down Expand Up @@ -353,7 +361,7 @@ private void evict(final long spaceToFree, final int txsToEvict) {
++evictedCount;
evictedSize += lastTx.memorySize();
// evicted can always be added to the next layer
addToNextLayer(lessReadySenderTxs, lastTx, 0);
addToNextLayer(lessReadySenderTxs, lastTx, 0, MOVE);
}

if (lessReadySenderTxs.isEmpty()) {
Expand Down Expand Up @@ -411,6 +419,9 @@ protected PendingTransaction processRemove(
decreaseCounters(removedTx);
metrics.incrementRemoved(removedTx, removalReason.label(), name());
internalRemove(senderTxs, removedTx, removalReason);
if (removalReason.removedFrom().equals(POOL)) {
notifyTransactionDropped(removedTx);
}
}
return removedTx;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.BELOW_BASE_FEE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.DEMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.MOVE;

import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.core.BlockHeader;
Expand Down Expand Up @@ -104,7 +105,7 @@ protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket
while (itTxsBySender.hasNext()) {
final var senderTxs = itTxsBySender.next().getValue();

Optional<Long> maybeFirstUnderpricedNonce = Optional.empty();
Optional<Long> maybeFirstDemotedNonce = Optional.empty();

for (final var e : senderTxs.entrySet()) {
final PendingTransaction tx = e.getValue();
Expand All @@ -114,26 +115,28 @@ protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket
} else {
// otherwise sender txs starting from this nonce need to be demoted to next layer,
// and we can go to next sender
maybeFirstUnderpricedNonce = Optional.of(e.getKey());
maybeFirstDemotedNonce = Optional.of(e.getKey());
break;
}
}

maybeFirstUnderpricedNonce.ifPresent(
maybeFirstDemotedNonce.ifPresent(
nonce -> {
// demote all txs after the first underpriced to the next layer, because none of them is
// demote all txs after the first demoted to the next layer, because none of them is
// executable now, and we can avoid sorting them until they are candidate for execution
// again
final var demoteTxs = senderTxs.tailMap(nonce, true);
while (!demoteTxs.isEmpty()) {
final PendingTransaction demoteTx = demoteTxs.pollLastEntry().getValue();
LOG.atTrace()
.setMessage("Demoting tx {} with max gas price below next block base fee {}")
.setMessage(
"Demoting tx {} since it does not respect anymore the requisites to stay in this layer."
+ " Next block base fee {}")
.addArgument(demoteTx::toTraceLog)
.addArgument(newNextBlockBaseFee::toHumanReadableString)
.log();
processEvict(senderTxs, demoteTx, BELOW_BASE_FEE);
addToNextLayer(senderTxs, demoteTx, 0);
processEvict(senderTxs, demoteTx, DEMOTED);
addToNextLayer(senderTxs, demoteTx, 0, MOVE);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.DROPPED;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
Expand All @@ -25,6 +25,7 @@
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.util.Subscribers;

Expand Down Expand Up @@ -75,15 +76,16 @@ public List<PendingTransaction> getAll() {
}

@Override
public TransactionAddedResult add(final PendingTransaction pendingTransaction, final int gap) {
public TransactionAddedResult add(
final PendingTransaction pendingTransaction, final int gap, final AddReason reason) {
notifyTransactionDropped(pendingTransaction);
metrics.incrementRemoved(pendingTransaction, DROPPED.label(), name());
++droppedCount;
return TransactionAddedResult.DROPPED;
}

@Override
public void remove(final PendingTransaction pendingTransaction, final RemovalReason reason) {}
public void remove(final PendingTransaction pendingTransaction, final PoolRemovalReason reason) {}

@Override
public void penalize(final PendingTransaction penalizedTx) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.INTERNAL_ERROR;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.RECONCILED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.RECONCILED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.AddReason.NEW;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
Expand Down Expand Up @@ -100,7 +101,7 @@ public synchronized TransactionAddedResult addTransaction(
}

try {
return prioritizedTransactions.add(pendingTransaction, (int) nonceDistance);
return prioritizedTransactions.add(pendingTransaction, (int) nonceDistance, NEW);
} catch (final Throwable throwable) {
return reconcileAndRetryAdd(
pendingTransaction, stateSenderNonce, (int) nonceDistance, throwable);
Expand All @@ -123,7 +124,7 @@ private TransactionAddedResult reconcileAndRetryAdd(
.log();
reconcileSender(pendingTransaction.getSender(), stateSenderNonce);
try {
return prioritizedTransactions.add(pendingTransaction, nonceDistance);
return prioritizedTransactions.add(pendingTransaction, nonceDistance, NEW);
} catch (final Throwable throwable2) {
// the error should have been solved by the reconcile, logging at higher level now
LOG.atWarn()
Expand Down Expand Up @@ -210,7 +211,7 @@ private void reconcileSender(final Address sender, final long stateSenderNonce)
final long lowestNonce = reAddTxs.getFirst().getNonce();
final int newNonceDistance = (int) Math.max(0, lowestNonce - stateSenderNonce);

reAddTxs.forEach(ptx -> prioritizedTransactions.add(ptx, newNonceDistance));
reAddTxs.forEach(ptx -> prioritizedTransactions.add(ptx, newNonceDistance, NEW));
}

LOG.atDebug()
Expand Down Expand Up @@ -315,7 +316,6 @@ public synchronized List<Transaction> getPriorityTransactions() {

@Override
public void selectTransactions(final PendingTransactions.TransactionSelector selector) {
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
final List<PendingTransaction> penalizedTransactions = new ArrayList<>();
final Set<Address> skipSenders = new HashSet<>();

Expand Down Expand Up @@ -346,7 +346,12 @@ public void selectTransactions(final PendingTransactions.TransactionSelector sel
.log();

if (selectionResult.discard()) {
invalidTransactions.add(candidatePendingTx);
ethScheduler.scheduleTxWorkerTask(
() -> {
synchronized (this) {
prioritizedTransactions.remove(candidatePendingTx, INVALIDATED);
}
});
logDiscardedTransaction(candidatePendingTx, selectionResult);
}

Expand Down Expand Up @@ -376,20 +381,13 @@ public void selectTransactions(final PendingTransactions.TransactionSelector sel
}

ethScheduler.scheduleTxWorkerTask(
() -> {
invalidTransactions.forEach(
invalidTx -> {
synchronized (this) {
prioritizedTransactions.remove(invalidTx, INVALIDATED);
}
});
penalizedTransactions.forEach(
penalizedTx -> {
synchronized (this) {
prioritizedTransactions.internalPenalize(penalizedTx);
}
});
});
() ->
penalizedTransactions.forEach(
penalizedTx -> {
synchronized (this) {
prioritizedTransactions.internalPenalize(penalizedTx);
}
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.PROMOTED;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.core.BlockHeader;
Expand Down
Loading
Loading