Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to avoid broadcasting full blob txs #6835

Merged
merged 8 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
- Don't enable the BFT mining coordinator when running sub commands such as `blocks export` [#6675](https://github.com/hyperledger/besu/pull/6675)
- In JSON-RPC return optional `v` fields for type 1 and type 2 transactions [#6762](https://github.com/hyperledger/besu/pull/6762)
- Fix Shanghai/QBFT block import bug when syncing new nodes [#6765](https://github.com/hyperledger/besu/pull/6765)
- Fix to avoid broadcasting full blob txs, instead of only the tx announcement, to a subset of nodes [#6835](https://github.com/hyperledger/besu/pull/6835)

### Download Links

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void sendTransactionHashesToPeer(final EthPeer peer) {
final Capability capability = peer.getConnection().capability(EthProtocol.NAME);
for (final List<Transaction> txBatch :
Iterables.partition(
transactionTracker.claimTransactionsToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) {
transactionTracker.claimTransactionHashesToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) {
try {
final List<Hash> txHashes = toHashList(txBatch);
LOG.atTrace()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionsToSend = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionHashesToSend = new ConcurrentHashMap<>();

public void reset() {
seenTransactions.clear();
transactionsToSend.clear();
transactionHashesToSend.clear();
}

public synchronized void markTransactionsAsSeen(
Expand All @@ -55,6 +57,15 @@ public synchronized void addToPeerSendQueue(final EthPeer peer, final Transactio
}
}

public synchronized void addToPeerHashSendQueue(
final EthPeer peer, final Transaction transaction) {
if (!hasPeerSeenTransaction(peer, transaction)) {
transactionHashesToSend
.computeIfAbsent(peer, key -> createTransactionsSet())
.add(transaction);
}
}

public Iterable<EthPeer> getEthPeersWithUnsentTransactions() {
return transactionsToSend.keySet();
}
Expand All @@ -69,6 +80,16 @@ public synchronized Set<Transaction> claimTransactionsToSendToPeer(final EthPeer
}
}

public synchronized Set<Transaction> claimTransactionHashesToSendToPeer(final EthPeer peer) {
final Set<Transaction> transactionHashesToSend = this.transactionHashesToSend.remove(peer);
if (transactionHashesToSend != null) {
markTransactionHashesAsSeen(peer, toHashList(transactionHashesToSend));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should work, but i wonder if it would be more correct to mark them as seen after we know they've been sent to the peer. we could restore them to the cache in the exception handling of NewPooledTransactionHashesMessageSender:63

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, will review next

return transactionHashesToSend;
} else {
return emptySet();
}
}

public boolean hasSeenTransaction(final Hash txHash) {
return seenTransactions.values().stream().anyMatch(seen -> seen.contains(txHash));
}
Expand Down Expand Up @@ -100,5 +121,6 @@ protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {
public void onDisconnect(final EthPeer peer) {
seenTransactions.remove(peer);
transactionsToSend.remove(peer);
transactionHashesToSend.remove(peer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,16 +49,33 @@ public class TransactionBroadcaster implements TransactionBatchAddedListener {
private final TransactionsMessageSender transactionsMessageSender;
private final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
private final EthContext ethContext;
private final Random random;

public TransactionBroadcaster(
final EthContext ethContext,
final PeerTransactionTracker transactionTracker,
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender) {
this(
ethContext,
transactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender,
null);
}

@VisibleForTesting
protected TransactionBroadcaster(
final EthContext ethContext,
final PeerTransactionTracker transactionTracker,
final TransactionsMessageSender transactionsMessageSender,
final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender,
final Long seed) {
this.transactionTracker = transactionTracker;
this.transactionsMessageSender = transactionsMessageSender;
this.newPooledTransactionHashesMessageSender = newPooledTransactionHashesMessageSender;
this.ethContext = ethContext;
this.random = seed != null ? new Random(seed) : new Random();
}

public void relayTransactionPoolTo(
Expand All @@ -65,7 +84,13 @@ public void relayTransactionPoolTo(
if (peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) {
sendTransactionHashes(toTransactionList(pendingTransactions), List.of(peer));
} else {
sendFullTransactions(toTransactionList(pendingTransactions), List.of(peer));
// we need to exclude txs that support hash only broadcasting
final var fullBroadcastTxs =
pendingTransactions.stream()
.map(PendingTransaction::getTransaction)
.filter(tx -> !ANNOUNCE_HASH_ONLY_TX_TYPES.contains(tx.getType()))
.toList();
sendFullTransactions(fullBroadcastTxs, List.of(peer));
}
}
}
Expand All @@ -77,7 +102,7 @@ public void onTransactionsAdded(final Collection<Transaction> transactions) {
return;
}

final int numPeersToSendFullTransactions = (int) Math.ceil(Math.sqrt(currPeerCount));
final int numPeersToSendFullTransactions = (int) Math.round(Math.sqrt(currPeerCount));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows for rounding down as well as up, instead of ceil which is up only. It still works, but I'm unclear why it is necessary or preferred?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not necessary, just seems more appropriate to round instead of ceil, to avoid strange results when few peers are connected, for example with 2 peers, with round we have 1, otherwise is 2.


final Map<Boolean, List<Transaction>> transactionByBroadcastMode =
transactions.stream()
Expand Down Expand Up @@ -107,7 +132,7 @@ public void onTransactionsAdded(final Collection<Transaction> transactions) {
numPeersToSendFullTransactions - sendOnlyFullTransactionPeers.size(),
sendOnlyHashPeers.size());

Collections.shuffle(sendOnlyHashPeers);
Collections.shuffle(sendOnlyHashPeers, random);

// move peers from the mixed list to reach the required size for full transaction peers
movePeersBetweenLists(sendOnlyHashPeers, sendMixedPeers, delta);
Expand All @@ -121,7 +146,7 @@ public void onTransactionsAdded(final Collection<Transaction> transactions) {
.addArgument(sendOnlyHashPeers::size)
.addArgument(sendMixedPeers::size)
.addArgument(sendOnlyFullTransactionPeers)
.addArgument(() -> sendOnlyHashPeers.toString() + sendMixedPeers.toString())
.addArgument(() -> sendOnlyHashPeers.toString() + sendMixedPeers)
.log();

sendToFullTransactionsPeers(
Expand All @@ -141,7 +166,7 @@ private void sendToOnlyHashPeers(
final Map<Boolean, List<Transaction>> txsByHashOnlyBroadcast,
final List<EthPeer> hashOnlyPeers) {
final List<Transaction> allTransactions =
txsByHashOnlyBroadcast.values().stream().flatMap(List::stream).collect(Collectors.toList());
txsByHashOnlyBroadcast.values().stream().flatMap(List::stream).toList();

sendTransactionHashes(allTransactions, hashOnlyPeers);
}
Expand Down Expand Up @@ -175,7 +200,7 @@ private void sendTransactionHashes(
.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
transaction -> transactionTracker.addToPeerHashSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ public void setUp() {
@Test
public void shouldSendPendingTransactionsToEachPeer() throws Exception {

transactionTracker.addToPeerSendQueue(peer1, transaction1);
transactionTracker.addToPeerSendQueue(peer1, transaction2);
transactionTracker.addToPeerSendQueue(peer2, transaction3);
transactionTracker.addToPeerHashSendQueue(peer1, transaction1);
transactionTracker.addToPeerHashSendQueue(peer1, transaction2);
transactionTracker.addToPeerHashSendQueue(peer2, transaction3);

List.of(peer1, peer2).forEach(messageSender::sendTransactionHashesToPeer);

Expand All @@ -96,7 +96,8 @@ public void shouldSendTransactionsInBatchesWithLimit() throws Exception {
final Set<Transaction> transactions =
generator.transactions(6000).stream().collect(Collectors.toSet());

transactions.forEach(transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction));
transactions.forEach(
transaction -> transactionTracker.addToPeerHashSendQueue(peer1, transaction));

messageSender.sendTransactionHashesToPeer(peer1);
final ArgumentCaptor<MessageData> messageDataArgumentCaptor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class TransactionBroadcasterTest {

private static final Long FIXED_RANDOM_SEED = 0L;
@Mock private EthContext ethContext;
@Mock private EthPeers ethPeers;
@Mock private EthScheduler ethScheduler;
Expand Down Expand Up @@ -92,12 +92,14 @@ public void setUp() {
when(ethContext.getEthPeers()).thenReturn(ethPeers);
when(ethContext.getScheduler()).thenReturn(ethScheduler);

// we use the fixed random seed to have a predictable shuffle of peers
txBroadcaster =
new TransactionBroadcaster(
ethContext,
transactionTracker,
transactionsMessageSender,
newPooledTransactionHashesMessageSender);
newPooledTransactionHashesMessageSender,
FIXED_RANDOM_SEED);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent move. My list of "flaky tests to fix" thanks you!

}

@Test
Expand Down Expand Up @@ -132,7 +134,7 @@ public void relayTransactionHashesFromPoolWhenPeerSupportEth65() {

txBroadcaster.relayTransactionPoolTo(ethPeerWithEth65, pendingTxs);

verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);

sendTaskCapture.getValue().run();

Expand Down Expand Up @@ -177,14 +179,16 @@ public void onTransactionsAddedWithOnlyFewEth65PeersSendFullTransactions() {
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));

txBroadcaster.onTransactionsAdded(txs);

verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65_3, ethPeerWithEth65_2, ethPeerWithEth65]
// so ethPeerWithEth65 and ethPeerWithEth65_2 are moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);

sendTaskCapture.getAllValues().forEach(Runnable::run);

verify(transactionsMessageSender, times(2)).sendTransactionsToPeer(any(EthPeer.class));
verifyNoInteractions(newPooledTransactionHashesMessageSender);
verify(transactionsMessageSender).sendTransactionsToPeer(ethPeerWithEth65_2);
verify(newPooledTransactionHashesMessageSender).sendTransactionHashesToPeer(ethPeerWithEth65);
}

@Test
Expand All @@ -196,10 +200,12 @@ public void onTransactionsAddedWithOnlyEth65PeersSendFullTransactionsAndTransact
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));

txBroadcaster.onTransactionsAdded(txs);

// the shuffled hash only peer list is always:
// [ethPeerWithEth65_3, ethPeerWithEth65_2, ethPeerWithEth65]
// so ethPeerWithEth65 and ethPeerWithEth65_2 are moved to the mixed broadcast list
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_3, txs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65_3, txs);

sendTaskCapture.getAllValues().forEach(Runnable::run);

Expand All @@ -218,8 +224,10 @@ public void onTransactionsAddedWithMixedPeersSendFullTransactionsAndTransactionH
List<Transaction> txs = toTransactionList(setupTransactionPool(1, 1));

txBroadcaster.onTransactionsAdded(txs);

verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65, ethPeerWithEth65_2]
// so ethPeerWithEth65_2 is moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth65, txs);

Expand Down Expand Up @@ -250,9 +258,11 @@ public void onTransactionsAddedWithMixedPeersSendFullTransactionsAndTransactionH
List<Transaction> txs = toTransactionList(setupTransactionPool(BLOB, 0, 1));

txBroadcaster.onTransactionsAdded(txs);

verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, txs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65, ethPeerWithEth65_2]
// so ethPeerWithEth65_2 is moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, txs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65_2, txs);
verifyNoTransactionAddedToPeerSendingQueue(ethPeerNoEth65);

sendTaskCapture.getAllValues().forEach(Runnable::run);
Expand All @@ -268,7 +278,6 @@ public void onTransactionsAddedWithMixedPeersSendFullTransactionsAndTransactionH

@Test
public void onTransactionsAddedWithMixedPeersAndMixedBroadcastKind() {

List<EthPeer> eth65Peers = List.of(ethPeerWithEth65, ethPeerWithEth65_2);

when(ethPeers.peerCount()).thenReturn(3);
Expand All @@ -285,9 +294,12 @@ public void onTransactionsAddedWithMixedPeersAndMixedBroadcastKind() {
mixedTxs.addAll(hashBroadcastTxs);

txBroadcaster.onTransactionsAdded(mixedTxs);

verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65, mixedTxs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, mixedTxs);
// the shuffled hash only peer list is always:
// [ethPeerWithEth65, ethPeerWithEth65_2]
// so ethPeerWithEth65_2 is moved to the mixed broadcast list
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65, mixedTxs);
verifyTransactionAddedToPeerHashSendingQueue(ethPeerWithEth65_2, hashBroadcastTxs);
verifyTransactionAddedToPeerSendingQueue(ethPeerWithEth65_2, fullBroadcastTxs);
verifyTransactionAddedToPeerSendingQueue(ethPeerNoEth65, fullBroadcastTxs);

sendTaskCapture.getAllValues().forEach(Runnable::run);
Expand Down Expand Up @@ -348,6 +360,16 @@ private void verifyTransactionAddedToPeerSendingQueue(
.containsExactlyInAnyOrderElementsOf(transactions);
}

private void verifyTransactionAddedToPeerHashSendingQueue(
final EthPeer peer, final Collection<Transaction> transactions) {

ArgumentCaptor<Transaction> trackedTransactions = ArgumentCaptor.forClass(Transaction.class);
verify(transactionTracker, times(transactions.size()))
.addToPeerHashSendQueue(eq(peer), trackedTransactions.capture());
assertThat(trackedTransactions.getAllValues())
.containsExactlyInAnyOrderElementsOf(transactions);
}

private void verifyNoTransactionAddedToPeerSendingQueue(final EthPeer peer) {

verify(transactionTracker, times(0)).addToPeerSendQueue(eq(peer), any());
Expand Down
Loading