Skip to content

Commit

Permalink
Issue #3379 - Add tracking of WebSocket Sessions to various WebSocket…
Browse files Browse the repository at this point in the history
… Container APIs

+ Jetty WebSocket API now tracks Sessions and will close them on
  lifecycle stop
+ Javax WebSocket API now tracks Sessions and will close them on
  lifecycle stop
+ Adding Jetty WebSocket tests for proper close / session tracking

Signed-off-by: Joakim Erdfelt <joakim@erdfelt.com>
  • Loading branch information
joakime committed Feb 21, 2019
1 parent 3826397 commit 82adc20
Show file tree
Hide file tree
Showing 36 changed files with 2,779 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,38 @@

package org.eclipse.jetty.websocket.javax.common;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import java.util.function.Consumer;
import javax.websocket.Extension;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;

public abstract class JavaxWebSocketContainer extends ContainerLifeCycle implements javax.websocket.WebSocketContainer
{
private final static Logger LOG = Log.getLogger(JavaxWebSocketContainer.class);
private final SessionTracker sessionTracker = new SessionTracker();
private long defaultAsyncSendTimeout = -1;
private int defaultMaxBinaryMessageBufferSize = 64 * 1024;
private int defaultMaxTextMessageBufferSize = 64 * 1024;
private List<JavaxWebSocketSessionListener> sessionListeners = new ArrayList<>();

public JavaxWebSocketContainer()
{
addSessionListener(sessionTracker);
addBean(sessionTracker);
}

public abstract ByteBufferPool getBufferPool();

Expand Down Expand Up @@ -100,7 +113,7 @@ public Set<Extension> getInstalledExtensions()
*/
public Set<javax.websocket.Session> getOpenSessions()
{
return new HashSet<>(getBeans(JavaxWebSocketSession.class));
return sessionTracker.getSessions();
}

public JavaxWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
Expand All @@ -118,4 +131,45 @@ public void setAsyncSendTimeout(long timeoutInMillis)
protected abstract WebSocketExtensionRegistry getExtensionRegistry();

protected abstract JavaxWebSocketFrameHandlerFactory getFrameHandlerFactory();

/**
* Register a WebSocketSessionListener with the container
*
* @param listener the listener
*/
public void addSessionListener(JavaxWebSocketSessionListener listener)
{
sessionListeners.add(listener);
}

/**
* Remove a WebSocketSessionListener from the container
*
* @param listener the listener
* @return true if listener was present and removed
*/
public boolean removeSessionListener(JavaxWebSocketSessionListener listener)
{
return sessionListeners.remove(listener);
}

/**
* Notify Session Listeners of events
*
* @param consumer the consumer to pass to each listener
*/
public void notifySessionListeners(Consumer<JavaxWebSocketSessionListener> consumer)
{
for (JavaxWebSocketSessionListener listener : sessionListeners)
{
try
{
consumer.accept(listener);
}
catch (Throwable x)
{
LOG.info("Exception while invoking listener " + listener, x);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import javax.websocket.CloseReason;
import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;
Expand Down Expand Up @@ -229,7 +228,7 @@ public void onOpen(CoreSession coreSession, Callback callback)
if (openHandle != null)
openHandle.invoke();

container.addBean(session, true);
container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionOpened(session));
callback.succeeded();
futureSession.complete(session);
}
Expand Down Expand Up @@ -283,8 +282,8 @@ public void onClosed(CloseStatus closeStatus, Callback callback)
CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason());
closeHandle.invoke(closeReason);
}
container.removeBean(session);
callback.succeeded();
container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionClosed(session));
}
catch (Throwable cause)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.Extension;
Expand All @@ -39,10 +38,12 @@
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
Expand Down Expand Up @@ -551,6 +552,25 @@ public boolean isSecure()
return coreSession.isSecure();
}

@Override
protected void doStop()
{
coreSession.close(CloseStatus.SHUTDOWN, "Container being shut down", new Callback()
{
@Override
public void succeeded()
{
coreSession.abort();
}

@Override
public void failed(Throwable x)
{
coreSession.abort();
}
});
}

@Override
public synchronized void removeMessageHandler(MessageHandler handler)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.websocket.javax.common;

public interface JavaxWebSocketSessionListener
{
void onJavaxWebSocketSessionOpened(JavaxWebSocketSession session);

void onJavaxWebSocketSessionClosed(JavaxWebSocketSession session);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.websocket.javax.common;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.Session;

import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.LifeCycle;

public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener
{
private CopyOnWriteArraySet<JavaxWebSocketSession> sessions = new CopyOnWriteArraySet<>();

public Set<Session> getSessions()
{
return Collections.unmodifiableSet(sessions);
}

@Override
public void onJavaxWebSocketSessionOpened(JavaxWebSocketSession session)
{
sessions.add(session);
}

@Override
public void onJavaxWebSocketSessionClosed(JavaxWebSocketSession session)
{
sessions.remove(sessions);
}

@Override
protected void doStop() throws Exception
{
for (JavaxWebSocketSession session : sessions)
{
LifeCycle.stop(session);
}
super.doStop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
Expand All @@ -43,14 +46,20 @@
import org.eclipse.jetty.websocket.client.impl.JettyClientUpgradeRequest;
import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandler;
import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory;
import org.eclipse.jetty.websocket.common.SessionTracker;
import org.eclipse.jetty.websocket.common.WebSocketContainer;
import org.eclipse.jetty.websocket.common.WebSocketSessionListener;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;

public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy
public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy, WebSocketContainer
{
private static final Logger LOG = Log.getLogger(WebSocketClient.class);
private final WebSocketCoreClient coreClient;
private final int id = ThreadLocalRandom.current().nextInt();
private final JettyWebSocketFrameHandlerFactory frameHandlerFactory;
private final List<WebSocketSessionListener> sessionListeners = new CopyOnWriteArrayList<>();
private final SessionTracker sessionTracker = new SessionTracker();
private ClassLoader contextClassLoader;
private DecoratedObjectFactory objectFactory;
private WebSocketExtensionRegistry extensionRegistry;
Expand Down Expand Up @@ -88,7 +97,9 @@ private WebSocketClient(WebSocketCoreClient coreClient)
this.contextClassLoader = this.getClass().getClassLoader();
this.objectFactory = new DecoratedObjectFactory();
this.extensionRegistry = new WebSocketExtensionRegistry();
this.frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(getExecutor());
this.frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(this);
this.sessionListeners.add(sessionTracker);
addBean(sessionTracker);
}

public CompletableFuture<Session> connect(Object websocket, URI toUri) throws IOException
Expand Down Expand Up @@ -124,6 +135,34 @@ public WebSocketBehavior getBehavior()
return WebSocketBehavior.CLIENT;
}

@Override
public void addSessionListener(WebSocketSessionListener listener)
{
sessionListeners.add(listener);
}

@Override
public boolean removeSessionListener(WebSocketSessionListener listener)
{
return sessionListeners.remove(listener);
}

@Override
public void notifySessionListeners(Consumer<WebSocketSessionListener> consumer)
{
for (WebSocketSessionListener listener : sessionListeners)
{
try
{
consumer.accept(listener);
}
catch (Throwable x)
{
LOG.info("Exception while invoking listener " + listener, x);
}
}
}

@Override
public Duration getIdleTimeout()
{
Expand Down Expand Up @@ -224,6 +263,7 @@ public ByteBufferPool getBufferPool()
return getHttpClient().getByteBufferPool();
}

@Override
public Executor getExecutor()
{
return getHttpClient().getExecutor();
Expand All @@ -246,7 +286,7 @@ public DecoratedObjectFactory getObjectFactory()

public Collection<Session> getOpenSessions()
{
return Collections.unmodifiableSet(new HashSet<>(getBeans(Session.class)));
return sessionTracker.getSessions();
}

public JettyWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
Expand Down
Loading

0 comments on commit 82adc20

Please sign in to comment.