Skip to content

Commit

Permalink
Server-side AsyncContext initialized in lifecycle observer is lost (#…
Browse files Browse the repository at this point in the history
…3111)

Motivation:

If users put any data into `AsyncContext` inside `HttpLifecycleObserver`
configured via `HttpServerBuilder.lifecycleObserver(...)` it won't be
visible for filters and service because
`ClearAsyncContextHttpServiceFilter` is appended after
`HttpLifecycleObserverServiceFilter` inside `applyInternalFilters`.

Modifications:
1. Move `ClearAsyncContextHttpServiceFilter` from the beginning of
`noOffloadServiceFilters` to `applyInternalFilters`. However, this move
discovered another bug in path that handles
`noOffloadServiceFilters.isEmpty()` case because it never adds
`OffloadingFilter`.
2. Refactor `listenForService` method to always make a copy of filters
lists, then prepend/append internal filters to those copies in easy to
read/understand way, and construct a single filters `Stream` for
`buildService` method.
3. Refactor `OffloadingFilter` to act as a regular filter factory
instead of taking all further `serviceFilters` as pre-built factory.
4. Rename `ClearAsyncContextHttpServiceFilter` singleton instance.
5. Fix another bug when `OptionalSslNegotiator` binder takes a raw
service instead of `filteredService` (user-defined filters were never
applied for this path).
6. Modify `AbstractHttpServiceAsyncContextTest` to test `AsyncContext`
propagation from lifecycle observer and non-offloading filters.
7. Modify `AbstractHttpServiceAsyncContextTest` to test `AsyncContext`
initialized by early/late connection acceptor is not visible at request
path.
8. Add `BlockingHttpServiceAsyncContextTest` and
`HttpServiceAsyncContextTest` to make sure `AsyncContext` propagation
for blocking/async aggregated services is also tested.
9. Use `ParameterizedTest` where possible.

Results:

1. `AsyncContext` initialized inside server's lifecycle observer is
visible through filter chain and services.
2. Construction of server-side filter chain is consistent and
sequential.
3. Filters are applied for servers with optional TLS.
  • Loading branch information
idelpivnitskiy authored Nov 18, 2024
1 parent 158d4c3 commit d7dd886
Show file tree
Hide file tree
Showing 10 changed files with 574 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
// this ExecutionStrategy to understand if we need to offload more than we already offloaded:
final HttpExecutionStrategy additionalOffloads = ctx.executionContext().executionStrategy().missing(strategy);

Executor useExecutor = null != executor ? executor : ctx.executionContext().executor();
final Executor useExecutor = executor != null ? executor : ctx.executionContext().executor();

// The service should see this ExecutionStrategy and Executor inside the ExecutionContext:
final HttpServiceContext wrappedCtx =
Expand Down Expand Up @@ -136,12 +136,12 @@ public static StreamingHttpService offloadService(final HttpExecutionStrategy st
new StreamingHttpService() {
@Override
public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
StreamingHttpRequest request,
final StreamingHttpRequest request,
final StreamingHttpResponseFactory responseFactory) {
Executor useExecutor = null != executor ? executor : ctx.executionContext().executor();
final Executor useExecutor = executor != null ? executor : ctx.executionContext().executor();

// The service should see this ExecutionStrategy and Executor inside the ExecutionContext:
HttpServiceContext wrappedCtx =
final HttpServiceContext wrappedCtx =
new ExecutionContextOverridingServiceContext(ctx, strategy, useExecutor);

return service.handle(wrappedCtx, request, responseFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
*/
final class ClearAsyncContextHttpServiceFilter implements StreamingHttpServiceFilterFactory {

static final ClearAsyncContextHttpServiceFilter CLEAR_ASYNC_CONTEXT_HTTP_SERVICE_FILTER =
new ClearAsyncContextHttpServiceFilter();
static final ClearAsyncContextHttpServiceFilter INSTANCE = new ClearAsyncContextHttpServiceFilter();

private ClearAsyncContextHttpServiceFilter() {
// singleton
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,23 @@
final class OffloadingFilter implements StreamingHttpServiceFilterFactory {

private final HttpExecutionStrategy strategy;
private final StreamingHttpServiceFilterFactory offloaded;
private final BooleanSupplier shouldOffload;

/**
* Creates a new instance.
*
* @param strategy Execution strategy for the offloaded filters
* @param offloaded Filters to be offloaded
* @param shouldOffload returns true if offloading is appropriate for the current execution context.
*/
OffloadingFilter(HttpExecutionStrategy strategy, StreamingHttpServiceFilterFactory offloaded,
BooleanSupplier shouldOffload) {
OffloadingFilter(final HttpExecutionStrategy strategy, final BooleanSupplier shouldOffload) {
this.strategy = strategy;
this.offloaded = offloaded;
this.shouldOffload = shouldOffload;
}

@Override
public StreamingHttpServiceFilter create(StreamingHttpService service) {
public StreamingHttpServiceFilter create(final StreamingHttpService service) {
StreamingHttpService offloadedService = StreamingHttpServiceToOffloadedStreamingHttpService.offloadService(
strategy, null, shouldOffload, offloaded.create(service));
strategy, null, shouldOffload, service);
return new StreamingHttpServiceFilter(offloadedService);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright © 2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

abstract class AbstractAsyncHttpServiceAsyncContextTest extends AbstractHttpServiceAsyncContextTest {

@Test
void newRequestsGetFreshContextImmediate() throws Exception {
newRequestsGetFreshContext(true);
}

private static List<Arguments> params() {
List<Arguments> params = new ArrayList<>();
for (boolean useImmediate : Arrays.asList(false, true)) {
for (InitContextKeyPlace place : InitContextKeyPlace.values()) {
for (boolean asyncService : Arrays.asList(false, true)) {
if (!useImmediate && !asyncService) {
continue;
}
params.add(Arguments.of(useImmediate, place, asyncService));
}
}
}
return params;
}

@ParameterizedTest(name = "{displayName} [{index}]: useImmediate={0} initContextKeyPlace={1} asyncService={2}")
@MethodSource("params")
void contextPreservedOverFilterBoundariesAsync(boolean useImmediate, InitContextKeyPlace place,
boolean asyncService) throws Exception {
contextPreservedOverFilterBoundaries(useImmediate, place, asyncService);
}

@ParameterizedTest(name = "{displayName} [{index}]: connectionAcceptorType={0}")
@EnumSource(ConnectionAcceptorType.class)
void connectionAcceptorContextDoesNotLeakImmediate(ConnectionAcceptorType type) throws Exception {
connectionAcceptorContextDoesNotLeak(type, true);
}
}
Loading

0 comments on commit d7dd886

Please sign in to comment.