Skip to content

Commit

Permalink
Refinements
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 committed Mar 28, 2024
1 parent 221bcf8 commit c08ecc0
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void sendTransactionHashesToPeer(final EthPeer peer) {
final Capability capability = peer.getConnection().capability(EthProtocol.NAME);
for (final List<Transaction> txBatch :
Iterables.partition(
transactionTracker.claimTransactionsHashesToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) {
transactionTracker.claimTransactionHashesToSendToPeer(peer), MAX_TRANSACTIONS_HASHES)) {
try {
final List<Hash> txHashes = toHashList(txBatch);
LOG.atTrace()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionsToSend = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionHashToSend = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionHashesToSend = new ConcurrentHashMap<>();

public void reset() {
seenTransactions.clear();
transactionsToSend.clear();
transactionHashToSend.clear();
transactionHashesToSend.clear();
}

public synchronized void markTransactionsAsSeen(
Expand All @@ -56,9 +56,13 @@ public synchronized void addToPeerSendQueue(final EthPeer peer, final Transactio
transactionsToSend.computeIfAbsent(peer, key -> createTransactionsSet()).add(transaction);
}
}
public synchronized void addToPeerHashSendQueue(final EthPeer peer, final Transaction transaction) {

public synchronized void addToPeerHashSendQueue(
final EthPeer peer, final Transaction transaction) {
if (!hasPeerSeenTransaction(peer, transaction)) {
transactionHashToSend.computeIfAbsent(peer, key -> createTransactionsSet()).add(transaction);
transactionHashesToSend
.computeIfAbsent(peer, key -> createTransactionsSet())
.add(transaction);
}
}

Expand All @@ -75,11 +79,12 @@ public synchronized Set<Transaction> claimTransactionsToSendToPeer(final EthPeer
return emptySet();
}
}
public synchronized Set<Transaction> claimTransactionsHashesToSendToPeer(final EthPeer peer) {
final Set<Transaction> transactionsToSend = this.transactionHashToSend.remove(peer);
if (transactionsToSend != null) {
markTransactionHashesAsSeen(peer, toHashList(transactionsToSend));
return transactionsToSend;

public synchronized Set<Transaction> claimTransactionHashesToSendToPeer(final EthPeer peer) {
final Set<Transaction> transactionHashesToSend = this.transactionHashesToSend.remove(peer);
if (transactionHashesToSend != null) {
markTransactionHashesAsSeen(peer, toHashList(transactionHashesToSend));
return transactionHashesToSend;
} else {
return emptySet();
}
Expand Down Expand Up @@ -116,6 +121,6 @@ protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {
public void onDisconnect(final EthPeer peer) {
seenTransactions.remove(peer);
transactionsToSend.remove(peer);
transactionHashToSend.remove(peer);
transactionHashesToSend.remove(peer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void onTransactionsAdded(final Collection<Transaction> transactions) {
.addArgument(sendOnlyHashPeers::size)
.addArgument(sendMixedPeers::size)
.addArgument(sendOnlyFullTransactionPeers)
.addArgument(() -> sendOnlyHashPeers.toString() + sendMixedPeers.toString())
.addArgument(() -> sendOnlyHashPeers.toString() + sendMixedPeers)
.log();

sendToFullTransactionsPeers(
Expand Down Expand Up @@ -159,7 +159,6 @@ private void sendFullTransactions(
fullTransactionPeers.forEach(
peer -> {
transactions.stream()
.filter(tx -> !ANNOUNCE_HASH_ONLY_TX_TYPES.contains(tx.getType()))
.forEach(transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
ethContext
.getScheduler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ public void sendTransactionsToPeers() {

void sendTransactionsToPeer(final EthPeer peer) {
final Set<Transaction> allTxToSend = transactionTracker.claimTransactionsToSendToPeer(peer);
//uncomment this to see the exception being thrown
//if(allTxToSend.stream().anyMatch(t -> TransactionType.BLOB.equals(t.getType()))) {
// throw new RuntimeException("Sent Blob");
//}
while (!allTxToSend.isEmpty()) {
final LimitedTransactionsMessages limitedTransactionsMessages =
LimitedTransactionsMessages.createLimited(allTxToSend);
Expand Down

0 comments on commit c08ecc0

Please sign in to comment.