Skip to content

Commit

Permalink
Send message to all connected web socket clients fix #645
Browse files Browse the repository at this point in the history
  • Loading branch information
jknack committed Mar 21, 2017
1 parent 7c9f59b commit 547c603
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 30 deletions.
84 changes: 84 additions & 0 deletions coverage-report/src/test/java/org/jooby/issues/Issue645.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.jooby.issues;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.jooby.test.ServerFeature;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.ws.WebSocket;
import com.ning.http.client.ws.WebSocketTextListener;
import com.ning.http.client.ws.WebSocketUpgradeHandler;

public class Issue645 extends ServerFeature {

{
ws("/ws", (ws) -> {

ws.onMessage(message -> {
System.out.println(Thread.currentThread());
ws.broadcast("=" + message.value(), () -> {
System.out.println(Thread.currentThread());
ws.close();
});
});
});

}

private AsyncHttpClient client;

@Before
public void before() {
client = new AsyncHttpClient(new AsyncHttpClientConfig.Builder().build());
}

@After
public void after() {
client.close();
}

@Test
public void sendText() throws Exception {
LinkedList<String> messages = new LinkedList<>();

CountDownLatch latch = new CountDownLatch(1);

client.prepareGet(ws("ws").toString())
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(
new WebSocketTextListener() {

@Override
public void onMessage(final String message) {
messages.add(message);
latch.countDown();
}

@Override
public void onOpen(final WebSocket websocket) {
websocket.sendMessage("hey!");
}

@Override
public void onClose(final WebSocket websocket) {
}

@Override
public void onError(final Throwable t) {
}
}).build())
.get();
if (latch.await(1L, TimeUnit.SECONDS)) {
assertEquals(Arrays.asList("=hey!"), messages);
}
}

}
92 changes: 72 additions & 20 deletions jooby/src/main/java/org/jooby/WebSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ interface OnOpen {
interface OnOpen1 extends OnOpen {

@Override
default void onOpen(final Request req, final WebSocket ws) throws Exception {
default void onOpen(Request req, WebSocket ws) throws Exception {
onOpen(ws);
}

Expand All @@ -207,17 +207,17 @@ default void onOpen(final Request req, final WebSocket ws) throws Exception {
*/
class CloseStatus {
/** A status code. */
private final int code;
private int code;

/** A close reason. */
private final String reason;
private String reason;

/**
* Create a new {@link CloseStatus} instance.
*
* @param code the status code
*/
private CloseStatus(final int code) {
private CloseStatus(int code) {
this(code, null);
}

Expand All @@ -227,7 +227,7 @@ private CloseStatus(final int code) {
* @param code the status code
* @param reason the reason
*/
private CloseStatus(final int code, final String reason) {
private CloseStatus(int code, String reason) {
Preconditions.checkArgument((code >= 1000 && code < 5000), "Invalid code: %s", code);
this.code = code;
this.reason = reason == null || reason.isEmpty() ? null : reason;
Expand All @@ -239,7 +239,7 @@ private CloseStatus(final int code, final String reason) {
* @param code A status code.
* @return A new close status.
*/
public static CloseStatus of(final int code) {
public static CloseStatus of(int code) {
return new CloseStatus(code);
}

Expand All @@ -250,7 +250,7 @@ public static CloseStatus of(final int code) {
* @param reason A close reason.
* @return A new close status.
*/
public static CloseStatus of(final int code, final String reason) {
public static CloseStatus of(int code, String reason) {
requireNonNull(reason, "A reason is required.");
return new CloseStatus(code, reason);
}
Expand Down Expand Up @@ -368,7 +368,7 @@ class Definition {
* @param pattern A path pattern.
* @param handler A ws handler.
*/
public Definition(final String pattern, final OnOpen handler) {
public Definition(String pattern, OnOpen handler) {
requireNonNull(pattern, "A route path is required.");
requireNonNull(handler, "A handler is required.");

Expand All @@ -391,7 +391,7 @@ public String pattern() {
* @param path A path pattern.
* @return A web socket or empty optional.
*/
public Optional<WebSocket> matches(final String path) {
public Optional<WebSocket> matches(String path) {
RouteMatcher matcher = routePattern.matcher("WS" + path);
if (matcher.matches()) {
return Optional.of(asWebSocket(matcher));
Expand All @@ -405,7 +405,7 @@ public Optional<WebSocket> matches(final String path) {
* @param type The media types to test for.
* @return This route definition.
*/
public Definition consumes(final String type) {
public Definition consumes(String type) {
return consumes(MediaType.valueOf(type));
}

Expand All @@ -415,7 +415,7 @@ public Definition consumes(final String type) {
* @param type The media types to test for.
* @return This route definition.
*/
public Definition consumes(final MediaType type) {
public Definition consumes(MediaType type) {
this.consumes = requireNonNull(type, "A type is required.");
return this;
}
Expand All @@ -426,7 +426,7 @@ public Definition consumes(final MediaType type) {
* @param type The media types to test for.
* @return This route definition.
*/
public Definition produces(final String type) {
public Definition produces(String type) {
return produces(MediaType.valueOf(type));
}

Expand All @@ -436,7 +436,7 @@ public Definition produces(final String type) {
* @param type The media types to test for.
* @return This route definition.
*/
public Definition produces(final MediaType type) {
public Definition produces(MediaType type) {
this.produces = requireNonNull(type, "A type is required.");
return this;
}
Expand All @@ -456,7 +456,7 @@ public MediaType produces() {
}

@Override
public boolean equals(final Object obj) {
public boolean equals(Object obj) {
if (obj instanceof Definition) {
Definition def = (Definition) obj;
return this.pattern.equals(def.pattern);
Expand Down Expand Up @@ -484,7 +484,7 @@ public String toString() {
* @param matcher A route matcher.
* @return A new web socket.
*/
private WebSocket asWebSocket(final RouteMatcher matcher) {
private WebSocket asWebSocket(RouteMatcher matcher) {
return new WebSocketImpl(handler, matcher.path(), pattern, matcher.vars(),
consumes, produces);
}
Expand Down Expand Up @@ -632,7 +632,7 @@ interface Handler<T> extends OnClose, OnMessage<T>, OnError, OnOpen {
* @param code Close status code.
* @param reason Close reason.
*/
default void close(final int code, final String reason) {
default void close(int code, String reason) {
close(CloseStatus.of(code, reason));
}

Expand All @@ -641,7 +641,7 @@ default void close(final int code, final String reason) {
*
* @param code Close status code.
*/
default void close(final int code) {
default void close(int code) {
close(CloseStatus.of(code));
}

Expand Down Expand Up @@ -692,7 +692,7 @@ default void close() {
* @param data Data to send.
* @throws Exception If something goes wrong.
*/
default void send(final Object data) throws Exception {
default void send(Object data) throws Exception {
send(data, SUCCESS, ERR);
}

Expand All @@ -705,7 +705,7 @@ default void send(final Object data) throws Exception {
* @param success A success callback.
* @throws Exception If something goes wrong.
*/
default void send(final Object data, final SuccessCallback success) throws Exception {
default void send(Object data, SuccessCallback success) throws Exception {
send(data, success, ERR);
}

Expand All @@ -718,7 +718,7 @@ default void send(final Object data, final SuccessCallback success) throws Excep
* @param err An err callback.
* @throws Exception If something goes wrong.
*/
default void send(final Object data, final OnError err) throws Exception {
default void send(Object data, OnError err) throws Exception {
send(data, SUCCESS, err);
}

Expand All @@ -734,4 +734,56 @@ default void send(final Object data, final OnError err) throws Exception {
*/
void send(Object data, SuccessCallback success, OnError err) throws Exception;

/**
* Send data to all connected sessions.
*
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
*
* @param data Data to send.
* @throws Exception If something goes wrong.
*/
default void broadcast(Object data) throws Exception {
broadcast(data, SUCCESS, ERR);
}

/**
* Send data to all connected sessions.
*
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
*
* @param data Data to send.
* @param success A success callback.
* @throws Exception If something goes wrong.
*/
default void broadcast(Object data, SuccessCallback success) throws Exception {
broadcast(data, success, ERR);
}

/**
* Send data to all connected sessions.
*
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
*
* @param data Data to send.
* @param err An err callback.
* @throws Exception If something goes wrong.
*/
default void broadcast(Object data, OnError err) throws Exception {
broadcast(data, SUCCESS, err);
;
}

/**
* Send data to all connected sessions.
*
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
*
* @param data Data to send.
* @param success A success callback.
* @param err An err callback.
* @throws Exception If something goes wrong.
*/
void broadcast(Object data, SuccessCallback success, OnError err)
throws Exception;

}
Loading

0 comments on commit 547c603

Please sign in to comment.