From 60628f90e81454af72a62ac8a56efce265bbcf7d Mon Sep 17 00:00:00 2001 From: Edgar Espina Date: Mon, 20 Mar 2017 09:32:19 -0300 Subject: [PATCH] Send message to all connected web socket clients fix #645 --- .../test/java/org/jooby/issues/Issue645.java | 84 ++++++++++++++ jooby/src/main/java/org/jooby/WebSocket.java | 52 +++++++++ .../org/jooby/internal/WebSocketImpl.java | 45 ++++++-- .../test/java/org/jooby/WebSocketTest.java | 94 ++++++++++++++++ .../org/jooby/internal/WebSocketImplTest.java | 104 ++++++++++++++++++ md/web-sockets.md | 6 + 6 files changed, 375 insertions(+), 10 deletions(-) create mode 100644 coverage-report/src/test/java/org/jooby/issues/Issue645.java diff --git a/coverage-report/src/test/java/org/jooby/issues/Issue645.java b/coverage-report/src/test/java/org/jooby/issues/Issue645.java new file mode 100644 index 0000000000..747c249173 --- /dev/null +++ b/coverage-report/src/test/java/org/jooby/issues/Issue645.java @@ -0,0 +1,84 @@ +package org.jooby.issues; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.jooby.test.ServerFeature; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.ning.http.client.AsyncHttpClient; +import com.ning.http.client.AsyncHttpClientConfig; +import com.ning.http.client.ws.WebSocket; +import com.ning.http.client.ws.WebSocketTextListener; +import com.ning.http.client.ws.WebSocketUpgradeHandler; + +public class Issue645 extends ServerFeature { + + { + ws("/ws", (ws) -> { + + ws.onMessage(message -> { + System.out.println(Thread.currentThread()); + ws.broadcast("=" + message.value(), () -> { + System.out.println(Thread.currentThread()); + ws.close(); + }); + }); + }); + + } + + private AsyncHttpClient client; + + @Before + public void before() { + client = new AsyncHttpClient(new AsyncHttpClientConfig.Builder().build()); + } + + @After + public void after() { + client.close(); + } + + @Test + public void sendText() throws Exception { + LinkedList messages = new LinkedList<>(); + + CountDownLatch latch = new CountDownLatch(1); + + client.prepareGet(ws("ws").toString()) + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( + new WebSocketTextListener() { + + @Override + public void onMessage(final String message) { + messages.add(message); + latch.countDown(); + } + + @Override + public void onOpen(final WebSocket websocket) { + websocket.sendMessage("hey!"); + } + + @Override + public void onClose(final WebSocket websocket) { + } + + @Override + public void onError(final Throwable t) { + } + }).build()) + .get(); + if (latch.await(1L, TimeUnit.SECONDS)) { + assertEquals(Arrays.asList("=hey!"), messages); + } + } + +} diff --git a/jooby/src/main/java/org/jooby/WebSocket.java b/jooby/src/main/java/org/jooby/WebSocket.java index 8ffa10257b..16904499db 100644 --- a/jooby/src/main/java/org/jooby/WebSocket.java +++ b/jooby/src/main/java/org/jooby/WebSocket.java @@ -734,4 +734,56 @@ default void send(final Object data, final OnError err) throws Exception { */ void send(Object data, SuccessCallback success, OnError err) throws Exception; + /** + * Send data to all connected sessions. + * + * If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status. + * + * @param data Data to send. + * @throws Exception If something goes wrong. + */ + default void broadcast(final Object data) throws Exception { + broadcast(data, SUCCESS, ERR); + } + + /** + * Send data to all connected sessions. + * + * If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status. + * + * @param data Data to send. + * @param success A success callback. + * @throws Exception If something goes wrong. + */ + default void broadcast(final Object data, final SuccessCallback success) throws Exception { + broadcast(data, success, ERR); + } + + /** + * Send data to all connected sessions. + * + * If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status. + * + * @param data Data to send. + * @param err An err callback. + * @throws Exception If something goes wrong. + */ + default void broadcast(final Object data, final OnError err) throws Exception { + broadcast(data, SUCCESS, err); + ; + } + + /** + * Send data to all connected sessions. + * + * If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status. + * + * @param data Data to send. + * @param success A success callback. + * @param err An err callback. + * @throws Exception If something goes wrong. + */ + void broadcast(Object data, SuccessCallback success, OnError err) + throws Exception; + } diff --git a/jooby/src/main/java/org/jooby/internal/WebSocketImpl.java b/jooby/src/main/java/org/jooby/internal/WebSocketImpl.java index 45e6996f82..8974803119 100644 --- a/jooby/src/main/java/org/jooby/internal/WebSocketImpl.java +++ b/jooby/src/main/java/org/jooby/internal/WebSocketImpl.java @@ -26,6 +26,8 @@ import java.util.Locale; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.jooby.Err; import org.jooby.MediaType; @@ -58,6 +60,9 @@ public class WebSocketImpl implements WebSocket { /** The logging system. */ private final Logger log = LoggerFactory.getLogger(WebSocket.class); + /** All connected websocket. */ + private static final Queue sessions = new ConcurrentLinkedQueue<>(); + private Locale locale; private String path; @@ -103,6 +108,7 @@ public WebSocketImpl(final OnOpen handler, final String path, @Override public void close(final CloseStatus status) { + sessions.remove(this); synchronized (this) { open = false; ws.close(status.code(), status.reason()); @@ -111,6 +117,7 @@ public void close(final CloseStatus status) { @Override public void resume() { + sessions.add(this); synchronized (this) { if (suspended) { ws.resume(); @@ -121,6 +128,7 @@ public void resume() { @Override public void pause() { + sessions.remove(this); synchronized (this) { if (!suspended) { ws.pause(); @@ -131,6 +139,7 @@ public void pause() { @Override public void terminate() throws Exception { + sessions.remove(this); synchronized (this) { open = false; ws.terminate(); @@ -142,6 +151,18 @@ public boolean isOpen() { return open && ws.isOpen(); } + @Override + public void broadcast(final Object data, final SuccessCallback success, final OnError err) + throws Exception { + for (WebSocket ws : sessions) { + try { + ws.send(data, success, err); + } catch (Exception ex) { + err.onError(ex); + } + } + } + @Override public void send(final Object data, final SuccessCallback success, final OnError err) throws Exception { @@ -160,7 +181,7 @@ public void send(final Object data, final SuccessCallback success, final OnError success, err).render(data); } else { - throw new Err(WebSocket.NORMAL, "Cannot send message on closed web socket"); + throw new Err(WebSocket.NORMAL, "WebSocket is closed."); } } } @@ -190,20 +211,24 @@ public void connect(final Injector injector, final Request req, final NativeWebS new StrParamReferenceImpl("body", "message", ImmutableList.of(message)))))) .onFailure(this::handleErr)); - ws.onCloseMessage((code, reason) -> Try - .run(sync(() -> { - this.open = false; - if (closeCallback != null) { - closeCallback.onClose(reason.map(r -> WebSocket.CloseStatus.of(code, r)) - .orElse(WebSocket.CloseStatus.of(code))); - } - closeCallback = null; - })).onFailure(this::handleErr)); + ws.onCloseMessage((code, reason) -> { + sessions.remove(this); + + Try.run(sync(() -> { + this.open = false; + if (closeCallback != null) { + closeCallback.onClose(reason.map(r -> WebSocket.CloseStatus.of(code, r)) + .orElse(WebSocket.CloseStatus.of(code))); + } + closeCallback = null; + })).onFailure(this::handleErr); + }); ws.onErrorMessage(this::handleErr); // connect now try { + sessions.add(this); handler.onOpen(req, this); } catch (Throwable ex) { handleErr(ex); diff --git a/jooby/src/test/java/org/jooby/WebSocketTest.java b/jooby/src/test/java/org/jooby/WebSocketTest.java index b38641fb81..4f421e8461 100644 --- a/jooby/src/test/java/org/jooby/WebSocketTest.java +++ b/jooby/src/test/java/org/jooby/WebSocketTest.java @@ -56,6 +56,12 @@ public void send(final Object data, final SuccessCallback success, final OnError throw new UnsupportedOperationException(); } + @Override + public void broadcast(final Object data, final SuccessCallback success, final OnError err) + throws Exception { + throw new UnsupportedOperationException(); + } + @Override public void onMessage(final OnMessage callback) throws Exception { throw new UnsupportedOperationException(); @@ -260,6 +266,27 @@ public void send(final Object data, final SuccessCallback success, final OnError assertEquals(data, dataList.getFirst()); } + @SuppressWarnings("resource") + @Test + public void broadcast() throws Exception { + Object data = new Object(); + WebSocket.SuccessCallback SUCCESS_ = WebSocket.SUCCESS; + WebSocket.OnError ERR_ = WebSocket.ERR; + LinkedList dataList = new LinkedList<>(); + WebSocket ws = new WebSocketMock() { + @Override + public void broadcast(final Object data, final SuccessCallback success, final OnError err) + throws Exception { + dataList.add(data); + assertEquals(SUCCESS_, success); + assertEquals(ERR_, err); + } + }; + ws.broadcast(data); + assertTrue(dataList.size() > 0); + assertEquals(data, dataList.getFirst()); + } + @SuppressWarnings("resource") @Test public void sendCustomSuccess() throws Exception { @@ -282,6 +309,28 @@ public void send(final Object data, final SuccessCallback success, final OnError assertEquals(data, dataList.getFirst()); } + @SuppressWarnings("resource") + @Test + public void broadcastCustomSuccess() throws Exception { + Object data = new Object(); + WebSocket.SuccessCallback SUCCESS_ = () -> { + }; + WebSocket.OnError ERR_ = WebSocket.ERR; + LinkedList dataList = new LinkedList<>(); + WebSocket ws = new WebSocketMock() { + @Override + public void broadcast(final Object data, final SuccessCallback success, final OnError err) + throws Exception { + dataList.add(data); + assertEquals(SUCCESS_, success); + assertEquals(ERR_, err); + } + }; + ws.broadcast(data, SUCCESS_); + assertTrue(dataList.size() > 0); + assertEquals(data, dataList.getFirst()); + } + @SuppressWarnings("resource") @Test public void sendCustomErr() throws Exception { @@ -304,6 +353,28 @@ public void send(final Object data, final SuccessCallback success, final OnError assertEquals(data, dataList.getFirst()); } + @SuppressWarnings("resource") + @Test + public void broadcastCustomErr() throws Exception { + Object data = new Object(); + WebSocket.SuccessCallback SUCCESS_ = WebSocket.SUCCESS; + WebSocket.OnError ERR_ = (ex) -> { + }; + LinkedList dataList = new LinkedList<>(); + WebSocket ws = new WebSocketMock() { + @Override + public void broadcast(final Object data, final SuccessCallback success, final OnError err) + throws Exception { + dataList.add(data); + assertEquals(SUCCESS_, success); + assertEquals(ERR_, err); + } + }; + ws.broadcast(data, ERR_); + assertTrue(dataList.size() > 0); + assertEquals(data, dataList.getFirst()); + } + @SuppressWarnings("resource") @Test public void sendCustomSuccessAndErr() throws Exception { @@ -327,6 +398,29 @@ public void send(final Object data, final SuccessCallback success, final OnError assertEquals(data, dataList.getFirst()); } + @SuppressWarnings("resource") + @Test + public void broadcastCustomSuccessAndErr() throws Exception { + Object data = new Object(); + WebSocket.SuccessCallback SUCCESS_ = () -> { + }; + WebSocket.OnError ERR_ = (ex) -> { + }; + LinkedList dataList = new LinkedList<>(); + WebSocket ws = new WebSocketMock() { + @Override + public void broadcast(final Object data, final SuccessCallback success, final OnError err) + throws Exception { + dataList.add(data); + assertEquals(SUCCESS_, success); + assertEquals(ERR_, err); + } + }; + ws.broadcast(data, SUCCESS_, ERR_); + assertTrue(dataList.size() > 0); + assertEquals(data, dataList.getFirst()); + } + @SuppressWarnings("resource") @Test public void getInstance() throws Exception { diff --git a/jooby/src/test/java/org/jooby/internal/WebSocketImplTest.java b/jooby/src/test/java/org/jooby/internal/WebSocketImplTest.java index 5c00c9ae89..3ffd0b2459 100644 --- a/jooby/src/test/java/org/jooby/internal/WebSocketImplTest.java +++ b/jooby/src/test/java/org/jooby/internal/WebSocketImplTest.java @@ -2,10 +2,12 @@ import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.lang.reflect.Field; import java.nio.channels.ClosedChannelException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -15,6 +17,7 @@ import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -32,6 +35,8 @@ import org.jooby.spi.NativeWebSocket; import org.jooby.test.MockUnit; import org.jooby.test.MockUnit.Block; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -113,6 +118,105 @@ public void sendString() throws Exception { }); } + @SuppressWarnings("unchecked") + @Before + @After + public void resetSessions() throws Exception { + Field field = WebSocketImpl.class.getDeclaredField("sessions"); + field.setAccessible(true); + Queue sessions = (Queue) field.get(null); + sessions.clear(); + } + + @SuppressWarnings({"resource" }) + @Test + public void sendBroadcast() throws Exception { + Object data = "String"; + String path = "/"; + String pattern = "/pattern"; + Map vars = new HashMap<>(); + MediaType consumes = MediaType.all; + MediaType produces = MediaType.all; + new MockUnit(WebSocket.OnOpen1.class, WebSocket.SuccessCallback.class, + WebSocket.OnError.class, Injector.class, Request.class, NativeWebSocket.class) + .expect(connect) + .expect(callbacks) + .expect(locale) + .expect(unit -> { + List renderers = Collections.emptyList(); + + NativeWebSocket ws = unit.get(NativeWebSocket.class); + expect(ws.isOpen()).andReturn(true); + + WebSocketRendererContext ctx = unit.mockConstructor(WebSocketRendererContext.class, + new Class[]{List.class, NativeWebSocket.class, MediaType.class, Charset.class, + Locale.class, + WebSocket.SuccessCallback.class, + WebSocket.OnError.class }, + renderers, ws, + produces, StandardCharsets.UTF_8, + Locale.CANADA, + unit.get(WebSocket.SuccessCallback.class), + unit.get(WebSocket.OnError.class)); + ctx.render(data); + }) + .run(unit -> { + WebSocketImpl ws = new WebSocketImpl( + unit.get(WebSocket.OnOpen1.class), path, pattern, vars, consumes, produces); + ws.connect(unit.get(Injector.class), unit.get(Request.class), + unit.get(NativeWebSocket.class)); + + ws.broadcast(data, unit.get(WebSocket.SuccessCallback.class), + unit.get(WebSocket.OnError.class)); + }); + } + + @SuppressWarnings({"resource" }) + @Test + public void sendBroadcastErr() throws Exception { + Object data = "String"; + String path = "/"; + String pattern = "/pattern"; + Map vars = new HashMap<>(); + MediaType consumes = MediaType.all; + MediaType produces = MediaType.all; + new MockUnit(WebSocket.OnOpen1.class, WebSocket.SuccessCallback.class, + WebSocket.OnError.class, Injector.class, Request.class, NativeWebSocket.class) + .expect(connect) + .expect(callbacks) + .expect(locale) + .expect(unit -> { + List renderers = Collections.emptyList(); + + NativeWebSocket ws = unit.get(NativeWebSocket.class); + expect(ws.isOpen()).andReturn(true); + + WebSocketRendererContext ctx = unit.mockConstructor(WebSocketRendererContext.class, + new Class[]{List.class, NativeWebSocket.class, MediaType.class, Charset.class, + Locale.class, + WebSocket.SuccessCallback.class, + WebSocket.OnError.class }, + renderers, ws, + produces, StandardCharsets.UTF_8, + Locale.CANADA, + unit.get(WebSocket.SuccessCallback.class), + unit.get(WebSocket.OnError.class)); + ctx.render(data); + IllegalStateException x = new IllegalStateException("intentional err"); + expectLastCall().andThrow(x); + unit.get(WebSocket.OnError.class).onError(x); + }) + .run(unit -> { + WebSocketImpl ws = new WebSocketImpl( + unit.get(WebSocket.OnOpen1.class), path, pattern, vars, consumes, produces); + ws.connect(unit.get(Injector.class), unit.get(Request.class), + unit.get(NativeWebSocket.class)); + + ws.broadcast(data, unit.get(WebSocket.SuccessCallback.class), + unit.get(WebSocket.OnError.class)); + }); + } + @SuppressWarnings({"resource" }) @Test(expected = Err.class) public void sendClose() throws Exception { diff --git a/md/web-sockets.md b/md/web-sockets.md index d5657c95d4..c2d11bffab 100644 --- a/md/web-sockets.md +++ b/md/web-sockets.md @@ -22,6 +22,12 @@ Keep in mind that **web socket** are not like routes. There is no stack/pipe or You can mount a socket to a path used by a route, but you can't have two or more web sockets under the same path. +## send and broadcast + +As you saw early a [web socket]({{defdocs}}/WebSocket.html) and send data to client via [ws.send(...)]({{defdocs}}/WebSocket.html#send-java.lang.Object-) method. + +The [ws.broadcast(...)]({{defdocs}}/WebSocket.html#broadcast-java.lang.Object-) method does the same thing but for all the connected clients. + ## require Access to existing services is provided via [ws.require(type)]({{defdocs}}/WebSocket.html#require-com.google.inject.Key-) method: