diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketContainer.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketContainer.java index 5f91aa9d13c9..e7185b1fc057 100644 --- a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketContainer.java +++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketContainer.java @@ -18,11 +18,13 @@ package org.eclipse.jetty.websocket.javax.common; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; - +import java.util.function.Consumer; import javax.websocket.Extension; import javax.websocket.Session; import javax.websocket.WebSocketContainer; @@ -30,13 +32,24 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.DecoratedObjectFactory; import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry; public abstract class JavaxWebSocketContainer extends ContainerLifeCycle implements javax.websocket.WebSocketContainer { + private final static Logger LOG = Log.getLogger(JavaxWebSocketContainer.class); + private final SessionTracker sessionTracker = new SessionTracker(); private long defaultAsyncSendTimeout = -1; private int defaultMaxBinaryMessageBufferSize = 64 * 1024; private int defaultMaxTextMessageBufferSize = 64 * 1024; + private List sessionListeners = new ArrayList<>(); + + public JavaxWebSocketContainer() + { + addSessionListener(sessionTracker); + addBean(sessionTracker); + } public abstract ByteBufferPool getBufferPool(); @@ -100,7 +113,7 @@ public Set getInstalledExtensions() */ public Set getOpenSessions() { - return new HashSet<>(getBeans(JavaxWebSocketSession.class)); + return sessionTracker.getSessions(); } public JavaxWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse, @@ -118,4 +131,45 @@ public void setAsyncSendTimeout(long timeoutInMillis) protected abstract WebSocketExtensionRegistry getExtensionRegistry(); protected abstract JavaxWebSocketFrameHandlerFactory getFrameHandlerFactory(); + + /** + * Register a WebSocketSessionListener with the container + * + * @param listener the listener + */ + public void addSessionListener(JavaxWebSocketSessionListener listener) + { + sessionListeners.add(listener); + } + + /** + * Remove a WebSocketSessionListener from the container + * + * @param listener the listener + * @return true if listener was present and removed + */ + public boolean removeSessionListener(JavaxWebSocketSessionListener listener) + { + return sessionListeners.remove(listener); + } + + /** + * Notify Session Listeners of events + * + * @param consumer the consumer to pass to each listener + */ + public void notifySessionListeners(Consumer consumer) + { + for (JavaxWebSocketSessionListener listener : sessionListeners) + { + try + { + consumer.accept(listener); + } + catch (Throwable x) + { + LOG.info("Exception while invoking listener " + listener, x); + } + } + } } diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java index 1e11426fe81a..0a2c8885ad3c 100644 --- a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java +++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java @@ -29,7 +29,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; - import javax.websocket.CloseReason; import javax.websocket.Decoder; import javax.websocket.EndpointConfig; @@ -229,7 +228,7 @@ public void onOpen(CoreSession coreSession, Callback callback) if (openHandle != null) openHandle.invoke(); - container.addBean(session, true); + container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionOpened(session)); callback.succeeded(); futureSession.complete(session); } @@ -283,8 +282,8 @@ public void onClosed(CloseStatus closeStatus, Callback callback) CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason()); closeHandle.invoke(closeReason); } - container.removeBean(session); callback.succeeded(); + container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionClosed(session)); } catch (Throwable cause) { diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java index 85f0ea66b340..357d8538e8d6 100644 --- a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java +++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java @@ -29,7 +29,6 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; - import javax.websocket.CloseReason; import javax.websocket.EndpointConfig; import javax.websocket.Extension; @@ -39,10 +38,12 @@ import javax.websocket.Session; import javax.websocket.WebSocketContainer; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.SharedBlockingCallback; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.ExtensionConfig; import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders; @@ -551,6 +552,25 @@ public boolean isSecure() return coreSession.isSecure(); } + @Override + protected void doStop() + { + coreSession.close(CloseStatus.SHUTDOWN, "Container being shut down", new Callback() + { + @Override + public void succeeded() + { + coreSession.abort(); + } + + @Override + public void failed(Throwable x) + { + coreSession.abort(); + } + }); + } + @Override public synchronized void removeMessageHandler(MessageHandler handler) { diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSessionListener.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSessionListener.java new file mode 100644 index 000000000000..bf4088f6e87b --- /dev/null +++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSessionListener.java @@ -0,0 +1,26 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.javax.common; + +public interface JavaxWebSocketSessionListener +{ + void onJavaxWebSocketSessionOpened(JavaxWebSocketSession session); + + void onJavaxWebSocketSessionClosed(JavaxWebSocketSession session); +} diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/SessionTracker.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/SessionTracker.java new file mode 100644 index 000000000000..e2ee48a731d3 --- /dev/null +++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/SessionTracker.java @@ -0,0 +1,59 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.javax.common; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import javax.websocket.Session; + +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.component.LifeCycle; + +public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener +{ + private CopyOnWriteArraySet sessions = new CopyOnWriteArraySet<>(); + + public Set getSessions() + { + return Collections.unmodifiableSet(sessions); + } + + @Override + public void onJavaxWebSocketSessionOpened(JavaxWebSocketSession session) + { + sessions.add(session); + } + + @Override + public void onJavaxWebSocketSessionClosed(JavaxWebSocketSession session) + { + sessions.remove(sessions); + } + + @Override + protected void doStop() throws Exception + { + for (JavaxWebSocketSession session : sessions) + { + LifeCycle.stop(session); + } + super.doStop(); + } +} diff --git a/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java b/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java index 8d6b0f66841c..785904157626 100644 --- a/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java +++ b/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java @@ -24,16 +24,19 @@ import java.net.URI; import java.time.Duration; import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.DecoratedObjectFactory; import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.UpgradeRequest; @@ -43,14 +46,20 @@ import org.eclipse.jetty.websocket.client.impl.JettyClientUpgradeRequest; import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandler; import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory; +import org.eclipse.jetty.websocket.common.SessionTracker; +import org.eclipse.jetty.websocket.common.WebSocketContainer; +import org.eclipse.jetty.websocket.common.WebSocketSessionListener; import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry; import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; -public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy +public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy, WebSocketContainer { + private static final Logger LOG = Log.getLogger(WebSocketClient.class); private final WebSocketCoreClient coreClient; private final int id = ThreadLocalRandom.current().nextInt(); private final JettyWebSocketFrameHandlerFactory frameHandlerFactory; + private final List sessionListeners = new CopyOnWriteArrayList<>(); + private final SessionTracker sessionTracker = new SessionTracker(); private ClassLoader contextClassLoader; private DecoratedObjectFactory objectFactory; private WebSocketExtensionRegistry extensionRegistry; @@ -88,7 +97,9 @@ private WebSocketClient(WebSocketCoreClient coreClient) this.contextClassLoader = this.getClass().getClassLoader(); this.objectFactory = new DecoratedObjectFactory(); this.extensionRegistry = new WebSocketExtensionRegistry(); - this.frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(getExecutor()); + this.frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(this); + this.sessionListeners.add(sessionTracker); + addBean(sessionTracker); } public CompletableFuture connect(Object websocket, URI toUri) throws IOException @@ -124,6 +135,34 @@ public WebSocketBehavior getBehavior() return WebSocketBehavior.CLIENT; } + @Override + public void addSessionListener(WebSocketSessionListener listener) + { + sessionListeners.add(listener); + } + + @Override + public boolean removeSessionListener(WebSocketSessionListener listener) + { + return sessionListeners.remove(listener); + } + + @Override + public void notifySessionListeners(Consumer consumer) + { + for (WebSocketSessionListener listener : sessionListeners) + { + try + { + consumer.accept(listener); + } + catch (Throwable x) + { + LOG.info("Exception while invoking listener " + listener, x); + } + } + } + @Override public Duration getIdleTimeout() { @@ -224,6 +263,7 @@ public ByteBufferPool getBufferPool() return getHttpClient().getByteBufferPool(); } + @Override public Executor getExecutor() { return getHttpClient().getExecutor(); @@ -246,7 +286,7 @@ public DecoratedObjectFactory getObjectFactory() public Collection getOpenSessions() { - return Collections.unmodifiableSet(new HashSet<>(getBeans(Session.class))); + return sessionTracker.getSessions(); } public JettyWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse, diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index a1a3e224e0ff..40413395f7d4 100644 --- a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -46,7 +46,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler { private final Logger log; - private final Executor executor; + private final WebSocketContainer container; private final Object endpointInstance; private MethodHandle openHandle; private MethodHandle closeHandle; @@ -73,7 +73,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler private MessageSink activeMessageSink; private WebSocketSessionImpl session; - public JettyWebSocketFrameHandler(Executor executor, + public JettyWebSocketFrameHandler(WebSocketContainer container, Object endpointInstance, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse, MethodHandle openHandle, MethodHandle closeHandle, MethodHandle errorHandle, @@ -87,7 +87,7 @@ public JettyWebSocketFrameHandler(Executor executor, { this.log = Log.getLogger(endpointInstance.getClass()); - this.executor = executor; + this.container = container; this.endpointInstance = endpointInstance; this.upgradeRequest = upgradeRequest; this.upgradeResponse = upgradeResponse; @@ -131,6 +131,8 @@ public void onOpen(CoreSession coreSession, Callback callback) pingHandle = JettyWebSocketFrameHandlerFactory.bindTo(pingHandle, session); pongHandle = JettyWebSocketFrameHandlerFactory.bindTo(pongHandle, session); + Executor executor = container.getExecutor(); + if (textHandle != null) textSink = JettyWebSocketFrameHandlerFactory.createMessageSink(textHandle, textSinkClass, executor, coreSession.getMaxTextMessageSize()); @@ -141,6 +143,8 @@ public void onOpen(CoreSession coreSession, Callback callback) if (openHandle != null) openHandle.invoke(); + container.notifySessionListeners((listener) -> listener.onWebSocketSessionOpened(session)); + callback.succeeded(); futureSession.complete(session); } @@ -225,6 +229,7 @@ public void onError(Throwable cause, Callback callback) public void onClosed(CloseStatus closeStatus, Callback callback) { callback.succeeded(); + container.notifySessionListeners((listener) -> listener.onWebSocketSessionClosed(session)); } public String toString() @@ -333,7 +338,6 @@ private void onTextFrame(Frame frame, Callback callback) acceptMessage(frame, callback); } - static Throwable convertCause(Throwable cause) { if (cause instanceof MessageTooLargeException) diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerFactory.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerFactory.java index a06d5fce5607..4a25d3b499e7 100644 --- a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerFactory.java +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerFactory.java @@ -80,13 +80,12 @@ */ public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle { - private final Executor executor; + private final WebSocketContainer container; private Map, JettyWebSocketFrameHandlerMetadata> metadataMap = new ConcurrentHashMap<>(); - public JettyWebSocketFrameHandlerFactory(Executor executor) + public JettyWebSocketFrameHandlerFactory(WebSocketContainer container) { - this.executor = executor; - addBean(executor); + this.container = container; } public JettyWebSocketFrameHandlerMetadata getMetadata(Class endpointClass) @@ -148,7 +147,7 @@ public JettyWebSocketFrameHandler newJettyFrameHandler(Object endpointInstance, future = new CompletableFuture<>(); JettyWebSocketFrameHandler frameHandler = new JettyWebSocketFrameHandler( - executor, + container, endpointInstance, upgradeRequest, upgradeResponse, openHandle, closeHandle, errorHandle, diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/SessionTracker.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/SessionTracker.java new file mode 100644 index 000000000000..206da1b19eba --- /dev/null +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/SessionTracker.java @@ -0,0 +1,59 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.websocket.api.Session; + +public class SessionTracker extends AbstractLifeCycle implements WebSocketSessionListener +{ + private List sessions = new CopyOnWriteArrayList<>(); + + public Collection getSessions() + { + return sessions; + } + + @Override + public void onWebSocketSessionOpened(WebSocketSessionImpl session) + { + sessions.add(session); + } + + @Override + public void onWebSocketSessionClosed(WebSocketSessionImpl session) + { + sessions.remove(session); + } + + @Override + protected void doStop() throws Exception + { + for (Session session : sessions) + { + LifeCycle.stop(session); + } + super.doStop(); + } +} diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketContainer.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketContainer.java new file mode 100644 index 000000000000..2b6bfca16ff0 --- /dev/null +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketContainer.java @@ -0,0 +1,65 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common; + +import java.util.Collection; +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +import org.eclipse.jetty.websocket.api.Session; + +/** + * Generic interface to the Container (server or client) that jetty-websocket-common can use + */ +public interface WebSocketContainer +{ + /** + * The Container provided Executor. + */ + Executor getExecutor(); + + /** + * Get the collection of open Sessions being tracked by this container + * + * @return the collection of open sessions + */ + Collection getOpenSessions(); + + /** + * Register a WebSocketSessionListener with the container + * + * @param listener the listener + */ + void addSessionListener(WebSocketSessionListener listener); + + /** + * Remove a WebSocketSessionListener from the container + * + * @param listener the listener + * @return true if listener was present and removed + */ + boolean removeSessionListener(WebSocketSessionListener listener); + + /** + * Notify the Session Listeners of an event. + * + * @param consumer the consumer to call for each tracked listener + */ + void notifySessionListeners(Consumer consumer); +} diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSessionImpl.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSessionImpl.java index 58f02e6f479d..93bb017b8939 100644 --- a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSessionImpl.java +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSessionImpl.java @@ -23,17 +23,23 @@ import java.time.Duration; import java.util.Objects; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.CloseStatus; import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.core.FrameHandler; -public class WebSocketSessionImpl implements Session, Dumpable +public class WebSocketSessionImpl extends AbstractLifeCycle implements Session, Dumpable { + private static final Logger LOG = Log.getLogger(WebSocketSessionImpl.class); private final FrameHandler.CoreSession coreSession; private final JettyWebSocketFrameHandler frameHandler; private final JettyWebSocketRemoteEndpoint remoteEndpoint; @@ -206,6 +212,30 @@ public SuspendToken suspend() return null; } + public FrameHandler.CoreSession getCoreSession() + { + return coreSession; + } + + @Override + protected void doStop() throws Exception + { + coreSession.close(StatusCode.SHUTDOWN, "Container being shut down", new Callback() + { + @Override + public void succeeded() + { + coreSession.abort(); + } + + @Override + public void failed(Throwable x) + { + coreSession.abort(); + } + }); + } + @Override public void dump(Appendable out, String indent) throws IOException { diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSessionListener.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSessionListener.java new file mode 100644 index 000000000000..a4d32017409c --- /dev/null +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSessionListener.java @@ -0,0 +1,29 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common; + +/** + * Interface for Listeners that are interested in knowing about the WebSocketSession history. + */ +public interface WebSocketSessionListener +{ + void onWebSocketSessionOpened(WebSocketSessionImpl session); + + void onWebSocketSessionClosed(WebSocketSessionImpl session); +} diff --git a/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/DummyContainer.java b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/DummyContainer.java new file mode 100644 index 000000000000..82b82b2cf2c0 --- /dev/null +++ b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/DummyContainer.java @@ -0,0 +1,86 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.websocket.api.Session; + +public class DummyContainer extends ContainerLifeCycle implements WebSocketContainer +{ + private final QueuedThreadPool executor; + private final List sessionListeners = new ArrayList<>(); + private final SessionTracker sessionTracker = new SessionTracker(); + + public DummyContainer() + { + executor = new QueuedThreadPool(); + executor.setName("dummy-container"); + addBean(executor); + + addSessionListener(sessionTracker); + addBean(sessionTracker); + } + + @Override + public Executor getExecutor() + { + return executor; + } + + @Override + public Collection getOpenSessions() + { + return sessionTracker.getSessions(); + } + + @Override + public void addSessionListener(WebSocketSessionListener listener) + { + sessionListeners.add(listener); + } + + @Override + public boolean removeSessionListener(WebSocketSessionListener listener) + { + return sessionListeners.remove(listener); + } + + @Override + public void notifySessionListeners(Consumer consumer) + { + for (WebSocketSessionListener listener : sessionListeners) + { + try + { + consumer.accept(listener); + } + catch (Throwable x) + { + x.printStackTrace(System.err); + } + } + } +} diff --git a/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java index 7176f0140787..eaf3a37fbf5e 100644 --- a/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java +++ b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java @@ -27,7 +27,6 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; -import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.UpgradeRequest; @@ -56,21 +55,22 @@ public class JettyWebSocketFrameHandlerTest { - private static QueuedThreadPool executor = new QueuedThreadPool(); + private static DummyContainer container; @BeforeAll public static void startContainer() throws Exception { - executor.start(); + container = new DummyContainer(); + container.start(); } @AfterAll public static void stopContainer() throws Exception { - executor.stop(); + container.stop(); } - private JettyWebSocketFrameHandlerFactory endpointFactory = new JettyWebSocketFrameHandlerFactory(executor); + private JettyWebSocketFrameHandlerFactory endpointFactory = new JettyWebSocketFrameHandlerFactory(container); private FrameHandler.CoreSession channel = new FrameHandler.CoreSession.Empty() { @Override diff --git a/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/LocalEndpointMetadataTest.java b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/LocalEndpointMetadataTest.java index bdcdef2ef413..9fdf87adccd8 100644 --- a/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/LocalEndpointMetadataTest.java +++ b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/LocalEndpointMetadataTest.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.websocket.common; -import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.websocket.api.InvalidWebSocketException; import org.eclipse.jetty.websocket.common.endpoints.annotated.AnnotatedBinaryArraySocket; import org.eclipse.jetty.websocket.common.endpoints.annotated.AnnotatedBinaryStreamSocket; @@ -55,22 +54,22 @@ public class LocalEndpointMetadataTest { public static final Matcher EXISTS = notNullValue(); - public static QueuedThreadPool threadpool; + public static DummyContainer container; @BeforeAll public static void startContainer() throws Exception { - threadpool = new QueuedThreadPool(); - threadpool.start(); + container = new DummyContainer(); + container.start(); } @AfterAll public static void stopContainer() throws Exception { - threadpool.stop(); + container.stop(); } - private JettyWebSocketFrameHandlerFactory endpointFactory = new JettyWebSocketFrameHandlerFactory(threadpool); + private JettyWebSocketFrameHandlerFactory endpointFactory = new JettyWebSocketFrameHandlerFactory(container); private JettyWebSocketFrameHandlerMetadata createMetadata(Class endpointClass) { diff --git a/jetty-websocket/jetty-websocket-server/src/main/java/org/eclipse/jetty/websocket/server/JettyServerFrameHandlerFactory.java b/jetty-websocket/jetty-websocket-server/src/main/java/org/eclipse/jetty/websocket/server/JettyServerFrameHandlerFactory.java index 4c800db23890..8422f8e3d805 100644 --- a/jetty-websocket/jetty-websocket-server/src/main/java/org/eclipse/jetty/websocket/server/JettyServerFrameHandlerFactory.java +++ b/jetty-websocket/jetty-websocket-server/src/main/java/org/eclipse/jetty/websocket/server/JettyServerFrameHandlerFactory.java @@ -19,8 +19,6 @@ package org.eclipse.jetty.websocket.server; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - import javax.servlet.ServletContext; import javax.servlet.ServletException; @@ -28,8 +26,10 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory; +import org.eclipse.jetty.websocket.common.WebSocketContainer; import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.server.internal.DelegatedJettyServletUpgradeRequest; +import org.eclipse.jetty.websocket.server.internal.JettyWebSocketServerContainer; import org.eclipse.jetty.websocket.server.internal.UpgradeResponseAdapter; import org.eclipse.jetty.websocket.servlet.FrameHandlerFactory; import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; @@ -47,21 +47,18 @@ public static JettyServerFrameHandlerFactory ensureFactory(ServletContext servle JettyServerFrameHandlerFactory factory = contextHandler.getBean(JettyServerFrameHandlerFactory.class); if (factory == null) { - Executor executor = (Executor)servletContext - .getAttribute("org.eclipse.jetty.server.Executor"); - if (executor == null) - executor = contextHandler.getServer().getThreadPool(); - - factory = new JettyServerFrameHandlerFactory(executor); + JettyWebSocketServerContainer container = new JettyWebSocketServerContainer(contextHandler); + servletContext.setAttribute(WebSocketContainer.class.getName(), container); + factory = new JettyServerFrameHandlerFactory(container); contextHandler.addManaged(factory); contextHandler.addLifeCycleListener(factory); } return factory; } - public JettyServerFrameHandlerFactory(Executor executor) + public JettyServerFrameHandlerFactory(WebSocketContainer container) { - super(executor); + super(container); } @Override diff --git a/jetty-websocket/jetty-websocket-server/src/main/java/org/eclipse/jetty/websocket/server/internal/JettyWebSocketServerContainer.java b/jetty-websocket/jetty-websocket-server/src/main/java/org/eclipse/jetty/websocket/server/internal/JettyWebSocketServerContainer.java new file mode 100644 index 000000000000..4b28f6d1ca41 --- /dev/null +++ b/jetty-websocket/jetty-websocket-server/src/main/java/org/eclipse/jetty/websocket/server/internal/JettyWebSocketServerContainer.java @@ -0,0 +1,99 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.server.internal; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.common.SessionTracker; +import org.eclipse.jetty.websocket.common.WebSocketContainer; +import org.eclipse.jetty.websocket.common.WebSocketSessionListener; + +public class JettyWebSocketServerContainer implements WebSocketContainer +{ + private final static Logger LOG = Log.getLogger(JettyWebSocketServerContainer.class); + private final Executor executor; + private final List sessionListeners = new ArrayList<>(); + private final SessionTracker sessionTracker = new SessionTracker(); + + public JettyWebSocketServerContainer(ContextHandler handler) + { + Executor executor = (Executor) handler + .getAttribute("org.eclipse.jetty.server.Executor"); + if (executor == null) + { + executor = handler.getServer().getThreadPool(); + } + if (executor == null) + { + executor = new QueuedThreadPool(); // default settings + } + this.executor = executor; + addSessionListener(sessionTracker); + handler.addBean(sessionTracker); + } + + @Override + public Executor getExecutor() + { + return this.executor; + } + + @Override + public void addSessionListener(WebSocketSessionListener listener) + { + sessionListeners.add(listener); + } + + @Override + public boolean removeSessionListener(WebSocketSessionListener listener) + { + return sessionListeners.remove(listener); + } + + @Override + public void notifySessionListeners(Consumer consumer) + { + for (WebSocketSessionListener listener : sessionListeners) + { + try + { + consumer.accept(listener); + } + catch (Throwable x) + { + LOG.info("Exception while invoking listener " + listener, x); + } + } + } + + @Override + public Collection getOpenSessions() + { + return sessionTracker.getSessions(); + } +} diff --git a/jetty-websocket/jetty-websocket-tests/pom.xml b/jetty-websocket/jetty-websocket-tests/pom.xml index 1c0f7120e920..fffc0b59e810 100644 --- a/jetty-websocket/jetty-websocket-tests/pom.xml +++ b/jetty-websocket/jetty-websocket-tests/pom.xml @@ -8,15 +8,15 @@ 10.0.0-SNAPSHOT - 4.0.0 + 4.0.0 jetty-websocket-tests Jetty :: Websocket :: org.eclipse.jetty.websocket :: Tests - + ${project.groupId}.jetty.websocket.tests - + org.eclipse.jetty.websocket jetty-websocket-api @@ -44,7 +44,7 @@ - + org.apache.felix diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/CloseTrackingEndpoint.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/CloseTrackingEndpoint.java new file mode 100644 index 000000000000..bf92d1ac4903 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/CloseTrackingEndpoint.java @@ -0,0 +1,128 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.common.WebSocketSessionImpl; +import org.eclipse.jetty.websocket.core.internal.WebSocketChannel; +import org.eclipse.jetty.websocket.core.internal.WebSocketConnection; +import org.hamcrest.Matcher; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class CloseTrackingEndpoint extends WebSocketAdapter +{ + private static final Logger LOG = Log.getLogger(CloseTrackingEndpoint.class); + + public int closeCode = -1; + public String closeReason = null; + public CountDownLatch closeLatch = new CountDownLatch(1); + public AtomicInteger closeCount = new AtomicInteger(0); + public CountDownLatch openLatch = new CountDownLatch(1); + public CountDownLatch errorLatch = new CountDownLatch(1); + + public LinkedBlockingQueue messageQueue = new LinkedBlockingQueue<>(); + public AtomicReference error = new AtomicReference<>(); + + public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher statusCodeMatcher) + throws InterruptedException + { + assertReceivedCloseEvent(clientTimeoutMs, statusCodeMatcher, null); + } + + public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher statusCodeMatcher, Matcher reasonMatcher) + throws InterruptedException + { + assertThat("Client Close Event Occurred", closeLatch.await(clientTimeoutMs, TimeUnit.MILLISECONDS), is(true)); + assertThat("Client Close Event Count", closeCount.get(), is(1)); + assertThat("Client Close Event Status Code", closeCode, statusCodeMatcher); + if (reasonMatcher == null) + { + assertThat("Client Close Event Reason", closeReason, nullValue()); + } + else + { + assertThat("Client Close Event Reason", closeReason, reasonMatcher); + } + } + + public void clearQueues() + { + messageQueue.clear(); + } + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + LOG.debug("onWebSocketClose({},{})", statusCode, reason); + super.onWebSocketClose(statusCode, reason); + closeCount.incrementAndGet(); + closeCode = statusCode; + closeReason = reason; + closeLatch.countDown(); + } + + @Override + public void onWebSocketConnect(Session session) + { + LOG.debug("onWebSocketConnect({})", session); + super.onWebSocketConnect(session); + openLatch.countDown(); + } + + @Override + public void onWebSocketError(Throwable cause) + { + LOG.debug("onWebSocketError", cause); + assertThat("Unique Error Event", error.compareAndSet(null, cause), is(true)); + errorLatch.countDown(); + } + + @Override + public void onWebSocketText(String message) + { + LOG.debug("onWebSocketText({})", message); + messageQueue.offer(message); + } + + public EndPoint getEndPoint() + { + Session session = getSession(); + assertThat("Session type", session, instanceOf(WebSocketSessionImpl.class)); + + WebSocketSessionImpl wsSession = (WebSocketSessionImpl) session; + WebSocketChannel wsChannel = (WebSocketChannel) wsSession.getCoreSession(); + WebSocketConnection wsConnection = wsChannel.getConnection(); + + return wsConnection.getEndPoint(); + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/EchoCreator.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/EchoCreator.java new file mode 100644 index 000000000000..c9cc46b144a2 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/EchoCreator.java @@ -0,0 +1,37 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; + +public class EchoCreator implements WebSocketCreator +{ + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) + { + if (req.hasSubProtocol("echo")) + { + resp.setAcceptedSubProtocol("echo"); + } + + return new EchoSocket(); + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/EchoSocket.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/EchoSocket.java new file mode 100644 index 000000000000..4908c6cb29c4 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/EchoSocket.java @@ -0,0 +1,35 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.io.IOException; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; + +@WebSocket +public class EchoSocket +{ + @OnWebSocketMessage + public void onMessage(Session session, String msg) throws IOException + { + session.getRemote().sendString(msg); + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/BadNetworkTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/BadNetworkTest.java new file mode 100644 index 000000000000..11ceae1f4468 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/BadNetworkTest.java @@ -0,0 +1,203 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.client; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.eclipse.jetty.websocket.api.util.WSURI; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; + +/** + * Tests for conditions due to bad networking. + */ +public class BadNetworkTest +{ + private Server server; + private WebSocketClient client; + + @BeforeEach + public void startClient() throws Exception + { + client = new WebSocketClient(); + client.setIdleTimeout(Duration.ofMillis(500)); + client.start(); + } + + @BeforeEach + public void startServer() throws Exception + { + server = new Server(); + + ServerConnector connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + JettyWebSocketServletContainerInitializer.configure(context); + context.setContextPath("/"); + ServletHolder holder = new ServletHolder(new WebSocketServlet() + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.setIdleTimeout(Duration.ofSeconds(10)); + factory.setMaxTextMessageSize(1024 * 1024 * 2); + factory.register(ServerEndpoint.class); + } + }); + context.addServlet(holder, "/ws"); + + HandlerList handlers = new HandlerList(); + handlers.addHandler(context); + handlers.addHandler(new DefaultHandler()); + server.setHandler(handlers); + + server.start(); + } + + @AfterEach + public void stopClient() throws Exception + { + client.stop(); + } + + @AfterEach + public void stopServer() throws Exception + { + server.stop(); + } + + @Test + public void testAbruptClientClose() throws Exception + { + CloseTrackingEndpoint wsocket = new CloseTrackingEndpoint(); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + Future future = client.connect(wsocket, wsUri); + + // Validate that we are connected + future.get(30, TimeUnit.SECONDS); + + // Have client disconnect abruptly + Session session = wsocket.getSession(); + session.disconnect(); + + // Client Socket should see a close event, with status NO_CLOSE + // This event is automatically supplied by the underlying WebSocketClientConnection + // in the situation of a bad network connection. + wsocket.assertReceivedCloseEvent(5000, is(StatusCode.NO_CLOSE), containsString("")); + } + + @Test + public void testAbruptServerClose() throws Exception + { + CloseTrackingEndpoint wsocket = new CloseTrackingEndpoint(); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + Future future = client.connect(wsocket, wsUri); + + // Validate that we are connected + Session session = future.get(30, TimeUnit.SECONDS); + + // Have server disconnect abruptly + session.getRemote().sendString("abort"); + + // Client Socket should see a close event, with status NO_CLOSE + // This event is automatically supplied by the underlying WebSocketClientConnection + // in the situation of a bad network connection. + wsocket.assertReceivedCloseEvent(5000, is(StatusCode.NO_CLOSE), containsString("")); + } + + public static class ServerEndpoint implements WebSocketListener + { + private static final Logger LOG = Log.getLogger(ClientCloseTest.ServerEndpoint.class); + private Session session; + + @Override + public void onWebSocketBinary(byte[] payload, int offset, int len) + { + } + + @Override + public void onWebSocketText(String message) + { + try + { + if (message.equals("abort")) + { + session.disconnect(); + } + else + { + // simple echo + session.getRemote().sendString(message); + } + } + catch (IOException e) + { + LOG.warn(e); + } + } + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + } + + @Override + public void onWebSocketConnect(Session session) + { + this.session = session; + } + + @Override + public void onWebSocketError(Throwable cause) + { + if (LOG.isDebugEnabled()) + { + LOG.debug(cause); + } + } + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java new file mode 100644 index 000000000000..a48c791cf2d4 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java @@ -0,0 +1,481 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.client; + + +import java.io.IOException; +import java.net.URI; +import java.nio.channels.ClosedChannelException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.CloseException; +import org.eclipse.jetty.websocket.api.MessageTooLargeException; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketFrameListener; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.eclipse.jetty.websocket.api.util.WSURI; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.OpCode; +import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.time.Duration.ofSeconds; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; + +public class ClientCloseTest +{ + private Server server; + private WebSocketClient client; + + private Session confirmConnection(CloseTrackingEndpoint clientSocket, Future clientFuture) throws Exception + { + // Wait for client connect on via future + Session session = clientFuture.get(30, SECONDS); + + try + { + // Send message from client to server + final String echoMsg = "echo-test"; + Future testFut = clientSocket.getRemote().sendStringByFuture(echoMsg); + + // Wait for send future + testFut.get(5, SECONDS); + + // Verify received message + String recvMsg = clientSocket.messageQueue.poll(5, SECONDS); + assertThat("Received message", recvMsg, is(echoMsg)); + + // Verify that there are no errors + assertThat("Error events", clientSocket.error.get(), nullValue()); + } + finally + { + clientSocket.clearQueues(); + } + + return session; + } + + @BeforeEach + public void startClient() throws Exception + { + client = new WebSocketClient(); + client.setMaxTextMessageSize(1024); + client.start(); + } + + @BeforeEach + public void startServer() throws Exception + { + server = new Server(); + + ServerConnector connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + JettyWebSocketServletContainerInitializer.configure(context); + context.setContextPath("/"); + ServletHolder holder = new ServletHolder(new WebSocketServlet() + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.setIdleTimeout(Duration.ofSeconds(10)); + factory.setMaxTextMessageSize(1024 * 1024 * 2); + factory.register(ServerEndpoint.class); + } + }); + context.addServlet(holder, "/ws"); + + HandlerList handlers = new HandlerList(); + handlers.addHandler(context); + handlers.addHandler(new DefaultHandler()); + server.setHandler(handlers); + + server.start(); + } + + @AfterEach + public void stopClient() throws Exception + { + client.stop(); + } + + @AfterEach + public void stopServer() throws Exception + { + server.stop(); + } + + @Test + public void testHalfClose() throws Exception + { + // Set client timeout + final int timeout = 5000; + client.setIdleTimeout(Duration.ofMillis(timeout)); + + ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1); + clientSessionTracker.addTo(client); + + // Client connects + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + Future clientConnectFuture = client.connect(clientSocket, wsUri); + + try (Session session = confirmConnection(clientSocket, clientConnectFuture)) + { + // client confirms connection via echo + + // client sends close frame (code 1000, normal) + final String origCloseReason = "send-more-frames"; + clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); + + // Verify received messages + String recvMsg = clientSocket.messageQueue.poll(5, SECONDS); + assertThat("Received message 1", recvMsg, is("Hello")); + recvMsg = clientSocket.messageQueue.poll(5, SECONDS); + assertThat("Received message 2", recvMsg, is("World")); + + // Verify that there are no errors + assertThat("Error events", clientSocket.error.get(), nullValue()); + + // client close event on ws-endpoint + clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.NORMAL), containsString("")); + } + + clientSessionTracker.assertClosedProperly(client); + } + + @Test + public void testMessageTooLargeException() throws Exception + { + // Set client timeout + final int timeout = 3000; + client.setIdleTimeout(Duration.ofMillis(timeout)); + + ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1); + clientSessionTracker.addTo(client); + + // Client connects + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + Future clientConnectFuture = client.connect(clientSocket, wsUri); + + try (Session session = confirmConnection(clientSocket, clientConnectFuture)) + { + // client confirms connection via echo + + session.getRemote().sendString("too-large-message"); + + clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.MESSAGE_TOO_LARGE), containsString("exceeds maximum size")); + + // client should have noticed the error + assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true)); + assertThat("OnError", clientSocket.error.get(), instanceOf(MessageTooLargeException.class)); + } + + // client triggers close event on client ws-endpoint + clientSessionTracker.assertClosedProperly(client); + } + + @Test + public void testRemoteDisconnect() throws Exception + { + // Set client timeout + final int clientTimeout = 1000; + client.setIdleTimeout(Duration.ofMillis(clientTimeout)); + + ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1); + clientSessionTracker.addTo(client); + + // Client connects + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + Future clientConnectFuture = client.connect(clientSocket, wsUri); + + try (Session ignored = confirmConnection(clientSocket, clientConnectFuture)) + { + // client confirms connection via echo + + // client sends close frame (triggering server connection abort) + final String origCloseReason = "abort"; + clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); + + // client reads -1 (EOF) + // client triggers close event on client ws-endpoint + clientSocket.assertReceivedCloseEvent(clientTimeout * 2, + is(StatusCode.SHUTDOWN), + containsString("timeout")); + } + + clientSessionTracker.assertClosedProperly(client); + } + + @Test + public void testServerNoCloseHandshake() throws Exception + { + // Set client timeout + final int clientTimeout = 1000; + client.setIdleTimeout(Duration.ofMillis(clientTimeout)); + + ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1); + clientSessionTracker.addTo(client); + + // Client connects + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + Future clientConnectFuture = client.connect(clientSocket, wsUri); + + try (Session ignored = confirmConnection(clientSocket, clientConnectFuture)) + { + // client confirms connection via echo + + // client sends close frame + final String origCloseReason = "sleep|5000"; + clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); + + // client close should occur + clientSocket.assertReceivedCloseEvent(clientTimeout * 2, + is(StatusCode.SHUTDOWN), + containsString("timeout")); + + // client idle timeout triggers close event on client ws-endpoint + assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true)); + assertThat("OnError", clientSocket.error.get(), instanceOf(CloseException.class)); + assertThat("OnError.cause", clientSocket.error.get().getCause(), instanceOf(TimeoutException.class)); + } + + clientSessionTracker.assertClosedProperly(client); + } + + @Test + public void testStopLifecycle() throws Exception + { + // Set client timeout + final int timeout = 1000; + client.setIdleTimeout(Duration.ofMillis(timeout)); + + int sessionCount = 3; + ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(sessionCount); + clientSessionTracker.addTo(client); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + List clientSockets = new ArrayList<>(); + + // Open Multiple Clients + for (int i = 0; i < sessionCount; i++) + { + // Client Request Upgrade + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + clientSockets.add(clientSocket); + Future clientConnectFuture = client.connect(clientSocket, wsUri); + + // client confirms connection via echo + confirmConnection(clientSocket, clientConnectFuture); + } + + assertTimeoutPreemptively(ofSeconds(5), () -> { + // client lifecycle stop (the meat of this test) + client.stop(); + }); + + // clients disconnect + for (int i = 0; i < sessionCount; i++) + { + clientSockets.get(i).assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("Disconnected")); + } + + // ensure all Sessions are gone. connections are gone. etc. (client and server) + // ensure ConnectionListener onClose is called 3 times + clientSessionTracker.assertClosedProperly(client); + } + + @Test + public void testWriteException() throws Exception + { + // Set client timeout + final int timeout = 2000; + client.setIdleTimeout(Duration.ofMillis(timeout)); + + ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1); + clientSessionTracker.addTo(client); + + // Client connects + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint(); + Future clientConnectFuture = client.connect(clientSocket, wsUri); + + // client confirms connection via echo + confirmConnection(clientSocket, clientConnectFuture); + + // setup client endpoint for write failure (test only) + EndPoint endp = clientSocket.getEndPoint(); + endp.shutdownOutput(); + + // client enqueue close frame + // should result in a client write failure + final String origCloseReason = "Normal Close from Client"; + clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); + + assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true)); + assertThat("OnError", clientSocket.error.get(), instanceOf(ClosedChannelException.class)); + + // client triggers close event on client ws-endpoint + // assert - close code==1006 (abnormal) + clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("Eof")); + + clientSessionTracker.assertClosedProperly(client); + } + + public static class ServerEndpoint implements WebSocketFrameListener, WebSocketListener + { + private static final Logger LOG = Log.getLogger(ServerEndpoint.class); + private Session session; + + @Override + public void onWebSocketBinary(byte[] payload, int offset, int len) + { + } + + @Override + public void onWebSocketText(String message) + { + try + { + if (message.equals("too-large-message")) + { + // send extra large message + byte[] buf = new byte[1024 * 1024]; + Arrays.fill(buf, (byte) 'x'); + String bigmsg = new String(buf, UTF_8); + session.getRemote().sendString(bigmsg); + } + else + { + // simple echo + session.getRemote().sendString(message); + } + } + catch (IOException ignore) + { + LOG.debug(ignore); + } + } + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + } + + @Override + public void onWebSocketConnect(Session session) + { + this.session = session; + } + + @Override + public void onWebSocketError(Throwable cause) + { + if (LOG.isDebugEnabled()) + { + LOG.debug(cause); + } + } + + @Override + public void onWebSocketFrame(org.eclipse.jetty.websocket.api.extensions.Frame frame) + { + if (frame.getOpCode() == OpCode.CLOSE) + { + CloseStatus closeInfo = new CloseStatus(frame.getPayload()); + String reason = closeInfo.getReason(); + + if (reason.equals("send-more-frames")) + { + try + { + session.getRemote().sendString("Hello"); + session.getRemote().sendString("World"); + } + catch (Throwable ignore) + { + LOG.debug("OOPS", ignore); + } + } + else if (reason.equals("abort")) + { + try + { + SECONDS.sleep(1); + LOG.info("Server aborting session abruptly"); + session.disconnect(); + } + catch (Throwable ignore) + { + LOG.ignore(ignore); + } + } + else if (reason.startsWith("sleep|")) + { + int idx = reason.indexOf('|'); + int timeMs = Integer.parseInt(reason.substring(idx + 1)); + try + { + LOG.info("Server Sleeping for {} ms", timeMs); + TimeUnit.MILLISECONDS.sleep(timeMs); + } + catch (InterruptedException ignore) + { + LOG.ignore(ignore); + } + } + } + } + } +} \ No newline at end of file diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientOpenSessionTracker.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientOpenSessionTracker.java new file mode 100644 index 000000000000..9f2471cba94c --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientOpenSessionTracker.java @@ -0,0 +1,76 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.client; + +import java.util.concurrent.CountDownLatch; + +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.WebSocketSessionImpl; +import org.eclipse.jetty.websocket.common.WebSocketSessionListener; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientOpenSessionTracker implements Connection.Listener, WebSocketSessionListener +{ + private final CountDownLatch closeSessionLatch; + private final CountDownLatch closeConnectionLatch; + + public ClientOpenSessionTracker(int expectedSessions) + { + this.closeSessionLatch = new CountDownLatch(expectedSessions); + this.closeConnectionLatch = new CountDownLatch(expectedSessions); + } + + public void addTo(WebSocketClient client) + { + client.addSessionListener(this); + client.addBean(this); + } + + public void assertClosedProperly(WebSocketClient client) throws InterruptedException + { + assertTrue(closeConnectionLatch.await(5, SECONDS), "All Jetty Connections should have been closed"); + assertTrue(closeSessionLatch.await(5, SECONDS), "All WebSocket Sessions should have been closed"); + assertTrue(client.getOpenSessions().isEmpty(), "Client OpenSessions MUST be empty"); + } + + @Override + public void onOpened(Connection connection) + { + } + + @Override + public void onClosed(Connection connection) + { + this.closeConnectionLatch.countDown(); + } + + @Override + public void onWebSocketSessionOpened(WebSocketSessionImpl session) + { + } + + @Override + public void onWebSocketSessionClosed(WebSocketSessionImpl session) + { + this.closeSessionLatch.countDown(); + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientSessionsTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientSessionsTest.java new file mode 100644 index 000000000000..1d5af4270033 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientSessionsTest.java @@ -0,0 +1,163 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.client; + +import java.net.URI; +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.util.WSURI; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.WebSocketSessionImpl; +import org.eclipse.jetty.websocket.common.WebSocketSessionListener; +import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; +import org.eclipse.jetty.websocket.tests.EchoCreator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientSessionsTest +{ + private Server server; + + @BeforeEach + public void startServer() throws Exception + { + server = new Server(); + + ServerConnector connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + JettyWebSocketServletContainerInitializer.configure(context); + context.setContextPath("/"); + ServletHolder holder = new ServletHolder(new WebSocketServlet() + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.setIdleTimeout(Duration.ofSeconds(10)); + factory.setMaxTextMessageSize(1024 * 1024 * 2); + factory.setCreator(new EchoCreator()); + } + }); + context.addServlet(holder, "/ws"); + + HandlerList handlers = new HandlerList(); + handlers.addHandler(context); + handlers.addHandler(new DefaultHandler()); + server.setHandler(handlers); + + server.start(); + } + + @AfterEach + public void stopServer() throws Exception + { + server.stop(); + } + + @Test + public void testBasicEcho_FromClient() throws Exception + { + WebSocketClient client = new WebSocketClient(); + + CountDownLatch onSessionCloseLatch = new CountDownLatch(1); + + client.addSessionListener(new WebSocketSessionListener() { + @Override + public void onWebSocketSessionOpened(WebSocketSessionImpl session) + { + } + + @Override + public void onWebSocketSessionClosed(WebSocketSessionImpl session) + { + onSessionCloseLatch.countDown(); + } + }); + + client.start(); + try + { + CloseTrackingEndpoint cliSock = new CloseTrackingEndpoint(); + client.setIdleTimeout(Duration.ofSeconds(10)); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + ClientUpgradeRequest request = new ClientUpgradeRequest(); + request.setSubProtocols("echo"); + Future future = client.connect(cliSock,wsUri,request); + + try (Session sess = future.get(30000, TimeUnit.MILLISECONDS)) + { + assertThat("Session", sess, notNullValue()); + assertThat("Session.open", sess.isOpen(), is(true)); + assertThat("Session.upgradeRequest", sess.getUpgradeRequest(), notNullValue()); + assertThat("Session.upgradeResponse", sess.getUpgradeResponse(), notNullValue()); + + Collection sessions = client.getOpenSessions(); + assertThat("client.connectionManager.sessions.size", sessions.size(), is(1)); + + RemoteEndpoint remote = sess.getRemote(); + remote.sendString("Hello World!"); + + Collection open = client.getOpenSessions(); + assertThat("(Before Close) Open Sessions.size", open.size(), is(1)); + + String received = cliSock.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Message", received, containsString("Hello World!")); + } + + cliSock.assertReceivedCloseEvent(30000, is(StatusCode.NORMAL)); + + assertTrue(onSessionCloseLatch.await(5, TimeUnit.SECONDS), "Saw onSessionClose events"); + TimeUnit.SECONDS.sleep(1); + + Collection open = client.getOpenSessions(); + assertThat("(After Close) Open Sessions.size", open.size(), is(0)); + } + finally + { + client.stop(); + } + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientWriteThread.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientWriteThread.java new file mode 100644 index 000000000000..105df9929177 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientWriteThread.java @@ -0,0 +1,107 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.client; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.Session; + +public class ClientWriteThread extends Thread +{ + private static final Logger LOG = Log.getLogger(ClientWriteThread.class); + private final Session session; + private int slowness = -1; + private int messageCount = 100; + private String message = "Hello"; + + public ClientWriteThread(Session session) + { + this.session = session; + } + + public String getMessage() + { + return message; + } + + public int getMessageCount() + { + return messageCount; + } + + public int getSlowness() + { + return slowness; + } + + @Override + public void run() + { + final AtomicInteger m = new AtomicInteger(); + + try + { + LOG.debug("Writing {} messages to connection {}",messageCount); + LOG.debug("Artificial Slowness {} ms",slowness); + Future lastMessage = null; + RemoteEndpoint remote = session.getRemote(); + while (m.get() < messageCount) + { + lastMessage = remote.sendStringByFuture(message + "/" + m.get() + "/"); + + m.incrementAndGet(); + + if (slowness > 0) + { + TimeUnit.MILLISECONDS.sleep(slowness); + } + } + if (remote.getBatchMode() == BatchMode.ON) + remote.flush(); + // block on write of last message + if (lastMessage != null) + lastMessage.get(2,TimeUnit.MINUTES); // block on write + } + catch (Exception e) + { + LOG.warn(e); + } + } + + public void setMessage(String message) + { + this.message = message; + } + + public void setMessageCount(int messageCount) + { + this.messageCount = messageCount; + } + + public void setSlowness(int slowness) + { + this.slowness = slowness; + } +} \ No newline at end of file diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/SlowClientTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/SlowClientTest.java new file mode 100644 index 000000000000..929a5a7481be --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/SlowClientTest.java @@ -0,0 +1,144 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.client; + +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.Future; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.util.WSURI; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; +import org.eclipse.jetty.websocket.tests.EchoSocket; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; + +/** + * This Regression Test Exists because of Client side Idle timeout, Read, and Parser bugs. + */ +public class SlowClientTest +{ + private Server server; + private WebSocketClient client; + + @BeforeEach + public void startClient() throws Exception + { + client = new WebSocketClient(); + client.setIdleTimeout(Duration.ofSeconds(60)); + client.start(); + } + + @BeforeEach + public void startServer() throws Exception + { + server = new Server(); + + ServerConnector connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + ServletHolder websocket = new ServletHolder(new WebSocketServlet() + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.register(EchoSocket.class); + } + }); + context.addServlet(websocket, "/ws"); + JettyWebSocketServletContainerInitializer.configure(context); + + HandlerList handlers = new HandlerList(); + handlers.addHandler(context); + handlers.addHandler(new DefaultHandler()); + + server.setHandler(handlers); + + server.start(); + } + + @AfterEach + public void stopClient() throws Exception + { + client.stop(); + } + + @AfterEach + public void stopServer() throws Exception + { + server.stop(); + } + + @Test + public void testClientSlowToSend() throws Exception + { + CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint(); + client.setIdleTimeout(Duration.ofSeconds(60)); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + Future future = client.connect(clientEndpoint, wsUri); + + // Confirm connected + Session session = future.get(5, SECONDS); + + int messageCount = 10; + try + { + // Have client write slowly. + ClientWriteThread writer = new ClientWriteThread(clientEndpoint.getSession()); + writer.setMessageCount(messageCount); + writer.setMessage("Hello"); + writer.setSlowness(10); + writer.start(); + writer.join(); + + // Close + clientEndpoint.getSession().close(StatusCode.NORMAL, "Done"); + + // confirm close received on server + clientEndpoint.assertReceivedCloseEvent(10000, is(StatusCode.NORMAL), containsString("Done")); + } + finally + { + if (session != null) + { + session.close(); + } + } + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/AbstractCloseEndpoint.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/AbstractCloseEndpoint.java new file mode 100644 index 000000000000..ccd271410c32 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/AbstractCloseEndpoint.java @@ -0,0 +1,76 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.server; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.hamcrest.Matcher; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public abstract class AbstractCloseEndpoint extends WebSocketAdapter +{ + public final Logger LOG; + public CountDownLatch closeLatch = new CountDownLatch(1); + public String closeReason = null; + public int closeStatusCode = -1; + public LinkedBlockingQueue errors = new LinkedBlockingQueue<>(); + + public AbstractCloseEndpoint() + { + this.LOG = Log.getLogger(this.getClass().getName()); + } + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + LOG.debug("onWebSocketClose({}, {})",statusCode,reason); + this.closeStatusCode = statusCode; + this.closeReason = reason; + closeLatch.countDown(); + } + + @Override + public void onWebSocketError(Throwable cause) + { + errors.offer(cause); + } + + public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher statusCodeMatcher, Matcher reasonMatcher) + throws InterruptedException + { + assertThat("Client Close Event Occurred", closeLatch.await(clientTimeoutMs, TimeUnit.MILLISECONDS), is(true)); + assertThat("Client Close Event Status Code", closeStatusCode, statusCodeMatcher); + if (reasonMatcher == null) + { + assertThat("Client Close Event Reason", closeReason, nullValue()); + } + else + { + assertThat("Client Close Event Reason", closeReason, reasonMatcher); + } + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ContainerEndpoint.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ContainerEndpoint.java new file mode 100644 index 000000000000..c43506121983 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ContainerEndpoint.java @@ -0,0 +1,67 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.server; + +import java.util.Collection; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.common.WebSocketContainer; + +/** + * On Message, return container information + */ +public class ContainerEndpoint extends AbstractCloseEndpoint +{ + private final WebSocketContainer container; + private Session session; + + public ContainerEndpoint(WebSocketContainer container) + { + super(); + this.container = container; + } + + @Override + public void onWebSocketText(String message) + { + LOG.debug("onWebSocketText({})",message); + if (message.equalsIgnoreCase("openSessions")) + { + Collection sessions = container.getOpenSessions(); + + StringBuilder ret = new StringBuilder(); + ret.append("openSessions.size=").append(sessions.size()).append('\n'); + int idx = 0; + for (Session sess : sessions) + { + ret.append('[').append(idx++).append("] ").append(sess.toString()).append('\n'); + } + session.getRemote().sendStringByFuture(ret.toString()); + } + session.close(StatusCode.NORMAL,"ContainerEndpoint"); + } + + @Override + public void onWebSocketConnect(Session sess) + { + LOG.debug("onWebSocketConnect({})",sess); + this.session = sess; + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FastCloseEndpoint.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FastCloseEndpoint.java new file mode 100644 index 000000000000..d9d455735022 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FastCloseEndpoint.java @@ -0,0 +1,35 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.server; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; + +/** + * On Connect, close socket + */ +public class FastCloseEndpoint extends AbstractCloseEndpoint +{ + @Override + public void onWebSocketConnect(Session sess) + { + LOG.debug("onWebSocketConnect({})", sess); + sess.close(StatusCode.NORMAL, "FastCloseServer"); + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FastFailEndpoint.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FastFailEndpoint.java new file mode 100644 index 000000000000..bf1f9ba670b7 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FastFailEndpoint.java @@ -0,0 +1,36 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.server; + +import org.eclipse.jetty.websocket.api.Session; + +/** + * On Connect, throw unhandled exception + */ +public class FastFailEndpoint extends AbstractCloseEndpoint +{ + @Override + public void onWebSocketConnect(Session sess) + { + LOG.debug("onWebSocketConnect({})",sess); + // Test failure due to unhandled exception + // this should trigger a fast-fail closure during open/connect + throw new RuntimeException("Intentional FastFail"); + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ServerCloseCreator.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ServerCloseCreator.java new file mode 100644 index 000000000000..fd507d2a5fac --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ServerCloseCreator.java @@ -0,0 +1,82 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.server; + +import java.util.concurrent.LinkedBlockingQueue; +import javax.servlet.ServletContext; + +import org.eclipse.jetty.websocket.common.WebSocketContainer; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.eclipse.jetty.websocket.tests.EchoSocket; + +import static java.util.concurrent.TimeUnit.SECONDS; + +public class ServerCloseCreator implements WebSocketCreator +{ + private final WebSocketServletFactory serverFactory; + private LinkedBlockingQueue createdSocketQueue = new LinkedBlockingQueue<>(); + + public ServerCloseCreator(WebSocketServletFactory serverFactory) + { + this.serverFactory = serverFactory; + } + + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) + { + AbstractCloseEndpoint closeSocket = null; + + if (req.hasSubProtocol("fastclose")) + { + closeSocket = new FastCloseEndpoint(); + resp.setAcceptedSubProtocol("fastclose"); + } + else if (req.hasSubProtocol("fastfail")) + { + closeSocket = new FastFailEndpoint(); + resp.setAcceptedSubProtocol("fastfail"); + } + else if (req.hasSubProtocol("container")) + { + ServletContext context = req.getHttpServletRequest().getServletContext(); + WebSocketContainer container = + (WebSocketContainer) context.getAttribute(WebSocketContainer.class.getName()); + closeSocket = new ContainerEndpoint(container); + resp.setAcceptedSubProtocol("container"); + } + + if (closeSocket != null) + { + createdSocketQueue.offer(closeSocket); + return closeSocket; + } + else + { + return new EchoSocket(); + } + } + + public AbstractCloseEndpoint pollLastCreated() throws InterruptedException + { + return createdSocketQueue.poll(5, SECONDS); + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ServerCloseTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ServerCloseTest.java new file mode 100644 index 000000000000..0b1c6b32482b --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ServerCloseTest.java @@ -0,0 +1,278 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.server; + +import java.net.URI; +import java.nio.channels.ClosedChannelException; +import java.time.Duration; +import java.util.concurrent.Future; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.util.WSURI; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.common.WebSocketSessionImpl; +import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +/** + * Tests various close scenarios + */ +public class ServerCloseTest +{ + private WebSocketClient client; + private Server server; + private ServerCloseCreator serverEndpointCreator; + + @BeforeEach + public void startServer() throws Exception + { + server = new Server(); + + ServerConnector connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + + ServletHolder closeEndpoint = new ServletHolder(new WebSocketServlet() + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.setIdleTimeout(Duration.ofSeconds(2)); + serverEndpointCreator = new ServerCloseCreator(factory); + factory.setCreator(serverEndpointCreator); + } + }); + context.addServlet(closeEndpoint, "/ws"); + JettyWebSocketServletContainerInitializer.configure(context); + + HandlerList handlers = new HandlerList(); + handlers.addHandler(context); + handlers.addHandler(new DefaultHandler()); + + server.setHandler(handlers); + + server.start(); + } + + @AfterEach + public void stopServer() throws Exception + { + server.stop(); + } + + @BeforeEach + public void startClient() throws Exception + { + client = new WebSocketClient(); + client.setIdleTimeout(Duration.ofSeconds(2)); + client.start(); + } + + @AfterEach + public void stopClient() throws Exception + { + client.stop(); + } + + private void close(Session session) + { + if (session != null) + { + session.close(); + } + } + + /** + * Test fast close (bug #403817) + * + * @throws Exception on test failure + */ + @Test + public void fastClose() throws Exception + { + ClientUpgradeRequest request = new ClientUpgradeRequest(); + request.setSubProtocols("fastclose"); + CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint(); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + Future futSession = client.connect(clientEndpoint, wsUri, request); + + Session session = null; + try + { + session = futSession.get(5, SECONDS); + + // Verify that client got close + clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.NORMAL), containsString("")); + + // Verify that server socket got close event + AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated(); + assertThat("Fast Close Latch", serverEndpoint.closeLatch.await(5, SECONDS), is(true)); + assertThat("Fast Close.statusCode", serverEndpoint.closeStatusCode, is(StatusCode.ABNORMAL)); + } + finally + { + close(session); + } + } + + /** + * Test fast fail (bug #410537) + * + * @throws Exception on test failure + */ + @Test + public void fastFail() throws Exception + { + ClientUpgradeRequest request = new ClientUpgradeRequest(); + request.setSubProtocols("fastfail"); + CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint(); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + Future futSession = client.connect(clientEndpoint, wsUri, request); + + Session session = null; + try(StacklessLogging ignore = new StacklessLogging(FastFailEndpoint.class, WebSocketSessionImpl.class)) + { + session = futSession.get(5, SECONDS); + + // Verify that client got close indicating SERVER_ERROR + clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.SERVER_ERROR), containsString("Intentional FastFail")); + + // Verify that server socket got close event + AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated(); + serverEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.SERVER_ERROR), containsString("Intentional FastFail")); + + // Validate errors (must be "java.lang.RuntimeException: Intentional Exception from onWebSocketConnect") + assertThat("socket.onErrors", serverEndpoint.errors.size(), greaterThanOrEqualTo(1)); + Throwable cause = serverEndpoint.errors.poll(5, SECONDS); + assertThat("Error type", cause, instanceOf(RuntimeException.class)); + // ... with optional ClosedChannelException + cause = serverEndpoint.errors.peek(); + if (cause != null) + { + assertThat("Error type", cause, instanceOf(ClosedChannelException.class)); + } + } + finally + { + close(session); + } + } + + @Test + public void dropConnection() throws Exception + { + ClientUpgradeRequest request = new ClientUpgradeRequest(); + request.setSubProtocols("container"); + CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint(); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + Future futSession = client.connect(clientEndpoint, wsUri, request); + + Session session = null; + try(StacklessLogging ignore = new StacklessLogging(WebSocketSessionImpl.class)) + { + session = futSession.get(5, SECONDS); + + // Cause a client endpoint failure + clientEndpoint.getEndPoint().close(); + + // Verify that client got close + clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.ABNORMAL), containsString("Disconnected")); + + // Verify that server socket got close event + AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated(); + serverEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.ABNORMAL), containsString("Disconnected")); + } finally + { + close(session); + } + } + + + /** + * Test session open session cleanup (bug #474936) + * + * @throws Exception on test failure + */ + @Test + public void testOpenSessionCleanup() throws Exception + { + fastFail(); + fastClose(); + dropConnection(); + + ClientUpgradeRequest request = new ClientUpgradeRequest(); + request.setSubProtocols("container"); + CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint(); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + Future futSession = client.connect(clientEndpoint, wsUri, request); + + Session session = null; + try(StacklessLogging ignore = new StacklessLogging(WebSocketSessionImpl.class)) + { + session = futSession.get(5, SECONDS); + + session.getRemote().sendString("openSessions"); + + String msg = clientEndpoint.messageQueue.poll(5, SECONDS); + + assertThat("Should only have 1 open session", msg, containsString("openSessions.size=1\n")); + + // Verify that client got close + clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.NORMAL), containsString("ContainerEndpoint")); + + // Verify that server socket got close event + AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated(); + assertThat("Server Open Sessions Latch", serverEndpoint.closeLatch.await(5, SECONDS), is(true)); + assertThat("Server Open Sessions.statusCode", serverEndpoint.closeStatusCode, is(StatusCode.NORMAL)); + assertThat("Server Open Sessions.errors", serverEndpoint.errors.size(), is(0)); + } + finally + { + close(session); + } + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/SlowServerEndpoint.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/SlowServerEndpoint.java new file mode 100644 index 000000000000..ebdce28cfc46 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/SlowServerEndpoint.java @@ -0,0 +1,76 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.server; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; + +@WebSocket +public class SlowServerEndpoint +{ + private static final Logger LOG = Log.getLogger(SlowServerEndpoint.class); + + @OnWebSocketMessage + public void onMessage(Session session, String msg) + { + ThreadLocalRandom random = ThreadLocalRandom.current(); + + if (msg.startsWith("send-slow|")) + { + int idx = msg.indexOf('|'); + int msgCount = Integer.parseInt(msg.substring(idx + 1)); + CompletableFuture.runAsync(() -> + { + for (int i = 0; i < msgCount; i++) + { + try + { + session.getRemote().sendString("Hello/" + i + "/"); + // fake some slowness + TimeUnit.MILLISECONDS.sleep(random.nextInt(2000)); + } + catch (Throwable cause) + { + LOG.warn(cause); + } + } + }); + } + else + { + // echo message. + try + { + session.getRemote().sendString(msg); + } + catch (IOException ignore) + { + LOG.ignore(ignore); + } + } + } +} diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/SlowServerTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/SlowServerTest.java new file mode 100644 index 000000000000..686768869773 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/SlowServerTest.java @@ -0,0 +1,143 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.server; + +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.util.WSURI; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +/** + * This Regression Test Exists because of Server side Idle timeout, Write, and Generator bugs. + */ +public class SlowServerTest +{ + private Server server; + private WebSocketClient client; + + @BeforeEach + public void startClient() throws Exception + { + client = new WebSocketClient(); + client.setIdleTimeout(Duration.ofSeconds(60)); + client.start(); + } + + @BeforeEach + public void startServer() throws Exception + { + server = new Server(); + + ServerConnector connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + + ServletHolder websocket = new ServletHolder(new WebSocketServlet() + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.register(SlowServerEndpoint.class); + } + }); + context.addServlet(websocket, "/ws"); + JettyWebSocketServletContainerInitializer.configure(context); + + HandlerList handlers = new HandlerList(); + handlers.addHandler(context); + handlers.addHandler(new DefaultHandler()); + + server.setHandler(handlers); + + server.start(); + } + + @AfterEach + public void stopClient() throws Exception + { + client.stop(); + } + + @AfterEach + public void stopServer() throws Exception + { + server.stop(); + } + + @Test + public void testServerSlowToSend() throws Exception + { + CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint(); + client.setIdleTimeout(Duration.ofSeconds(60)); + + URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws")); + Future future = client.connect(clientEndpoint, wsUri); + + Session session = null; + try + { + // Confirm connected + session = future.get(5, SECONDS); + + int messageCount = 10; + + session.getRemote().sendString("send-slow|" + messageCount); + + // Verify receive + LinkedBlockingQueue responses = clientEndpoint.messageQueue; + + for (int i = 0; i < messageCount; i++) + { + String response = responses.poll(5, SECONDS); + assertThat("Server Message[" + i + "]", response, is("Hello/" + i + "/")); + } + } + finally + { + if (session != null) + { + session.close(); + } + } + } +} diff --git a/jetty-websocket/websocket-core/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-core/src/test/resources/jetty-logging.properties index d78a65493125..039b5a51133a 100644 --- a/jetty-websocket/websocket-core/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/websocket-core/src/test/resources/jetty-logging.properties @@ -16,7 +16,7 @@ # ======================================================================== # org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog -org.eclipse.jetty.LEVEL=INFO +org.eclipse.jetty.LEVEL=WARN # org.eclipse.jetty.io.LEVEL=DEBUG # org.eclipse.jetty.websocket.core.LEVEL=DEBUG # org.eclipse.jetty.util.log.stderr.LONG=true