Skip to content

Commit

Permalink
Merge pull request #404 from ably/fix-cm-thread-exit
Browse files Browse the repository at this point in the history
Fix cm thread exit
  • Loading branch information
funkyboy authored Jun 28, 2018
2 parents fb14716 + 8c079cf commit 59100ab
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 75 deletions.
181 changes: 106 additions & 75 deletions lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.List;


public class ConnectionManager implements Runnable, ConnectListener {
public class ConnectionManager implements ConnectListener {

private static final String TAG = ConnectionManager.class.getName();
private static final String INTERNET_CHECK_URL = "http://internet-up.ably-realtime.com/is-the-internet-up.txt";
Expand Down Expand Up @@ -223,12 +223,24 @@ public synchronized StateInfo getConnectionState() {
return state;
}

private void setState(StateIndication newState) {
Log.v(TAG, "setState(): setting " + newState.state);
/**
* Set the state the given state
* @param newState
* @return changed
*/
private boolean setState(StateIndication newState) {
ConnectionStateListener.ConnectionStateChange change;
StateInfo newStateInfo = states.get(newState.state);
synchronized(this) {
ErrorInfo reason = newState.reason; if(reason == null) reason = newStateInfo.defaultErrorInfo;
if(newState.state == state.state) {
Log.v(TAG, "setState(): unchanged " + newState.state);
return false;
}
ErrorInfo reason = newState.reason;
if(reason == null) {
reason = newStateInfo.defaultErrorInfo;
}
Log.v(TAG, "setState(): setting " + newState.state + "; reason " + reason);
change = new ConnectionStateListener.ConnectionStateChange(state.state, newState.state, newStateInfo.timeout, reason);
newStateInfo.host = newState.currentHost;
state = newStateInfo;
Expand All @@ -237,6 +249,10 @@ private void setState(StateIndication newState) {
/* any state change clears pending reauth flag */
pendingReauth = false;
}
if(state.terminal) {
clearTransport();
stopThread();
}
}

/* broadcast state change */
Expand Down Expand Up @@ -281,6 +297,7 @@ private void setState(StateIndication newState) {
}
}
}
return true;
}

public void requestState(ConnectionState state) {
Expand Down Expand Up @@ -446,13 +463,16 @@ public void onAuthError(ErrorInfo errorInfo) {
* @throws AblyException
*/
public void onMessage(ITransport transport, ProtocolMessage message) throws AblyException {
if (transport != null && this.transport != transport)
if (transport != null && this.transport != transport) {
return;
if (Log.level <= Log.VERBOSE)
Log.v(TAG, "onMessage(): " + message.action + ": " + new String(ProtocolSerializer.writeJSON(message)));
}
if (Log.level <= Log.VERBOSE) {
Log.v(TAG, "onMessage() (transport = " + transport + "): " + message.action + ": " + new String(ProtocolSerializer.writeJSON(message)));
}
try {
if(protocolListener != null)
if(protocolListener != null) {
protocolListener.onRawMessageRecv(message);
}
switch(message.action) {
case heartbeat:
onHeartbeat(message);
Expand Down Expand Up @@ -621,7 +641,7 @@ private boolean startThread() {
boolean creating = false;
synchronized(this) {
if(mgrThread == null) {
mgrThread = new Thread(this);
mgrThread = new CMThread();
state = states.get(ConnectionState.initialized);
creating = true;
}
Expand All @@ -636,6 +656,13 @@ private boolean startThread() {
return creating;
}

private void stopThread() {
if(mgrThread != null) {
mgrThread.setExiting();
mgrThread = null;
}
}

private void handleStateRequest() {
boolean handled = false;
switch(requestedState.state) {
Expand Down Expand Up @@ -746,9 +773,8 @@ private void handleStateChange(StateIndication stateChange) {
}
}
if(stateChange != null) {
if (stateChange.state != state.state) {
setState(stateChange);
} else if (stateChange.state == ConnectionState.connected) {
boolean changed = setState(stateChange);
if (!changed && stateChange.state == ConnectionState.connected) {
/* connected is special case because we want to deliver reauth notifications to listeners as an update */
connection.emitUpdate(null);
}
Expand Down Expand Up @@ -800,75 +826,72 @@ private void tryWait(long timeout) {
}
}

public void run() {
Thread thisThread = Thread.currentThread();
while(!state.terminal) {

/*
* Until we've reached a terminal state we:
* - get a state change;
* - enact that change
*/
StateIndication stateChange = null;

/* Hold the lock until we obtain a state change */
synchronized(this) {
/* if we're initialising, then tell the starting thread that
* we're ready to receive events */
if(state.state == ConnectionState.initialized) {
synchronized(thisThread) {
thisThread.notify();
class CMThread extends Thread {

private boolean exiting = false;
private void setExiting() {
exiting = true;
}

public void run() {
ConnectionManager cm = ConnectionManager.this;
while(!exiting) {
/*
* Until we're commited to exit we:
* - get a state change;
* - enact that change
*/
StateIndication stateChange = null;

/* Hold the lock until we obtain a state change */
synchronized(cm) {
/* if we're initialising, then tell the starting thread that
* we're ready to receive events */
if (state.state == ConnectionState.initialized) {
synchronized(this) {
notify();
}
}
}

while(!state.terminal && stateChange == null) {
/* wait for a state change event or for expiry of the current state */
tryWait(state.timeout);
while(!exiting && stateChange == null) {
/* wait for a state change event or for expiry of the current state */
tryWait(state.timeout);

/* if during the wait some action was requested, handle it */
if(requestedState != null) {
handleStateRequest();
continue;
}
/* if during the wait some action was requested, handle it */
if (requestedState != null) {
handleStateRequest();
continue;
}

/* if during the wait we were told that a transition
* needs to be enacted, handle that (outside the lock) */
if(indicatedState != null) {
stateChange = indicatedState;
indicatedState = null;
break;
}
/* if during the wait we were told that a transition
* needs to be enacted, handle that (outside the lock) */
if (indicatedState != null) {
stateChange = indicatedState;
indicatedState = null;
break;
}

/* if our state wants us to retry on timer expiry, do that */
if(state.retry && !suppressRetry) {
requestState(ConnectionState.connecting);
continue;
}
/* if our state wants us to retry on timer expiry, do that */
if (state.retry && !suppressRetry) {
requestState(ConnectionState.connecting);
continue;
}

if(pendingReauth) {
handleReauth();
break;
}
if (pendingReauth) {
handleReauth();
break;
}

/* no indicated state or requested action, so the timer
* expired while we were in the connecting/closing state */
stateChange = checkSuspend(new StateIndication(ConnectionState.disconnected, REASON_TIMEDOUT));
/* no indicated state or requested action, so the timer
* expired while we were in the connecting/closing state */
stateChange = checkSuspend(new StateIndication(ConnectionState.disconnected, REASON_TIMEDOUT));
}
}
}
if(exiting) { break; }

/* Enact the change without the lock */
if(stateChange != null) {
handleStateChange(stateChange);
}
}

/* we're in a terminal state; exit this thread */
synchronized(this) {
if(mgrThread == thisThread) {
mgrThread = null;
if(transport != null) {
transport.close(false);
transport = null;
/* Enact the change without the lock */
if (stateChange != null) {
handleStateChange(stateChange);
}
}
}
Expand Down Expand Up @@ -988,14 +1011,22 @@ private void closeImpl(StateIndication request) {
transport.abort(e.errorInfo);
}
} else {
/* just abort the transport */
/* just close the transport */
Log.v(TAG, "Aborting incomplete transport due to close()");
transport.abort(REASON_CLOSED);
transport.close(false);
}
transport = null;
}
notifyState(new StateIndication(ConnectionState.closed, null));
}

private void clearTransport() {
if(transport != null) {
transport.close(false);
transport = null;
}
}

/**
* Determine whether or not the client has connection to the network
* without reference to a specific ably host. This is to determine whether
Expand Down Expand Up @@ -1250,7 +1281,6 @@ private boolean isFatalError(ErrorInfo err) {
* private members
******************/

private Thread mgrThread;
final AblyRealtime ably;
private final ClientOptions options;
private final Connection connection;
Expand All @@ -1260,6 +1290,7 @@ private boolean isFatalError(ErrorInfo err) {
private final HashSet<Object> heartbeatWaiters = new HashSet<Object>();
private final Hosts hosts;

private CMThread mgrThread;
private StateInfo state;
private StateIndication indicatedState, requestedState;
private ConnectParams pendingConnect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,46 @@ public void onConnectionStateChanged(ConnectionStateChange state) {
}
}

/**
* Connect, and then perform a close();
* verify that the closed state is reached, and immediately
* reconnect; verify that it reconnects successfully
*/
@Test
public void connectionmanager_restart_race() {
try {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
final AblyRealtime ably = new AblyRealtime(opts);
ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);

ably.connection.once(ConnectionEvent.connected, new ConnectionStateListener() {
@Override
public void onConnectionStateChanged(ConnectionStateChange state) {
ably.close();
}
});

connectionWaiter.waitFor(ConnectionState.closed);
assertEquals("Verify closed state is reached", ConnectionState.closed, ably.connection.state);
connectionWaiter.reset();

/* reconnect */
ably.connect();

/* verify the connection is reestablished */
connectionWaiter.waitFor(ConnectionState.connected);
assertEquals("Verify connected state is reached", ConnectionState.connected, ably.connection.state);

/* close the connection */
ably.close();
connectionWaiter.waitFor(ConnectionState.closed);
assertEquals("Verify closed state is reached", ConnectionState.closed, ably.connection.state);
} catch (AblyException e) {
e.printStackTrace();
fail("init0: Unexpected exception instantiating library");
}
}

/**
* Connect, and then perform a close() from the calling ConnectionManager context;
* verify that the closed state is reached, and the connectionmanager thread has exited
Expand Down

0 comments on commit 59100ab

Please sign in to comment.