Skip to content

Commit

Permalink
Introduce ReactorNettyClientRequestFactory
Browse files Browse the repository at this point in the history
This commit introduces an implementation of ClientHttpRequestFactory
based on Reactor Netty's HttpClient.

Closes gh-30835
  • Loading branch information
poutsma committed Jul 7, 2023
1 parent d720d6b commit 20dd66c
Show file tree
Hide file tree
Showing 6 changed files with 436 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.client;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;

import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.lang.Nullable;
import org.springframework.util.StreamUtils;

/**
* {@link ClientHttpRequest} implementation for the Reactor-Netty HTTP client.
* Created via the {@link ReactorNettyClientRequestFactory}.
*
* @author Arjen Poutsma
* @since 6.1
*/
final class ReactorNettyClientRequest extends AbstractStreamingClientHttpRequest {

private final HttpClient httpClient;

private final HttpMethod method;

private final URI uri;

private final Duration exchangeTimeout;

private final Duration readTimeout;


public ReactorNettyClientRequest(HttpClient httpClient, URI uri, HttpMethod method,
Duration exchangeTimeout, Duration readTimeout) {

this.httpClient = httpClient;
this.method = method;
this.uri = uri;
this.exchangeTimeout = exchangeTimeout;
this.readTimeout = readTimeout;
}


@Override
public HttpMethod getMethod() {
return this.method;
}

@Override
public URI getURI() {
return this.uri;
}


@Override
protected ClientHttpResponse executeInternal(HttpHeaders headers, @Nullable Body body) throws IOException {
HttpClient.RequestSender requestSender = this.httpClient
.request(io.netty.handler.codec.http.HttpMethod.valueOf(this.method.name()));

requestSender = (this.uri.isAbsolute() ? requestSender.uri(this.uri) : requestSender.uri(this.uri.toString()));

try {
ReactorNettyClientResponse result = requestSender.send((reactorRequest, nettyOutbound) ->
send(headers, body, reactorRequest, nettyOutbound))
.responseConnection((reactorResponse, connection) ->
Mono.just(new ReactorNettyClientResponse(reactorResponse, connection, this.readTimeout)))
.next()
.block(this.exchangeTimeout);

if (result == null) {
throw new IOException("HTTP exchange resulted in no result");
}
else {
return result;
}
}
catch (RuntimeException ex) { // Exceptions.ReactiveException is package private
Throwable cause = ex.getCause();

if (cause instanceof UncheckedIOException uioEx) {
throw uioEx.getCause();
}
else if (cause instanceof IOException ioEx) {
throw ioEx;
}
else {
throw ex;
}
}
}

private Publisher<Void> send(HttpHeaders headers, @Nullable Body body,
HttpClientRequest reactorRequest, NettyOutbound nettyOutbound) {

headers.forEach((key, value) -> reactorRequest.requestHeaders().set(key, value));

if (body != null) {
AtomicReference<Executor> executor = new AtomicReference<>();

return nettyOutbound
.withConnection(connection -> executor.set(connection.channel().eventLoop()))
.send(FlowAdapters.toPublisher(OutputStreamPublisher.create(
outputStream -> body.writeTo(StreamUtils.nonClosing(outputStream)),
new ByteBufMapper(nettyOutbound.alloc()),
executor.getAndSet(null))));
}
else {
return nettyOutbound;
}
}


private static final class ByteBufMapper implements OutputStreamPublisher.ByteMapper<ByteBuf> {

private final ByteBufAllocator allocator;


public ByteBufMapper(ByteBufAllocator allocator) {
this.allocator = allocator;
}


@Override
public ByteBuf map(int b) {
ByteBuf byteBuf = this.allocator.buffer(1);
byteBuf.writeByte(b);
return byteBuf;
}

@Override
public ByteBuf map(byte[] b, int off, int len) {
ByteBuf byteBuf = this.allocator.buffer(len);
byteBuf.writeBytes(b, off, len);
return byteBuf;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.client;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;

import io.netty.channel.ChannelOption;
import reactor.netty.http.client.HttpClient;

import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;

/**
* Reactor-Netty implementation of {@link ClientHttpRequestFactory}.
*
* @author Arjen Poutsma
* @since 6.1
*/
public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactory {

private final HttpClient httpClient;


private Duration exchangeTimeout = Duration.ofSeconds(5);

private Duration readTimeout = Duration.ofSeconds(10);



/**
* Create a new instance of the {@code ReactorNettyClientRequestFactory}
* with a default {@link HttpClient} that has compression enabled.
*/
public ReactorNettyClientRequestFactory() {
this(HttpClient.create().compress(true));
}

/**
* Create a new instance of the {@code ReactorNettyClientRequestFactory}
* based on the given {@link HttpClient}.
* @param httpClient the client to base on
*/
public ReactorNettyClientRequestFactory(HttpClient httpClient) {
Assert.notNull(httpClient, "HttpClient must not be null");
this.httpClient = httpClient;
}

/**
* Set the underlying connect timeout in milliseconds.
* A value of 0 specifies an infinite timeout.
* <p>Default is 30 seconds.
* @see HttpClient#option(ChannelOption, Object)
* @see ChannelOption#CONNECT_TIMEOUT_MILLIS
*/
public void setConnectTimeout(int connectTimeout) {
Assert.isTrue(connectTimeout >= 0, "Timeout must be a non-negative value");
this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
}

/**
* Set the underlying connect timeout in milliseconds.
* A value of 0 specifies an infinite timeout.
* <p>Default is 30 seconds.
* @see HttpClient#option(ChannelOption, Object)
* @see ChannelOption#CONNECT_TIMEOUT_MILLIS
*/
public void setConnectTimeout(Duration connectTimeout) {
Assert.notNull(connectTimeout, "ConnectTimeout must not be null");
Assert.isTrue(!connectTimeout.isNegative(), "Timeout must be a non-negative value");
this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)connectTimeout.toMillis());
}

/**
* Set the underlying read timeout in milliseconds.
* <p>Default is 10 seconds.
*/
public void setReadTimeout(long readTimeout) {
Assert.isTrue(readTimeout > 0, "Timeout must be a positive value");
this.readTimeout = Duration.ofMillis(readTimeout);
}

/**
* Set the underlying read timeout as {@code Duration}.
* <p>Default is 10 seconds.
*/
public void setReadTimeout(Duration readTimeout) {
Assert.notNull(readTimeout, "ReadTimeout must not be null");
Assert.isTrue(!readTimeout.isNegative(), "Timeout must be a non-negative value");
this.readTimeout = readTimeout;
}

/**
* Set the timeout for the HTTP exchange in milliseconds.
* <p>Default is 30 seconds.
*/
public void setExchangeTimeout(long exchangeTimeout) {
Assert.isTrue(exchangeTimeout > 0, "Timeout must be a positive value");
this.exchangeTimeout = Duration.ofMillis(exchangeTimeout);
}

/**
* Set the timeout for the HTTP exchange.
* <p>Default is 30 seconds.
*/
public void setExchangeTimeout(Duration exchangeTimeout) {
Assert.notNull(exchangeTimeout, "ExchangeTimeout must not be null");
Assert.isTrue(!exchangeTimeout.isNegative(), "Timeout must be a non-negative value");
this.exchangeTimeout = exchangeTimeout;
}



@Override
public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
return new ReactorNettyClientRequest(this.httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.client;

import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;

import reactor.netty.Connection;
import reactor.netty.http.client.HttpClientResponse;

import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.support.Netty4HeadersAdapter;
import org.springframework.lang.Nullable;

/**
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
*
* @author Arjen Poutsma
* @since 6.1
*/
final class ReactorNettyClientResponse implements ClientHttpResponse {

private final HttpClientResponse response;

private final Connection connection;

private final HttpHeaders headers;

private final Duration readTimeout;

@Nullable
private volatile InputStream body;



public ReactorNettyClientResponse(HttpClientResponse response, Connection connection, Duration readTimeout) {
this.response = response;
this.connection = connection;
this.readTimeout = readTimeout;
this.headers = HttpHeaders.readOnlyHttpHeaders(new Netty4HeadersAdapter(response.responseHeaders()));
}

@Override
public HttpStatusCode getStatusCode() {
return HttpStatusCode.valueOf(this.response.status().code());
}

@Override
public String getStatusText() {
return this.response.status().reasonPhrase();
}

@Override
public HttpHeaders getHeaders() {
return this.headers;
}

@Override
public InputStream getBody() throws IOException {
if (this.body == null) {
InputStream body = this.connection.inbound().receive()
.aggregate().asInputStream().block(this.readTimeout);
if (body != null) {
this.body = body;
}
else {
throw new IOException("Could not receive body");
}
}
return this.body;
}

@Override
public void close() {
this.connection.dispose();
}
}
Loading

0 comments on commit 20dd66c

Please sign in to comment.