From c17d5c6c211db0201569780a87359954644037c6 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Thu, 28 Jul 2022 22:40:38 -0700 Subject: [PATCH 1/2] Remove unused internal code from `servicetalk-grpc-api` Motivation: #1893 removed gRPC filter functionality, but left some internal code required to support removed filters. Modifications: - Remove `GrpcRoutes#drainToStreamingRoutes()`; - Remove `GrpcRouteConversions`, remove conversions from `RouteProvider`; - Make `RouteProvider` class private; - Remove unused `GrpcUtils#newResponse` overload; Result: No unused code. --- .../grpc/api/GrpcRouteConversions.java | 326 ------------------ .../io/servicetalk/grpc/api/GrpcRouter.java | 147 ++------ .../io/servicetalk/grpc/api/GrpcRoutes.java | 45 --- .../io/servicetalk/grpc/api/GrpcUtils.java | 10 - 4 files changed, 35 insertions(+), 493 deletions(-) delete mode 100644 servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouteConversions.java diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouteConversions.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouteConversions.java deleted file mode 100644 index ac750a4429..0000000000 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouteConversions.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Copyright © 2019 Apple Inc. and the ServiceTalk project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.servicetalk.grpc.api; - -import io.servicetalk.concurrent.CompletableSource; -import io.servicetalk.concurrent.GracefulAutoCloseable; -import io.servicetalk.concurrent.PublisherSource.Subscriber; -import io.servicetalk.concurrent.api.AsyncCloseable; -import io.servicetalk.concurrent.api.AsyncCloseables; -import io.servicetalk.concurrent.api.Completable; -import io.servicetalk.concurrent.api.Publisher; -import io.servicetalk.concurrent.api.Single; -import io.servicetalk.concurrent.api.internal.ConnectablePayloadWriter; -import io.servicetalk.grpc.api.GrpcRoutes.BlockingRoute; -import io.servicetalk.grpc.api.GrpcRoutes.BlockingStreamingRoute; -import io.servicetalk.grpc.api.GrpcRoutes.RequestStreamingRoute; -import io.servicetalk.grpc.api.GrpcRoutes.ResponseStreamingRoute; -import io.servicetalk.grpc.api.GrpcRoutes.Route; -import io.servicetalk.grpc.api.GrpcRoutes.StreamingRoute; - -import java.io.IOException; - -import static io.servicetalk.concurrent.Cancellable.IGNORE_CANCEL; -import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor; -import static io.servicetalk.concurrent.api.Publisher.from; -import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; -import static io.servicetalk.concurrent.api.SourceAdapters.toSource; -import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe; -import static io.servicetalk.oio.api.internal.PayloadWriterUtils.safeClose; -import static io.servicetalk.utils.internal.PlatformDependent.throwException; -import static java.util.Objects.requireNonNull; - -final class GrpcRouteConversions { - private GrpcRouteConversions() { - // No instance - } - - static Route toRoute( - final StreamingRoute original) { - requireNonNull(original); - return new Route() { - @Override - public Single handle(final GrpcServiceContext ctx, final Req request) { - return original.handle(ctx, from(request)).firstOrError(); - } - - @Override - public Completable closeAsync() { - return original.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - return original.closeAsyncGracefully(); - } - }; - } - - static Route toRoute( - final BlockingStreamingRoute original) { - return toRoute(toStreaming(original)); - } - - static Route toRoute( - final BlockingRoute original) { - return toRoute(toStreaming(original)); - } - - static StreamingRoute toStreaming( - final Route original) { - requireNonNull(original); - return new StreamingRoute() { - @Override - public Publisher handle(final GrpcServiceContext ctx, final Publisher request) { - return request.firstOrError().flatMapPublisher(req -> original.handle(ctx, req).toPublisher()); - } - - @Override - public Completable closeAsync() { - return original.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - return original.closeAsyncGracefully(); - } - }; - } - - static StreamingRoute toStreaming( - final BlockingStreamingRoute original) { - requireNonNull(original); - return new StreamingRoute() { - private final AsyncCloseable closeable = toAsyncCloseable(original); - @Override - public Publisher handle(final GrpcServiceContext ctx, final Publisher request) { - return new Publisher() { - @Override - protected void handleSubscribe(final Subscriber subscriber) { - final ConnectablePayloadWriter connectablePayloadWriter = - new ConnectablePayloadWriter<>(); - final Publisher pub = connectablePayloadWriter.connect(); - final CompletableSource.Processor exceptionProcessor = newCompletableProcessor(); - toSource(fromSource(exceptionProcessor).merge(pub)).subscribe(subscriber); - final GrpcPayloadWriter grpcPayloadWriter = new GrpcPayloadWriter() { - @Override - public void write(final Resp resp) throws IOException { - connectablePayloadWriter.write(resp); - } - - @Override - public void close() throws IOException { - connectablePayloadWriter.close(); - } - - @Override - public void close(final Throwable cause) throws IOException { - connectablePayloadWriter.close(cause); - } - - @Override - public void flush() throws IOException { - connectablePayloadWriter.flush(); - } - }; - try { - original.handle(ctx, request.toIterable(), grpcPayloadWriter); - - // The user code has returned successfully, complete the processor so the response stream - // can complete. If the user handles the request asynchronously (e.g. on another thread) - // they are responsible for closing the payloadWriter. - exceptionProcessor.onComplete(); - } catch (Throwable t) { - try { - exceptionProcessor.onError(t); - } finally { - safeClose(grpcPayloadWriter, t); - } - } - } - }; - } - - @Override - public Completable closeAsync() { - return closeable.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - return closeable.closeAsyncGracefully(); - } - }; - } - - static StreamingRoute toStreaming( - final BlockingRoute original) { - requireNonNull(original); - return new StreamingRoute() { - private final AsyncCloseable closeable = toAsyncCloseable(original); - @Override - public Publisher handle(final GrpcServiceContext ctx, final Publisher request) { - return request.firstOrError().map(req -> { - try { - return original.handle(ctx, req); - } catch (Exception e) { - return throwException(e); - } - }).toPublisher(); - } - - @Override - public Completable closeAsync() { - return closeable.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - return closeable.closeAsyncGracefully(); - } - }; - } - - static RequestStreamingRoute - toRequestStreamingRoute(final Route original) { - requireNonNull(original); - return new RequestStreamingRoute() { - @Override - public Single handle(final GrpcServiceContext ctx, final Publisher request) { - return request.firstOrError().flatMap(req -> original.handle(ctx, req)); - } - - @Override - public Completable closeAsync() { - return original.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - return original.closeAsyncGracefully(); - } - }; - } - - static RequestStreamingRoute - toRequestStreamingRoute(final StreamingRoute original) { - requireNonNull(original); - return new RequestStreamingRoute() { - @Override - public Single handle(final GrpcServiceContext ctx, final Publisher request) { - return original.handle(ctx, request).firstOrError(); - } - - @Override - public Completable closeAsync() { - return original.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - return original.closeAsyncGracefully(); - } - }; - } - - static RequestStreamingRoute - toRequestStreamingRoute(final BlockingStreamingRoute original) { - return toRequestStreamingRoute(toStreaming(original)); - } - - static RequestStreamingRoute - toRequestStreamingRoute(final BlockingRoute original) { - return toRequestStreamingRoute(toStreaming(original)); - } - - static ResponseStreamingRoute - toResponseStreamingRoute(final Route original) { - requireNonNull(original); - return new ResponseStreamingRoute() { - @Override - public Publisher handle(final GrpcServiceContext ctx, final Req request) { - return original.handle(ctx, request).toPublisher(); - } - - @Override - public Completable closeAsync() { - return original.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - return original.closeAsyncGracefully(); - } - }; - } - - static ResponseStreamingRoute - toResponseStreamingRoute(final StreamingRoute original) { - requireNonNull(original); - return new ResponseStreamingRoute() { - @Override - public Publisher handle(final GrpcServiceContext ctx, final Req request) { - return original.handle(ctx, from(request)); - } - - @Override - public Completable closeAsync() { - return original.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - return original.closeAsyncGracefully(); - } - }; - } - - static ResponseStreamingRoute - toResponseStreamingRoute(final BlockingStreamingRoute original) { - return toResponseStreamingRoute(toStreaming(original)); - } - - static ResponseStreamingRoute - toResponseStreamingRoute(final BlockingRoute original) { - return toResponseStreamingRoute(toStreaming(original)); - } - - static AsyncCloseable toAsyncCloseable(final GracefulAutoCloseable original) { - return AsyncCloseables.toAsyncCloseable(graceful -> new Completable() { - @Override - protected void handleSubscribe(final CompletableSource.Subscriber subscriber) { - try { - subscriber.onSubscribe(IGNORE_CANCEL); - } catch (Throwable cause) { - handleExceptionFromOnSubscribe(subscriber, cause); - return; - } - - try { - if (graceful) { - original.closeGracefully(); - } else { - original.close(); - } - } catch (Throwable t) { - subscriber.onError(t); - return; - } - subscriber.onComplete(); - } - }); - } -} diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java index 40bb3e9796..34bd38b25c 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java @@ -18,6 +18,7 @@ import io.servicetalk.buffer.api.Buffer; import io.servicetalk.concurrent.BlockingIterable; import io.servicetalk.concurrent.BlockingIterator; +import io.servicetalk.concurrent.CompletableSource; import io.servicetalk.concurrent.GracefulAutoCloseable; import io.servicetalk.concurrent.api.AsyncCloseable; import io.servicetalk.concurrent.api.AsyncCloseables; @@ -69,16 +70,12 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.function.Function; -import java.util.function.Supplier; import javax.annotation.Nullable; +import static io.servicetalk.concurrent.Cancellable.IGNORE_CANCEL; import static io.servicetalk.concurrent.api.Single.succeeded; +import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe; import static io.servicetalk.grpc.api.GrpcHeaderValues.APPLICATION_GRPC; -import static io.servicetalk.grpc.api.GrpcRouteConversions.toAsyncCloseable; -import static io.servicetalk.grpc.api.GrpcRouteConversions.toRequestStreamingRoute; -import static io.servicetalk.grpc.api.GrpcRouteConversions.toResponseStreamingRoute; -import static io.servicetalk.grpc.api.GrpcRouteConversions.toRoute; -import static io.servicetalk.grpc.api.GrpcRouteConversions.toStreaming; import static io.servicetalk.grpc.api.GrpcStatus.fromCodeValue; import static io.servicetalk.grpc.api.GrpcStatusCode.INVALID_ARGUMENT; import static io.servicetalk.grpc.api.GrpcStatusCode.UNIMPLEMENTED; @@ -297,19 +294,6 @@ static final class Builder { this.executionStrategies = executionStrategies; } - RouteProviders drainRoutes() { - final Map allRoutes = new HashMap<>(); - allRoutes.putAll(routes); - allRoutes.putAll(streamingRoutes); - allRoutes.putAll(blockingRoutes); - allRoutes.putAll(blockingStreamingRoutes); - routes.clear(); - streamingRoutes.clear(); - blockingRoutes.clear(); - blockingStreamingRoutes.clear(); - return new RouteProviders(allRoutes); - } - GrpcExecutionStrategy executionStrategyFor(final String path, final GrpcExecutionStrategy defaultValue) { return executionStrategies.getOrDefault(path, defaultValue); } @@ -399,9 +383,7 @@ public Completable closeAsync() { public Completable closeAsyncGracefully() { return route.closeAsyncGracefully(); } - }, executionStrategy == null ? defaultStrategy() : executionStrategy), - () -> toStreaming(route), () -> toRequestStreamingRoute(route), - () -> toResponseStreamingRoute(route), () -> route, route)), + }, executionStrategy == null ? defaultStrategy() : executionStrategy), route)), // We only assume duplication across blocking and async variant of the same API and not between // aggregated and streaming. Therefore, verify that there is no blocking-aggregated route registered // for the same path: @@ -476,8 +458,7 @@ public HttpExecutionStrategy serviceInvocationStrategy() { return executionStrategy == null ? defaultStrategy() : executionStrategy; } }; - }, () -> route, () -> toRequestStreamingRoute(route), () -> toResponseStreamingRoute(route), - () -> toRoute(route), route)), + }, route)), // We only assume duplication across blocking and async variant of the same API and not between // aggregated and streaming. Therefore, verify that there is no blocking-streaming route registered // for the same path: @@ -605,9 +586,7 @@ public void close() throws Exception { public void closeGracefully() throws Exception { route.closeGracefully(); } - }, executionStrategy == null ? defaultStrategy() : executionStrategy), - () -> toStreaming(route), () -> toRequestStreamingRoute(route), - () -> toResponseStreamingRoute(route), () -> toRoute(route), route)), + }, executionStrategy == null ? defaultStrategy() : executionStrategy), route)), // We only assume duplication across blocking and async variant of the same API and not between // aggregated and streaming. Therefore, verify that there is no async-aggregated route registered // for the same path: @@ -675,9 +654,7 @@ public void close() throws Exception { public void closeGracefully() throws Exception { route.closeGracefully(); } - }, executionStrategy == null ? defaultStrategy() : executionStrategy), - () -> toStreaming(route), () -> toRequestStreamingRoute(route), - () -> toResponseStreamingRoute(route), () -> toRoute(route), route)), + }, executionStrategy == null ? defaultStrategy() : executionStrategy), route)), // We only assume duplication across blocking and async variant of the same API and not between // aggregated and streaming. Therefore, verify that there is no async-streaming route registered // for the same path: @@ -843,106 +820,26 @@ public void flush() throws IOException { } } - static final class RouteProviders implements AsyncCloseable { - - private final Map routes; - private final CompositeCloseable closeable; - - RouteProviders(final Map routes) { - this.routes = routes; - closeable = AsyncCloseables.newCompositeCloseable(); - for (RouteProvider provider : routes.values()) { - closeable.append(provider); - } - } - - RouteProvider routeProvider(final String path) { - final RouteProvider routeProvider = routes.get(path); - if (routeProvider == null) { - throw new IllegalArgumentException("No routes registered for path: " + path); - } - return routeProvider; - } - - @Override - public Completable closeAsync() { - return closeable.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - return closeable.closeAsyncGracefully(); - } - } - - static final class RouteProvider implements AsyncCloseable { + private static final class RouteProvider implements AsyncCloseable { private final Function routeProvider; - private final Supplier> toStreamingConverter; - private final Supplier> toRequestStreamingRouteConverter; - private final Supplier> toResponseStreamingRouteConverter; - private final Supplier> toRouteConverter; private final AsyncCloseable closeable; RouteProvider(final Function routeProvider, - final Supplier> toStreamingConverter, - final Supplier> toRequestStreamingRouteConverter, - final Supplier> toResponseStreamingRouteConverter, - final Supplier> toRouteConverter, final AsyncCloseable closeable) { this.routeProvider = routeProvider; - this.toStreamingConverter = toStreamingConverter; - this.toRequestStreamingRouteConverter = toRequestStreamingRouteConverter; - this.toResponseStreamingRouteConverter = toResponseStreamingRouteConverter; - this.toRouteConverter = toRouteConverter; this.closeable = closeable; } RouteProvider(final Function routeProvider, - final Supplier> toStreamingConverter, - final Supplier> toRequestStreamingRouteConverter, - final Supplier> toResponseStreamingRouteConverter, - final Supplier> toRouteConverter, final GracefulAutoCloseable closeable) { - this(routeProvider, toStreamingConverter, toRequestStreamingRouteConverter, - toResponseStreamingRouteConverter, toRouteConverter, toAsyncCloseable(closeable)); + this(routeProvider, toAsyncCloseable(closeable)); } ServiceAdapterHolder buildRoute(GrpcExecutionContext executionContext) { return routeProvider.apply(executionContext); } - RequestStreamingRoute asRequestStreamingRoute() { - // We assume that generated code passes the correct types here. - @SuppressWarnings("unchecked") - RequestStreamingRoute toReturn = - (RequestStreamingRoute) toRequestStreamingRouteConverter.get(); - return toReturn; - } - - ResponseStreamingRoute - asResponseStreamingRoute() { - // We assume that generated code passes the correct types here. - @SuppressWarnings("unchecked") - ResponseStreamingRoute toReturn = - (ResponseStreamingRoute) toResponseStreamingRouteConverter.get(); - return toReturn; - } - - StreamingRoute asStreamingRoute() { - // We assume that generated code passes the correct types here. - @SuppressWarnings("unchecked") - StreamingRoute toReturn = (StreamingRoute) toStreamingConverter.get(); - return toReturn; - } - - Route asRoute() { - // We assume that generated code passes the correct types here. - @SuppressWarnings("unchecked") - Route toReturn = (Route) toRouteConverter.get(); - return toReturn; - } - @Override public Completable closeAsync() { return closeable.closeAsync(); @@ -952,5 +849,31 @@ public Completable closeAsync() { public Completable closeAsyncGracefully() { return closeable.closeAsyncGracefully(); } + + private static AsyncCloseable toAsyncCloseable(final GracefulAutoCloseable original) { + return AsyncCloseables.toAsyncCloseable(graceful -> new Completable() { + @Override + protected void handleSubscribe(final CompletableSource.Subscriber subscriber) { + try { + subscriber.onSubscribe(IGNORE_CANCEL); + } catch (Throwable cause) { + handleExceptionFromOnSubscribe(subscriber, cause); + return; + } + + try { + if (graceful) { + original.closeGracefully(); + } else { + original.close(); + } + } catch (Throwable t) { + subscriber.onError(t); + return; + } + subscriber.onComplete(); + } + }); + } } } diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRoutes.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRoutes.java index 978e4afd80..11f8d37318 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRoutes.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRoutes.java @@ -23,7 +23,6 @@ import io.servicetalk.concurrent.api.Single; import io.servicetalk.encoding.api.BufferDecoderGroup; import io.servicetalk.encoding.api.BufferEncoder; -import io.servicetalk.grpc.api.GrpcRouter.RouteProviders; import io.servicetalk.grpc.api.GrpcServiceFactory.ServerBinder; import io.servicetalk.grpc.api.GrpcUtils.DefaultMethodDescriptor; import io.servicetalk.http.api.HttpExecutionStrategies; @@ -120,50 +119,6 @@ final Single bind(final ServerBinder binder, final GrpcExecut */ protected abstract Service newServiceFromRoutes(AllGrpcRoutes routes); - /** - * Returns a {@link AllGrpcRoutes} representing this {@link GrpcRoutes}. Any route registered that is not a - * {@link StreamingRoute} will be converted to a {@link StreamingRoute}. - * @return {@link AllGrpcRoutes} representing this {@link GrpcRoutes}. - */ - AllGrpcRoutes drainToStreamingRoutes() { - final RouteProviders routeProviders = routeBuilder.drainRoutes(); - return new AllGrpcRoutes() { - @Override - public StreamingRoute streamingRouteFor( - final String path) throws IllegalArgumentException { - return routeProviders.routeProvider(path).asStreamingRoute(); - } - - @Override - public Route routeFor(final String path) - throws IllegalArgumentException { - return routeProviders.routeProvider(path).asRoute(); - } - - @Override - public RequestStreamingRoute - requestStreamingRouteFor(final String path) throws IllegalArgumentException { - return routeProviders.routeProvider(path).asRequestStreamingRoute(); - } - - @Override - public ResponseStreamingRoute - responseStreamingRouteFor(final String path) throws IllegalArgumentException { - return routeProviders.routeProvider(path).asResponseStreamingRoute(); - } - - @Override - public Completable closeAsync() { - return routeProviders.closeAsync(); - } - - @Override - public Completable closeAsyncGracefully() { - return routeProviders.closeAsyncGracefully(); - } - }; - } - static GrpcRoutes merge(GrpcRoutes... allRoutes) { final GrpcRouter.Builder[] builders = new GrpcRouter.Builder[allRoutes.length]; final Set errors = new TreeSet<>(); diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java index 8644e1e19e..5d10cc4974 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcUtils.java @@ -180,16 +180,6 @@ static StreamingHttpResponse newResponse(final StreamingHttpResponseFactory .transform(new GrpcStatusUpdater(allocator, STATUS_OK)); } - static StreamingHttpResponse newResponse(final StreamingHttpResponseFactory responseFactory, - final CharSequence contentType, - @Nullable final CharSequence encoding, - @Nullable final CharSequence acceptedEncoding, - final GrpcStatus status, - final BufferAllocator allocator) { - return newStreamingResponse(responseFactory, contentType, encoding, acceptedEncoding) - .transform(new GrpcStatusUpdater(allocator, status)); - } - static HttpResponse newResponse(final HttpResponseFactory responseFactory, final CharSequence contentType, @Nullable final CharSequence encoding, From 2b62cc74798c7923c387221d5c36d51747e6712d Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 29 Jul 2022 08:32:56 -0700 Subject: [PATCH 2/2] comments --- .../io/servicetalk/grpc/api/GrpcRouter.java | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java index 34bd38b25c..1686c4fff2 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java @@ -69,7 +69,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.function.Function; import javax.annotation.Nullable; import static io.servicetalk.concurrent.Cancellable.IGNORE_CANCEL; @@ -183,7 +182,7 @@ private static void populateRoutes(final GrpcExecutionContext executionContext, final Map executionStrategies) { for (Map.Entry entry : routes.entrySet()) { final String path = entry.getKey(); - final ServiceAdapterHolder adapterHolder = entry.getValue().buildRoute(executionContext); + final ServiceAdapterHolder adapterHolder = entry.getValue().serviceAdapterHolder(); final StreamingHttpService route = closeable.append(adapterHolder.adaptor()); final GrpcExecutionStrategy routeStrategy = executionStrategies.getOrDefault(path, null); final HttpExecutionStrategy missing = null == routeStrategy ? @@ -338,7 +337,7 @@ void addRoute(MethodDescriptor methodDescriptor, CharSequence responseContentType = grpcContentType(methodDescriptor.responseDescriptor() .serializerDescriptor().contentType()); verifyNoOverrides(routes.put(methodDescriptor.httpPath(), - new RouteProvider(executionContext -> toStreamingHttpService( + new RouteProvider(toStreamingHttpService( new HttpService() { @Override public Single handle(final HttpServiceContext ctx, final HttpRequest request, @@ -408,8 +407,9 @@ void addStreamingRoute( .serializerDescriptor().contentType()); CharSequence responseContentType = grpcContentType(methodDescriptor.responseDescriptor() .serializerDescriptor().contentType()); - verifyNoOverrides(streamingRoutes.put(methodDescriptor.httpPath(), new RouteProvider(executionContext -> { - final StreamingHttpService service = new StreamingHttpService() { + verifyNoOverrides(streamingRoutes.put(methodDescriptor.httpPath(), new RouteProvider( + new ServiceAdapterHolder() { + private final StreamingHttpService service = new StreamingHttpService() { @Override public Single handle( final HttpServiceContext ctx, final StreamingHttpRequest request, @@ -447,17 +447,16 @@ public Completable closeAsyncGracefully() { return route.closeAsyncGracefully(); } }; - return new ServiceAdapterHolder() { - @Override - public StreamingHttpService adaptor() { - return service; - } - @Override - public HttpExecutionStrategy serviceInvocationStrategy() { - return executionStrategy == null ? defaultStrategy() : executionStrategy; - } - }; + @Override + public StreamingHttpService adaptor() { + return service; + } + + @Override + public HttpExecutionStrategy serviceInvocationStrategy() { + return executionStrategy == null ? defaultStrategy() : executionStrategy; + } }, route)), // We only assume duplication across blocking and async variant of the same API and not between // aggregated and streaming. Therefore, verify that there is no blocking-streaming route registered @@ -548,7 +547,7 @@ void addBlockingRoute( .serializerDescriptor().contentType()); CharSequence responseContentType = grpcContentType(methodDescriptor.responseDescriptor() .serializerDescriptor().contentType()); - verifyNoOverrides(blockingRoutes.put(methodDescriptor.httpPath(), new RouteProvider(executionContext -> + verifyNoOverrides(blockingRoutes.put(methodDescriptor.httpPath(), new RouteProvider( toStreamingHttpService(new BlockingHttpService() { @Override public HttpResponse handle(final HttpServiceContext ctx, final HttpRequest request, @@ -612,7 +611,7 @@ void addBlockingStreamingRoute( CharSequence responseContentType = grpcContentType(methodDescriptor.responseDescriptor() .serializerDescriptor().contentType()); verifyNoOverrides(blockingStreamingRoutes.put(methodDescriptor.httpPath(), - new RouteProvider(executionContext -> toStreamingHttpService(new BlockingStreamingHttpService() { + new RouteProvider(toStreamingHttpService(new BlockingStreamingHttpService() { @Override public void handle(final HttpServiceContext ctx, final BlockingStreamingHttpRequest request, final BlockingStreamingHttpServerResponse response) throws Exception { @@ -822,22 +821,22 @@ public void flush() throws IOException { private static final class RouteProvider implements AsyncCloseable { - private final Function routeProvider; + private final ServiceAdapterHolder serviceAdapterHolder; private final AsyncCloseable closeable; - RouteProvider(final Function routeProvider, + RouteProvider(final ServiceAdapterHolder serviceAdapterHolder, final AsyncCloseable closeable) { - this.routeProvider = routeProvider; + this.serviceAdapterHolder = serviceAdapterHolder; this.closeable = closeable; } - RouteProvider(final Function routeProvider, + RouteProvider(final ServiceAdapterHolder serviceAdapterHolder, final GracefulAutoCloseable closeable) { - this(routeProvider, toAsyncCloseable(closeable)); + this(serviceAdapterHolder, toAsyncCloseable(closeable)); } - ServiceAdapterHolder buildRoute(GrpcExecutionContext executionContext) { - return routeProvider.apply(executionContext); + ServiceAdapterHolder serviceAdapterHolder() { + return serviceAdapterHolder; } @Override