Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issues/170: Fixed message serial out of sync after recover #175

Merged
merged 3 commits into from
Oct 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.ProtocolMessage.Action;
import io.ably.lib.types.ProtocolSerializer;
import io.ably.lib.util.Log;

import java.util.ArrayList;
Expand Down Expand Up @@ -271,6 +272,8 @@ public void run() {
***************************************/

public void onMessage(ProtocolMessage message) throws AblyException {
if (Log.level <= Log.VERBOSE)
Log.v(TAG, "onMessage(): " + new String(ProtocolSerializer.writeJSON(message)));
try {
if(protocolListener != null)
protocolListener.onRawMessage(message);
Expand Down Expand Up @@ -333,10 +336,17 @@ private synchronized void onConnected(ProtocolMessage message) {

/* set the new connection id */
connection.key = message.connectionKey;
if (!message.connectionId.equals(connection.id)) {
/* The connection id has changed. Reset the message serial and the
* pending message queue (which fails the messages currently in
* there). */
pendingMessages.reset(msgSerial,
new ErrorInfo("Connection resume failed", 500, 50000));
msgSerial = 0;
}
connection.id = message.connectionId;
if(message.connectionSerial != null)
connection.serial = message.connectionSerial.longValue();
msgSerial = 0;

/* indicated connected state */
setSuspendTime();
Expand Down Expand Up @@ -768,16 +778,6 @@ private class PendingMessageQueue {
private long startSerial = 0L;
private ArrayList<QueuedMessage> queue = new ArrayList<QueuedMessage>();

public PendingMessageQueue() {
/* put startSerial to 0 every time the connection is closed */
connection.on(ConnectionState.closed, new ConnectionStateListener() {
@Override
public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChange state) {
startSerial = 0L;
}
});
}

public synchronized void push(QueuedMessage msg) {
queue.add(msg);
}
Expand All @@ -790,6 +790,8 @@ public void ack(long msgSerial, int count, ErrorInfo reason) {
* we can handle it gracefully by only processing the
* relevant portion of the response */
count -= (int)(startSerial - msgSerial);
if(count < 0)
count = 0;
msgSerial = startSerial;
}
if(msgSerial > startSerial) {
Expand Down Expand Up @@ -860,6 +862,19 @@ public synchronized void nack(long serial, int count, ErrorInfo reason) {
}
}
}

/**
* reset the pending message queue, failing any currently pending messages.
* Used when a resume fails and we get a different connection id.
* @param oldMsgSerial the next message serial number for the old
* connection, and thus one more than the highest message serial
* in the queue.
*/
public synchronized void reset(long oldMsgSerial, ErrorInfo err) {
nack(startSerial, (int)(oldMsgSerial - startSerial), err);
startSerial = 0;
}

}

/*******************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public void abort(ErrorInfo reason) {

@Override
public void send(ProtocolMessage msg) throws AblyException {
if (Log.level <= Log.VERBOSE)
Log.v(TAG, "send(): " + new String(ProtocolSerializer.writeJSON(msg)));
try {
if(channelBinaryMode)
wsConnection.send(ProtocolSerializer.writeMsgpack(msg));
Expand Down
103 changes: 103 additions & 0 deletions lib/src/test/java/io/ably/lib/test/realtime/RealtimeRecoverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,107 @@ public void recover_implicit_connect() {
}
}

/**
* Connect to the service using two library instances to set
* up separate send and recv connections.
* Disconnect+suspend and then reconnect the send connection; verify that
* each subsequent publish causes a CompletionListener call.
*/
@Test
public void recover_verify_publish() {
AblyRealtime ablyRx = null, ablyTx = null;
String channelName = "recover_verify_publish";
int messageCount = 5;
long delay = 200;
try {
TestVars testVars = Setup.getTestVars();
ClientOptions opts = testVars.createOptions(testVars.keys[0].keyStr);
ablyRx = new AblyRealtime(opts);
ablyTx = new AblyRealtime(opts);

/* create and attach channel to send on */
final Channel channelTx = ablyTx.channels.get(channelName);
channelTx.attach();
(new ChannelWaiter(channelTx)).waitFor(ChannelState.attached);
assertEquals("Verify attached state reached for tx", channelTx.state, ChannelState.attached);

/* create and attach channel to recv on */
final Channel channelRx = ablyRx.channels.get(channelName);
channelRx.attach();
(new ChannelWaiter(channelRx)).waitFor(ChannelState.attached);
assertEquals("Verify attached state reached for rx", channelRx.state, ChannelState.attached);

/* subscribe */
MessageWaiter messageWaiter = new MessageWaiter(channelRx);

/* publish first messages to the channel */
CompletionSet msgComplete1 = new CompletionSet();
for(int i = 0; i < messageCount; i++) {
channelTx.publish("test_event", "Test message (resume_simple) " + i, msgComplete1.add());
try { Thread.sleep(delay); } catch(InterruptedException e){}
}

/* wait for the publish callback to be called */
ErrorInfo[] errors = msgComplete1.waitFor();
assertTrue("Verify success from all message callbacks", errors.length == 0);

/* wait for the subscription callback to be called */
messageWaiter.waitFor(messageCount);
assertEquals("Verify message subscriptions all called", messageWaiter.receivedMessages.size(), messageCount);
messageWaiter.reset();

/* suspend the tx connection, without closing;
* NOTE this depends on knowledge of the internal structure
* of the library, to simulate a dropped transport without
* causing the connection itself to be disposed */
System.out.println("*** about to suspend tx connection");
ablyTx.connection.connectionManager.requestState(ConnectionState.suspended);

/* wait */
try { Thread.sleep(2000L); } catch(InterruptedException e) {}

/* reconnect the tx connection */
System.out.println("*** about to reconnect tx connection");
ablyTx.connection.connect();
(new ConnectionWaiter(ablyTx.connection)).waitFor(ConnectionState.connected);

/* need to manually attach the tx channel as connection was suspended */
System.out.println("*** tx connection now connected. About to recover channel");
channelTx.attach();
(new ChannelWaiter(channelTx)).waitFor(ChannelState.attached);
assertEquals("Verify attached state reached for tx again", channelTx.state, ChannelState.attached);
System.out.println("*** tx channel now attached. About to publish");

/* publish further messages to the channel */
CompletionSet msgComplete2 = new CompletionSet();
for(int i = 0; i < messageCount; i++) {
channelTx.publish("test_event", "Test message (resume_simple) " + i, msgComplete2.add());
try { Thread.sleep(delay); } catch(InterruptedException e){}
}

/* wait for the publish callback to be called. This never finishes if
* https://github.com/ably/ably-java/issues/170
* is not fixed. */
System.out.println("*** published. About to wait for callbacks");
errors = msgComplete2.waitFor();
System.out.println("*** done");
assertTrue("Verify success from all message callbacks", errors.length == 0);

/* wait for the subscription callback to be called */
messageWaiter.waitFor(messageCount);
assertEquals("Verify message subscriptions all called after reconnection", messageWaiter.receivedMessages.size(), messageCount);

} catch (AblyException e) {
e.printStackTrace();
fail("init0: Unexpected exception instantiating library");
} finally {
if(ablyTx != null)
ablyTx.close();
if(ablyRx != null)
ablyRx.close();
}
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.ably.lib.test.common.Setup;
import io.ably.lib.test.common.Helpers.ChannelWaiter;
import io.ably.lib.test.common.Helpers.CompletionSet;
import io.ably.lib.test.common.Helpers.ConnectionWaiter;
import io.ably.lib.test.common.Helpers.MessageWaiter;
import io.ably.lib.test.common.Setup.TestVars;
import io.ably.lib.types.AblyException;
Expand Down Expand Up @@ -393,4 +394,97 @@ public void resume_multiple_interval() {
}
}

/**
* Connect to the service using two library instances to set
* up separate send and recv connections.
* Disconnect and then reconnect the send connection; verify that
* each subsequent publish causes a CompletionListener call.
*/
@Test
public void resume_verify_publish() {
AblyRealtime ablyRx = null, ablyTx = null;
String channelName = "resume_verify_publish";
int messageCount = 5;
long delay = 200;
try {
TestVars testVars = Setup.getTestVars();
ClientOptions opts = testVars.createOptions(testVars.keys[0].keyStr);
ablyRx = new AblyRealtime(opts);
ablyTx = new AblyRealtime(opts);

/* create and attach channel to send on */
final Channel channelTx = ablyTx.channels.get(channelName);
channelTx.attach();
(new ChannelWaiter(channelTx)).waitFor(ChannelState.attached);
assertEquals("Verify attached state reached for tx", channelTx.state, ChannelState.attached);

/* create and attach channel to recv on */
final Channel channelRx = ablyRx.channels.get(channelName);
channelRx.attach();
(new ChannelWaiter(channelRx)).waitFor(ChannelState.attached);
assertEquals("Verify attached state reached for rx", channelRx.state, ChannelState.attached);

/* subscribe */
MessageWaiter messageWaiter = new MessageWaiter(channelRx);

/* publish first messages to the channel */
CompletionSet msgComplete1 = new CompletionSet();
for(int i = 0; i < messageCount; i++) {
channelTx.publish("test_event", "Test message (resume_simple) " + i, msgComplete1.add());
try { Thread.sleep(delay); } catch(InterruptedException e){}
}

/* wait for the publish callback to be called */
ErrorInfo[] errors = msgComplete1.waitFor();
assertTrue("Verify success from all message callbacks", errors.length == 0);

/* wait for the subscription callback to be called */
messageWaiter.waitFor(messageCount);
assertEquals("Verify message subscriptions all called", messageWaiter.receivedMessages.size(), messageCount);
messageWaiter.reset();

/* disconnect the tx connection, without closing;
* NOTE this depends on knowledge of the internal structure
* of the library, to simulate a dropped transport without
* causing the connection itself to be disposed */
System.out.println("*** about to disconnect tx connection");
ablyTx.connection.connectionManager.requestState(ConnectionState.disconnected);

/* wait */
try { Thread.sleep(2000L); } catch(InterruptedException e) {}

/* reconnect the tx connection */
System.out.println("*** about to reconnect tx connection");
ablyTx.connection.connect();
(new ConnectionWaiter(ablyTx.connection)).waitFor(ConnectionState.connected);

/* publish further messages to the channel */
CompletionSet msgComplete2 = new CompletionSet();
for(int i = 0; i < messageCount; i++) {
channelTx.publish("test_event", "Test message (resume_simple) " + i, msgComplete2.add());
try { Thread.sleep(delay); } catch(InterruptedException e){}
}

/* wait for the publish callback to be called. This never finishes if
* https://github.com/ably/ably-java/issues/170
* is not fixed. */
System.out.println("*** published. About to wait for callbacks");
errors = msgComplete2.waitFor();
System.out.println("*** done");
assertTrue("Verify success from all message callbacks", errors.length == 0);

/* wait for the subscription callback to be called */
messageWaiter.waitFor(messageCount);
assertEquals("Verify message subscriptions all called after reconnection", messageWaiter.receivedMessages.size(), messageCount);

} catch (AblyException e) {
e.printStackTrace();
fail("init0: Unexpected exception instantiating library");
} finally {
if(ablyTx != null)
ablyTx.close();
if(ablyRx != null)
ablyRx.close();
}
}
}