Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[PAN-3083] Transactions listeners should use the subscriber pattern #1877

Merged
merged 3 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public class PendingTransactions {
private final Map<Address, SortedMap<Long, TransactionInfo>> transactionsBySender =
new HashMap<>();

private final Subscribers<PendingTransactionListener> listeners = Subscribers.create();
private final Subscribers<PendingTransactionListener> pendingTransactionSubscribers =
Subscribers.create();

private final Subscribers<PendingTransactionDroppedListener> transactionDroppedListeners =
Subscribers.create();
Expand Down Expand Up @@ -261,7 +262,7 @@ private boolean shouldReplace(
}

private void notifyTransactionAdded(final Transaction transaction) {
listeners.forEach(listener -> listener.onTransactionAdded(transaction));
pendingTransactionSubscribers.forEach(listener -> listener.onTransactionAdded(transaction));
}

private void notifyTransactionDropped(final Transaction transaction) {
Expand Down Expand Up @@ -289,12 +290,20 @@ public Set<TransactionInfo> getTransactionInfo() {
return new HashSet<>(pendingTransactions.values());
}

void addTransactionListener(final PendingTransactionListener listener) {
listeners.subscribe(listener);
long subscribePendingTransactions(final PendingTransactionListener listener) {
return pendingTransactionSubscribers.subscribe(listener);
}

void unsubscribePendingTransactions(final long id) {
pendingTransactionSubscribers.unsubscribe(id);
}

long subscribeDroppedTransactions(final PendingTransactionDroppedListener listener) {
return transactionDroppedListeners.subscribe(listener);
}

void addTransactionDroppedListener(final PendingTransactionDroppedListener listener) {
transactionDroppedListeners.subscribe(listener);
void unsubscribeDroppedTransactions(final long id) {
transactionDroppedListeners.unsubscribe(id);
}

public OptionalLong getNextNonceForSender(final Address sender) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,20 @@ public void addRemoteTransactions(final Collection<Transaction> transactions) {
}
}

public void addTransactionListener(final PendingTransactionListener listener) {
pendingTransactions.addTransactionListener(listener);
public long subscribePendingTransactions(final PendingTransactionListener listener) {
return pendingTransactions.subscribePendingTransactions(listener);
}

public void addTransactionDroppedListener(final PendingTransactionDroppedListener listener) {
pendingTransactions.addTransactionDroppedListener(listener);
public void unsubscribePendingTransactions(final long id) {
pendingTransactions.unsubscribePendingTransactions(id);
}

public long subscribeDroppedTransactions(final PendingTransactionDroppedListener listener) {
return pendingTransactions.subscribeDroppedTransactions(listener);
}

public void unsubscribeDroppedTransactions(final long id) {
pendingTransactions.unsubscribeDroppedTransactions(id);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;

import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
Expand Down Expand Up @@ -159,16 +160,31 @@ public void shouldStartDroppingLocalTransactionsWhenPoolIsFullOfLocalTransaction

@Test
public void shouldNotifyListenerWhenRemoteTransactionAdded() {
transactions.addTransactionListener(listener);
transactions.subscribePendingTransactions(listener);

transactions.addRemoteTransaction(transaction1);

verify(listener).onTransactionAdded(transaction1);
}

@Test
public void shouldNotNotifyListenerAfterUnsubscribe() {
final long id = transactions.subscribePendingTransactions(listener);

transactions.addRemoteTransaction(transaction1);

verify(listener).onTransactionAdded(transaction1);

transactions.unsubscribePendingTransactions(id);

transactions.addRemoteTransaction(transaction2);

verifyZeroInteractions(listener);
}

@Test
public void shouldNotifyListenerWhenLocalTransactionAdded() {
transactions.addTransactionListener(listener);
transactions.subscribePendingTransactions(listener);

transactions.addLocalTransaction(transaction1);

Expand All @@ -179,18 +195,36 @@ public void shouldNotifyListenerWhenLocalTransactionAdded() {
public void shouldNotifyDroppedListenerWhenRemoteTransactionDropped() {
transactions.addRemoteTransaction(transaction1);

transactions.addTransactionDroppedListener(droppedListener);
transactions.subscribeDroppedTransactions(droppedListener);

transactions.removeTransaction(transaction1);

verify(droppedListener).onTransactionDropped(transaction1);
}

@Test
public void shouldNotNotifyDroppedListenerAfterUnsubscribe() {
transactions.addRemoteTransaction(transaction1);
transactions.addRemoteTransaction(transaction2);

final long id = transactions.subscribeDroppedTransactions(droppedListener);

transactions.removeTransaction(transaction1);

verify(droppedListener).onTransactionDropped(transaction1);

transactions.unsubscribeDroppedTransactions(id);

transactions.removeTransaction(transaction2);

verifyNoMoreInteractions(droppedListener);
}

@Test
public void shouldNotifyDroppedListenerWhenLocalTransactionDropped() {
transactions.addLocalTransaction(transaction1);

transactions.addTransactionDroppedListener(droppedListener);
transactions.subscribeDroppedTransactions(droppedListener);

transactions.removeTransaction(transaction1);

Expand All @@ -201,7 +235,7 @@ public void shouldNotifyDroppedListenerWhenLocalTransactionDropped() {
public void shouldNotNotifyDroppedListenerWhenTransactionAddedToBlock() {
transactions.addRemoteTransaction(transaction1);

transactions.addTransactionDroppedListener(droppedListener);
transactions.subscribeDroppedTransactions(droppedListener);

transactions.transactionAddedToBlock(transaction1);

Expand Down Expand Up @@ -310,7 +344,7 @@ public void shouldNotReplaceTransactionWithSameSenderAndNonceWhenGasPriceIsLower
final Transaction transaction1b = transactionWithNonceSenderAndGasPrice(1, KEYS1, 1);
assertThat(transactions.addRemoteTransaction(transaction1)).isTrue();

transactions.addTransactionListener(listener);
transactions.subscribePendingTransactions(listener);
assertThat(transactions.addRemoteTransaction(transaction1b)).isFalse();

assertTransactionNotPending(transaction1b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void shouldReadTransactionsFromThePreviousCanonicalHeadWhenAReorgOccurs()
final Block reorgFork1 = appendBlock(UInt256.ONE, commonParent);
verifyChainHeadIs(originalFork2);

transactions.addTransactionListener(listener);
transactions.subscribePendingTransactions(listener);
final Block reorgFork2 = appendBlock(UInt256.of(2000), reorgFork1.getHeader());
verifyChainHeadIs(reorgFork2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public FilterManager(
this.filterRepository = filterRepository;
checkNotNull(blockchainQueries.getBlockchain());
blockchainQueries.getBlockchain().observeBlockAdded(this::recordBlockEvent);
transactionPool.addTransactionListener(this::recordPendingTransactionEvent);
transactionPool.subscribePendingTransactions(this::recordPendingTransactionEvent);
this.blockchainQueries = blockchainQueries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,8 @@ private SubscriptionManager createSubscriptionManager(
new PendingTransactionSubscriptionService(subscriptionManager);
final PendingTransactionDroppedSubscriptionService pendingTransactionsRemoved =
new PendingTransactionDroppedSubscriptionService(subscriptionManager);
transactionPool.addTransactionListener(pendingTransactions);
transactionPool.addTransactionDroppedListener(pendingTransactionsRemoved);
transactionPool.subscribePendingTransactions(pendingTransactions);
transactionPool.subscribeDroppedTransactions(pendingTransactionsRemoved);
vertx.deployVerticle(subscriptionManager);

return subscriptionManager;
Expand Down