Skip to content

Commit

Permalink
Migrate netty HttpClient to ExecutionFlow (#11288)
Browse files Browse the repository at this point in the history
* Add ByteBody.move (#11280)

* Migrate netty HttpClient to ExecutionFlow
Also adds cancellation support to ExecutionFlow.

* review

* fix double cancel

---------

Co-authored-by: Sergio del Amo <sergio.delamo@softamo.com>
  • Loading branch information
yawkat and sdelamo authored Nov 6, 2024
1 parent 74940d3 commit 0ee9791
Show file tree
Hide file tree
Showing 20 changed files with 645 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ static <T> DelayedExecutionFlow<T> create() {
*/
void completeExceptionally(Throwable exc);

/**
* Check for cancellation.
*
* @return {@code true} iff this flow or any downstream flow has been cancelled
* @since 4.8.0
*/
boolean isCancelled();

/**
* Add a listener that is called if this flow or any downstream flow is cancelled.
*
* @param hook The hook to call on cancellation
* @since 4.8.0
*/
void onCancel(@NonNull Runnable hook);

/**
* Complete this flow from the given flow.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ final class DelayedExecutionFlowImpl<T> implements DelayedExecutionFlow<T> {
* The tail of the linked list of steps in this flow.
*/
private Step tail = head;
private Runnable onCancel;
private volatile boolean cancelled;

/**
* Perform the given step with the given item. Continue on until there is either no more steps,
Expand Down Expand Up @@ -85,6 +87,11 @@ public void completeExceptionally(Throwable exc) {
@SuppressWarnings("unchecked")
private <R> ExecutionFlow<R> next(Step next) {
Step oldTail = tail;
if (oldTail instanceof DelayedExecutionFlowImpl.Cancel<?>) {
// because the Cancel step can only cancel flows upstream of it, we can't allow adding
// further downstream steps.
throw new IllegalStateException("Cannot add more ExecutionFlow steps after cancellation");
}
tail = next;
ExecutionFlow output = oldTail.atomicSetNext(next);
if (output != null) {
Expand Down Expand Up @@ -135,7 +142,38 @@ public ImperativeExecutionFlow<T> tryComplete() {
}
}

private abstract static class Step<I, O> {
@Override
public boolean isCancelled() {
return cancelled;
}

@Override
public void cancel() {
if (cancelled) {
return;
}
next(new Cancel());
cancelled = true;
Runnable hook = this.onCancel;
if (hook != null) {
hook.run();
}
}

@Override
public void onCancel(Runnable hook) {
Runnable prev = this.onCancel;
if (prev != null) {
this.onCancel = () -> {
prev.run();
hook.run();
};
} else {
this.onCancel = hook;
}
}

private abstract static sealed class Step<I, O> {
/**
* The next step to take, or {@code null} if there is no next step yet.
*/
Expand Down Expand Up @@ -334,4 +372,14 @@ ExecutionFlow<E> apply(ExecutionFlow<E> executionFlow) {
return executionFlow;
}
}

private static final class Cancel<E> extends Step<E, E> {
private static final ExecutionFlow ERR = ExecutionFlow.error(new AssertionError("Should never be hit, no further steps are allowed after cancel"));

@Override
ExecutionFlow<E> apply(ExecutionFlow<E> input) {
input.cancel();
return ERR;
}
}
}
73 changes: 73 additions & 0 deletions core/src/main/java/io/micronaut/core/execution/ExecutionFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -146,13 +152,67 @@ static <T> ExecutionFlow<T> async(@NonNull Executor executor, @NonNull Supplier<
@NonNull
ExecutionFlow<T> putInContext(@NonNull String key, @NonNull Object value);

/**
* Store a contextual value if it is absent.
*
* @param key The key
* @param value The value
* @return a new flow
* @since 4.8.0
*/
@NonNull
default ExecutionFlow<T> putInContextIfAbsent(@NonNull String key, @NonNull Object value) {
return this;
}

/**
* Invokes a provided function when the flow is resolved, or immediately if it is already done.
*
* @param fn The function
*/
void onComplete(@NonNull BiConsumer<? super T, Throwable> fn);

/**
* Create a new {@link ExecutionFlow} that either returns the same result or, if the timeout
* expires before the result is received, a {@link TimeoutException}.
*
* @param timeout The timeout
* @param scheduler Scheduler to schedule the timeout task
* @param onDiscard An optional consumer to be called on the value of this flow if the flow
* completes after the timeout has expired and thus the value is discarded
* @return A new flow that will produce either the same value or a {@link TimeoutException}
*/
@NonNull
default ExecutionFlow<T> timeout(@NonNull Duration timeout, @NonNull ScheduledExecutorService scheduler, @Nullable BiConsumer<T, Throwable> onDiscard) {
DelayedExecutionFlow<T> delayed = DelayedExecutionFlow.create();
AtomicBoolean completed = new AtomicBoolean(false);
// schedule the timeout
ScheduledFuture<?> future = scheduler.schedule(() -> {
if (completed.compareAndSet(false, true)) {
cancel();
delayed.completeExceptionally(new TimeoutException());
}
}, timeout.toNanos(), TimeUnit.NANOSECONDS);
// forward any result
onComplete((t, throwable) -> {
if (completed.compareAndSet(false, true)) {
future.cancel(false);
if (throwable != null) {
delayed.completeExceptionally(throwable);
} else {
delayed.complete(t);
}
} else {
if (onDiscard != null) {
onDiscard.accept(t, throwable);
}
}
});
// forward cancel from downstream
delayed.onCancel(this::cancel);
return delayed;
}

/**
* Create an {@link ImperativeExecutionFlow} from this execution flow, if possible. The flow
* will have its result immediately available.
Expand Down Expand Up @@ -216,5 +276,18 @@ default CompletableFuture<T> toCompletableFuture() {
return completableFuture;
}

/**
* Send an optional hint to the upstream producer that the result of this flow is no longer
* needed and can be discarded. This is an optional operation, and has no effect if the flow
* has already completed. After a cancellation, a flow might never complete.
* <p>If this flow contains a resource that needs to be cleaned up (e.g. an
* {@link java.io.InputStream}), the caller should still add a
* {@link #onComplete completion listener} for cleanup, in case the upstream producer does not
* support cancellation or has already submitted the result.
*
* @since 4.8.0
*/
default void cancel() {
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;

/**
* The imperative execution flow.
Expand Down Expand Up @@ -54,4 +57,17 @@ public interface ImperativeExecutionFlow<T> extends ExecutionFlow<T> {
default ImperativeExecutionFlow<T> tryComplete() {
return this;
}

/**
* {@inheritDoc}
*
* @deprecated This method has no effect for {@link ImperativeExecutionFlow}, it makes no sense
* to use it
*/
@Override
@NonNull
@Deprecated
default ExecutionFlow<T> timeout(@NonNull Duration timeout, @NonNull ScheduledExecutorService scheduler, @Nullable BiConsumer<T, Throwable> onDiscard) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,35 @@ class DelayedExecutionFlowSpec extends Specification {
}
return output
}

def 'cancel'() {
given:
def delayed1 = DelayedExecutionFlow.create()
def delayed2 = DelayedExecutionFlow.create()
def delayed3 = DelayedExecutionFlow.create()
def delayed4 = DelayedExecutionFlow.create()
def out = delayed1.flatMap { a ->
return delayed2.map { b -> a + b }
}.flatMap { a ->
return delayed3.map { b -> a + b }
}.flatMap { a ->
return delayed4.map { b -> a + b }
}
Object result = null
out.onComplete((v, t) -> result = v)

when:
delayed1.complete("foo")
out.cancel()
then:"cancellation is forwarded to the inner flows"
delayed1.cancelled
delayed2.cancelled

when:
delayed2.complete("bar")
delayed3.complete("baz")
delayed4.complete("fizz")
then:"result is still forwarded"
result == "foobarbazfizz"
}
}
Loading

0 comments on commit 0ee9791

Please sign in to comment.