From e9397cf6a0505c3bd088bbaec09738ee817ac06e Mon Sep 17 00:00:00 2001 From: Patrik Dudits Date: Mon, 28 Oct 2019 12:27:00 +0100 Subject: [PATCH] Experiment 2 : Reimplement MP REST Client AsyncInvocationInterceptor support Quite straightforward, the only thing that keeps the implementation complex is, that PreInvocationInterceptor doesn't know what kind of invocation is going to happen, therefore it cannot be registered on top level, and the decision to register interceptor needs to be postponed until we know we're about to call async. We could use one more flag in request context to signal that, but still we'd still need to act in MethodModel. --- .../restclient/AsyncInterceptorSupport.java | 142 +++++++++++++++++ .../restclient/ExecutorServiceWrapper.java | 145 ------------------ .../microprofile/restclient/MethodModel.java | 24 +-- .../restclient/RestClientBuilderImpl.java | 9 +- 4 files changed, 153 insertions(+), 167 deletions(-) create mode 100644 ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/AsyncInterceptorSupport.java delete mode 100644 ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/ExecutorServiceWrapper.java diff --git a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/AsyncInterceptorSupport.java b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/AsyncInterceptorSupport.java new file mode 100644 index 0000000000..19697a1bd5 --- /dev/null +++ b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/AsyncInterceptorSupport.java @@ -0,0 +1,142 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright (c) [2019] Payara Foundation and/or its affiliates. All rights reserved. + * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common Development + * and Distribution License("CDDL") (collectively, the "License"). You + * may not use this file except in compliance with the License. You can + * obtain a copy of the License at + * https://github.com/payara/Payara/blob/master/LICENSE.txt + * See the License for the specific + * language governing permissions and limitations under the License. + * + * When distributing the software, include this License Header Notice in each + * file and include the License file at glassfish/legal/LICENSE.txt. + * + * GPL Classpath Exception: + * The Payara Foundation designates this particular file as subject to the "Classpath" + * exception as provided by the Payara Foundation in the GPL Version 2 section of the License + * file that accompanied this code. + * + * Modifications: + * If applicable, add the following below the License Header, with the fields + * enclosed by brackets [] replaced by your own identifying information: + * "Portions Copyright [year] [name of copyright owner]" + * + * Contributor(s): + * If you wish your version of this file to be governed by only the CDDL or + * only the GPL Version 2, indicate your decision by adding "[Contributor] + * elects to include this software in this distribution under the [CDDL or GPL + * Version 2] license." If you don't indicate a single choice of license, a + * recipient has the option to distribute your version of this file under + * either the CDDL, the GPL Version 2 or to extend the choice of license to + * its licensees as provided above. However, if you add GPL Version 2 code + * and therefore, elected the GPL Version 2 license, then the option applies + * only if the new code is made subject to such option by the copyright + * holder. + */ + +package org.glassfish.jersey.microprofile.restclient; + +import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptor; +import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptorFactory; +import org.glassfish.jersey.client.spi.PostInvocationInterceptor; +import org.glassfish.jersey.client.spi.PreInvocationInterceptor; + +import javax.ws.rs.client.ClientRequestContext; +import javax.ws.rs.client.ClientRequestFilter; +import javax.ws.rs.client.ClientResponseContext; +import javax.ws.rs.core.Configurable; +import java.util.List; + +class AsyncInterceptorSupport { + private final List factories; + private static final String KEY = AsyncInterceptorSupport.class.getName(); + + AsyncInterceptorSupport(List factories) { + this.factories = factories; + } + + private PreInvocationInterceptor preSubmit() { + return this::preSubmit; + } + + private ClientRequestFilter preInvoke() { + return this::preInvoke; + } + + private PostInvocationInterceptor postInvoke() { + return new PostInvoke(); + } + + private void preSubmit(ClientRequestContext requestContext) { + AsyncInvocationInterceptor[] interceptors = factories.stream().map(AsyncInvocationInterceptorFactory::newInterceptor) + .toArray(AsyncInvocationInterceptor[]::new); + requestContext.setProperty(KEY, interceptors); + //applyContext methods need to be called in reverse ordering of priority + for (AsyncInvocationInterceptor interceptor : interceptors) { + interceptor.prepareContext(); + } + } + + private void preInvoke(ClientRequestContext requestContext) { + AsyncInvocationInterceptor[] interceptors = readInterceptors(requestContext); + if (interceptors != null) { + for (int i = interceptors.length - 1; i >= 0; i--) { + AsyncInvocationInterceptor interceptor = interceptors[i]; + interceptor.applyContext(); + } + } + } + + private AsyncInvocationInterceptor[] readInterceptors(ClientRequestContext requestContext) { + Object interceptors = requestContext.getProperty(KEY); + if (interceptors instanceof AsyncInvocationInterceptor[]) { + return (AsyncInvocationInterceptor[]) interceptors; + } + return null; + } + + static void register(List interceptorFactories, + Configurable config) { + if (interceptorFactories != null && !interceptorFactories.isEmpty()) { + AsyncInterceptorSupport support = new AsyncInterceptorSupport(interceptorFactories); + config.register(support.preSubmit()).register(support.preInvoke()).register(support.postInvoke()); + } + } + + private class PostInvoke implements PostInvocationInterceptor { + + @Override + public void afterRequest(ClientRequestContext requestContext, ClientResponseContext responseContext) { + AsyncInvocationInterceptor[] interceptors = readInterceptors(requestContext); + if (interceptors == null) { + return; + } + for (AsyncInvocationInterceptor interceptor : interceptors) { + interceptor.removeContext(); + } + } + + @Override + public void onException(ClientRequestContext requestContext, ExceptionContext exceptionContext) { + AsyncInvocationInterceptor[] interceptors = readInterceptors(requestContext); + if (interceptors == null) { + return; + } + for (AsyncInvocationInterceptor interceptor : interceptors) { + try { + interceptor.removeContext(); + } catch (Throwable t) { + exceptionContext.getThrowables().add(t); + } + } + } + + + } + + +} diff --git a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/ExecutorServiceWrapper.java b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/ExecutorServiceWrapper.java deleted file mode 100644 index 479b1fb038..0000000000 --- a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/ExecutorServiceWrapper.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v. 2.0, which is available at - * http://www.eclipse.org/legal/epl-2.0. - * - * This Source Code may also be made available under the following Secondary - * Licenses when the conditions for such availability set forth in the - * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, - * version 2 with the GNU Classpath Exception, which is available at - * https://www.gnu.org/software/classpath/license.html. - * - * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 - */ - -package org.glassfish.jersey.microprofile.restclient; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; - -import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptor; - -/** - * Invokes all {@link AsyncInvocationInterceptor} for every new thread. - * - * @author David Kral - */ -class ExecutorServiceWrapper implements ExecutorService { - - static final ThreadLocal> asyncInterceptors = new ThreadLocal<>(); - - private final ExecutorService wrapped; - - ExecutorServiceWrapper(ExecutorService wrapped) { - this.wrapped = wrapped; - } - - @Override - public void shutdown() { - wrapped.shutdown(); - } - - @Override - public List shutdownNow() { - return wrapped.shutdownNow(); - } - - @Override - public boolean isShutdown() { - return wrapped.isShutdown(); - } - - @Override - public boolean isTerminated() { - return wrapped.isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return wrapped.awaitTermination(timeout, unit); - } - - @Override - public Future submit(Callable task) { - return wrapped.submit(wrap(task)); - } - - @Override - public Future submit(Runnable task, T result) { - return wrapped.submit(wrap(task), result); - } - - @Override - public Future submit(Runnable task) { - return wrapped.submit(wrap(task)); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - return wrapped.invokeAll(wrap(tasks)); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException { - return wrapped.invokeAll(wrap(tasks), timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return wrapped.invokeAny(wrap(tasks)); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return wrapped.invokeAny(wrap(tasks), timeout, unit); - } - - @Override - public void execute(Runnable command) { - wrapped.execute(wrap(command)); - } - - private static Callable wrap(Callable task) { - List asyncInvocationInterceptors = asyncInterceptors.get(); - asyncInterceptors.remove(); - return () -> { - applyContextOnInterceptors(asyncInvocationInterceptors); - return task.call(); - }; - } - - private static Runnable wrap(Runnable task) { - List asyncInvocationInterceptors = asyncInterceptors.get(); - asyncInterceptors.remove(); - return () -> { - applyContextOnInterceptors(asyncInvocationInterceptors); - task.run(); - }; - } - - private static void applyContextOnInterceptors(List asyncInvocationInterceptors) { - if (asyncInvocationInterceptors != null) { - //applyContext methods need to be called in reverse ordering of priority - for (int i = asyncInvocationInterceptors.size(); i-- > 0; ) { - asyncInvocationInterceptors.get(i).applyContext(); - } - } - } - - private static Collection> wrap(Collection> tasks) { - return tasks.stream() - .map(ExecutorServiceWrapper::wrap) - .collect(Collectors.toList()); - } -} diff --git a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/MethodModel.java b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/MethodModel.java index a3c52738cc..af82daabc2 100644 --- a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/MethodModel.java +++ b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/MethodModel.java @@ -189,6 +189,11 @@ Object invokeMethod(WebTarget classLevelTarget, Method method, Object[] args) { customHeaders.remove(HttpHeaders.CONTENT_TYPE); } + boolean isAsync = CompletionStage.class.isAssignableFrom(method.getReturnType()); + + if (isAsync) { + AsyncInterceptorSupport.register(interfaceModel.getAsyncInterceptorFactories(), webTarget); + } Invocation.Builder builder = webTarget .request(produces) .property(INVOKED_METHOD, method) @@ -197,7 +202,7 @@ Object invokeMethod(WebTarget classLevelTarget, Method method, Object[] args) { Object response; - if (CompletionStage.class.isAssignableFrom(method.getReturnType())) { + if (isAsync) { response = asynchronousCall(builder, entityToUse, method, customHeaders); } else { response = synchronousCall(builder, entityToUse, method, customHeaders); @@ -245,13 +250,6 @@ private CompletableFuture asynchronousCall(Invocation.Builder builder, Method method, MultivaluedMap customHeaders) { - //AsyncInterceptors initialization - List asyncInterceptors = interfaceModel.getAsyncInterceptorFactories().stream() - .map(AsyncInvocationInterceptorFactory::newInterceptor) - .collect(Collectors.toList()); - asyncInterceptors.forEach(AsyncInvocationInterceptor::prepareContext); - ExecutorServiceWrapper.asyncInterceptors.set(asyncInterceptors); - CompletableFuture result = new CompletableFuture<>(); Future theFuture; if (entity != null @@ -264,7 +262,6 @@ private CompletableFuture asynchronousCall(Invocation.Builder builder, CompletableFuture completableFuture = (CompletableFuture) theFuture; completableFuture.thenAccept(response -> { - asyncInterceptors.forEach(AsyncInvocationInterceptor::removeContext); try { evaluateResponse(response, method); if (returnType.getType().equals(Void.class)) { @@ -278,15 +275,6 @@ private CompletableFuture asynchronousCall(Invocation.Builder builder, result.completeExceptionally(e); } }).exceptionally(throwable -> { - // Since it could have been the removeContext method causing exception, we need to be more careful - // to assure, that the future completes - asyncInterceptors.forEach(interceptor -> { - try { - interceptor.removeContext(); - } catch (Throwable e) { - throwable.addSuppressed(e); - } - }); result.completeExceptionally(throwable); return null; }); diff --git a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/RestClientBuilderImpl.java b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/RestClientBuilderImpl.java index e963850edb..a9c2dc2c1e 100644 --- a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/RestClientBuilderImpl.java +++ b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/RestClientBuilderImpl.java @@ -86,7 +86,7 @@ class RestClientBuilderImpl implements RestClientBuilder { private final ConfigWrapper configWrapper; private URI uri; private ClientBuilder clientBuilder; - private Supplier executorService; + private ExecutorService executorService; private HostnameVerifier sslHostnameVerifier; private SSLContext sslContext; private KeyStore sslTrustStore; @@ -100,7 +100,6 @@ class RestClientBuilderImpl implements RestClientBuilder { asyncInterceptorFactories = new ArrayList<>(); config = ConfigProvider.getConfig(); configWrapper = new ConfigWrapper(clientBuilder.getConfiguration()); - executorService = Executors::newCachedThreadPool; } @Override @@ -130,7 +129,7 @@ public RestClientBuilder executorService(ExecutorService executor) { if (executor == null) { throw new IllegalArgumentException("ExecutorService cannot be null."); } - executorService = () -> executor; + executorService = executor; return this; } @@ -156,7 +155,9 @@ public T build(Class interfaceClass) throws IllegalStateException, RestCl //sort all AsyncInvocationInterceptorFactory by priority asyncInterceptorFactories.sort(Comparator.comparingInt(AsyncInvocationInterceptorFactoryPriorityWrapper::getPriority)); - clientBuilder.executorService(new ExecutorServiceWrapper(executorService.get())); + if (executorService != null) { + clientBuilder.executorService(executorService); + } if (null != sslContext) { clientBuilder.sslContext(sslContext);