Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/no connection serial recovery key #980

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
fe52c3f
Implemented connectionRecoveryKey
sacOO7 Nov 24, 2023
c3e7576
Renamed file to recoverykeycontext
sacOO7 Nov 24, 2023
2549547
Added a test to check for encoded recovery key
sacOO7 Nov 27, 2023
c767de2
Added test for encoding and decoding recovery key
sacOO7 Nov 27, 2023
25b9cdc
Added channel serial to channel properties
sacOO7 Nov 27, 2023
a7938e3
Added method to set channelSerials from recover option
sacOO7 Nov 27, 2023
2a8bbbd
Added method for getting channel serials to Channels class
sacOO7 Nov 27, 2023
6461967
Marked recoveryKey field as deprecated
sacOO7 Nov 28, 2023
1897227
Merge branch 'feature/no-connection-serial' into feature/no-connectio…
sacOO7 Nov 28, 2023
180d68f
Added explicit method for creating a recovery key
sacOO7 Nov 28, 2023
1509938
Simplified recoveryKeyContext class
sacOO7 Nov 28, 2023
169392e
Setting recovery key and serials from clientOption
sacOO7 Nov 28, 2023
c6af891
Refactored recoverykey to use createRecoveryKey method
sacOO7 Nov 28, 2023
e7dc410
Removed all connection serial references from the code
sacOO7 Nov 28, 2023
130305b
Added explicit null checks for recoveryKey
sacOO7 Nov 28, 2023
3c04523
Implemented channel serial for message reeived
sacOO7 Nov 29, 2023
0efffdf
Added missing implementation for channel detach when attached msg rec…
sacOO7 Nov 29, 2023
3ebeed7
Updated code to send explicit detach message when attached received in
sacOO7 Nov 29, 2023
d465489
Clearing channel serial as per RTP5a1
sacOO7 Nov 30, 2023
b42ff87
resetting message serial on failed connection resume or recover
sacOO7 Dec 1, 2023
5b8ab5e
Fixed AblyRealtime as class imports
sacOO7 Dec 1, 2023
bb42aea
refactored ably protocol and agent headers in accordance with version id
sacOO7 Dec 1, 2023
5b8e5b7
Updated test for protocol version
sacOO7 Dec 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/http/HttpCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ <T> T httpExecute(HttpURLConnection conn, String method, Param[] headers, Reques
if(!acceptSet) { conn.setRequestProperty(HttpConstants.Headers.ACCEPT, HttpConstants.ContentTypes.JSON); }

/* pass required headers */
conn.setRequestProperty(Defaults.ABLY_VERSION_HEADER, Defaults.ABLY_VERSION);
conn.setRequestProperty(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION);
conn.setRequestProperty(Defaults.ABLY_AGENT_HEADER, AgentHeaderCreator.create(options.agents, platformAgentProvider));

/* prepare request body */
Expand Down
29 changes: 29 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.ReadOnlyMap;
import io.ably.lib.types.RecoveryKeyContext;
import io.ably.lib.util.InternalMap;
import io.ably.lib.util.Log;
import io.ably.lib.util.StringUtils;

/**
* A client that extends the functionality of the {@link AblyRest} and provides additional realtime-specific features.
Expand Down Expand Up @@ -71,6 +73,14 @@ public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChan
}
});

if (!StringUtils.isNullOrEmpty(options.recover)) {
RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover);
if (recoveryKeyContext != null) {
setChannelSerialsFromRecoverOption(recoveryKeyContext.getChannelSerials());
connection.connectionManager.msgSerial = recoveryKeyContext.getMsgSerial(); //RTN16f
}
}

if(options.autoConnect) connection.connect();
}

Expand Down Expand Up @@ -274,6 +284,25 @@ private void clear() {
}
}

protected void setChannelSerialsFromRecoverOption(Map<String, String> serials) {
for (Map.Entry<String, String> entry : serials.entrySet()) {
String channelName = entry.getKey();
String channelSerial = entry.getValue();
Channel channel = this.channels.get(channelName);
if (channel != null) {
channel.properties.channelSerial = channelSerial;
}
}
}

protected Map<String, String> getChannelSerials() {
Map<String, String> channelSerials = new HashMap<>();
for (Channel channel : this.channels.values()) {
channelSerials.put(channel.name, channel.properties.channelSerial);
}
return channelSerials;
}

/********************
* internal
********************/
Expand Down
37 changes: 31 additions & 6 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.ably.lib.util.EventEmitter;
import io.ably.lib.util.Log;
import io.ably.lib.util.ReconnectionStrategy;
import io.ably.lib.util.StringUtils;

/**
* Enables messages to be published and subscribed to.
Expand Down Expand Up @@ -131,6 +132,11 @@ private void setState(ChannelState newState, ErrorInfo reason, boolean resumed,
this.retryCount = 0;
}

// RTP5a1
if (newState == ChannelState.detached || newState == ChannelState.suspended || newState == ChannelState.failed) {
properties.channelSerial = null;
ttypic marked this conversation as resolved.
Show resolved Hide resolved
}

if(notifyStateChange) {
/* broadcast state change */
emit(newState, stateChange);
Expand Down Expand Up @@ -248,8 +254,9 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
}
}
if(this.decodeFailureRecoveryInProgress) {
attachMessage.channelSerial = this.lastPayloadProtocolMessageChannelSerial;
Log.v(TAG, "attach(); message decode recovery in progress.");
}
attachMessage.channelSerial = properties.channelSerial;
try {
if (listener != null) {
on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed));
Expand Down Expand Up @@ -325,7 +332,10 @@ private void detachImpl(CompletionListener listener) throws AblyException {
if(!connectionManager.isActive())
throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo());

/* send detach request */
sendDetachMessage(listener);
}

private void sendDetachMessage(CompletionListener listener) throws AblyException {
ProtocolMessage detachMessage = new ProtocolMessage(Action.detach, this.name);
try {
if (listener != null) {
Expand All @@ -338,7 +348,7 @@ private void detachImpl(CompletionListener listener) throws AblyException {
} else {
setState(ChannelState.detaching, null);
}
connectionManager.send(detachMessage, true, null);
ably.connection.connectionManager.send(detachMessage, true, null);
} catch(AblyException e) {
throw e;
}
Expand Down Expand Up @@ -399,7 +409,15 @@ private void setAttached(ProtocolMessage message) {
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
/* emit UPDATE event according to RTL12 */
emitUpdate(null, resumed);
} else {
} else if (state == ChannelState.detaching || state == ChannelState.detached) { //RTL5k
Log.v(TAG, "setAttached(): channel is in detaching state, as per RTL5k sending detach message!");
try {
sendDetachMessage(null);
} catch (AblyException e) {
Log.e(TAG, e.getMessage(), e);
}
}
else {
this.attachResume = true;
setState(ChannelState.attached, message.error, resumed);
presence.setAttached(message.hasFlag(Flag.has_presence), this.ably.connection.id);
Expand Down Expand Up @@ -850,7 +868,6 @@ private void onMessage(final ProtocolMessage protocolMessage) {
}

lastPayloadMessageId = lastMessage.id;
lastPayloadProtocolMessageChannelSerial = protocolMessage.channelSerial;

for (final Message msg : messages) {
this.listeners.onMessage(msg);
Expand Down Expand Up @@ -1264,6 +1281,15 @@ else if(stateChange.current.equals(failureState)) {
}

void onChannelMessage(ProtocolMessage msg) {
// RTL15b
if (!StringUtils.isNullOrEmpty(msg.channelSerial) && (msg.action == Action.message ||
msg.action == Action.presence || msg.action == Action.attached)) {
Log.v(TAG, String.format(
Locale.ROOT, "Setting channel serial for channelName - %s, previous - %s, current - %s",
name, properties.channelSerial, msg.channelSerial));
properties.channelSerial = msg.channelSerial;
}

switch(msg.action) {
case attached:
setAttached(msg);
Expand Down Expand Up @@ -1369,7 +1395,6 @@ public void once(ChannelState state, ChannelStateListener listener) {
*/
private Set<ChannelMode> modes;
private String lastPayloadMessageId;
private String lastPayloadProtocolMessageChannelSerial;
private boolean decodeFailureRecoveryInProgress;
private final DecodingContext decodingContext;
}
32 changes: 22 additions & 10 deletions lib/src/main/java/io/ably/lib/realtime/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.RecoveryKeyContext;
import io.ably.lib.util.EventEmitter;
import io.ably.lib.util.Log;
import io.ably.lib.util.PlatformAgentProvider;
Expand Down Expand Up @@ -49,25 +50,36 @@ public class Connection extends EventEmitter<ConnectionEvent, ConnectionStateLis
* for more information.
* <p>
* Spec: RTN16b, RTN16c
* @deprecated use createRecoveryKey method instead.
*/
@Deprecated
public String recoveryKey;

/**
* A unique public identifier for this connection, used to identify this member.
* <p>
* Spec: RTN8
* Spec: RTN16g
*
* @return a json string which incorporates the @connectionKey@, the current @msgSerial@,
* and a collection of pairs of channel @name@ and current @channelSerial@ for every currently attached channel.
*/
public String id;
public String createRecoveryKey() {
if (key == null || key.isEmpty() || this.state == ConnectionState.closing ||
this.state == ConnectionState.closed ||
this.state == ConnectionState.failed ||
this.state == ConnectionState.suspended
) {
//RTN16h
return null;
}

return new RecoveryKeyContext(key, connectionManager.msgSerial, ably.getChannelSerials()).encode();
}

/**
* The serial number of the last message to be received on this connection,
* used automatically by the library when recovering or resuming a connection.
* When recovering a connection explicitly, the recoveryKey is used in the recover
* client options as it contains both the key and the last message serial.
* A unique public identifier for this connection, used to identify this member.
* <p>
* Spec: RTN10
* Spec: RTN8
*/
public long serial;
public String id;

/**
* Explicitly calling connect() is unnecessary unless the autoConnect attribute of the {@link io.ably.lib.types.ClientOptions}
Expand Down
27 changes: 14 additions & 13 deletions lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.ably.lib.util.Log;
import io.ably.lib.util.PlatformAgentProvider;
import io.ably.lib.util.ReconnectionStrategy;
import io.ably.lib.util.StringUtils;

public class ConnectionManager implements ConnectListener {
final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -1193,19 +1194,23 @@ public void onMessage(ITransport transport, ProtocolMessage message) throws Ably
}

private void onChannelMessage(ProtocolMessage message) {
if(message.connectionSerial != null) {
connection.serial = message.connectionSerial.longValue();
if (connection.key != null)
connection.recoveryKey = connection.key + ":" + message.connectionSerial;
}
channels.onMessage(message);
connection.recoveryKey = connection.createRecoveryKey();
}

private synchronized void onConnected(ProtocolMessage message) {
final ErrorInfo error = message.error;
boolean reattachOnResumeFailure = false; // this will indicate that channel must reattach when connected
// event is received

boolean isConnectionResumeOrRecoverAttempt = !StringUtils.isNullOrEmpty(connection.key) ||
!StringUtils.isNullOrEmpty(ably.options.recover);
boolean failedResumeOrRecover = !message.connectionId.equals(connection.id) && message.error != null; // RTN15c7, RTN16d
if (isConnectionResumeOrRecoverAttempt && failedResumeOrRecover) { // RTN15c7
msgSerial = 0;
}
ably.options.recover = null; // RTN16k, explicitly setting null, so it won't be used for subsequent connection requests

connection.reason = error;
if (connection.id != null) { // there was a previous connection, so this is a resume and RTN15c applies
Log.d(TAG, "There was a connection resume");
ttypic marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -1240,12 +1245,6 @@ private synchronized void onConnected(ProtocolMessage message) {

connection.id = message.connectionId;

if(message.connectionSerial != null) {
connection.serial = message.connectionSerial;
if (connection.key != null)
connection.recoveryKey = connection.key + ":" + message.connectionSerial;
}

ConnectionDetails connectionDetails = message.connectionDetails;
/* Get any parameters from connectionDetails. */
connection.key = connectionDetails.connectionKey; //RTN16d
Expand All @@ -1260,6 +1259,9 @@ private synchronized void onConnected(ProtocolMessage message) {
requestState(transport, new StateIndication(ConnectionState.failed, e.errorInfo));
return;
}

connection.recoveryKey = connection.createRecoveryKey();

/* indicated connected currentState */
final StateIndication stateIndication = new StateIndication(ConnectionState.connected, error, null, null,
reattachOnResumeFailure);
Expand Down Expand Up @@ -1504,7 +1506,6 @@ private class ConnectParams extends TransportParams {
ConnectParams(ClientOptions options, PlatformAgentProvider platformAgentProvider) {
super(options, platformAgentProvider);
this.connectionKey = connection.key;
this.connectionSerial = String.valueOf(connection.serial);
this.port = Defaults.getPort(options);
}
}
Expand Down Expand Up @@ -1905,7 +1906,7 @@ private boolean isFatalError(ErrorInfo err) {
private boolean suppressRetry; /* for tests only; modified via reflection */
private ITransport transport;
private long suspendTime;
private long msgSerial;
public long msgSerial;
private long lastActivity;
private CMConnectivityListener connectivityListener;
private long connectionStateTtl = Defaults.connectionStateTtl;
Expand Down
20 changes: 7 additions & 13 deletions lib/src/main/java/io/ably/lib/transport/Defaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,22 @@
import io.ably.lib.BuildConfig;
import io.ably.lib.types.ClientOptions;

import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;

public class Defaults {
public static final float ABLY_VERSION_NUMBER = 1.0f;

/**
* The level of compatibility with the Ably service that this SDK supports, also referred to as the 'wire protocol version'.
* This value is presented as a string, as specified in G4a.
*/
public static final String ABLY_VERSION = new DecimalFormat("0.0", new DecimalFormatSymbols(Locale.ENGLISH)).format(ABLY_VERSION_NUMBER);
public static final String ABLY_PROTOCOL_VERSION = "2";

public static final String ABLY_AGENT_VERSION = String.format("%s/%s", "ably-java", BuildConfig.VERSION);

/* params */
public static final String ABLY_VERSION_PARAM = "v";
public static final String ABLY_AGENT_PARAM = "agent";
/* realtime params */
public static final String ABLY_PROTOCOL_VERSION_PARAM = "v";
public static final String ABLY_AGENT_PARAM = "agent";

/* Headers */
public static final String ABLY_VERSION_HEADER = "X-Ably-Version";
public static final String ABLY_AGENT_HEADER = "Ably-Agent";
/* http headers */
public static final String ABLY_PROTOCOL_VERSION_HEADER = "X-Ably-Version";
public static final String ABLY_AGENT_HEADER = "Ably-Agent";

/* Hosts */
public static final String[] HOST_FALLBACKS = { "A.ably-realtime.com", "B.ably-realtime.com", "C.ably-realtime.com", "D.ably-realtime.com", "E.ably-realtime.com" };
Expand Down
28 changes: 8 additions & 20 deletions lib/src/main/java/io/ably/lib/transport/ITransport.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
package io.ably.lib.transport;

import io.ably.lib.types.AblyException;
import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.Param;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.*;
import io.ably.lib.util.AgentHeaderCreator;
import io.ably.lib.util.Log;
import io.ably.lib.util.PlatformAgentProvider;
import io.ably.lib.util.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public interface ITransport {

Expand All @@ -39,7 +34,6 @@ class TransportParams {
protected String host;
protected int port;
protected String connectionKey;
protected String connectionSerial;
protected Mode mode;
protected boolean heartbeats;
private final PlatformAgentProvider platformAgentProvider;
Expand All @@ -64,24 +58,18 @@ public ClientOptions getClientOptions() {

public Param[] getConnectParams(Param[] baseParams) {
List<Param> paramList = new ArrayList<Param>(Arrays.asList(baseParams));
paramList.add(new Param(Defaults.ABLY_VERSION_PARAM, Defaults.ABLY_VERSION));
paramList.add(new Param(Defaults.ABLY_PROTOCOL_VERSION_PARAM, Defaults.ABLY_PROTOCOL_VERSION));
paramList.add(new Param("format", (options.useBinaryProtocol ? "msgpack" : "json")));
if(!options.echoMessages)
paramList.add(new Param("echo", "false"));
if(connectionKey != null) {
if(!StringUtils.isNullOrEmpty(connectionKey)) {
mode = Mode.resume;
paramList.add(new Param("resume", connectionKey));
if(connectionSerial != null)
paramList.add(new Param("connectionSerial", connectionSerial));
} else if(options.recover != null) {
} else if(!StringUtils.isNullOrEmpty(options.recover)) { // RTN16k
mode = Mode.recover;
Pattern recoverSpec = Pattern.compile("^([\\w\\-\\!]+):(\\-?\\d+)$");
Matcher match = recoverSpec.matcher(options.recover);
if(match.matches()) {
paramList.add(new Param("recover", match.group(1)));
paramList.add(new Param("connectionSerial", match.group(2)));
} else {
Log.e(TAG, "Invalid recover string specified");
RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover);
if (recoveryKeyContext != null) {
paramList.add(new Param("recover", recoveryKeyContext.getConnectionKey()));
}
}
if(options.clientId != null)
Expand Down
Loading
Loading