Skip to content

Commit

Permalink
Merge pull request #680 from jooby-project/645
Browse files Browse the repository at this point in the history
Send message to all connected web socket clients fix #645
  • Loading branch information
jknack authored Mar 21, 2017
2 parents 7071898 + 60628f9 commit 08c255a
Show file tree
Hide file tree
Showing 6 changed files with 375 additions and 10 deletions.
84 changes: 84 additions & 0 deletions coverage-report/src/test/java/org/jooby/issues/Issue645.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.jooby.issues;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.jooby.test.ServerFeature;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.ws.WebSocket;
import com.ning.http.client.ws.WebSocketTextListener;
import com.ning.http.client.ws.WebSocketUpgradeHandler;

public class Issue645 extends ServerFeature {

{
ws("/ws", (ws) -> {

ws.onMessage(message -> {
System.out.println(Thread.currentThread());
ws.broadcast("=" + message.value(), () -> {
System.out.println(Thread.currentThread());
ws.close();
});
});
});

}

private AsyncHttpClient client;

@Before
public void before() {
client = new AsyncHttpClient(new AsyncHttpClientConfig.Builder().build());
}

@After
public void after() {
client.close();
}

@Test
public void sendText() throws Exception {
LinkedList<String> messages = new LinkedList<>();

CountDownLatch latch = new CountDownLatch(1);

client.prepareGet(ws("ws").toString())
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(
new WebSocketTextListener() {

@Override
public void onMessage(final String message) {
messages.add(message);
latch.countDown();
}

@Override
public void onOpen(final WebSocket websocket) {
websocket.sendMessage("hey!");
}

@Override
public void onClose(final WebSocket websocket) {
}

@Override
public void onError(final Throwable t) {
}
}).build())
.get();
if (latch.await(1L, TimeUnit.SECONDS)) {
assertEquals(Arrays.asList("=hey!"), messages);
}
}

}
52 changes: 52 additions & 0 deletions jooby/src/main/java/org/jooby/WebSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -734,4 +734,56 @@ default void send(final Object data, final OnError err) throws Exception {
*/
void send(Object data, SuccessCallback success, OnError err) throws Exception;

/**
* Send data to all connected sessions.
*
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
*
* @param data Data to send.
* @throws Exception If something goes wrong.
*/
default void broadcast(final Object data) throws Exception {
broadcast(data, SUCCESS, ERR);
}

/**
* Send data to all connected sessions.
*
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
*
* @param data Data to send.
* @param success A success callback.
* @throws Exception If something goes wrong.
*/
default void broadcast(final Object data, final SuccessCallback success) throws Exception {
broadcast(data, success, ERR);
}

/**
* Send data to all connected sessions.
*
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
*
* @param data Data to send.
* @param err An err callback.
* @throws Exception If something goes wrong.
*/
default void broadcast(final Object data, final OnError err) throws Exception {
broadcast(data, SUCCESS, err);
;
}

/**
* Send data to all connected sessions.
*
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
*
* @param data Data to send.
* @param success A success callback.
* @param err An err callback.
* @throws Exception If something goes wrong.
*/
void broadcast(Object data, SuccessCallback success, OnError err)
throws Exception;

}
45 changes: 35 additions & 10 deletions jooby/src/main/java/org/jooby/internal/WebSocketImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.jooby.Err;
import org.jooby.MediaType;
Expand Down Expand Up @@ -58,6 +60,9 @@ public class WebSocketImpl implements WebSocket {
/** The logging system. */
private final Logger log = LoggerFactory.getLogger(WebSocket.class);

/** All connected websocket. */
private static final Queue<WebSocket> sessions = new ConcurrentLinkedQueue<>();

private Locale locale;

private String path;
Expand Down Expand Up @@ -103,6 +108,7 @@ public WebSocketImpl(final OnOpen handler, final String path,

@Override
public void close(final CloseStatus status) {
sessions.remove(this);
synchronized (this) {
open = false;
ws.close(status.code(), status.reason());
Expand All @@ -111,6 +117,7 @@ public void close(final CloseStatus status) {

@Override
public void resume() {
sessions.add(this);
synchronized (this) {
if (suspended) {
ws.resume();
Expand All @@ -121,6 +128,7 @@ public void resume() {

@Override
public void pause() {
sessions.remove(this);
synchronized (this) {
if (!suspended) {
ws.pause();
Expand All @@ -131,6 +139,7 @@ public void pause() {

@Override
public void terminate() throws Exception {
sessions.remove(this);
synchronized (this) {
open = false;
ws.terminate();
Expand All @@ -142,6 +151,18 @@ public boolean isOpen() {
return open && ws.isOpen();
}

@Override
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
throws Exception {
for (WebSocket ws : sessions) {
try {
ws.send(data, success, err);
} catch (Exception ex) {
err.onError(ex);
}
}
}

@Override
public void send(final Object data, final SuccessCallback success, final OnError err)
throws Exception {
Expand All @@ -160,7 +181,7 @@ public void send(final Object data, final SuccessCallback success, final OnError
success,
err).render(data);
} else {
throw new Err(WebSocket.NORMAL, "Cannot send message on closed web socket");
throw new Err(WebSocket.NORMAL, "WebSocket is closed.");
}
}
}
Expand Down Expand Up @@ -190,20 +211,24 @@ public void connect(final Injector injector, final Request req, final NativeWebS
new StrParamReferenceImpl("body", "message", ImmutableList.of(message))))))
.onFailure(this::handleErr));

ws.onCloseMessage((code, reason) -> Try
.run(sync(() -> {
this.open = false;
if (closeCallback != null) {
closeCallback.onClose(reason.map(r -> WebSocket.CloseStatus.of(code, r))
.orElse(WebSocket.CloseStatus.of(code)));
}
closeCallback = null;
})).onFailure(this::handleErr));
ws.onCloseMessage((code, reason) -> {
sessions.remove(this);

Try.run(sync(() -> {
this.open = false;
if (closeCallback != null) {
closeCallback.onClose(reason.map(r -> WebSocket.CloseStatus.of(code, r))
.orElse(WebSocket.CloseStatus.of(code)));
}
closeCallback = null;
})).onFailure(this::handleErr);
});

ws.onErrorMessage(this::handleErr);

// connect now
try {
sessions.add(this);
handler.onOpen(req, this);
} catch (Throwable ex) {
handleErr(ex);
Expand Down
94 changes: 94 additions & 0 deletions jooby/src/test/java/org/jooby/WebSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ public void send(final Object data, final SuccessCallback success, final OnError
throw new UnsupportedOperationException();
}

@Override
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
throws Exception {
throw new UnsupportedOperationException();
}

@Override
public void onMessage(final OnMessage<Mutant> callback) throws Exception {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -260,6 +266,27 @@ public void send(final Object data, final SuccessCallback success, final OnError
assertEquals(data, dataList.getFirst());
}

@SuppressWarnings("resource")
@Test
public void broadcast() throws Exception {
Object data = new Object();
WebSocket.SuccessCallback SUCCESS_ = WebSocket.SUCCESS;
WebSocket.OnError ERR_ = WebSocket.ERR;
LinkedList<Object> dataList = new LinkedList<>();
WebSocket ws = new WebSocketMock() {
@Override
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
throws Exception {
dataList.add(data);
assertEquals(SUCCESS_, success);
assertEquals(ERR_, err);
}
};
ws.broadcast(data);
assertTrue(dataList.size() > 0);
assertEquals(data, dataList.getFirst());
}

@SuppressWarnings("resource")
@Test
public void sendCustomSuccess() throws Exception {
Expand All @@ -282,6 +309,28 @@ public void send(final Object data, final SuccessCallback success, final OnError
assertEquals(data, dataList.getFirst());
}

@SuppressWarnings("resource")
@Test
public void broadcastCustomSuccess() throws Exception {
Object data = new Object();
WebSocket.SuccessCallback SUCCESS_ = () -> {
};
WebSocket.OnError ERR_ = WebSocket.ERR;
LinkedList<Object> dataList = new LinkedList<>();
WebSocket ws = new WebSocketMock() {
@Override
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
throws Exception {
dataList.add(data);
assertEquals(SUCCESS_, success);
assertEquals(ERR_, err);
}
};
ws.broadcast(data, SUCCESS_);
assertTrue(dataList.size() > 0);
assertEquals(data, dataList.getFirst());
}

@SuppressWarnings("resource")
@Test
public void sendCustomErr() throws Exception {
Expand All @@ -304,6 +353,28 @@ public void send(final Object data, final SuccessCallback success, final OnError
assertEquals(data, dataList.getFirst());
}

@SuppressWarnings("resource")
@Test
public void broadcastCustomErr() throws Exception {
Object data = new Object();
WebSocket.SuccessCallback SUCCESS_ = WebSocket.SUCCESS;
WebSocket.OnError ERR_ = (ex) -> {
};
LinkedList<Object> dataList = new LinkedList<>();
WebSocket ws = new WebSocketMock() {
@Override
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
throws Exception {
dataList.add(data);
assertEquals(SUCCESS_, success);
assertEquals(ERR_, err);
}
};
ws.broadcast(data, ERR_);
assertTrue(dataList.size() > 0);
assertEquals(data, dataList.getFirst());
}

@SuppressWarnings("resource")
@Test
public void sendCustomSuccessAndErr() throws Exception {
Expand All @@ -327,6 +398,29 @@ public void send(final Object data, final SuccessCallback success, final OnError
assertEquals(data, dataList.getFirst());
}

@SuppressWarnings("resource")
@Test
public void broadcastCustomSuccessAndErr() throws Exception {
Object data = new Object();
WebSocket.SuccessCallback SUCCESS_ = () -> {
};
WebSocket.OnError ERR_ = (ex) -> {
};
LinkedList<Object> dataList = new LinkedList<>();
WebSocket ws = new WebSocketMock() {
@Override
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
throws Exception {
dataList.add(data);
assertEquals(SUCCESS_, success);
assertEquals(ERR_, err);
}
};
ws.broadcast(data, SUCCESS_, ERR_);
assertTrue(dataList.size() > 0);
assertEquals(data, dataList.getFirst());
}

@SuppressWarnings("resource")
@Test
public void getInstance() throws Exception {
Expand Down
Loading

0 comments on commit 08c255a

Please sign in to comment.