diff --git a/acceptance-tests/src/test/java/tech/pegasys/pantheon/tests/acceptance/dsl/node/ThreadPantheonNodeRunner.java b/acceptance-tests/src/test/java/tech/pegasys/pantheon/tests/acceptance/dsl/node/ThreadPantheonNodeRunner.java index 8bb574c720..0f36207bae 100644 --- a/acceptance-tests/src/test/java/tech/pegasys/pantheon/tests/acceptance/dsl/node/ThreadPantheonNodeRunner.java +++ b/acceptance-tests/src/test/java/tech/pegasys/pantheon/tests/acceptance/dsl/node/ThreadPantheonNodeRunner.java @@ -137,7 +137,8 @@ public void startNode(final PantheonNode node) { PantheonEvents.class, new PantheonEventsImpl( pantheonController.getProtocolManager().getBlockBroadcaster(), - pantheonController.getTransactionPool())); + pantheonController.getTransactionPool(), + pantheonController.getSyncState())); pantheonPluginContext.startPlugins(); final Runner runner = diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/SyncStatus.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/SyncStatus.java index 165df0e95d..72596546b1 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/SyncStatus.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/SyncStatus.java @@ -14,7 +14,7 @@ import java.util.Objects; -public final class SyncStatus { +public final class SyncStatus implements tech.pegasys.pantheon.plugin.data.SyncStatus { private final long startingBlock; private final long currentBlock; @@ -26,18 +26,22 @@ public SyncStatus(final long startingBlock, final long currentBlock, final long this.highestBlock = highestBlock; } + @Override public long getStartingBlock() { return startingBlock; } + @Override public long getCurrentBlock() { return currentBlock; } + @Override public long getHighestBlock() { return highestBlock; } + @Override public boolean inSync() { return currentBlock == highestBlock; } diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java index 408674be8b..eb2ed49400 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java @@ -12,6 +12,9 @@ */ package tech.pegasys.pantheon.ethereum.core; +import tech.pegasys.pantheon.plugin.data.SyncStatus; +import tech.pegasys.pantheon.plugin.services.PantheonEvents; + import java.util.Optional; /** Provides an interface to block synchronization processes. */ @@ -27,12 +30,7 @@ public interface Synchronizer { */ Optional getSyncStatus(); - long observeSyncStatus(final SyncStatusListener listener); + long observeSyncStatus(final PantheonEvents.SyncStatusListener listener); boolean removeObserver(long observerId); - - @FunctionalInterface - interface SyncStatusListener { - void onSyncStatus(final SyncStatus status); - } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index 528705a4fa..20993cd01c 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -15,7 +15,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import tech.pegasys.pantheon.ethereum.ProtocolContext; -import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastDownloaderFactory; @@ -29,7 +28,9 @@ import tech.pegasys.pantheon.ethereum.worldstate.Pruner; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.metrics.PantheonMetricCategory; +import tech.pegasys.pantheon.plugin.data.SyncStatus; import tech.pegasys.pantheon.plugin.services.MetricsSystem; +import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener; import tech.pegasys.pantheon.util.ExceptionUtils; import tech.pegasys.pantheon.util.Subscribers; @@ -176,10 +177,11 @@ public Optional getSyncStatus() { if (!running.get()) { return Optional.empty(); } - if (syncState.syncStatus().getCurrentBlock() == syncState.syncStatus().getHighestBlock()) { + final SyncStatus syncStatus = syncState.syncStatus(); + if (syncStatus.inSync()) { return Optional.empty(); } - return Optional.of(syncState.syncStatus()); + return Optional.of(syncStatus); } @Override @@ -194,6 +196,6 @@ public boolean removeObserver(final long observerId) { } private void syncStatusCallback(final SyncStatus status) { - syncStatusListeners.forEach(c -> c.onSyncStatus(status)); + syncStatusListeners.forEach(c -> c.onSyncStatusChanged(status)); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java index 257a83803c..c3cbc57546 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java @@ -16,14 +16,16 @@ import tech.pegasys.pantheon.ethereum.chain.ChainHead; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.SyncStatus; -import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; +import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener; import tech.pegasys.pantheon.util.Subscribers; import java.util.Optional; +import com.google.common.annotations.VisibleForTesting; + public class SyncState { private static final long SYNC_TOLERANCE = 5; private final Blockchain blockchain; @@ -49,17 +51,22 @@ public SyncState(final Blockchain blockchain, final EthPeers ethPeers) { }); } - private void publishSyncStatus() { + @VisibleForTesting + public void publishSyncStatus() { final SyncStatus syncStatus = syncStatus(); - syncStatusListeners.forEach(c -> c.onSyncStatus(syncStatus)); + syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus)); } public void addInSyncListener(final InSyncListener observer) { inSyncListeners.subscribe(observer); } - public void addSyncStatusListener(final SyncStatusListener observer) { - syncStatusListeners.subscribe(observer); + public long addSyncStatusListener(final SyncStatusListener observer) { + return syncStatusListeners.subscribe(observer); + } + + public void removeSyncStatusListener(final long listenerId) { + syncStatusListeners.unsubscribe(listenerId); } public SyncStatus syncStatus() { @@ -141,10 +148,10 @@ public long bestChainHeight(final long localChainHeight) { } private synchronized void checkInSync() { - final boolean currentSyncStatus = isInSync(); - if (lastInSync != currentSyncStatus) { - lastInSync = currentSyncStatus; - inSyncListeners.forEach(c -> c.onSyncStatusChanged(currentSyncStatus)); + final boolean currentInSync = isInSync(); + if (lastInSync != currentInSync) { + lastInSync = currentInSync; + inSyncListeners.forEach(c -> c.onSyncStatusChanged(currentInSync)); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncStateTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncStateTest.java index f5caa86a94..06d5f7711e 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncStateTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncStateTest.java @@ -31,10 +31,10 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; import tech.pegasys.pantheon.ethereum.core.Hash; -import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener; import tech.pegasys.pantheon.ethereum.eth.manager.ChainState; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; +import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener; import tech.pegasys.pantheon.util.uint.UInt256; import java.util.Collections; @@ -237,7 +237,7 @@ public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() { new BlockBody(Collections.emptyList(), Collections.emptyList()))), blockchain); - verify(syncStatusListener).onSyncStatus(eq(syncState.syncStatus())); + verify(syncStatusListener).onSyncStatusChanged(eq(syncState.syncStatus())); } private void setupOutOfSyncState() { diff --git a/ethereum/graphql/src/main/java/tech/pegasys/pantheon/ethereum/graphql/GraphQLDataFetchers.java b/ethereum/graphql/src/main/java/tech/pegasys/pantheon/ethereum/graphql/GraphQLDataFetchers.java index f85953ec35..f5bdae7ded 100644 --- a/ethereum/graphql/src/main/java/tech/pegasys/pantheon/ethereum/graphql/GraphQLDataFetchers.java +++ b/ethereum/graphql/src/main/java/tech/pegasys/pantheon/ethereum/graphql/GraphQLDataFetchers.java @@ -18,7 +18,6 @@ import tech.pegasys.pantheon.ethereum.core.Account; import tech.pegasys.pantheon.ethereum.core.Address; import tech.pegasys.pantheon.ethereum.core.Hash; -import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.core.Transaction; import tech.pegasys.pantheon.ethereum.core.WorldState; @@ -38,6 +37,7 @@ import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.Capability; import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.ethereum.rlp.RLPException; +import tech.pegasys.pantheon.plugin.data.SyncStatus; import tech.pegasys.pantheon.util.bytes.Bytes32; import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; diff --git a/ethereum/graphql/src/main/java/tech/pegasys/pantheon/ethereum/graphql/internal/pojoadapter/SyncStateAdapter.java b/ethereum/graphql/src/main/java/tech/pegasys/pantheon/ethereum/graphql/internal/pojoadapter/SyncStateAdapter.java index 9aa1505d13..97d7df27b7 100644 --- a/ethereum/graphql/src/main/java/tech/pegasys/pantheon/ethereum/graphql/internal/pojoadapter/SyncStateAdapter.java +++ b/ethereum/graphql/src/main/java/tech/pegasys/pantheon/ethereum/graphql/internal/pojoadapter/SyncStateAdapter.java @@ -12,7 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.graphql.internal.pojoadapter; -import tech.pegasys.pantheon.ethereum.core.SyncStatus; +import tech.pegasys.pantheon.plugin.data.SyncStatus; import java.util.Optional; diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/health/ReadinessCheck.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/health/ReadinessCheck.java index 70645f0875..d623d6390f 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/health/ReadinessCheck.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/health/ReadinessCheck.java @@ -12,11 +12,11 @@ */ package tech.pegasys.pantheon.ethereum.jsonrpc.health; -import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.jsonrpc.health.HealthService.HealthCheck; import tech.pegasys.pantheon.ethereum.jsonrpc.health.HealthService.ParamSource; import tech.pegasys.pantheon.ethereum.p2p.network.P2PNetwork; +import tech.pegasys.pantheon.plugin.data.SyncStatus; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/results/SyncingResult.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/results/SyncingResult.java index 7887d527c8..35ead9a3da 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/results/SyncingResult.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/results/SyncingResult.java @@ -12,7 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.jsonrpc.internal.results; -import tech.pegasys.pantheon.ethereum.core.SyncStatus; +import tech.pegasys.pantheon.plugin.data.SyncStatus; import java.util.Objects; diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java index 4225f15deb..2298420283 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java @@ -37,7 +37,7 @@ /** * The SubscriptionManager is responsible for managing subscriptions and sending messages to the - * clients that have an active subscription subscription. + * clients that have an active subscription. */ public class SubscriptionManager extends AbstractVerticle { diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java index 64382f243e..c90ac19504 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java @@ -12,12 +12,12 @@ */ package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing; -import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; +import tech.pegasys.pantheon.plugin.data.SyncStatus; public class SyncingSubscriptionService { diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/health/ReadinessCheckTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/health/ReadinessCheckTest.java index b5856c4263..ca8e6472b1 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/health/ReadinessCheckTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/health/ReadinessCheckTest.java @@ -16,10 +16,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.jsonrpc.health.HealthService.ParamSource; import tech.pegasys.pantheon.ethereum.p2p.network.P2PNetwork; +import tech.pegasys.pantheon.plugin.data.SyncStatus; import java.util.HashMap; import java.util.Map; @@ -141,6 +141,7 @@ public void shouldNotBeReadyWhenCustomMaxBlocksBehindIsInvalid() { } private Optional createSyncStatus(final int currentBlock, final int highestBlock) { - return Optional.of(new SyncStatus(0, currentBlock, highestBlock)); + return Optional.of( + new tech.pegasys.pantheon.ethereum.core.SyncStatus(0, currentBlock, highestBlock)); } } diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/methods/EthSyncingTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/methods/EthSyncingTest.java index 7425a64ccb..5f41b4f158 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/methods/EthSyncingTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/internal/methods/EthSyncingTest.java @@ -17,12 +17,12 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.JsonRpcRequest; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcResponse; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcSuccessResponse; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult; +import tech.pegasys.pantheon.plugin.data.SyncStatus; import java.util.Optional; @@ -66,7 +66,8 @@ public void shouldReturnFalseWhenSyncStatusIsEmpty() { @Test public void shouldReturnExpectedValueWhenSyncStatusIsNotEmpty() { final JsonRpcRequest request = requestWithParams(); - final SyncStatus expectedSyncStatus = new SyncStatus(0, 1, 2); + final SyncStatus expectedSyncStatus = + new tech.pegasys.pantheon.ethereum.core.SyncStatus(0, 1, 2); final JsonRpcResponse expectedResponse = new JsonRpcSuccessResponse(request.getId(), new SyncingResult(expectedSyncStatus)); final Optional optionalSyncStatus = Optional.of(expectedSyncStatus); diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java index 830ec0d2ca..2ed3b407ff 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java @@ -20,10 +20,10 @@ import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; -import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; +import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener; import java.util.Collections; import java.util.List; @@ -69,7 +69,7 @@ public void shouldSendSyncStatusWhenReceiveSyncStatus() { .when(subscriptionManager) .notifySubscribersOnWorkerThread(any(), any(), any()); - syncStatusListener.onSyncStatus(syncStatus); + syncStatusListener.onSyncStatusChanged(syncStatus); verify(subscriptionManager) .sendMessage(eq(subscription.getSubscriptionId()), eq(expectedSyncingResult)); @@ -91,7 +91,7 @@ public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() { .when(subscriptionManager) .notifySubscribersOnWorkerThread(any(), any(), any()); - syncStatusListener.onSyncStatus(syncStatus); + syncStatusListener.onSyncStatusChanged(syncStatus); verify(subscriptionManager) .sendMessage(eq(subscription.getSubscriptionId()), any(NotSynchronisingResult.class)); diff --git a/ethereum/permissioning/src/main/java/tech/pegasys/pantheon/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProvider.java b/ethereum/permissioning/src/main/java/tech/pegasys/pantheon/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProvider.java index 9493c132fb..e531d0bfc0 100644 --- a/ethereum/permissioning/src/main/java/tech/pegasys/pantheon/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProvider.java +++ b/ethereum/permissioning/src/main/java/tech/pegasys/pantheon/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProvider.java @@ -14,11 +14,11 @@ import static com.google.common.base.Preconditions.checkNotNull; -import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.p2p.peers.EnodeURL; import tech.pegasys.pantheon.ethereum.permissioning.node.NodePermissioningProvider; import tech.pegasys.pantheon.metrics.PantheonMetricCategory; +import tech.pegasys.pantheon.plugin.data.SyncStatus; import tech.pegasys.pantheon.plugin.services.MetricsSystem; import tech.pegasys.pantheon.plugin.services.metrics.Counter; diff --git a/ethereum/permissioning/src/test/java/tech/pegasys/pantheon/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProviderTest.java b/ethereum/permissioning/src/test/java/tech/pegasys/pantheon/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProviderTest.java index c62fc53159..3fdbaa69da 100644 --- a/ethereum/permissioning/src/test/java/tech/pegasys/pantheon/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProviderTest.java +++ b/ethereum/permissioning/src/test/java/tech/pegasys/pantheon/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProviderTest.java @@ -21,10 +21,10 @@ import tech.pegasys.pantheon.ethereum.core.SyncStatus; import tech.pegasys.pantheon.ethereum.core.Synchronizer; -import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener; import tech.pegasys.pantheon.ethereum.p2p.peers.EnodeURL; import tech.pegasys.pantheon.metrics.PantheonMetricCategory; import tech.pegasys.pantheon.plugin.services.MetricsSystem; +import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener; import tech.pegasys.pantheon.plugin.services.metrics.Counter; import java.util.ArrayList; @@ -104,7 +104,7 @@ public void before() { @Test public void whenIsNotInSyncHasReachedSyncShouldReturnFalse() { - syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2)); + syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); assertThat(provider.hasReachedSync()).isFalse(); assertThat(syncGauge.getAsInt()).isEqualTo(0); @@ -112,7 +112,7 @@ public void whenIsNotInSyncHasReachedSyncShouldReturnFalse() { @Test public void whenInSyncHasReachedSyncShouldReturnTrue() { - syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 1)); + syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 1)); assertThat(provider.hasReachedSync()).isTrue(); assertThat(syncGauge.getAsInt()).isEqualTo(1); @@ -120,22 +120,22 @@ public void whenInSyncHasReachedSyncShouldReturnTrue() { @Test public void whenInSyncChangesFromTrueToFalseHasReachedSyncShouldReturnTrue() { - syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2)); + syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); assertThat(provider.hasReachedSync()).isFalse(); assertThat(syncGauge.getAsInt()).isEqualTo(0); - syncStatusListener.onSyncStatus(new SyncStatus(0, 2, 1)); + syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 2, 1)); assertThat(provider.hasReachedSync()).isTrue(); assertThat(syncGauge.getAsInt()).isEqualTo(1); - syncStatusListener.onSyncStatus(new SyncStatus(0, 2, 3)); + syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 2, 3)); assertThat(provider.hasReachedSync()).isTrue(); assertThat(syncGauge.getAsInt()).isEqualTo(1); } @Test public void whenHasNotSyncedNonBootnodeShouldNotBePermitted() { - syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2)); + syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); assertThat(provider.hasReachedSync()).isFalse(); assertThat(syncGauge.getAsInt()).isEqualTo(0); @@ -149,7 +149,7 @@ public void whenHasNotSyncedNonBootnodeShouldNotBePermitted() { @Test public void whenHasNotSyncedBootnodeIncomingConnectionShouldNotBePermitted() { - syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2)); + syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); assertThat(provider.hasReachedSync()).isFalse(); assertThat(syncGauge.getAsInt()).isEqualTo(0); @@ -163,7 +163,7 @@ public void whenHasNotSyncedBootnodeIncomingConnectionShouldNotBePermitted() { @Test public void whenHasNotSyncedBootnodeOutgoingConnectionShouldBePermitted() { - syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2)); + syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); assertThat(provider.hasReachedSync()).isFalse(); assertThat(syncGauge.getAsInt()).isEqualTo(0); @@ -177,7 +177,7 @@ public void whenHasNotSyncedBootnodeOutgoingConnectionShouldBePermitted() { @Test public void whenHasSyncedIsPermittedShouldReturnTrue() { - syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 1)); + syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 1)); assertThat(provider.hasReachedSync()).isTrue(); assertThat(syncGauge.getAsInt()).isEqualTo(1); @@ -191,7 +191,7 @@ public void whenHasSyncedIsPermittedShouldReturnTrue() { @Test public void syncStatusPermissioningCheckShouldIgnoreEnodeURLDiscoveryPort() { - syncStatusListener.onSyncStatus(new SyncStatus(0, 1, 2)); + syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); assertThat(provider.hasReachedSync()).isFalse(); final EnodeURL bootnode = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java index 7071a8f127..a678c57b0d 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonCommand.java @@ -822,8 +822,9 @@ private PantheonCommand startPlugins() { pantheonPluginContext.addService( PantheonEvents.class, new PantheonEventsImpl( - (pantheonController.getProtocolManager().getBlockBroadcaster()), - pantheonController.getTransactionPool())); + pantheonController.getProtocolManager().getBlockBroadcaster(), + pantheonController.getTransactionPool(), + pantheonController.getSyncState())); pantheonPluginContext.startPlugins(); return this; } diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonController.java index 0b75529f32..84cacc53a7 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonController.java @@ -21,6 +21,7 @@ import tech.pegasys.pantheon.ethereum.core.PrivacyParameters; import tech.pegasys.pantheon.ethereum.core.Synchronizer; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool; import tech.pegasys.pantheon.ethereum.jsonrpc.RpcApi; import tech.pegasys.pantheon.ethereum.jsonrpc.internal.methods.JsonRpcMethod; @@ -48,6 +49,7 @@ public class PantheonController implements java.io.Closeable { private final MiningCoordinator miningCoordinator; private final PrivacyParameters privacyParameters; private final Runnable close; + private final SyncState syncState; PantheonController( final ProtocolSchedule protocolSchedule, @@ -56,18 +58,20 @@ public class PantheonController implements java.io.Closeable { final GenesisConfigOptions genesisConfigOptions, final SubProtocolConfiguration subProtocolConfiguration, final Synchronizer synchronizer, - final JsonRpcMethodFactory additionalJsonRpcMethodsFactory, - final KeyPair keyPair, + final SyncState syncState, final TransactionPool transactionPool, final MiningCoordinator miningCoordinator, final PrivacyParameters privacyParameters, - final Runnable close) { + final Runnable close, + final JsonRpcMethodFactory additionalJsonRpcMethodsFactory, + final KeyPair keyPair) { this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethProtocolManager = ethProtocolManager; this.genesisConfigOptions = genesisConfigOptions; this.subProtocolConfiguration = subProtocolConfiguration; this.synchronizer = synchronizer; + this.syncState = syncState; this.additionalJsonRpcMethodsFactory = additionalJsonRpcMethodsFactory; this.keyPair = keyPair; this.transactionPool = transactionPool; @@ -126,6 +130,10 @@ public Map getAdditionalJsonRpcMethods( return additionalJsonRpcMethodsFactory.createJsonRpcMethods(enabledRpcApis); } + public SyncState getSyncState() { + return syncState; + } + public static class Builder { public PantheonControllerBuilder fromEthNetworkConfig( diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java index c855fbd862..576d5b1895 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonControllerBuilder.java @@ -323,8 +323,7 @@ public PantheonController build() throws IOException { genesisConfig.getConfigOptions(genesisConfigOverrides), subProtocolConfiguration, synchronizer, - additionalJsonRpcMethodFactory, - nodeKeys, + syncState, transactionPool, miningCoordinator, privacyParameters, @@ -338,7 +337,9 @@ public PantheonController build() throws IOException { } catch (final IOException e) { LOG.error("Failed to close storage provider", e); } - }); + }, + additionalJsonRpcMethodFactory, + nodeKeys); } protected void prepForBuild() {} diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/services/PantheonEventsImpl.java b/pantheon/src/main/java/tech/pegasys/pantheon/services/PantheonEventsImpl.java index db2a27f079..da78154ba2 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/services/PantheonEventsImpl.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/services/PantheonEventsImpl.java @@ -12,66 +12,65 @@ */ package tech.pegasys.pantheon.services; -import tech.pegasys.pantheon.ethereum.core.Block; -import tech.pegasys.pantheon.ethereum.core.Transaction; import tech.pegasys.pantheon.ethereum.eth.sync.BlockBroadcaster; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool; import tech.pegasys.pantheon.plugin.services.PantheonEvents; public class PantheonEventsImpl implements PantheonEvents { private final BlockBroadcaster blockBroadcaster; private final TransactionPool transactionPool; + private final SyncState syncState; public PantheonEventsImpl( - final BlockBroadcaster blockBroadcaster, final TransactionPool transactionPool) { + final BlockBroadcaster blockBroadcaster, + final TransactionPool transactionPool, + final SyncState syncState) { this.blockBroadcaster = blockBroadcaster; this.transactionPool = transactionPool; + this.syncState = syncState; } @Override - public Object addNewBlockPropagatedListener(final NewBlockPropagatedListener listener) { + public long addBlockPropagatedListener(final BlockPropagatedListener listener) { return blockBroadcaster.subscribePropagateNewBlocks( - block -> dispatchNewBlockPropagatedMessage(block, listener)); + block -> listener.newBlockPropagated(block.getHeader())); } @Override - public void removeNewBlockPropagatedListener(final Object listenerIdentifier) { - if (listenerIdentifier instanceof Long) { - blockBroadcaster.unsubscribePropagateNewBlocks((Long) listenerIdentifier); - } + public void removeBlockPropagatedListener(final long listenerIdentifier) { + blockBroadcaster.unsubscribePropagateNewBlocks(listenerIdentifier); } - private void dispatchNewBlockPropagatedMessage( - final Block block, final NewBlockPropagatedListener listener) { - listener.newBlockPropagated(block.getHeader()); + @Override + public long addTransactionAddedListener(final TransactionAddedListener listener) { + return transactionPool.subscribePendingTransactions(listener::onTransactionAdded); } @Override - public Object addNewTransactionAddedListener(final NewTransactionAddedListener listener) { - return transactionPool.subscribePendingTransactions( - transaction -> dispatchTransactionAddedMessage(transaction, listener)); + public void removeTransactionAddedListener(final long listenerIdentifier) { + transactionPool.unsubscribePendingTransactions(listenerIdentifier); } @Override - public void removeNewTransactionAddedListener(final Object listenerIdentifier) { - if (listenerIdentifier instanceof Long) { - transactionPool.unsubscribePendingTransactions((Long) listenerIdentifier); - } + public long addTransactionDroppedListener( + final TransactionDroppedListener transactionDroppedListener) { + return transactionPool.subscribeDroppedTransactions( + transactionDroppedListener::onTransactionDropped); } @Override - public Object addNewTransactionDroppedListener( - final TransactionDroppedListener newTransactionDroppedListener) { - throw new UnsupportedOperationException(); + public void removeTransactionDroppedListener(final long listenerIdentifier) { + transactionPool.unsubscribeDroppedTransactions(listenerIdentifier); } @Override - public void removeTransactionDroppedListener(final Object listenerIdentifier) { - throw new UnsupportedOperationException(); + public long addSyncStatusListener(final SyncStatusListener syncStatusListener) { + return syncState.addSyncStatusListener(syncStatusListener); } - private void dispatchTransactionAddedMessage( - final Transaction transaction, final NewTransactionAddedListener listener) { - listener.newTransactionAdded(transaction); + @Override + public void removeSyncStatusListener(final long listenerIdentifier) { + syncState.removeSyncStatusListener(listenerIdentifier); } } diff --git a/pantheon/src/test/java/tech/pegasys/pantheon/plugins/TestPantheonEventsPlugin.java b/pantheon/src/test/java/tech/pegasys/pantheon/plugins/TestPantheonEventsPlugin.java index d2716fb456..9aee3803e2 100644 --- a/pantheon/src/test/java/tech/pegasys/pantheon/plugins/TestPantheonEventsPlugin.java +++ b/pantheon/src/test/java/tech/pegasys/pantheon/plugins/TestPantheonEventsPlugin.java @@ -34,7 +34,7 @@ public class TestPantheonEventsPlugin implements PantheonPlugin { private PantheonContext context; - private Optional subscriptionId; + private Optional subscriptionId; private final AtomicInteger blockCounter = new AtomicInteger(); private File callbackDir; @@ -50,7 +50,7 @@ public void start() { subscriptionId = context .getService(PantheonEvents.class) - .map(events -> events.addNewBlockPropagatedListener(this::onBlockAnnounce)); + .map(events -> events.addBlockPropagatedListener(this::onBlockAnnounce)); LOG.info("Listening with ID#" + subscriptionId); } @@ -60,7 +60,7 @@ public void stop() { id -> context .getService(PantheonEvents.class) - .ifPresent(pantheonEvents -> pantheonEvents.removeNewBlockPropagatedListener(id))); + .ifPresent(pantheonEvents -> pantheonEvents.removeBlockPropagatedListener(id))); LOG.info("No longer listening with ID#" + subscriptionId); } diff --git a/pantheon/src/test/java/tech/pegasys/pantheon/services/PantheonEventsImplTest.java b/pantheon/src/test/java/tech/pegasys/pantheon/services/PantheonEventsImplTest.java index 112eb1fe90..194984e79c 100644 --- a/pantheon/src/test/java/tech/pegasys/pantheon/services/PantheonEventsImplTest.java +++ b/pantheon/src/test/java/tech/pegasys/pantheon/services/PantheonEventsImplTest.java @@ -42,6 +42,7 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.plugin.data.BlockHeader; +import tech.pegasys.pantheon.plugin.data.SyncStatus; import tech.pegasys.pantheon.plugin.data.Transaction; import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.uint.UInt256; @@ -66,7 +67,7 @@ public class PantheonEventsImplTest { @Mock private ProtocolSchedule mockProtocolSchedule; @Mock private ProtocolContext mockProtocolContext; - @Mock private SyncState mockSyncState; + private SyncState syncState; @Mock private EthPeers mockEthPeers; @Mock private EthContext mockEthContext; @Mock private EthMessages mockEthMessages; @@ -109,17 +110,40 @@ public void setUp() { mockEthContext, TestClock.fixed(), new NoOpMetricsSystem(), - mockSyncState, + syncState, Wei.ZERO, - TransactionPoolConfiguration.builder().build()); + TransactionPoolConfiguration.builder().txPoolMaxSize(1).build()); + syncState = new SyncState(mockBlockchain, mockEthPeers); - serviceImpl = new PantheonEventsImpl(blockBroadcaster, transactionPool); + serviceImpl = new PantheonEventsImpl(blockBroadcaster, transactionPool, syncState); + } + + @Test + public void syncStatusEventFiresAfterSubscribe() { + final AtomicReference result = new AtomicReference<>(); + serviceImpl.addSyncStatusListener(result::set); + + assertThat(result.get()).isNull(); + syncState.publishSyncStatus(); + assertThat(result.get()).isNotNull(); + } + + @Test + public void syncStatusEventDoesNotFireAfterUnsubscribe() { + final AtomicReference result = new AtomicReference<>(); + final long id = serviceImpl.addSyncStatusListener(result::set); + syncState.publishSyncStatus(); + assertThat(result.get()).isNotNull(); + result.set(null); + serviceImpl.removeSyncStatusListener(id); + syncState.publishSyncStatus(); + assertThat(result.get()).isNull(); } @Test public void newBlockEventFiresAfterSubscribe() { final AtomicReference result = new AtomicReference<>(); - serviceImpl.addNewBlockPropagatedListener(result::set); + serviceImpl.addBlockPropagatedListener(result::set); assertThat(result.get()).isNull(); blockBroadcaster.propagate(generateBlock(), UInt256.of(1)); @@ -130,12 +154,13 @@ public void newBlockEventFiresAfterSubscribe() { @Test public void newBlockEventDoesNotFireAfterUnsubscribe() { final AtomicReference result = new AtomicReference<>(); - final Object id = serviceImpl.addNewBlockPropagatedListener(result::set); + final long id = serviceImpl.addBlockPropagatedListener(result::set); assertThat(result.get()).isNull(); blockBroadcaster.propagate(generateBlock(), UInt256.of(1)); - serviceImpl.removeNewBlockPropagatedListener(id); + assertThat(result.get()).isNotNull(); + serviceImpl.removeBlockPropagatedListener(id); result.set(null); blockBroadcaster.propagate(generateBlock(), UInt256.of(1)); @@ -149,15 +174,14 @@ public void propagationWithoutSubscriptionsCompletes() { @Test public void newBlockEventUselessUnsubscribesCompletes() { - serviceImpl.removeNewBlockPropagatedListener("doesNotExist"); - serviceImpl.removeNewBlockPropagatedListener(5); - serviceImpl.removeNewBlockPropagatedListener(5L); + serviceImpl.removeBlockPropagatedListener(5); + serviceImpl.removeBlockPropagatedListener(5L); } @Test - public void newTransactionEventFiresAfterSubscribe() { + public void transactionAddedEventFiresAfterSubscribe() { final AtomicReference result = new AtomicReference<>(); - serviceImpl.addNewTransactionAddedListener(result::set); + serviceImpl.addTransactionAddedListener(result::set); assertThat(result.get()).isNull(); transactionPool.addLocalTransaction(TX1); @@ -166,14 +190,15 @@ public void newTransactionEventFiresAfterSubscribe() { } @Test - public void newTransactionEventDoesNotFireAfterUnsubscribe() { + public void transactionAddedEventDoesNotFireAfterUnsubscribe() { final AtomicReference result = new AtomicReference<>(); - final Object id = serviceImpl.addNewTransactionAddedListener(result::set); + final long id = serviceImpl.addTransactionAddedListener(result::set); assertThat(result.get()).isNull(); transactionPool.addLocalTransaction(TX1); + assertThat(result.get()).isNotNull(); - serviceImpl.removeNewTransactionAddedListener(id); + serviceImpl.removeTransactionAddedListener(id); result.set(null); transactionPool.addLocalTransaction(TX2); @@ -181,10 +206,39 @@ public void newTransactionEventDoesNotFireAfterUnsubscribe() { } @Test - public void newTransactionEventUselessUnsubscribesCompletes() { - serviceImpl.removeNewTransactionAddedListener("doesNotExist"); - serviceImpl.removeNewTransactionAddedListener(5); - serviceImpl.removeNewTransactionAddedListener(5L); + public void transactionAddedEventUselessUnsubscribesCompletes() { + serviceImpl.removeTransactionAddedListener(5); + serviceImpl.removeTransactionAddedListener(5L); + } + + @Test + public void transactionDroppedEventFiresAfterSubscribe() { + final AtomicReference result = new AtomicReference<>(); + serviceImpl.addTransactionDroppedListener(result::set); + + assertThat(result.get()).isNull(); + // The max pool size is configured to 1 so adding two transactions should trigger a drop + transactionPool.addLocalTransaction(TX1); + transactionPool.addLocalTransaction(TX2); + + assertThat(result.get()).isNotNull(); + } + + @Test + public void transactionDroppedEventDoesNotFireAfterUnsubscribe() { + final AtomicReference result = new AtomicReference<>(); + final long id = serviceImpl.addTransactionDroppedListener(result::set); + + assertThat(result.get()).isNull(); + transactionPool.addLocalTransaction(TX1); + transactionPool.addLocalTransaction(TX2); + + assertThat(result.get()).isNotNull(); + serviceImpl.removeTransactionAddedListener(id); + result.set(null); + + transactionPool.addLocalTransaction(TX2); + assertThat(result.get()).isNull(); } private Block generateBlock() { diff --git a/plugin-api/build.gradle b/plugin-api/build.gradle index 6c2bad3b13..898cd952bf 100644 --- a/plugin-api/build.gradle +++ b/plugin-api/build.gradle @@ -56,7 +56,7 @@ Calculated : ${currentHash} tasks.register('checkAPIChanges', FileStateChecker) { description = "Checks that the API for the Plugin-API project does not change without deliberate thought" files = sourceSets.main.allJava.files - knownHash = 'PBo0D4R6/1EYXEn+k0nmWHW4TkklUWQbQGNqgWzslfw=' + knownHash = 'j39vjVpNEK0kTpk/MLK8BHnqkFoRO9BWajrm9WoejWM=' } check.dependsOn('checkAPIChanges') diff --git a/plugin-api/src/main/java/tech/pegasys/pantheon/plugin/data/SyncStatus.java b/plugin-api/src/main/java/tech/pegasys/pantheon/plugin/data/SyncStatus.java new file mode 100644 index 0000000000..753972b325 --- /dev/null +++ b/plugin-api/src/main/java/tech/pegasys/pantheon/plugin/data/SyncStatus.java @@ -0,0 +1,47 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.plugin.data; + +import tech.pegasys.pantheon.plugin.Unstable; + +@Unstable +public interface SyncStatus { + + /** + * Get the height of the block at which this synchronization attempt began. + * + * @return height of the block at which this synchronization attempt began. + */ + long getStartingBlock(); + + /** + * Get the height of the last block the synchronizer received + * + * @return the height of the last block the synchronizer received + */ + long getCurrentBlock(); + + /** + * Get the height of the highest known block. + * + * @return the height of the highest known block. + */ + long getHighestBlock(); + + /** + * Checks if the synchronizer is within a default sync tolerance of the highest known block + * + * @return true if it is within the tolerance, false otherwise + */ + boolean inSync(); +} diff --git a/plugin-api/src/main/java/tech/pegasys/pantheon/plugin/services/PantheonEvents.java b/plugin-api/src/main/java/tech/pegasys/pantheon/plugin/services/PantheonEvents.java index 18a99fa343..344364917d 100644 --- a/plugin-api/src/main/java/tech/pegasys/pantheon/plugin/services/PantheonEvents.java +++ b/plugin-api/src/main/java/tech/pegasys/pantheon/plugin/services/PantheonEvents.java @@ -14,6 +14,7 @@ import tech.pegasys.pantheon.plugin.Unstable; import tech.pegasys.pantheon.plugin.data.BlockHeader; +import tech.pegasys.pantheon.plugin.data.SyncStatus; import tech.pegasys.pantheon.plugin.data.Transaction; /** @@ -22,10 +23,13 @@ *

Currently supported events * *

    - *
  • newBlockPropagated - Fired when a new block header has been received and validated - * and is about to be sent out to other peers, but before the body of the block has been - * evaluated and validated. - *
  • newTransactionAdded - Fired when a new transaction has been added to the node. + *
  • BlockPropagated - Fired when a new block header has been received and validated and + * is about to be sent out to other peers, but before the body of the block has been evaluated + * and validated. + *
  • TransactionAdded - Fired when a new transaction has been added to the node. + *
  • TransactionDropped - Fired when a new transaction has been dropped from the node. + *
  • Logs - Fired when a new block containing logs is received. + *
  • SynchronizerStatus - Fired when the status of the synchronizer changes. *
*/ @Unstable @@ -34,52 +38,67 @@ public interface PantheonEvents { /** * Add a listener watching new blocks propagated. * - * @param newBlockPropagatedListener The listener that will accept a BlockHeader as the event. + * @param blockPropagatedListener The listener that will accept a BlockHeader as the event. * @return an object to be used as an identifier when de-registering the event. */ - Object addNewBlockPropagatedListener(NewBlockPropagatedListener newBlockPropagatedListener); + long addBlockPropagatedListener(BlockPropagatedListener blockPropagatedListener); /** * Remove the blockAdded listener from pantheon notifications. * * @param listenerIdentifier The instance that was returned from addBlockAddedListener; */ - void removeNewBlockPropagatedListener(Object listenerIdentifier); + void removeBlockPropagatedListener(long listenerIdentifier); /** * Add a listener watching new transactions added to the node. * - * @param newTransactionAddedListener The listener that will accept the Transaction object as the + * @param transactionAddedListener The listener that will accept the Transaction object as the * event. * @return an object to be used as an identifier when de-registering the event. */ - Object addNewTransactionAddedListener(NewTransactionAddedListener newTransactionAddedListener); + long addTransactionAddedListener(TransactionAddedListener transactionAddedListener); /** * Remove the blockAdded listener from pantheon notifications. * - * @param listenerIdentifier The instance that was returned from addNewTransactionAddedListener; + * @param listenerIdentifier The instance that was returned from addTransactionAddedListener; */ - void removeNewTransactionAddedListener(Object listenerIdentifier); + void removeTransactionAddedListener(long listenerIdentifier); /** * Add a listener watching dropped transactions. * - * @param newTransactionDroppedListener The listener that will accept the Transaction object as - * the event. + * @param transactionDroppedListener The listener that will accept the Transaction object as the + * event. * @return an object to be used as an identifier when de-registering the event. */ - Object addNewTransactionDroppedListener(TransactionDroppedListener newTransactionDroppedListener); + long addTransactionDroppedListener(TransactionDroppedListener transactionDroppedListener); /** * Remove the transactionDropped listener from pantheon notifications. * * @param listenerIdentifier The instance that was returned from addTransactionDroppedListener; */ - void removeTransactionDroppedListener(Object listenerIdentifier); + void removeTransactionDroppedListener(long listenerIdentifier); + + /** + * Add a listener watching the synchronizer status. + * + * @param syncStatusListener The listener that will accept the SyncStatus object as the event. + * @return an object to be used as an identifier when de-registering the event. + */ + long addSyncStatusListener(SyncStatusListener syncStatusListener); + + /** + * Remove the logs listener from pantheon notifications. + * + * @param listenerIdentifier The instance that was returned from addTransactionDroppedListener; + */ + void removeSyncStatusListener(long listenerIdentifier); /** The listener interface for receiving new block propagated events. */ - interface NewBlockPropagatedListener { + interface BlockPropagatedListener { /** * Invoked when a new block header has been received and validated and is about to be sent out @@ -88,20 +107,20 @@ interface NewBlockPropagatedListener { *

The block may not have been imported to the local chain yet and may fail later * validations. * - * @param newBlockHeader the new block header. + * @param blockHeader the new block header. */ - void newBlockPropagated(BlockHeader newBlockHeader); + void newBlockPropagated(BlockHeader blockHeader); } /** The listener interface for receiving new transaction added events. */ - interface NewTransactionAddedListener { + interface TransactionAddedListener { /** * Invoked when a new transaction has been added to the node. * * @param transaction the new transaction. */ - void newTransactionAdded(Transaction transaction); + void onTransactionAdded(Transaction transaction); } /** The listener interface for receiving transaction dropped events. */ @@ -112,6 +131,17 @@ interface TransactionDroppedListener { * * @param transaction the dropped transaction. */ - void newTransactionDropped(Transaction transaction); + void onTransactionDropped(Transaction transaction); + } + + /** The listener interface for receiving sync status events. */ + interface SyncStatusListener { + + /** + * Invoked when the synchronizer status changes + * + * @param syncStatus the sync status + */ + void onSyncStatusChanged(SyncStatus syncStatus); } }