diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle index 934dc0bcea4b..7240f98c9822 100644 --- a/spring-messaging/spring-messaging.gradle +++ b/spring-messaging/spring-messaging.gradle @@ -18,7 +18,7 @@ dependencies { compile(project(":spring-core")) optional(project(":spring-context")) optional(project(":spring-oxm")) - optional("io.projectreactor.ipc:reactor-netty") + optional("io.projectreactor.ipc:reactor-netty:0.8.0.BUILD-SNAPSHOT") optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") { exclude group: "javax.servlet", module: "javax.servlet-api" } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 83cce81f0d8e..a78d2f97d8b6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -39,11 +39,10 @@ import reactor.core.publisher.MonoProcessor; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.ipc.netty.Connection; import reactor.ipc.netty.FutureMono; -import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; -import reactor.ipc.netty.options.ClientOptions; import reactor.ipc.netty.resources.LoopResources; import reactor.ipc.netty.resources.PoolResources; import reactor.ipc.netty.tcp.TcpClient; @@ -98,57 +97,18 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec */ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec

codec) { - this(builder -> builder.host(host).port(port), codec); - } - - /** - * Constructor with a {@link ClientOptions.Builder} that can be used to - * customize Reactor Netty client options. - * - *

Note: this constructor manages the lifecycle of the - * {@link TcpClient} and its underlying resources. Please do not customize - * any of the following options: - * {@link ClientOptions.Builder#channelGroup(ChannelGroup) ChannelGroup}, - * {@link ClientOptions.Builder#loopResources(LoopResources) LoopResources}, and - * {@link ClientOptions.Builder#poolResources(PoolResources) PoolResources}. - * You may set the {@link ClientOptions.Builder#disablePool() disablePool} - * option if you simply want to turn off pooling. - * - *

For full control over the initialization and lifecycle of the TcpClient, - * see {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}. - * - * @param optionsConsumer consumer to customize client options - * @param codec the code to use - * @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec - */ - public ReactorNettyTcpClient(Consumer> optionsConsumer, - ReactorNettyCodec

codec) { - - Assert.notNull(optionsConsumer, "Consumer is required"); + Assert.notNull(host, "host is required"); + Assert.notNull(port, "port is required"); Assert.notNull(codec, "ReactorNettyCodec is required"); this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); - - Consumer> builtInConsumer = builder -> { - - Assert.isTrue(!builder.isLoopAvailable() && !builder.isPoolAvailable(), - "The provided ClientOptions.Builder contains LoopResources and/or PoolResources. " + - "Please, use the constructor that accepts a TcpClient instance " + - "for full control over initialization and lifecycle."); - - builder.channelGroup(this.channelGroup); - builder.preferNative(false); - - this.loopResources = LoopResources.create("tcp-client-loop"); - builder.loopResources(this.loopResources); - - if (!builder.isPoolDisabled()) { - this.poolResources = PoolResources.elastic("tcp-client-pool"); - builder.poolResources(this.poolResources); - } - }; - - this.tcpClient = TcpClient.create(optionsConsumer.andThen(builtInConsumer)); + this.loopResources = LoopResources.create("tcp-client-loop"); + this.poolResources = PoolResources.elastic("tcp-client-pool"); + this.tcpClient = TcpClient.create(poolResources) + .host(host) + .port(port) + .runOn(loopResources, false) + .doOnConnected(c -> channelGroup.add(c.channel())); this.codec = codec; } @@ -181,7 +141,8 @@ public ListenableFuture connect(final TcpConnectionHandler

handler) { } Mono connectMono = this.tcpClient - .newHandler(new ReactorNettyHandler(handler)) + .handle(new ReactorNettyHandler(handler)) + .connect() .doOnError(handler::afterConnectFailure) .then(); @@ -201,11 +162,12 @@ public ListenableFuture connect(TcpConnectionHandler

handler, Reconnect MonoProcessor connectMono = MonoProcessor.create(); this.tcpClient - .newHandler(new ReactorNettyHandler(handler)) + .handle(new ReactorNettyHandler(handler)) + .connect() .doOnNext(updateConnectMono(connectMono)) .doOnError(updateConnectMono(connectMono)) .doOnError(handler::afterConnectFailure) // report all connect failures to the handler - .flatMap(NettyContext::onClose) // post-connect issues + .flatMap(Connection::onDispose) // post-connect issues .retryWhen(reconnectFunction(strategy)) .repeatWhen(reconnectFunction(strategy)) .subscribe(); @@ -302,14 +264,16 @@ private class ReactorNettyHandler implements BiFunction apply(NettyInbound inbound, NettyOutbound outbound) { - if (logger.isDebugEnabled()) { - logger.debug("Connected to " + inbound.remoteAddress()); - } + inbound.withConnection(c -> { + if (logger.isDebugEnabled()) { + logger.debug("Connected to " + c.address()); + } + }); DirectProcessor completion = DirectProcessor.create(); TcpConnection

connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); scheduler.schedule(() -> connectionHandler.afterConnected(connection)); - inbound.context().addHandler(new StompMessageDecoder<>(codec)); + inbound.withConnection(c -> c.addHandler(new StompMessageDecoder<>(codec))); inbound.receiveObject() .cast(Message.class) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index c4107f7e9edd..06befac982af 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -17,12 +17,10 @@ package org.springframework.messaging.tcp.reactor; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelPipeline; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Mono; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; -import reactor.ipc.netty.NettyPipeline; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpConnection; @@ -66,20 +64,13 @@ public ListenableFuture send(Message

message) { @Override @SuppressWarnings("deprecation") public void onReadInactivity(Runnable runnable, long inactivityDuration) { - // TODO: workaround for https://github.com/reactor/reactor-netty/issues/22 - ChannelPipeline pipeline = this.inbound.context().channel().pipeline(); - String name = NettyPipeline.OnChannelReadIdle; - if (pipeline.context(name) != null) { - pipeline.remove(name); - } - - this.inbound.onReadIdle(inactivityDuration, runnable); + this.inbound.withConnection(c -> c.onReadIdle(inactivityDuration, runnable)); } @Override @SuppressWarnings("deprecation") public void onWriteInactivity(Runnable runnable, long inactivityDuration) { - this.outbound.onWriteIdle(inactivityDuration, runnable); + this.inbound.withConnection(c -> c.onWriteIdle(inactivityDuration, runnable)); } @Override diff --git a/spring-test/spring-test.gradle b/spring-test/spring-test.gradle index 71867d3d2b5d..8d7378e9eb5c 100644 --- a/spring-test/spring-test.gradle +++ b/spring-test/spring-test.gradle @@ -80,7 +80,7 @@ dependencies { testCompile("org.apache.httpcomponents:httpclient:4.5.5") { exclude group: "commons-logging", module: "commons-logging" } - testCompile('io.projectreactor.ipc:reactor-netty') + testCompile('io.projectreactor.ipc:reactor-netty:0.8.0.BUILD-SNAPSHOT') testCompile('de.bechte.junit:junit-hierarchicalcontextrunner:4.12.1') // Pull in the latest JUnit 5 Launcher API and the Vintage engine as well // so that we can run JUnit 4 tests in IntelliJ IDEA. diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle index 7e687f69b1d1..6e6b39394f46 100644 --- a/spring-web/spring-web.gradle +++ b/spring-web/spring-web.gradle @@ -34,7 +34,7 @@ dependencies { optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") optional("io.reactivex.rxjava2:rxjava:${rxjava2Version}") optional("io.netty:netty-all") - optional("io.projectreactor.ipc:reactor-netty") + optional("io.projectreactor.ipc:reactor-netty:0.8.0.BUILD-SNAPSHOT") optional("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") optional("org.eclipse.jetty:jetty-server:${jettyVersion}") { exclude group: "javax.servlet", module: "javax.servlet-api" diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 2ee616b1facf..a955e3ee8ebb 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -17,18 +17,19 @@ package org.springframework.http.client.reactive; import java.net.URI; -import java.util.function.Consumer; import java.util.function.Function; import reactor.core.publisher.Mono; +import reactor.ipc.netty.NettyInbound; +import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.http.client.HttpClient; -import reactor.ipc.netty.http.client.HttpClientOptions; import reactor.ipc.netty.http.client.HttpClientRequest; import reactor.ipc.netty.http.client.HttpClientResponse; -import reactor.ipc.netty.options.ClientOptions; import org.springframework.http.HttpMethod; +import io.netty.buffer.ByteBufAllocator; + /** * Reactor-Netty implementation of {@link ClientHttpConnector}. * @@ -43,20 +44,19 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { /** * Create a Reactor Netty {@link ClientHttpConnector} - * with default {@link ClientOptions} and HTTP compression support enabled. + * with a default configuration and HTTP compression support enabled. */ public ReactorClientHttpConnector() { - this.httpClient = HttpClient.builder() - .options(options -> options.compression(true)) - .build(); + this.httpClient = HttpClient.prepare() + .compress(); } /** * Create a Reactor Netty {@link ClientHttpConnector} with the given - * {@link HttpClientOptions.Builder} + * {@link HttpClient} */ - public ReactorClientHttpConnector(Consumer clientOptions) { - this.httpClient = HttpClient.create(clientOptions); + public ReactorClientHttpConnector(HttpClient httpClient) { + this.httpClient = httpClient; } @@ -69,22 +69,24 @@ public Mono connect(HttpMethod method, URI uri, } return this.httpClient - .request(adaptHttpMethod(method), - uri.toString(), - request -> requestCallback.apply(adaptRequest(method, uri, request))) - .map(this::adaptResponse); + .request(adaptHttpMethod(method)) + .uri(uri.toString()) + .send((req, out) -> requestCallback.apply(adaptRequest(method, uri, req, out))) + .responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc()))) + .next(); } private io.netty.handler.codec.http.HttpMethod adaptHttpMethod(HttpMethod method) { return io.netty.handler.codec.http.HttpMethod.valueOf(method.name()); } - private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request) { - return new ReactorClientHttpRequest(method, uri, request); + private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound out) { + return new ReactorClientHttpRequest(method, uri, request, out); } - private ClientHttpResponse adaptResponse(HttpClientResponse response) { - return new ReactorClientHttpResponse(response); + private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound, + ByteBufAllocator alloc) { + return new ReactorClientHttpResponse(response, nettyInbound, alloc); } } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index 3302bc79abc8..db5a301ffe2c 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -25,6 +25,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.http.client.HttpClientRequest; import org.springframework.core.io.buffer.DataBuffer; @@ -48,15 +49,18 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero private final HttpClientRequest httpRequest; + private final NettyOutbound out; + private final NettyDataBufferFactory bufferFactory; public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, - HttpClientRequest httpRequest) { + HttpClientRequest httpRequest, NettyOutbound out) { this.httpMethod = httpMethod; this.uri = uri; - this.httpRequest = httpRequest.failOnClientError(false).failOnServerError(false); - this.bufferFactory = new NettyDataBufferFactory(httpRequest.alloc()); + this.httpRequest = httpRequest; + this.out = out; + this.bufferFactory = new NettyDataBufferFactory(out.alloc()); } @@ -77,14 +81,14 @@ public URI getURI() { @Override public Mono writeWith(Publisher body) { - return doCommit(() -> this.httpRequest + return doCommit(() -> this.out .send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then()); } @Override public Mono writeAndFlushWith(Publisher> body) { Publisher> byteBufs = Flux.from(body).map(ReactorClientHttpRequest::toByteBufs); - return doCommit(() -> this.httpRequest.sendGroups(byteBufs).then()); + return doCommit(() -> this.out.sendGroups(byteBufs).then()); } private static Publisher toByteBufs(Publisher dataBuffers) { @@ -93,12 +97,12 @@ private static Publisher toByteBufs(Publisher dat @Override public Mono writeWith(File file, long position, long count) { - return doCommit(() -> this.httpRequest.sendFile(file.toPath(), position, count).then()); + return doCommit(() -> this.out.sendFile(file.toPath(), position, count).then()); } @Override public Mono setComplete() { - return doCommit(() -> httpRequest.sendHeaders().then()); + return doCommit(() -> out.then()); } @Override diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index adbf6b1b5f28..58ab0cee0270 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -19,6 +19,7 @@ import java.util.Collection; import reactor.core.publisher.Flux; +import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.http.client.HttpClientResponse; import org.springframework.core.io.buffer.DataBuffer; @@ -30,6 +31,8 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import io.netty.buffer.ByteBufAllocator; + /** * {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client. * @@ -43,16 +46,21 @@ class ReactorClientHttpResponse implements ClientHttpResponse { private final HttpClientResponse response; + private final NettyInbound nettyInbound; + - public ReactorClientHttpResponse(HttpClientResponse response) { + public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound nettyInbound, + ByteBufAllocator alloc) { this.response = response; - this.dataBufferFactory = new NettyDataBufferFactory(response.channel().alloc()); + this.nettyInbound = nettyInbound; + this.dataBufferFactory = new NettyDataBufferFactory(alloc); } @Override public Flux getBody() { - return response.receive() + return nettyInbound + .receive() .map(buf -> { buf.retain(); return dataBufferFactory.wrap(buf); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java index be5c6572fcde..26dd57e8c547 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java @@ -21,11 +21,11 @@ import java.net.URISyntaxException; import javax.net.ssl.SSLSession; -import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.ssl.SslHandler; import reactor.core.publisher.Flux; +import reactor.ipc.netty.Connection; import reactor.ipc.netty.http.server.HttpServerRequest; import org.springframework.core.io.buffer.DataBuffer; @@ -90,16 +90,14 @@ private static URI resolveBaseUrl(HttpServerRequest request) throws URISyntaxExc } } else { - InetSocketAddress localAddress = (InetSocketAddress) request.context().channel().localAddress(); + InetSocketAddress localAddress = request.hostAddress(); return new URI(scheme, null, localAddress.getHostString(), localAddress.getPort(), null, null, null); } } private static String getScheme(HttpServerRequest request) { - ChannelPipeline pipeline = request.context().channel().pipeline(); - boolean ssl = pipeline.get(SslHandler.class) != null; - return ssl ? "https" : "http"; + return request.scheme(); } private static String resolveRequestUri(HttpServerRequest request) { @@ -157,7 +155,7 @@ public InetSocketAddress getRemoteAddress() { @Nullable protected SslInfo initSslInfo() { - SslHandler sslHandler = this.request.context().channel().pipeline().get(SslHandler.class); + SslHandler sslHandler = ((Connection) this.request).channel().pipeline().get(SslHandler.class); if (sslHandler != null) { SSLSession session = sslHandler.engine().getSession(); return new DefaultSslInfo(session); diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java index 4ef630fe62e5..b31b493643db 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java @@ -18,7 +18,7 @@ import java.util.concurrent.atomic.AtomicReference; -import reactor.ipc.netty.NettyContext; +import reactor.ipc.netty.DisposableServer; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; @@ -31,13 +31,15 @@ public class ReactorHttpServer extends AbstractHttpServer { private reactor.ipc.netty.http.server.HttpServer reactorServer; - private AtomicReference nettyContext = new AtomicReference<>(); + private AtomicReference disposableServer = new AtomicReference<>(); @Override protected void initServer() throws Exception { this.reactorHandler = createHttpHandlerAdapter(); - this.reactorServer = reactor.ipc.netty.http.server.HttpServer.create(getHost(), getPort()); + this.reactorServer = reactor.ipc.netty.http.server.HttpServer.create() + .tcpConfiguration(tcpServer -> tcpServer.host(getHost())) + .port(getPort()); } private ReactorHttpHandlerAdapter createHttpHandlerAdapter() { @@ -46,21 +48,21 @@ private ReactorHttpHandlerAdapter createHttpHandlerAdapter() { @Override protected void startInternal() { - NettyContext nettyContext = this.reactorServer.newHandler(this.reactorHandler).block(); - setPort(nettyContext.address().getPort()); - this.nettyContext.set(nettyContext); + DisposableServer disposableServer = this.reactorServer.handle(this.reactorHandler).bind().block(); + setPort(disposableServer.address().getPort()); + this.disposableServer.set(disposableServer); } @Override protected void stopInternal() { - this.nettyContext.get().dispose(); + this.disposableServer.get().dispose(); } @Override protected void resetInternal() { this.reactorServer = null; this.reactorHandler = null; - this.nettyContext.set(null); + this.disposableServer.set(null); } } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java index b7ed8bd71165..b3801bbf821e 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpsServer.java @@ -18,7 +18,7 @@ import java.util.concurrent.atomic.AtomicReference; -import reactor.ipc.netty.NettyContext; +import reactor.ipc.netty.DisposableServer; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; @@ -31,15 +31,16 @@ public class ReactorHttpsServer extends AbstractHttpServer { private reactor.ipc.netty.http.server.HttpServer reactorServer; - private AtomicReference nettyContext = new AtomicReference<>(); + private AtomicReference disposableServer = new AtomicReference<>(); @Override protected void initServer() throws Exception { this.reactorHandler = createHttpHandlerAdapter(); - this.reactorServer = reactor.ipc.netty.http.server.HttpServer.create(builder -> { - builder.host(getHost()).port(getPort()).sslSelfSigned(); - }); + this.reactorServer = reactor.ipc.netty.http.server.HttpServer.create() + .tcpConfiguration(tcpServer -> tcpServer.host(getHost()) + .secure()) + .port(getPort()); } private ReactorHttpHandlerAdapter createHttpHandlerAdapter() { @@ -48,21 +49,21 @@ private ReactorHttpHandlerAdapter createHttpHandlerAdapter() { @Override protected void startInternal() { - NettyContext nettyContext = this.reactorServer.newHandler(this.reactorHandler).block(); - setPort(nettyContext.address().getPort()); - this.nettyContext.set(nettyContext); + DisposableServer disposableServer = this.reactorServer.handle(this.reactorHandler).bind().block(); + setPort(disposableServer.address().getPort()); + this.disposableServer.set(disposableServer); } @Override protected void stopInternal() { - this.nettyContext.get().dispose(); + this.disposableServer.get().dispose(); } @Override protected void resetInternal() { this.reactorServer = null; this.reactorHandler = null; - this.nettyContext.set(null); + this.disposableServer.set(null); } } diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle index 9871bb1c71c0..ff793a6ee43e 100644 --- a/spring-webflux/spring-webflux.gradle +++ b/spring-webflux/spring-webflux.gradle @@ -28,7 +28,7 @@ dependencies { optional("com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${jackson2Version}") optional("io.reactivex:rxjava:${rxjavaVersion}") optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") - optional("io.projectreactor.ipc:reactor-netty") + optional("io.projectreactor.ipc:reactor-netty:0.8.0.BUILD-SNAPSHOT") optional("org.apache.tomcat:tomcat-websocket:${tomcatVersion}") { exclude group: "org.apache.tomcat", module: "tomcat-websocket-api" exclude group: "org.apache.tomcat", module: "tomcat-servlet-api" diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java index 42fa2ec8d1ba..d77f90b78c44 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/ReactorNettyWebSocketClient.java @@ -22,8 +22,6 @@ import io.netty.buffer.ByteBufAllocator; import reactor.core.publisher.Mono; import reactor.ipc.netty.http.client.HttpClient; -import reactor.ipc.netty.http.client.HttpClientOptions; -import reactor.ipc.netty.http.client.HttpClientResponse; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; @@ -48,15 +46,15 @@ public class ReactorNettyWebSocketClient extends WebSocketClientSupport implemen * Default constructor. */ public ReactorNettyWebSocketClient() { - this(options -> {}); + this(HttpClient.prepare()); } /** * Constructor that accepts an {@link HttpClientOptions.Builder} consumer * to supply to {@link HttpClient#create(Consumer)}. */ - public ReactorNettyWebSocketClient(Consumer clientOptions) { - this.httpClient = HttpClient.create(clientOptions); + public ReactorNettyWebSocketClient(HttpClient httpClient) { + this.httpClient = httpClient; } @@ -78,29 +76,28 @@ public Mono execute(URI url, HttpHeaders headers, WebSocketHandler handler List protocols = beforeHandshake(url, headers, handler); return getHttpClient() - .ws(url.toString(), - nettyHeaders -> setNettyHeaders(headers, nettyHeaders), - StringUtils.collectionToCommaDelimitedString(protocols)) - .flatMap(response -> { - HandshakeInfo info = afterHandshake(url, toHttpHeaders(response)); - ByteBufAllocator allocator = response.channel().alloc(); + .headers(nettyHeaders -> setNettyHeaders(headers, nettyHeaders)) + .websocket(StringUtils.collectionToCommaDelimitedString(protocols)) + .uri(url.toString()) + .handle((in, out) -> { + HandshakeInfo info = afterHandshake(url, toHttpHeaders(in.headers())); + ByteBufAllocator allocator = out.alloc(); NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator); - return response.receiveWebsocket((in, out) -> { - WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory); - return handler.handle(session); - }); - }); + WebSocketSession session = new ReactorNettyWebSocketSession(in, out, info, factory); + return handler.handle(session); + }) + .next(); } private void setNettyHeaders(HttpHeaders headers, io.netty.handler.codec.http.HttpHeaders nettyHeaders) { headers.forEach(nettyHeaders::set); } - private HttpHeaders toHttpHeaders(HttpClientResponse response) { + private HttpHeaders toHttpHeaders(io.netty.handler.codec.http.HttpHeaders responseHeaders) { HttpHeaders headers = new HttpHeaders(); - response.responseHeaders().forEach(entry -> { + responseHeaders.forEach(entry -> { String name = entry.getKey(); - headers.put(name, response.responseHeaders().getAll(name)); + headers.put(name, responseHeaders.getAll(name)); }); return headers; } diff --git a/spring-websocket/spring-websocket.gradle b/spring-websocket/spring-websocket.gradle index a0fe52d73175..ddd4119873a6 100644 --- a/spring-websocket/spring-websocket.gradle +++ b/spring-websocket/spring-websocket.gradle @@ -44,5 +44,5 @@ dependencies { optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}") testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}") - testCompile("io.projectreactor.ipc:reactor-netty") + testCompile("io.projectreactor.ipc:reactor-netty:0.8.0.BUILD-SNAPSHOT") }