diff --git a/core/src/main/java/io/micronaut/core/execution/DelayedExecutionFlow.java b/core/src/main/java/io/micronaut/core/execution/DelayedExecutionFlow.java index f07927c7459..48b9389cb94 100644 --- a/core/src/main/java/io/micronaut/core/execution/DelayedExecutionFlow.java +++ b/core/src/main/java/io/micronaut/core/execution/DelayedExecutionFlow.java @@ -43,6 +43,22 @@ static DelayedExecutionFlow create() { */ void completeExceptionally(Throwable exc); + /** + * Check for cancellation. + * + * @return {@code true} iff this flow or any downstream flow has been cancelled + * @since 4.8.0 + */ + boolean isCancelled(); + + /** + * Add a listener that is called if this flow or any downstream flow is cancelled. + * + * @param hook The hook to call on cancellation + * @since 4.8.0 + */ + void onCancel(@NonNull Runnable hook); + /** * Complete this flow from the given flow. * diff --git a/core/src/main/java/io/micronaut/core/execution/DelayedExecutionFlowImpl.java b/core/src/main/java/io/micronaut/core/execution/DelayedExecutionFlowImpl.java index 3f5d04019f5..cc6b501c533 100644 --- a/core/src/main/java/io/micronaut/core/execution/DelayedExecutionFlowImpl.java +++ b/core/src/main/java/io/micronaut/core/execution/DelayedExecutionFlowImpl.java @@ -36,6 +36,8 @@ final class DelayedExecutionFlowImpl implements DelayedExecutionFlow { * The tail of the linked list of steps in this flow. */ private Step tail = head; + private Runnable onCancel; + private volatile boolean cancelled; /** * Perform the given step with the given item. Continue on until there is either no more steps, @@ -85,6 +87,11 @@ public void completeExceptionally(Throwable exc) { @SuppressWarnings("unchecked") private ExecutionFlow next(Step next) { Step oldTail = tail; + if (oldTail instanceof DelayedExecutionFlowImpl.Cancel) { + // because the Cancel step can only cancel flows upstream of it, we can't allow adding + // further downstream steps. + throw new IllegalStateException("Cannot add more ExecutionFlow steps after cancellation"); + } tail = next; ExecutionFlow output = oldTail.atomicSetNext(next); if (output != null) { @@ -135,7 +142,38 @@ public ImperativeExecutionFlow tryComplete() { } } - private abstract static class Step { + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public void cancel() { + if (cancelled) { + return; + } + next(new Cancel()); + cancelled = true; + Runnable hook = this.onCancel; + if (hook != null) { + hook.run(); + } + } + + @Override + public void onCancel(Runnable hook) { + Runnable prev = this.onCancel; + if (prev != null) { + this.onCancel = () -> { + prev.run(); + hook.run(); + }; + } else { + this.onCancel = hook; + } + } + + private abstract static sealed class Step { /** * The next step to take, or {@code null} if there is no next step yet. 
*/ @@ -334,4 +372,14 @@ ExecutionFlow apply(ExecutionFlow executionFlow) { return executionFlow; } } + + private static final class Cancel extends Step { + private static final ExecutionFlow ERR = ExecutionFlow.error(new AssertionError("Should never be hit, no further steps are allowed after cancel")); + + @Override + ExecutionFlow apply(ExecutionFlow input) { + input.cancel(); + return ERR; + } + } } diff --git a/core/src/main/java/io/micronaut/core/execution/ExecutionFlow.java b/core/src/main/java/io/micronaut/core/execution/ExecutionFlow.java index 0d0f440ba02..1c3b753a602 100644 --- a/core/src/main/java/io/micronaut/core/execution/ExecutionFlow.java +++ b/core/src/main/java/io/micronaut/core/execution/ExecutionFlow.java @@ -19,9 +19,15 @@ import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; @@ -146,6 +152,19 @@ static ExecutionFlow async(@NonNull Executor executor, @NonNull Supplier< @NonNull ExecutionFlow putInContext(@NonNull String key, @NonNull Object value); + /** + * Store a contextual value if it is absent. + * + * @param key The key + * @param value The value + * @return a new flow + * @since 4.8.0 + */ + @NonNull + default ExecutionFlow putInContextIfAbsent(@NonNull String key, @NonNull Object value) { + return this; + } + /** * Invokes a provided function when the flow is resolved, or immediately if it is already done. * @@ -153,6 +172,47 @@ static ExecutionFlow async(@NonNull Executor executor, @NonNull Supplier< */ void onComplete(@NonNull BiConsumer fn); + /** + * Create a new {@link ExecutionFlow} that either returns the same result or, if the timeout + * expires before the result is received, a {@link TimeoutException}. 
+ * + * @param timeout The timeout + * @param scheduler Scheduler to schedule the timeout task + * @param onDiscard An optional consumer to be called on the value of this flow if the flow + * completes after the timeout has expired and thus the value is discarded + * @return A new flow that will produce either the same value or a {@link TimeoutException} + */ + @NonNull + default ExecutionFlow timeout(@NonNull Duration timeout, @NonNull ScheduledExecutorService scheduler, @Nullable BiConsumer onDiscard) { + DelayedExecutionFlow delayed = DelayedExecutionFlow.create(); + AtomicBoolean completed = new AtomicBoolean(false); + // schedule the timeout + ScheduledFuture future = scheduler.schedule(() -> { + if (completed.compareAndSet(false, true)) { + cancel(); + delayed.completeExceptionally(new TimeoutException()); + } + }, timeout.toNanos(), TimeUnit.NANOSECONDS); + // forward any result + onComplete((t, throwable) -> { + if (completed.compareAndSet(false, true)) { + future.cancel(false); + if (throwable != null) { + delayed.completeExceptionally(throwable); + } else { + delayed.complete(t); + } + } else { + if (onDiscard != null) { + onDiscard.accept(t, throwable); + } + } + }); + // forward cancel from downstream + delayed.onCancel(this::cancel); + return delayed; + } + /** * Create an {@link ImperativeExecutionFlow} from this execution flow, if possible. The flow * will have its result immediately available. @@ -216,5 +276,18 @@ default CompletableFuture toCompletableFuture() { return completableFuture; } + /** + * Send an optional hint to the upstream producer that the result of this flow is no longer + * needed and can be discarded. This is an optional operation, and has no effect if the flow + * has already completed. After a cancellation, a flow might never complete. + *
<p>
If this flow contains a resource that needs to be cleaned up (e.g. an + * {@link java.io.InputStream}), the caller should still add a + * {@link #onComplete completion listener} for cleanup, in case the upstream producer does not + * support cancellation or has already submitted the result. + * + * @since 4.8.0 + */ + default void cancel() { + } } diff --git a/core/src/main/java/io/micronaut/core/execution/ImperativeExecutionFlow.java b/core/src/main/java/io/micronaut/core/execution/ImperativeExecutionFlow.java index c33516dcd59..f7575600a4c 100644 --- a/core/src/main/java/io/micronaut/core/execution/ImperativeExecutionFlow.java +++ b/core/src/main/java/io/micronaut/core/execution/ImperativeExecutionFlow.java @@ -19,7 +19,10 @@ import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; +import java.time.Duration; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiConsumer; /** * The imperative execution flow. @@ -54,4 +57,17 @@ public interface ImperativeExecutionFlow extends ExecutionFlow { default ImperativeExecutionFlow tryComplete() { return this; } + + /** + * {@inheritDoc} + * + * @deprecated This method has no effect for {@link ImperativeExecutionFlow}, it makes no sense + * to use it + */ + @Override + @NonNull + @Deprecated + default ExecutionFlow timeout(@NonNull Duration timeout, @NonNull ScheduledExecutorService scheduler, @Nullable BiConsumer onDiscard) { + return this; + } } diff --git a/core/src/test/groovy/io/micronaut/core/execution/DelayedExecutionFlowSpec.groovy b/core/src/test/groovy/io/micronaut/core/execution/DelayedExecutionFlowSpec.groovy index 56c9c9a4dbe..98aabb2b55c 100644 --- a/core/src/test/groovy/io/micronaut/core/execution/DelayedExecutionFlowSpec.groovy +++ b/core/src/test/groovy/io/micronaut/core/execution/DelayedExecutionFlowSpec.groovy @@ -78,4 +78,35 @@ class DelayedExecutionFlowSpec extends Specification { } return output } + + def 'cancel'() { + given: + def delayed1 = DelayedExecutionFlow.create() + def delayed2 = DelayedExecutionFlow.create() + def delayed3 = DelayedExecutionFlow.create() + def delayed4 = DelayedExecutionFlow.create() + def out = delayed1.flatMap { a -> + return delayed2.map { b -> a + b } + }.flatMap { a -> + return delayed3.map { b -> a + b } + }.flatMap { a -> + return delayed4.map { b -> a + b } + } + Object result = null + out.onComplete((v, t) -> result = v) + + when: + delayed1.complete("foo") + out.cancel() + then:"cancellation is forwarded to the inner flows" + delayed1.cancelled + delayed2.cancelled + + when: + delayed2.complete("bar") + delayed3.complete("baz") + delayed4.complete("fizz") + then:"result is still forwarded" + result == "foobarbazfizz" + } } diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/ConnectionManager.java b/http-client/src/main/java/io/micronaut/http/client/netty/ConnectionManager.java index a77770d3049..c6bb5662a79 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/ConnectionManager.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/ConnectionManager.java @@ -18,6 +18,7 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.execution.ExecutionFlow; import io.micronaut.core.naming.NameUtils; import io.micronaut.core.propagation.PropagatedContext; import io.micronaut.core.reflect.InstantiationUtils; @@ -111,7 +112,6 @@ import org.slf4j.Logger; import 
reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -import reactor.core.scheduler.Schedulers; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; @@ -299,6 +299,10 @@ public final ByteBufAllocator alloc() { return (ByteBufAllocator) bootstrap.config().options().getOrDefault(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); } + EventLoopGroup getGroup() { + return group; + } + /** * For testing. * @@ -482,10 +486,10 @@ private SslContext buildSslContext(DefaultHttpClient.RequestKey requestKey) { * Get a connection for non-websocket http client methods. * * @param requestKey The remote to connect to - * @param blockHint Optional information about what threads are blocked for this connection request + * @param blockHint Optional information about what threads are blocked for this connection request * @return A mono that will complete once the channel is ready for transmission */ - public final Mono connect(DefaultHttpClient.RequestKey requestKey, @Nullable BlockHint blockHint) { + public final ExecutionFlow connect(DefaultHttpClient.RequestKey requestKey, @Nullable BlockHint blockHint) { return pools.computeIfAbsent(requestKey, Pool::new).acquire(blockHint); } @@ -1157,15 +1161,19 @@ protected void onNewConnectionFailure(@Nullable Throwable cause) throws Exceptio this.requestKey = requestKey; } - Mono acquire(@Nullable BlockHint blockHint) { - PoolSink sink = new CancellableMonoSink<>(blockHint); + ExecutionFlow acquire(@Nullable BlockHint blockHint) { + PendingRequest sink = new PendingRequest(blockHint); addPendingRequest(sink); Optional acquireTimeout = configuration.getConnectionPoolConfiguration().getAcquireTimeout(); //noinspection OptionalIsPresent if (acquireTimeout.isPresent()) { - return sink.asMono().timeout(acquireTimeout.get(), Schedulers.fromExecutor(group)); + return sink.flow().timeout(acquireTimeout.get(), group, (v, e) -> { + if (v != null) { + v.release(); + } + }); } else { - return sink.asMono(); + return sink.flow(); } } @@ -1173,7 +1181,7 @@ Mono acquire(@Nullable BlockHint blockHint) { void onNewConnectionFailure(@Nullable Throwable error) throws Exception { super.onNewConnectionFailure(error); // to avoid an infinite loop, fail one pending request. 
- Sinks.One pending = pollPendingRequest(); + PendingRequest pending = pollPendingRequest(); if (pending != null) { HttpClientException wrapped; if (error == null) { @@ -1182,7 +1190,7 @@ void onNewConnectionFailure(@Nullable Throwable error) throws Exception { } else { wrapped = new HttpClientException("Connect Error: " + error.getMessage(), error); } - if (pending.tryEmitError(decorate(wrapped)) == Sinks.EmitResult.OK) { + if (pending.tryCompleteExceptionally(decorate(wrapped))) { // no need to log return; } @@ -1356,9 +1364,8 @@ void windDownConnection() { * @param sink The request for a pool handle * @param ph The pool handle */ - final void emitPoolHandle(Sinks.One sink, PoolHandle ph) { - Sinks.EmitResult emitResult = sink.tryEmitValue(ph); - if (emitResult.isFailure()) { + final void emitPoolHandle(PendingRequest sink, PoolHandle ph) { + if (!sink.tryComplete(ph)) { ph.release(); } else { if (!configuration.getConnectionPoolConfiguration().isEnabled()) { @@ -1369,14 +1376,14 @@ final void emitPoolHandle(Sinks.One sink, PoolHandle ph) { } @Override - public boolean dispatch(PoolSink sink) { + public boolean dispatch(PendingRequest sink) { if (!tryEarmarkForRequest()) { return false; } - BlockHint blockHint = sink.getBlockHint(); + BlockHint blockHint = sink.blockHint; if (blockHint != null && blockHint.blocks(channel.eventLoop())) { - sink.tryEmitError(BlockHint.createException()); + sink.tryCompleteExceptionally(BlockHint.createException()); return true; } if (channel.eventLoop().inEventLoop()) { @@ -1397,7 +1404,7 @@ public boolean dispatch(PoolSink sink) { * * @param sink The request for a pool handle */ - abstract void dispatch0(PoolSink sink); + abstract void dispatch0(PendingRequest sink); /** * Try to add a new request to this connection. This is called outside the event loop, @@ -1470,7 +1477,7 @@ void fireReadTimeout(ChannelHandlerContext ctx) { } @Override - void dispatch0(PoolSink sink) { + void dispatch0(PendingRequest sink) { if (!channel.isActive()) { // make sure the request isn't dispatched to this connection again windDownConnection(); @@ -1519,7 +1526,7 @@ public void notifyRequestPipelineBuilt() { emitPoolHandle(sink, ph); } - private void returnPendingRequest(PoolSink sink) { + private void returnPendingRequest(PendingRequest sink) { // failed, but the pending request may still work on another connection. addPendingRequest(sink); hasLiveRequest = false; @@ -1602,7 +1609,7 @@ void fireReadTimeout(ChannelHandlerContext ctx) { } @Override - void dispatch0(PoolSink sink) { + void dispatch0(PendingRequest sink) { if (!channel.isActive() || windDownConnection) { // make sure the request isn't dispatched to this connection again windDownConnection(); @@ -1682,7 +1689,7 @@ void adaptHeaders(Object msg) { } } - private void returnPendingRequest(PoolSink sink) { + private void returnPendingRequest(PendingRequest sink) { // failed, but the pending request may still work on another connection. 
addPendingRequest(sink); earmarkedOrLiveRequests.decrementAndGet(); diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java b/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java index 9093107247b..3b4a1bff28c 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java @@ -25,6 +25,7 @@ import io.micronaut.core.beans.BeanMap; import io.micronaut.core.convert.ConversionService; import io.micronaut.core.convert.ConversionServiceAware; +import io.micronaut.core.execution.DelayedExecutionFlow; import io.micronaut.core.execution.ExecutionFlow; import io.micronaut.core.io.ResourceResolver; import io.micronaut.core.io.buffer.ByteBuffer; @@ -802,7 +803,7 @@ public Publisher> dataStream(@NonNull io.micronaut.http.HttpRe public Publisher> dataStream(@NonNull io.micronaut.http.HttpRequest request, @NonNull Argument errorType) { setupConversionService(request); final io.micronaut.http.HttpRequest parentRequest = ServerRequestContext.currentRequest().orElse(null); - return new MicronautFlux<>(Flux.from(resolveRequestURI(request)) + return new MicronautFlux<>(Flux.from(ReactiveExecutionFlow.toPublisher(() -> resolveRequestURI(request))) .flatMap(requestURI -> dataStreamImpl(toMutableRequest(request), errorType, parentRequest, requestURI))) .doAfterNext(buffer -> { Object o = buffer.asNativeBuffer(); @@ -823,7 +824,7 @@ public Publisher>> exchangeStream(@NonNull io.mic public Publisher>> exchangeStream(@NonNull io.micronaut.http.HttpRequest request, @NonNull Argument errorType) { setupConversionService(request); io.micronaut.http.HttpRequest parentRequest = ServerRequestContext.currentRequest().orElse(null); - return new MicronautFlux<>(Flux.from(resolveRequestURI(request)) + return new MicronautFlux<>(Flux.from(ReactiveExecutionFlow.toPublisher(() -> resolveRequestURI(request))) .flatMap(uri -> exchangeStreamImpl(parentRequest, toMutableRequest(request), errorType, uri))) .doAfterNext(byteBufferHttpResponse -> { ByteBuffer buffer = byteBufferHttpResponse.body(); @@ -843,7 +844,7 @@ public Publisher jsonStream(@NonNull io.micronaut.http.HttpRequest setupConversionService(request); final io.micronaut.http.HttpRequest parentRequest = ServerRequestContext.currentRequest().orElse(null); setupConversionService(parentRequest); - return Flux.from(resolveRequestURI(request)) + return Flux.from(ReactiveExecutionFlow.toPublisher(() -> resolveRequestURI(request))) .flatMap(requestURI -> jsonStreamImpl(parentRequest, toMutableRequest(request), type, errorType, requestURI)); } @@ -870,16 +871,16 @@ public Publisher> exchange(@NonNull io.micronaut.http. 
private Mono> exchange(io.micronaut.http.HttpRequest request, Argument bodyType, Argument errorType, @Nullable BlockHint blockHint) { setupConversionService(request); final io.micronaut.http.HttpRequest parentRequest = ServerRequestContext.currentRequest().orElse(null); - Mono> mono = resolveRequestURI(request).flatMap(uri -> { + ExecutionFlow> mono = resolveRequestURI(request).flatMap(uri -> { MutableHttpRequest mutableRequest = toMutableRequest(request).uri(uri); //noinspection unchecked return sendRequestWithRedirects( parentRequest, blockHint, mutableRequest, - (req, resp) -> Mono.>from(ReactiveExecutionFlow.fromFlow(InternalByteBody.bufferFlow(resp.byteBody()) + (req, resp) -> InternalByteBody.bufferFlow(resp.byteBody()) .onErrorResume(t -> ExecutionFlow.error(handleResponseError(mutableRequest, t))) - .flatMap(av -> handleExchangeResponse(bodyType, errorType, resp, av))).toPublisher()) + .flatMap(av -> handleExchangeResponse(bodyType, errorType, resp, av)) ).map(r -> (HttpResponse) r); }); @@ -892,16 +893,17 @@ private Mono> exchange(io.micronaut.http.HttpRequest { if (throwable instanceof TimeoutException) { - return Mono.error(ReadTimeoutException.TIMEOUT_EXCEPTION); + return ExecutionFlow.error(ReadTimeoutException.TIMEOUT_EXCEPTION); } - return Mono.error(throwable); + return ExecutionFlow.error(throwable); }); } } - return mono; + ExecutionFlow> finalMono = mono; + return Mono.from(ReactiveExecutionFlow.toPublisher(() -> finalMono)); } private @NonNull ExecutionFlow> handleExchangeResponse(Argument bodyType, Argument errorType, NettyClientByteBodyResponse resp, CloseableAvailableByteBody av) { @@ -992,8 +994,8 @@ public Publisher retrieve(io.micronaut.http.HttpRequest request, @Override public Publisher connect(Class clientEndpointType, MutableHttpRequest request) { setupConversionService(request); - Publisher uriPublisher = resolveRequestURI(request); - return Flux.from(uriPublisher) + ExecutionFlow uriPublisher = resolveRequestURI(request); + return Flux.from(ReactiveExecutionFlow.toPublisher(() -> uriPublisher)) .switchMap(resolvedURI -> connectWebSocket(resolvedURI, request, clientEndpointType, null)); } @@ -1003,9 +1005,9 @@ public Publisher connect(Class clientEndpointTyp String uri = webSocketBean.getBeanDefinition().stringValue(ClientWebSocket.class).orElse("/ws"); uri = UriTemplate.of(uri).expand(parameters); MutableHttpRequest request = io.micronaut.http.HttpRequest.GET(uri); - Publisher uriPublisher = resolveRequestURI(request); + ExecutionFlow uriPublisher = resolveRequestURI(request); - return Flux.from(uriPublisher) + return Flux.from(ReactiveExecutionFlow.toPublisher(() -> uriPublisher)) .switchMap(resolvedURI -> connectWebSocket(resolvedURI, request, clientEndpointType, webSocketBean)); } @@ -1063,7 +1065,7 @@ private Publisher connectWebSocket(URI uri, MutableHttpRequest request } private Flux>> exchangeStreamImpl(io.micronaut.http.HttpRequest parentRequest, MutableHttpRequest request, Argument errorType, URI requestURI) { - Flux> streamResponsePublisher = Flux.from(buildStreamExchange(parentRequest, request, requestURI, errorType)); + Flux> streamResponsePublisher = Flux.from(ReactiveExecutionFlow.toPublisher(() -> buildStreamExchange(parentRequest, request, requestURI, errorType))); return streamResponsePublisher.switchMap(response -> { StreamedHttpResponse streamedHttpResponse = NettyHttpResponseBuilder.toStreamResponse(response); Flux httpContentReactiveSequence = Flux.from(streamedHttpResponse); @@ -1085,9 +1087,7 @@ private Flux>> 
exchangeStreamImpl(io.micronaut.ht } private Flux jsonStreamImpl(io.micronaut.http.HttpRequest parentRequest, MutableHttpRequest request, Argument type, Argument errorType, URI requestURI) { - Flux> streamResponsePublisher = - Flux.from(buildStreamExchange(parentRequest, request, requestURI, errorType)); - return streamResponsePublisher.switchMap(response -> { + return Flux.from(ReactiveExecutionFlow.toPublisher(() -> buildStreamExchange(parentRequest, request, requestURI, errorType))).switchMap(response -> { if (!(response instanceof NettyStreamedHttpResponse)) { throw new IllegalStateException("Response has been wrapped in non streaming type. Do not wrap the response in client filters for stream requests"); } @@ -1102,7 +1102,7 @@ private Flux jsonStreamImpl(io.micronaut.http.HttpRequest parentReq } private Flux> dataStreamImpl(MutableHttpRequest request, Argument errorType, io.micronaut.http.HttpRequest parentRequest, URI requestURI) { - Flux> streamResponsePublisher = Flux.from(buildStreamExchange(parentRequest, request, requestURI, errorType)); + Flux> streamResponsePublisher = Flux.from(ReactiveExecutionFlow.toPublisher(() -> buildStreamExchange(parentRequest, request, requestURI, errorType))); Function> contentMapper = message -> { ByteBuf byteBuf = message.content(); return byteBufferFactory.wrap(byteBuf); @@ -1123,7 +1123,7 @@ private Flux> dataStreamImpl(MutableHttpRequest request, Ar * Implementation of {@link #jsonStream}, {@link #dataStream}, {@link #exchangeStream}. */ @SuppressWarnings("MagicNumber") - private Publisher> buildStreamExchange( + private ExecutionFlow> buildStreamExchange( @Nullable io.micronaut.http.HttpRequest parentRequest, @NonNull MutableHttpRequest request, @NonNull URI requestURI, @@ -1155,7 +1155,7 @@ private Publisher> buildStreamExchange( } } - return readBodyOnError(errorType, Mono.>just(toStreamingResponse(resp, body)) + return readBodyOnError(errorType, ExecutionFlow.>just(toStreamingResponse(resp, body)) .flatMap(r -> handleStreamHttpError(r, true))); } ); @@ -1180,7 +1180,7 @@ public Publisher> proxy(@NonNull io.micronaut.http.HttpRe public Publisher> proxy(@NonNull io.micronaut.http.HttpRequest request, @NonNull ProxyRequestOptions options) { Objects.requireNonNull(options, "options"); setupConversionService(request); - return resolveRequestURI(request) + return Flux.from(ReactiveExecutionFlow.toPublisher(() -> resolveRequestURI(request) .flatMap(requestURI -> { MutableHttpRequest httpRequest = toMutableRequest(request); if (!options.isRetainHostHeader()) { @@ -1200,12 +1200,12 @@ public Publisher> proxy(@NonNull io.micronaut.http.HttpRe body = NettyByteBody.toByteBufs(resp.byteBody()).map(DefaultHttpContent::new); } - return Mono.>just(toStreamingResponse(resp, body)) + return ExecutionFlow.>just(toStreamingResponse(resp, body)) .flatMap(r -> handleStreamHttpError(r, false)); } ); }) - .map(HttpResponse::toMutableResponse); + .map(HttpResponse::toMutableResponse))); } private void setupConversionService(io.micronaut.http.HttpRequest httpRequest) { @@ -1219,7 +1219,7 @@ private void setupConversionService(io.micronaut.http.HttpRequest httpRequest * @param The input type * @return A {@link Publisher} with the resolved URI */ - protected Mono resolveRequestURI(io.micronaut.http.HttpRequest request) { + protected ExecutionFlow resolveRequestURI(io.micronaut.http.HttpRequest request) { return resolveRequestURI(request, true); } @@ -1229,11 +1229,11 @@ protected Mono resolveRequestURI(io.micronaut.http.HttpRequest reque * @param The input type * 
@return A {@link Publisher} with the resolved URI */ - protected Mono resolveRequestURI(io.micronaut.http.HttpRequest request, boolean includeContextPath) { + protected ExecutionFlow resolveRequestURI(io.micronaut.http.HttpRequest request, boolean includeContextPath) { URI requestURI = request.getUri(); if (requestURI.getScheme() != null) { // if the request URI includes a scheme then it is fully qualified so use the direct server - return Mono.just(requestURI); + return ExecutionFlow.just(requestURI); } else { return resolveURI(request, includeContextPath); } @@ -1245,11 +1245,11 @@ protected Mono resolveRequestURI(io.micronaut.http.HttpRequest reque * @param The input type * @return A {@link Publisher} with the resolved URI */ - protected Mono resolveRedirectURI(io.micronaut.http.HttpRequest parentRequest, io.micronaut.http.HttpRequest request) { + protected ExecutionFlow resolveRedirectURI(io.micronaut.http.HttpRequest parentRequest, io.micronaut.http.HttpRequest request) { URI requestURI = request.getUri(); if (requestURI.getScheme() != null) { // if the request URI includes a scheme then it is fully qualified so use the direct server - return Mono.just(requestURI); + return ExecutionFlow.just(requestURI); } else { if (parentRequest == null || parentRequest.getUri().getHost() == null) { return resolveURI(request, false); @@ -1260,7 +1260,7 @@ protected Mono resolveRedirectURI(io.micronaut.http.HttpRequest pare .userInfo(parentURI.getUserInfo()) .host(parentURI.getHost()) .port(parentURI.getPort()); - return Mono.just(uriBuilder.build()); + return ExecutionFlow.just(uriBuilder.build()); } } } @@ -1377,72 +1377,72 @@ private static boolean permitsRequestBody(HttpMethod method) { ); } - private Mono> readBodyOnError(@Nullable Argument errorType, @NonNull Mono> publisher) { + private ExecutionFlow> readBodyOnError(@Nullable Argument errorType, @NonNull ExecutionFlow> publisher) { if (errorType != null && errorType != HttpClient.DEFAULT_ERROR_TYPE) { return publisher.onErrorResume(clientException -> { if (clientException instanceof HttpClientResponseException exception) { final HttpResponse response = exception.getResponse(); if (response instanceof NettyStreamedHttpResponse streamedResponse) { - return Mono.create(emitter -> { - final StreamedHttpResponse nettyResponse = streamedResponse.getNettyResponse(); - nettyResponse.subscribe(new Subscriber<>() { - final CompositeByteBuf buffer = byteBufferFactory.getNativeAllocator().compositeBuffer(); - Subscription s; - @Override - public void onSubscribe(Subscription s) { - this.s = s; - s.request(1); - } + DelayedExecutionFlow> delayed = DelayedExecutionFlow.create(); + final StreamedHttpResponse nettyResponse = streamedResponse.getNettyResponse(); + nettyResponse.subscribe(new Subscriber<>() { + final CompositeByteBuf buffer = byteBufferFactory.getNativeAllocator().compositeBuffer(); + Subscription s; + @Override + public void onSubscribe(Subscription s) { + this.s = s; + s.request(1); + } - @Override - public void onNext(HttpContent httpContent) { - buffer.addComponent(true, httpContent.content()); - s.request(1); - } + @Override + public void onNext(HttpContent httpContent) { + buffer.addComponent(true, httpContent.content()); + s.request(1); + } - @Override - public void onError(Throwable t) { - buffer.release(); - emitter.error(t); - } + @Override + public void onError(Throwable t) { + buffer.release(); + delayed.completeExceptionally(t); + } - @Override - public void onComplete() { - try { - FullHttpResponse fullHttpResponse = new 
DefaultFullHttpResponse(nettyResponse.protocolVersion(), nettyResponse.status(), buffer, nettyResponse.headers(), new DefaultHttpHeaders(true)); - final FullNettyClientHttpResponse fullNettyClientHttpResponse = new FullNettyClientHttpResponse<>(fullHttpResponse, handlerRegistry, (Argument) errorType, true, conversionService); - emitter.error(decorate(new HttpClientResponseException( - fullHttpResponse.status().reasonPhrase(), - null, - fullNettyClientHttpResponse, - new HttpClientErrorDecoder() { - @Override - public Argument getErrorType(MediaType mediaType) { - return errorType; - } + @Override + public void onComplete() { + try { + FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(nettyResponse.protocolVersion(), nettyResponse.status(), buffer, nettyResponse.headers(), new DefaultHttpHeaders(true)); + final FullNettyClientHttpResponse fullNettyClientHttpResponse = new FullNettyClientHttpResponse<>(fullHttpResponse, handlerRegistry, (Argument) errorType, true, conversionService); + delayed.completeExceptionally(decorate(new HttpClientResponseException( + fullHttpResponse.status().reasonPhrase(), + null, + fullNettyClientHttpResponse, + new HttpClientErrorDecoder() { + @Override + public Argument getErrorType(MediaType mediaType) { + return errorType; } - ))); - } finally { - buffer.release(); - } + } + ))); + } finally { + buffer.release(); } - }); + } }); + return delayed; } } - return Mono.error(clientException); + return ExecutionFlow.error(clientException); }); } return publisher; } - private Mono resolveURI(io.micronaut.http.HttpRequest request, boolean includeContextPath) { + private ExecutionFlow resolveURI(io.micronaut.http.HttpRequest request, boolean includeContextPath) { URI requestURI = request.getUri(); if (loadBalancer == null) { - return Mono.error(decorate(new NoHostException("Request URI specifies no host to connect to"))); + return ExecutionFlow.error(decorate(new NoHostException("Request URI specifies no host to connect to"))); } - return Mono.from(loadBalancer.select(getLoadBalancerDiscriminator())).map(server -> { + return ReactiveExecutionFlow.fromPublisher(loadBalancer.select(getLoadBalancerDiscriminator())).map(server -> { Optional authInfo = server.getMetadata().get(io.micronaut.http.HttpHeaders.AUTHORIZATION_INFO, String.class); if (request instanceof MutableHttpRequest httpRequest && authInfo.isPresent()) { httpRequest.getHeaders().auth(authInfo.get()); @@ -1457,16 +1457,16 @@ private Mono resolveURI(io.micronaut.http.HttpRequest request, boole ); } - private > Mono handleStreamHttpError( + private > ExecutionFlow handleStreamHttpError( R response, boolean failOnError ) { boolean errorStatus = response.code() >= 400; if (errorStatus && failOnError) { // todo: close response properly - return Mono.error(decorate(new HttpClientResponseException(response.reason(), response))); + return ExecutionFlow.error(decorate(new HttpClientResponseException(response.reason(), response))); } else { - return Mono.just(response); + return ExecutionFlow.just(response); } } @@ -1475,20 +1475,21 @@ public Publisher> exchange(io.micronaut.http.HttpReque if (requestBody == null) { requestBody = AvailableNettyByteBody.empty(); } - Mono> mono = null; + ExecutionFlow> mono = null; try { mono = sendRequestWithRedirects( ServerRequestContext.currentRequest().orElse(null), blockedThread == null ? 
null : new BlockHint(blockedThread, null), new RawHttpRequestWrapper<>(conversionService, request.toMutableRequest(), requestBody), - (req, resp) -> Mono.just(resp) + (req, resp) -> ExecutionFlow.just(resp) ); } finally { if (mono == null) { requestBody.close(); } } - return mono.doOnTerminate(requestBody::close); + ExecutionFlow> finalMono = mono; + return Mono.from(ReactiveExecutionFlow.toPublisher(() -> finalMono)).doOnTerminate(requestBody::close); } /** @@ -1504,11 +1505,11 @@ public Publisher> exchange(io.micronaut.http.HttpReque * request than the original (which is why it has a request parameter) * @return A mono containing the response */ - private Mono> sendRequestWithRedirects( + private ExecutionFlow> sendRequestWithRedirects( io.micronaut.http.HttpRequest parentRequest, @Nullable BlockHint blockHint, MutableHttpRequest request, - BiFunction, NettyClientByteBodyResponse, ? extends Mono>> readResponse + BiFunction, NettyClientByteBodyResponse, ? extends ExecutionFlow>> readResponse ) { if (informationalServiceId != null && request.getAttribute(HttpAttributes.SERVICE_ID).isEmpty()) { request.setAttribute(HttpAttributes.SERVICE_ID, informationalServiceId); @@ -1530,44 +1531,38 @@ private Mono> sendRequestWithRedirects( protected ExecutionFlow> provideResponse(io.micronaut.http.HttpRequest request, PropagatedContext propagatedContext) { try { try (PropagatedContext.Scope ignore = propagatedContext.propagate()) { - return ReactiveExecutionFlow.fromPublisher(Mono.from(sendRequestWithRedirectsNoFilter( + return sendRequestWithRedirectsNoFilter( parentRequest, blockHint, MutableHttpRequestWrapper.wrapIfNecessary(conversionService, request), readResponse - ))); + ); } } catch (Throwable e) { return ExecutionFlow.error(e); } } }; - Mono> responseMono = Mono.from(ReactiveExecutionFlow.fromFlow(runner.run(request)).toPublisher()); + ExecutionFlow> responseMono = runner.run(request); if (parentRequest != null) { - responseMono = responseMono.contextWrite(c -> { - // existing entry takes precedence. The parentRequest is derived from a thread - // local, and is more likely to be wrong than any reactive context we are fed. - if (c.hasKey(ServerRequestContext.KEY)) { - return c; - } else { - return c.put(ServerRequestContext.KEY, parentRequest); - } - }); + // existing entry takes precedence. The parentRequest is derived from a thread + // local, and is more likely to be wrong than any reactive context we are fed. + responseMono = responseMono.putInContextIfAbsent(ServerRequestContext.KEY, parentRequest); } return responseMono; } - private Mono> sendRequestWithRedirectsNoFilter( + private ExecutionFlow> sendRequestWithRedirectsNoFilter( io.micronaut.http.HttpRequest parentRequest, @Nullable BlockHint blockHint, MutableHttpRequest request, - BiFunction, NettyClientByteBodyResponse, ? extends Mono>> readResponse + BiFunction, NettyClientByteBodyResponse, ? 
extends ExecutionFlow>> readResponse ) { RequestKey requestKey; try { requestKey = new RequestKey(this, request.getUri()); } catch (Exception e) { - return Mono.error(e); + return ExecutionFlow.error(e); } // first: connect return connectionManager.connect(requestKey, blockHint) @@ -1590,7 +1585,7 @@ private Mono> sendRequestWithRedirectsNoFilter( ); } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) { poolHandle.release(); - return Mono.error(e); + return ExecutionFlow.error(e); } // send the raw request @@ -1635,7 +1630,7 @@ private Mono> sendRequestWithRedirectsNoFilter( * @param byteBody The request body * @return A mono containing the response */ - private Mono sendRawRequest( + private ExecutionFlow sendRawRequest( ConnectionManager.PoolHandle poolHandle, io.micronaut.http.HttpRequest request, NettyByteBody byteBody @@ -1649,139 +1644,146 @@ private Mono sendRawRequest( .toHttpRequestWithoutBody() .setUri(uriWithoutHost); - return Mono.create(sink -> { - if (log.isDebugEnabled()) { - log.debug("Sending HTTP {} to {}", request.getMethodName(), request.getUri()); - } + DelayedExecutionFlow flow = DelayedExecutionFlow.create(); + // need to run the create() on the event loop so that pipeline modification happens synchronously + if (poolHandle.channel.eventLoop().inEventLoop()) { + sendRawRequest0(poolHandle, request, byteBody, flow, nettyRequest); + } else { + poolHandle.channel.eventLoop().execute(() -> sendRawRequest0(poolHandle, request, byteBody, flow, nettyRequest)); + } + return flow; + } - boolean expectContinue = HttpUtil.is100ContinueExpected(nettyRequest); - ChannelPipeline pipeline = poolHandle.channel.pipeline(); + private void sendRawRequest0(ConnectionManager.PoolHandle poolHandle, io.micronaut.http.HttpRequest request, NettyByteBody byteBody, DelayedExecutionFlow sink, HttpRequest nettyRequest) { + if (log.isDebugEnabled()) { + log.debug("Sending HTTP {} to {}", request.getMethodName(), request.getUri()); + } - // if the body is streamed, we have a StreamWriter, otherwise we have a ByteBuf. - StreamWriter streamWriter; - ByteBuf byteBuf; - if (byteBody instanceof AvailableNettyByteBody available) { - byteBuf = AvailableNettyByteBody.toByteBuf(available); - streamWriter = null; - } else { - streamWriter = new StreamWriter((StreamingNettyByteBody) byteBody, e -> { - poolHandle.taint(); - sink.error(e); - }); - pipeline.addLast(streamWriter); - byteBuf = null; - } + boolean expectContinue = HttpUtil.is100ContinueExpected(nettyRequest); + ChannelPipeline pipeline = poolHandle.channel.pipeline(); - if (log.isTraceEnabled()) { - HttpHeadersUtil.trace(log, nettyRequest.headers().names(), nettyRequest.headers()::getAll); - if (byteBuf != null) { - traceBody("Request", byteBuf); - } + // if the body is streamed, we have a StreamWriter, otherwise we have a ByteBuf. 
+ StreamWriter streamWriter; + ByteBuf byteBuf; + if (byteBody instanceof AvailableNettyByteBody available) { + byteBuf = AvailableNettyByteBody.toByteBuf(available); + streamWriter = null; + } else { + streamWriter = new StreamWriter((StreamingNettyByteBody) byteBody, e -> { + poolHandle.taint(); + sink.completeExceptionally(e); + }); + pipeline.addLast(streamWriter); + byteBuf = null; + } + + if (log.isTraceEnabled()) { + HttpHeadersUtil.trace(log, nettyRequest.headers().names(), nettyRequest.headers()::getAll); + if (byteBuf != null) { + traceBody("Request", byteBuf); } + } - pipeline.addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE, new Http1ResponseHandler(new Http1ResponseHandler.ResponseListener() { - boolean stillExpectingContinue = expectContinue; + pipeline.addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE, new Http1ResponseHandler(new Http1ResponseHandler.ResponseListener() { + boolean stillExpectingContinue = expectContinue; - @Override - public void fail(ChannelHandlerContext ctx, Throwable cause) { - poolHandle.taint(); - sink.error(handleResponseError(request, cause)); - } + @Override + public void fail(ChannelHandlerContext ctx, Throwable cause) { + poolHandle.taint(); + sink.completeExceptionally(handleResponseError(request, cause)); + } - @Override - public void continueReceived(ChannelHandlerContext ctx) { - if (stillExpectingContinue) { - stillExpectingContinue = false; - if (streamWriter == null) { - ctx.writeAndFlush(new DefaultLastHttpContent(byteBuf), ctx.voidPromise()); - } else { - streamWriter.startWriting(); - } + @Override + public void continueReceived(ChannelHandlerContext ctx) { + if (stillExpectingContinue) { + stillExpectingContinue = false; + if (streamWriter == null) { + ctx.writeAndFlush(new DefaultLastHttpContent(byteBuf), ctx.voidPromise()); + } else { + streamWriter.startWriting(); } } + } - @Override - public void complete(io.netty.handler.codec.http.HttpResponse response, CloseableByteBody body) { - if (!HttpUtil.isKeepAlive(response)) { - poolHandle.taint(); - } - - sink.success(new NettyClientByteBodyResponse(response, body, conversionService)); + @Override + public void complete(io.netty.handler.codec.http.HttpResponse response, CloseableByteBody body) { + if (!HttpUtil.isKeepAlive(response)) { + poolHandle.taint(); } - @Override - public BodySizeLimits sizeLimits() { - return DefaultHttpClient.this.sizeLimits(); - } + sink.complete(new NettyClientByteBodyResponse(response, body, conversionService)); + } - @Override - public boolean isHeadResponse() { - return nettyRequest.method().equals(HttpMethod.HEAD); - } + @Override + public BodySizeLimits sizeLimits() { + return DefaultHttpClient.this.sizeLimits(); + } - @Override - public void finish(ChannelHandlerContext ctx) { - ctx.pipeline().remove(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE); - if (streamWriter != null) { - if (!streamWriter.isCompleted()) { - // if there was an error, and we didn't fully write the request yet, the - // connection cannot be reused - poolHandle.taint(); - } - ctx.pipeline().remove(streamWriter); - } - if (stillExpectingContinue && byteBuf != null) { - byteBuf.release(); + @Override + public boolean isHeadResponse() { + return nettyRequest.method().equals(HttpMethod.HEAD); + } + + @Override + public void finish(ChannelHandlerContext ctx) { + ctx.pipeline().remove(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE); + if (streamWriter != null) { + if (!streamWriter.isCompleted()) { + // if there was an error, and we 
didn't fully write the request yet, the + // connection cannot be reused + poolHandle.taint(); } - poolHandle.release(); + ctx.pipeline().remove(streamWriter); } - })); - poolHandle.notifyRequestPipelineBuilt(); - - HttpHeaders headers = nettyRequest.headers(); - OptionalLong length = byteBody.expectedLength(); - if (length.isPresent()) { - headers.remove(HttpHeaderNames.TRANSFER_ENCODING); - if (length.getAsLong() != 0 || permitsRequestBody(nettyRequest.method())) { - headers.set(HttpHeaderNames.CONTENT_LENGTH, length.getAsLong()); + if (stillExpectingContinue && byteBuf != null) { + byteBuf.release(); } - } else { - headers.remove(HttpHeaderNames.CONTENT_LENGTH); - headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + poolHandle.release(); + } + })); + poolHandle.notifyRequestPipelineBuilt(); + + HttpHeaders headers = nettyRequest.headers(); + OptionalLong length = byteBody.expectedLength(); + if (length.isPresent()) { + headers.remove(HttpHeaderNames.TRANSFER_ENCODING); + if (length.getAsLong() != 0 || permitsRequestBody(nettyRequest.method())) { + headers.set(HttpHeaderNames.CONTENT_LENGTH, length.getAsLong()); } + } else { + headers.remove(HttpHeaderNames.CONTENT_LENGTH); + headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + } - if (!poolHandle.http2) { - if (poolHandle.canReturn()) { - nettyRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - } else { - nettyRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); - } + if (!poolHandle.http2) { + if (poolHandle.canReturn()) { + nettyRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + } else { + nettyRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); } + } - Channel channel = poolHandle.channel(); - if (streamWriter == null) { - if (!expectContinue) { - // it's a bit more efficient to use a full request for HTTP/2 - channel.writeAndFlush(new DefaultFullHttpRequest( - nettyRequest.protocolVersion(), - nettyRequest.method(), - nettyRequest.uri(), - byteBuf, - nettyRequest.headers(), - EmptyHttpHeaders.INSTANCE - ), channel.voidPromise()); - } else { - channel.writeAndFlush(nettyRequest, channel.voidPromise()); - } + Channel channel = poolHandle.channel(); + if (streamWriter == null) { + if (!expectContinue) { + // it's a bit more efficient to use a full request for HTTP/2 + channel.writeAndFlush(new DefaultFullHttpRequest( + nettyRequest.protocolVersion(), + nettyRequest.method(), + nettyRequest.uri(), + byteBuf, + nettyRequest.headers(), + EmptyHttpHeaders.INSTANCE + ), channel.voidPromise()); } else { channel.writeAndFlush(nettyRequest, channel.voidPromise()); - if (!expectContinue) { - streamWriter.startWriting(); - } } - - // need to run the create() on the event loop so that pipeline modification happens synchronously - }).subscribeOn(Schedulers.fromExecutor(poolHandle.channel.eventLoop())); + } else { + channel.writeAndFlush(nettyRequest, channel.voidPromise()); + if (!expectContinue) { + streamWriter.startWriting(); + } + } } private ByteBuf charSequenceToByteBuf(CharSequence bodyValue, MediaType requestContentType) { diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/PoolResizer.java b/http-client/src/main/java/io/micronaut/http/client/netty/PoolResizer.java index ac0ee2ea3a8..7d7179e59fc 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/PoolResizer.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/PoolResizer.java @@ -17,15 
+17,17 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.execution.DelayedExecutionFlow; +import io.micronaut.core.execution.ExecutionFlow; import io.micronaut.http.client.HttpClientConfiguration; import io.micronaut.http.client.exceptions.HttpClientException; import org.slf4j.Logger; -import reactor.core.publisher.Sinks; import java.util.Deque; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -50,7 +52,7 @@ abstract class PoolResizer { private final AtomicInteger pendingConnectionCount = new AtomicInteger(0); - private final Deque> pendingRequests = new ConcurrentLinkedDeque<>(); + private final Deque pendingRequests = new ConcurrentLinkedDeque<>(); private final List http1Connections = new CopyOnWriteArrayList<>(); private final List http2Connections = new CopyOnWriteArrayList<>(); @@ -99,7 +101,7 @@ private void dirty() { private void doSomeWork() { BlockHint blockedPendingRequests = null; while (true) { - PoolSink toDispatch = pendingRequests.pollFirst(); + PendingRequest toDispatch = pendingRequests.pollFirst(); if (toDispatch == null) { break; } @@ -121,7 +123,7 @@ private void doSomeWork() { if (!dispatched) { pendingRequests.addFirst(toDispatch); blockedPendingRequests = - BlockHint.combine(blockedPendingRequests, toDispatch.getBlockHint()); + BlockHint.combine(blockedPendingRequests, toDispatch.blockHint); break; } } @@ -164,12 +166,12 @@ private void doSomeWork() { } } - private boolean dispatchSafe(ResizerConnection connection, PoolSink toDispatch) { + private boolean dispatchSafe(ResizerConnection connection, PendingRequest toDispatch) { try { return connection.dispatch(toDispatch); } catch (Exception e) { try { - if (toDispatch.tryEmitError(e) != Sinks.EmitResult.OK) { + if (!toDispatch.tryCompleteExceptionally(e)) { // this is probably fine, log it anyway log.debug("Failure during connection dispatch operation, but dispatch request was already complete.", e); } @@ -224,19 +226,18 @@ final void onConnectionInactive2(ResizerConnection connection) { dirty(); } - final void addPendingRequest(PoolSink sink) { + final void addPendingRequest(PendingRequest sink) { int maxPendingAcquires = connectionPoolConfiguration.getMaxPendingAcquires(); if (maxPendingAcquires != Integer.MAX_VALUE && pendingRequests.size() >= maxPendingAcquires) { - sink.tryEmitError(new HttpClientException("Cannot acquire connection, exceeded max pending acquires configuration")); + sink.tryCompleteExceptionally(new HttpClientException("Cannot acquire connection, exceeded max pending acquires configuration")); return; } pendingRequests.addLast(sink); dirty(); } - @Nullable - final Sinks.One pollPendingRequest() { - Sinks.One req = pendingRequests.pollFirst(); + final PendingRequest pollPendingRequest() { + PendingRequest req = pendingRequests.pollFirst(); if (req != null) { dirty(); } @@ -281,6 +282,42 @@ abstract static class ResizerConnection { * @return {@code true} if the acquisition may succeed (if it fails later, the pending * request must be readded), or {@code false} if it fails immediately */ - abstract boolean dispatch(PoolSink sink) throws Exception; + abstract boolean dispatch(PendingRequest sink) throws Exception; + } + + static final class PendingRequest extends 
AtomicBoolean { + final @Nullable BlockHint blockHint; + private final DelayedExecutionFlow sink = DelayedExecutionFlow.create(); + + PendingRequest(@Nullable BlockHint blockHint) { + this.blockHint = blockHint; + } + + ExecutionFlow flow() { + return sink; + } + + // DelayedExecutionFlow does not allow concurrent completes, so this is a simple guard + + boolean tryCompleteExceptionally(Throwable t) { + if (compareAndSet(false, true)) { + sink.completeExceptionally(t); + return true; + } else { + return false; + } + } + + boolean tryComplete(ConnectionManager.PoolHandle value) { + if (compareAndSet(false, true)) { + if (sink.isCancelled()) { + return false; + } + sink.complete(value); + return true; + } else { + return false; + } + } } } diff --git a/http-client/src/test/groovy/io/micronaut/http/client/ClientScopeSpec.groovy b/http-client/src/test/groovy/io/micronaut/http/client/ClientScopeSpec.groovy index 85765e09694..b01189fc594 100644 --- a/http-client/src/test/groovy/io/micronaut/http/client/ClientScopeSpec.groovy +++ b/http-client/src/test/groovy/io/micronaut/http/client/ClientScopeSpec.groovy @@ -14,12 +14,12 @@ import io.micronaut.http.annotation.Get import io.micronaut.http.client.annotation.Client import io.micronaut.http.client.exceptions.NoHostException import io.micronaut.http.client.netty.DefaultHttpClient +import io.micronaut.http.reactive.execution.ReactiveExecutionFlow import io.micronaut.jackson.annotation.JacksonFeatures import io.micronaut.runtime.server.EmbeddedServer import jakarta.inject.Inject import jakarta.inject.Singleton -import reactor.core.publisher.Flux -import spock.lang.IgnoreIf +import reactor.core.publisher.Mono import spock.lang.Retry import spock.lang.Specification @@ -89,8 +89,8 @@ class ClientScopeSpec extends Specification { then: //thrown(HttpClientException) - Flux.from(((DefaultHttpClient) myJavaService.client) - .resolveRequestURI(HttpRequest.GET("/foo"))).blockFirst().toString() == "http://localhost:${embeddedServer2.port}/foo" + Mono.from(ReactiveExecutionFlow.toPublisher(() -> ((DefaultHttpClient) myJavaService.client) + .resolveRequestURI(HttpRequest.GET("/foo")))).block().toString() == "http://localhost:${embeddedServer2.port}/foo" when:"test service definition with declarative client with jackson features" MyServiceJacksonFeatures jacksonFeatures = embeddedServer2.applicationContext.getBean(MyServiceJacksonFeatures) diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java b/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java index 5aff7564781..962166591ab 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java @@ -156,6 +156,11 @@ protected Flux toByteBufPublisher() { return NettyByteBufferFactory.DEFAULT.wrap(claim()); } + @Override + public @NonNull CloseableByteBody move() { + return new AvailableNettyByteBody(claim()); + } + @Override public @NonNull String toString(Charset charset) { ByteBuf b = claim(); diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/StreamingNettyByteBody.java b/http-netty/src/main/java/io/micronaut/http/netty/body/StreamingNettyByteBody.java index bb4041e111e..b403396c4c8 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/StreamingNettyByteBody.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/StreamingNettyByteBody.java @@ -181,6 +181,16 @@ public void error(Throwable e) { 
return sharedBuffer.subscribeFull(upstream, forceDelaySubscribe).map(AvailableNettyByteBody::new); } + @Override + public @NonNull CloseableByteBody move() { + BufferConsumer.Upstream upstream = this.upstream; + if (upstream == null) { + failClaim(); + } + this.upstream = null; + return new StreamingNettyByteBody(sharedBuffer, forceDelaySubscribe, upstream); + } + @Override public void close() { BufferConsumer.Upstream upstream = this.upstream; diff --git a/http-netty/src/test/groovy/io/micronaut/http/netty/body/AvailableNettyByteBodySpec.groovy b/http-netty/src/test/groovy/io/micronaut/http/netty/body/AvailableNettyByteBodySpec.groovy new file mode 100644 index 00000000000..003cf85439c --- /dev/null +++ b/http-netty/src/test/groovy/io/micronaut/http/netty/body/AvailableNettyByteBodySpec.groovy @@ -0,0 +1,19 @@ +package io.micronaut.http.netty.body + +import io.netty.buffer.Unpooled +import spock.lang.Specification + +import java.nio.charset.StandardCharsets + +class AvailableNettyByteBodySpec extends Specification { + def move() { + given: + def a = new AvailableNettyByteBody(Unpooled.copiedBuffer("foo", StandardCharsets.UTF_8)) + def b = a.move() + + when: + a.close() + then: + b.buffer().get().toString(StandardCharsets.UTF_8) == "foo" + } +} diff --git a/http-netty/src/test/groovy/io/micronaut/http/netty/body/StreamingNettyByteBodySpec.groovy b/http-netty/src/test/groovy/io/micronaut/http/netty/body/StreamingNettyByteBodySpec.groovy new file mode 100644 index 00000000000..f5a6e8cecde --- /dev/null +++ b/http-netty/src/test/groovy/io/micronaut/http/netty/body/StreamingNettyByteBodySpec.groovy @@ -0,0 +1,21 @@ +package io.micronaut.http.netty.body + +import io.netty.buffer.Unpooled +import io.netty.channel.embedded.EmbeddedChannel +import reactor.core.publisher.Flux +import spock.lang.Specification + +import java.nio.charset.StandardCharsets + +class StreamingNettyByteBodySpec extends Specification { + def move() { + given: + def a = NettyBodyAdapter.adapt(Flux.just(Unpooled.copiedBuffer("foo", StandardCharsets.UTF_8)), new EmbeddedChannel().eventLoop()) + def b = a.move() + + when: + a.close() + then: + b.buffer().get().toString(StandardCharsets.UTF_8) == "foo" + } +} diff --git a/http/src/main/java/io/micronaut/http/body/ByteBody.java b/http/src/main/java/io/micronaut/http/body/ByteBody.java index d4d5f8b8c2b..49b565ab6bc 100644 --- a/http/src/main/java/io/micronaut/http/body/ByteBody.java +++ b/http/src/main/java/io/micronaut/http/body/ByteBody.java @@ -138,8 +138,25 @@ default ByteBody allowDiscard() { * * @return A future that completes when all bytes are available */ + @NonNull CompletableFuture buffer(); + /** + * Create a new {@link CloseableByteBody} with the same content but an independent lifecycle, + * claiming this body in the process. + *
<p>
This is a primary operation. After this operation, no other primary operation or + * {@link #split()} may be done. + *
<p>
The purpose of this method is to move the data to a different component in an + * application, making clear that the receiving component claims ownership of the body. If the + * sending component then closes the original {@link ByteBody} for example, it will have no + * impact on the new {@link CloseableByteBody} that the receiver is working with. + * + * @return A new {@link CloseableByteBody} with the same content. + * @since 4.8.0 + */ + @NonNull + CloseableByteBody move(); + /** * This enum controls how backpressure should be handled if one of the two bodies * ("downstreams") is consuming data slower than the other. diff --git a/http/src/main/java/io/micronaut/http/body/stream/AvailableByteArrayBody.java b/http/src/main/java/io/micronaut/http/body/stream/AvailableByteArrayBody.java index c29c392a826..258ccc9e366 100644 --- a/http/src/main/java/io/micronaut/http/body/stream/AvailableByteArrayBody.java +++ b/http/src/main/java/io/micronaut/http/body/stream/AvailableByteArrayBody.java @@ -22,6 +22,7 @@ import io.micronaut.core.io.buffer.ByteBufferFactory; import io.micronaut.core.util.ArgumentUtils; import io.micronaut.http.body.CloseableAvailableByteBody; +import io.micronaut.http.body.CloseableByteBody; import io.micronaut.http.body.InternalByteBody; import java.io.ByteArrayInputStream; @@ -88,6 +89,11 @@ public long length() { return bufferFactory.wrap(toByteArray()); } + @Override + public @NonNull CloseableByteBody move() { + return new AvailableByteArrayBody(bufferFactory, toByteArray()); + } + @Override public void close() { array = null; diff --git a/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java b/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java index 4bf3377f884..2dafbdf3e3e 100644 --- a/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java +++ b/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java @@ -161,6 +161,11 @@ public void close() { }); } + @Override + public @NonNull CloseableByteBody move() { + return new InputStreamByteBody(context, toInputStream()); + } + private record Context( OptionalLong expectedLength, Executor ioExecutor, diff --git a/http/src/main/java/io/micronaut/http/reactive/execution/ReactiveExecutionFlow.java b/http/src/main/java/io/micronaut/http/reactive/execution/ReactiveExecutionFlow.java index 2fce4f4cb33..92011406942 100644 --- a/http/src/main/java/io/micronaut/http/reactive/execution/ReactiveExecutionFlow.java +++ b/http/src/main/java/io/micronaut/http/reactive/execution/ReactiveExecutionFlow.java @@ -35,7 +35,7 @@ * @since 4.0.0 */ @Internal -public interface ReactiveExecutionFlow extends ExecutionFlow { +public sealed interface ReactiveExecutionFlow extends ExecutionFlow permits ReactorExecutionFlowImpl { /** * Creates a new reactive flow from a publisher. 
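As a usage sketch for the new ByteBody#move() primary operation documented above: the snippet below mirrors the AvailableByteArrayBodySpec test added later in this diff, showing that closing the original body after move() does not affect the receiver's copy. The MoveExample class name and the printed output are illustrative only, not part of the patch.

import io.micronaut.core.io.buffer.ByteArrayBufferFactory;
import io.micronaut.http.body.CloseableByteBody;
import io.micronaut.http.body.stream.AvailableByteArrayBody;

import java.nio.charset.StandardCharsets;

class MoveExample {
    public static void main(String[] args) throws Exception {
        // the sending component creates (or receives) a body...
        AvailableByteArrayBody original = AvailableByteArrayBody.create(
                ByteArrayBufferFactory.INSTANCE, "foo".getBytes(StandardCharsets.UTF_8));
        // ...and the receiving component claims ownership of it
        CloseableByteBody owned = original.move();
        // closing the original is now a no-op as far as the receiver is concerned
        original.close();
        System.out.println(owned.buffer().get().toString(StandardCharsets.UTF_8)); // prints "foo"
        owned.close();
    }
}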
diff --git a/http/src/main/java/io/micronaut/http/reactive/execution/ReactorExecutionFlowImpl.java b/http/src/main/java/io/micronaut/http/reactive/execution/ReactorExecutionFlowImpl.java index 1c18bd23b92..c09ed27940d 100644 --- a/http/src/main/java/io/micronaut/http/reactive/execution/ReactorExecutionFlowImpl.java +++ b/http/src/main/java/io/micronaut/http/reactive/execution/ReactorExecutionFlowImpl.java @@ -16,6 +16,7 @@ package io.micronaut.http.reactive.execution; import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; import io.micronaut.core.execution.ExecutionFlow; import io.micronaut.core.execution.ImperativeExecutionFlow; @@ -29,6 +30,8 @@ import reactor.util.context.Context; import reactor.util.context.ContextView; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; @@ -45,6 +48,7 @@ final class ReactorExecutionFlowImpl implements ReactiveExecutionFlow { private Mono value; + private List subscriptionsToCancel = new ArrayList<>(1); ReactorExecutionFlowImpl(Publisher value) { this(value instanceof Flux flux ? flux.next() : Mono.from(value)); @@ -84,6 +88,30 @@ public ExecutionFlow putInContext(String key, Object value) { return this; } + @Override + public @NonNull ExecutionFlow putInContextIfAbsent(@NonNull String key, @NonNull Object value) { + this.value = this.value.contextWrite(context -> { + if (!context.hasKey(key)) { + return context.put(key, value); + } else { + return context; + } + }); + return this; + } + + @Override + public void cancel() { + List stc; + synchronized (this) { + stc = subscriptionsToCancel; + subscriptionsToCancel = null; + } + for (Subscription subscription : stc) { + subscription.cancel(); + } + } + @Override public void onComplete(BiConsumer fn) { value.subscribe(new CoreSubscriber<>() { @@ -102,7 +130,20 @@ public Context currentContext() { @Override public void onSubscribe(Subscription s) { this.subscription = s; - s.request(1); + boolean cancel; + synchronized (ReactorExecutionFlowImpl.this) { + if (subscriptionsToCancel == null) { + cancel = true; + } else { + subscriptionsToCancel.add(subscription); + cancel = false; + } + } + if (cancel) { + s.cancel(); + } else { + s.request(1); + } } @Override @@ -173,7 +214,7 @@ public void accept(Object o, Throwable throwable) { } }; next.onComplete(reactiveConsumer); - return sink.asMono(); + return sink.asMono().doOnCancel(next::cancel); }); } } diff --git a/http/src/test/groovy/io/micronaut/http/body/stream/AvailableByteArrayBodySpec.groovy b/http/src/test/groovy/io/micronaut/http/body/stream/AvailableByteArrayBodySpec.groovy new file mode 100644 index 00000000000..0747f469995 --- /dev/null +++ b/http/src/test/groovy/io/micronaut/http/body/stream/AvailableByteArrayBodySpec.groovy @@ -0,0 +1,19 @@ +package io.micronaut.http.body.stream + +import io.micronaut.core.io.buffer.ByteArrayBufferFactory +import spock.lang.Specification + +import java.nio.charset.StandardCharsets + +class AvailableByteArrayBodySpec extends Specification { + def move() { + given: + def a = AvailableByteArrayBody.create(ByteArrayBufferFactory.INSTANCE, "foo".getBytes(StandardCharsets.UTF_8)) + def b = a.move() + + when: + a.close() + then: + b.buffer().get().toString(StandardCharsets.UTF_8) == "foo" + } +} diff --git a/http/src/test/groovy/io/micronaut/http/body/stream/InputStreamByteBodySpec.groovy 
b/http/src/test/groovy/io/micronaut/http/body/stream/InputStreamByteBodySpec.groovy new file mode 100644 index 00000000000..326ba430395 --- /dev/null +++ b/http/src/test/groovy/io/micronaut/http/body/stream/InputStreamByteBodySpec.groovy @@ -0,0 +1,24 @@ +package io.micronaut.http.body.stream + +import io.micronaut.core.io.buffer.ByteArrayBufferFactory +import spock.lang.Specification + +import java.nio.charset.StandardCharsets +import java.util.concurrent.Executors + +class InputStreamByteBodySpec extends Specification { + def move() { + given: + def pool = Executors.newCachedThreadPool() + def a = InputStreamByteBody.create(new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8)), OptionalLong.empty(), pool, ByteArrayBufferFactory.INSTANCE) + def b = a.move() + + when: + a.close() + then: + b.buffer().get().toString(StandardCharsets.UTF_8) == "foo" + + cleanup: + pool.shutdown() + } +}
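As a closing usage sketch of how the new ExecutionFlow cancellation and timeout primitives fit together (roughly the pattern Pool.acquire uses above), assuming a dedicated single-thread scheduler; the TimeoutExample class and the printed messages are illustrative, not part of the patch.

import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class TimeoutExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // producer side: a flow that would normally be completed once a result is available
        DelayedExecutionFlow<String> producer = DelayedExecutionFlow.create();
        // consumer side: fail with a TimeoutException after 100ms; a late result is handed to onDiscard
        ExecutionFlow<String> guarded = producer.timeout(Duration.ofMillis(100), scheduler,
                (value, error) -> System.out.println("discarded late result: " + value));
        // the timeout cancels upstream, which the producer can observe via onCancel
        producer.onCancel(() -> System.out.println("producer cancelled, stop working"));
        guarded.onComplete((value, error) -> System.out.println("completed with " + value + " / " + error));
        // the producer is never completed here, so the timeout fires, the cancel hook runs,
        // and the guarded flow completes with a TimeoutException.
        // existing delayed tasks still run after shutdown() by default, so the timer is not lost
        scheduler.shutdown();
    }
}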