diff --git a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/AsyncInterceptorSupport.java b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/AsyncInterceptorSupport.java new file mode 100644 index 0000000000..19697a1bd5 --- /dev/null +++ b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/AsyncInterceptorSupport.java @@ -0,0 +1,142 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright (c) [2019] Payara Foundation and/or its affiliates. All rights reserved. + * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common Development + * and Distribution License("CDDL") (collectively, the "License"). You + * may not use this file except in compliance with the License. You can + * obtain a copy of the License at + * https://github.com/payara/Payara/blob/master/LICENSE.txt + * See the License for the specific + * language governing permissions and limitations under the License. + * + * When distributing the software, include this License Header Notice in each + * file and include the License file at glassfish/legal/LICENSE.txt. + * + * GPL Classpath Exception: + * The Payara Foundation designates this particular file as subject to the "Classpath" + * exception as provided by the Payara Foundation in the GPL Version 2 section of the License + * file that accompanied this code. + * + * Modifications: + * If applicable, add the following below the License Header, with the fields + * enclosed by brackets [] replaced by your own identifying information: + * "Portions Copyright [year] [name of copyright owner]" + * + * Contributor(s): + * If you wish your version of this file to be governed by only the CDDL or + * only the GPL Version 2, indicate your decision by adding "[Contributor] + * elects to include this software in this distribution under the [CDDL or GPL + * Version 2] license." If you don't indicate a single choice of license, a + * recipient has the option to distribute your version of this file under + * either the CDDL, the GPL Version 2 or to extend the choice of license to + * its licensees as provided above. However, if you add GPL Version 2 code + * and therefore, elected the GPL Version 2 license, then the option applies + * only if the new code is made subject to such option by the copyright + * holder. + */ + +package org.glassfish.jersey.microprofile.restclient; + +import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptor; +import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptorFactory; +import org.glassfish.jersey.client.spi.PostInvocationInterceptor; +import org.glassfish.jersey.client.spi.PreInvocationInterceptor; + +import javax.ws.rs.client.ClientRequestContext; +import javax.ws.rs.client.ClientRequestFilter; +import javax.ws.rs.client.ClientResponseContext; +import javax.ws.rs.core.Configurable; +import java.util.List; + +class AsyncInterceptorSupport { + private final List factories; + private static final String KEY = AsyncInterceptorSupport.class.getName(); + + AsyncInterceptorSupport(List factories) { + this.factories = factories; + } + + private PreInvocationInterceptor preSubmit() { + return this::preSubmit; + } + + private ClientRequestFilter preInvoke() { + return this::preInvoke; + } + + private PostInvocationInterceptor postInvoke() { + return new PostInvoke(); + } + + private void preSubmit(ClientRequestContext requestContext) { + AsyncInvocationInterceptor[] interceptors = factories.stream().map(AsyncInvocationInterceptorFactory::newInterceptor) + .toArray(AsyncInvocationInterceptor[]::new); + requestContext.setProperty(KEY, interceptors); + //applyContext methods need to be called in reverse ordering of priority + for (AsyncInvocationInterceptor interceptor : interceptors) { + interceptor.prepareContext(); + } + } + + private void preInvoke(ClientRequestContext requestContext) { + AsyncInvocationInterceptor[] interceptors = readInterceptors(requestContext); + if (interceptors != null) { + for (int i = interceptors.length - 1; i >= 0; i--) { + AsyncInvocationInterceptor interceptor = interceptors[i]; + interceptor.applyContext(); + } + } + } + + private AsyncInvocationInterceptor[] readInterceptors(ClientRequestContext requestContext) { + Object interceptors = requestContext.getProperty(KEY); + if (interceptors instanceof AsyncInvocationInterceptor[]) { + return (AsyncInvocationInterceptor[]) interceptors; + } + return null; + } + + static void register(List interceptorFactories, + Configurable config) { + if (interceptorFactories != null && !interceptorFactories.isEmpty()) { + AsyncInterceptorSupport support = new AsyncInterceptorSupport(interceptorFactories); + config.register(support.preSubmit()).register(support.preInvoke()).register(support.postInvoke()); + } + } + + private class PostInvoke implements PostInvocationInterceptor { + + @Override + public void afterRequest(ClientRequestContext requestContext, ClientResponseContext responseContext) { + AsyncInvocationInterceptor[] interceptors = readInterceptors(requestContext); + if (interceptors == null) { + return; + } + for (AsyncInvocationInterceptor interceptor : interceptors) { + interceptor.removeContext(); + } + } + + @Override + public void onException(ClientRequestContext requestContext, ExceptionContext exceptionContext) { + AsyncInvocationInterceptor[] interceptors = readInterceptors(requestContext); + if (interceptors == null) { + return; + } + for (AsyncInvocationInterceptor interceptor : interceptors) { + try { + interceptor.removeContext(); + } catch (Throwable t) { + exceptionContext.getThrowables().add(t); + } + } + } + + + } + + +} diff --git a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/ExecutorServiceWrapper.java b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/ExecutorServiceWrapper.java deleted file mode 100644 index 479b1fb038..0000000000 --- a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/ExecutorServiceWrapper.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v. 2.0, which is available at - * http://www.eclipse.org/legal/epl-2.0. - * - * This Source Code may also be made available under the following Secondary - * Licenses when the conditions for such availability set forth in the - * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, - * version 2 with the GNU Classpath Exception, which is available at - * https://www.gnu.org/software/classpath/license.html. - * - * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 - */ - -package org.glassfish.jersey.microprofile.restclient; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; - -import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptor; - -/** - * Invokes all {@link AsyncInvocationInterceptor} for every new thread. - * - * @author David Kral - */ -class ExecutorServiceWrapper implements ExecutorService { - - static final ThreadLocal> asyncInterceptors = new ThreadLocal<>(); - - private final ExecutorService wrapped; - - ExecutorServiceWrapper(ExecutorService wrapped) { - this.wrapped = wrapped; - } - - @Override - public void shutdown() { - wrapped.shutdown(); - } - - @Override - public List shutdownNow() { - return wrapped.shutdownNow(); - } - - @Override - public boolean isShutdown() { - return wrapped.isShutdown(); - } - - @Override - public boolean isTerminated() { - return wrapped.isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return wrapped.awaitTermination(timeout, unit); - } - - @Override - public Future submit(Callable task) { - return wrapped.submit(wrap(task)); - } - - @Override - public Future submit(Runnable task, T result) { - return wrapped.submit(wrap(task), result); - } - - @Override - public Future submit(Runnable task) { - return wrapped.submit(wrap(task)); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - return wrapped.invokeAll(wrap(tasks)); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException { - return wrapped.invokeAll(wrap(tasks), timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return wrapped.invokeAny(wrap(tasks)); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return wrapped.invokeAny(wrap(tasks), timeout, unit); - } - - @Override - public void execute(Runnable command) { - wrapped.execute(wrap(command)); - } - - private static Callable wrap(Callable task) { - List asyncInvocationInterceptors = asyncInterceptors.get(); - asyncInterceptors.remove(); - return () -> { - applyContextOnInterceptors(asyncInvocationInterceptors); - return task.call(); - }; - } - - private static Runnable wrap(Runnable task) { - List asyncInvocationInterceptors = asyncInterceptors.get(); - asyncInterceptors.remove(); - return () -> { - applyContextOnInterceptors(asyncInvocationInterceptors); - task.run(); - }; - } - - private static void applyContextOnInterceptors(List asyncInvocationInterceptors) { - if (asyncInvocationInterceptors != null) { - //applyContext methods need to be called in reverse ordering of priority - for (int i = asyncInvocationInterceptors.size(); i-- > 0; ) { - asyncInvocationInterceptors.get(i).applyContext(); - } - } - } - - private static Collection> wrap(Collection> tasks) { - return tasks.stream() - .map(ExecutorServiceWrapper::wrap) - .collect(Collectors.toList()); - } -} diff --git a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/MethodModel.java b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/MethodModel.java index a3c52738cc..af82daabc2 100644 --- a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/MethodModel.java +++ b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/MethodModel.java @@ -189,6 +189,11 @@ Object invokeMethod(WebTarget classLevelTarget, Method method, Object[] args) { customHeaders.remove(HttpHeaders.CONTENT_TYPE); } + boolean isAsync = CompletionStage.class.isAssignableFrom(method.getReturnType()); + + if (isAsync) { + AsyncInterceptorSupport.register(interfaceModel.getAsyncInterceptorFactories(), webTarget); + } Invocation.Builder builder = webTarget .request(produces) .property(INVOKED_METHOD, method) @@ -197,7 +202,7 @@ Object invokeMethod(WebTarget classLevelTarget, Method method, Object[] args) { Object response; - if (CompletionStage.class.isAssignableFrom(method.getReturnType())) { + if (isAsync) { response = asynchronousCall(builder, entityToUse, method, customHeaders); } else { response = synchronousCall(builder, entityToUse, method, customHeaders); @@ -245,13 +250,6 @@ private CompletableFuture asynchronousCall(Invocation.Builder builder, Method method, MultivaluedMap customHeaders) { - //AsyncInterceptors initialization - List asyncInterceptors = interfaceModel.getAsyncInterceptorFactories().stream() - .map(AsyncInvocationInterceptorFactory::newInterceptor) - .collect(Collectors.toList()); - asyncInterceptors.forEach(AsyncInvocationInterceptor::prepareContext); - ExecutorServiceWrapper.asyncInterceptors.set(asyncInterceptors); - CompletableFuture result = new CompletableFuture<>(); Future theFuture; if (entity != null @@ -264,7 +262,6 @@ private CompletableFuture asynchronousCall(Invocation.Builder builder, CompletableFuture completableFuture = (CompletableFuture) theFuture; completableFuture.thenAccept(response -> { - asyncInterceptors.forEach(AsyncInvocationInterceptor::removeContext); try { evaluateResponse(response, method); if (returnType.getType().equals(Void.class)) { @@ -278,15 +275,6 @@ private CompletableFuture asynchronousCall(Invocation.Builder builder, result.completeExceptionally(e); } }).exceptionally(throwable -> { - // Since it could have been the removeContext method causing exception, we need to be more careful - // to assure, that the future completes - asyncInterceptors.forEach(interceptor -> { - try { - interceptor.removeContext(); - } catch (Throwable e) { - throwable.addSuppressed(e); - } - }); result.completeExceptionally(throwable); return null; }); diff --git a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/RestClientBuilderImpl.java b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/RestClientBuilderImpl.java index e963850edb..a9c2dc2c1e 100644 --- a/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/RestClientBuilderImpl.java +++ b/ext/microprofile/mp-rest-client/src/main/java/org/glassfish/jersey/microprofile/restclient/RestClientBuilderImpl.java @@ -86,7 +86,7 @@ class RestClientBuilderImpl implements RestClientBuilder { private final ConfigWrapper configWrapper; private URI uri; private ClientBuilder clientBuilder; - private Supplier executorService; + private ExecutorService executorService; private HostnameVerifier sslHostnameVerifier; private SSLContext sslContext; private KeyStore sslTrustStore; @@ -100,7 +100,6 @@ class RestClientBuilderImpl implements RestClientBuilder { asyncInterceptorFactories = new ArrayList<>(); config = ConfigProvider.getConfig(); configWrapper = new ConfigWrapper(clientBuilder.getConfiguration()); - executorService = Executors::newCachedThreadPool; } @Override @@ -130,7 +129,7 @@ public RestClientBuilder executorService(ExecutorService executor) { if (executor == null) { throw new IllegalArgumentException("ExecutorService cannot be null."); } - executorService = () -> executor; + executorService = executor; return this; } @@ -156,7 +155,9 @@ public T build(Class interfaceClass) throws IllegalStateException, RestCl //sort all AsyncInvocationInterceptorFactory by priority asyncInterceptorFactories.sort(Comparator.comparingInt(AsyncInvocationInterceptorFactoryPriorityWrapper::getPriority)); - clientBuilder.executorService(new ExecutorServiceWrapper(executorService.get())); + if (executorService != null) { + clientBuilder.executorService(executorService); + } if (null != sslContext) { clientBuilder.sslContext(sslContext);