diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/WebsocketTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/WebsocketTest.java index 541e1db21..2c131e54e 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/WebsocketTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/WebsocketTest.java @@ -54,6 +54,7 @@ import reactor.core.scheduler.Schedulers; import reactor.netty.BaseHttpTest; import reactor.netty.Connection; +import reactor.netty.DisposableServer; import reactor.netty.channel.AbortedException; import reactor.netty.http.Http11SslContextSpec; import reactor.netty.http.Http2SslContextSpec; @@ -101,20 +102,8 @@ static void createSelfSignedCertificate() throws CertificateException { } void doSimpleTest(HttpServer server, HttpClient client) { - disposableServer = server.handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString(Mono.just("test")))) - .bindNow(); - - List res = - client.headers(h -> h.add("Authorization", auth)) - .websocket() - .uri("/test") - .handle((i, o) -> i.receive().asString()) - .log("client") - .collectList() - .block(Duration.ofSeconds(5)); - - assertThat(res).isNotNull(); - assertThat(res.get(0)).isEqualTo("test"); + disposableServer = createDisposableServer(server, (i, o) -> o.sendString(Mono.just("test"))); + sendRequestWithCollect(client, (i, o) -> i.receive().asString(), "test"); } void doServerWebSocketFailed(HttpServer server, HttpClient client) { @@ -129,31 +118,19 @@ void doServerWebSocketFailed(HttpServer server, HttpClient client) { }) .bindNow(); - Mono res = - client.websocket() - .uri("/test") - .handle((in, out) -> in.receive().aggregate().asString()) - .next(); - - StepVerifier.create(res) - .expectError(WebSocketHandshakeException.class) - .verify(Duration.ofSeconds(30)); + client.websocket() + .uri("/test") + .handle((in, out) -> in.receive().aggregate().asString()) + .next() + .as(StepVerifier::create) + .expectError(WebSocketHandshakeException.class) + .verify(Duration.ofSeconds(30)); } void doUnidirectional(HttpServer server, HttpClient client) { int c = 10; - disposableServer = server.handle((in, out) -> out.sendWebsocket( - (i, o) -> o.sendString( - Mono.just("test") - .delayElement(Duration.ofMillis(100)) - .repeat()))) - .bindNow(); - - Flux ws = client.websocket() - .uri("/") - .handle((in, out) -> in.aggregateFrames() - .receive() - .asString()); + disposableServer = createDisposableServer(server, + (i, o) -> o.sendString(Mono.just("test").delayElement(Duration.ofMillis(100)).repeat())); List expected = Flux.range(1, c) @@ -162,11 +139,17 @@ void doUnidirectional(HttpServer server, HttpClient client) { .block(); assertThat(expected).isNotNull(); - StepVerifier.create(ws.take(c) - .log()) - .expectNextSequence(expected) - .expectComplete() - .verify(Duration.ofSeconds(30)); + client.websocket() + .uri("/") + .handle((in, out) -> in.aggregateFrames() + .receive() + .asString()) + .take(c) + .log() + .as(StepVerifier::create) + .expectNextSequence(expected) + .expectComplete() + .verify(Duration.ofSeconds(30)); } void doWebSocketRespondsToRequestsFromClients(HttpServer server, HttpClient client) { @@ -185,55 +168,45 @@ void doWebSocketRespondsToRequestsFromClients(HttpServer server, HttpClient clie .map(it -> it + '!') .log("server-reply"))); }) - .bindNow(Duration.ofSeconds(5)); - - Mono> response = - client.headers(h -> h.add("Content-Type", "text/plain") - .add("test", "test")) - .websocket() - .uri("/test/World") - .handle((i, o) -> { - o.sendString(Flux.range(1, 1000) - .log("client-send") - .map(it -> "" + it), Charset.defaultCharset()) - .then() - .subscribe(); - - return i.receive() - .asString(); - }) - .log("client-received") - .publishOn(Schedulers.parallel()) - .doOnNext(s -> clientRes.incrementAndGet()) - .take(1000) - .collectList() - .cache() - .doOnError(i -> System.err.println("Failed requesting server: " + i)); + .bindNow(Duration.ofSeconds(30)); log.debug("STARTING: server[" + serverRes.get() + "] / client[" + clientRes.get() + "]"); - StepVerifier.create(response) - .expectNextMatches(list -> "1000!".equals(list.get(999))) - .expectComplete() - .verify(Duration.ofSeconds(30)); - - log.debug("FINISHED: server[" + serverRes.get() + "] / client[" + clientRes + "]"); + client.headers(h -> h.add("Content-Type", "text/plain") + .add("test", "test")) + .websocket() + .uri("/test/World") + .handle((i, o) -> { + o.sendString(Flux.range(1, 1000) + .log("client-send") + .map(it -> "" + it), Charset.defaultCharset()) + .then() + .subscribe(); + + return i.receive() + .asString(); + }) + .log("client-received") + .publishOn(Schedulers.parallel()) + .doOnNext(s -> clientRes.incrementAndGet()) + .take(1000) + .collectList() + .cache() + .doOnError(i -> System.err.println("Failed requesting server: " + i)) + .as(StepVerifier::create) + .expectNextMatches(list -> "1000!".equals(list.get(999))) + .expectComplete() + .verify(Duration.ofSeconds(30)); + + log.debug("FINISHED: server[" + serverRes.get() + "] / client[" + clientRes.get() + "]"); } void doUnidirectionalBinary(HttpServer server, HttpClient client) { int c = 10; - disposableServer = server.handle((in, out) -> out.sendWebsocket( - (i, o) -> o.sendByteArray( - Mono.just("test".getBytes(Charset.defaultCharset())) - .delayElement(Duration.ofMillis(100)) - .repeat()))) - .bindNow(); - - Flux ws = client.websocket() - .uri("/test") - .handle((i, o) -> i.aggregateFrames() - .receive() - .asString()); + disposableServer = createDisposableServer(server, + (i, o) -> o.sendByteArray(Mono.just("test".getBytes(Charset.defaultCharset())) + .delayElement(Duration.ofMillis(100)) + .repeat())); List expected = Flux.range(1, c) @@ -242,11 +215,17 @@ void doUnidirectionalBinary(HttpServer server, HttpClient client) { .block(); assertThat(expected).isNotNull(); - StepVerifier.create(ws.take(c) - .log()) - .expectNextSequence(expected) - .expectComplete() - .verify(Duration.ofSeconds(30)); + client.websocket() + .uri("/test") + .handle((i, o) -> i.aggregateFrames() + .receive() + .asString()) + .take(c) + .log() + .as(StepVerifier::create) + .expectNextSequence(expected) + .expectComplete() + .verify(Duration.ofSeconds(30)); } void doDuplexEcho(HttpServer server, HttpClient client) throws Exception { @@ -254,13 +233,12 @@ void doDuplexEcho(HttpServer server, HttpClient client) throws Exception { CountDownLatch clientLatch = new CountDownLatch(c); CountDownLatch serverLatch = new CountDownLatch(c); - disposableServer = server.handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString( - i.receive() - .asString() - .take(c) - .doOnNext(s -> serverLatch.countDown()) - .log("server")))) - .bindNow(); + disposableServer = createDisposableServer(server, + (i, o) -> o.sendString(i.receive() + .asString() + .take(c) + .doOnNext(s -> serverLatch.countDown()) + .log("server"))); Flux flux = Flux.interval(Duration.ofMillis(200)) .map(Object::toString); @@ -279,112 +257,57 @@ void doDuplexEcho(HttpServer server, HttpClient client) throws Exception { } void doSimpleSubProtocolServerNoSubProtocol(HttpServer server, HttpClient client, String errorMessage) { - disposableServer = server.handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString(Mono.just("test")))) - .bindNow(); + disposableServer = createDisposableServer(server, (i, o) -> o.sendString(Mono.just("test"))); - StepVerifier.create( - client.headers(h -> h.add("Authorization", auth)) - .websocket(WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build()) - .uri("/test") - .handle((i, o) -> i.receive().asString())) - .verifyErrorMessage(errorMessage); + client.headers(h -> h.add("Authorization", auth)) + .websocket(WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build()) + .uri("/test") + .handle((i, o) -> i.receive().asString()) + .as(StepVerifier::create) + .verifyErrorMessage(errorMessage); } void doSimpleSubProtocolServerNotSupported(HttpServer server, HttpClient client, String errorMessage) { - disposableServer = server.handle((in, out) -> out.sendWebsocket( - (i, o) -> o.sendString(Mono.just("test")), - WebsocketServerSpec.builder().protocols("protoA,protoB").build())) - .bindNow(); + disposableServer = createDisposableServer(server, (i, o) -> o.sendString(Mono.just("test")), + WebsocketServerSpec.builder().protocols("protoA,protoB").build()); - StepVerifier.create( - client.headers(h -> h.add("Authorization", auth)) - .websocket(WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build()) - .uri("/test") - .handle((i, o) -> i.receive().asString())) - //the SERVER returned null which means that it couldn't select a protocol - .verifyErrorMessage(errorMessage); + client.headers(h -> h.add("Authorization", auth)) + .websocket(WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build()) + .uri("/test") + .handle((i, o) -> i.receive().asString()) + .as(StepVerifier::create) + //the SERVER returned null which means that it couldn't select a protocol + .verifyErrorMessage(errorMessage); } void doSimpleSubProtocolServerSupported(HttpServer server, HttpClient client) { - disposableServer = server.handle((in, out) -> out.sendWebsocket( - (i, o) -> o.sendString(Mono.just("test")), - WebsocketServerSpec.builder().protocols("SUBPROTOCOL").build())) - .bindNow(); - - List res = - client.headers(h -> h.add("Authorization", auth)) - .websocket(WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build()) - .uri("/test") - .handle((i, o) -> i.receive().asString()) - .log() - .collectList() - .block(Duration.ofSeconds(30)); - - assertThat(res).isNotNull(); - assertThat(res.get(0)).isEqualTo("test"); + disposableServer = createDisposableServer(server, (i, o) -> o.sendString(Mono.just("test")), + WebsocketServerSpec.builder().protocols("SUBPROTOCOL").build()); + sendRequestWithCollect(client, WebsocketClientSpec.builder().protocols("SUBPROTOCOL,OTHER").build(), + (i, o) -> i.receive().asString(), "test"); } void doSimpleSubProtocolSelected(HttpServer server, HttpClient client) { - disposableServer = server.handle((in, out) -> out.sendWebsocket( - (i, o) -> o.sendString( - Mono.just("SERVER:" + o.selectedSubprotocol())), - WebsocketServerSpec.builder().protocols("NOT, Common").build())) - .bindNow(); - - List res = - client.headers(h -> h.add("Authorization", auth)) - .websocket(WebsocketClientSpec.builder().protocols("Common,OTHER").build()) - .uri("/test") - .handle((in, out) -> in.receive() - .asString() - .map(srv -> "CLIENT:" + in.selectedSubprotocol() + "-" + srv)) - .log() - .collectList() - .block(Duration.ofSeconds(30)); - - assertThat(res).isNotNull(); - assertThat(res.get(0)).isEqualTo("CLIENT:Common-SERVER:Common"); + disposableServer = createDisposableServer(server, (i, o) -> o.sendString(Mono.just("SERVER:" + o.selectedSubprotocol())), + WebsocketServerSpec.builder().protocols("NOT, Common").build()); + sendRequestWithCollect(client, WebsocketClientSpec.builder().protocols("Common,OTHER").build(), + (in, out) -> in.receive().asString().map(srv -> "CLIENT:" + in.selectedSubprotocol() + "-" + srv), + "CLIENT:Common-SERVER:Common"); } void doNoSubProtocolSelected(HttpServer server, HttpClient client) { - disposableServer = server.handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString( - Mono.just("SERVER:" + o.selectedSubprotocol())))) - .bindNow(); - - List res = - client.headers(h -> h.add("Authorization", auth)) - .websocket() - .uri("/test") - .handle((in, out) -> in.receive() - .asString() - .map(srv -> "CLIENT:" + in.selectedSubprotocol() + "-" + srv)) - .log() - .collectList() - .block(Duration.ofSeconds(30)); - - assertThat(res).isNotNull(); - assertThat(res.get(0)).isEqualTo("CLIENT:null-SERVER:null"); + disposableServer = createDisposableServer(server, (i, o) -> o.sendString(Mono.just("SERVER:" + o.selectedSubprotocol()))); + sendRequestWithCollect(client, + (in, out) -> in.receive().asString().map(srv -> "CLIENT:" + in.selectedSubprotocol() + "-" + srv), + "CLIENT:null-SERVER:null"); } void doAnySubProtocolSelectsFirstClientProvided(HttpServer server, HttpClient client) { - disposableServer = server.handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString( - Mono.just("SERVER:" + o.selectedSubprotocol())), - WebsocketServerSpec.builder().protocols("proto2,*").build())) - .bindNow(); - - List res = - client.headers(h -> h.add("Authorization", auth)) - .websocket(WebsocketClientSpec.builder().protocols("proto1, proto2").build()) - .uri("/test") - .handle((in, out) -> in.receive() - .asString() - .map(srv -> "CLIENT:" + in.selectedSubprotocol() + "-" + srv)) - .log() - .collectList() - .block(Duration.ofSeconds(30)); - - assertThat(res).isNotNull(); - assertThat(res.get(0)).isEqualTo("CLIENT:proto1-SERVER:proto1"); + disposableServer = createDisposableServer(server, (i, o) -> o.sendString(Mono.just("SERVER:" + o.selectedSubprotocol())), + WebsocketServerSpec.builder().protocols("proto2,*").build()); + sendRequestWithCollect(client, WebsocketClientSpec.builder().protocols("proto1, proto2").build(), + (in, out) -> in.receive().asString().map(srv -> "CLIENT:" + in.selectedSubprotocol() + "-" + srv), + "CLIENT:proto1-SERVER:proto1"); } void doSendToWebsocketSubProtocol(HttpServer server, HttpClient client) throws InterruptedException { @@ -393,17 +316,16 @@ void doSendToWebsocketSubProtocol(HttpServer server, HttpClient client) throws I AtomicReference clientSelectedProtocolWhenSimplyUpgrading = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - disposableServer = server.handle((in, out) -> out.sendWebsocket( - (i, o) -> { - serverSelectedProtocol.set(i.selectedSubprotocol()); - latch.countDown(); - return i.receive() - .asString() - .doOnNext(System.err::println) - .then(); - }, - WebsocketServerSpec.builder().protocols("not,proto1").build())) - .bindNow(); + disposableServer = createDisposableServer(server, + (i, o) -> { + serverSelectedProtocol.set(i.selectedSubprotocol()); + latch.countDown(); + return i.receive() + .asString() + .doOnNext(System.err::println) + .then(); + }, + WebsocketServerSpec.builder().protocols("not,proto1").build()); client.headers(h -> h.add("Authorization", auth)) .websocket(WebsocketClientSpec.builder().protocols("proto1,proto2").build()) @@ -422,49 +344,45 @@ void doSendToWebsocketSubProtocol(HttpServer server, HttpClient client) throws I } void doTestMaxFramePayloadLengthFailed(HttpServer server, HttpClient client) { - disposableServer = server.handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString(Mono.just("12345678901")))) - .bindNow(); - - Mono response = client.websocket(WebsocketClientSpec.builder().maxFramePayloadLength(10).build()) - .handle((in, out) -> in.receive() - .asString() - .map(srv -> srv)) - .log() - .then(); + disposableServer = createDisposableServer(server, (i, o) -> o.sendString(Mono.just("12345678901"))); - StepVerifier.create(response) - .expectError(CorruptedFrameException.class) - .verify(Duration.ofSeconds(30)); + client.websocket(WebsocketClientSpec.builder().maxFramePayloadLength(10).build()) + .handle((in, out) -> in.receive() + .asString() + .map(srv -> srv)) + .log() + .then() + .as(StepVerifier::create) + .expectError(CorruptedFrameException.class) + .verify(Duration.ofSeconds(30)); } void doTestMaxFramePayloadLengthSuccess(HttpServer server, HttpClient client) { - disposableServer = server.handle((in, out) -> out.sendWebsocket((i, o) -> o.sendString(Mono.just("12345678901")))) - .bindNow(); + disposableServer = createDisposableServer(server, (i, o) -> o.sendString(Mono.just("12345678901"))); - Mono response = client.websocket(WebsocketClientSpec.builder().maxFramePayloadLength(11).build()) - .handle((in, out) -> in.receive() - .asString() - .map(srv -> srv)) - .log() - .then(); - - StepVerifier.create(response) - .expectComplete() - .verify(Duration.ofSeconds(30)); + client.websocket(WebsocketClientSpec.builder().maxFramePayloadLength(11).build()) + .handle((in, out) -> in.receive() + .asString() + .map(srv -> srv)) + .log() + .then() + .as(StepVerifier::create) + .expectComplete() + .verify(Duration.ofSeconds(30)); } void doTestServerMaxFramePayloadLength(HttpServer server, HttpClient client, int maxFramePayloadLength, Flux input, Flux expectation, int count) { disposableServer = - server.handle((req, res) -> res.sendWebsocket((in, out) -> - out.sendObject(in.aggregateFrames() - .receiveFrames() - .map(WebSocketFrame::content) - .map(byteBuf -> - byteBuf.readCharSequence(byteBuf.readableBytes(), Charset.defaultCharset()).toString()) - .map(TextWebSocketFrame::new)), - WebsocketServerSpec.builder().maxFramePayloadLength(maxFramePayloadLength).build())) - .bindNow(); + createDisposableServer(server, + (in, out) -> + out.sendObject(in.aggregateFrames() + .receiveFrames() + .map(WebSocketFrame::content) + .map(byteBuf -> + byteBuf.readCharSequence(byteBuf.readableBytes(), Charset.defaultCharset()).toString()) + .map(TextWebSocketFrame::new)), + WebsocketServerSpec.builder().maxFramePayloadLength(maxFramePayloadLength).build()); AtomicReference> output = new AtomicReference<>(new ArrayList<>()); client.websocket() @@ -486,12 +404,8 @@ void doTestServerMaxFramePayloadLength(HttpServer server, HttpClient client, void doClosePool(HttpServer server, HttpClient client) { ConnectionProvider pr = ConnectionProvider.create("closePool", 1); - disposableServer = server.handle((in, out) -> out.sendWebsocket( - (i, o) -> o.sendString( - Mono.just("test") - .delayElement(Duration.ofMillis(100)) - .repeat()))) - .bindNow(); + disposableServer = createDisposableServer(server, + (i, o) -> o.sendString(Mono.just("test").delayElement(Duration.ofMillis(100)).repeat())); Flux ws = client.websocket() .uri("/") @@ -505,66 +419,49 @@ void doClosePool(HttpServer server, HttpClient client) { .block(); assertThat(expected).isNotNull(); - StepVerifier.create( - Flux.range(1, 10) - .concatMap(i -> ws.take(2) - .log())) - .expectNextSequence(expected) - .expectComplete() - .verify(Duration.ofSeconds(30)); + Flux.range(1, 10) + .concatMap(i -> ws.take(2) + .log()) + .as(StepVerifier::create) + .expectNextSequence(expected) + .expectComplete() + .verify(Duration.ofSeconds(30)); pr.dispose(); } void doTestCloseWebSocketFrameSentByServer(HttpServer server, HttpClient client) { - disposableServer = - server.handle((req, res) -> - res.sendWebsocket((in, out) -> out.sendObject(in.receiveFrames() - .doOnNext(WebSocketFrame::retain)))) - .bindNow(); - - Flux response = - client.websocket() - .uri("/") - .handle((in, out) -> out.sendString(Mono.just("echo")) - .sendObject(new CloseWebSocketFrame()) - .then() - .thenMany(in.receiveFrames())); - - StepVerifier.create(response) - .expectNextMatches(webSocketFrame -> - webSocketFrame instanceof TextWebSocketFrame && - "echo".equals(((TextWebSocketFrame) webSocketFrame).text())) - .expectComplete() - .verify(Duration.ofSeconds(30)); + disposableServer = createDisposableServer(server, + (in, out) -> out.sendObject(in.receiveFrames().doOnNext(WebSocketFrame::retain))); + + sendRequest(client, "/", + (in, out) -> out.sendString(Mono.just("echo")) + .sendObject(new CloseWebSocketFrame()) + .then() + .thenMany(in.receiveFrames()), + webSocketFrame -> webSocketFrame instanceof TextWebSocketFrame && + "echo".equals(((TextWebSocketFrame) webSocketFrame).text())); } void doTestCloseWebSocketFrameSentByClient(HttpServer server, HttpClient client) { - disposableServer = - server.handle((req, res) -> - res.sendWebsocket((in, out) -> out.sendString(Mono.just("echo")) - .sendObject(new CloseWebSocketFrame()))) - .bindNow(); + disposableServer = createDisposableServer(server, + (in, out) -> out.sendString(Mono.just("echo")).sendObject(new CloseWebSocketFrame())); - Mono response = - client.websocket() - .uri("/") - .handle((in, out) -> out.sendObject(in.receiveFrames() - .doOnNext(WebSocketFrame::retain) - .then())) - .next(); - - StepVerifier.create(response) - .expectComplete() - .verify(Duration.ofSeconds(30)); + client.websocket() + .uri("/") + .handle((in, out) -> out.sendObject(in.receiveFrames() + .doOnNext(WebSocketFrame::retain) + .then())) + .next() + .as(StepVerifier::create) + .expectComplete() + .verify(Duration.ofSeconds(30)); } void doTestConnectionAliveWhenTransformationErrors(HttpServer server, HttpClient client, BiFunction> handler, Flux expectation, int count) { - disposableServer = - server.handle((req, res) -> res.sendWebsocket(handler)) - .bindNow(); + disposableServer = createDisposableServer(server, handler); AtomicReference> output = new AtomicReference<>(new ArrayList<>()); client.websocket() @@ -585,12 +482,8 @@ void doTestConnectionAliveWhenTransformationErrors(HttpServer server, HttpClient } void doTestClientOnCloseIsInvokedClientSendClose(HttpServer server, HttpClient client) throws Exception { - disposableServer = - server.handle((req, res) -> - res.sendWebsocket((in, out) -> - out.sendString(Flux.interval(Duration.ofSeconds(1)) - .map(l -> l + "")))) - .bindNow(); + disposableServer = createDisposableServer(server, + (in, out) -> out.sendString(Flux.interval(Duration.ofSeconds(1)).map(l -> l + ""))); CountDownLatch latch = new CountDownLatch(3); AtomicBoolean error = new AtomicBoolean(); @@ -641,12 +534,8 @@ void doTestClientOnCloseIsInvokedClientSendClose(HttpServer server, HttpClient c } void doTestClientOnCloseIsInvokedClientDisposed(HttpServer server, HttpClient client) throws Exception { - disposableServer = - server.handle((req, res) -> - res.sendWebsocket((in, out) -> - out.sendString(Flux.interval(Duration.ofSeconds(1)) - .map(l -> l + "")))) - .bindNow(); + disposableServer = createDisposableServer(server, + (in, out) -> out.sendString(Flux.interval(Duration.ofSeconds(1)).map(l -> l + ""))); CountDownLatch latch = new CountDownLatch(3); AtomicBoolean error = new AtomicBoolean(); @@ -656,39 +545,39 @@ void doTestClientOnCloseIsInvokedClientDisposed(HttpServer server, HttpClient cl in.withConnection(conn -> { Mono.delay(Duration.ofSeconds(3)) .subscribe(c -> { - log.debug("context.dispose()"); - conn.dispose(); - latch.countDown(); + log.debug("context.dispose()"); + conn.dispose(); + latch.countDown(); }); conn.onDispose() - .subscribe( - c -> { // no-op - }, - t -> { - t.printStackTrace(); - error.set(true); - }, - () -> { - log.debug("context.onClose() completed"); - latch.countDown(); - }); - }); - Mono.delay(Duration.ofSeconds(3)) - .repeat(() -> { - AtomicBoolean disposed = new AtomicBoolean(false); - in.withConnection(conn -> { - disposed.set(conn.isDisposed()); - log.debug("context.isDisposed() " + conn.isDisposed()); - }); - if (disposed.get()) { + .subscribe( + c -> { // no-op + }, + t -> { + t.printStackTrace(); + error.set(true); + }, + () -> { + log.debug("context.onClose() completed"); latch.countDown(); - return false; - } - return true; - }) - .subscribe(); - return Mono.delay(Duration.ofSeconds(7)) - .then(); + }); + }); + Mono.delay(Duration.ofSeconds(3)) + .repeat(() -> { + AtomicBoolean disposed = new AtomicBoolean(false); + in.withConnection(conn -> { + disposed.set(conn.isDisposed()); + log.debug("context.isDisposed() " + conn.isDisposed()); + }); + if (disposed.get()) { + latch.countDown(); + return false; + } + return true; + }) + .subscribe(); + return Mono.delay(Duration.ofSeconds(7)) + .then(); }) .blockLast(Duration.ofSeconds(30)); @@ -698,9 +587,7 @@ void doTestClientOnCloseIsInvokedClientDisposed(HttpServer server, HttpClient cl } void doTestClientOnCloseIsInvokedServerInitiatedClose(HttpServer server, HttpClient client) throws Exception { - disposableServer = - server.handle((req, res) -> res.sendWebsocket((in, out) -> out.sendString(Mono.just("test")))) - .bindNow(); + disposableServer = createDisposableServer(server, (in, out) -> out.sendString(Mono.just("test"))); CountDownLatch latch = new CountDownLatch(2); AtomicBoolean error = new AtomicBoolean(); @@ -710,16 +597,16 @@ void doTestClientOnCloseIsInvokedServerInitiatedClose(HttpServer server, HttpCli in.withConnection(conn -> conn.onDispose() .subscribe( - c -> { // no-op - }, - t -> { - t.printStackTrace(); - error.set(true); - }, - () -> { - log.debug("context.onClose() completed"); - latch.countDown(); - })); + c -> { // no-op + }, + t -> { + t.printStackTrace(); + error.set(true); + }, + () -> { + log.debug("context.onClose() completed"); + latch.countDown(); + })); Mono.delay(Duration.ofSeconds(3)) .repeat(() -> { AtomicBoolean disposed = new AtomicBoolean(false); @@ -744,32 +631,26 @@ void doTestClientOnCloseIsInvokedServerInitiatedClose(HttpServer server, HttpCli } void doTestIssue460(HttpServer server, HttpClient client) { - disposableServer = - server.host("::1") - .handle((req, res) -> res.sendWebsocket((in, out) -> Mono.never())) - .bindNow(); + disposableServer = createDisposableServer(server.host("::1"), (in, out) -> Mono.never()); - HttpClient httpClient = client.headers(h -> h.add(HttpHeaderNames.HOST, "[::1")); - - StepVerifier.create(httpClient.websocket() - .connect()) - .expectError() - .verify(Duration.ofSeconds(30)); + client.headers(h -> h.add(HttpHeaderNames.HOST, "[::1")) + .websocket() + .connect() + .as(StepVerifier::create) + .expectError() + .verify(Duration.ofSeconds(30)); } void doTestIssue444(HttpServer server, HttpClient client, BiFunction> fn) { - disposableServer = - server.host("localhost") - .handle((req, res) -> res.sendWebsocket(fn)) - .bindNow(); + disposableServer = createDisposableServer(server.host("localhost"), fn); - StepVerifier.create( - client.websocket() - .uri("/") - .handle((i, o) -> i.receiveFrames() - .then())) - .expectComplete() - .verify(Duration.ofSeconds(30)); + client.websocket() + .uri("/") + .handle((i, o) -> i.receiveFrames() + .then()) + .as(StepVerifier::create) + .expectComplete() + .verify(Duration.ofSeconds(30)); } void doTestIssue821(HttpServer server, HttpClient client) throws Exception { @@ -781,26 +662,25 @@ void doTestIssue821(HttpServer server, HttpClient client) throws Exception { out.sendString(Mono.just("scheduled")) .then() .subscribe( - null, - t -> { - error.set(t); - latch.countDown(); - }, - null), + null, + t -> { + error.set(t); + latch.countDown(); + }, + null), 500, TimeUnit.MILLISECONDS); return out.sendString(Mono.just("test")); })) .bindNow(); - String res = - client.websocket() - .uri("/ws") - .receive() - .asString() - .blockLast(Duration.ofSeconds(5)); - - assertThat(res).isNotNull() - .isEqualTo("test"); + client.websocket() + .uri("/ws") + .receive() + .asString() + .as(StepVerifier::create) + .expectNext("test") + .expectComplete() + .verify(Duration.ofSeconds(30)); assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); @@ -813,36 +693,26 @@ void doTestIssue821(HttpServer server, HttpClient client) throws Exception { void doTestIssue900_1(HttpServer server, HttpClient client) throws Exception { AtomicReference statusClient = new AtomicReference<>(); - disposableServer = - server.handle((req, res) -> - res.sendWebsocket((in, out) -> out.sendObject(in.receiveFrames() - .doOnNext(WebSocketFrame::retain)))) - .bindNow(); + disposableServer = createDisposableServer(server, + (in, out) -> out.sendObject(in.receiveFrames().doOnNext(WebSocketFrame::retain))); CountDownLatch latch = new CountDownLatch(1); - Flux response = - client.websocket() - .uri("/") - .handle((in, out) -> { - in.receiveCloseStatus() - .doOnNext(o -> { - statusClient.set(o); - latch.countDown(); - }) - .subscribe(); - - return out.sendObject(Flux.just(new TextWebSocketFrame("echo"), - new CloseWebSocketFrame(1008, "something"))) - .then() - .thenMany(in.receiveFrames()); - }); - - StepVerifier.create(response) - .expectNextMatches(webSocketFrame -> - webSocketFrame instanceof TextWebSocketFrame && - "echo".equals(((TextWebSocketFrame) webSocketFrame).text())) - .expectComplete() - .verify(Duration.ofSeconds(30)); + sendRequest(client, "/", + (in, out) -> { + in.receiveCloseStatus() + .doOnNext(o -> { + statusClient.set(o); + latch.countDown(); + }) + .subscribe(); + + return out.sendObject(Flux.just(new TextWebSocketFrame("echo"), + new CloseWebSocketFrame(1008, "something"))) + .then() + .thenMany(in.receiveFrames()); + }, + webSocketFrame -> webSocketFrame instanceof TextWebSocketFrame && + "echo".equals(((TextWebSocketFrame) webSocketFrame).text())); assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); assertThat(statusClient.get()).isNotNull() @@ -854,28 +724,26 @@ void doTestIssue900_2(HttpServer server, HttpClient client) throws Exception { AtomicReference incomingData = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - disposableServer = - server.handle((req, res) -> - res.sendWebsocket((in, out) -> { - in.receiveCloseStatus() - .doOnNext(o -> { - statusServer.set(o); - latch.countDown(); - }) - .subscribe(); - - return out.sendObject(Flux.just(new TextWebSocketFrame("echo"), - new CloseWebSocketFrame(1008, "something")) - .delayElements(Duration.ofMillis(100))) - .then(in.receiveFrames() - .doOnNext(o -> { - if (o instanceof TextWebSocketFrame) { - incomingData.set(((TextWebSocketFrame) o).text()); - } - }) - .then()); - })) - .bindNow(); + disposableServer = createDisposableServer(server, + (in, out) -> { + in.receiveCloseStatus() + .doOnNext(o -> { + statusServer.set(o); + latch.countDown(); + }) + .subscribe(); + + return out.sendObject(Flux.just(new TextWebSocketFrame("echo"), + new CloseWebSocketFrame(1008, "something")) + .delayElements(Duration.ofMillis(100))) + .then(in.receiveFrames() + .doOnNext(o -> { + if (o instanceof TextWebSocketFrame) { + incomingData.set(((TextWebSocketFrame) o).text()); + } + }) + .then()); + }); client.websocket() .uri("/") @@ -894,20 +762,18 @@ void doTestIssue663_1(HttpServer server, HttpClient client) throws Exception { AtomicBoolean incomingData = new AtomicBoolean(); CountDownLatch latch = new CountDownLatch(1); - disposableServer = - server.handle((req, resp) -> - resp.sendWebsocket((i, o) -> - o.sendObject(Flux.just(new PingWebSocketFrame(), new CloseWebSocketFrame()) - .delayElements(Duration.ofMillis(100))) - .then(i.receiveFrames() - .doOnNext(f -> { - if (f instanceof PongWebSocketFrame) { - incomingData.set(true); - } - }) - .doOnComplete(latch::countDown) - .then()))) - .bindNow(); + disposableServer = createDisposableServer(server, + (i, o) -> + o.sendObject(Flux.just(new PingWebSocketFrame(), new CloseWebSocketFrame()) + .delayElements(Duration.ofMillis(100))) + .then(i.receiveFrames() + .doOnNext(f -> { + if (f instanceof PongWebSocketFrame) { + incomingData.set(true); + } + }) + .doOnComplete(latch::countDown) + .then())); client.websocket() .uri("/") @@ -922,16 +788,14 @@ void doTestIssue663_2(HttpServer server, HttpClient client) throws Exception { AtomicBoolean incomingData = new AtomicBoolean(); CountDownLatch latch = new CountDownLatch(1); - disposableServer = - server.handle((req, resp) -> - resp.sendWebsocket((i, o) -> - o.sendObject(Flux.just(new PingWebSocketFrame(), new CloseWebSocketFrame()) - .delayElements(Duration.ofMillis(100))) - .then(i.receiveFrames() - .doOnNext(f -> incomingData.set(true)) - .doOnComplete(latch::countDown) - .then()))) - .bindNow(); + disposableServer = createDisposableServer(server, + (i, o) -> + o.sendObject(Flux.just(new PingWebSocketFrame(), new CloseWebSocketFrame()) + .delayElements(Duration.ofMillis(100))) + .then(i.receiveFrames() + .doOnNext(f -> incomingData.set(true)) + .doOnComplete(latch::countDown) + .then())); client.websocket(WebsocketClientSpec.builder().handlePing(true).build()) .uri("/") @@ -946,9 +810,7 @@ void doTestIssue663_3(HttpServer server, HttpClient client) throws Exception { AtomicBoolean incomingData = new AtomicBoolean(); CountDownLatch latch = new CountDownLatch(1); - disposableServer = - server.handle((req, resp) -> resp.sendWebsocket((i, o) -> i.receiveFrames().then())) - .bindNow(); + disposableServer = createDisposableServer(server, (i, o) -> i.receiveFrames().then()); client.websocket() .uri("/") @@ -973,10 +835,9 @@ void doTestIssue663_4(HttpServer server, HttpClient client) throws Exception { AtomicBoolean incomingData = new AtomicBoolean(); CountDownLatch latch = new CountDownLatch(1); - disposableServer = - server.handle((req, resp) -> resp.sendWebsocket((i, o) -> i.receiveFrames().then(), - WebsocketServerSpec.builder().handlePing(true).build())) - .bindNow(); + disposableServer = createDisposableServer(server, + (i, o) -> i.receiveFrames().then(), + WebsocketServerSpec.builder().handlePing(true).build()); client.websocket() .uri("/") @@ -999,18 +860,16 @@ void doTestIssue967(HttpServer server, HttpClient client) throws Exception { .map(i -> Integer.toString(i)) .delayElements(Duration.ofMillis(50)); - disposableServer = - server.handle((req, res) -> - res.sendWebsocket((in, out) -> - Mono.when(out.sendString(somePublisher), - in.receiveFrames() - .cast(TextWebSocketFrame.class) - .map(TextWebSocketFrame::text) - .publish() // We want the connection alive even after takeUntil - .autoConnect() // which will trigger cancel - .takeUntil(msg -> msg.equals("5")) - .then()))) - .bindNow(); + disposableServer = createDisposableServer(server, + (in, out) -> + Mono.when(out.sendString(somePublisher), + in.receiveFrames() + .cast(TextWebSocketFrame.class) + .map(TextWebSocketFrame::text) + .publish() // We want the connection alive even after takeUntil + .autoConnect() // which will trigger cancel + .takeUntil(msg -> msg.equals("5")) + .then())); Flux toSend = Flux.range(1, 10) .map(i -> Integer.toString(i)); @@ -1042,10 +901,7 @@ void doTestWebsocketCompression(HttpServer server, HttpClient client, boolean co WebsocketServerSpec websocketServerSpec = clientServerNoContextTakeover ? serverBuilder.compressionAllowServerNoContext(true).compressionPreferredClientNoContext(true).build() : serverBuilder.build(); - disposableServer = - server.handle((req, res) -> - res.sendWebsocket((in, out) -> out.sendString(Mono.just("test")), websocketServerSpec)) - .bindNow(); + disposableServer = createDisposableServer(server, (in, out) -> out.sendString(Mono.just("test")), websocketServerSpec); AtomicBoolean clientHandler = new AtomicBoolean(); @@ -1067,12 +923,7 @@ void doTestWebsocketCompression(HttpServer server, HttpClient client, boolean co }; Predicate> predicate = t -> "test".equals(t.getT1()) && "null".equals(t.getT2()); - StepVerifier.create(client.websocket() - .uri("/") - .handle(receiver)) - .expectNextMatches(predicate) - .expectComplete() - .verify(Duration.ofSeconds(30)); + sendRequest(client, "/", receiver, predicate); assertThat(clientHandler.get()).isFalse(); if (compress) { @@ -1085,12 +936,7 @@ void doTestWebsocketCompression(HttpServer server, HttpClient client, boolean co WebsocketClientSpec websocketClientSpec = clientServerNoContextTakeover ? clientBuilder.compressionAllowClientNoContext(true).compressionRequestedServerNoContext(true).build() : clientBuilder.build(); - StepVerifier.create(client.websocket(websocketClientSpec) - .uri("/") - .handle(receiver)) - .expectNextMatches(predicate) - .expectComplete() - .verify(Duration.ofSeconds(30)); + sendRequest(client, websocketClientSpec, "/", receiver, predicate); assertThat(clientHandler.get()).isEqualTo(compress); } @@ -1099,18 +945,16 @@ void doTestIssue1485_CloseFrameSentByClient(HttpServer server, HttpClient client AtomicReference statusClient = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(2); - disposableServer = - server.handle((req, res) -> - res.sendWebsocket((in, out) -> { - in.receiveCloseStatus() - .doOnNext(status -> { - statusServer.set(status); - latch.countDown(); - }) - .subscribe(); - return in.receive().then(); - })) - .bindNow(); + disposableServer = createDisposableServer(server, + (in, out) -> { + in.receiveCloseStatus() + .doOnNext(status -> { + statusServer.set(status); + latch.countDown(); + }) + .subscribe(); + return in.receive().then(); + }); client.websocket() .uri("/") @@ -1124,7 +968,7 @@ void doTestIssue1485_CloseFrameSentByClient(HttpServer server, HttpClient client return out.sendObject(new CloseWebSocketFrame()) .then(in.receive().then()); }) - .blockLast(Duration.ofSeconds(5)); + .blockLast(Duration.ofSeconds(30)); assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); @@ -1142,19 +986,17 @@ void doTestIssue1485_CloseFrameSentByServer(HttpServer server, HttpClient client AtomicReference statusClient = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(2); - disposableServer = - server.handle((req, res) -> - res.sendWebsocket((in, out) -> { - in.receiveCloseStatus() - .doOnNext(status -> { - statusServer.set(status); - latch.countDown(); - }) - .subscribe(); - return out.sendObject(new CloseWebSocketFrame()) - .then(in.receive().then()); - })) - .bindNow(); + disposableServer = createDisposableServer(server, + (in, out) -> { + in.receiveCloseStatus() + .doOnNext(status -> { + statusServer.set(status); + latch.countDown(); + }) + .subscribe(); + return out.sendObject(new CloseWebSocketFrame()) + .then(in.receive().then()); + }); client.websocket() .uri("/") @@ -1167,7 +1009,7 @@ void doTestIssue1485_CloseFrameSentByServer(HttpServer server, HttpClient client .subscribe(); return in.receive(); }) - .blockLast(Duration.ofSeconds(5)); + .blockLast(Duration.ofSeconds(30)); assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); @@ -1200,18 +1042,17 @@ void doTestConnectionClosedWhenFailedUpgrade(HttpServer server, HttpClient clien void doTestIssue3295(HttpServer server, HttpClient client) throws Exception { AtomicReference serverError = new AtomicReference<>(); CountDownLatch serverLatch = new CountDownLatch(1); - disposableServer = - server.handle((req, res) -> res.sendWebsocket((in, out) -> - in.aggregateFrames(10) - .receiveFrames() - .doOnError(t -> { - serverError.set(t); - serverLatch.countDown(); - }) - .cast(BinaryWebSocketFrame.class) - .map(DefaultByteBufHolder::content) - .then())) - .bindNow(); + disposableServer = createDisposableServer(server, + (in, out) -> + in.aggregateFrames(10) + .receiveFrames() + .doOnError(t -> { + serverError.set(t); + serverLatch.countDown(); + }) + .cast(BinaryWebSocketFrame.class) + .map(DefaultByteBufHolder::content) + .then()); AtomicReference clientStatus = new AtomicReference<>(); AtomicReference connection = new AtomicReference<>(); @@ -1235,7 +1076,7 @@ void doTestIssue3295(HttpServer server, HttpClient client) throws Exception { new ContinuationWebSocketFrame(true, 0, Unpooled.wrappedBuffer(content3)))); }) .then() - .block(Duration.ofSeconds(5)); + .block(Duration.ofSeconds(30)); assertThat(serverLatch.await(5, TimeUnit.SECONDS)).isTrue(); assertThat(serverError.get()).isNotNull().isInstanceOf(TooLongFrameException.class); @@ -1245,4 +1086,58 @@ void doTestIssue3295(HttpServer server, HttpClient client) throws Exception { assertThat(connection.get()).isNotNull(); assertThat(connection.get().channel().isActive()).isFalse(); } + + private static DisposableServer createDisposableServer(HttpServer server, + BiFunction> handler) { + return createDisposableServer(server, handler, null); + } + + private static DisposableServer createDisposableServer(HttpServer server, + BiFunction> handler, + @Nullable WebsocketServerSpec spec) { + return spec == null ? + server.handle((in, out) -> out.sendWebsocket(handler)).bindNow() : + server.handle((in, out) -> out.sendWebsocket(handler, spec)).bindNow(); + } + + private static void sendRequest(HttpClient client, String uri, + BiFunction> receiver, + Predicate predicate) { + sendRequest(client, null, uri, receiver, predicate); + } + + private static void sendRequest(HttpClient client, @Nullable WebsocketClientSpec spec, String uri, + BiFunction> receiver, + Predicate predicate) { + HttpClient.WebsocketSender sender = spec == null ? client.websocket() : client.websocket(spec); + sender.uri(uri) + .handle(receiver) + .as(StepVerifier::create) + .expectNextMatches(predicate) + .expectComplete() + .verify(Duration.ofSeconds(30)); + } + + private static void sendRequestWithCollect(HttpClient client, + BiFunction> receiver, + String expectedBody) { + sendRequestWithCollect(client, null, receiver, expectedBody); + } + + private static void sendRequestWithCollect(HttpClient client, @Nullable WebsocketClientSpec spec, + BiFunction> receiver, + String expectedBody) { + HttpClient.WebsocketSender sender = spec == null ? + client.headers(h -> h.add("Authorization", auth)).websocket() : + client.headers(h -> h.add("Authorization", auth)).websocket(spec); + + sender.uri("/test") + .handle(receiver) + .log("client") + .collectList() + .as(StepVerifier::create) + .expectNextMatches(l -> expectedBody.equals(l.get(0))) + .expectComplete() + .verify(Duration.ofSeconds(30)); + } }