diff --git a/src/main/java/org/red5/server/BaseConnection.java b/src/main/java/org/red5/server/BaseConnection.java index 7b79742e..0521eb01 100644 --- a/src/main/java/org/red5/server/BaseConnection.java +++ b/src/main/java/org/red5/server/BaseConnection.java @@ -156,10 +156,7 @@ public void setStreamId(int id) { */ @ConstructorProperties(value = { "persistent" }) public BaseConnection() { - log.debug("New BaseConnection"); - this.type = PERSISTENT; - this.sessionId = RandomStringUtils.randomAlphanumeric(13).toUpperCase(); - log.debug("Generated session id: {}", sessionId); + this(PERSISTENT); } /** @@ -201,6 +198,7 @@ public BaseConnection(String type, String host, String remoteAddress, int remote this.path = path; this.sessionId = sessionId; this.params = params; + log.debug("Generated session id: {}", sessionId); } /** {@inheritDoc} */ @@ -226,13 +224,15 @@ public Semaphore getLock() { */ public void initialize(IClient client) { log.debug("initialize - client: {}", client); - if (this.client != null && this.client instanceof Client) { + if (this.client != null && this.client instanceof Client && !this.client.equals(client)) { // unregister old client + log.trace("Unregistering previous client: {}", this.client); ((Client) this.client).unregister(this, false); } this.client = client; - if (this.client instanceof Client) { + if (this.client instanceof Client && !((Client) this.client).isRegistered(this)) { // register new client + log.trace("Registering client: {}", this.client); ((Client) this.client).register(this); } } @@ -317,7 +317,7 @@ public IClient getClient() { * @return true if connection is bound to scope, false otherwise */ public boolean isConnected() { - log.debug("Connected: {}", (scope != null)); + //log.debug("Connected: {}", (scope != null)); return scope != null; } diff --git a/src/main/java/org/red5/server/Client.java b/src/main/java/org/red5/server/Client.java index 1da86c9d..47444ea3 100644 --- a/src/main/java/org/red5/server/Client.java +++ b/src/main/java/org/red5/server/Client.java @@ -55,25 +55,25 @@ public class Client extends AttributeStore implements IClient { */ protected static final String PERMISSIONS = IPersistable.TRANSIENT_PREFIX + "_red5_permissions"; + /** + * Client registry where Client is registered + */ + protected transient WeakReference registry; + /** * Connections this client is associated with. */ - protected CopyOnWriteArraySet connections = new CopyOnWriteArraySet(); + protected transient CopyOnWriteArraySet connections = new CopyOnWriteArraySet(); /** * Creation time as Timestamp */ - protected long creationTime; + protected final long creationTime; /** * Clients identifier */ - protected String id; - - /** - * Client registry where Client is registered - */ - protected WeakReference registry; + protected final String id; /** * Whether or not the bandwidth has been checked. @@ -81,8 +81,7 @@ public class Client extends AttributeStore implements IClient { protected boolean bandwidthChecked; /** - * Creates client, sets creation time and registers it in ClientRegistry - * DW: nope, does not currently register it in ClientRegistry! + * Creates client, sets creation time and registers it in ClientRegistry. * * @param id Client id * @param registry ClientRegistry @@ -90,12 +89,40 @@ public class Client extends AttributeStore implements IClient { @ConstructorProperties({ "id", "registry" }) public Client(String id, ClientRegistry registry) { super(); - this.id = id; + if (id != null) { + this.id = id; + } else { + this.id = registry.nextId(); + } + this.creationTime = System.currentTimeMillis(); // use a weak reference to prevent any hard-links to the registry this.registry = new WeakReference(registry); - this.creationTime = System.currentTimeMillis(); } + /** + * Creates client, sets creation time and registers it in ClientRegistry. + * + * @param id Client id + * @param creationTime Creation time + * @param registry ClientRegistry + */ + @ConstructorProperties({ "id", "creationTime", "registry" }) + public Client(String id, Long creationTime, ClientRegistry registry) { + super(); + if (id != null) { + this.id = id; + } else { + this.id = registry.nextId(); + } + if (creationTime != null) { + this.creationTime = creationTime; + } else { + this.creationTime = System.currentTimeMillis(); + } + // use a weak reference to prevent any hard-links to the registry + this.registry = new WeakReference(registry); + } + /** * Disconnects client from Red5 application */ @@ -149,15 +176,6 @@ public Set getConnections(IScope scope) { return Collections.emptySet(); } - /** - * Sets the time at which the client was created. - * - * @param creationTime - */ - public void setCreationTime(long creationTime) { - this.creationTime = creationTime; - } - /** * Returns the time at which the client was created. * @@ -168,14 +186,8 @@ public long getCreationTime() { } /** - * Sets the client id - */ - public void setId(String id) { - this.id = id; - } - - /** - * Returns the client id + * Returns the client id. + * * @return client id */ public String getId() { @@ -217,12 +229,28 @@ public List iterateScopeNameList() { return scopeNames; } + /** + * Returns registration status of given connection. + * + * @param conn + * @return + */ + public boolean isRegistered(IConnection conn) { + return connections.contains(conn); + } + /** * Associate connection with client * @param conn Connection object */ protected void register(IConnection conn) { - log.debug("Registering connection for this client {}", id); + if (log.isDebugEnabled()) { + if (conn == null) { + log.debug("Register null connection, client id: {}", id); + } else { + log.debug("Register connection ({}:{}) client id: {}", conn.getRemoteAddress(), conn.getRemotePort(), id); + } + } if (conn != null) { IScope scope = conn.getScope(); if (scope != null) { @@ -250,6 +278,7 @@ protected void unregister(IConnection conn) { * @param deleteIfNoConns Whether to delete this client if it no longer has any connections */ protected void unregister(IConnection conn, boolean deleteIfNoConns) { + log.debug("Unregister connection ({}:{}) client id: {}", conn.getRemoteAddress(), conn.getRemotePort(), id); // remove connection from connected scopes list connections.remove(conn); // If client is not connected to any scope any longer then remove @@ -321,8 +350,7 @@ public static Client from(CompositeData cd) { Client instance = null; if (cd.containsKey("id")) { String id = (String) cd.get("id"); - instance = new Client(id, null); - instance.setCreationTime((Long) cd.get("creationTime")); + instance = new Client(id, (Long) cd.get("creationTime"), null); instance.setAttribute(PERMISSIONS, cd.get(PERMISSIONS)); } if (cd.containsKey("attributes")) { diff --git a/src/main/java/org/red5/server/ClientRegistry.java b/src/main/java/org/red5/server/ClientRegistry.java index 024b4e05..94eaeab9 100644 --- a/src/main/java/org/red5/server/ClientRegistry.java +++ b/src/main/java/org/red5/server/ClientRegistry.java @@ -35,6 +35,8 @@ import org.red5.server.exception.ClientNotFoundException; import org.red5.server.exception.ClientRejectedException; import org.red5.server.jmx.mxbeans.ClientRegistryMXBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.jmx.export.annotation.ManagedResource; /** @@ -45,6 +47,8 @@ @ManagedResource(objectName="org.red5.server:type=ClientRegistry,name=default", description="ClientRegistry") public class ClientRegistry implements IClientRegistry, ClientRegistryMXBean { + private Logger log = LoggerFactory.getLogger(ClientRegistry.class); + /** * Clients map */ @@ -89,19 +93,11 @@ public void addClient(IClient client) { * Add the client to the registry */ private void addClient(String id, IClient client) { - //check to see if the id already exists first + // check to see if the id already exists first if (!hasClient(id)) { clients.put(id, client); } else { - // DW the Client object is meant to be unifying connections from a remote user. But currently the only case we - // specify this currently is when we use a remoting session. So we actually just create an arbitrary id, which means - // RTMP connections from same user are not combined. - //get the next available client id - String newId = nextId(); - //update the client - client.setId(newId); - //add the client to the list - addClient(newId, client); + log.debug("Client id: {} already registered", id); } } @@ -140,7 +136,7 @@ protected boolean hasClients() { @SuppressWarnings("unchecked") protected Collection getClients() { if (!hasClients()) { - // Avoid creating new Collection object if no clients exist. + // avoid creating new Collection object if no clients exist. return Collections.EMPTY_SET; } return Collections.unmodifiableCollection(clients.values()); @@ -192,11 +188,15 @@ public IClient newClient(Object[] params) throws ClientNotFoundException, Client * @return Next client id */ public String nextId() { - //when we reach max int, reset to zero - if (nextId.get() == Integer.MAX_VALUE) { - nextId.set(0); - } - return String.format("%s", nextId.getAndIncrement()); + String id = "-1"; + do { + // when we reach max int, reset to zero + if (nextId.get() == Integer.MAX_VALUE) { + nextId.set(0); + } + id = String.format("%d", nextId.getAndIncrement()); + } while (hasClient(id)); + return id; } /** @@ -204,7 +204,7 @@ public String nextId() { * @return Previous client id */ public String previousId() { - return String.format("%s", nextId.get()); + return String.format("%d", nextId.get()); } /** diff --git a/src/main/java/org/red5/server/api/IClient.java b/src/main/java/org/red5/server/api/IClient.java index 80c3fd37..59926bb6 100644 --- a/src/main/java/org/red5/server/api/IClient.java +++ b/src/main/java/org/red5/server/api/IClient.java @@ -45,29 +45,24 @@ public interface IClient extends IAttributeStore { */ public static final String ID = "red5.client"; - /** - * Sets the clients id - * @param id client id - */ - public void setId(String id); - /** * Get the unique ID for this client. This will be generated by the server * if not passed upon connection from client-side Flex/Flash app. To assign a custom ID to the client use - * params object of - * {@link IApplication#appConnect(IConnection, Object[])} method, that - * contains 2nd all the rest values you pass to - * NetConnection.connect method. + * params object of {@link IApplication#appConnect(IConnection, Object[])} method, that + * contains 2nd all the rest values you pass to NetConnection.connect method. * * Example: * * At client side: - * NetConnection.connect( "http://localhost/killerapp/", "user123" ); + * + * NetConnection.connect("http://localhost/killerapp/", "user123"); + * * - * then at server side: - * public boolean appConnect( IConnection connection, Object[] params ){
+ * then at server side: + * + * public boolean appConnect( IConnection connection, Object[] params){ * try { - * connection.getClient().setId( (String) params[0] ); + * connection.getClient().setAttribute("param0", (String) params[0]); * } catch(Exception e){
* log.error("{}", e); * } diff --git a/src/main/java/org/red5/server/api/IConnection.java b/src/main/java/org/red5/server/api/IConnection.java index 6f9784a0..06f8284d 100644 --- a/src/main/java/org/red5/server/api/IConnection.java +++ b/src/main/java/org/red5/server/api/IConnection.java @@ -29,12 +29,8 @@ /** * The connection object. * - * Each connection has an associated client and scope. Connections may be - * persistent, polling, or transient. The aim of this interface is to provide - * basic connection methods shared between different types of connections - * - * Future subclasses: RTMPConnection, RemotingConnection, AJAXConnection, - * HttpConnection, etc + * Each connection has an associated client and scope. Connections may be persistent, polling, or transient. + * The aim of this interface is to provide basic connection methods shared between different types of connections. * * @author The Red5 Project * @author Luke Hubbard (luke@codegent.com) diff --git a/src/main/java/org/red5/server/api/Red5.java b/src/main/java/org/red5/server/api/Red5.java index 09ebda67..726c3e3f 100644 --- a/src/main/java/org/red5/server/api/Red5.java +++ b/src/main/java/org/red5/server/api/Red5.java @@ -122,7 +122,15 @@ public Red5() { * @param connection Thread local connection */ public static void setConnectionLocal(IConnection connection) { - log.debug("Set connection: {} with thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName()); + if (log.isDebugEnabled()) { + log.debug("Set connection: {} with thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName()); + try { + StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); + StackTraceElement stackTraceElement = stackTraceElements[2]; + log.debug("Caller: {}.{} #{}", stackTraceElement.getClassName(), stackTraceElement.getMethodName(), stackTraceElement.getLineNumber()); + } catch (Exception e) { + } + } if (connection != null) { connThreadLocal.set(new WeakReference(connection)); IScope scope = connection.getScope(); @@ -144,10 +152,11 @@ public static void setConnectionLocal(IConnection connection) { * @return Connection object */ public static IConnection getConnectionLocal() { - log.debug("Get connection on thread: {}", Thread.currentThread().getName()); WeakReference ref = connThreadLocal.get(); if (ref != null) { - return ref.get(); + IConnection connection = ref.get(); + log.debug("Get connection: {} on thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName()); + return connection; } else { return null; } diff --git a/src/main/java/org/red5/server/api/scope/IScope.java b/src/main/java/org/red5/server/api/scope/IScope.java index c190df52..03b94058 100644 --- a/src/main/java/org/red5/server/api/scope/IScope.java +++ b/src/main/java/org/red5/server/api/scope/IScope.java @@ -43,6 +43,7 @@ * * @author The Red5 Project * @author Luke Hubbard (luke@codegent.com) + * @author Paul Gregoire (mondain@gmail.com) */ public interface IScope extends IBasicScope, ResourcePatternResolver, IServiceHandlerProvider { diff --git a/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java b/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java index e7a16f0b..5045578a 100755 --- a/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java +++ b/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java @@ -1170,8 +1170,9 @@ protected void writingMessage(Packet message) { * Increases number of read messages by one. Updates number of bytes read. */ public void messageReceived() { - if (log.isTraceEnabled()) + if (log.isTraceEnabled()) { log.trace("messageReceived"); + } readMessages.incrementAndGet(); // trigger generation of BytesRead messages updateBytesRead(); @@ -1226,8 +1227,9 @@ public String messageTypeToName(byte headerDataType) { */ @SuppressWarnings("unchecked") public void handleMessageReceived(Packet message) { - if (log.isTraceEnabled()) + if (log.isTraceEnabled()) { log.trace("handleMessageReceived - {}", sessionId); + } final byte dataType = message.getHeader().getDataType(); // route these types outside the executor switch(dataType) { @@ -1268,22 +1270,25 @@ public void handleMessageReceived(Packet message) { ListenableFuture future = (ListenableFuture) executor.submitListenable(new ListenableFutureTask(task)); currentQueueSize.incrementAndGet(); future.addCallback(new ListenableFutureCallback() { + private int getProcessingTime() { - return (int) ((System.nanoTime() - startTime)/1000); + return (int) ((System.nanoTime() - startTime) / 1000); } public void onFailure(Throwable t) { - currentQueueSize.decrementAndGet(); - - if (log.isWarnEnabled()) + currentQueueSize.decrementAndGet(); + if (log.isWarnEnabled()) { log.warn("onFailure - session: {}, msgtype: {}, processingTime: {}, packetNum: {}", sessionId, getMessageType(sentMessage), getProcessingTime(), packetNumber); + } } public void onSuccess(Boolean success) { currentQueueSize.decrementAndGet(); - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("onSuccess - session: {}, msgType: {}, processingTime: {}, packetNum: {}", sessionId, getMessageType(sentMessage), getProcessingTime(), packetNumber); + } } + }); } catch (TaskRejectedException tre) { Throwable[] suppressed = tre.getSuppressed(); @@ -1358,8 +1363,9 @@ public void sendSharedObjectMessage(String name, int currentVersion, boolean per try { // get the channel for so updates Channel channel = getChannel((byte) 3); - if (log.isTraceEnabled()) + if (log.isTraceEnabled()) { log.trace("Send to channel: {}", channel); + } channel.write(syncMessage); } catch (Exception e) { log.warn("Exception sending shared object", e); @@ -1369,8 +1375,9 @@ public void sendSharedObjectMessage(String name, int currentVersion, boolean per /** {@inheritDoc} */ public void ping() { long newPingTime = System.currentTimeMillis(); - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Send Ping: session=[{}], currentTime=[{}], lastPingTime=[{}]", new Object[] { getSessionId(), newPingTime, lastPingSentOn.get() }); + } if (lastPingSentOn.get() == 0) { lastPongReceivedOn.set(newPingTime); } @@ -1391,12 +1398,14 @@ public void ping() { public void pingReceived(Ping pong) { long now = System.currentTimeMillis(); long previousPingValue = (int) (lastPingSentOn.get() & 0xffffffff); - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Pong Rx: session=[{}] at {} with value {}, previous received at {}", new Object[] { getSessionId(), now, pong.getValue2(), previousPingValue }); + } if (pong.getValue2() == previousPingValue) { lastPingRoundTripTime.set((int) (now & 0xffffffff) - pong.getValue2()); - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Ping response session=[{}], RTT=[{} ms]", new Object[] { getSessionId(), lastPingRoundTripTime.get() }); + } } else { int pingRtt = (int) (now & 0xffffffff) - pong.getValue2(); log.info("Pong delayed: session=[{}], ping response took [{} ms] to arrive. Connection may be congested.", new Object[] { getSessionId(), pingRtt }); diff --git a/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTask.java b/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTask.java index ae5d5b20..d694fd21 100644 --- a/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTask.java +++ b/src/main/java/org/red5/server/net/rtmp/ReceivedMessageTask.java @@ -12,7 +12,7 @@ public final class ReceivedMessageTask implements Callable { private final static Logger log = LoggerFactory.getLogger(ReceivedMessageTask.class); - private RTMPConnection conn; + private final RTMPConnection conn; private final IRTMPHandler handler; diff --git a/src/main/java/org/red5/server/net/rtmp/codec/RTMPMinaProtocolEncoder.java b/src/main/java/org/red5/server/net/rtmp/codec/RTMPMinaProtocolEncoder.java index ca94d8dd..054ee0ce 100644 --- a/src/main/java/org/red5/server/net/rtmp/codec/RTMPMinaProtocolEncoder.java +++ b/src/main/java/org/red5/server/net/rtmp/codec/RTMPMinaProtocolEncoder.java @@ -50,9 +50,13 @@ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) log.trace("Session id: {}", sessionId); RTMPConnection conn = (RTMPConnection) RTMPConnManager.getInstance().getConnectionBySessionId(sessionId); if (conn != null) { + RTMPConnection prev = null; // look for and compare the connection local; set it from the session if (!conn.equals((RTMPConnection) Red5.getConnectionLocal())) { log.debug("Connection local ({}) didn't match io session ({})", (Red5.getConnectionLocal() != null ? Red5.getConnectionLocal().getSessionId() : "null"), sessionId); + // keep track of conn we're replacing + prev = (RTMPConnection) Red5.getConnectionLocal(); + // replace conn with the one from the session id lookup Red5.setConnectionLocal(conn); } final Semaphore lock = conn.getEncoderLock(); @@ -70,37 +74,22 @@ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) log.trace("Writing output data"); out.write(buf); } else { - /* - LinkedList chunks = Chunker.chunk(buf, requestedWriteChunkSize, targetChunkSize); - log.trace("Writing output data in {} chunks", chunks.size()); - for (IoBuffer chunk : chunks) { - out.write(chunk); - } - chunks.clear(); - chunks = null; - */ int sentChunks = Chunker.chunkAndWrite(out, buf, requestedWriteChunkSize, targetChunkSize); log.trace("Wrote {} chunks", sentChunks); } } else { log.trace("Response buffer was null after encoding"); } - // WriteFuture future = out.flush(); - // if (future != null) { - // future.addListener(new IoFutureListener() { - // @Override - // public void operationComplete(WriteFuture future) { - // //log.debug("Buffer freed"); - // buf.free(); - // } - // }); - // } } catch (Exception ex) { log.error("Exception during encode", ex); } finally { log.trace("Encoder lock releasing.. {}", conn.getSessionId()); lock.release(); } + // set connection local back to previous value + if (prev != null) { + Red5.setConnectionLocal(prev); + } } else { log.debug("Connection is no longer available for encoding, may have been closed already"); } diff --git a/src/main/java/org/red5/server/scope/BasicScope.java b/src/main/java/org/red5/server/scope/BasicScope.java index 1194be08..805222c1 100644 --- a/src/main/java/org/red5/server/scope/BasicScope.java +++ b/src/main/java/org/red5/server/scope/BasicScope.java @@ -45,6 +45,11 @@ public abstract class BasicScope implements IBasicScope, Comparable protected static Logger log = LoggerFactory.getLogger(BasicScope.class); + /** + * Scheduled job name for keep alive check + */ + private String keepAliveJobName; + /** * Parent scope. Scopes can be nested. * @@ -97,12 +102,7 @@ public abstract class BasicScope implements IBasicScope, Comparable /** * List of event listeners */ - protected CopyOnWriteArraySet listeners; - - /** - * Scheduled job name for keep alive check - */ - private String keepAliveJobName; + protected transient CopyOnWriteArraySet listeners; /** * Creates unnamed scope diff --git a/src/main/java/org/red5/server/scope/Scope.java b/src/main/java/org/red5/server/scope/Scope.java index dd7b7577..a0dbebd0 100644 --- a/src/main/java/org/red5/server/scope/Scope.java +++ b/src/main/java/org/red5/server/scope/Scope.java @@ -92,70 +92,70 @@ public class Scope extends BasicScope implements IScope, IScopeStatistics, Scope private static final int UNSET = -1; /** - * Auto-start flag + * Timestamp the scope was created. */ - private boolean autoStart = true; + private long creationTime; /** - * Child scopes + * Scope nesting depth, unset by default */ - private final ConcurrentScopeSet children; + private int depth = UNSET; /** - * Connected clients map + * Whether scope is enabled */ - private final CopyOnWriteArraySet clients; + private boolean enabled = true; /** - * Storage for scope attributes + * Whether scope is running */ - protected final AttributeStore attributes = new AttributeStore(); + private boolean running; /** - * Statistics about connections to the scope. + * Auto-start flag */ - protected final StatisticsCounter connectionStats = new StatisticsCounter(); + private boolean autoStart = true; /** - * Statistics about sub-scopes. + * Scope context */ - protected final StatisticsCounter subscopeStats = new StatisticsCounter(); + private transient IContext context; /** - * Scope context + * Scope handler */ - private IContext context; + private transient IScopeHandler handler; /** - * Timestamp the scope was created. + * Registered service handlers for this scope. The map is created on-demand + * only if it's accessed for writing. */ - private long creationTime; + private transient volatile ConcurrentMap serviceHandlers; /** - * Scope nesting depth, unset by default + * Child scopes */ - private int depth = UNSET; + private final transient ConcurrentScopeSet children; /** - * Whether scope is enabled + * Connected clients map */ - private boolean enabled = true; + private final transient CopyOnWriteArraySet clients; /** - * Scope handler + * Statistics about connections to the scope. */ - private IScopeHandler handler; + protected final transient StatisticsCounter connectionStats = new StatisticsCounter(); /** - * Whether scope is running + * Statistics about sub-scopes. */ - private boolean running; + protected final transient StatisticsCounter subscopeStats = new StatisticsCounter(); /** - * Registered service handlers for this scope. The map is created on-demand - * only if it's accessed for writing. + * Storage for scope attributes */ - private volatile ConcurrentMap serviceHandlers; + protected final AttributeStore attributes = new AttributeStore(); /** * Mbean object name. @@ -1440,6 +1440,7 @@ public IBasicScope getBasicScope(ScopeType type, String name) { * Builder pattern */ public final static class Builder { + private IScope parent; private ScopeType type; diff --git a/src/main/java/org/red5/server/so/ClientSharedObject.java b/src/main/java/org/red5/server/so/ClientSharedObject.java index c1277eaa..81a47b2b 100644 --- a/src/main/java/org/red5/server/so/ClientSharedObject.java +++ b/src/main/java/org/red5/server/so/ClientSharedObject.java @@ -56,17 +56,17 @@ public class ClientSharedObject extends SharedObject implements IClientSharedObj /** * Synchronization lock */ - private final ReentrantLock lock = new ReentrantLock(); + private final transient ReentrantLock lock = new ReentrantLock(); /** * Set of listeners */ - private Set listeners = new CopyOnWriteArraySet(); + private transient CopyOnWriteArraySet listeners = new CopyOnWriteArraySet(); /** * Set of event handlers */ - private ConcurrentMap handlers = new ConcurrentHashMap(1, 0.9f, 1); + private transient ConcurrentMap handlers = new ConcurrentHashMap(1, 0.9f, 1); /** * Create new client SO with diff --git a/src/main/java/org/red5/server/so/ISharedObjectEvent.java b/src/main/java/org/red5/server/so/ISharedObjectEvent.java index 1f0ea363..7d42b5ec 100644 --- a/src/main/java/org/red5/server/so/ISharedObjectEvent.java +++ b/src/main/java/org/red5/server/so/ISharedObjectEvent.java @@ -65,4 +65,5 @@ enum Type { * @return the value of the event */ public Object getValue(); + } diff --git a/src/main/java/org/red5/server/so/SharedObject.java b/src/main/java/org/red5/server/so/SharedObject.java index 339913c9..ff666ebe 100644 --- a/src/main/java/org/red5/server/so/SharedObject.java +++ b/src/main/java/org/red5/server/so/SharedObject.java @@ -119,12 +119,12 @@ public class SharedObject extends AttributeStore implements ISharedObjectStatist /** * Synchronization events */ - protected volatile ConcurrentLinkedQueue syncEvents = new ConcurrentLinkedQueue(); + protected transient volatile ConcurrentLinkedQueue syncEvents = new ConcurrentLinkedQueue(); /** * Listeners */ - protected volatile CopyOnWriteArraySet listeners = new CopyOnWriteArraySet(); + protected transient volatile CopyOnWriteArraySet listeners = new CopyOnWriteArraySet(); /** * Event listener, actually RTMP connection @@ -144,7 +144,7 @@ public class SharedObject extends AttributeStore implements ISharedObjectStatist /** * Manages listener statistics. */ - protected StatisticsCounter listenerStats = new StatisticsCounter(); + protected transient StatisticsCounter listenerStats = new StatisticsCounter(); /** * Counts number of "change" events. diff --git a/src/main/java/org/red5/server/so/SharedObjectScope.java b/src/main/java/org/red5/server/so/SharedObjectScope.java index 015e8ff7..31a43b51 100644 --- a/src/main/java/org/red5/server/so/SharedObjectScope.java +++ b/src/main/java/org/red5/server/so/SharedObjectScope.java @@ -61,22 +61,22 @@ public class SharedObjectScope extends BasicScope implements ISharedObject, Stat /** * Lock to synchronize shared object updates from multiple threads */ - private final ReentrantLock lock = new ReentrantLock(); + private final transient ReentrantLock lock = new ReentrantLock(); /** * Server-side listeners */ - private CopyOnWriteArraySet serverListeners = new CopyOnWriteArraySet(); + private transient CopyOnWriteArraySet serverListeners = new CopyOnWriteArraySet(); /** * Event handlers */ - private ConcurrentMap handlers = new ConcurrentHashMap(1, 0.9f, 1); + private transient ConcurrentMap handlers = new ConcurrentHashMap(1, 0.9f, 1); /** * Security handlers */ - private CopyOnWriteArraySet securityHandlers = new CopyOnWriteArraySet(); + private transient CopyOnWriteArraySet securityHandlers = new CopyOnWriteArraySet(); /** * Scoped shared object diff --git a/src/main/java/org/red5/server/stream/AbstractStream.java b/src/main/java/org/red5/server/stream/AbstractStream.java index ec1713f9..34b38a96 100644 --- a/src/main/java/org/red5/server/stream/AbstractStream.java +++ b/src/main/java/org/red5/server/stream/AbstractStream.java @@ -69,7 +69,7 @@ public abstract class AbstractStream implements IStream { /** * Lock for protecting critical sections */ - protected final Semaphore lock = new Semaphore(1, true); + protected final transient Semaphore lock = new Semaphore(1, true); /** * Return stream name diff --git a/src/main/java/org/red5/server/stream/ClientBroadcastStream.java b/src/main/java/org/red5/server/stream/ClientBroadcastStream.java index 1a239be1..0f1202c8 100644 --- a/src/main/java/org/red5/server/stream/ClientBroadcastStream.java +++ b/src/main/java/org/red5/server/stream/ClientBroadcastStream.java @@ -130,7 +130,7 @@ public class ClientBroadcastStream extends AbstractClientStream implements IClie /** * Output endpoint that providers use */ - protected IMessageOutput connMsgOut; + protected transient IMessageOutput connMsgOut; /** * Stores timestamp of first packet @@ -140,7 +140,7 @@ public class ClientBroadcastStream extends AbstractClientStream implements IClie /** * Pipe for live streaming */ - protected IPipe livePipe; + protected transient IPipe livePipe; /** * Stream published name @@ -160,17 +160,17 @@ public class ClientBroadcastStream extends AbstractClientStream implements IClie /** * Stores statistics about subscribers. */ - private StatisticsCounter subscriberStats = new StatisticsCounter(); + private transient StatisticsCounter subscriberStats = new StatisticsCounter(); /** * Listeners to get notified about received packets. */ - protected Set listeners = new CopyOnWriteArraySet(); + protected transient Set listeners = new CopyOnWriteArraySet(); /** * Recording listener */ - private WeakReference recordingListener; + private transient WeakReference recordingListener; protected long latestTimeStamp = -1;