Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/no connection serial recovery key #980

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
fe52c3f
Implemented connectionRecoveryKey
sacOO7 Nov 24, 2023
c3e7576
Renamed file to recoverykeycontext
sacOO7 Nov 24, 2023
2549547
Added a test to check for encoded recovery key
sacOO7 Nov 27, 2023
c767de2
Added test for encoding and decoding recovery key
sacOO7 Nov 27, 2023
25b9cdc
Added channel serial to channel properties
sacOO7 Nov 27, 2023
a7938e3
Added method to set channelSerials from recover option
sacOO7 Nov 27, 2023
2a8bbbd
Added method for getting channel serials to Channels class
sacOO7 Nov 27, 2023
6461967
Marked recoveryKey field as deprecated
sacOO7 Nov 28, 2023
1897227
Merge branch 'feature/no-connection-serial' into feature/no-connectio…
sacOO7 Nov 28, 2023
180d68f
Added explicit method for creating a recovery key
sacOO7 Nov 28, 2023
1509938
Simplified recoveryKeyContext class
sacOO7 Nov 28, 2023
169392e
Setting recovery key and serials from clientOption
sacOO7 Nov 28, 2023
c6af891
Refactored recoverykey to use createRecoveryKey method
sacOO7 Nov 28, 2023
e7dc410
Removed all connection serial references from the code
sacOO7 Nov 28, 2023
130305b
Added explicit null checks for recoveryKey
sacOO7 Nov 28, 2023
3c04523
Implemented channel serial for message reeived
sacOO7 Nov 29, 2023
0efffdf
Added missing implementation for channel detach when attached msg rec…
sacOO7 Nov 29, 2023
3ebeed7
Updated code to send explicit detach message when attached received in
sacOO7 Nov 29, 2023
d465489
Clearing channel serial as per RTP5a1
sacOO7 Nov 30, 2023
b42ff87
resetting message serial on failed connection resume or recover
sacOO7 Dec 1, 2023
5b8ab5e
Fixed AblyRealtime as class imports
sacOO7 Dec 1, 2023
bb42aea
refactored ably protocol and agent headers in accordance with version id
sacOO7 Dec 1, 2023
5b8e5b7
Updated test for protocol version
sacOO7 Dec 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,10 @@
import io.ably.lib.rest.AblyRest;
import io.ably.lib.rest.Auth;
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ChannelOptions;
import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.ReadOnlyMap;
import io.ably.lib.types.*;
ttypic marked this conversation as resolved.
Show resolved Hide resolved
import io.ably.lib.util.InternalMap;
import io.ably.lib.util.Log;
import io.ably.lib.util.StringUtils;

/**
* A client that extends the functionality of the {@link AblyRest} and provides additional realtime-specific features.
Expand Down Expand Up @@ -71,6 +67,14 @@ public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChan
}
});

if (!StringUtils.isNullOrEmpty(options.recover)) {
RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover);
if (recoveryKeyContext != null) {
setChannelSerialsFromRecoverOption(recoveryKeyContext.getChannelSerials());
connection.connectionManager.msgSerial = recoveryKeyContext.getMsgSerial(); //RTN16f
}
}

if(options.autoConnect) connection.connect();
}

Expand Down Expand Up @@ -274,6 +278,25 @@ private void clear() {
}
}

protected void setChannelSerialsFromRecoverOption(Map<String, String> serials) {
for (Map.Entry<String, String> entry : serials.entrySet()) {
String channelName = entry.getKey();
String channelSerial = entry.getValue();
Channel channel = this.channels.get(channelName);
if (channel != null) {
channel.properties.channelSerial = channelSerial;
}
}
}

protected Map<String, String> getChannelSerials() {
Map<String, String> channelSerials = new HashMap<>();
for (Channel channel : this.channels.values()) {
channelSerials.put(channel.name, channel.properties.channelSerial);
}
return channelSerials;
}

/********************
* internal
********************/
Expand Down
37 changes: 31 additions & 6 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.ably.lib.util.EventEmitter;
import io.ably.lib.util.Log;
import io.ably.lib.util.ReconnectionStrategy;
import io.ably.lib.util.StringUtils;

/**
* Enables messages to be published and subscribed to.
Expand Down Expand Up @@ -131,6 +132,11 @@ private void setState(ChannelState newState, ErrorInfo reason, boolean resumed,
this.retryCount = 0;
}

// RTP5a1
if (newState == ChannelState.detached || newState == ChannelState.suspended || newState == ChannelState.failed) {
properties.channelSerial = null;
ttypic marked this conversation as resolved.
Show resolved Hide resolved
}

if(notifyStateChange) {
/* broadcast state change */
emit(newState, stateChange);
Expand Down Expand Up @@ -248,8 +254,9 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
}
}
if(this.decodeFailureRecoveryInProgress) {
attachMessage.channelSerial = this.lastPayloadProtocolMessageChannelSerial;
Log.v(TAG, "attach(); message decode recovery in progress.");
}
attachMessage.channelSerial = properties.channelSerial;
try {
if (listener != null) {
on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed));
Expand Down Expand Up @@ -325,7 +332,10 @@ private void detachImpl(CompletionListener listener) throws AblyException {
if(!connectionManager.isActive())
throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo());

/* send detach request */
sendDetachMessage(listener);
}

private void sendDetachMessage(CompletionListener listener) throws AblyException {
ProtocolMessage detachMessage = new ProtocolMessage(Action.detach, this.name);
try {
if (listener != null) {
Expand All @@ -338,7 +348,7 @@ private void detachImpl(CompletionListener listener) throws AblyException {
} else {
setState(ChannelState.detaching, null);
}
connectionManager.send(detachMessage, true, null);
ably.connection.connectionManager.send(detachMessage, true, null);
} catch(AblyException e) {
throw e;
}
Expand Down Expand Up @@ -399,7 +409,15 @@ private void setAttached(ProtocolMessage message) {
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
/* emit UPDATE event according to RTL12 */
emitUpdate(null, resumed);
} else {
} else if (state == ChannelState.detaching || state == ChannelState.detached) { //RTL5k
Log.v(TAG, "setAttached(): channel is in detaching state, as per RTL5k sending detach message!");
try {
sendDetachMessage(null);
} catch (AblyException e) {
Log.e(TAG, e.getMessage(), e);
}
}
else {
this.attachResume = true;
setState(ChannelState.attached, message.error, resumed);
presence.setAttached(message.hasFlag(Flag.has_presence), this.ably.connection.id);
Expand Down Expand Up @@ -850,7 +868,6 @@ private void onMessage(final ProtocolMessage protocolMessage) {
}

lastPayloadMessageId = lastMessage.id;
lastPayloadProtocolMessageChannelSerial = protocolMessage.channelSerial;

for (final Message msg : messages) {
this.listeners.onMessage(msg);
Expand Down Expand Up @@ -1264,6 +1281,15 @@ else if(stateChange.current.equals(failureState)) {
}

void onChannelMessage(ProtocolMessage msg) {
// RTL15b
if (!StringUtils.isNullOrEmpty(msg.channelSerial) && (msg.action == Action.message ||
msg.action == Action.presence || msg.action == Action.attached)) {
Log.v(TAG, String.format(
Locale.ROOT, "Setting channel serial for channelName - %s, previous - %s, current - %s",
name, properties.channelSerial, msg.channelSerial));
properties.channelSerial = msg.channelSerial;
}

switch(msg.action) {
case attached:
setAttached(msg);
Expand Down Expand Up @@ -1369,7 +1395,6 @@ public void once(ChannelState state, ChannelStateListener listener) {
*/
private Set<ChannelMode> modes;
private String lastPayloadMessageId;
private String lastPayloadProtocolMessageChannelSerial;
private boolean decodeFailureRecoveryInProgress;
private final DecodingContext decodingContext;
}
32 changes: 22 additions & 10 deletions lib/src/main/java/io/ably/lib/realtime/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.RecoveryKeyContext;
import io.ably.lib.util.EventEmitter;
import io.ably.lib.util.Log;
import io.ably.lib.util.PlatformAgentProvider;
Expand Down Expand Up @@ -49,25 +50,36 @@ public class Connection extends EventEmitter<ConnectionEvent, ConnectionStateLis
* for more information.
* <p>
* Spec: RTN16b, RTN16c
* @deprecated use createRecoveryKey method instead.
*/
@Deprecated
public String recoveryKey;

/**
* A unique public identifier for this connection, used to identify this member.
* <p>
* Spec: RTN8
* Spec: RTN16g
*
* @return a json string which incorporates the @connectionKey@, the current @msgSerial@,
* and a collection of pairs of channel @name@ and current @channelSerial@ for every currently attached channel.
*/
public String id;
public String createRecoveryKey() {
if (key == null || key.isEmpty() || this.state == ConnectionState.closing ||
this.state == ConnectionState.closed ||
this.state == ConnectionState.failed ||
this.state == ConnectionState.suspended
) {
//RTN16h
return null;
}

return new RecoveryKeyContext(key, connectionManager.msgSerial, ably.getChannelSerials()).encode();
}

/**
* The serial number of the last message to be received on this connection,
* used automatically by the library when recovering or resuming a connection.
* When recovering a connection explicitly, the recoveryKey is used in the recover
* client options as it contains both the key and the last message serial.
* A unique public identifier for this connection, used to identify this member.
* <p>
* Spec: RTN10
* Spec: RTN8
*/
public long serial;
public String id;

/**
* Explicitly calling connect() is unnecessary unless the autoConnect attribute of the {@link io.ably.lib.types.ClientOptions}
Expand Down
23 changes: 10 additions & 13 deletions lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.ably.lib.util.Log;
import io.ably.lib.util.PlatformAgentProvider;
import io.ably.lib.util.ReconnectionStrategy;
import io.ably.lib.util.StringUtils;

public class ConnectionManager implements ConnectListener {
final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -1193,19 +1194,19 @@ public void onMessage(ITransport transport, ProtocolMessage message) throws Ably
}

private void onChannelMessage(ProtocolMessage message) {
if(message.connectionSerial != null) {
connection.serial = message.connectionSerial.longValue();
if (connection.key != null)
connection.recoveryKey = connection.key + ":" + message.connectionSerial;
}
channels.onMessage(message);
connection.recoveryKey = connection.createRecoveryKey();
}

private synchronized void onConnected(ProtocolMessage message) {
final ErrorInfo error = message.error;
boolean reattachOnResumeFailure = false; // this will indicate that channel must reattach when connected
// event is received

boolean isConnectionResumeOrRecoverAttempt = !StringUtils.isNullOrEmpty(connection.key) ||
!StringUtils.isNullOrEmpty(ably.options.recover);
ably.options.recover = null; // RTN16k, explicitly setting null, so it won't be used for subsequent connection requests

connection.reason = error;
if (connection.id != null) { // there was a previous connection, so this is a resume and RTN15c applies
Log.d(TAG, "There was a connection resume");
ttypic marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -1240,12 +1241,6 @@ private synchronized void onConnected(ProtocolMessage message) {

connection.id = message.connectionId;

if(message.connectionSerial != null) {
connection.serial = message.connectionSerial;
if (connection.key != null)
connection.recoveryKey = connection.key + ":" + message.connectionSerial;
}

ConnectionDetails connectionDetails = message.connectionDetails;
/* Get any parameters from connectionDetails. */
connection.key = connectionDetails.connectionKey; //RTN16d
Expand All @@ -1260,6 +1255,9 @@ private synchronized void onConnected(ProtocolMessage message) {
requestState(transport, new StateIndication(ConnectionState.failed, e.errorInfo));
return;
}

connection.recoveryKey = connection.createRecoveryKey();

/* indicated connected currentState */
final StateIndication stateIndication = new StateIndication(ConnectionState.connected, error, null, null,
reattachOnResumeFailure);
Expand Down Expand Up @@ -1504,7 +1502,6 @@ private class ConnectParams extends TransportParams {
ConnectParams(ClientOptions options, PlatformAgentProvider platformAgentProvider) {
super(options, platformAgentProvider);
this.connectionKey = connection.key;
this.connectionSerial = String.valueOf(connection.serial);
this.port = Defaults.getPort(options);
}
}
Expand Down Expand Up @@ -1905,7 +1902,7 @@ private boolean isFatalError(ErrorInfo err) {
private boolean suppressRetry; /* for tests only; modified via reflection */
private ITransport transport;
private long suspendTime;
private long msgSerial;
public long msgSerial;
private long lastActivity;
private CMConnectivityListener connectivityListener;
private long connectionStateTtl = Defaults.connectionStateTtl;
Expand Down
26 changes: 7 additions & 19 deletions lib/src/main/java/io/ably/lib/transport/ITransport.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
package io.ably.lib.transport;

import io.ably.lib.types.AblyException;
import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.Param;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.*;
import io.ably.lib.util.AgentHeaderCreator;
import io.ably.lib.util.Log;
import io.ably.lib.util.PlatformAgentProvider;
import io.ably.lib.util.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public interface ITransport {

Expand All @@ -39,7 +34,6 @@ class TransportParams {
protected String host;
protected int port;
protected String connectionKey;
protected String connectionSerial;
protected Mode mode;
protected boolean heartbeats;
private final PlatformAgentProvider platformAgentProvider;
Expand Down Expand Up @@ -68,20 +62,14 @@ public Param[] getConnectParams(Param[] baseParams) {
paramList.add(new Param("format", (options.useBinaryProtocol ? "msgpack" : "json")));
if(!options.echoMessages)
paramList.add(new Param("echo", "false"));
if(connectionKey != null) {
if(!StringUtils.isNullOrEmpty(connectionKey)) {
mode = Mode.resume;
paramList.add(new Param("resume", connectionKey));
if(connectionSerial != null)
paramList.add(new Param("connectionSerial", connectionSerial));
} else if(options.recover != null) {
} else if(!StringUtils.isNullOrEmpty(options.recover)) { // RTN16k
mode = Mode.recover;
Pattern recoverSpec = Pattern.compile("^([\\w\\-\\!]+):(\\-?\\d+)$");
Matcher match = recoverSpec.matcher(options.recover);
if(match.matches()) {
paramList.add(new Param("recover", match.group(1)));
paramList.add(new Param("connectionSerial", match.group(2)));
} else {
Log.e(TAG, "Invalid recover string specified");
RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover);
if (recoveryKeyContext != null) {
paramList.add(new Param("recover", recoveryKeyContext.getConnectionKey()));
}
}
if(options.clientId != null)
Expand Down
8 changes: 8 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ChannelProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,13 @@ public class ChannelProperties {
*/
public String attachSerial;

/**
* ChannelSerial contains the channelSerial from latest ProtocolMessage of action type
* Message/PresenceMessage received on the channel.
* <p>
* Spec: CP2b, RTL15b
*/
public String channelSerial;

public ChannelProperties() {}
}
4 changes: 0 additions & 4 deletions lib/src/main/java/io/ably/lib/types/ProtocolMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public ProtocolMessage(Action action, String channel) {
public String channel;
public String channelSerial;
public String connectionId;
public Long connectionSerial;
public Long msgSerial;
public long timestamp;
public Message[] messages;
Expand Down Expand Up @@ -198,9 +197,6 @@ ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException {
case "connectionId":
connectionId = unpacker.unpackString();
break;
case "connectionSerial":
connectionSerial = Long.valueOf(unpacker.unpackLong());
break;
case "msgSerial":
msgSerial = Long.valueOf(unpacker.unpackLong());
break;
Expand Down
Loading
Loading