diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 4c45025b9..e1bf95772 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -264,41 +264,45 @@ private boolean setState(StateIndication newState) { /* if now connected, send queued messages, etc */ if(state.sendEvents) { sendQueuedMessages(); - for(Channel channel : ably.channels.values()) { - channel.setConnected(); - } - } else { - if(!state.queueEvents) { - failQueuedMessages(state.defaultErrorInfo); - } - for(Channel channel : ably.channels.values()) { - switch (state.state) { - case disconnected: - /* (RTL3e) If the connection state enters the - * DISCONNECTED state, it will have no effect on the - * channel states. */ - break; - case failed: - /* (RTL3a) If the connection state enters the FAILED - * state, then an ATTACHING or ATTACHED channel state - * will transition to FAILED, set the - * Channel#errorReason and emit the error event. */ - channel.setConnectionFailed(change.reason); - break; - case closed: - /* (RTL3b) If the connection state enters the CLOSED - * state, then an ATTACHING or ATTACHED channel state - * will transition to DETACHED. */ - channel.setConnectionClosed(state.defaultErrorInfo); - break; - case suspended: - /* (RTL3c) If the connection state enters the SUSPENDED - * state, then an ATTACHING or ATTACHED channel state - * will transition to SUSPENDED. */ - channel.setSuspended(state.defaultErrorInfo, true); - break; + } else if(!state.queueEvents) { + failQueuedMessages(state.defaultErrorInfo); + } + switch (state.state) { + case connected: + for(Channel channel : ably.channels.values()) { + channel.setConnected(); } - } + break; + case disconnected: + /* (RTL3e) If the connection state enters the + * DISCONNECTED state, it will have no effect on the + * channel states. */ + break; + case failed: + /* (RTL3a) If the connection state enters the FAILED + * state, then an ATTACHING or ATTACHED channel state + * will transition to FAILED, set the + * Channel#errorReason and emit the error event. */ + for(Channel channel : ably.channels.values()) { + channel.setConnectionFailed(change.reason); + } + break; + case closed: + /* (RTL3b) If the connection state enters the CLOSED + * state, then an ATTACHING or ATTACHED channel state + * will transition to DETACHED. */ + for(Channel channel : ably.channels.values()) { + channel.setConnectionClosed(state.defaultErrorInfo); + } + break; + case suspended: + /* (RTL3c) If the connection state enters the SUSPENDED + * state, then an ATTACHING or ATTACHED channel state + * will transition to SUSPENDED. */ + for(Channel channel : ably.channels.values()) { + channel.setSuspended(state.defaultErrorInfo, true); + } + break; } return true; } @@ -308,32 +312,23 @@ public void requestState(ConnectionState state) { } public synchronized void requestState(StateIndication state) { - Log.v(TAG, "requestState(): requesting " + state.state + "; id = " + connection.key); + Log.v(TAG, "requestState(): requesting " + state.state + "; id = " + connection.id); requestedState = state; notify(); } - synchronized void notifyState(ITransport transport, StateIndication state) { - if(this.transport != transport) { - Log.v(TAG, "notifyState: notification received for superseded transport"); - return; - } - /* if this transition signifies the end of the transport, clear the transport */ - if(states.get(state.state).terminal) { - this.transport = null; - } - notifyState(state); - } - - synchronized void notifyState(StateIndication state) { - Log.v(TAG, "notifyState(): notifying " + state.state + "; id = " + connection.key); - if (Thread.currentThread() == mgrThread) { - handleStateChange(state); - } - else { - indicatedState = state; - notify(); + private synchronized void requestState(ITransport transport, StateIndication state) { + if(transport != null) { + if(this.transport != transport) { + Log.v(TAG, "requestState: notification received for superseded transport"); + return; + } + /* if this transition signifies the end of the transport, clear the transport */ + if(states.get(state.state).terminal) { + this.transport = null; + } } + requestState(state); } public void ping(final CompletionListener listener) { @@ -482,16 +477,18 @@ public void onMessage(ITransport transport, ProtocolMessage message) throws Ably break; case error: ErrorInfo reason = message.error; - if(reason == null) + if(reason == null) { Log.e(TAG, "onMessage(): ERROR message received (no error detail)"); - else + } else { Log.e(TAG, "onMessage(): ERROR message received; message = " + reason.message + "; code = " + reason.code); + } - /* an error message may signify an error state in a channel, or in the connection */ - if(message.channel != null) + /* an error message may signify an error state in a channel, or in the connection */ + if(message.channel != null) { onChannelMessage(message); - else + } else { onError(message); + } break; case connected: onConnected(message); @@ -535,19 +532,6 @@ private void onChannelMessage(ProtocolMessage message) { } private synchronized void onConnected(ProtocolMessage message) { - /* Set the http host to try and ensure that realtime and rest use the - * same region: - * - if we're on the default realtime host, set http to the default - * rest host - * - otherwise (the realtime host has been overridden or has fallen - * back), set http to the same as realtime. - */ - if (pendingConnect.host == options.realtimeHost) { - ably.httpCore.setPreferredHost(options.restHost); - } else { - ably.httpCore.setPreferredHost(pendingConnect.host); - } - /* if the returned connection id differs from * the existing connection id, then this means * we need to suspend all existing attachments to @@ -594,12 +578,12 @@ private synchronized void onConnected(ProtocolMessage message) { try { ably.auth.setClientId(clientId); } catch (AblyException e) { - notifyState(transport, new StateIndication(ConnectionState.failed, e.errorInfo)); + requestState(transport, new StateIndication(ConnectionState.failed, e.errorInfo)); } /* indicated connected state */ setSuspendTime(); - notifyState(new StateIndication(ConnectionState.connected, error)); + requestState(new StateIndication(ConnectionState.connected, error)); } private synchronized void onDisconnected(ProtocolMessage message) { @@ -611,7 +595,7 @@ private synchronized void onClosed(ProtocolMessage message) { this.onError(message); } else { connection.key = null; - notifyState(new StateIndication(ConnectionState.closed, null)); + requestState(new StateIndication(ConnectionState.closed, null)); } } @@ -620,7 +604,7 @@ private synchronized void onError(ProtocolMessage message) { ErrorInfo reason = message.error; ably.auth.onAuthError(reason); ConnectionState destinationState = isFatalError(reason) ? ConnectionState.failed : ConnectionState.disconnected; - notifyState(transport, new StateIndication(destinationState, reason)); + requestState(transport, new StateIndication(destinationState, reason)); } private void onAck(ProtocolMessage message) { @@ -676,50 +660,49 @@ private void stopThread() { } } - private void handleStateRequest() { - boolean handled = false; - switch(requestedState.state) { + /** + * Handle a request to transition to a new state + * @return StateIndication: the resulting state, if changed; null otherwise + */ + private StateIndication handleStateRequest() { + /* the resulting state will by default be that requested */ + StateIndication transitionState = requestedState; + + /* clear requestState here; actions below may trigger a subsequent state change */ + requestedState = null; + + /* handle connection request */ + if(transitionState.state == ConnectionState.connecting) { + if(connectImpl(transitionState)) { + return transitionState; + } else { + return new StateIndication(ConnectionState.failed, new ErrorInfo("Connection failed; no host available", 404, 80000), null, requestedState.currentHost); + } + } + /* no other requests can move from a terminal state */ + if(state.terminal) { + /* no change */ + return null; + } + + switch(transitionState.state) { case disconnected: if(transport != null) { transport.close(false); - handled = true; } break; case failed: if(transport != null) { - transport.abort(requestedState.reason); - handled = true; + transport.abort(transitionState.reason); } break; - case closed: - /* if already failed, don't transition to closed */ - if(state.state == ConnectionState.failed) { - handled = true; - break; - } - - if(transport != null) { - transport.close(state.state == ConnectionState.connected); - handled = true; - } - break; - case connecting: - if(!connectImpl(requestedState)) { - indicatedState = new StateIndication(ConnectionState.failed, new ErrorInfo("Connection failed; no host available", 404, 80000), null, requestedState.currentHost); - } - - handled = true; - break; case closing: - closeImpl(requestedState); - handled = true; + closeImpl(transitionState); + break; default: + break; } - if(!handled) { - /* the transport wasn't there, so we just transition directly */ - indicatedState = requestedState; - } - requestedState = null; + return transitionState; } private boolean checkConnectionStale() { @@ -828,7 +811,7 @@ private StateIndication checkSuspend(StateIndication stateChange) { } private void tryWait(long timeout) { - if(requestedState == null && indicatedState == null && !pendingReauth) { + if(requestedState == null && !pendingReauth) { try { if(timeout == 0) { wait(); @@ -871,17 +854,14 @@ public void run() { tryWait(state.timeout); /* if during the wait some action was requested, handle it */ - if (requestedState != null) { - handleStateRequest(); + if (pendingReauth) { + handleReauth(); continue; } - /* if during the wait we were told that a transition - * needs to be enacted, handle that (outside the lock) */ - if (indicatedState != null) { - stateChange = indicatedState; - indicatedState = null; - break; + if (requestedState != null) { + stateChange = handleStateRequest(); + continue; } /* if our state wants us to retry on timer expiry, do that */ @@ -890,11 +870,6 @@ public void run() { continue; } - if (pendingReauth) { - handleReauth(); - break; - } - /* no indicated state or requested action, so the timer * expired while we were in the connecting/closing state */ stateChange = checkSuspend(new StateIndication(ConnectionState.disconnected, REASON_TIMEDOUT)); @@ -928,8 +903,8 @@ private void handleReauth() { errorInfo = e.errorInfo; } - /* report error in UPDATE event */ - if (state.state == ConnectionState.connected && errorInfo != null) { + /* report connection state in UPDATE event */ + if (state.state == ConnectionState.connected) { connection.emitUpdate(errorInfo); } } @@ -949,13 +924,26 @@ public void onTransportAvailable(ITransport transport, TransportParams params) { @Override public synchronized void onTransportUnavailable(ITransport transport, TransportParams params, ErrorInfo reason) { + this.onTransportUnavailable(transport, params, reason, ConnectionState.disconnected); + } + + @Override + public synchronized void onTransportUnavailable(ITransport transport, TransportParams params, ErrorInfo reason, ConnectionState state) { if (this.transport != transport) { /* This is from a transport that we have already abandoned. */ Log.v(TAG, "onTransportUnavailable: ignoring disconnection event from superseded transport"); return; } + if(reason == null) { + reason = states.get(state).defaultErrorInfo; + } + if(state == ConnectionState.failed) { + Log.e(TAG, "onTransportUnavailable: unexpected exception in WsClient: " + reason.message); + } else { + Log.i(TAG, "onTransportUnavailable: disconnected: " + reason.message); + } ably.auth.onAuthError(reason); - notifyState(new StateIndication(ConnectionState.disconnected, reason, null, transport.getHost())); + requestState(new StateIndication(state, reason, null, transport.getHost())); this.transport = null; } @@ -985,9 +973,6 @@ private boolean connectImpl(StateIndication request) { pendingConnect.host = host; lastUsedHost = host; - /* enter the connecting state */ - notifyState(request); - /* try the connection */ ITransport transport; try { @@ -1011,9 +996,6 @@ private boolean connectImpl(StateIndication request) { private void closeImpl(StateIndication request) { boolean isConnected = state.state == ConnectionState.connected; - /* enter the closing state */ - notifyState(request); - /* close or abort transport */ if(transport != null) { if(isConnected) { @@ -1026,12 +1008,12 @@ private void closeImpl(StateIndication request) { } } else { /* just close the transport */ - Log.v(TAG, "Aborting incomplete transport due to close()"); + Log.v(TAG, "Closing incomplete transport"); transport.close(false); } transport = null; } - notifyState(new StateIndication(ConnectionState.closed, null)); + requestState(new StateIndication(ConnectionState.closed, null)); } private void clearTransport() { @@ -1346,7 +1328,7 @@ private boolean isFatalError(ErrorInfo err) { private CMThread mgrThread; private StateInfo state; private ErrorInfo stateError; - private StateIndication indicatedState, requestedState; + private StateIndication requestedState; private ConnectParams pendingConnect; private boolean pendingReauth; private boolean suppressRetry; /* for tests only; modified via reflection */ diff --git a/lib/src/main/java/io/ably/lib/transport/ITransport.java b/lib/src/main/java/io/ably/lib/transport/ITransport.java index 3480c7ea0..ca8dd28a8 100644 --- a/lib/src/main/java/io/ably/lib/transport/ITransport.java +++ b/lib/src/main/java/io/ably/lib/transport/ITransport.java @@ -1,5 +1,6 @@ package io.ably.lib.transport; +import io.ably.lib.realtime.ConnectionState; import io.ably.lib.types.AblyException; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; @@ -84,6 +85,7 @@ public Param[] getConnectParams(Param[] baseParams) { public static interface ConnectListener { public void onTransportAvailable(ITransport transport, TransportParams params); public void onTransportUnavailable(ITransport transport, TransportParams params, ErrorInfo reason); + public void onTransportUnavailable(ITransport transport, TransportParams params, ErrorInfo reason, ConnectionState state); } /** diff --git a/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java b/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java index 802d8410a..a3af42132 100644 --- a/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java +++ b/lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java @@ -67,6 +67,8 @@ public void connect(ConnectListener connectListener) { Param[] connectParams = params.getConnectParams(authParams); if(connectParams.length > 0) wsUri = HttpUtils.encodeParams(wsUri, connectParams); + + Log.d(TAG, "connect(); wsUri = " + wsUri); synchronized(this) { wsConnection = new WsClient(URI.create(wsUri)); if(isTls) { @@ -88,6 +90,7 @@ public void connect(ConnectListener connectListener) { @Override public void close(boolean sendClose) { + Log.d(TAG, "close(); sendClose = " + sendClose); synchronized(this) { if(wsConnection != null) { if(sendClose) { @@ -105,17 +108,19 @@ public void close(boolean sendClose) { @Override public void abort(ErrorInfo reason) { + Log.d(TAG, "abort(); reason = " + reason); synchronized(this) { if(wsConnection != null) { wsConnection.close(); wsConnection = null; } } - connectionManager.notifyState(this, new StateIndication(ConnectionState.failed, reason)); + connectListener.onTransportUnavailable(this, params, reason, ConnectionState.failed); } @Override public void send(ProtocolMessage msg) throws AblyException { + Log.d(TAG, "send(); action = " + msg.action); try { if(channelBinaryMode) { byte[] encodedMsg = ProtocolSerializer.writeMsgpack(msg); @@ -151,17 +156,17 @@ public WsClient(URI serverUri) { @Override public void onOpen(ServerHandshake handshakedata) { - if(connectListener != null) { - connectListener.onTransportAvailable(WebSocketTransport.this, params); - connectListener = null; - } + Log.d(TAG, "onOpen()"); + connectListener.onTransportAvailable(WebSocketTransport.this, params); flagActivity(); } @Override public void onMessage(ByteBuffer blob) { try { - connectionManager.onMessage(WebSocketTransport.this, ProtocolSerializer.readMsgpack(blob.array())); + ProtocolMessage msg = ProtocolSerializer.readMsgpack(blob.array()); + Log.d(TAG, "onMessage(): msg (binary) = " + msg); + connectionManager.onMessage(WebSocketTransport.this, msg); } catch (AblyException e) { String msg = "Unexpected exception processing received binary message"; Log.e(TAG, msg, e); @@ -172,7 +177,9 @@ public void onMessage(ByteBuffer blob) { @Override public void onMessage(String string) { try { - connectionManager.onMessage(WebSocketTransport.this, ProtocolSerializer.fromJSON(string)); + ProtocolMessage msg = ProtocolSerializer.fromJSON(string); + Log.d(TAG, "onMessage(): msg (text) = " + msg); + connectionManager.onMessage(WebSocketTransport.this, msg); } catch (AblyException e) { String msg = "Unexpected exception processing received text message"; Log.e(TAG, msg, e); @@ -183,6 +190,7 @@ public void onMessage(String string) { /* This allows us to detect a websocket ping, so we don't need Ably pings. */ @Override public void onWebsocketPing( WebSocket conn, Framedata f ) { + Log.d(TAG, "onWebsocketPing()"); /* Call superclass to ensure the pong is sent. */ super.onWebsocketPing( conn, f ); flagActivity(); @@ -190,7 +198,7 @@ public void onWebsocketPing( WebSocket conn, Framedata f ) { @Override public void onClose(int wsCode, String wsReason, boolean remote) { - flagActivity(); + Log.d(TAG, "onClose(): wsCode = " + wsCode + "; wsReason = " + wsReason + "; remote = " + remote); ConnectionState newState; ErrorInfo reason; switch(wsCode) { @@ -234,18 +242,13 @@ public void onClose(int wsCode, String wsReason, boolean remote) { synchronized(WebSocketTransport.this) { wsConnection = null; } - connectionManager.notifyState(WebSocketTransport.this, new StateIndication(newState, reason)); + connectListener.onTransportUnavailable(WebSocketTransport.this, params, reason, newState); dispose(); } @Override public void onError(Exception e) { - String msg = "Unexpected exception in WsClient"; - Log.e(TAG, msg, e); - if(connectListener != null) { - connectListener.onTransportUnavailable(WebSocketTransport.this, params, new ErrorInfo(e.getMessage(), 503, 80000)); - connectListener = null; - } + connectListener.onTransportUnavailable(WebSocketTransport.this, params, new ErrorInfo(e.getMessage(), 503, 80000)); } private void dispose() { diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index fe194a1df..97e4ffc3b 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -1065,72 +1065,6 @@ public void ensure_detach_with_error_does_not_move_to_failed() { } } - @Test - public void detach_on_clean_connection_preserves_channel() { - AblyRealtime ably = null; - try { - ClientOptions opts = createOptions(testVars.keys[0].keyStr); - - /* connect with these options to get a valid connection recover key */ - ably = new AblyRealtime(opts); - - /* wait until connected */ - (new ConnectionWaiter(ably.connection)).waitFor(ConnectionState.connected); - assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected); - - /* keep connection details and close */ - String oldConnectionKey = ably.connection.key; - String recoverConnectionKey = ably.connection.recoveryKey; - ably.close(); - - /* establish a new connection */ - ably = new AblyRealtime(opts); - - /* wait until connected */ - (new ConnectionWaiter(ably.connection)).waitFor(ConnectionState.connected); - assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected); - - /* create a channel and attach */ - final String channelName = "detach_on_clean_connection_preserves_channel"; - final Channel channel = ably.channels.get(channelName); - channel.attach(); - (new ChannelWaiter(channel)).waitFor(ChannelState.attached); - assertEquals("Verify attached state reached", channel.state, ChannelState.attached); - - /* disconnect the connection, without closing; - * NOTE this depends on knowledge of the internal structure - * of the library, to simulate a dropped transport without - * causing the connection itself to be disposed */ - ably.connection.connectionManager.requestState(ConnectionState.failed); - - /* wait */ - try { Thread.sleep(5000L); } catch(InterruptedException e) {} - - /* reconnect the connection; this time attempting to recover the (now-closed) recovery key */ - ably.options.recover = recoverConnectionKey; - ably.connection.key = null; - ably.connection.connect(); - - /* wait until connected */ - (new ConnectionWaiter(ably.connection)).waitFor(ConnectionState.connected); - assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected); - assertNotEquals("Verify new connection established", oldConnectionKey, ably.connection.id); - assertNotNull("Verify error was returned with connected state", ably.connection.reason); - - /* verify existing channel is failed but not removed */ - (new ChannelWaiter(channel)).waitFor(ChannelState.failed); - assertEquals("Verify failed state reached", channel.state, ChannelState.failed); - assertTrue("Verify the original channel remains in the channel set", ably.channels.get(channelName) == channel); - - } catch (AblyException e) { - e.printStackTrace(); - fail("init0: Unexpected exception instantiating library"); - } finally { - if(ably != null) - ably.close(); - } - } - @Test public void channel_state_on_connection_suspended() { AblyRealtime ably = null;