Skip to content

Commit

Permalink
Introduce FutureUtils to reduce duplicated code around CompletableFut…
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored and tmohay committed Feb 18, 2019
1 parent ed817e4 commit 0ab30f0
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;

import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

Expand Down Expand Up @@ -110,9 +112,7 @@ protected final <S> CompletableFuture<S> executeSubTask(
});
return subTaskFuture;
} else {
final CompletableFuture<S> future = new CompletableFuture<>();
future.completeExceptionally(new CancellationException());
return future;
return completedExceptionally(new CancellationException());
}
}
}
Expand All @@ -135,9 +135,7 @@ protected final <S> CompletableFuture<S> registerSubTask(
});
return subTaskFuture;
} else {
final CompletableFuture<S> future = new CompletableFuture<>();
future.completeExceptionally(new CancellationException());
return future;
return completedExceptionally(new CancellationException());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import static tech.pegasys.pantheon.util.FutureUtils.propagateResult;

import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
Expand Down Expand Up @@ -98,19 +100,7 @@ public <T> CompletableFuture<T> scheduleSyncWorkerTask(
final Supplier<CompletableFuture<T>> future) {
final CompletableFuture<T> promise = new CompletableFuture<>();
final Future<?> workerFuture =
syncWorkerExecutor.submit(
() -> {
future
.get()
.whenComplete(
(r, t) -> {
if (t != null) {
promise.completeExceptionally(t);
} else {
promise.complete(r);
}
});
});
syncWorkerExecutor.submit(() -> propagateResult(future.get(), promise));
// If returned promise is cancelled, cancel the worker future
promise.whenComplete(
(r, t) -> {
Expand Down Expand Up @@ -170,18 +160,7 @@ public <T> CompletableFuture<T> scheduleFutureTask(
final CompletableFuture<T> promise = new CompletableFuture<>();
final ScheduledFuture<?> scheduledFuture =
scheduler.schedule(
() -> {
future
.get()
.whenComplete(
(r, t) -> {
if (t != null) {
promise.completeExceptionally(t);
} else {
promise.complete(r);
}
});
},
() -> propagateResult(future.get(), promise),
duration.toMillis(),
TimeUnit.MILLISECONDS);
// If returned promise is cancelled, cancel scheduled task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import static java.util.concurrent.CompletableFuture.completedFuture;
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT;
import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;
import static tech.pegasys.pantheon.util.FutureUtils.exceptionallyCompose;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
Expand Down Expand Up @@ -73,59 +75,39 @@ public CompletableFuture<FastSyncState> waitForSuitablePeers(final FastSyncState
ethContext, syncConfig.getFastSyncMinimumPeerCount(), ethTasksTimer);

final EthScheduler scheduler = ethContext.getScheduler();
final CompletableFuture<FastSyncState> result = new CompletableFuture<>();
scheduler
.timeout(waitForPeersTask, syncConfig.getFastSyncMaximumPeerWaitTime())
.handle(
(waitResult, error) -> {
return exceptionallyCompose(
scheduler.timeout(waitForPeersTask, syncConfig.getFastSyncMaximumPeerWaitTime()),
error -> {
if (ExceptionUtils.rootCause(error) instanceof TimeoutException) {
if (ethContext.getEthPeers().availablePeerCount() > 0) {
LOG.warn(
"Fast sync timed out before minimum peer count was reached. Continuing with reduced peers.");
result.complete(fastSyncState);
return completedFuture(null);
} else {
LOG.warn(
"Maximum wait time for fast sync reached but no peers available. Continuing to wait for any available peer.");
waitForAnyPeer()
.thenAccept(value -> result.complete(fastSyncState))
.exceptionally(
taskError -> {
result.completeExceptionally(error);
return null;
});
return waitForAnyPeer();
}
} else if (error != null) {
LOG.error("Failed to find peers for fast sync", error);
result.completeExceptionally(error);
} else {
result.complete(fastSyncState);
return completedExceptionally(error);
}
return null;
});

return result;
})
.thenApply(successfulWaitResult -> fastSyncState);
}

private CompletableFuture<Void> waitForAnyPeer() {
final CompletableFuture<Void> result = new CompletableFuture<>();
waitForAnyPeer(result);
return result;
}

private void waitForAnyPeer(final CompletableFuture<Void> result) {
ethContext
.getScheduler()
.timeout(WaitForPeersTask.create(ethContext, 1, ethTasksTimer))
.whenComplete(
(waitResult, throwable) -> {
if (ExceptionUtils.rootCause(throwable) instanceof TimeoutException) {
waitForAnyPeer(result);
} else if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(waitResult);
}
});
final CompletableFuture<Void> waitForPeerResult =
ethContext.getScheduler().timeout(WaitForPeersTask.create(ethContext, 1, ethTasksTimer));
return exceptionallyCompose(
waitForPeerResult,
throwable -> {
if (ExceptionUtils.rootCause(throwable) instanceof TimeoutException) {
return waitForAnyPeer();
}
return completedExceptionally(throwable);
});
}

public CompletableFuture<FastSyncState> selectPivotBlock(final FastSyncState fastSyncState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;

import static java.util.Collections.emptyList;
import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
Expand Down Expand Up @@ -112,11 +113,9 @@ public CompletableFuture<List<BlockWithReceipts>> validateAndImportBlocks(
}

private CompletableFuture<List<BlockWithReceipts>> invalidBlockFailure(final Block block) {
final CompletableFuture<List<BlockWithReceipts>> result = new CompletableFuture<>();
result.completeExceptionally(
return completedExceptionally(
new InvalidBlockException(
"Failed to import block", block.getHeader().getNumber(), block.getHash()));
return result;
}

private BlockImporter<C> getBlockImporter(final BlockWithReceipts blockWithReceipt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
Expand Down Expand Up @@ -87,9 +89,7 @@ private CompletableFuture<PeerTaskResult<List<BlockHeader>>> downloadHeader(fina
private CompletableFuture<PeerTaskResult<List<Block>>> completeBlock(
final PeerTaskResult<List<BlockHeader>> headerResult) {
if (headerResult.getResult().isEmpty()) {
final CompletableFuture<PeerTaskResult<List<Block>>> future = new CompletableFuture<>();
future.completeExceptionally(new IncompleteResultsException());
return future;
return completedExceptionally(new IncompleteResultsException());
}

return executeSubTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,12 @@ protected abstract CompletableFuture<Void> sendOutgoingPacket(
public abstract CompletableFuture<?> stop();

public CompletableFuture<?> start() {
final CompletableFuture<?> future = new CompletableFuture<>();
if (config.isActive()) {
final String host = config.getBindHost();
final int port = config.getBindPort();
LOG.info("Starting peer discovery agent on host={}, port={}", host, port);

listenForConnections()
return listenForConnections()
.thenAccept(
(InetSocketAddress localAddress) -> {
// Once listener is set up, finish initializing
Expand All @@ -140,21 +139,11 @@ public CompletableFuture<?> start() {
localAddress.getPort());
isActive = true;
startController();
})
.whenComplete(
(res, err) -> {
// Finalize future
if (err != null) {
future.completeExceptionally(err);
} else {
future.complete(null);
}
});
} else {
this.isActive = false;
future.complete(null);
return CompletableFuture.completedFuture(null);
}
return future;
}

private void startController() {
Expand Down
87 changes: 87 additions & 0 deletions util/src/main/java/tech/pegasys/pantheon/util/FutureUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.util;

import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class FutureUtils {

/**
* Creates a {@link CompletableFuture} that is exceptionally completed by <code>error</code>.
*
* @param error the error to exceptionally complete the future with
* @param <T> the type of CompletableFuture
* @return a CompletableFuture exceptionally completed by <code>error</code>.
*/
public static <T> CompletableFuture<T> completedExceptionally(final Throwable error) {
final CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(error);
return future;
}

/**
* Returns a new CompletionStage that, when the provided stage completes exceptionally, is
* executed with the provided stage's exception as the argument to the supplied function.
* Otherwise the returned stage completes successfully with the same value as the provided stage.
*
* <p>This is the exceptional equivalent to {@link CompletionStage#thenCompose(Function)}
*
* @param future the future to handle results or exceptions from
* @param errorHandler the function returning a new CompletionStage
* @param <T> the type of the CompletionStage's result
* @return the CompletionStage
*/
public static <T> CompletableFuture<T> exceptionallyCompose(
final CompletableFuture<T> future,
final Function<Throwable, CompletionStage<T>> errorHandler) {
final CompletableFuture<T> result = new CompletableFuture<>();
future.whenComplete(
(value, error) -> {
try {
final CompletionStage<T> nextStep =
error != null ? errorHandler.apply(error) : completedFuture(value);
propagateResult(nextStep, result);
} catch (final Throwable t) {
result.completeExceptionally(t);
}
});
return result;
}

/**
* Propagates the result of one {@link CompletionStage} to a different {@link CompletableFuture}.
*
* <p>When <code>from</code> completes successfully, <code>to</code> will be completed
* successfully with the same value. When <code>from</code> completes exceptionally, <code>to
* </code> will be completed exceptionally with the same exception.
*
* @param from the CompletionStage to take results and exceptions from
* @param to the CompletableFuture to propagate results and exceptions to
* @param <T> the type of the success value
*/
public static <T> void propagateResult(
final CompletionStage<T> from, final CompletableFuture<T> to) {
from.whenComplete(
(value, error) -> {
if (error != null) {
to.completeExceptionally(error);
} else {
to.complete(value);
}
});
}
}
Loading

0 comments on commit 0ab30f0

Please sign in to comment.