diff --git a/readme.md b/readme.md
index 1c8d5c6..a860ef3 100644
--- a/readme.md
+++ b/readme.md
@@ -110,6 +110,16 @@ To support mock of web sockets this wrapper allows you to either specify a ``req
.waitFor(1500).andEmit("root - DELETED")
.done()
.once()
+
+#### Closing Web Socket messages ####
+
+ server.expect().withPath("/api/v1/users/watch")
+ .andUpgradeToWebSocket()
+ .open()
+ .waitFor(1000).andEmit("root - CREATED")
+ .waitFor(1500).andEmit(new WebsocketCloseReason(1000, "Bye bye"))
+ .done()
+ .once()
### CRUD Mocking ###
diff --git a/src/main/java/io/fabric8/mockwebserver/internal/InlineWebSocketSessionBuilder.java b/src/main/java/io/fabric8/mockwebserver/internal/InlineWebSocketSessionBuilder.java
index cf31e8a..3aa741d 100644
--- a/src/main/java/io/fabric8/mockwebserver/internal/InlineWebSocketSessionBuilder.java
+++ b/src/main/java/io/fabric8/mockwebserver/internal/InlineWebSocketSessionBuilder.java
@@ -1,12 +1,12 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,17 +20,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.mockwebserver.Context;
import io.fabric8.mockwebserver.MockServerException;
-import io.fabric8.mockwebserver.dsl.Emitable;
-import io.fabric8.mockwebserver.dsl.EventDoneable;
-import io.fabric8.mockwebserver.dsl.Function;
-import io.fabric8.mockwebserver.dsl.TimesOrOnceable;
-import io.fabric8.mockwebserver.dsl.WebSocketSessionBuilder;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Queue;
+import io.fabric8.mockwebserver.dsl.*;
+
+import java.util.*;
public class InlineWebSocketSessionBuilder implements WebSocketSessionBuilder, EventDoneable {
@@ -171,7 +163,10 @@ private WebSocketMessage toWebSocketMessage(Object content, Boolean toBeRemoved)
}
private WebSocketMessage toWebSocketMessage(Long delay, Object content, Boolean toBeRemoved) {
- if (content instanceof String) {
+ if (content instanceof WebsocketCloseReason) {
+ WebsocketCloseReason closeReason = (WebsocketCloseReason) content;
+ return new WebSocketMessage(delay, closeReason.getReason(), toBeRemoved, closeReason.getCode());
+ } else if (content instanceof String) {
return new WebSocketMessage(delay, (String) content, toBeRemoved);
} else if (content instanceof WebSocketMessage) {
return (WebSocketMessage) content;
diff --git a/src/main/java/io/fabric8/mockwebserver/internal/WebSocketMessage.java b/src/main/java/io/fabric8/mockwebserver/internal/WebSocketMessage.java
index 3bc44ea..d6426df 100644
--- a/src/main/java/io/fabric8/mockwebserver/internal/WebSocketMessage.java
+++ b/src/main/java/io/fabric8/mockwebserver/internal/WebSocketMessage.java
@@ -23,40 +23,50 @@ public class WebSocketMessage {
private final byte[] body;
private final boolean toBeRemoved;
private final boolean binary;
+ private final Integer closingReason;
public WebSocketMessage(String body) {
this(0L, body, true);
}
public WebSocketMessage(byte[] body) {
- this(0L, body, true, true);
+ this(0L, body, true, true, null);
}
public WebSocketMessage(String body, boolean toBeRemoved) {
- this(0L, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, false);
+ this(0L, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, false, null);
}
public WebSocketMessage(byte[] body, boolean toBeRemoved) {
- this(0L, body, toBeRemoved, true);
+ this(0L, body, toBeRemoved, true, null);
}
public WebSocketMessage(Long delay, String body, boolean toBeRemoved) {
- this(delay, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, false);
+ this(delay, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, false, null);
}
public WebSocketMessage(Long delay, byte[] body, boolean toBeRemoved) {
- this(delay, body, toBeRemoved, true);
+ this(delay, body, toBeRemoved, true, null);
}
+
+ public WebSocketMessage(Long delay, String body, boolean toBeRemoved, Integer closingReason) {
+ this(delay, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, false, closingReason);
+ }
- public WebSocketMessage(Long delay, String body, boolean toBeRemoved, boolean binary) {
- this(delay, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, binary);
+ public WebSocketMessage(Long delay, byte[] body, boolean toBeRemoved, Integer closingReason) {
+ this(delay, body, toBeRemoved, true, closingReason);
+ }
+
+ public WebSocketMessage(Long delay, String body, boolean toBeRemoved, boolean binary, Integer closingReason) {
+ this(delay, body.getBytes(StandardCharsets.UTF_8), toBeRemoved, binary, closingReason);
}
- public WebSocketMessage(Long delay, byte[] body, boolean toBeRemoved, boolean binary) {
+ public WebSocketMessage(Long delay, byte[] body, boolean toBeRemoved, boolean binary, Integer closingReason) {
this.delay = delay;
this.body = body;
this.toBeRemoved = toBeRemoved;
this.binary = binary;
+ this.closingReason = closingReason;
}
public Long getDelay() {
@@ -78,4 +88,8 @@ public byte[] getBytes() {
public boolean isBinary() {
return binary;
}
+
+ public Integer getClosingReason() {
+ return closingReason;
+ }
}
diff --git a/src/main/java/io/fabric8/mockwebserver/internal/WebSocketSession.java b/src/main/java/io/fabric8/mockwebserver/internal/WebSocketSession.java
index f91a452..322d6d1 100644
--- a/src/main/java/io/fabric8/mockwebserver/internal/WebSocketSession.java
+++ b/src/main/java/io/fabric8/mockwebserver/internal/WebSocketSession.java
@@ -24,6 +24,7 @@
import okhttp3.mockwebserver.RecordedRequest;
import okio.ByteString;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -161,7 +162,13 @@ private void send(final WebSocket ws, final WebSocketMessage message) {
pendingMessages.add(id);
executor.schedule(() -> {
if (ws != null) {
- if (message.isBinary()) {
+ if (message.getClosingReason() != null) {
+ if (message.isBinary()) {
+ ws.close(message.getClosingReason(), new String(message.getBytes(), StandardCharsets.UTF_8));
+ } else {
+ ws.close(message.getClosingReason(), message.getBody());
+ }
+ } else if (message.isBinary()) {
ws.send(ByteString.of(message.getBytes()));
} else {
ws.send(message.getBody());
diff --git a/src/main/java/io/fabric8/mockwebserver/internal/WebsocketCloseReason.java b/src/main/java/io/fabric8/mockwebserver/internal/WebsocketCloseReason.java
new file mode 100644
index 0000000..56580d4
--- /dev/null
+++ b/src/main/java/io/fabric8/mockwebserver/internal/WebsocketCloseReason.java
@@ -0,0 +1,22 @@
+package io.fabric8.mockwebserver.internal;
+
+public class WebsocketCloseReason {
+
+ private final int code;
+ private final String reason;
+
+ public WebsocketCloseReason(int code, String reason) {
+ super();
+ this.code = code;
+ this.reason = reason;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getReason() {
+ return reason;
+ }
+
+}
diff --git a/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy b/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy
index ef218fa..31ff83a 100644
--- a/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy
+++ b/src/test/groovy/io/fabric8/mockwebserver/DefaultMockServerWebSocketTest.groovy
@@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import java.util.stream.IntStream
+import io.fabric8.mockwebserver.internal.WebsocketCloseReason
+
class DefaultMockServerWebSocketTest extends Specification {
DefaultMockServer server
@@ -119,4 +121,56 @@ class DefaultMockServerWebSocketTest extends Specification {
cleanup:
wss.forEach(ws -> ws.close(1000, "Test finished"))
}
+
+ // https://github.com/fabric8io/mockwebserver/pull/66#issuecomment-944289335
+ def "andUpgradeToWebSocket, with closing events, should close session"() {
+ given:
+ server.expect()
+ .withPath("/websocket")
+ .andUpgradeToWebSocket().open().waitFor(5L).andEmit("A text message").waitFor(10L).andEmit(new WebsocketCloseReason(1000, "Everything is ok")).done().always()
+ def future = new CompletableFuture>()
+ when:
+ def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() {
+ List messages = new ArrayList<>()
+ @Override
+ void onMessage(WebSocket webSocket, String text) {
+ messages.add(text)
+ }
+ @Override
+ void onClosing(WebSocket webSocket, int code, String reason) {
+ messages.add("Session closed with code " + code + " and reason " + reason)
+ future.complete(messages)
+ }
+ })
+ then:
+ def result = future.get(50L, TimeUnit.MILLISECONDS);
+ assert result.size() == 2
+ assert result.get(0) == "A text message"
+ assert result.get(1) == "Session closed with code 1000 and reason Everything is ok"
+ }
+
+ def "andUpgradeToWebSocket, with closing events, should close session, later message skipped"() {
+ given:
+ server.expect()
+ .withPath("/websocket")
+ .andUpgradeToWebSocket().open().waitFor(10L).andEmit("A text message").waitFor(5L).andEmit(new WebsocketCloseReason(1000, "Everything is ok")).done().always()
+ def future = new CompletableFuture>()
+ when:
+ def ws = client.newWebSocket(new Request.Builder().url(server.url("/websocket")).build(), new WebSocketListener() {
+ List messages = new ArrayList<>()
+ @Override
+ void onMessage(WebSocket webSocket, String text) {
+ messages.add(text)
+ }
+ @Override
+ void onClosing(WebSocket webSocket, int code, String reason) {
+ messages.add("Session closed with code " + code + " and reason " + reason)
+ future.complete(messages)
+ }
+ })
+ then:
+ def result = future.get(50L, TimeUnit.MILLISECONDS);
+ assert result.size() == 1
+ assert result.get(0) == "Session closed with code 1000 and reason Everything is ok"
+ }
}