Skip to content

Commit

Permalink
Closes #5
Browse files Browse the repository at this point in the history
Removed the non-if versions of the APIs as they're confusing and not needed
  • Loading branch information
Randgalt committed Jul 23, 2017
1 parent 77629d5 commit 413db59
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 163 deletions.
21 changes: 9 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ CompletableFuture/CompletionStage API is awkward and difficult to use.

```java
StagedFuture.async(executor)
.then(() -> queryDatabaseFor("something"))
.thenIf(() -> queryDatabaseFor("something"))
.withTimeout(Duration.ofSeconds(25))
.thenIf(record -> applyRecord(record)) // chain aborts if no record found
.then(result -> returnNextRecord(record))
.thenIf(result -> returnNextRecord(record))
.whenSucceeded(nextResult -> handleResult(nextResult))
.whenAborted(() -> handleAbort())
.whenFailed(e -> handleFailure(e));
Expand Down Expand Up @@ -60,24 +60,22 @@ Similarly to the builders in `CompletableFuture` you start a chain using the bui

#### Adding tasks to the chain

Tasks are added to the chain using one of the "then" methods. The first task added is specified via a supplier and subsequent tasks are specified via functions that take the result of the previous task:
Tasks are added to the chain using one of the "thenIf" methods. The first task added is specified via a supplier and subsequent tasks are specified via functions that take the result of the previous task:

_Initial Task_

- `then(Supplier<U> proc)` - Execute the given task synchronously or asynchronously depending on how the StagedFuture was built. The result is passed to the next task in the chain.
- `thenIf(Supplier<Optional<U>> proc)` - Execute the given task synchronously or asynchronously depending on how the StagedFuture was built. The given task returns an optional value that indicates whether or not the next stage can execute. If `Optional.empty()` is returned, the entire StagedFuture chain is considered to be aborted and no future tasks will execute. The `StagedFuture.whenAborted()` completer will get called.

_Subsequent Tasks_

- `then(Function<T, U> proc)` - If the chain has not been aborted or errored, the result of the current task is passed to this new task synchronously or asynchronously depending on how the StagedFuture was built. The new result of the given task is passed to the next task in the chain.
- `thenIf(Function<T, Optional<U>> proc)` - If the chain has not been aborted or errored, the result of the current task is passed to this new task synchronously or asynchronously depending on how the StagedFuture was built. The given task returns an optional value that indicates whether or not the next stage can execute. If `Optional.empty()` is returned, the entire StagedFuture chain is considered to be aborted and no future tasks will execute. The `StagedFuture.whenAborted()` completer will get called.

_Timeouts_

The "then" methods (see above) can optional be assigned a timeout or a timeout and default value:

- `thenXX(X).withTimeout(Duration timeout)` - Sets a timeout for this stage's task. If the given timeout elapses before the task completes this stage is completed exceptionally with a `TimeoutException`.
- `thenXX(X).withTimeout(Duration timeout, Supplier<T> defaultValue)` - Sets a timeout for this stage's task. If the given timeout elapses before the task completes this stage is completed with the given default value.
- `thenIf(X).withTimeout(Duration timeout)` - Sets a timeout for this stage's task. If the given timeout elapses before the task completes this stage is completed exceptionally with a `TimeoutException`.
- `thenIf(X).withTimeout(Duration timeout, Supplier<T> defaultValue)` - Sets a timeout for this stage's task. If the given timeout elapses before the task completes this stage is completed with the given default value.

_Completers_

Expand All @@ -93,7 +91,6 @@ _Chaining Other Stages_

You can include external stages into the chain:

- `thenStage(Function<T, CompletionStage<U>> stage)` - executes the given stage asynchronously as the next task in the chain.
- `thenStageIf(Function<T, CompletionStage<Optional<U>>> stage)` - executes the given stage asynchronously as the next task in the chain. If the stage returns an empty Optional the chain is aborted.

_Access The Internal CompletionStage_
Expand All @@ -120,10 +117,10 @@ It keeps track of the threads in use by the StagedFuture it is associated with.
```java
Cancelable cancelable = new Cancelable();
StagedFuture.async(executor, cancelable)
.then(() -> worker("1"))
.then(s -> hangingWorker("2"))
.then(s -> worker("3"))
.then(s -> worker("4"));
.thenIf(() -> worker("1"))
.thenIf(s -> hangingWorker("2"))
.thenIf(s -> worker("3"))
.thenIf(s -> worker("4"));

cancelable.cancel(true); // hangingWorker() gets interrupted
```
Expand Down
37 changes: 0 additions & 37 deletions src/main/java/io/soabase/stages/StagedFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,27 +112,6 @@ static StagedFutureBuilder asyncPool(Tracing tracing) {
*/
<U> StagedFutureTimeout<U> thenIf(Function<T, Optional<U>> proc);

/**
* <p>
* If the current stage completes successfully, execute the given task
* synchronously or asynchronously depending on how the StagedFuture was built.
* The given task receives the result of this stage's execution.
* </p>
*
* <p>
* Note: the returned value is a {@link StagedFutureTimeout} which allows
* a timeout and an optional default to be set for the task.
* </p>
*
* <p>
* Important: Procs that return <code>null</code> are not supported
* </p>
*
* @param proc task to execute
* @return next stage in the chain
*/
<U> StagedFutureTimeout<U> then(Function<T, U> proc);

/**
* <p>
* If the current stage completes successfully, chain to the given CompletionStage
Expand All @@ -149,22 +128,6 @@ static StagedFutureBuilder asyncPool(Tracing tracing) {
*/
<U> StagedFutureTimeout<U> thenStageIf(Function<T, CompletionStage<Optional<U>>> stage);

/**
* <p>
* If the current stage completes successfully, chain to the given CompletionStage
* synchronously or asynchronously depending on how the StagedFuture was built.
* </p>
*
* <p>
* Note: the returned value is a {@link StagedFutureTimeout} which allows
* a timeout and an optional default to be set for the task.
* </p>
*
* @param stage stage to chain to
* @return next stage in the chain
*/
<U> StagedFutureTimeout<U> thenStage(Function<T, CompletionStage<U>> stage);

/**
* If the stage and any previous stages in the chain complete successfully, the handler is called with the resulting value.
*
Expand Down
34 changes: 0 additions & 34 deletions src/main/java/io/soabase/stages/StagedFutureBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,6 @@ public interface StagedFutureBuilder {
*/
<U> StagedFutureTimeout<U> thenIf(Supplier<Optional<U>> proc);

/**
* <p>
* Execute the given task synchronously or asynchronously depending on how the StagedFuture was built.
* </p>
*
* <p>
* Note: the returned value is a {@link StagedFutureTimeout} which allows
* a timeout and an optional default to be set for the task.
* </p>
*
* <p>
* Important: Procs that return <code>null</code> are not supported
* </p>
*
* @param proc task to execute
* @return next stage in the chain
*/
<U> StagedFutureTimeout<U> then(Supplier<U> proc);

/**
* <p>
* Use the given CompletionStage as the initial stage for this StagedFuture.
Expand All @@ -75,19 +56,4 @@ public interface StagedFutureBuilder {
* @return next stage in the chain
*/
<U> StagedFutureTimeout<U> thenStageIf(CompletionStage<Optional<U>> stage);

/**
* <p>
* Use the given CompletionStage as the initial stage for this StagedFuture.
* </p>
*
* <p>
* Note: the returned value is a {@link StagedFutureTimeout} which allows
* a timeout and an optional default to be set for the task.
* </p>
*
* @param stage first stage
* @return next stage in the chain
*/
<U> StagedFutureTimeout<U> thenStage(CompletionStage<U> stage);
}
11 changes: 0 additions & 11 deletions src/main/java/io/soabase/stages/StagedFutureBuilderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ class StagedFutureBuilderImpl<T> implements StagedFutureBuilder {
this.tracing = tracing;
}

@Override
public <U> StagedFutureTimeout<U> then(Supplier<U> proc) {
return thenIf(() -> Optional.of(proc.get()));
}

@Override
public <U> StagedFutureTimeout<U> thenIf(Supplier<Optional<U>> proc) {
return new StagedFutureImpl<>(proc, executor, tracing);
Expand All @@ -46,10 +41,4 @@ public <U> StagedFutureTimeout<U> thenIf(Supplier<Optional<U>> proc) {
public <U> StagedFutureTimeout<U> thenStageIf(CompletionStage<Optional<U>> stage) {
return new StagedFutureImpl<>(stage, executor, tracing);
}

@Override
public <U> StagedFutureTimeout<U> thenStage(CompletionStage<U> stage) {
// async isn't necessary - it's just a conversion to optional
return new StagedFutureImpl<>(stage.thenApply(Optional::of), executor, tracing);
}
}
20 changes: 0 additions & 20 deletions src/main/java/io/soabase/stages/StagedFutureImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,6 @@ public <U> StagedFutureTimeout<U> thenIf(Function<T, Optional<U>> proc) {
return new StagedFutureImpl<>(executor, nextStage, tracing);
}

@Override
public <U> StagedFutureTimeout<U> then(Function<T, U> proc)
{
return thenIf(v -> of(proc.apply(v)));
}

@Override
public <U> StagedFutureTimeout<U> thenStageIf(Function<T, CompletionStage<Optional<U>>> stage) {
Objects.requireNonNull(stage, "stage cannot be null");
Expand All @@ -104,20 +98,6 @@ public <U> StagedFutureTimeout<U> thenStageIf(Function<T, CompletionStage<Option
return new StagedFutureImpl<>(executor, stageIf, tracing);
}

@Override
public <U> StagedFutureTimeout<U> thenStage(Function<T, CompletionStage<U>> stage) {
Objects.requireNonNull(stage, "stage cannot be null");
CompletionStage<Optional<U>> stageIf = future.thenComposeAsync(optional -> {
if ( optional.isPresent() ) {
CompletionStage<U> applied = stage.apply(optional.get());
return applied.thenApplyAsync(StagedFutureImpl::of, executor);
}

return CompletableFuture.completedFuture(Optional.empty());
}, executor);
return new StagedFutureImpl<>(executor, stageIf, tracing);
}

@Override
public StagedFuture<T> withTimeout(Duration max) {
CompletionStage<Optional<T>> timeout = Timeout.within(future, max);
Expand Down
79 changes: 30 additions & 49 deletions src/test/java/io/soabase/stages/TestStaged.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public void tearDown() {
@Test
public void testBasic() throws Exception {
StagedFuture<List<TestTracing.Trace>> future = StagedFuture.sync(tracing)
.then(() -> worker("1"))
.then(s -> worker("2"))
.then(s -> worker("3"))
.thenIf(() -> worker("1"))
.thenIf(s -> worker("2"))
.thenIf(s -> worker("3"))
.whenSucceededYield(s -> tracing.getTracing());

Optional<List<TestTracing.Trace>> optional = complete(future);
Expand All @@ -76,27 +76,17 @@ public void testBasic() throws Exception {
}
}

@Test
public void testWrappedOptional() throws Exception {
AtomicBoolean weGotHere = new AtomicBoolean();
complete(StagedFuture.sync()
.then(() -> Optional.of("hey"))
.then(heyOpt -> (heyOpt.isPresent() && heyOpt.get().equals("hey")) ? Optional.empty() : Optional.of("fail"))
.whenSucceeded(shouldBeEmpty -> weGotHere.set(!shouldBeEmpty.isPresent())));
assertThat(weGotHere.get()).isTrue();
}

@Test
public void testAbort() throws Exception {
AtomicBoolean isAborted = new AtomicBoolean(false);
complete(StagedFuture.sync(tracing)
.then(() -> worker("1"))
.thenIf(() -> worker("1"))
.thenIf(s -> Optional.empty())
.then(s -> worker("2"))
.then(s -> worker("3"))
.then(s -> worker("4"))
.then(s -> worker("5"))
.then(s -> worker("6"))
.thenIf(s -> worker("2"))
.thenIf(s -> worker("3"))
.thenIf(s -> worker("4"))
.thenIf(s -> worker("5"))
.thenIf(s -> worker("6"))
.whenAborted(() -> isAborted.set(true)));

assertThat(isAborted.get()).isTrue();
Expand All @@ -117,10 +107,10 @@ public void testAbort() throws Exception {
public void testTimeout() throws Exception {
AtomicBoolean isTimeout = new AtomicBoolean(false);
complete(StagedFuture.async(executor, tracing)
.then(() -> worker("1"))
.then(s -> hangingWorker("2")).withTimeout(Duration.ofSeconds(2))
.then(s -> worker("3"))
.then(s -> worker("4"))
.thenIf(() -> worker("1"))
.thenIf(s -> hangingWorker("2")).withTimeout(Duration.ofSeconds(2))
.thenIf(s -> worker("3"))
.thenIf(s -> worker("4"))
.whenFailed(e -> {
while ( e instanceof CompletionException ) {
e = e.getCause();
Expand All @@ -145,10 +135,10 @@ public void testTimeout() throws Exception {
@Test
public void testTimeoutAndDefault() throws Exception {
complete(StagedFuture.async(executor, tracing)
.then(() -> worker("1"))
.then(s -> hangingWorker("2")).withTimeout(Duration.ofSeconds(2), () -> "default")
.then(this::worker)
.then(s -> worker("4")));
.thenIf(() -> worker("1"))
.thenIf(s -> hangingWorker("2")).withTimeout(Duration.ofSeconds(2), () -> "default")
.thenIf(this::worker)
.thenIf(s -> worker("4")));

List<TestTracing.Trace> traces = tracing.getTracing();
assertThat(traces).size().isEqualTo(7);
Expand All @@ -172,10 +162,10 @@ public void testTimeoutAndDefault() throws Exception {
public void testFailure() throws Exception {
AtomicReference<String> exceptionMessage = new AtomicReference<>();
complete(StagedFuture.async(executor, tracing)
.then(() -> worker("1"))
.then(s -> failureWorker("2"))
.then(s -> worker("3"))
.then(s -> worker("4"))
.thenIf(() -> worker("1"))
.thenIf(s -> failureWorker("2"))
.thenIf(s -> worker("3"))
.thenIf(s -> worker("4"))
.whenFailed(e -> {
while ( e instanceof CompletionException ) {
e = e.getCause();
Expand Down Expand Up @@ -203,8 +193,8 @@ public void testCancelable() throws Exception {
AtomicBoolean wasInterrupted = new AtomicBoolean(false);
Cancelable cancelable = new Cancelable(tracing);
StagedFuture<String> staged = StagedFuture.async(executor, cancelable)
.then(() -> worker("1"))
.then(s -> {
.thenIf(() -> worker("1"))
.thenIf(s -> {
latch.countDown();
try {
return hangingWorker("2");
Expand All @@ -215,8 +205,8 @@ public void testCancelable() throws Exception {
throw e;
}
})
.then(s -> worker("3"))
.then(s -> worker("4"));
.thenIf(s -> worker("3"))
.thenIf(s -> worker("4"));

latch.await();

Expand All @@ -231,15 +221,6 @@ public void testCancelable() throws Exception {
}
}

@Test(expected = ExecutionException.class)
public void testNullReturn() throws Exception {
complete(StagedFuture.async(executor, tracing)
.then(() -> worker("1"))
.then(s -> null)
.then(s -> worker("3"))
.then(s -> worker("4")));
}

private <T> Optional<T> complete(StagedFuture<T> stagedFuture) throws Exception {
return complete(stagedFuture.unwrap());
}
Expand All @@ -248,19 +229,19 @@ private <T> Optional<T> complete(CompletionStage<Optional<T>> stage) throws Exce
return stage.toCompletableFuture().get(5, TimeUnit.SECONDS);
}

private String worker(String context) {
private Optional<String> worker(String context) {
TestTracing.setContext(context);
tracing.resetLastContext(context);
return context;
return Optional.of(context);
}

private String failureWorker(@SuppressWarnings("SameParameterValue") String context) {
private Optional<String> failureWorker(@SuppressWarnings("SameParameterValue") String context) {
TestTracing.setContext(context);
tracing.resetLastContext(context);
throw new RuntimeException(context);
}

private String hangingWorker(@SuppressWarnings("SameParameterValue") String context) {
private Optional<String> hangingWorker(@SuppressWarnings("SameParameterValue") String context) {
TestTracing.setContext(context);
tracing.resetLastContext(context);
try {
Expand All @@ -269,6 +250,6 @@ private String hangingWorker(@SuppressWarnings("SameParameterValue") String cont
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return context;
return Optional.of(context);
}
}

0 comments on commit 413db59

Please sign in to comment.