Skip to content

Commit

Permalink
RTL13b: ensure that detached+error responses form the server do not r…
Browse files Browse the repository at this point in the history
…esult in a busy loop of attach requests
  • Loading branch information
paddybyers committed Jul 24, 2018
1 parent aec15fb commit e89f420
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 13 deletions.
36 changes: 27 additions & 9 deletions lib/src/main/java/io/ably/lib/realtime/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ private void attachImpl(final CompletionListener listener) throws AblyException
throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo());

/* send attach request and pending state */
Log.v(TAG, "attach(); channel = " + name + "; sending ATTACH request");
ProtocolMessage attachMessage = new ProtocolMessage(Action.attach, this.name);
try {
if (listener != null) {
Expand Down Expand Up @@ -944,16 +945,33 @@ void onChannelMessage(ProtocolMessage msg) {
case detach:
case detached:
ChannelState oldState = state;
setDetached((msg.error != null) ? msg.error : REASON_NOT_ATTACHED);
if(oldState == ChannelState.attaching || oldState == ChannelState.attached || oldState == ChannelState.suspended) {
/* Unexpected detach, reattach when possible */
Log.v(TAG, String.format("Server initiated detach for channel %s", name));
try {
attachWithTimeout(null);
} catch (AblyException e) {
switch(oldState) {
case attached:
/* Unexpected detach, reattach when possible */
setDetached((msg.error != null) ? msg.error : REASON_NOT_ATTACHED);
Log.v(TAG, String.format("Server initiated detach for channel %s; attempting reattach", name));
try {
attachWithTimeout(null);
} catch (AblyException e) {
/* Send message error */
setDetached(e.errorInfo);
}
setDetached(e.errorInfo);
}
break;
case attaching:
/* RTL13b says we need to be suspended, but continue to retry */
Log.v(TAG, String.format("Server initiated detach for channel %s whilst attaching; moving to suspended", name));
setSuspended(msg.error, true);
reattachAfterTimeout();
break;
case detaching:
setDetached((msg.error != null) ? msg.error : REASON_NOT_ATTACHED);
break;
case detached:
case suspended:
case failed:
default:
/* do nothing */
break;
}
break;
case message:
Expand Down
149 changes: 145 additions & 4 deletions lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ public void channel_state_on_connection_suspended() {
* Establish connection, attach channel, simulate sending attached and detached messages
* from the server, test correct behaviour
*
* Tests RTL12, RTL13
* Tests RTL12, RTL13a
*/
@Test
public void channel_server_initiated_attached_detached() throws AblyException {
Expand Down Expand Up @@ -1243,8 +1243,9 @@ public void onChannelStateChanged(ChannelStateChange stateChange) {
* Tests features RTL4c, RTL4f, RTL5f, RTL5e
*/
@Test
public void channel_reattach_failed() {
public void channel_attach_retry_failed() {
AblyRealtime ably = null;
String channelName = "channel_attach_retry_failed_" + testParams.name;
String oldTransportFractory = Defaults.TRANSPORT;
long oldRealtimeTimeout = Defaults.realtimeRequestTimeout;
try {
Expand All @@ -1265,7 +1266,7 @@ public void channel_reattach_failed() {
/* Block send() and attach */
MockWebsocketFactory.blockSend();

Channel channel = ably.channels.get("channel_reattach_test");
Channel channel = ably.channels.get(channelName);
ChannelWaiter channelWaiter = new ChannelWaiter(channel);
channel.attach();

Expand Down Expand Up @@ -1352,5 +1353,145 @@ public void onError(ErrorInfo reason) {
Defaults.realtimeRequestTimeout = oldRealtimeTimeout;
}
}
}

/*
* Initiate connection, and attach to a channel. Simulate a server-initiated detach of the channel
* and verify that the client attempts to reattach. Block the transport to prevent that from
* succeeding. Verify that the channel enters the suspended state with the appropriate error.
*
* Tests features RTL13b
*/
@Test
public void channel_reattach_failed_timeout() {
AblyRealtime ably = null;
final String channelName = "channel_reattach_failed_timeout_" + testParams.name;
String oldTransportFractory = Defaults.TRANSPORT;
long oldRealtimeTimeout = Defaults.realtimeRequestTimeout;
try {
/* Mock transport to block send */
Defaults.TRANSPORT = MockWebsocketFactory.class.getName();
/* Reduce timeout for test to run faster */
Defaults.realtimeRequestTimeout = 1000;
MockWebsocketFactory.allowSend();

ClientOptions opts = createOptions(testVars.keys[0].keyStr);
opts.channelRetryTimeout = 1000;

ably = new AblyRealtime(opts);

ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);
connectionWaiter.waitFor(ConnectionState.connected);

Channel channel = ably.channels.get(channelName);
ChannelWaiter channelWaiter = new ChannelWaiter(channel);
channel.attach();

channelWaiter.waitFor(ChannelState.attached);

/* Block send() */
MockWebsocketFactory.blockSend();

/* Inject detached message as if from the server */
ProtocolMessage detachedMessage = new ProtocolMessage() {{
action = Action.detached;
channel = channelName;
}};
ably.connection.connectionManager.onMessage(null, detachedMessage);

/* Should get to suspended soon because send() is blocked */
ErrorInfo suspendReason = channelWaiter.waitFor(ChannelState.suspended);
assertEquals("Verify the suspended event contains the detach reason", 91200, suspendReason.code);

/* Unblock send(), and expect a transition to attached */
MockWebsocketFactory.allowSend();
channelWaiter.waitFor(ChannelState.attached);

} catch(AblyException e) {
e.printStackTrace();
fail("Unexpected exception");
} finally {
if (ably != null)
ably.close();
/* Restore default values to run other tests */
Defaults.TRANSPORT = oldTransportFractory;
Defaults.realtimeRequestTimeout = oldRealtimeTimeout;
}
}

/*
* Initiate connection, and attach to a channel. Simulate a server-initiated detach of the channel
* and verify that the client attempts to reattach. Block the transport and respond instead with
* an attach error. Verify that the channel enters the suspended state with the appropriate error.
*
* Tests features RTL13b
*/
@Test
public void channel_reattach_failed_error() {
AblyRealtime ably = null;
final String channelName = "channel_reattach_failed_error_" + testParams.name;
final int errorCode = 12345;
String oldTransportFractory = Defaults.TRANSPORT;
long oldRealtimeTimeout = Defaults.realtimeRequestTimeout;
try {
/* Mock transport to block send */
Defaults.TRANSPORT = MockWebsocketFactory.class.getName();
/* Reduce timeout for test to run faster */
Defaults.realtimeRequestTimeout = 5000;
MockWebsocketFactory.allowSend();

ClientOptions opts = createOptions(testVars.keys[0].keyStr);
opts.channelRetryTimeout = 1000;

ably = new AblyRealtime(opts);

ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);
connectionWaiter.waitFor(ConnectionState.connected);

Channel channel = ably.channels.get(channelName);
ChannelWaiter channelWaiter = new ChannelWaiter(channel);
channel.attach();

channelWaiter.waitFor(ChannelState.attached);

/* Block send() */
MockWebsocketFactory.blockSend();

/* Inject detached message as if from the server */
ProtocolMessage detachedMessage = new ProtocolMessage() {{
action = Action.detached;
channel = channelName;
error = new ErrorInfo("Test error", errorCode);
}};
ably.connection.connectionManager.onMessage(null, detachedMessage);

/* wait for the client reattempt attachment */
channelWaiter.waitFor(ChannelState.attaching);

/* Inject detached+error message as if from the server */
ProtocolMessage errorMessage = new ProtocolMessage() {{
action = Action.detached;
channel = channelName;
error = new ErrorInfo("Test error", errorCode);
}};
ably.connection.connectionManager.onMessage(null, errorMessage);

/* Should get to suspended soon because there was an error response to the attach attempt */
ErrorInfo suspendReason = channelWaiter.waitFor(ChannelState.suspended);
assertEquals("Verify the suspended event contains the detach reason", errorCode, suspendReason.code);

/* Unblock send(), and expect a transition to attached */
MockWebsocketFactory.allowSend();
channelWaiter.waitFor(ChannelState.attached);

} catch(AblyException e) {
e.printStackTrace();
fail("Unexpected exception");
} finally {
if (ably != null)
ably.close();
/* Restore default values to run other tests */
Defaults.TRANSPORT = oldTransportFractory;
Defaults.realtimeRequestTimeout = oldRealtimeTimeout;
}
}
}

0 comments on commit e89f420

Please sign in to comment.