Skip to content

Commit 4b66d40

Browse files
[SignalR] [Java] Log 'WebSocket stopped' once (#43532)
1 parent 351eccf commit 4b66d40

File tree

7 files changed

+127
-76
lines changed

7 files changed

+127
-76
lines changed

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -439,15 +439,14 @@ private Completable stop(String errorMessage) {
439439
this.state.unlock();
440440
}
441441

442-
Completable stopTask = startTask.onErrorComplete().andThen(Completable.defer(() ->
442+
CompletableSubject subject = CompletableSubject.create();
443+
startTask.onErrorComplete().subscribe(() ->
443444
{
444445
Completable stop = connectionState.transport.stop();
445-
stop.onErrorComplete().subscribe();
446-
return stop;
447-
}));
448-
stopTask.onErrorComplete().subscribe();
446+
stop.subscribe(() -> subject.onComplete(), e -> subject.onError(e));
447+
});
449448

450-
return stopTask;
449+
return subject;
451450
}
452451

453452
private void ReceiveLoop(ByteBuffer payload)

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/WebSocketTransport.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ public void setOnClose(TransportOnClosedCallback onCloseCallback) {
8282

8383
@Override
8484
public Completable stop() {
85-
return webSocketClient.stop().doOnEvent(t -> logger.info("WebSocket connection stopped."));
85+
Completable stop = webSocketClient.stop();
86+
stop.onErrorComplete().subscribe(() -> logger.info("WebSocket connection stopped."));
87+
return stop;
8688
}
8789

8890
void onClose(Integer code, String reason) {

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2797,14 +2797,11 @@ public void SkippingNegotiateDoesNotNegotiate() {
27972797
.create("http://example")
27982798
.withTransport(TransportEnum.WEBSOCKETS)
27992799
.shouldSkipNegotiate(true)
2800+
.withHandshakeResponseTimeout(1)
28002801
.withHttpClient(client)
28012802
.build();
28022803

2803-
try {
2804-
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
2805-
} catch (Exception e) {
2806-
assertEquals("WebSockets isn't supported in testing currently.", e.getMessage());
2807-
}
2804+
assertThrows(RuntimeException.class, () -> hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait());
28082805
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
28092806
assertFalse(negotiateCalled.get());
28102807

@@ -3986,4 +3983,43 @@ public void keepAliveIntervalIsSetThroughBuilder()
39863983

39873984
assertEquals(interval, hubConnection.getKeepAliveInterval());
39883985
}
3986+
3987+
@Test
3988+
public void WebsocketStopLoggedOnce() {
3989+
try (TestLogger logger = new TestLogger(WebSocketTransport.class.getName())) {
3990+
AtomicBoolean negotiateCalled = new AtomicBoolean(false);
3991+
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate?negotiateVersion=1",
3992+
(req) -> {
3993+
negotiateCalled.set(true);
3994+
return Single.just(new HttpResponse(200, "",
3995+
TestUtils.stringToByteBuffer("{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
3996+
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")));
3997+
});
3998+
3999+
HubConnection hubConnection = HubConnectionBuilder
4000+
.create("http://example")
4001+
.withTransport(TransportEnum.WEBSOCKETS)
4002+
.shouldSkipNegotiate(true)
4003+
.withHandshakeResponseTimeout(100)
4004+
.withHttpClient(client)
4005+
.build();
4006+
4007+
Completable startTask = hubConnection.start().timeout(30, TimeUnit.SECONDS);
4008+
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();
4009+
4010+
assertThrows(RuntimeException.class, () -> startTask.blockingAwait());
4011+
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
4012+
assertFalse(negotiateCalled.get());
4013+
4014+
ILoggingEvent[] logs = logger.getLogs();
4015+
int count = 0;
4016+
for (ILoggingEvent iLoggingEvent : logs) {
4017+
if (iLoggingEvent.getFormattedMessage().startsWith("WebSocket connection stopped.")) {
4018+
count++;
4019+
}
4020+
}
4021+
4022+
assertEquals(1, count);
4023+
}
4024+
}
39894025
}

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/TestHttpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public TestHttpClient on(String method, String url, TestHttpRequestHandler handl
7070

7171
@Override
7272
public WebSocketWrapper createWebSocket(String url, Map<String, String> headers) {
73-
throw new RuntimeException("WebSockets isn't supported in testing currently.");
73+
return new TestWebSocketWrapper(url, headers);
7474
}
7575

7676
@Override

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/TestLogger.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,15 @@ public void append(ILoggingEvent event) {
4343
this.logger.addAppender(this.appender);
4444
}
4545

46+
public ILoggingEvent[] getLogs() {
47+
lock.lock();
48+
try {
49+
return list.toArray(new ILoggingEvent[0]);
50+
} finally {
51+
lock.unlock();
52+
}
53+
}
54+
4655
public ILoggingEvent assertLog(String logMessage) {
4756
ILoggingEvent[] localList;
4857
lock.lock();
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package com.microsoft.signalr;
2+
3+
import java.nio.ByteBuffer;
4+
import java.util.Map;
5+
6+
import io.reactivex.rxjava3.core.Completable;
7+
import io.reactivex.rxjava3.core.Single;
8+
9+
class WebSocketTestHttpClient extends HttpClient {
10+
@Override
11+
public Single<HttpResponse> send(HttpRequest request) {
12+
return null;
13+
}
14+
15+
@Override
16+
public Single<HttpResponse> send(HttpRequest request, ByteBuffer body) {
17+
return null;
18+
}
19+
20+
@Override
21+
public WebSocketWrapper createWebSocket(String url, Map<String, String> headers) {
22+
return new TestWebSocketWrapper(url, headers);
23+
}
24+
25+
@Override
26+
public HttpClient cloneWithTimeOut(int timeoutInMilliseconds) {
27+
return null;
28+
}
29+
30+
@Override
31+
public void close() {
32+
}
33+
}
34+
35+
class TestWebSocketWrapper extends WebSocketWrapper {
36+
private WebSocketOnClosedCallback onClose;
37+
38+
public TestWebSocketWrapper(String url, Map<String, String> headers)
39+
{
40+
}
41+
42+
@Override
43+
public Completable start() {
44+
return Completable.complete();
45+
}
46+
47+
@Override
48+
public Completable stop() {
49+
if (onClose != null) {
50+
onClose.invoke(null, "");
51+
}
52+
return Completable.complete();
53+
}
54+
55+
@Override
56+
public Completable send(ByteBuffer message) {
57+
return Completable.complete();
58+
}
59+
60+
@Override
61+
public void setOnReceive(OnReceiveCallBack onReceive) {
62+
}
63+
64+
@Override
65+
public void setOnClose(WebSocketOnClosedCallback onClose) {
66+
this.onClose = onClose;
67+
}
68+
}

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/WebSocketTransportTest.java

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,11 @@
55

66
import static org.junit.jupiter.api.Assertions.*;
77

8-
import java.nio.ByteBuffer;
98
import java.util.HashMap;
10-
import java.util.Map;
11-
import java.util.concurrent.TimeUnit;
129
import java.util.concurrent.atomic.AtomicBoolean;
1310

1411
import org.junit.jupiter.api.Test;
1512

16-
import io.reactivex.rxjava3.core.Completable;
17-
import io.reactivex.rxjava3.core.Single;
18-
1913
class WebSocketTransportTest {
2014
@Test
2115
public void CanPassNullExitCodeToOnClosed() {
@@ -28,61 +22,4 @@ public void CanPassNullExitCodeToOnClosed() {
2822
transport.stop();
2923
assertTrue(closed.get());
3024
}
31-
32-
class WebSocketTestHttpClient extends HttpClient {
33-
@Override
34-
public Single<HttpResponse> send(HttpRequest request) {
35-
return null;
36-
}
37-
38-
@Override
39-
public Single<HttpResponse> send(HttpRequest request, ByteBuffer body) {
40-
return null;
41-
}
42-
43-
@Override
44-
public WebSocketWrapper createWebSocket(String url, Map<String, String> headers) {
45-
return new TestWrapper();
46-
}
47-
48-
@Override
49-
public HttpClient cloneWithTimeOut(int timeoutInMilliseconds) {
50-
return null;
51-
}
52-
53-
@Override
54-
public void close() {
55-
}
56-
}
57-
58-
class TestWrapper extends WebSocketWrapper {
59-
private WebSocketOnClosedCallback onClose;
60-
61-
@Override
62-
public Completable start() {
63-
return Completable.complete();
64-
}
65-
66-
@Override
67-
public Completable stop() {
68-
if (onClose != null) {
69-
onClose.invoke(null, "");
70-
}
71-
return Completable.complete();
72-
}
73-
74-
@Override
75-
public Completable send(ByteBuffer message) {
76-
return null;
77-
}
78-
79-
@Override
80-
public void setOnReceive(OnReceiveCallBack onReceive) {
81-
}
82-
83-
@Override
84-
public void setOnClose(WebSocketOnClosedCallback onClose) {
85-
this.onClose = onClose;
86-
}
87-
}
8825
}

0 commit comments

Comments
 (0)