failure)
{
return new Callback()
@@ -129,6 +135,10 @@ public void failed(Throwable x)
};
}
+ /** Creaste a callback that runs completed when it succeeds or fails
+ * @param completed The completion to run on success or failure
+ * @return a new callback
+ */
static Callback from(Runnable completed)
{
return new Completing()
@@ -140,6 +150,67 @@ public void completed()
};
}
+ /**
+ * Create a nested callback that runs completed after
+ * completing the nested callback.
+ * @param callback The nested callback
+ * @param completed The completion to run after the nested callback is completed
+ * @return a new callback.
+ */
+ static Callback from(Callback callback, Runnable completed)
+ {
+ return new Nested(callback)
+ {
+ public void completed()
+ {
+ completed.run();
+ }
+ };
+ }
+
+ /**
+ * Create a nested callback that runs completed before
+ * completing the nested callback.
+ * @param callback The nested callback
+ * @param completed The completion to run before the nested callback is completed. Any exceptions thrown
+ * from completed will result in a callback failure.
+ * @return a new callback.
+ */
+ static Callback from(Runnable completed, Callback callback)
+ {
+ return new Callback()
+ {
+ @Override
+ public void succeeded()
+ {
+ try
+ {
+ completed.run();
+ callback.succeeded();
+ }
+ catch(Throwable t)
+ {
+ callback.failed(t);
+ }
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ try
+ {
+ completed.run();
+ }
+ catch(Throwable t)
+ {
+ x.addSuppressed(t);
+ }
+ callback.failed(x);
+ }
+ };
+ }
+
+
class Completing implements Callback
{
@Override
@@ -158,7 +229,11 @@ public void completed()
{
}
}
-
+
+ /**
+ * Nested Completing Callback that completes after
+ * completing the nested callback
+ */
class Nested extends Completing
{
private final Callback callback;
diff --git a/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java b/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java
index f538c5e4cefe..e43a2fec36a2 100644
--- a/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java
+++ b/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java
@@ -18,15 +18,6 @@
package org.eclipse.jetty.websocket.javax.client;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.io.ByteBufferPool;
-import org.eclipse.jetty.util.DecoratedObjectFactory;
-import org.eclipse.jetty.util.annotation.ManagedObject;
-import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
-import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
-import org.eclipse.jetty.websocket.javax.common.*;
-
-import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.util.Objects;
@@ -34,8 +25,29 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
+import javax.websocket.ClientEndpoint;
+import javax.websocket.ClientEndpointConfig;
+import javax.websocket.DeploymentException;
+import javax.websocket.Endpoint;
+import javax.websocket.EndpointConfig;
+import javax.websocket.Extension;
+import javax.websocket.Session;
+
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.util.DecoratedObjectFactory;
+import org.eclipse.jetty.util.annotation.ManagedObject;
+import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
+import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
+import org.eclipse.jetty.websocket.javax.common.ConfiguredEndpoint;
+import org.eclipse.jetty.websocket.javax.common.InvalidWebSocketException;
+import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketContainer;
+import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketExtensionConfig;
+import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory;
+
/**
* Container for Client use of the javax.websocket API.
*
@@ -161,12 +173,15 @@ private Session connect(ConfiguredEndpoint configuredEndpoint, URI destURI) thro
try
{
Future sessionFuture = connect(upgradeRequest);
- long timeout = getDefaultMaxSessionIdleTimeout();
+ long timeout = coreClient.getHttpClient().getConnectTimeout();
if (timeout>0)
- return sessionFuture.get(timeout, TimeUnit.MILLISECONDS);
-
+ return sessionFuture.get(timeout+1000, TimeUnit.MILLISECONDS);
return sessionFuture.get();
}
+ catch (TimeoutException e)
+ {
+ throw new IOException("Connection future not completed " + destURI, e);
+ }
catch (Exception e)
{
throw new IOException("Unable to connect to " + destURI, e);
diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java
index 6bde0f123a06..066c0ab12920 100644
--- a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java
+++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java
@@ -18,26 +18,47 @@
package org.eclipse.jetty.websocket.javax.common;
-import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.log.Log;
-import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.websocket.core.*;
-import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
-import org.eclipse.jetty.websocket.javax.common.messages.*;
-import org.eclipse.jetty.websocket.javax.common.util.InvokerUtils;
-
-import javax.websocket.MessageHandler;
-import javax.websocket.*;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
-public class JavaxWebSocketFrameHandler implements FrameHandler.Adaptor
+import javax.websocket.CloseReason;
+import javax.websocket.Decoder;
+import javax.websocket.EndpointConfig;
+import javax.websocket.MessageHandler;
+import javax.websocket.PongMessage;
+import javax.websocket.Session;
+
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.websocket.core.CloseStatus;
+import org.eclipse.jetty.websocket.core.Frame;
+import org.eclipse.jetty.websocket.core.FrameHandler;
+import org.eclipse.jetty.websocket.core.OpCode;
+import org.eclipse.jetty.websocket.core.ProtocolException;
+import org.eclipse.jetty.websocket.core.WebSocketConstants;
+import org.eclipse.jetty.websocket.core.WebSocketException;
+import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
+import org.eclipse.jetty.websocket.javax.common.messages.DecodedBinaryMessageSink;
+import org.eclipse.jetty.websocket.javax.common.messages.DecodedBinaryStreamMessageSink;
+import org.eclipse.jetty.websocket.javax.common.messages.DecodedTextMessageSink;
+import org.eclipse.jetty.websocket.javax.common.messages.DecodedTextStreamMessageSink;
+import org.eclipse.jetty.websocket.javax.common.messages.PartialByteArrayMessageSink;
+import org.eclipse.jetty.websocket.javax.common.messages.PartialByteBufferMessageSink;
+import org.eclipse.jetty.websocket.javax.common.messages.PartialStringMessageSink;
+import org.eclipse.jetty.websocket.javax.common.util.InvokerUtils;
+
+public class JavaxWebSocketFrameHandler implements FrameHandler
{
private final Logger LOG;
private final JavaxWebSocketContainer container;
@@ -171,106 +192,57 @@ public void setMaxBinaryMessageBufferSize(int maxBinaryMessageBufferSize)
}
@Override
- public void onClosed(CloseStatus closeStatus)
+ public void onOpen(CoreSession coreSession, Callback callback)
{
- if (closeHandle != null)
- {
- try
- {
- CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason());
- closeHandle.invoke(closeReason);
- }
- catch (Throwable cause)
- {
- throw new WebSocketException(endpointInstance.getClass().getName() + " CLOSE method error: " + cause.getMessage(), cause);
- }
- }
-
- container.removeBean(session);
- }
-
- @SuppressWarnings("Duplicates")
- @Override
- public void onError(Throwable cause)
- {
- futureSession.completeExceptionally(cause);
-
- if (errorHandle == null)
- {
- LOG.warn("Unhandled Error: " + endpointInstance, cause);
- return;
- }
-
try
{
- errorHandle.invoke(cause);
- }
- catch (Throwable t)
- {
- WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
- wsError.addSuppressed(cause);
- throw wsError;
- }
- }
-
- @Override
- public void onOpen(CoreSession coreSession) throws Exception
- {
- this.coreSession = coreSession;
- session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig);
+ this.coreSession = coreSession;
+ session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig);
- openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig);
- closeHandle = InvokerUtils.bindTo(closeHandle, session);
- errorHandle = InvokerUtils.bindTo(errorHandle, session);
+ openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig);
+ closeHandle = InvokerUtils.bindTo(closeHandle, session);
+ errorHandle = InvokerUtils.bindTo(errorHandle, session);
- JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualTextMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(textMetadata);
- JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualBinaryMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(binaryMetadata);
+ JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualTextMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(textMetadata);
+ JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualBinaryMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(binaryMetadata);
- pongHandle = InvokerUtils.bindTo(pongHandle, session);
+ pongHandle = InvokerUtils.bindTo(pongHandle, session);
- if (actualTextMetadata != null)
- {
- actualTextMetadata.handle = InvokerUtils.bindTo(actualTextMetadata.handle, endpointInstance, endpointConfig, session);
- actualTextMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualTextMetadata.handle, session);
- textSink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualTextMetadata);
+ if (actualTextMetadata != null)
+ {
+ actualTextMetadata.handle = InvokerUtils.bindTo(actualTextMetadata.handle, endpointInstance, endpointConfig, session);
+ actualTextMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualTextMetadata.handle, session);
+ textSink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualTextMetadata);
- textMetadata = actualTextMetadata;
- }
+ textMetadata = actualTextMetadata;
+ }
- if (actualBinaryMetadata != null)
- {
- actualBinaryMetadata.handle = InvokerUtils.bindTo(actualBinaryMetadata.handle, endpointInstance, endpointConfig, session);
- actualBinaryMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualBinaryMetadata.handle, session);
- binarySink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualBinaryMetadata);
+ if (actualBinaryMetadata != null)
+ {
+ actualBinaryMetadata.handle = InvokerUtils.bindTo(actualBinaryMetadata.handle, endpointInstance, endpointConfig, session);
+ actualBinaryMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualBinaryMetadata.handle, session);
+ binarySink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualBinaryMetadata);
- binaryMetadata = actualBinaryMetadata;
- }
+ binaryMetadata = actualBinaryMetadata;
+ }
- if (openHandle != null)
- {
- try
- {
+ if (openHandle != null)
openHandle.invoke();
- }
- catch (Throwable cause)
- {
- Exception wse = new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
- // TODO This feels like double handling of the exception? Review need for futureSession
- futureSession.completeExceptionally(wse);
- throw wse;
- }
+ container.addBean(session, true);
+ futureSession.complete(session);
+ callback.succeeded();
}
+ catch (Throwable cause)
+ {
+ Exception wse = new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
- container.addBean(session, true);
- futureSession.complete(session);
+ // TODO This feels like double handling of the exception? Review need for futureSession
+ futureSession.completeExceptionally(wse);
+ callback.failed(wse);
+ }
}
- /**
- * @see #onFrame(Frame,Callback)
- */
- public final void onFrame(Frame frame) {}
-
@Override
public void onFrame(Frame frame, Callback callback)
{
@@ -302,6 +274,50 @@ public void onFrame(Frame frame, Callback callback)
dataType = OpCode.UNDEFINED;
}
+
+ @Override
+ public void onClosed(CloseStatus closeStatus, Callback callback)
+ {
+ try
+ {
+ if (closeHandle != null)
+ {
+ CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason());
+ closeHandle.invoke(closeReason);
+ }
+ container.removeBean(session);
+ callback.succeeded();
+ }
+ catch (Throwable cause)
+ {
+ callback.failed(new WebSocketException(endpointInstance.getClass().getName() + " CLOSE method error: " + cause.getMessage(), cause));
+ }
+ }
+
+ @Override
+ public void onError(Throwable cause, Callback callback)
+ {
+ try
+ {
+ futureSession.completeExceptionally(cause);
+
+ if (errorHandle != null)
+ errorHandle.invoke(cause);
+ else
+ LOG.warn("Unhandled Error: " + endpointInstance, cause);
+ callback.succeeded();
+ }
+ catch (Throwable t)
+ {
+ WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
+ wsError.addSuppressed(cause);
+ callback.failed(wsError);
+ // TODO should futureSession be failed here?
+ }
+ }
+
+
+
public Set getMessageHandlers()
{
if (messageHandlerMap.isEmpty())
diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java
index d63666d2a26f..85f0ea66b340 100644
--- a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java
+++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java
@@ -220,10 +220,10 @@ public void filterReturnType(Object obj)
{
getBasicRemote().sendObject(obj);
}
- catch (Throwable cause)
+ catch (Exception cause)
{
- // TODO: need way to fail Channel.
- frameHandler.onError(cause);
+ // TODO review this
+ throw new RuntimeException(cause);
}
}
}
diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnCloseTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnCloseTest.java
index 2a32e029b66e..8f87f4a18c3a 100644
--- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnCloseTest.java
+++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnCloseTest.java
@@ -18,6 +18,13 @@
package org.eclipse.jetty.websocket.javax.common;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.ClientEndpoint;
+import javax.websocket.CloseReason;
+import javax.websocket.OnClose;
+import javax.websocket.Session;
+
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
@@ -25,12 +32,6 @@
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
-import javax.websocket.ClientEndpoint;
-import javax.websocket.CloseReason;
-import javax.websocket.OnClose;
-import javax.websocket.Session;
-import java.util.concurrent.TimeUnit;
-
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
@@ -44,12 +45,12 @@ private void assertOnCloseInvocation(TrackingSocket socket, Matcher even
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// These invocations are the same for all tests
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
CloseStatus status = new CloseStatus(CloseStatus.NORMAL, "Normal");
Frame closeFrame = status.toFrame();
localEndpoint.onFrame(closeFrame, Callback.from(() ->
{
- localEndpoint.onClosed(status);
+ localEndpoint.onClosed(status, Callback.NOOP);
}, t ->
{
throw new RuntimeException(t);
diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnErrorTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnErrorTest.java
index c91bf81ec19c..eaf62d31d267 100644
--- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnErrorTest.java
+++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnErrorTest.java
@@ -18,14 +18,16 @@
package org.eclipse.jetty.websocket.javax.common;
-import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
-import org.hamcrest.Matcher;
-import org.junit.jupiter.api.Test;
+import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnError;
import javax.websocket.Session;
-import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
@@ -40,8 +42,8 @@ private void assertOnErrorInvocation(TrackingSocket socket, Matcher even
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// These invocations are the same for all tests
- localEndpoint.onOpen(channel);
- localEndpoint.onError(new RuntimeException("From Testcase"));
+ localEndpoint.onOpen(channel, Callback.NOOP);
+ localEndpoint.onError(new RuntimeException("From Testcase"), Callback.NOOP);
String event = socket.events.poll(1, TimeUnit.SECONDS);
assertThat("Event", event, eventMatcher);
}
diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryStreamTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryStreamTest.java
index 1c60f3f0499f..6e53a2bbca1a 100644
--- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryStreamTest.java
+++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryStreamTest.java
@@ -18,6 +18,15 @@
package org.eclipse.jetty.websocket.javax.common;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import javax.websocket.ClientEndpoint;
+import javax.websocket.OnMessage;
+
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.core.Frame;
@@ -25,14 +34,6 @@
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.junit.jupiter.api.Test;
-import javax.websocket.ClientEndpoint;
-import javax.websocket.OnMessage;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -44,7 +45,7 @@ private TrackingSocket performOnMessageInvocation(TrackingSocket socket, Functio
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
func.apply(localEndpoint);
diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryTest.java
index 225d21992b90..f50220a502b4 100644
--- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryTest.java
+++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryTest.java
@@ -18,6 +18,14 @@
package org.eclipse.jetty.websocket.javax.common;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.ClientEndpoint;
+import javax.websocket.OnMessage;
+import javax.websocket.Session;
+
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Frame;
@@ -27,13 +35,6 @@
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
-import javax.websocket.ClientEndpoint;
-import javax.websocket.OnMessage;
-import javax.websocket.Session;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
@@ -47,7 +48,7 @@ private void assertOnMessageInvocation(TrackingSocket socket, Matcher ev
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
assertThat("Has Binary Metadata", localEndpoint.getBinaryMetadata(), notNullValue());
diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java
index a036519ffe29..b689aa387ee8 100644
--- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java
+++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java
@@ -18,6 +18,14 @@
package org.eclipse.jetty.websocket.javax.common;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import javax.websocket.ClientEndpoint;
+import javax.websocket.OnMessage;
+
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.core.Frame;
@@ -25,13 +33,6 @@
import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
import org.junit.jupiter.api.Test;
-import javax.websocket.ClientEndpoint;
-import javax.websocket.OnMessage;
-import java.io.IOException;
-import java.io.Reader;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -43,7 +44,7 @@ private TrackingSocket performOnMessageInvocation(TrackingSocket socket, Functio
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
func.apply(localEndpoint);
diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextTest.java
index 6e47087cd2ab..8ceb54c2801d 100644
--- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextTest.java
+++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextTest.java
@@ -18,6 +18,14 @@
package org.eclipse.jetty.websocket.javax.common;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.ClientEndpoint;
+import javax.websocket.OnMessage;
+import javax.websocket.Session;
+
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.Frame;
@@ -27,13 +35,6 @@
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
-import javax.websocket.ClientEndpoint;
-import javax.websocket.OnMessage;
-import javax.websocket.Session;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
@@ -46,7 +47,7 @@ private void onText(TrackingSocket socket, String msg) throws Exception
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
ByteBuffer payload = BufferUtil.toBuffer(msg, StandardCharsets.UTF_8);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload(payload).setFin(true), Callback.NOOP);
diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnOpenTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnOpenTest.java
index b031044dd84c..22d0a7800dbf 100644
--- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnOpenTest.java
+++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnOpenTest.java
@@ -18,14 +18,16 @@
package org.eclipse.jetty.websocket.javax.common;
-import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
-import org.hamcrest.Matcher;
-import org.junit.jupiter.api.Test;
+import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnOpen;
import javax.websocket.Session;
-import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
@@ -38,7 +40,7 @@ private void assertOnOpenInvocation(TrackingSocket socket, Matcher event
JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket);
// This invocation is the same for all tests
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
String event = socket.events.poll(1, TimeUnit.SECONDS);
assertThat("Event", event, eventMatcher);
}
diff --git a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java
index 75c46a81439e..5238b8882c2a 100644
--- a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java
+++ b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java
@@ -18,6 +18,16 @@
package org.eclipse.jetty.websocket.javax.tests;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.EndPoint;
@@ -32,16 +42,6 @@
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.internal.Generator;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable
{
private final LocalServer server;
@@ -217,7 +217,7 @@ protected void handleException(Throwable failure)
}
}
- public static class FrameCapture implements FrameHandler.Adaptor
+ public static class FrameCapture implements FrameHandler
{
private final BlockingQueue receivedFrames = new LinkedBlockingQueue<>();
private final EndPoint endPoint;
@@ -229,32 +229,34 @@ public FrameCapture(EndPoint endPoint)
this.endPoint = endPoint;
}
+
@Override
- public void onClosed(CloseStatus closeStatus)
+ public void onOpen(CoreSession coreSession, Callback callback)
{
+ this.session = coreSession;
+ callback.succeeded();
}
@Override
- public void onError(Throwable cause) throws Exception
+ public void onFrame(Frame frame, Callback callback)
{
+ receivedFrames.offer(Frame.copy(frame));
+ callback.succeeded();
}
@Override
- public void onFrame(Frame frame, Callback callback)
+ public void onError(Throwable cause, Callback callback)
{
- receivedFrames.offer(Frame.copy(frame));
- synchronized(this)
- {
- callback.succeeded();
- }
+ callback.succeeded();
}
@Override
- public void onOpen(CoreSession coreSession) throws Exception
+ public void onClosed(CloseStatus closeStatus, Callback callback)
{
- this.session = coreSession;
+ callback.succeeded();
}
+
public void writeRaw(ByteBuffer buffer) throws IOException
{
synchronized (this)
diff --git a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameEcho.java b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameEcho.java
index 706b4b398cbd..c5bbc0738d02 100644
--- a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameEcho.java
+++ b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameEcho.java
@@ -25,16 +25,17 @@
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
-public class FrameEcho implements FrameHandler.Adaptor
+public class FrameEcho implements FrameHandler
{
private Logger LOG = Log.getLogger(FrameEcho.class);
private CoreSession coreSession;
@Override
- public void onOpen(CoreSession coreSession) throws Exception
+ public void onOpen(CoreSession coreSession, Callback callback)
{
this.coreSession = coreSession;
+ callback.succeeded();
}
@Override
@@ -47,15 +48,18 @@ public void onFrame(Frame frame, Callback callback)
}
@Override
- public void onClosed(CloseStatus closeStatus)
+ public void onError(Throwable cause, Callback callback)
{
- coreSession = null;
+ if (LOG.isDebugEnabled())
+ LOG.debug(this + " onError ", cause);
+ callback.succeeded();
}
@Override
- public void onError(Throwable cause) throws Exception
+ public void onClosed(CloseStatus closeStatus, Callback callback)
{
- if (LOG.isDebugEnabled())
- LOG.debug(this + " onError ", cause);
+ coreSession = null;
+ callback.succeeded();
}
+
}
diff --git a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameHandlerTracker.java b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameHandlerTracker.java
index ecb117df4975..4df016bbcaaf 100644
--- a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameHandlerTracker.java
+++ b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameHandlerTracker.java
@@ -18,17 +18,17 @@
package org.eclipse.jetty.websocket.javax.tests.framehandlers;
-import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.websocket.core.CloseStatus;
-import org.eclipse.jetty.websocket.core.MessageHandler;
-
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.websocket.core.CloseStatus;
+import org.eclipse.jetty.websocket.core.MessageHandler;
+
public class FrameHandlerTracker extends MessageHandler
{
public CountDownLatch openLatch = new CountDownLatch(1);
@@ -45,39 +45,39 @@ public void addEvent(String format, Object... args)
}
@Override
- public void onOpen(CoreSession coreSession) throws Exception
+ public void onOpen(CoreSession coreSession, Callback callback)
{
- super.onOpen(coreSession);
- openLatch.countDown();
+ super.onOpen(coreSession, Callback.from(callback,()->openLatch.countDown()));
}
@Override
- public void onClosed(CloseStatus closeStatus) throws Exception
+ public void onText(String wholeMessage, Callback callback)
{
- super.onClosed(closeStatus);
-
- closeDetail.compareAndSet(null, closeStatus);
- closeLatch.countDown();
+ messageQueue.offer(wholeMessage);
+ callback.succeeded();
}
@Override
- public void onError(Throwable cause) throws Exception
+ public void onBinary(ByteBuffer wholeMessage, Callback callback)
{
- super.onError(cause);
- error.compareAndSet(null, cause);
+ bufferQueue.offer(BufferUtil.copy(wholeMessage));
+ callback.succeeded();
}
@Override
- public void onText(String wholeMessage, Callback callback)
+ public void onError(Throwable cause, Callback callback)
{
- messageQueue.offer(wholeMessage);
- callback.succeeded();
+ super.onError(cause, Callback.from(callback, ()-> error.compareAndSet(null, cause)));
}
@Override
- public void onBinary(ByteBuffer wholeMessage, Callback callback)
+ public void onClosed(CloseStatus closeStatus, Callback callback)
{
- bufferQueue.offer(BufferUtil.copy(wholeMessage));
- callback.succeeded();
+ super.onClosed(closeStatus, Callback.from(callback,()->
+ {
+ closeDetail.compareAndSet(null, closeStatus);
+ closeLatch.countDown();
+ }));
}
+
}
diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/MessageReceivingTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/MessageReceivingTest.java
index 5a71d9578257..5b0517436851 100644
--- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/MessageReceivingTest.java
+++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/MessageReceivingTest.java
@@ -18,6 +18,23 @@
package org.eclipse.jetty.websocket.javax.tests.client;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.ClientEndpointConfig;
+import javax.websocket.ContainerProvider;
+import javax.websocket.Endpoint;
+import javax.websocket.EndpointConfig;
+import javax.websocket.Session;
+import javax.websocket.WebSocketContainer;
+
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
@@ -28,31 +45,15 @@
import org.eclipse.jetty.websocket.core.MessageHandler;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.server.Negotiation;
+import org.eclipse.jetty.websocket.javax.common.util.TextUtil;
import org.eclipse.jetty.websocket.javax.tests.CoreServer;
import org.eclipse.jetty.websocket.javax.tests.DataUtils;
-import org.eclipse.jetty.websocket.javax.common.util.TextUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import javax.websocket.ClientEndpointConfig;
-import javax.websocket.ContainerProvider;
-import javax.websocket.Endpoint;
-import javax.websocket.EndpointConfig;
-import javax.websocket.Session;
-import javax.websocket.WebSocketContainer;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
@@ -292,9 +293,10 @@ public void onBinary(ByteBuffer wholeMessage, Callback callback)
}
@Override
- public void onError(Throwable cause)
+ public void onError(Throwable cause, Callback callback)
{
LOG.warn(cause);
+ callback.succeeded();
}
}
@@ -323,9 +325,10 @@ public void onText(String wholeMessage, Callback callback)
}
@Override
- public void onError(Throwable cause)
+ public void onError(Throwable cause, Callback callback)
{
LOG.warn(cause);
+ callback.succeeded();
}
}
diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java
index 72e5fe952e11..a60540331e59 100644
--- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java
+++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java
@@ -30,19 +30,19 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.FrameHandler;
-import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
-import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSocket;
-import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSocket;
+import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig;
+import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
-import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig;
-import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
+import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSessionSocket;
+import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSessionReasonSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSessionSocket;
+import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSocket;
import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -112,11 +112,11 @@ public void testOnCloseCall(Case testcase) throws Exception
CompletableFuture futureSession = new CompletableFuture<>();
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(endpoint, request, response, futureSession);
- frameHandler.onOpen(new FrameHandler.CoreSession.Empty());
+ frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
// Execute onClose call
frameHandler.onFrame(CloseStatus.toFrame(CloseStatus.NORMAL), Callback.NOOP);
- frameHandler.onClosed(CloseStatus.NORMAL_STATUS);
+ frameHandler.onClosed(CloseStatus.NORMAL_STATUS, Callback.NOOP);
// Test captured event
BlockingQueue events = endpoint.events;
diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java
index 979da223f60b..c2344a5e270c 100644
--- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java
+++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java
@@ -33,12 +33,9 @@
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
-import org.eclipse.jetty.websocket.javax.tests.MessageType;
-import org.eclipse.jetty.websocket.javax.tests.SessionMatchers;
-import org.eclipse.jetty.websocket.javax.tests.handlers.ByteArrayWholeHandler;
-import org.eclipse.jetty.websocket.javax.tests.handlers.ByteBufferPartialHandler;
-import org.eclipse.jetty.websocket.javax.tests.handlers.LongMessageHandler;
-import org.eclipse.jetty.websocket.javax.tests.handlers.StringWholeHandler;
+import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig;
+import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
+import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientFrameHandlerFactory;
import org.eclipse.jetty.websocket.javax.common.ConfiguredEndpoint;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory;
@@ -47,20 +44,20 @@
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
-import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig;
-import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
-import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientFrameHandlerFactory;
+import org.eclipse.jetty.websocket.javax.tests.MessageType;
+import org.eclipse.jetty.websocket.javax.tests.SessionMatchers;
+import org.eclipse.jetty.websocket.javax.tests.handlers.ByteArrayWholeHandler;
+import org.eclipse.jetty.websocket.javax.tests.handlers.ByteBufferPartialHandler;
+import org.eclipse.jetty.websocket.javax.tests.handlers.LongMessageHandler;
+import org.eclipse.jetty.websocket.javax.tests.handlers.StringWholeHandler;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
public class SessionAddMessageHandlerTest
{
@@ -88,7 +85,7 @@ public void initSession() throws Exception
JavaxWebSocketFrameHandlerFactory frameHandlerFactory = new JavaxWebSocketClientFrameHandlerFactory(container);
CompletableFuture futureSession = new CompletableFuture<>();
frameHandler = frameHandlerFactory.newJavaxFrameHandler(ei, handshakeRequest, handshakeResponse, futureSession);
- frameHandler.onOpen(new FrameHandler.CoreSession.Empty());
+ frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
// Session
session = frameHandler.getSession();
diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java
index b590af6cafb5..680f06f594f6 100644
--- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java
+++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java
@@ -35,12 +35,12 @@
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.OpCode;
-import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
+import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -57,7 +57,7 @@ private T performOnMessageInvocation(T socket, Consum
// Establish endpoint function
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(socket, request, response, futureSession);
- frameHandler.onOpen(new FrameHandler.CoreSession.Empty());
+ frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
func.accept(frameHandler);
return socket;
}
diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java
index 8c859a15bf2c..91d8a1d24852 100644
--- a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java
+++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java
@@ -18,6 +18,11 @@
package org.eclipse.jetty.websocket.common;
+import java.lang.invoke.MethodHandle;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
@@ -26,14 +31,19 @@
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.common.invoke.InvalidSignatureException;
-import org.eclipse.jetty.websocket.core.*;
-
-import java.lang.invoke.MethodHandle;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor
+import org.eclipse.jetty.websocket.core.BadPayloadException;
+import org.eclipse.jetty.websocket.core.CloseException;
+import org.eclipse.jetty.websocket.core.CloseStatus;
+import org.eclipse.jetty.websocket.core.Frame;
+import org.eclipse.jetty.websocket.core.FrameHandler;
+import org.eclipse.jetty.websocket.core.MessageTooLargeException;
+import org.eclipse.jetty.websocket.core.OpCode;
+import org.eclipse.jetty.websocket.core.ProtocolException;
+import org.eclipse.jetty.websocket.core.UpgradeException;
+import org.eclipse.jetty.websocket.core.WebSocketException;
+import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
+
+public class JettyWebSocketFrameHandler implements FrameHandler
{
private final Logger log;
private final Executor executor;
@@ -104,61 +114,41 @@ public WebSocketSessionImpl getSession()
}
@Override
- public void onClosed(CloseStatus closeStatus)
+ public void onOpen(CoreSession coreSession, Callback callback)
{
- }
-
- @SuppressWarnings("Duplicates")
- @Override
- public void onError(Throwable cause)
- {
- cause = convertCause(cause);
- futureSession.completeExceptionally(cause);
-
- if (errorHandle == null)
- {
- log.warn("Unhandled Error: Endpoint " + endpointInstance.getClass().getName() + " : " + cause);
- if (log.isDebugEnabled())
- log.debug("unhandled", cause);
- return;
- }
-
try
{
- errorHandle.invoke(cause);
- }
- catch (Throwable t)
- {
- WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
- wsError.addSuppressed(cause);
- throw wsError;
- }
- }
-
- public static Throwable convertCause(Throwable cause)
- {
- if (cause instanceof MessageTooLargeException)
- return new org.eclipse.jetty.websocket.api.MessageTooLargeException(cause.getMessage(), cause);
+ customizer.customize(coreSession);
- if (cause instanceof ProtocolException)
- return new org.eclipse.jetty.websocket.api.ProtocolException(cause.getMessage(), cause);
-
- if (cause instanceof BadPayloadException)
- return new org.eclipse.jetty.websocket.api.BadPayloadException(cause.getMessage(), cause);
+ session = new WebSocketSessionImpl(coreSession, this, upgradeRequest, upgradeResponse);
- if (cause instanceof CloseException)
- return new org.eclipse.jetty.websocket.api.CloseException(((CloseException)cause).getStatusCode(), cause.getMessage(), cause);
+ frameHandle = JettyWebSocketFrameHandlerFactory.bindTo(frameHandle, session);
+ openHandle = JettyWebSocketFrameHandlerFactory.bindTo(openHandle, session);
+ closeHandle = JettyWebSocketFrameHandlerFactory.bindTo(closeHandle, session);
+ errorHandle = JettyWebSocketFrameHandlerFactory.bindTo(errorHandle, session);
+ textHandle = JettyWebSocketFrameHandlerFactory.bindTo(textHandle, session);
+ binaryHandle = JettyWebSocketFrameHandlerFactory.bindTo(binaryHandle, session);
+ pingHandle = JettyWebSocketFrameHandlerFactory.bindTo(pingHandle, session);
+ pongHandle = JettyWebSocketFrameHandlerFactory.bindTo(pongHandle, session);
- if (cause instanceof WebSocketTimeoutException)
- return new org.eclipse.jetty.websocket.api.WebSocketTimeoutException(cause.getMessage(), cause);
+ if (textHandle != null)
+ textSink = JettyWebSocketFrameHandlerFactory.createMessageSink(textHandle, textSinkClass, executor, coreSession.getMaxTextMessageSize());
- if (cause instanceof InvalidSignatureException)
- return new org.eclipse.jetty.websocket.api.InvalidWebSocketException(cause.getMessage(), cause);
+ if (binaryHandle != null)
+ binarySink = JettyWebSocketFrameHandlerFactory
+ .createMessageSink(binaryHandle, binarySinkClass, executor, coreSession.getMaxBinaryMessageSize());
- if (cause instanceof UpgradeException)
- return new org.eclipse.jetty.websocket.api.UpgradeException(((UpgradeException)cause).getRequestURI(), cause);
+ if (openHandle != null)
+ openHandle.invoke();
- return cause;
+ futureSession.complete(session);
+ callback.succeeded();
+ }
+ catch (Throwable cause)
+ {
+ // TODO should futureSession be failed here?
+ callback.failed(new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause));
+ }
}
/**
@@ -185,61 +175,56 @@ public void onFrame(Frame frame, Callback callback)
switch (frame.getOpCode())
{
case OpCode.CLOSE:
- onClose(frame, callback);
+ onCloseFrame(frame, callback);
break;
case OpCode.PING:
- onPing(frame, callback);
+ onPingFrame(frame, callback);
break;
case OpCode.PONG:
- onPong(frame, callback);
+ onPongFrame(frame, callback);
break;
case OpCode.TEXT:
- onText(frame, callback);
+ onTextFrame(frame, callback);
break;
case OpCode.BINARY:
- onBinary(frame, callback);
+ onBinaryFrame(frame, callback);
break;
case OpCode.CONTINUATION:
- onContinuation(frame, callback);
+ onContinuationFrame(frame, callback);
break;
}
}
@Override
- public void onOpen(CoreSession coreSession)
+ public void onError(Throwable cause, Callback callback)
{
- customizer.customize(coreSession);
-
- session = new WebSocketSessionImpl(coreSession, this, upgradeRequest, upgradeResponse);
-
- frameHandle = JettyWebSocketFrameHandlerFactory.bindTo(frameHandle, session);
- openHandle = JettyWebSocketFrameHandlerFactory.bindTo(openHandle, session);
- closeHandle = JettyWebSocketFrameHandlerFactory.bindTo(closeHandle, session);
- errorHandle = JettyWebSocketFrameHandlerFactory.bindTo(errorHandle, session);
- textHandle = JettyWebSocketFrameHandlerFactory.bindTo(textHandle, session);
- binaryHandle = JettyWebSocketFrameHandlerFactory.bindTo(binaryHandle, session);
- pingHandle = JettyWebSocketFrameHandlerFactory.bindTo(pingHandle, session);
- pongHandle = JettyWebSocketFrameHandlerFactory.bindTo(pongHandle, session);
-
- if (textHandle != null)
- textSink = JettyWebSocketFrameHandlerFactory.createMessageSink(textHandle, textSinkClass, executor, coreSession.getMaxTextMessageSize());
-
- if (binaryHandle != null)
- binarySink = JettyWebSocketFrameHandlerFactory.createMessageSink(binaryHandle, binarySinkClass, executor, coreSession.getMaxBinaryMessageSize());
-
- if (openHandle != null)
+ try
{
- try
- {
- openHandle.invoke();
- }
- catch (Throwable cause)
+ cause = convertCause(cause);
+ futureSession.completeExceptionally(cause);
+
+ if (errorHandle != null)
+ errorHandle.invoke(cause);
+ else
{
- throw new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
+ log.warn("Unhandled Error: Endpoint " + endpointInstance.getClass().getName() + " : " + cause);
+ if (log.isDebugEnabled())
+ log.debug("unhandled", cause);
}
+ callback.succeeded();
}
+ catch (Throwable t)
+ {
+ WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t);
+ wsError.addSuppressed(cause);
+ callback.failed(wsError);
+ }
+ }
- futureSession.complete(session);
+ @Override
+ public void onClosed(CloseStatus closeStatus, Callback callback)
+ {
+ callback.succeeded();
}
public String toString()
@@ -259,7 +244,7 @@ private void acceptMessage(Frame frame, Callback callback)
activeMessageSink = null;
}
- private void onBinary(Frame frame, Callback callback)
+ private void onBinaryFrame(Frame frame, Callback callback)
{
if (activeMessageSink == null)
activeMessageSink = binarySink;
@@ -267,7 +252,7 @@ private void onBinary(Frame frame, Callback callback)
acceptMessage(frame, callback);
}
- private void onClose(Frame frame, Callback callback)
+ private void onCloseFrame(Frame frame, Callback callback)
{
if (closeHandle != null)
{
@@ -284,12 +269,12 @@ private void onClose(Frame frame, Callback callback)
callback.succeeded();
}
- private void onContinuation(Frame frame, Callback callback)
+ private void onContinuationFrame(Frame frame, Callback callback)
{
acceptMessage(frame, callback);
}
- private void onPing(Frame frame, Callback callback)
+ private void onPingFrame(Frame frame, Callback callback)
{
if (pingHandle != null)
{
@@ -317,7 +302,7 @@ private void onPing(Frame frame, Callback callback)
callback.succeeded();
}
- private void onPong(Frame frame, Callback callback)
+ private void onPongFrame(Frame frame, Callback callback)
{
if (pongHandle != null)
{
@@ -337,11 +322,39 @@ private void onPong(Frame frame, Callback callback)
callback.succeeded();
}
- private void onText(Frame frame, Callback callback)
+ private void onTextFrame(Frame frame, Callback callback)
{
if (activeMessageSink == null)
activeMessageSink = textSink;
acceptMessage(frame, callback);
}
+
+
+ static Throwable convertCause(Throwable cause)
+ {
+ if (cause instanceof MessageTooLargeException)
+ return new org.eclipse.jetty.websocket.api.MessageTooLargeException(cause.getMessage(), cause);
+
+ if (cause instanceof ProtocolException)
+ return new org.eclipse.jetty.websocket.api.ProtocolException(cause.getMessage(), cause);
+
+ if (cause instanceof BadPayloadException)
+ return new org.eclipse.jetty.websocket.api.BadPayloadException(cause.getMessage(), cause);
+
+ if (cause instanceof CloseException)
+ return new org.eclipse.jetty.websocket.api.CloseException(((CloseException)cause).getStatusCode(), cause.getMessage(), cause);
+
+ if (cause instanceof WebSocketTimeoutException)
+ return new org.eclipse.jetty.websocket.api.WebSocketTimeoutException(cause.getMessage(), cause);
+
+ if (cause instanceof InvalidSignatureException)
+ return new org.eclipse.jetty.websocket.api.InvalidWebSocketException(cause.getMessage(), cause);
+
+ if (cause instanceof UpgradeException)
+ return new org.eclipse.jetty.websocket.api.UpgradeException(((UpgradeException)cause).getRequestURI(), cause);
+
+ return cause;
+ }
+
}
diff --git a/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java
index df5b3c23add0..7176f0140787 100644
--- a/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java
+++ b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java
@@ -119,7 +119,7 @@ public void testConnectionListener() throws Exception
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello?").setFin(true), Callback.NOOP);
localEndpoint.onFrame(CloseStatus.toFrame(StatusCode.NORMAL, "Normal"), Callback.NOOP);
@@ -163,7 +163,7 @@ public void testAnnotatedStreamedText_Single() throws Exception
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello Text Stream").setFin(true), Callback.NOOP);
localEndpoint.onFrame(CloseStatus.toFrame(StatusCode.NORMAL, "Normal"), Callback.NOOP);
@@ -185,7 +185,7 @@ public void testAnnotatedStreamedText_MultipleParts() throws Exception
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hel").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("lo ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("Wor").setFin(false), Callback.NOOP);
@@ -208,7 +208,7 @@ public void testListenerPartialSocket() throws Exception
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP);
@@ -238,7 +238,7 @@ public void testListenerBasicSocket() throws Exception
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP);
@@ -264,10 +264,10 @@ public void testListenerBasicSocket_Error() throws Exception
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
- localEndpoint.onError(new RuntimeException("Nothing to see here"));
+ localEndpoint.onError(new RuntimeException("Nothing to see here"), Callback.NOOP);
// Validate Events
socket.events.assertEvents(
@@ -284,7 +284,7 @@ public void testListenerFrameSocket() throws Exception
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP);
@@ -314,7 +314,7 @@ public void testListenerPingPongSocket() throws Exception
JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket);
// Trigger Events
- localEndpoint.onOpen(channel);
+ localEndpoint.onOpen(channel, Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP);
localEndpoint.onFrame(new Frame(OpCode.PING).setPayload("You there?"), Callback.NOOP);
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java
index 19d1b19f5ec6..bf0a18263557 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java
@@ -18,11 +18,6 @@
package org.eclipse.jetty.websocket.core;
-import org.eclipse.jetty.io.ByteBufferPool;
-import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.websocket.core.client.UpgradeRequest;
-import org.eclipse.jetty.websocket.core.server.Negotiation;
-
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
@@ -30,6 +25,11 @@
import java.util.List;
import java.util.Map;
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.websocket.core.client.UpgradeRequest;
+import org.eclipse.jetty.websocket.core.server.Negotiation;
+
/**
* Interface for local WebSocket Endpoint Frame handling.
*
@@ -124,74 +124,6 @@ default boolean isDemanding()
return false;
}
-
- interface Adaptor extends FrameHandler
- {
- @Override
- default void onOpen(CoreSession coreSession, Callback callback)
- {
- try
- {
- onOpen(coreSession);
- callback.succeeded();
- }
- catch(Throwable t)
- {
- callback.failed(t);
- }
- }
-
- default void onOpen(CoreSession coreSession) throws Exception {}
-
- @Override
- default void onFrame(Frame frame, Callback callback)
- {
- try
- {
- onFrame(frame);
- callback.succeeded();
- }
- catch(Throwable t)
- {
- callback.failed(t);
- }
- }
-
- default void onFrame(Frame frame) throws Exception {}
-
- @Override
- default void onClosed(CloseStatus closeStatus, Callback callback)
- {
- try
- {
- onClosed(closeStatus);
- callback.succeeded();
- }
- catch(Throwable t)
- {
- callback.failed(t);
- }
- }
- default void onClosed(CloseStatus closeStatus) throws Exception {}
-
- @Override
- default void onError(Throwable cause, Callback callback)
- {
- try
- {
- onError(cause);
- callback.succeeded();
- }
- catch(Throwable t)
- {
- callback.failed(t);
- }
- }
-
- default void onError(Throwable cause) throws Exception {}
- }
-
-
interface Configuration
{
@@ -568,7 +500,7 @@ class ConfigurationCustomizer implements Customizer, Configuration
@Override
public Duration getIdleTimeout()
{
- return timeout==null ? Duration.ofSeconds(0) : timeout;
+ return timeout==null ? Duration.ZERO : timeout;
}
@Override
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/MessageHandler.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/MessageHandler.java
index 566e93a33144..37828331948b 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/MessageHandler.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/MessageHandler.java
@@ -18,14 +18,18 @@
package org.eclipse.jetty.websocket.core;
-import org.eclipse.jetty.util.*;
-import org.eclipse.jetty.util.log.Log;
-import org.eclipse.jetty.util.log.Logger;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.IteratingNestedCallback;
+import org.eclipse.jetty.util.Utf8Appendable;
+import org.eclipse.jetty.util.Utf8StringBuilder;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
/**
* A utility implementation of FrameHandler that defragments
* text frames into a String message before calling {@link #onText(String, Callback)}.
@@ -33,7 +37,7 @@
* may extend {@link #isDemanding()} to return true and then explicityly control
* demand with calls to {@link org.eclipse.jetty.websocket.core.FrameHandler.CoreSession#demand(long)}
*/
-public class MessageHandler implements FrameHandler.Adaptor
+public class MessageHandler implements FrameHandler
{
public static MessageHandler from(Consumer onText, Consumer onBinary)
@@ -124,15 +128,16 @@ public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
this.maxBinaryMessageSize = maxBinaryMessageSize;
}
- @Override
- public void onOpen(CoreSession coreSession) throws Exception
+ public CoreSession getCoreSession()
{
- this.coreSession = coreSession;
+ return coreSession;
}
- public CoreSession getCoreSession()
+ @Override
+ public void onOpen(CoreSession coreSession, Callback callback)
{
- return coreSession;
+ this.coreSession = coreSession;
+ callback.succeeded();
}
@Override
@@ -226,7 +231,38 @@ protected void appendByte(byte b) throws IOException
}
@Override
- public final void onFrame(Frame frame){}
+ public void onError(Throwable cause, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug(this + " onError ", cause);
+ callback.succeeded();
+ }
+
+ @Override
+ public void onClosed(CloseStatus closeStatus, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} onClosed {}", this, closeStatus);
+ if (utf8StringBuilder != null && utf8StringBuilder.length() > 0 && closeStatus.isNormal())
+ LOG.warn("{} closed with partial message: {} chars", utf8StringBuilder.length());
+
+ if (binaryMessage != null)
+ {
+ if (BufferUtil.hasContent(binaryMessage))
+ LOG.warn("{} closed with partial message: {} bytes", binaryMessage.remaining());
+
+ getCoreSession().getByteBufferPool().release(binaryMessage);
+ binaryMessage = null;
+ }
+
+ if (utf8StringBuilder != null)
+ {
+ utf8StringBuilder.reset();
+ utf8StringBuilder = null;
+ }
+ coreSession = null;
+ callback.succeeded();
+ }
private void onTextFrame(Frame frame, Callback callback)
{
@@ -422,36 +458,4 @@ protected Action process() throws Throwable
}
}.iterate();
}
-
- @Override
- public void onClosed(CloseStatus closeStatus) throws Exception
- {
- if (LOG.isDebugEnabled())
- LOG.debug("{} onClosed {}", this, closeStatus);
- if (utf8StringBuilder != null && utf8StringBuilder.length() > 0 && closeStatus.isNormal())
- LOG.warn("{} closed with partial message: {} chars", utf8StringBuilder.length());
-
- if (binaryMessage != null)
- {
- if (BufferUtil.hasContent(binaryMessage))
- LOG.warn("{} closed with partial message: {} bytes", binaryMessage.remaining());
-
- getCoreSession().getByteBufferPool().release(binaryMessage);
- binaryMessage = null;
- }
-
- if (utf8StringBuilder != null)
- {
- utf8StringBuilder.reset();
- utf8StringBuilder = null;
- }
- coreSession = null;
- }
-
- @Override
- public void onError(Throwable cause) throws Exception
- {
- if (LOG.isDebugEnabled())
- LOG.debug(this + " onError ", cause);
- }
}
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java
index 840ad5a8c367..7db48c1d95fc 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java
@@ -18,16 +18,6 @@
package org.eclipse.jetty.websocket.core.internal;
-import org.eclipse.jetty.io.ByteBufferPool;
-import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.IteratingCallback;
-import org.eclipse.jetty.util.Utf8Appendable;
-import org.eclipse.jetty.util.component.Dumpable;
-import org.eclipse.jetty.util.log.Log;
-import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.websocket.core.*;
-import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame;
-
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
@@ -40,6 +30,28 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.IteratingCallback;
+import org.eclipse.jetty.util.Utf8Appendable;
+import org.eclipse.jetty.util.component.Dumpable;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.websocket.core.Behavior;
+import org.eclipse.jetty.websocket.core.CloseException;
+import org.eclipse.jetty.websocket.core.CloseStatus;
+import org.eclipse.jetty.websocket.core.Extension;
+import org.eclipse.jetty.websocket.core.ExtensionConfig;
+import org.eclipse.jetty.websocket.core.Frame;
+import org.eclipse.jetty.websocket.core.FrameHandler;
+import org.eclipse.jetty.websocket.core.IncomingFrames;
+import org.eclipse.jetty.websocket.core.OpCode;
+import org.eclipse.jetty.websocket.core.OutgoingFrames;
+import org.eclipse.jetty.websocket.core.ProtocolException;
+import org.eclipse.jetty.websocket.core.WebSocketConstants;
+import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
+import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame;
+
import static org.eclipse.jetty.util.Callback.NOOP;
/**
@@ -469,9 +481,6 @@ public void onFrame(Frame frame, Callback callback)
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
- if (LOG.isDebugEnabled())
- LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
-
try
{
assertValidOutgoing(frame);
@@ -487,26 +496,29 @@ public void sendFrame(Frame frame, Callback callback, boolean batch)
synchronized(flusher)
{
boolean closeConnection = channelState.onOutgoingFrame(frame);
+ if (LOG.isDebugEnabled())
+ LOG.debug("sendFrame({}, {}, {}) {}", frame, callback, batch, closeConnection);
- if (frame.getOpCode() == OpCode.CLOSE)
+ if (closeConnection)
{
- if (LOG.isDebugEnabled())
- LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch);
-
- if (closeConnection)
- {
- callback = new Callback.Nested(callback)
- {
- @Override
- public void completed()
- {
- closeConnection(AbnormalCloseStatus.getCause(CloseStatus.getCloseStatus(frame)), channelState.getCloseStatus(),NOOP);
- }
- };
- }
+ Throwable cause = AbnormalCloseStatus.getCause(CloseStatus.getCloseStatus(frame));
+
+ Callback closeConnectionCallback = Callback.from(
+ ()->closeConnection(cause, channelState.getCloseStatus(), callback),
+ x->closeConnection(cause, channelState.getCloseStatus(), Callback.from(
+ ()-> callback.failed(x),
+ x2->
+ {
+ x.addSuppressed(x2);
+ callback.failed(x);
+ })));
+
+ flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false));
+ }
+ else
+ {
+ flusher.queue.offer(new FrameEntry(frame, callback, batch));
}
-
- flusher.queue.offer(new FrameEntry(frame, callback, batch));
}
flusher.iterate();
}
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AbstractTestFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AbstractTestFrameHandler.java
index 0ff0a1fab501..b8d53db1e310 100644
--- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AbstractTestFrameHandler.java
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AbstractTestFrameHandler.java
@@ -18,14 +18,14 @@
package org.eclipse.jetty.websocket.core;
+import java.nio.ByteBuffer;
+
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-import java.nio.ByteBuffer;
-
import static org.eclipse.jetty.websocket.core.OpCode.PONG;
/**
@@ -35,7 +35,7 @@
* NOTE: The introduction of WebSocket over HTTP/2 might change the behavior and implementation some.
*
*/
-public class AbstractTestFrameHandler implements FrameHandler.Adaptor
+public class AbstractTestFrameHandler implements SynchronousFrameHandler
{
private Logger LOG = Log.getLogger(AbstractTestFrameHandler.class);
private byte partial = OpCode.UNDEFINED;
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java
index 50f7107e3a4e..f7d5b3c86e85 100644
--- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java
@@ -111,7 +111,7 @@ public boolean isDemanding()
}
};
- handler.onOpen(session);
+ handler.onOpen(session, NOOP);
}
@Test
@@ -350,7 +350,7 @@ public void testTextTooLarge() throws Exception
FutureCallback callback;
handler.setMaxTextMessageSize(4);
- handler.onOpen(session);
+ handler.onOpen(session, NOOP);
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, true, "Testing"), callback);
@@ -369,7 +369,7 @@ public void testSplitTextTooLarge() throws Exception
FutureCallback callback;
handler.setMaxTextMessageSize(4);
- handler.onOpen(session);
+ handler.onOpen(session, NOOP);
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.TEXT, false, "123"), callback);
@@ -570,7 +570,7 @@ public void testBinaryTooLarge() throws Exception
FutureCallback callback;
handler.setMaxBinaryMessageSize(4);
- handler.onOpen(session);
+ handler.onOpen(session, NOOP);
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.BINARY, true, "Testing"), callback);
@@ -589,7 +589,7 @@ public void testSplitBinaryTooLarge() throws Exception
FutureCallback callback;
handler.setMaxBinaryMessageSize(4);
- handler.onOpen(session);
+ handler.onOpen(session, NOOP);
callback = new FutureCallback();
handler.onFrame(new Frame(OpCode.BINARY, false, "123"), callback);
@@ -653,7 +653,7 @@ protected void onBinary(ByteBuffer message, Callback callback)
}
};
- handler.onOpen(session);
+ handler.onOpen(session, NOOP);
FutureCallback callback;
@@ -681,7 +681,7 @@ protected void onText(String message, Callback callback)
}
};
- handler.onOpen(session);
+ handler.onOpen(session, NOOP);
FutureCallback callback;
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/SynchronousFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/SynchronousFrameHandler.java
new file mode 100644
index 000000000000..07d84952dd6e
--- /dev/null
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/SynchronousFrameHandler.java
@@ -0,0 +1,87 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.websocket.core;
+
+import org.eclipse.jetty.util.Callback;
+
+public interface SynchronousFrameHandler extends FrameHandler
+{
+ @Override
+ default void onOpen(CoreSession coreSession, Callback callback)
+ {
+ try
+ {
+ onOpen(coreSession);
+ callback.succeeded();
+ }
+ catch(Throwable t)
+ {
+ callback.failed(t);
+ }
+ }
+
+ default void onOpen(CoreSession coreSession) throws Exception {}
+
+ @Override
+ default void onFrame(Frame frame, Callback callback)
+ {
+ try
+ {
+ onFrame(frame);
+ callback.succeeded();
+ }
+ catch(Throwable t)
+ {
+ callback.failed(t);
+ }
+ }
+
+ default void onFrame(Frame frame) throws Exception {}
+
+ @Override
+ default void onClosed(CloseStatus closeStatus, Callback callback)
+ {
+ try
+ {
+ onClosed(closeStatus);
+ callback.succeeded();
+ }
+ catch(Throwable t)
+ {
+ callback.failed(t);
+ }
+ }
+ default void onClosed(CloseStatus closeStatus) throws Exception {}
+
+ @Override
+ default void onError(Throwable cause, Callback callback)
+ {
+ try
+ {
+ onError(cause);
+ callback.succeeded();
+ }
+ catch(Throwable t)
+ {
+ callback.failed(t);
+ }
+ }
+
+ default void onError(Throwable cause) throws Exception {}
+}
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java
index 0f368b356193..5f644880238c 100644
--- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java
@@ -18,18 +18,18 @@
package org.eclipse.jetty.websocket.core;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-
-public class TestFrameHandler implements FrameHandler.Adaptor
+public class TestFrameHandler implements SynchronousFrameHandler
{
- private static Logger LOG = Log.getLogger(TestFrameHandler.class);
+ private static Logger LOG = Log.getLogger(SynchronousFrameHandler.class);
private CoreSession session;
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java
index 3011cbe3147b..ca72b60db6b3 100644
--- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java
@@ -18,6 +18,11 @@
package org.eclipse.jetty.websocket.core;
+import java.net.Socket;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
@@ -39,16 +44,14 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
-import java.net.Socket;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests of a core server with a fake client
@@ -352,7 +355,7 @@ public void onFrameThrows_OSHUT() throws Exception
assertThat(server.handler.closeStatus.getReason(), containsString("onReceiveFrame throws for binary frames"));
}
- static class TestFrameHandler implements FrameHandler.Adaptor
+ static class TestFrameHandler implements SynchronousFrameHandler
{
private CoreSession session;
String state;
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java
index 5dbd77be63b4..a5dea27d5022 100644
--- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java
@@ -18,6 +18,12 @@
package org.eclipse.jetty.websocket.core;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
@@ -39,16 +45,14 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
-import java.net.Socket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Exchanger;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-
import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.*;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests of a core server with a fake client
@@ -83,7 +87,7 @@ public void testSendFrameInOnOpen() throws Exception
setup((s,c)->
{
assertThat(s.toString(),containsString("CONNECTED"));
- TestFrameHandler.sendText(s,"Hello", c);
+ WebSocketOpenTest.TestFrameHandler.sendText(s,"Hello", c);
s.demand(1);
return null;
});
@@ -170,7 +174,7 @@ public void testAsyncOnOpen() throws Exception
Thread.sleep(100);
// Can send while onOpen is active
- TestFrameHandler.sendText(session,"Hello", NOOP);
+ WebSocketOpenTest.TestFrameHandler.sendText(session,"Hello", NOOP);
Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
assertThat(frame.getPayloadAsUTF8(),is("Hello"));
@@ -203,7 +207,7 @@ public void testAsyncOnOpen() throws Exception
- static class TestFrameHandler implements FrameHandler.Adaptor
+ static class TestFrameHandler implements SynchronousFrameHandler
{
private CoreSession session;
private BiFunction onOpen;
@@ -351,7 +355,7 @@ public void sendFrame(Frame frame)
public void sendText(String text)
{
LOG.info("sending {}...", text);
- TestFrameHandler.sendText(handler.session, text);
+ WebSocketOpenTest.TestFrameHandler.sendText(handler.session, text);
}
public void close()
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/chat/ChatWebSocketServer.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/chat/ChatWebSocketServer.java
index 9dacb1b4a0e3..ad9f7fdc93d0 100644
--- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/chat/ChatWebSocketServer.java
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/chat/ChatWebSocketServer.java
@@ -18,6 +18,15 @@
package org.eclipse.jetty.websocket.core.chat;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
@@ -34,14 +43,6 @@
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
import static org.eclipse.jetty.util.Callback.NOOP;
public class ChatWebSocketServer
@@ -67,14 +68,12 @@ private FrameHandler negotiate(Negotiation negotiation)
// + MUST return the FrameHandler or null or exception?
return new MessageHandler()
{
-
@Override
- public void onOpen(CoreSession coreSession) throws Exception
+ public void onOpen(CoreSession coreSession, Callback callback)
{
LOG.debug("onOpen {}", coreSession);
setMaxTextMessageSize(2 * 1024);
- super.onOpen(coreSession);
- members.add(this);
+ super.onOpen(coreSession, Callback.from(()->{members.add(this); callback.succeeded();},x->callback.failed(x)));
}
@Override
@@ -92,10 +91,10 @@ public void onText(String message, Callback callback)
}
@Override
- public void onClosed(CloseStatus closeStatus) throws Exception
+ public void onClosed(CloseStatus closeStatus, Callback callback)
{
LOG.debug("onClosed {}", closeStatus);
- super.onClosed(closeStatus);
+ super.onClosed(closeStatus, Callback.from(()->members.remove(this),callback));
members.remove(this);
}
};
From 6f7395c8f0f10417430fc9a931f02895d278af05 Mon Sep 17 00:00:00 2001
From: Greg Wilkins
Date: Tue, 29 Jan 2019 10:38:12 +1100
Subject: [PATCH 6/8] Issue #3290 async onOpen, onClose and onError
Changes after review:
+ fixed import order
Signed-off-by: Greg Wilkins
---
.../org/eclipse/jetty/client/HttpRequest.java | 36 ++++++++++----
.../jetty/websocket/api/UpgradeRequest.java | 4 +-
.../websocket/core/client/UpgradeRequest.java | 47 +++++++++++++------
.../jetty/websocket/core/internal/Parser.java | 13 +++--
.../core/internal/WebSocketConnection.java | 24 ++++++----
.../core/server/WebSocketServerTest.java | 31 ++++++++----
6 files changed, 109 insertions(+), 46 deletions(-)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java
index 1476153c980f..2ea5cb6ed43c 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java
@@ -18,21 +18,24 @@
package org.eclipse.jetty.client;
-import org.eclipse.jetty.client.api.*;
-import org.eclipse.jetty.client.util.FutureResponseListener;
-import org.eclipse.jetty.client.util.PathContentProvider;
-import org.eclipse.jetty.http.*;
-import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.Fields;
-
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpCookie;
-import java.net.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.charset.UnsupportedCharsetException;
import java.nio.file.Path;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -40,6 +43,21 @@
import java.util.function.BiFunction;
import java.util.function.Supplier;
+import org.eclipse.jetty.client.api.ContentProvider;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.client.util.FutureResponseListener;
+import org.eclipse.jetty.client.util.PathContentProvider;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.Fields;
+
public class HttpRequest implements Request
{
private static final URI NULL_URI = URI.create("null:0");
diff --git a/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/UpgradeRequest.java b/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/UpgradeRequest.java
index 610ea4960b7a..df0458f01b5a 100644
--- a/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/UpgradeRequest.java
+++ b/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/UpgradeRequest.java
@@ -18,14 +18,14 @@
package org.eclipse.jetty.websocket.api;
-import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
-
import java.net.HttpCookie;
import java.net.URI;
import java.security.Principal;
import java.util.List;
import java.util.Map;
+import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
+
/**
* The HTTP Upgrade to WebSocket Request
*/
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/UpgradeRequest.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/UpgradeRequest.java
index 901b3799a4fd..183a6df6b88a 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/UpgradeRequest.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/UpgradeRequest.java
@@ -18,13 +18,33 @@
package org.eclipse.jetty.websocket.core.client;
-import org.eclipse.jetty.client.*;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpConversation;
+import org.eclipse.jetty.client.HttpRequest;
+import org.eclipse.jetty.client.HttpResponse;
+import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionUpgrader;
-import org.eclipse.jetty.http.*;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpScheme;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.B64Code;
@@ -32,18 +52,17 @@
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.websocket.core.*;
-import org.eclipse.jetty.websocket.core.internal.*;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Consumer;
+import org.eclipse.jetty.websocket.core.Behavior;
+import org.eclipse.jetty.websocket.core.ExtensionConfig;
+import org.eclipse.jetty.websocket.core.FrameHandler;
+import org.eclipse.jetty.websocket.core.UpgradeException;
+import org.eclipse.jetty.websocket.core.WebSocketConstants;
+import org.eclipse.jetty.websocket.core.WebSocketException;
+import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
+import org.eclipse.jetty.websocket.core.internal.Negotiated;
+import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
+import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
+import org.eclipse.jetty.websocket.core.internal.WebSocketCore;
public abstract class UpgradeRequest extends HttpRequest implements Response.CompleteListener, HttpConnectionUpgrader
{
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/Parser.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/Parser.java
index 2ba16b2d53a7..40fc9aa104cf 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/Parser.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/Parser.java
@@ -18,15 +18,20 @@
package org.eclipse.jetty.websocket.core.internal;
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.websocket.core.*;
-
-import java.io.Closeable;
-import java.nio.ByteBuffer;
+import org.eclipse.jetty.websocket.core.CloseStatus;
+import org.eclipse.jetty.websocket.core.Frame;
+import org.eclipse.jetty.websocket.core.MessageTooLargeException;
+import org.eclipse.jetty.websocket.core.OpCode;
+import org.eclipse.jetty.websocket.core.ProtocolException;
+import org.eclipse.jetty.websocket.core.WebSocketException;
/**
* Parsing of a frames in WebSocket land.
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java
index 5adc002644fa..cc393c31e1a8 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java
@@ -18,14 +18,6 @@
package org.eclipse.jetty.websocket.core.internal;
-import org.eclipse.jetty.io.*;
-import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.component.Dumpable;
-import org.eclipse.jetty.util.log.Log;
-import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.websocket.core.*;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -33,6 +25,22 @@
import java.util.Random;
import java.util.concurrent.Executor;
+import org.eclipse.jetty.io.AbstractConnection;
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.io.Connection;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.io.RetainableByteBuffer;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.component.Dumpable;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.websocket.core.Behavior;
+import org.eclipse.jetty.websocket.core.Frame;
+import org.eclipse.jetty.websocket.core.MessageTooLargeException;
+import org.eclipse.jetty.websocket.core.ProtocolException;
+import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
+
/**
* Provides the implementation of {@link org.eclipse.jetty.io.Connection} that is suitable for WebSocket
*/
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/server/WebSocketServerTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/server/WebSocketServerTest.java
index e09b4e60a447..35be3f49164a 100644
--- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/server/WebSocketServerTest.java
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/server/WebSocketServerTest.java
@@ -18,6 +18,12 @@
package org.eclipse.jetty.websocket.core.server;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector;
@@ -32,21 +38,28 @@
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.eclipse.jetty.websocket.core.*;
+import org.eclipse.jetty.websocket.core.CloseStatus;
+import org.eclipse.jetty.websocket.core.Frame;
+import org.eclipse.jetty.websocket.core.OpCode;
+import org.eclipse.jetty.websocket.core.RawFrameBuilder;
+import org.eclipse.jetty.websocket.core.TestFrameHandler;
+import org.eclipse.jetty.websocket.core.TestWebSocketNegotiator;
+import org.eclipse.jetty.websocket.core.TestWebSocketUpgradeHandler;
+import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
+import org.eclipse.jetty.websocket.core.WebSocketTester;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.*;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests of a core server with a fake client
From 3ad584eaef2e3eb0117ef8ba9378411bfacb9054 Mon Sep 17 00:00:00 2001
From: Greg Wilkins
Date: Tue, 29 Jan 2019 10:45:54 +1100
Subject: [PATCH 7/8] Issue #3290 async onOpen, onClose and onError
Changes after review:
+ failure to send abnormal close closes connection prior to failing
callback.
Signed-off-by: Greg Wilkins
---
.../core/internal/WebSocketChannel.java | 25 +++++++++++--------
1 file changed, 14 insertions(+), 11 deletions(-)
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java
index 7db48c1d95fc..6e615b329dd0 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java
@@ -524,19 +524,22 @@ public void sendFrame(Frame frame, Callback callback, boolean batch)
}
catch (Throwable ex)
{
- try
- {
- callback.failed(ex);
- }
- finally
+ if (frame.getOpCode() == OpCode.CLOSE)
{
- if (frame.getOpCode() == OpCode.CLOSE)
- {
- CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
- if (closeStatus instanceof AbnormalCloseStatus)
- closeConnection(null, closeStatus, NOOP);
- }
+ CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
+ if (closeStatus instanceof AbnormalCloseStatus)
+ closeConnection(null, closeStatus, Callback.from(
+ ()->callback.failed(ex),
+ x2->
+ {
+ ex.addSuppressed(x2);
+ callback.failed(ex);
+ }));
+ else
+ callback.failed(ex);
}
+ else
+ callback.failed(ex);
}
}
From f5751618bd3875cb1511bfeea12b4fb72e79ccc8 Mon Sep 17 00:00:00 2001
From: Greg Wilkins
Date: Tue, 29 Jan 2019 13:08:35 +1100
Subject: [PATCH 8/8] Issue #3290 async onOpen, onClose and onError
Fixed OSGi tests.
Signed-off-by: Greg Wilkins
---
.../jetty/websocket/core/FrameHandler.java | 51 ++++++++++---------
.../core/internal/WebSocketChannelState.java | 36 ++++++++-----
.../client/WebSocketClientServerTest.java | 1 +
3 files changed, 50 insertions(+), 38 deletions(-)
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java
index bf0a18263557..cb54e59fadbf 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java
@@ -90,6 +90,16 @@ public interface FrameHandler extends IncomingFrames
*/
void onFrame(Frame frame, Callback callback);
+ /**
+ * An error has occurred or been detected in websocket-core and being reported to FrameHandler.
+ * A call to onError will be followed by a call to {@link #onClosed(CloseStatus, Callback)} giving the close status
+ * derived from the error.
+ *
+ * @param cause the reason for the error
+ * @param callback the callback to indicate success in processing (or failure)
+ */
+ void onError(Throwable cause, Callback callback);
+
/**
* This is the Close Handshake Complete event.
*
@@ -102,15 +112,6 @@ public interface FrameHandler extends IncomingFrames
*/
void onClosed(CloseStatus closeStatus, Callback callback);
- /**
- * An error has occurred or been detected in websocket-core and being reported to FrameHandler.
- * A call to onError will be followed by a call to {@link #onClosed(CloseStatus, Callback)} giving the close status
- * derived from the error.
- *
- * @param cause the reason for the error
- * @param callback the callback to indicate success in processing (or failure)
- */
- void onError(Throwable cause, Callback callback);
/**
* Does the FrameHandler manage it's own demand?
@@ -216,22 +217,6 @@ interface CoreSession extends OutgoingFrames, Configuration
*/
boolean isSecure();
- /**
- * Issue a harsh abort of the underlying connection.
- *
- * This will terminate the connection, without sending a websocket close frame.
- * No WebSocket Protocol close handshake will be performed.
- *
- *
- * Once called, any read/write activity on the websocket from this point will be indeterminate.
- * This can result in the {@link #onError(Throwable,Callback)} event being called indicating any issue that arises.
- *
- *
- * Once the underlying connection has been determined to be closed, the {@link #onClosed(CloseStatus,Callback)} event will be called.
- *
- */
- void abort();
-
/**
* @return Client or Server behaviour
*/
@@ -294,6 +279,22 @@ interface CoreSession extends OutgoingFrames, Configuration
*/
void close(int statusCode, String reason, Callback callback);
+ /**
+ * Issue a harsh abort of the underlying connection.
+ *
+ * This will terminate the connection, without sending a websocket close frame.
+ * No WebSocket Protocol close handshake will be performed.
+ *
+ *
+ * Once called, any read/write activity on the websocket from this point will be indeterminate.
+ * This can result in the {@link #onError(Throwable,Callback)} event being called indicating any issue that arises.
+ *
+ *
+ * Once the underlying connection has been determined to be closed, the {@link #onClosed(CloseStatus,Callback)} event will be called.
+ *
+ */
+ void abort();
+
/**
* Manage flow control by indicating demand for handling Frames. A call to
* {@link FrameHandler#onFrame(Frame, Callback)} will only be made if a
diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java
index 251dc5e6d60a..eceec214f699 100644
--- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java
+++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannelState.java
@@ -58,21 +58,21 @@ public void onOpen()
{
synchronized (this)
{
- if (_channelState != State.CONNECTED)
- throw new IllegalStateException(_channelState.toString());
+ switch(_channelState)
+ {
+ case CONNECTED:
+ _channelState = State.OPEN;
+ break;
- _channelState = State.OPEN;
- }
- }
+ case OSHUT:
+ case CLOSED:
+ // Already closed in onOpen handler
+ break;
- @Override
- public String toString()
- {
- return String.format("%s@%x{%s,i=%s,o=%s,c=%s}",getClass().getSimpleName(),hashCode(),
- _channelState,
- OpCode.name(_incomingContinuation),
- OpCode.name(_outgoingContinuation),
- _closeStatus);
+ default:
+ throw new IllegalStateException(_channelState.toString());
+ }
+ }
}
@@ -205,6 +205,16 @@ else if (frame.isDataFrame())
}
+ @Override
+ public String toString()
+ {
+ return String.format("%s@%x{%s,i=%s,o=%s,c=%s}",getClass().getSimpleName(),hashCode(),
+ _channelState,
+ OpCode.name(_incomingContinuation),
+ OpCode.name(_outgoingContinuation),
+ _closeStatus);
+ }
+
private static byte checkDataSequence(byte opcode, boolean fin, byte lastOpCode) throws ProtocolException
{
switch (opcode)
diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/client/WebSocketClientServerTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/client/WebSocketClientServerTest.java
index f564272c260d..bf642358376f 100644
--- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/client/WebSocketClientServerTest.java
+++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/client/WebSocketClientServerTest.java
@@ -113,6 +113,7 @@ public void onFrame(Frame frame, Callback callback)
{
LOG.info("channel aborted");
getCoreSession().abort();
+ callback.failed(new Exception());
}
else
{