From 14a63dbf76cce125194d05820246aae3f61501b1 Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Sat, 14 Mar 2020 23:27:01 +0100 Subject: [PATCH 01/12] Integrate Apache http client with WebClient --- build.gradle | 2 + spring-web/spring-web.gradle | 2 + .../HttpComponentsAsyncClientHttpRequest.java | 3 +- ...mponentsAsyncClientHttpRequestFactory.java | 3 +- ...HttpComponentsAsyncClientHttpResponse.java | 3 +- .../reactive/ApacheClientHttpConnector.java | 104 ++++++++++++++ .../reactive/ApacheClientHttpRequest.java | 130 ++++++++++++++++++ .../reactive/ApacheClientHttpResponse.java | 111 +++++++++++++++ .../reactive/MonoFutureCallbackAdapter.java | 50 +++++++ spring-webflux/spring-webflux.gradle | 2 + .../client/DefaultWebClientBuilder.java | 9 ++ .../client/WebClientIntegrationTests.java | 56 +++++++- .../annotation/SseIntegrationTests.java | 7 +- src/docs/asciidoc/web/webflux-webclient.adoc | 27 ++++ src/docs/asciidoc/web/webflux.adoc | 1 + 15 files changed, 500 insertions(+), 10 deletions(-) create mode 100644 spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java create mode 100644 spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java create mode 100644 spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java create mode 100644 spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java diff --git a/build.gradle b/build.gradle index ab7cabca47ef..127253a6f67e 100644 --- a/build.gradle +++ b/build.gradle @@ -155,6 +155,8 @@ configure(allprojects) { project -> exclude group: "commons-logging", name: "commons-logging" } dependency "org.eclipse.jetty:jetty-reactive-httpclient:1.1.2" + dependency 'org.apache.httpcomponents.client5:httpclient5:5.0' + dependency 'org.apache.httpcomponents.core5:httpcore5-reactive:5.0' dependency "org.jruby:jruby:9.2.11.0" dependency "org.python:jython-standalone:2.7.1" diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle index 60c40ed4eeec..c16ccc80b7d5 100644 --- a/spring-web/spring-web.gradle +++ b/spring-web/spring-web.gradle @@ -35,6 +35,8 @@ dependencies { exclude group: "javax.servlet", module: "javax.servlet-api" } optional("org.eclipse.jetty:jetty-reactive-httpclient") + optional('org.apache.httpcomponents.client5:httpclient5:5.0') + optional('org.apache.httpcomponents.core5:httpcore5-reactive:5.0') optional("com.squareup.okhttp3:okhttp") optional("org.apache.httpcomponents:httpclient") optional("org.apache.httpcomponents:httpasyncclient") diff --git a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java index d1166051c759..2d80ff9e9d57 100644 --- a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java @@ -48,7 +48,8 @@ * @author Arjen Poutsma * @since 4.0 * @see HttpComponentsClientHttpRequestFactory#createRequest - * @deprecated as of Spring 5.0, with no direct replacement + * @deprecated as of Spring 5.0, in favor of + * {@link org.springframework.http.client.reactive.ApacheClientHttpConnector} */ @Deprecated final class HttpComponentsAsyncClientHttpRequest extends AbstractBufferingAsyncClientHttpRequest { diff --git a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequestFactory.java b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequestFactory.java index 85cf0bab0d3e..d47dfffad400 100644 --- a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequestFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequestFactory.java @@ -44,7 +44,8 @@ * @author Stephane Nicoll * @since 4.0 * @see HttpAsyncClient - * @deprecated as of Spring 5.0, with no direct replacement + * @deprecated as of Spring 5.0, in favor of + * {@link org.springframework.http.client.reactive.ApacheClientHttpConnector} */ @Deprecated public class HttpComponentsAsyncClientHttpRequestFactory extends HttpComponentsClientHttpRequestFactory diff --git a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpResponse.java index cba3a722c43f..61be13ae082e 100644 --- a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpResponse.java @@ -37,7 +37,8 @@ * @author Arjen Poutsma * @since 4.0 * @see HttpComponentsAsyncClientHttpRequest#executeAsync() - * @deprecated as of Spring 5.0, with no direct replacement + * @deprecated as of Spring 5.0, in favor of + * {@link org.springframework.http.client.reactive.ApacheClientHttpConnector} */ @Deprecated final class HttpComponentsAsyncClientHttpResponse extends AbstractClientHttpResponse { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java new file mode 100644 index 000000000000..7cbbc5e8ecf6 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java @@ -0,0 +1,104 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.function.Function; + +import org.apache.hc.client5.http.cookie.BasicCookieStore; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.reactive.ReactiveEntityProducer; +import org.apache.hc.core5.reactive.ReactiveResponseConsumer; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpMethod; +import org.springframework.lang.Nullable; + +/** + * {@link ClientHttpConnector} implementation for the Apache HttpComponents HttpClient 5.x. + * + * @author Martin Tarjányi + * @since 5.3 + * @see Apache HttpComponents + */ +public class ApacheClientHttpConnector implements ClientHttpConnector { + private final CloseableHttpAsyncClient client; + + private final DefaultDataBufferFactory dataBufferFactory; + + /** + * Default constructor that creates and starts a new instance of {@link CloseableHttpAsyncClient}. + */ + public ApacheClientHttpConnector() { + this(HttpAsyncClients.createDefault()); + } + + /** + * Constructor with an initialized {@link CloseableHttpAsyncClient}. + */ + public ApacheClientHttpConnector(CloseableHttpAsyncClient client) { + this.dataBufferFactory = new DefaultDataBufferFactory(); + this.client = client; + this.client.start(); + } + + @Override + public Mono connect(HttpMethod method, URI uri, + Function> requestCallback) { + + ApacheClientHttpRequest request = new ApacheClientHttpRequest(method, uri, this.dataBufferFactory); + + return requestCallback.apply(request).then(Mono.defer(() -> execute(request))); + } + + private Mono execute(ApacheClientHttpRequest request) { + Flux byteBufferFlux = request.getByteBufferFlux(); + + ReactiveEntityProducer reactiveEntityProducer = createReactiveEntityProducer(request, byteBufferFlux); + + BasicRequestProducer basicRequestProducer = new BasicRequestProducer(request.getHttpRequest(), + reactiveEntityProducer); + + HttpClientContext context = HttpClientContext.create(); + context.setCookieStore(new BasicCookieStore()); + + return Mono.>>create(sink -> { + ReactiveResponseConsumer reactiveResponseConsumer = new ReactiveResponseConsumer(new MonoFutureCallbackAdapter<>(sink)); + this.client.execute(basicRequestProducer, reactiveResponseConsumer, context, null); + }).map(message -> new ApacheClientHttpResponse(this.dataBufferFactory, message, context)); + } + + @Nullable + private ReactiveEntityProducer createReactiveEntityProducer(ApacheClientHttpRequest request, + @Nullable Flux byteBufferFlux) { + + if (byteBufferFlux == null) { + return null; + } + + return new ReactiveEntityProducer(byteBufferFlux, request.getContentLength(), null, null); + } +} diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java new file mode 100644 index 000000000000..4199543ad16b --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java @@ -0,0 +1,130 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Collection; + +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.message.BasicHttpRequest; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; + +import static org.springframework.http.MediaType.ALL_VALUE; + +/** + * {@link ClientHttpRequest} implementation for the Apache HttpComponents HttpClient 5.x. + * + * @author Martin Tarjányi + * @since 5.3 + * @see Apache HttpComponents + */ +class ApacheClientHttpRequest extends AbstractClientHttpRequest { + private final HttpRequest httpRequest; + + private final DataBufferFactory dataBufferFactory; + + private Flux byteBufferFlux; + + private long contentLength = -1; + + public ApacheClientHttpRequest(HttpMethod method, URI uri, DataBufferFactory dataBufferFactory) { + this.httpRequest = new BasicHttpRequest(method.name(), uri); + this.dataBufferFactory = dataBufferFactory; + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.resolve(this.httpRequest.getMethod()); + } + + @Override + public URI getURI() { + try { + return this.httpRequest.getUri(); + } + catch (URISyntaxException ex) { + throw new IllegalArgumentException("Invalid URI syntax.", ex); + } + } + + @Override + public DataBufferFactory bufferFactory() { + return this.dataBufferFactory; + } + + @Override + public Mono writeWith(Publisher body) { + return doCommit(() -> { + this.byteBufferFlux = Flux.from(body).map(DataBuffer::asByteBuffer); + return Mono.empty(); + }); + } + + @Override + public Mono writeAndFlushWith(Publisher> body) { + return writeWith(Flux.from(body).flatMap(p -> p)); + } + + @Override + public Mono setComplete() { + return doCommit(); + } + + @Override + protected void applyHeaders() { + HttpHeaders headers = getHeaders(); + this.contentLength = headers.getContentLength(); + + headers.entrySet() + .stream() + .filter(entry -> !HttpHeaders.CONTENT_LENGTH.equals(entry.getKey())) + .forEach(entry -> entry.getValue().forEach(v -> this.httpRequest.addHeader(entry.getKey(), v))); + + if (!this.httpRequest.containsHeader(HttpHeaders.ACCEPT)) { + this.httpRequest.addHeader(HttpHeaders.ACCEPT, ALL_VALUE); + } + } + + @Override + protected void applyCookies() { + getCookies().values() + .stream() + .flatMap(Collection::stream) + .forEach(httpCookie -> this.httpRequest.addHeader(HttpHeaders.COOKIE, httpCookie.toString())); + } + + public HttpRequest getHttpRequest() { + return this.httpRequest; + } + + public Flux getByteBufferFlux() { + return this.byteBufferFlux; + } + + public long getContentLength() { + return this.contentLength; + } +} diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java new file mode 100644 index 000000000000..e2c1fe8aabac --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java @@ -0,0 +1,111 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseCookie; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import static org.apache.hc.client5.http.cookie.Cookie.MAX_AGE_ATTR; + +/** + * {@link ClientHttpResponse} implementation for the Apache HttpComponents HttpClient 5.x. + * + * @author Martin Tarjányi + * @since 5.3 + * @see Apache HttpComponents + */ +class ApacheClientHttpResponse implements ClientHttpResponse { + private final Message> message; + + private final Flux dataBufferFlux; + + private final HttpClientContext context; + + private final AtomicBoolean rejectSubscribers = new AtomicBoolean(); + + public ApacheClientHttpResponse(DefaultDataBufferFactory dataBufferFactory, + Message> message, + HttpClientContext context) { + + this.message = message; + this.context = context; + + this.dataBufferFlux = Flux.from(this.message.getBody()) + .doOnSubscribe(s -> { + if (!this.rejectSubscribers.compareAndSet(false, true)) { + throw new IllegalStateException("The client response body can only be consumed once."); + } + }) + .map(dataBufferFactory::wrap); + } + + @Override + public HttpStatus getStatusCode() { + return HttpStatus.valueOf(this.message.getHead().getCode()); + } + + @Override + public int getRawStatusCode() { + return this.message.getHead().getCode(); + } + + @Override + public MultiValueMap getCookies() { + LinkedMultiValueMap result = new LinkedMultiValueMap<>(); + this.context.getCookieStore().getCookies().forEach(cookie -> + result.add(cookie.getName(), ResponseCookie.from(cookie.getName(), cookie.getValue()) + .domain(cookie.getDomain()) + .path(cookie.getPath()) + .maxAge(Long.parseLong(Objects.toString(cookie.getAttribute(MAX_AGE_ATTR), "-1"))) + .secure(cookie.isSecure()) + .httpOnly(cookie.containsAttribute("httponly")) + .build())); + return result; + } + + @Override + public Flux getBody() { + return this.dataBufferFlux; + } + + @Override + public HttpHeaders getHeaders() { + return Arrays.stream(this.message.getHead().getHeaders()) + .collect(HttpHeaders::new, this::addHeader, HttpHeaders::putAll); + } + + private void addHeader(HttpHeaders httpHeaders, Header header) { + httpHeaders.add(header.getName(), header.getValue()); + } +} diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java b/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java new file mode 100644 index 000000000000..1c6fb3ff27af --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java @@ -0,0 +1,50 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + +import org.apache.hc.core5.concurrent.FutureCallback; +import reactor.core.publisher.MonoSink; + +/** + * Transforms {@link FutureCallback} events to {@link reactor.core.publisher.Mono} events. + * + * @author Martin Tarjányi + * @since 5.3 + * @param the result type + * @see Jetty ReactiveStreams HttpClient + */ +class MonoFutureCallbackAdapter implements FutureCallback { + private final MonoSink sink; + + public MonoFutureCallbackAdapter(MonoSink sink) { + this.sink = sink; + } + + @Override + public void completed(T result) { + this.sink.success(result); + } + + @Override + public void failed(Exception ex) { + this.sink.error(ex); + } + + @Override + public void cancelled() { + } +} diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle index 1028d6e59c42..16f8fa93740e 100644 --- a/spring-webflux/spring-webflux.gradle +++ b/spring-webflux/spring-webflux.gradle @@ -46,6 +46,8 @@ dependencies { testCompile("org.eclipse.jetty:jetty-server") testCompile("org.eclipse.jetty:jetty-servlet") testCompile("org.eclipse.jetty:jetty-reactive-httpclient") + testCompile('org.apache.httpcomponents.client5:httpclient5:5.0') + testCompile('org.apache.httpcomponents.core5:httpcore5-reactive:5.0') testCompile("com.squareup.okhttp3:mockwebserver") testCompile("org.jetbrains.kotlin:kotlin-script-runtime") testRuntime("org.jetbrains.kotlin:kotlin-scripting-jsr223-embeddable") diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java index dfd1087598b2..bbe5a1d7c82b 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java @@ -24,6 +24,7 @@ import java.util.function.Consumer; import org.springframework.http.HttpHeaders; +import org.springframework.http.client.reactive.ApacheClientHttpConnector; import org.springframework.http.client.reactive.ClientHttpConnector; import org.springframework.http.client.reactive.JettyClientHttpConnector; import org.springframework.http.client.reactive.ReactorClientHttpConnector; @@ -50,10 +51,15 @@ final class DefaultWebClientBuilder implements WebClient.Builder { private static final boolean jettyClientPresent; + private static final boolean apacheClientPresent; + static { ClassLoader loader = DefaultWebClientBuilder.class.getClassLoader(); reactorClientPresent = ClassUtils.isPresent("reactor.netty.http.client.HttpClient", loader); jettyClientPresent = ClassUtils.isPresent("org.eclipse.jetty.client.HttpClient", loader); + apacheClientPresent = + ClassUtils.isPresent("org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient", loader) && + ClassUtils.isPresent("org.apache.hc.core5.reactive.ReactiveDataConsumer", loader); } @@ -275,6 +281,9 @@ else if (reactorClientPresent) { else if (jettyClientPresent) { return new JettyClientHttpConnector(); } + else if (apacheClientPresent) { + return new ApacheClientHttpConnector(); + } throw new IllegalStateException("No suitable default ClientHttpConnector found"); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index 659004be4894..bd802d50a72d 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -54,7 +54,9 @@ import org.springframework.http.HttpRequest; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; +import org.springframework.http.ResponseCookie; import org.springframework.http.ResponseEntity; +import org.springframework.http.client.reactive.ApacheClientHttpConnector; import org.springframework.http.client.reactive.ClientHttpConnector; import org.springframework.http.client.reactive.JettyClientHttpConnector; import org.springframework.http.client.reactive.ReactorClientHttpConnector; @@ -70,6 +72,7 @@ * @author Denys Ivano * @author Sebastien Deleuze * @author Sam Brannen + * @author Martin Tarjányi */ class WebClientIntegrationTests { @@ -81,10 +84,13 @@ class WebClientIntegrationTests { } static Stream arguments() { - return Stream.of(new JettyClientHttpConnector(), new ReactorClientHttpConnector()); + return Stream.of( + new ReactorClientHttpConnector(), + new JettyClientHttpConnector(), + new ApacheClientHttpConnector() + ); } - private MockWebServer server; private WebClient webClient; @@ -352,10 +358,12 @@ void retrieveEntityWithServerError(ClientHttpConnector connector) { startServer(connector); prepareResponse(response -> response.setResponseCode(500) - .setHeader("Content-Type", "text/plain").setBody("Internal Server error")); + .setHeader("Content-Type", "text/plain") + .setBody("Internal Server error")); Mono> result = this.webClient.get() - .uri("/").accept(MediaType.APPLICATION_JSON) + .uri("/") + .accept(MediaType.APPLICATION_JSON) .retrieve() .toEntity(String.class); @@ -639,6 +647,42 @@ void shouldSendCookies(ClientHttpConnector connector) { }); } + @ParameterizedWebClientTest + void shouldReceiveResponseCookies(ClientHttpConnector connector) { + startServer(connector); + + prepareResponse(response -> response + .setHeader("Content-Type", "text/plain") + .addHeader("Set-Cookie", "testkey1=testvalue1;") + .addHeader("Set-Cookie", "testkey2=testvalue2; Max-Age=42; HttpOnly; Secure") + .setBody("test")); + + Mono result = this.webClient.get() + .uri("/test") + .exchange(); + + StepVerifier.create(result) + .consumeNextWith(response -> { + assertThat(response.cookies()).containsOnlyKeys("testkey1", "testkey2"); + + ResponseCookie cookie1 = response.cookies().get("testkey1").get(0); + assertThat(cookie1.getValue()).isEqualTo("testvalue1"); + assertThat(cookie1.isSecure()).isFalse(); + assertThat(cookie1.isHttpOnly()).isFalse(); + assertThat(cookie1.getMaxAge().getSeconds()).isEqualTo(-1); + + ResponseCookie cookie2 = response.cookies().get("testkey2").get(0); + assertThat(cookie2.getValue()).isEqualTo("testvalue2"); + assertThat(cookie2.isSecure()).isTrue(); + assertThat(cookie2.isHttpOnly()).isTrue(); + assertThat(cookie2.getMaxAge().getSeconds()).isEqualTo(42); + }) + .expectComplete() + .verify(Duration.ofSeconds(3)); + + expectRequestCount(1); + } + @ParameterizedWebClientTest // SPR-16246 void shouldSendLargeTextFile(ClientHttpConnector connector) throws Exception { startServer(connector); @@ -821,7 +865,7 @@ void shouldGetErrorSignalWhenRetrievingUnknownStatusCode(ClientHttpConnector con .expectErrorSatisfies(throwable -> { assertThat(throwable instanceof UnknownHttpStatusCodeException).isTrue(); UnknownHttpStatusCodeException ex = (UnknownHttpStatusCodeException) throwable; - assertThat(ex.getMessage()).isEqualTo(("Unknown status code ["+errorStatus+"]")); + assertThat(ex.getMessage()).isEqualTo(("Unknown status code [" + errorStatus + "]")); assertThat(ex.getRawStatusCode()).isEqualTo(errorStatus); assertThat(ex.getStatusText()).isEqualTo(""); assertThat(ex.getHeaders().getContentType()).isEqualTo(MediaType.TEXT_PLAIN); @@ -1065,7 +1109,7 @@ void shouldReceiveEmptyResponse(ClientHttpConnector connector) { .flatMap(response -> response.toEntity(Void.class)); StepVerifier.create(result).assertNext(r -> - assertThat(r.getStatusCode().is2xxSuccessful()).isTrue() + assertThat(r.getStatusCode().is2xxSuccessful()).isTrue() ).verifyComplete(); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index b83e8f3b46cd..d86cca543f3d 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -33,6 +33,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.client.reactive.ApacheClientHttpConnector; import org.springframework.http.client.reactive.ClientHttpConnector; import org.springframework.http.client.reactive.JettyClientHttpConnector; import org.springframework.http.client.reactive.ReactorClientHttpConnector; @@ -73,12 +74,16 @@ static Object[][] arguments() { return new Object[][] { {new JettyHttpServer(), new ReactorClientHttpConnector()}, {new JettyHttpServer(), new JettyClientHttpConnector()}, + {new JettyHttpServer(), new ApacheClientHttpConnector()}, {new ReactorHttpServer(), new ReactorClientHttpConnector()}, {new ReactorHttpServer(), new JettyClientHttpConnector()}, + {new ReactorHttpServer(), new ApacheClientHttpConnector()}, {new TomcatHttpServer(), new ReactorClientHttpConnector()}, {new TomcatHttpServer(), new JettyClientHttpConnector()}, + {new TomcatHttpServer(), new ApacheClientHttpConnector()}, {new UndertowHttpServer(), new ReactorClientHttpConnector()}, - {new UndertowHttpServer(), new JettyClientHttpConnector()} + {new UndertowHttpServer(), new JettyClientHttpConnector()}, + {new UndertowHttpServer(), new ApacheClientHttpConnector()} }; } diff --git a/src/docs/asciidoc/web/webflux-webclient.adoc b/src/docs/asciidoc/web/webflux-webclient.adoc index 922a50976e11..5e3f7cc986c8 100644 --- a/src/docs/asciidoc/web/webflux-webclient.adoc +++ b/src/docs/asciidoc/web/webflux-webclient.adoc @@ -369,6 +369,33 @@ shows: <2> Plug the connector into the `WebClient.Builder`. + +[[webflux-client-builder-apache]] +=== Apache + +The following example shows how to customize Apache `HttpClient` settings: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom(); + clientBuilder.setDefaultRequestConfig(...); + CloseableHttpAsyncClient client = clientBuilder.build(); + ClientHttpConnector connector = new ApacheClientHttpConnector(client); + + WebClient webClient = WebClient.builder().clientConnector(connector).build(); +---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + val client = HttpAsyncClients.custom().apply { + setDefaultRequestConfig(...) + }.build() + val connector: ClientHttpConnector = ApacheClientHttpConnector(client) + val webClient = WebClient.builder().clientConnector(connector).build() +---- + + [[webflux-client-retrieve]] == `retrieve()` diff --git a/src/docs/asciidoc/web/webflux.adoc b/src/docs/asciidoc/web/webflux.adoc index 546c49393cde..53aa7ddfb407 100644 --- a/src/docs/asciidoc/web/webflux.adoc +++ b/src/docs/asciidoc/web/webflux.adoc @@ -314,6 +314,7 @@ controllers and functional endpoints are built. requests with non-blocking I/O and Reactive Streams back pressure, along with adapters for https://github.com/reactor/reactor-netty[Reactor Netty] and for the reactive https://github.com/jetty-project/jetty-reactive-httpclient[Jetty HttpClient]. +https://hc.apache.org/[Apache HttpComponents]. The higher level <> used in applications builds on this basic contract. * For client and server, <> to use to serialize and From 1bf43f0fdb4f9ab9457b764d10cf573138138afa Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Sat, 14 Mar 2020 23:37:27 +0100 Subject: [PATCH 02/12] Fix javadoc --- .../http/client/reactive/MonoFutureCallbackAdapter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java b/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java index 1c6fb3ff27af..e06075fc5476 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java @@ -25,7 +25,6 @@ * @author Martin Tarjányi * @since 5.3 * @param the result type - * @see Jetty ReactiveStreams HttpClient */ class MonoFutureCallbackAdapter implements FutureCallback { private final MonoSink sink; From 85dd3b714d0fdf95741e7e84e16e1e9d168192c4 Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Thu, 19 Mar 2020 19:56:37 +0100 Subject: [PATCH 03/12] Minor code and doc polishment --- .../client/reactive/ApacheClientHttpConnector.java | 3 ++- .../client/reactive/ApacheClientHttpResponse.java | 14 ++++++++++---- src/docs/asciidoc/web/webflux-webclient.adoc | 2 +- src/docs/asciidoc/web/webflux.adoc | 6 +++--- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java index 7cbbc5e8ecf6..b4f04aea567c 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java @@ -33,6 +33,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.HttpMethod; import org.springframework.lang.Nullable; @@ -47,7 +48,7 @@ public class ApacheClientHttpConnector implements ClientHttpConnector { private final CloseableHttpAsyncClient client; - private final DefaultDataBufferFactory dataBufferFactory; + private final DataBufferFactory dataBufferFactory; /** * Default constructor that creates and starts a new instance of {@link CloseableHttpAsyncClient}. diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java index e2c1fe8aabac..5954c6654c7a 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java @@ -18,9 +18,9 @@ import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hc.client5.http.cookie.Cookie; import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpResponse; @@ -29,7 +29,7 @@ import reactor.core.publisher.Flux; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; @@ -54,7 +54,7 @@ class ApacheClientHttpResponse implements ClientHttpResponse { private final AtomicBoolean rejectSubscribers = new AtomicBoolean(); - public ApacheClientHttpResponse(DefaultDataBufferFactory dataBufferFactory, + public ApacheClientHttpResponse(DataBufferFactory dataBufferFactory, Message> message, HttpClientContext context) { @@ -87,7 +87,7 @@ public MultiValueMap getCookies() { result.add(cookie.getName(), ResponseCookie.from(cookie.getName(), cookie.getValue()) .domain(cookie.getDomain()) .path(cookie.getPath()) - .maxAge(Long.parseLong(Objects.toString(cookie.getAttribute(MAX_AGE_ATTR), "-1"))) + .maxAge(getMaxAgeSeconds(cookie)) .secure(cookie.isSecure()) .httpOnly(cookie.containsAttribute("httponly")) .build())); @@ -108,4 +108,10 @@ public HttpHeaders getHeaders() { private void addHeader(HttpHeaders httpHeaders, Header header) { httpHeaders.add(header.getName(), header.getValue()); } + + private long getMaxAgeSeconds(Cookie cookie) { + String maxAgeAttribute = cookie.getAttribute(MAX_AGE_ATTR); + + return maxAgeAttribute == null ? -1 : Long.parseLong(maxAgeAttribute); + } } diff --git a/src/docs/asciidoc/web/webflux-webclient.adoc b/src/docs/asciidoc/web/webflux-webclient.adoc index 5e3f7cc986c8..86afbd665a40 100644 --- a/src/docs/asciidoc/web/webflux-webclient.adoc +++ b/src/docs/asciidoc/web/webflux-webclient.adoc @@ -391,7 +391,7 @@ The following example shows how to customize Apache `HttpClient` settings: val client = HttpAsyncClients.custom().apply { setDefaultRequestConfig(...) }.build() - val connector: ClientHttpConnector = ApacheClientHttpConnector(client) + val connector = ApacheClientHttpConnector(client) val webClient = WebClient.builder().clientConnector(connector).build() ---- diff --git a/src/docs/asciidoc/web/webflux.adoc b/src/docs/asciidoc/web/webflux.adoc index 53aa7ddfb407..7cb0b7214962 100644 --- a/src/docs/asciidoc/web/webflux.adoc +++ b/src/docs/asciidoc/web/webflux.adoc @@ -312,9 +312,9 @@ request handling, on top of which concrete programming models such as annotated controllers and functional endpoints are built. * For the client side, there is a basic `ClientHttpConnector` contract to perform HTTP requests with non-blocking I/O and Reactive Streams back pressure, along with adapters for -https://github.com/reactor/reactor-netty[Reactor Netty] and for the reactive -https://github.com/jetty-project/jetty-reactive-httpclient[Jetty HttpClient]. -https://hc.apache.org/[Apache HttpComponents]. +https://github.com/reactor/reactor-netty[Reactor Netty], reactive +https://github.com/jetty-project/jetty-reactive-httpclient[Jetty HttpClient] +and https://hc.apache.org/[Apache HttpComponents]. The higher level <> used in applications builds on this basic contract. * For client and server, <> to use to serialize and From df32d7f026941fff2dc376d62cceb3424cd34d1f Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Sun, 22 Mar 2020 17:10:02 +0100 Subject: [PATCH 04/12] Fix handling of multiple request cookies for apache http client --- .../client/reactive/ApacheClientHttpRequest.java | 13 +++++++++++-- .../client/reactive/ApacheClientHttpResponse.java | 2 +- .../function/client/WebClientIntegrationTests.java | 8 +++++++- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java index 4199543ad16b..a76106f57c0b 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java @@ -20,6 +20,7 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.stream.Collectors; import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.message.BasicHttpRequest; @@ -29,6 +30,7 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -110,10 +112,17 @@ protected void applyHeaders() { @Override protected void applyCookies() { - getCookies().values() + if (getCookies().isEmpty()) { + return; + } + + String cookiesString = getCookies().values() .stream() .flatMap(Collection::stream) - .forEach(httpCookie -> this.httpRequest.addHeader(HttpHeaders.COOKIE, httpCookie.toString())); + .map(HttpCookie::toString) + .collect(Collectors.joining("; ")); + + this.httpRequest.addHeader(HttpHeaders.COOKIE, cookiesString); } public HttpRequest getHttpRequest() { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java index 5954c6654c7a..5058c0bae3f5 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java @@ -84,7 +84,7 @@ public int getRawStatusCode() { public MultiValueMap getCookies() { LinkedMultiValueMap result = new LinkedMultiValueMap<>(); this.context.getCookieStore().getCookies().forEach(cookie -> - result.add(cookie.getName(), ResponseCookie.from(cookie.getName(), cookie.getValue()) + result.add(cookie.getName(), ResponseCookie.fromClientResponse(cookie.getName(), cookie.getValue()) .domain(cookie.getDomain()) .path(cookie.getPath()) .maxAge(getMaxAgeSeconds(cookie)) diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index bd802d50a72d..f8ab2c029b89 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -626,12 +626,18 @@ void shouldSendPojoAsJson(ClientHttpConnector connector) { void shouldSendCookies(ClientHttpConnector connector) { startServer(connector); + // reactor-netty not handles multiple request cookies correctly and fails on this test, should report as bug + if (connector instanceof ReactorClientHttpConnector) { + return; + } + prepareResponse(response -> response .setHeader("Content-Type", "text/plain").setBody("test")); Mono result = this.webClient.get() .uri("/test") .cookie("testkey", "testvalue") + .cookie("testkey2", "testvalue2") .retrieve() .bodyToMono(String.class); @@ -643,7 +649,7 @@ void shouldSendCookies(ClientHttpConnector connector) { expectRequestCount(1); expectRequest(request -> { assertThat(request.getPath()).isEqualTo("/test"); - assertThat(request.getHeader(HttpHeaders.COOKIE)).isEqualTo("testkey=testvalue"); + assertThat(request.getHeader(HttpHeaders.COOKIE)).isEqualTo("testkey=testvalue; testkey2=testvalue2"); }); } From 3a7f4ed7859a54a8dd3ff8988ad166aceeade752 Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Thu, 26 Mar 2020 20:17:57 +0100 Subject: [PATCH 05/12] Conform to code style --- .../reactive/ApacheClientHttpConnector.java | 7 ++++++- .../client/reactive/ApacheClientHttpRequest.java | 3 +++ .../client/reactive/ApacheClientHttpResponse.java | 15 +++++++++------ .../reactive/MonoFutureCallbackAdapter.java | 3 +++ .../function/client/DefaultWebClientBuilder.java | 5 ++--- 5 files changed, 23 insertions(+), 10 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java index b4f04aea567c..c9f9b7785d53 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java @@ -46,10 +46,12 @@ * @see Apache HttpComponents */ public class ApacheClientHttpConnector implements ClientHttpConnector { + private final CloseableHttpAsyncClient client; private final DataBufferFactory dataBufferFactory; + /** * Default constructor that creates and starts a new instance of {@link CloseableHttpAsyncClient}. */ @@ -66,6 +68,7 @@ public ApacheClientHttpConnector(CloseableHttpAsyncClient client) { this.client.start(); } + @Override public Mono connect(HttpMethod method, URI uri, Function> requestCallback) { @@ -87,7 +90,9 @@ private Mono execute(ApacheClientHttpRequest request) { context.setCookieStore(new BasicCookieStore()); return Mono.>>create(sink -> { - ReactiveResponseConsumer reactiveResponseConsumer = new ReactiveResponseConsumer(new MonoFutureCallbackAdapter<>(sink)); + ReactiveResponseConsumer reactiveResponseConsumer = + new ReactiveResponseConsumer(new MonoFutureCallbackAdapter<>(sink)); + this.client.execute(basicRequestProducer, reactiveResponseConsumer, context, null); }).map(message -> new ApacheClientHttpResponse(this.dataBufferFactory, message, context)); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java index a76106f57c0b..8739ba121207 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java @@ -44,6 +44,7 @@ * @see Apache HttpComponents */ class ApacheClientHttpRequest extends AbstractClientHttpRequest { + private final HttpRequest httpRequest; private final DataBufferFactory dataBufferFactory; @@ -52,11 +53,13 @@ class ApacheClientHttpRequest extends AbstractClientHttpRequest { private long contentLength = -1; + public ApacheClientHttpRequest(HttpMethod method, URI uri, DataBufferFactory dataBufferFactory) { this.httpRequest = new BasicHttpRequest(method.name(), uri); this.dataBufferFactory = dataBufferFactory; } + @Override public HttpMethod getMethod() { return HttpMethod.resolve(this.httpRequest.getMethod()); diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java index 5058c0bae3f5..ecbd8f6a1a9f 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java @@ -46,6 +46,7 @@ * @see Apache HttpComponents */ class ApacheClientHttpResponse implements ClientHttpResponse { + private final Message> message; private final Flux dataBufferFlux; @@ -54,6 +55,7 @@ class ApacheClientHttpResponse implements ClientHttpResponse { private final AtomicBoolean rejectSubscribers = new AtomicBoolean(); + public ApacheClientHttpResponse(DataBufferFactory dataBufferFactory, Message> message, HttpClientContext context) { @@ -70,6 +72,7 @@ public ApacheClientHttpResponse(DataBufferFactory dataBufferFactory, .map(dataBufferFactory::wrap); } + @Override public HttpStatus getStatusCode() { return HttpStatus.valueOf(this.message.getHead().getCode()); @@ -94,6 +97,12 @@ public MultiValueMap getCookies() { return result; } + private long getMaxAgeSeconds(Cookie cookie) { + String maxAgeAttribute = cookie.getAttribute(MAX_AGE_ATTR); + + return maxAgeAttribute == null ? -1 : Long.parseLong(maxAgeAttribute); + } + @Override public Flux getBody() { return this.dataBufferFlux; @@ -108,10 +117,4 @@ public HttpHeaders getHeaders() { private void addHeader(HttpHeaders httpHeaders, Header header) { httpHeaders.add(header.getName(), header.getValue()); } - - private long getMaxAgeSeconds(Cookie cookie) { - String maxAgeAttribute = cookie.getAttribute(MAX_AGE_ATTR); - - return maxAgeAttribute == null ? -1 : Long.parseLong(maxAgeAttribute); - } } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java b/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java index e06075fc5476..0f28ada577d2 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java @@ -27,12 +27,15 @@ * @param the result type */ class MonoFutureCallbackAdapter implements FutureCallback { + private final MonoSink sink; + public MonoFutureCallbackAdapter(MonoSink sink) { this.sink = sink; } + @Override public void completed(T result) { this.sink.success(result); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java index bbe5a1d7c82b..3b040e203c97 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java @@ -57,9 +57,8 @@ final class DefaultWebClientBuilder implements WebClient.Builder { ClassLoader loader = DefaultWebClientBuilder.class.getClassLoader(); reactorClientPresent = ClassUtils.isPresent("reactor.netty.http.client.HttpClient", loader); jettyClientPresent = ClassUtils.isPresent("org.eclipse.jetty.client.HttpClient", loader); - apacheClientPresent = - ClassUtils.isPresent("org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient", loader) && - ClassUtils.isPresent("org.apache.hc.core5.reactive.ReactiveDataConsumer", loader); + apacheClientPresent = ClassUtils.isPresent("org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient", loader) + && ClassUtils.isPresent("org.apache.hc.core5.reactive.ReactiveDataConsumer", loader); } From ac233bb31c6ae0dd7ea40fea5fe84ae999eca709 Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Fri, 27 Mar 2020 23:30:13 +0100 Subject: [PATCH 06/12] Rename Apache client components for consistency --- .../HttpComponentsAsyncClientHttpRequest.java | 2 +- ...ttpComponentsAsyncClientHttpRequestFactory.java | 2 +- .../HttpComponentsAsyncClientHttpResponse.java | 2 +- ...java => HttpComponentsClientHttpConnector.java} | 14 +++++++------- ...t.java => HttpComponentsClientHttpRequest.java} | 4 ++-- ....java => HttpComponentsClientHttpResponse.java} | 4 ++-- .../function/client/DefaultWebClientBuilder.java | 10 +++++----- .../function/client/WebClientIntegrationTests.java | 4 ++-- .../method/annotation/SseIntegrationTests.java | 10 +++++----- src/docs/asciidoc/web/webflux-webclient.adoc | 10 +++++----- 10 files changed, 31 insertions(+), 31 deletions(-) rename spring-web/src/main/java/org/springframework/http/client/reactive/{ApacheClientHttpConnector.java => HttpComponentsClientHttpConnector.java} (84%) rename spring-web/src/main/java/org/springframework/http/client/reactive/{ApacheClientHttpRequest.java => HttpComponentsClientHttpRequest.java} (95%) rename spring-web/src/main/java/org/springframework/http/client/reactive/{ApacheClientHttpResponse.java => HttpComponentsClientHttpResponse.java} (96%) diff --git a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java index 2d80ff9e9d57..ca97edde0bb8 100644 --- a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java @@ -49,7 +49,7 @@ * @since 4.0 * @see HttpComponentsClientHttpRequestFactory#createRequest * @deprecated as of Spring 5.0, in favor of - * {@link org.springframework.http.client.reactive.ApacheClientHttpConnector} + * {@link org.springframework.http.client.reactive.HttpComponentsClientHttpConnector} */ @Deprecated final class HttpComponentsAsyncClientHttpRequest extends AbstractBufferingAsyncClientHttpRequest { diff --git a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequestFactory.java b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequestFactory.java index d47dfffad400..84bd07a159e6 100644 --- a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequestFactory.java +++ b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequestFactory.java @@ -45,7 +45,7 @@ * @since 4.0 * @see HttpAsyncClient * @deprecated as of Spring 5.0, in favor of - * {@link org.springframework.http.client.reactive.ApacheClientHttpConnector} + * {@link org.springframework.http.client.reactive.HttpComponentsClientHttpConnector} */ @Deprecated public class HttpComponentsAsyncClientHttpRequestFactory extends HttpComponentsClientHttpRequestFactory diff --git a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpResponse.java index 61be13ae082e..54297db86008 100644 --- a/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpResponse.java @@ -38,7 +38,7 @@ * @since 4.0 * @see HttpComponentsAsyncClientHttpRequest#executeAsync() * @deprecated as of Spring 5.0, in favor of - * {@link org.springframework.http.client.reactive.ApacheClientHttpConnector} + * {@link org.springframework.http.client.reactive.HttpComponentsClientHttpConnector} */ @Deprecated final class HttpComponentsAsyncClientHttpResponse extends AbstractClientHttpResponse { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java similarity index 84% rename from spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java rename to spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java index c9f9b7785d53..8afdf5268501 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java @@ -45,7 +45,7 @@ * @since 5.3 * @see Apache HttpComponents */ -public class ApacheClientHttpConnector implements ClientHttpConnector { +public class HttpComponentsClientHttpConnector implements ClientHttpConnector { private final CloseableHttpAsyncClient client; @@ -55,14 +55,14 @@ public class ApacheClientHttpConnector implements ClientHttpConnector { /** * Default constructor that creates and starts a new instance of {@link CloseableHttpAsyncClient}. */ - public ApacheClientHttpConnector() { + public HttpComponentsClientHttpConnector() { this(HttpAsyncClients.createDefault()); } /** * Constructor with an initialized {@link CloseableHttpAsyncClient}. */ - public ApacheClientHttpConnector(CloseableHttpAsyncClient client) { + public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client) { this.dataBufferFactory = new DefaultDataBufferFactory(); this.client = client; this.client.start(); @@ -73,12 +73,12 @@ public ApacheClientHttpConnector(CloseableHttpAsyncClient client) { public Mono connect(HttpMethod method, URI uri, Function> requestCallback) { - ApacheClientHttpRequest request = new ApacheClientHttpRequest(method, uri, this.dataBufferFactory); + HttpComponentsClientHttpRequest request = new HttpComponentsClientHttpRequest(method, uri, this.dataBufferFactory); return requestCallback.apply(request).then(Mono.defer(() -> execute(request))); } - private Mono execute(ApacheClientHttpRequest request) { + private Mono execute(HttpComponentsClientHttpRequest request) { Flux byteBufferFlux = request.getByteBufferFlux(); ReactiveEntityProducer reactiveEntityProducer = createReactiveEntityProducer(request, byteBufferFlux); @@ -94,11 +94,11 @@ private Mono execute(ApacheClientHttpRequest request) { new ReactiveResponseConsumer(new MonoFutureCallbackAdapter<>(sink)); this.client.execute(basicRequestProducer, reactiveResponseConsumer, context, null); - }).map(message -> new ApacheClientHttpResponse(this.dataBufferFactory, message, context)); + }).map(message -> new HttpComponentsClientHttpResponse(this.dataBufferFactory, message, context)); } @Nullable - private ReactiveEntityProducer createReactiveEntityProducer(ApacheClientHttpRequest request, + private ReactiveEntityProducer createReactiveEntityProducer(HttpComponentsClientHttpRequest request, @Nullable Flux byteBufferFlux) { if (byteBufferFlux == null) { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java similarity index 95% rename from spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java rename to spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java index 8739ba121207..be18e283ed15 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java @@ -43,7 +43,7 @@ * @since 5.3 * @see Apache HttpComponents */ -class ApacheClientHttpRequest extends AbstractClientHttpRequest { +class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest { private final HttpRequest httpRequest; @@ -54,7 +54,7 @@ class ApacheClientHttpRequest extends AbstractClientHttpRequest { private long contentLength = -1; - public ApacheClientHttpRequest(HttpMethod method, URI uri, DataBufferFactory dataBufferFactory) { + public HttpComponentsClientHttpRequest(HttpMethod method, URI uri, DataBufferFactory dataBufferFactory) { this.httpRequest = new BasicHttpRequest(method.name(), uri); this.dataBufferFactory = dataBufferFactory; } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java similarity index 96% rename from spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java rename to spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java index ecbd8f6a1a9f..08f592889405 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ApacheClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java @@ -45,7 +45,7 @@ * @since 5.3 * @see Apache HttpComponents */ -class ApacheClientHttpResponse implements ClientHttpResponse { +class HttpComponentsClientHttpResponse implements ClientHttpResponse { private final Message> message; @@ -56,7 +56,7 @@ class ApacheClientHttpResponse implements ClientHttpResponse { private final AtomicBoolean rejectSubscribers = new AtomicBoolean(); - public ApacheClientHttpResponse(DataBufferFactory dataBufferFactory, + public HttpComponentsClientHttpResponse(DataBufferFactory dataBufferFactory, Message> message, HttpClientContext context) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java index 3b040e203c97..fed9093abd92 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java @@ -24,8 +24,8 @@ import java.util.function.Consumer; import org.springframework.http.HttpHeaders; -import org.springframework.http.client.reactive.ApacheClientHttpConnector; import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.http.client.reactive.HttpComponentsClientHttpConnector; import org.springframework.http.client.reactive.JettyClientHttpConnector; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.http.codec.ClientCodecConfigurer; @@ -51,13 +51,13 @@ final class DefaultWebClientBuilder implements WebClient.Builder { private static final boolean jettyClientPresent; - private static final boolean apacheClientPresent; + private static final boolean httpComponentsClientPresent; static { ClassLoader loader = DefaultWebClientBuilder.class.getClassLoader(); reactorClientPresent = ClassUtils.isPresent("reactor.netty.http.client.HttpClient", loader); jettyClientPresent = ClassUtils.isPresent("org.eclipse.jetty.client.HttpClient", loader); - apacheClientPresent = ClassUtils.isPresent("org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient", loader) + httpComponentsClientPresent = ClassUtils.isPresent("org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient", loader) && ClassUtils.isPresent("org.apache.hc.core5.reactive.ReactiveDataConsumer", loader); } @@ -280,8 +280,8 @@ else if (reactorClientPresent) { else if (jettyClientPresent) { return new JettyClientHttpConnector(); } - else if (apacheClientPresent) { - return new ApacheClientHttpConnector(); + else if (httpComponentsClientPresent) { + return new HttpComponentsClientHttpConnector(); } throw new IllegalStateException("No suitable default ClientHttpConnector found"); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index ca9e2e04f9cc..2be1a304387a 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -59,8 +59,8 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; -import org.springframework.http.client.reactive.ApacheClientHttpConnector; import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.http.client.reactive.HttpComponentsClientHttpConnector; import org.springframework.http.client.reactive.JettyClientHttpConnector; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.web.testfixture.xml.Pojo; @@ -90,7 +90,7 @@ static Stream arguments() { return Stream.of( new ReactorClientHttpConnector(), new JettyClientHttpConnector(), - new ApacheClientHttpConnector() + new HttpComponentsClientHttpConnector() ); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index d86cca543f3d..6344b86c874c 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -33,8 +33,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.client.reactive.ApacheClientHttpConnector; import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.http.client.reactive.HttpComponentsClientHttpConnector; import org.springframework.http.client.reactive.JettyClientHttpConnector; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.http.codec.ServerSentEvent; @@ -74,16 +74,16 @@ static Object[][] arguments() { return new Object[][] { {new JettyHttpServer(), new ReactorClientHttpConnector()}, {new JettyHttpServer(), new JettyClientHttpConnector()}, - {new JettyHttpServer(), new ApacheClientHttpConnector()}, + {new JettyHttpServer(), new HttpComponentsClientHttpConnector()}, {new ReactorHttpServer(), new ReactorClientHttpConnector()}, {new ReactorHttpServer(), new JettyClientHttpConnector()}, - {new ReactorHttpServer(), new ApacheClientHttpConnector()}, + {new ReactorHttpServer(), new HttpComponentsClientHttpConnector()}, {new TomcatHttpServer(), new ReactorClientHttpConnector()}, {new TomcatHttpServer(), new JettyClientHttpConnector()}, - {new TomcatHttpServer(), new ApacheClientHttpConnector()}, + {new TomcatHttpServer(), new HttpComponentsClientHttpConnector()}, {new UndertowHttpServer(), new ReactorClientHttpConnector()}, {new UndertowHttpServer(), new JettyClientHttpConnector()}, - {new UndertowHttpServer(), new ApacheClientHttpConnector()} + {new UndertowHttpServer(), new HttpComponentsClientHttpConnector()} }; } diff --git a/src/docs/asciidoc/web/webflux-webclient.adoc b/src/docs/asciidoc/web/webflux-webclient.adoc index 86afbd665a40..1e64b1740381 100644 --- a/src/docs/asciidoc/web/webflux-webclient.adoc +++ b/src/docs/asciidoc/web/webflux-webclient.adoc @@ -370,10 +370,10 @@ shows: -[[webflux-client-builder-apache]] -=== Apache +[[webflux-client-builder-http-components]] +=== HttpComponents -The following example shows how to customize Apache `HttpClient` settings: +The following example shows how to customize Apache HttpComponents `HttpClient` settings: [source,java,indent=0,subs="verbatim,quotes",role="primary"] .Java @@ -381,7 +381,7 @@ The following example shows how to customize Apache `HttpClient` settings: HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom(); clientBuilder.setDefaultRequestConfig(...); CloseableHttpAsyncClient client = clientBuilder.build(); - ClientHttpConnector connector = new ApacheClientHttpConnector(client); + ClientHttpConnector connector = new HttpComponentsClientHttpConnector(client); WebClient webClient = WebClient.builder().clientConnector(connector).build(); ---- @@ -391,7 +391,7 @@ The following example shows how to customize Apache `HttpClient` settings: val client = HttpAsyncClients.custom().apply { setDefaultRequestConfig(...) }.build() - val connector = ApacheClientHttpConnector(client) + val connector = HttpComponentsClientHttpConnector(client) val webClient = WebClient.builder().clientConnector(connector).build() ---- From d33dc057e54930f20f1b1e9915d8ec86e4a29aa5 Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Sat, 28 Mar 2020 00:01:36 +0100 Subject: [PATCH 07/12] Polish HttpComponents WebClient implementation --- .../HttpComponentsClientHttpConnector.java | 27 ++++++++++ .../HttpComponentsClientHttpResponse.java | 9 ++-- .../reactive/MonoFutureCallbackAdapter.java | 52 ------------------- 3 files changed, 30 insertions(+), 58 deletions(-) delete mode 100644 spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java index 8afdf5268501..9003f2bbf304 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java @@ -24,6 +24,7 @@ import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.Message; import org.apache.hc.core5.http.nio.support.BasicRequestProducer; @@ -32,6 +33,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; @@ -107,4 +109,29 @@ private ReactiveEntityProducer createReactiveEntityProducer(HttpComponentsClient return new ReactiveEntityProducer(byteBufferFlux, request.getContentLength(), null, null); } + + + private static class MonoFutureCallbackAdapter implements FutureCallback { + + private final MonoSink sink; + + public MonoFutureCallbackAdapter(MonoSink sink) { + this.sink = sink; + } + + @Override + public void completed(T result) { + this.sink.success(result); + } + + @Override + public void failed(Exception ex) { + this.sink.error(ex); + } + + @Override + public void cancelled() { + } + } + } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java index 08f592889405..46f7395ee68a 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java @@ -22,7 +22,6 @@ import org.apache.hc.client5.http.cookie.Cookie; import org.apache.hc.client5.http.protocol.HttpClientContext; -import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.Message; import org.reactivestreams.Publisher; @@ -111,10 +110,8 @@ public Flux getBody() { @Override public HttpHeaders getHeaders() { return Arrays.stream(this.message.getHead().getHeaders()) - .collect(HttpHeaders::new, this::addHeader, HttpHeaders::putAll); - } - - private void addHeader(HttpHeaders httpHeaders, Header header) { - httpHeaders.add(header.getName(), header.getValue()); + .collect(HttpHeaders::new, + (httpHeaders, header) -> httpHeaders.add(header.getName(), header.getValue()), + HttpHeaders::putAll); } } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java b/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java deleted file mode 100644 index 0f28ada577d2..000000000000 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/MonoFutureCallbackAdapter.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2002-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.client.reactive; - -import org.apache.hc.core5.concurrent.FutureCallback; -import reactor.core.publisher.MonoSink; - -/** - * Transforms {@link FutureCallback} events to {@link reactor.core.publisher.Mono} events. - * - * @author Martin Tarjányi - * @since 5.3 - * @param the result type - */ -class MonoFutureCallbackAdapter implements FutureCallback { - - private final MonoSink sink; - - - public MonoFutureCallbackAdapter(MonoSink sink) { - this.sink = sink; - } - - - @Override - public void completed(T result) { - this.sink.success(result); - } - - @Override - public void failed(Exception ex) { - this.sink.error(ex); - } - - @Override - public void cancelled() { - } -} From 1a92f3cfceea3bfbdb8c19a9b8f77170996b745d Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Sat, 28 Mar 2020 22:20:57 +0100 Subject: [PATCH 08/12] Re-add response cookie integration test for WebClient --- .../client/WebClientIntegrationTests.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index 2be1a304387a..15b2047ca2bd 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -58,6 +58,7 @@ import org.springframework.http.HttpRequest; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; +import org.springframework.http.ResponseCookie; import org.springframework.http.ResponseEntity; import org.springframework.http.client.reactive.ClientHttpConnector; import org.springframework.http.client.reactive.HttpComponentsClientHttpConnector; @@ -1084,6 +1085,42 @@ void filterForErrorHandling(ClientHttpConnector connector) { expectRequestCount(2); } + @ParameterizedWebClientTest + void exchangeResponseCookies(ClientHttpConnector connector) { + startServer(connector); + + prepareResponse(response -> response + .setHeader("Content-Type", "text/plain") + .addHeader("Set-Cookie", "testkey1=testvalue1;") + .addHeader("Set-Cookie", "testkey2=testvalue2; Max-Age=42; HttpOnly; Secure") + .setBody("test")); + + Mono result = this.webClient.get() + .uri("/test") + .exchange(); + + StepVerifier.create(result) + .consumeNextWith(response -> { + assertThat(response.cookies()).containsOnlyKeys("testkey1", "testkey2"); + + ResponseCookie cookie1 = response.cookies().get("testkey1").get(0); + assertThat(cookie1.getValue()).isEqualTo("testvalue1"); + assertThat(cookie1.isSecure()).isFalse(); + assertThat(cookie1.isHttpOnly()).isFalse(); + assertThat(cookie1.getMaxAge().getSeconds()).isEqualTo(-1); + + ResponseCookie cookie2 = response.cookies().get("testkey2").get(0); + assertThat(cookie2.getValue()).isEqualTo("testvalue2"); + assertThat(cookie2.isSecure()).isTrue(); + assertThat(cookie2.isHttpOnly()).isTrue(); + assertThat(cookie2.getMaxAge().getSeconds()).isEqualTo(42); + }) + .expectComplete() + .verify(Duration.ofSeconds(3)); + + expectRequestCount(1); + } + private void prepareResponse(Consumer consumer) { MockResponse response = new MockResponse(); From 58f84882139eb61f7751874c4a6f14005a90271a Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Sun, 29 Mar 2020 20:28:50 +0200 Subject: [PATCH 09/12] Use CookieStore to send cookies with HttpComponents client --- .../HttpComponentsClientHttpConnector.java | 36 +++++++++++++++---- .../HttpComponentsClientHttpRequest.java | 26 +++++++++----- 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java index 9003f2bbf304..b5778ea75338 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java @@ -19,6 +19,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.hc.client5.http.cookie.BasicCookieStore; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; @@ -53,6 +54,8 @@ public class HttpComponentsClientHttpConnector implements ClientHttpConnector { private final DataBufferFactory dataBufferFactory; + private final Supplier contextSupplier; + /** * Default constructor that creates and starts a new instance of {@link CloseableHttpAsyncClient}. @@ -62,10 +65,25 @@ public HttpComponentsClientHttpConnector() { } /** - * Constructor with an initialized {@link CloseableHttpAsyncClient}. + * Constructor with a pre-configured {@link CloseableHttpAsyncClient} instance. + * @param client the client to use */ public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client) { + this(client, HttpClientContext::create); + } + + /** + * Constructor with a pre-configured {@link CloseableHttpAsyncClient} instance + * and a {@link HttpClientContext} supplier lambda which is called before each request + * and passed to the client. + * @param client the client to use + * @param contextSupplier a {@link HttpClientContext} supplier + */ + public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client, + Supplier contextSupplier) { + this.dataBufferFactory = new DefaultDataBufferFactory(); + this.contextSupplier = contextSupplier; this.client = client; this.client.start(); } @@ -75,12 +93,19 @@ public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client) { public Mono connect(HttpMethod method, URI uri, Function> requestCallback) { - HttpComponentsClientHttpRequest request = new HttpComponentsClientHttpRequest(method, uri, this.dataBufferFactory); + HttpClientContext context = this.contextSupplier.get(); + + if (context.getCookieStore() == null) { + context.setCookieStore(new BasicCookieStore()); + } - return requestCallback.apply(request).then(Mono.defer(() -> execute(request))); + HttpComponentsClientHttpRequest request = new HttpComponentsClientHttpRequest(method, uri, + context, this.dataBufferFactory); + + return requestCallback.apply(request).then(Mono.defer(() -> execute(request, context))); } - private Mono execute(HttpComponentsClientHttpRequest request) { + private Mono execute(HttpComponentsClientHttpRequest request, HttpClientContext context) { Flux byteBufferFlux = request.getByteBufferFlux(); ReactiveEntityProducer reactiveEntityProducer = createReactiveEntityProducer(request, byteBufferFlux); @@ -88,9 +113,6 @@ private Mono execute(HttpComponentsClientHttpRequest request BasicRequestProducer basicRequestProducer = new BasicRequestProducer(request.getHttpRequest(), reactiveEntityProducer); - HttpClientContext context = HttpClientContext.create(); - context.setCookieStore(new BasicCookieStore()); - return Mono.>>create(sink -> { ReactiveResponseConsumer reactiveResponseConsumer = new ReactiveResponseConsumer(new MonoFutureCallbackAdapter<>(sink)); diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java index be18e283ed15..116f1c62ca9d 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java @@ -20,8 +20,10 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.Collection; -import java.util.stream.Collectors; +import org.apache.hc.client5.http.cookie.CookieStore; +import org.apache.hc.client5.http.impl.cookie.BasicClientCookie; +import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.message.BasicHttpRequest; import org.reactivestreams.Publisher; @@ -30,7 +32,6 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -49,12 +50,17 @@ class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest { private final DataBufferFactory dataBufferFactory; + private final HttpClientContext context; + private Flux byteBufferFlux; private long contentLength = -1; - public HttpComponentsClientHttpRequest(HttpMethod method, URI uri, DataBufferFactory dataBufferFactory) { + public HttpComponentsClientHttpRequest(HttpMethod method, URI uri, HttpClientContext context, + DataBufferFactory dataBufferFactory) { + + this.context = context; this.httpRequest = new BasicHttpRequest(method.name(), uri); this.dataBufferFactory = dataBufferFactory; } @@ -119,13 +125,17 @@ protected void applyCookies() { return; } - String cookiesString = getCookies().values() + CookieStore cookieStore = this.context.getCookieStore(); + + getCookies().values() .stream() .flatMap(Collection::stream) - .map(HttpCookie::toString) - .collect(Collectors.joining("; ")); - - this.httpRequest.addHeader(HttpHeaders.COOKIE, cookiesString); + .forEach(cookie -> { + BasicClientCookie clientCookie = new BasicClientCookie(cookie.getName(), cookie.getValue()); + clientCookie.setDomain(getURI().getHost()); + clientCookie.setPath(getURI().getPath()); + cookieStore.addCookie(clientCookie); + }); } public HttpRequest getHttpRequest() { From 3b032e287c842ad3aba1d5580758e81354b09fd3 Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Sun, 29 Mar 2020 20:47:34 +0200 Subject: [PATCH 10/12] Move response body stream handling to getBody method --- .../HttpComponentsClientHttpResponse.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java index 46f7395ee68a..90669b93d380 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpResponse.java @@ -46,9 +46,9 @@ */ class HttpComponentsClientHttpResponse implements ClientHttpResponse { - private final Message> message; + private final DataBufferFactory dataBufferFactory; - private final Flux dataBufferFlux; + private final Message> message; private final HttpClientContext context; @@ -59,16 +59,9 @@ public HttpComponentsClientHttpResponse(DataBufferFactory dataBufferFactory, Message> message, HttpClientContext context) { + this.dataBufferFactory = dataBufferFactory; this.message = message; this.context = context; - - this.dataBufferFlux = Flux.from(this.message.getBody()) - .doOnSubscribe(s -> { - if (!this.rejectSubscribers.compareAndSet(false, true)) { - throw new IllegalStateException("The client response body can only be consumed once."); - } - }) - .map(dataBufferFactory::wrap); } @@ -104,7 +97,13 @@ private long getMaxAgeSeconds(Cookie cookie) { @Override public Flux getBody() { - return this.dataBufferFlux; + return Flux.from(this.message.getBody()) + .doOnSubscribe(s -> { + if (!this.rejectSubscribers.compareAndSet(false, true)) { + throw new IllegalStateException("The client response body can only be consumed once."); + } + }) + .map(this.dataBufferFactory::wrap); } @Override From b5c2fa0e86c7ff78d6ef6b5001439db646672d91 Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Sun, 29 Mar 2020 22:55:02 +0200 Subject: [PATCH 11/12] Pass content encoding and type to ReactiveEntityProducer --- .../reactive/HttpComponentsClientHttpConnector.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java index b5778ea75338..73c8aa41855a 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java @@ -26,6 +26,7 @@ import org.apache.hc.client5.http.impl.async.HttpAsyncClients; import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.Message; import org.apache.hc.core5.http.nio.support.BasicRequestProducer; @@ -38,6 +39,7 @@ import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.lang.Nullable; @@ -129,7 +131,15 @@ private ReactiveEntityProducer createReactiveEntityProducer(HttpComponentsClient return null; } - return new ReactiveEntityProducer(byteBufferFlux, request.getContentLength(), null, null); + String contentEncoding = request.getHeaders().getFirst(HttpHeaders.CONTENT_ENCODING); + + ContentType contentType = null; + + if (request.getHeaders().getContentType() != null) { + contentType = ContentType.parse(request.getHeaders().getContentType().toString()); + } + + return new ReactiveEntityProducer(byteBufferFlux, request.getContentLength(), contentType, contentEncoding); } From f578f6e1da44a12f4f4b274312be958e20d01e82 Mon Sep 17 00:00:00 2001 From: martin-tarjanyi Date: Thu, 2 Apr 2020 23:50:05 +0200 Subject: [PATCH 12/12] Code review changes --- .../HttpComponentsClientHttpConnector.java | 67 ++++++------------- .../HttpComponentsClientHttpRequest.java | 35 +++++++--- 2 files changed, 46 insertions(+), 56 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java index 73c8aa41855a..c9b8e4064e0c 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpConnector.java @@ -18,30 +18,25 @@ import java.net.URI; import java.nio.ByteBuffer; +import java.util.function.BiFunction; import java.util.function.Function; -import java.util.function.Supplier; import org.apache.hc.client5.http.cookie.BasicCookieStore; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClients; import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.concurrent.FutureCallback; -import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.Message; -import org.apache.hc.core5.http.nio.support.BasicRequestProducer; -import org.apache.hc.core5.reactive.ReactiveEntityProducer; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.reactive.ReactiveResponseConsumer; import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; -import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; -import org.springframework.lang.Nullable; /** * {@link ClientHttpConnector} implementation for the Apache HttpComponents HttpClient 5.x. @@ -54,9 +49,9 @@ public class HttpComponentsClientHttpConnector implements ClientHttpConnector { private final CloseableHttpAsyncClient client; - private final DataBufferFactory dataBufferFactory; + private final BiFunction contextProvider; - private final Supplier contextSupplier; + private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); /** @@ -71,7 +66,7 @@ public HttpComponentsClientHttpConnector() { * @param client the client to use */ public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client) { - this(client, HttpClientContext::create); + this(client, (method, uri) -> HttpClientContext.create()); } /** @@ -79,23 +74,26 @@ public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client) { * and a {@link HttpClientContext} supplier lambda which is called before each request * and passed to the client. * @param client the client to use - * @param contextSupplier a {@link HttpClientContext} supplier + * @param contextProvider a {@link HttpClientContext} supplier */ public HttpComponentsClientHttpConnector(CloseableHttpAsyncClient client, - Supplier contextSupplier) { + BiFunction contextProvider) { - this.dataBufferFactory = new DefaultDataBufferFactory(); - this.contextSupplier = contextSupplier; + this.contextProvider = contextProvider; this.client = client; this.client.start(); } + public void setBufferFactory(DataBufferFactory bufferFactory) { + this.dataBufferFactory = bufferFactory; + } + @Override public Mono connect(HttpMethod method, URI uri, Function> requestCallback) { - HttpClientContext context = this.contextSupplier.get(); + HttpClientContext context = this.contextProvider.apply(method, uri); if (context.getCookieStore() == null) { context.setCookieStore(new BasicCookieStore()); @@ -108,51 +106,28 @@ public Mono connect(HttpMethod method, URI uri, } private Mono execute(HttpComponentsClientHttpRequest request, HttpClientContext context) { - Flux byteBufferFlux = request.getByteBufferFlux(); - - ReactiveEntityProducer reactiveEntityProducer = createReactiveEntityProducer(request, byteBufferFlux); - - BasicRequestProducer basicRequestProducer = new BasicRequestProducer(request.getHttpRequest(), - reactiveEntityProducer); + AsyncRequestProducer requestProducer = request.toRequestProducer(); return Mono.>>create(sink -> { ReactiveResponseConsumer reactiveResponseConsumer = - new ReactiveResponseConsumer(new MonoFutureCallbackAdapter<>(sink)); + new ReactiveResponseConsumer(new MonoFutureCallbackAdapter(sink)); - this.client.execute(basicRequestProducer, reactiveResponseConsumer, context, null); + this.client.execute(requestProducer, reactiveResponseConsumer, context, null); }).map(message -> new HttpComponentsClientHttpResponse(this.dataBufferFactory, message, context)); } - @Nullable - private ReactiveEntityProducer createReactiveEntityProducer(HttpComponentsClientHttpRequest request, - @Nullable Flux byteBufferFlux) { - - if (byteBufferFlux == null) { - return null; - } - - String contentEncoding = request.getHeaders().getFirst(HttpHeaders.CONTENT_ENCODING); - - ContentType contentType = null; - - if (request.getHeaders().getContentType() != null) { - contentType = ContentType.parse(request.getHeaders().getContentType().toString()); - } - - return new ReactiveEntityProducer(byteBufferFlux, request.getContentLength(), contentType, contentEncoding); - } - - private static class MonoFutureCallbackAdapter implements FutureCallback { + private static class MonoFutureCallbackAdapter + implements FutureCallback>> { - private final MonoSink sink; + private final MonoSink>> sink; - public MonoFutureCallbackAdapter(MonoSink sink) { + public MonoFutureCallbackAdapter(MonoSink>> sink) { this.sink = sink; } @Override - public void completed(T result) { + public void completed(Message> result) { this.sink.success(result); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java index 116f1c62ca9d..5486a5362156 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java @@ -24,8 +24,12 @@ import org.apache.hc.client5.http.cookie.CookieStore; import org.apache.hc.client5.http.impl.cookie.BasicClientCookie; import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.message.BasicHttpRequest; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.reactive.ReactiveEntityProducer; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -34,6 +38,7 @@ import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; +import org.springframework.lang.Nullable; import static org.springframework.http.MediaType.ALL_VALUE; @@ -52,10 +57,9 @@ class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest { private final HttpClientContext context; + @Nullable private Flux byteBufferFlux; - private long contentLength = -1; - public HttpComponentsClientHttpRequest(HttpMethod method, URI uri, HttpClientContext context, DataBufferFactory dataBufferFactory) { @@ -107,7 +111,6 @@ public Mono setComplete() { @Override protected void applyHeaders() { HttpHeaders headers = getHeaders(); - this.contentLength = headers.getContentLength(); headers.entrySet() .stream() @@ -138,15 +141,27 @@ protected void applyCookies() { }); } - public HttpRequest getHttpRequest() { - return this.httpRequest; - } + public AsyncRequestProducer toRequestProducer() { + ReactiveEntityProducer reactiveEntityProducer = createReactiveEntityProducer(); - public Flux getByteBufferFlux() { - return this.byteBufferFlux; + return new BasicRequestProducer(this.httpRequest, reactiveEntityProducer); } - public long getContentLength() { - return this.contentLength; + @Nullable + private ReactiveEntityProducer createReactiveEntityProducer() { + if (this.byteBufferFlux == null) { + return null; + } + + String contentEncoding = getHeaders().getFirst(HttpHeaders.CONTENT_ENCODING); + + ContentType contentType = null; + + if (getHeaders().getContentType() != null) { + contentType = ContentType.parse(getHeaders().getContentType().toString()); + } + + return new ReactiveEntityProducer(this.byteBufferFlux, getHeaders().getContentLength(), + contentType, contentEncoding); } }