Skip to content

Commit

Permalink
Merge pull request #982 from ably/feature/no-connection-serial-presen…
Browse files Browse the repository at this point in the history
…ce-map

no-connection-serial-presence
  • Loading branch information
sacOO7 authored Jan 23, 2024
2 parents aedc320 + 2f252eb commit 4d68663
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 303 deletions.
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/http/HttpCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ <T> T httpExecute(HttpURLConnection conn, String method, Param[] headers, Reques
if(!acceptSet) { conn.setRequestProperty(HttpConstants.Headers.ACCEPT, HttpConstants.ContentTypes.JSON); }

/* pass required headers */
conn.setRequestProperty(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION);
conn.setRequestProperty(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7a
conn.setRequestProperty(Defaults.ABLY_AGENT_HEADER, AgentHeaderCreator.create(options.agents, platformAgentProvider));

/* prepare request body */
Expand Down
20 changes: 6 additions & 14 deletions lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -76,7 +75,7 @@ public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChan
if (!StringUtils.isNullOrEmpty(options.recover)) {
RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover);
if (recoveryKeyContext != null) {
setChannelSerialsFromRecoverOption(recoveryKeyContext.getChannelSerials());
setChannelSerialsFromRecoverOption(recoveryKeyContext.getChannelSerials()); // RTN16j
connection.connectionManager.msgSerial = recoveryKeyContext.getMsgSerial(); //RTN16f
}
}
Expand Down Expand Up @@ -243,9 +242,8 @@ public void onMessage(ProtocolMessage msg) {

@Override
public void suspendAll(ErrorInfo error, boolean notifyStateChange) {
for(Iterator<Map.Entry<String, Channel>> it = map.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, Channel> entry = it.next();
entry.getValue().setSuspended(error, notifyStateChange);
for (Channel channel : map.values()) {
channel.setSuspended(error, notifyStateChange);
}
}

Expand All @@ -255,7 +253,7 @@ public void suspendAll(ErrorInfo error, boolean notifyStateChange) {
* @param queuedMessages Queued messages transferred from ConnectionManager
*/
@Override
public void transferToChannels(List<ConnectionManager.QueuedMessage> queuedMessages) {
public void transferToChannelQueue(List<ConnectionManager.QueuedMessage> queuedMessages) {
final Map<String, List<ConnectionManager.QueuedMessage>> channelQueueMap = new HashMap<>();
for (ConnectionManager.QueuedMessage queuedMessage : queuedMessages) {
final String channelName = queuedMessage.msg.channel;
Expand All @@ -265,16 +263,10 @@ public void transferToChannels(List<ConnectionManager.QueuedMessage> queuedMessa
channelQueueMap.get(channelName).add(queuedMessage);
}

for (Map.Entry<String, Channel> channelEntry : map.entrySet()) {
Channel channel = channelEntry.getValue();
for (Channel channel : map.values()) {
if (channel.state.isReattachable()) {
Log.d(TAG, "reAttach(); channel = " + channel.name);

if (channelQueueMap.containsKey(channel.name)){
channel.transferQueuedPresenceMessages(channelQueueMap.get(channel.name));
}else {
channel.transferQueuedPresenceMessages(null);
}
channel.transferQueuedPresenceMessages(channelQueueMap.getOrDefault(channel.name, null));
}
}
}
Expand Down
102 changes: 24 additions & 78 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
if(this.decodeFailureRecoveryInProgress) {
Log.v(TAG, "attach(); message decode recovery in progress.");
}
attachMessage.channelSerial = properties.channelSerial;
attachMessage.channelSerial = properties.channelSerial; // RTL4c1
try {
if (listener != null) {
on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed));
Expand Down Expand Up @@ -354,26 +354,6 @@ private void sendDetachMessage(CompletionListener listener) throws AblyException
}
}

public void sync() throws AblyException {
Log.v(TAG, "sync(); channel = " + name);
/* check preconditions */
switch(state) {
case initialized:
case detaching:
case detached:
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to sync to channel; not attached", 40000));
default:
}
ConnectionManager connectionManager = ably.connection.connectionManager;
if(!connectionManager.isActive())
throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo());

/* send sync request */
ProtocolMessage syncMessage = new ProtocolMessage(Action.sync, this.name);
syncMessage.channelSerial = syncChannelSerial;
connectionManager.send(syncMessage, true, null);
}

/***
* internal
*
Expand All @@ -400,41 +380,44 @@ private static void callCompletionListenerError(CompletionListener listener, Err

private void setAttached(ProtocolMessage message) {
clearAttachTimers();
boolean resumed = message.hasFlag(Flag.resumed);
Log.v(TAG, "setAttached(); channel = " + name + ", resumed = " + resumed);
properties.attachSerial = message.channelSerial;
params = message.params;
modes = ChannelMode.toSet(message.flags);
if(state == ChannelState.attached) {
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
/* emit UPDATE event according to RTL12 */
emitUpdate(null, resumed);
} else if (state == ChannelState.detaching || state == ChannelState.detached) { //RTL5k
this.attachResume = true;

if (state == ChannelState.detaching || state == ChannelState.detached) { //RTL5k
Log.v(TAG, "setAttached(): channel is in detaching state, as per RTL5k sending detach message!");
try {
sendDetachMessage(null);
} catch (AblyException e) {
Log.e(TAG, e.getMessage(), e);
}
return;
}
if(state == ChannelState.attached) {
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
if (!message.hasFlag(Flag.resumed)) { // RTL12
presence.onAttached(message.hasFlag(Flag.has_presence), true);
emitUpdate(message.error, false);
}
}
else {
this.attachResume = true;
setState(ChannelState.attached, message.error, resumed);
presence.setAttached(message.hasFlag(Flag.has_presence), this.ably.connection.id);
presence.onAttached(message.hasFlag(Flag.has_presence), true);
setState(ChannelState.attached, message.error, message.hasFlag(Flag.resumed));
}
}

private void setDetached(ErrorInfo reason) {
clearAttachTimers();
Log.v(TAG, "setDetached(); channel = " + name);
presence.setDetached(reason);
presence.onChannelDetachedOrFailed(reason);
setState(ChannelState.detached, reason);
}

private void setFailed(ErrorInfo reason) {
clearAttachTimers();
Log.v(TAG, "setFailed(); channel = " + name);
presence.setDetached(reason);
presence.onChannelDetachedOrFailed(reason);
this.attachResume = false;
setState(ChannelState.failed, reason);
}
Expand Down Expand Up @@ -627,21 +610,9 @@ public void run() {
}

/* State changes provoked by ConnectionManager state changes. */

public void setConnected(boolean reattachOnResumeFailure) {
if (reattachOnResumeFailure && state.isReattachable()){
attach(true,null);
} else if (state == ChannelState.suspended) {
/* (RTL3d) If the connection state enters the CONNECTED state, then
* a SUSPENDED channel will initiate an attach operation. If the
* attach operation for the channel times out and the channel
* returns to the SUSPENDED state (see #RTL4f)
*/
try {
attachWithTimeout(null);
} catch (AblyException e) {
Log.e(TAG, "setConnected(): Unable to initiate attach; channel = " + name, e);
}
public void setConnected() {
if (state.isReattachable()){
attach(true,null); // RTN15c6, RTN15c7
}
}

Expand Down Expand Up @@ -675,7 +646,7 @@ public synchronized void setSuspended(ErrorInfo reason, boolean notifyStateChang
clearAttachTimers();
if (state == ChannelState.attached || state == ChannelState.attaching) {
Log.v(TAG, "setSuspended(); channel = " + name);
presence.setSuspended(reason);
presence.onChannelSuspended(reason);
setState(ChannelState.suspended, reason, false, notifyStateChange);
}
}
Expand Down Expand Up @@ -893,30 +864,6 @@ public void onError(ErrorInfo reason) {
});
}

private void onPresence(ProtocolMessage message, String syncChannelSerial) {
Log.v(TAG, "onPresence(); channel = " + name + "; syncChannelSerial = " + syncChannelSerial);
PresenceMessage[] messages = message.presence;
for(int i = 0; i < messages.length; i++) {
PresenceMessage msg = messages[i];
try {
msg.decode(options);
} catch (MessageDecodeException e) {
Log.e(TAG, String.format(Locale.ROOT, "%s on channel %s", e.errorInfo.message, name));
}
/* populate fields derived from protocol message */
if(msg.connectionId == null) msg.connectionId = message.connectionId;
if(msg.timestamp == 0) msg.timestamp = message.timestamp;
if(msg.id == null) msg.id = message.id + ':' + i;
}
presence.setPresence(messages, true, syncChannelSerial);
}

private void onSync(ProtocolMessage message) {
Log.v(TAG, "onSync(); channel = " + name);
if(message.presence != null)
onPresence(message, (syncChannelSerial = message.channelSerial));
}

private MessageMulticaster listeners = new MessageMulticaster();
private HashMap<String, MessageMulticaster> eventListeners = new HashMap<String, MessageMulticaster>();

Expand Down Expand Up @@ -1341,11 +1288,11 @@ void onChannelMessage(ProtocolMessage msg) {
}
}
break;
case presence:
onPresence(msg, null);
break;
case sync:
onSync(msg);
presence.onSync(msg);
break;
case presence:
presence.onPresence(msg);
break;
case error:
setFailed(msg.error);
Expand Down Expand Up @@ -1380,7 +1327,6 @@ public void once(ChannelState state, ChannelStateListener listener) {
final AblyRealtime ably;
final String basePath;
ChannelOptions options;
String syncChannelSerial;
/**
* Optional <a href="https://ably.com/docs/realtime/channels/channel-parameters/overview">channel parameters</a>
* that configure the behavior of the channel.
Expand Down
1 change: 1 addition & 0 deletions lib/src/main/java/io/ably/lib/realtime/ChannelState.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public ChannelEvent getChannelEvent() {
return event;
}

// RTN15c6, RTN15c7, RTL3d, RTN15g3
public boolean isReattachable() {
return this == ChannelState.attaching || this == ChannelState.attached || this == ChannelState.suspended;
}
Expand Down
15 changes: 8 additions & 7 deletions lib/src/main/java/io/ably/lib/realtime/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,27 @@ public class Connection extends EventEmitter<ConnectionEvent, ConnectionStateLis
* See <a href="https://ably.com/docs/realtime/connection#connection-state-recover-options">connection state recover options</a>
* for more information.
* <p>
* Spec: RTN16b, RTN16c
* Spec: RTN16m
* @deprecated use createRecoveryKey method instead.
*/
@Deprecated
public String recoveryKey;

/**
* Spec: RTN16g
*
* @return a json string which incorporates the @connectionKey@, the current @msgSerial@,
* and a collection of pairs of channel @name@ and current @channelSerial@ for every currently attached channel.
* createRecoveryKey is a method that returns a json string which incorporates the @connectionKey@, the
* current @msgSerial@, and a collection of pairs of channel @name@ and current @channelSerial@ for every
* currently attached channel.
* <p>
* Spec: RTN16g, RTN16c
* </p>
*/
public String createRecoveryKey() {
if (key == null || key.isEmpty() || this.state == ConnectionState.closing ||
this.state == ConnectionState.closed ||
this.state == ConnectionState.failed ||
this.state == ConnectionState.suspended
) {
//RTN16h
return null;
return null; // RTN16g2
}

return new RecoveryKeyContext(key, connectionManager.msgSerial, ably.getChannelSerials()).encode();
Expand Down
Loading

0 comments on commit 4d68663

Please sign in to comment.