Skip to content

Commit 9552c82

Browse files
committed
Remove use of simple executor in WebSocket clients
Issue: SPR-11580
1 parent 8aefcb9 commit 9552c82

File tree

4 files changed

+71
-23
lines changed

4 files changed

+71
-23
lines changed

spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@
2828
import org.eclipse.jetty.websocket.client.WebSocketClient;
2929
import org.springframework.context.SmartLifecycle;
3030
import org.springframework.core.task.AsyncListenableTaskExecutor;
31-
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3231
import org.springframework.core.task.TaskExecutor;
3332
import org.springframework.http.HttpHeaders;
34-
import org.springframework.util.Assert;
3533
import org.springframework.util.concurrent.ListenableFuture;
34+
import org.springframework.util.concurrent.ListenableFutureTask;
3635
import org.springframework.web.socket.WebSocketHandler;
3736
import org.springframework.web.socket.WebSocketSession;
3837
import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter;
@@ -60,32 +59,33 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Sma
6059

6160
private final Object lifecycleMonitor = new Object();
6261

63-
private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("WebSocketClient-");
62+
private AsyncListenableTaskExecutor taskExecutor;
6463

6564

6665
/**
6766
* Default constructor that creates an instance of
68-
* {@link org.eclipse.jetty.websocket.client.WebSocketClient} with default settings.
67+
* {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
6968
*/
7069
public JettyWebSocketClient() {
7170
this.client = new org.eclipse.jetty.websocket.client.WebSocketClient();
7271
}
7372

7473
/**
75-
* Constructor that accepts a pre-configured {@link WebSocketClient}.
74+
* Constructor that accepts an existing
75+
* {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance.
7676
*/
7777
public JettyWebSocketClient(WebSocketClient client) {
78-
super();
7978
this.client = client;
8079
}
8180

8281

8382
/**
84-
* Set a {@link TaskExecutor} to use to open the connection.
85-
* By default {@link SimpleAsyncTaskExecutor} is used.
83+
* Set an {@link AsyncListenableTaskExecutor} to use when opening connections.
84+
*
85+
* <p>If this property is not configured, calls to any of the
86+
* {@code doHandshake} methods will block until the connection is established.
8687
*/
8788
public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) {
88-
Assert.notNull(taskExecutor, "TaskExecutor must not be null");
8989
this.taskExecutor = taskExecutor;
9090
}
9191

@@ -189,14 +189,23 @@ public ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler w
189189
final JettyWebSocketSession wsSession = new JettyWebSocketSession(handshakeAttributes, user);
190190
final JettyWebSocketHandlerAdapter listener = new JettyWebSocketHandlerAdapter(wsHandler, wsSession);
191191

192-
return this.taskExecutor.submitListenable(new Callable<WebSocketSession>() {
192+
Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
193193
@Override
194194
public WebSocketSession call() throws Exception {
195195
Future<Session> future = client.connect(listener, uri, request);
196196
future.get();
197197
return wsSession;
198198
}
199-
});
199+
};
200+
201+
if (this.taskExecutor != null) {
202+
return this.taskExecutor.submitListenable(connectTask);
203+
}
204+
else {
205+
ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);
206+
task.run();
207+
return task;
208+
}
200209
}
201210

202211
/**

spring-websocket/src/main/java/org/springframework/web/socket/client/standard/StandardWebSocketClient.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@
3434
import javax.websocket.WebSocketContainer;
3535

3636
import org.springframework.core.task.AsyncListenableTaskExecutor;
37-
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3837
import org.springframework.core.task.TaskExecutor;
3938
import org.springframework.http.HttpHeaders;
4039
import org.springframework.util.Assert;
4140
import org.springframework.util.concurrent.ListenableFuture;
41+
import org.springframework.util.concurrent.ListenableFutureTask;
4242
import org.springframework.web.socket.WebSocketExtension;
4343
import org.springframework.web.socket.WebSocketHandler;
4444
import org.springframework.web.socket.WebSocketSession;
@@ -58,22 +58,24 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
5858

5959
private final WebSocketContainer webSocketContainer;
6060

61-
private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("WebSocketClient-");
61+
private AsyncListenableTaskExecutor taskExecutor;
6262

6363

6464
/**
6565
* Default constructor that calls {@code ContainerProvider.getWebSocketContainer()}
66-
* to obtain a {@link WebSocketContainer} instance.
66+
* to obtain a (new) {@link WebSocketContainer} instance. Also see constructor
67+
* accepting existing {@code WebSocketContainer} instance.
6768
*/
6869
public StandardWebSocketClient() {
6970
this.webSocketContainer = ContainerProvider.getWebSocketContainer();
7071
}
7172

7273
/**
73-
* Constructor that accepts a pre-configured {@link WebSocketContainer} instance.
74-
* If using XML configuration see {@link WebSocketContainerFactoryBean}. In Java
74+
* Constructor accepting an existing {@link WebSocketContainer} instance.
75+
*
76+
* <p>For XML configuration see {@link WebSocketContainerFactoryBean}. For Java
7577
* configuration use {@code ContainerProvider.getWebSocketContainer()} to obtain
76-
* a container instance.
78+
* the {@code WebSocketContainer} instance.
7779
*/
7880
public StandardWebSocketClient(WebSocketContainer webSocketContainer) {
7981
Assert.notNull(webSocketContainer, "WebSocketContainer must not be null");
@@ -82,11 +84,12 @@ public StandardWebSocketClient(WebSocketContainer webSocketContainer) {
8284

8385

8486
/**
85-
* Set a {@link TaskExecutor} to use to open the connection.
86-
* By default {@link SimpleAsyncTaskExecutor} is used.
87+
* Set an {@link AsyncListenableTaskExecutor} to use when opening connections.
88+
*
89+
* <p>If this property is not configured, calls to any of the
90+
* {@code doHandshake} methods will block until the connection is established.
8791
*/
8892
public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) {
89-
Assert.notNull(taskExecutor, "TaskExecutor must not be null");
9093
this.taskExecutor = taskExecutor;
9194
}
9295

@@ -116,13 +119,22 @@ protected ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandle
116119
configBuidler.extensions(adaptExtensions(extensions));
117120
final Endpoint endpoint = new StandardWebSocketHandlerAdapter(webSocketHandler, session);
118121

119-
return this.taskExecutor.submitListenable(new Callable<WebSocketSession>() {
122+
Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
120123
@Override
121124
public WebSocketSession call() throws Exception {
122125
webSocketContainer.connectToServer(endpoint, configBuidler.build(), uri);
123126
return session;
124127
}
125-
});
128+
};
129+
130+
if (this.taskExecutor != null) {
131+
return this.taskExecutor.submitListenable(connectTask);
132+
}
133+
else {
134+
ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);
135+
task.run();
136+
return task;
137+
}
126138
}
127139

128140
private static List<Extension> adaptExtensions(List<WebSocketExtension> extensions) {

spring-websocket/src/test/java/org/springframework/web/socket/client/jetty/JettyWebSocketClientTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.junit.After;
2929
import org.junit.Before;
3030
import org.junit.Test;
31+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3132
import org.springframework.util.CollectionUtils;
3233
import org.springframework.util.SocketUtils;
3334
import org.springframework.web.socket.WebSocketHandler;
@@ -88,6 +89,19 @@ public void doHandshake() throws Exception {
8889
assertEquals("echo", this.wsSession.getAcceptedProtocol());
8990
}
9091

92+
@Test
93+
public void doHandshakeWithTaskExecutor() throws Exception {
94+
95+
WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
96+
headers.setSecWebSocketProtocol(Arrays.asList("echo"));
97+
98+
this.client.setTaskExecutor(new SimpleAsyncTaskExecutor());
99+
this.wsSession = this.client.doHandshake(new TextWebSocketHandler(), headers, new URI(this.wsUrl)).get();
100+
101+
assertEquals(this.wsUrl, this.wsSession.getUri().toString());
102+
assertEquals("echo", this.wsSession.getAcceptedProtocol());
103+
}
104+
91105

92106
private static class TestJettyWebSocketServer {
93107

spring-websocket/src/test/java/org/springframework/web/socket/client/standard/StandardWebSocketClientTests.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,9 @@
3030
import org.junit.Test;
3131
import org.mockito.ArgumentCaptor;
3232

33+
import org.springframework.core.task.AsyncListenableTaskExecutor;
34+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
35+
import org.springframework.core.task.TaskExecutor;
3336
import org.springframework.web.socket.WebSocketHandler;
3437
import org.springframework.web.socket.WebSocketSession;
3538
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
@@ -134,4 +137,14 @@ public void headersClientEndpointConfigurator() throws Exception {
134137
assertEquals(Collections.singletonMap("foo", Arrays.asList("bar")), map);
135138
}
136139

140+
@Test
141+
public void taskExecutor() throws Exception {
142+
143+
URI uri = new URI("ws://example.com/abc");
144+
this.wsClient.setTaskExecutor(new SimpleAsyncTaskExecutor());
145+
WebSocketSession session = this.wsClient.doHandshake(this.wsHandler, this.headers, uri).get();
146+
147+
assertNotNull(session);
148+
}
149+
137150
}

0 commit comments

Comments
 (0)