A tiny library that makes staged/pipelined CompletableFutures much easier to create and manage.
- You have a sequence of tasks that pipeline or chain
- These tasks can be executed synchronously or asynchronously
- You might need to abort/cancel the chain in the middle
- You might need to provide a timeout for the tasks
Most of this can be done with Java 8's CompletableFuture/CompletionStage today. Timeouts must be added manually (or wait for Java 9). Aborting tasks is not supported. Also, the CompletableFuture/CompletionStage API is awkward and difficult to use.
StagedFuture
simplifies CompletableFuture/CompletionStage so that you can write code like this:
StagedFuture.async(executor)
.thenIf(() -> queryDatabaseFor("something"))
.withTimeout(Duration.ofSeconds(25))
.thenIf(record -> applyRecord(record)) // chain aborts if no record found
.thenIf(result -> returnNextRecord(result))
.whenSucceeded(nextResult -> handleResult(nextResult))
.whenAborted(() -> handleAbort())
.whenFailed(e -> handleFailure(e));
- Can easily set timeouts and timeouts with default values for tasks (without waiting for Java 9)
- Allows setting the executor once instead of with each chained call
- Allows a task to signal that the remainder of the chain should be canceled
- Simplified API
Note: you can easily access the managed CompletionStage
when needed by calling StagedFuture#unwrap()
.
Stages is available from Maven Central. Use your favorite build tool and specify:
GroupId | ArtifactId |
---|---|
io.soabase.stages | soabase-stages |
Similarly to the builders in CompletableFuture
you start a chain using the builders in StagedFuture
. There are syncrhonous and asynchronous builders:
StagedFuture.sync()
- starts a StagedFuture chain that executes tasks synchronouslyStagedFuture.async(executor)
- starts a StagedFuture chain that executes tasks asynchronously using the given executorStagedFuture.asyncPool()
- starts a StagedFuture chain that executes tasks asynchronously using the ForkJoin pool
Tasks are added to the chain using one of the "thenIf" methods. The first task added is specified via a supplier and subsequent tasks are specified via functions that take the result of the previous task:
Initial Task
thenIf(Supplier<Optional<U>> proc)
- Execute the given task synchronously or asynchronously depending on how the StagedFuture was built. The given task returns an optional value that indicates whether or not the next stage can execute. IfOptional.empty()
is returned, the entire StagedFuture chain is considered to be aborted and no future tasks will execute. TheStagedFuture.whenAborted()
completer will get called.
Subsequent Tasks
thenIf(Function<T, Optional<U>> proc)
- If the chain has not been aborted or errored, the result of the current task is passed to this new task synchronously or asynchronously depending on how the StagedFuture was built. The given task returns an optional value that indicates whether or not the next stage can execute. IfOptional.empty()
is returned, the entire StagedFuture chain is considered to be aborted and no future tasks will execute. TheStagedFuture.whenAborted()
completer will get called.
Timeouts
The "then" methods (see above) can optional be assigned a timeout or a timeout and default value:
thenIf(X).withTimeout(Duration timeout)
- Sets a timeout for this stage's task. If the given timeout elapses before the task completes this stage is completed exceptionally with aTimeoutException
.thenIf(X).withTimeout(Duration timeout, Supplier<T> defaultValue)
- Sets a timeout for this stage's task. If the given timeout elapses before the task completes this stage is completed with the given default value.
Completers
At any point in the chain, you can add handlers for successful completions, failures or aborts:
whenSucceeded(Consumer<T> handler)
- if the chain completes successfully the handler is called.whenSucceededYield(Function<T, U> handler)
- same aswhenSucceeded()
but allows mapping the return type.whenAborted(Runnable handler)
- if the chain is aborted (i.e. one of thethenIf()
tasks returns empty) the handler is called.whenFailed(Consumer<Throwable> handler)
- if there is an exception or failure in the chain the handler is called.whenFinal(Runnable handler)
- calls the handler when the chain completes in any way (success, abort, exception, etc.).
Chaining Other Stages
You can include external stages into the chain:
thenStageIf(Function<T, CompletionStage<Optional<U>>> stage)
- executes the given stage asynchronously as the next task in the chain. If the stage returns an empty Optional the chain is aborted.
Access The Internal CompletionStage
You can access the internally managed CompletionStage
via:
unwrap()
- returns theCompletionStage<Optional<T>>
.
The tasks submitted to StagedFuture can optionally be traced via the Tracing
interface. The library comes with an SLF4J tracer and a System.out
tracer. You can also write your own. Pass an instace of the tracer to the StagedFuture builder. E.g.
StagedFuture.async(executor, Tracing.debug(logger)).
then(...)
...
The special purpose tracer, Cancelable
, can be used to enable canceling a running chain.
It keeps track of the threads in use by the StagedFuture it is associated with. At any time you can call cancelChain(boolean) to interrupt currently running tasks and prevent new tasks from running. E.g.
Cancelable cancelable = new Cancelable();
StagedFuture.async(executor, cancelable)
.thenIf(() -> worker("1"))
.thenIf(s -> hangingWorker("2"))
.thenIf(s -> worker("3"))
.thenIf(s -> worker("4"));
cancelable.cancel(true); // hangingWorker() gets interrupted
The CompletionStage wrappers that StagedFuture uses internally can be used directly without having to use StagedFuture
.
The Timeout
class has methods that wrap CompletionStage
adding timeouts and timeouts with default values. It roughly emulates the forthcoming Java 9 timeout features for CompletableFuture.
The Aborted
class has methods that wrap CompletionStage
and call given handlers when the stage completes with an empty Optional
.