Skip to content

Commit

Permalink
Merge pull request #1028 from ably/feature/attach-on-subcribe-channel…
Browse files Browse the repository at this point in the history
…Option

Fix implicit attach on subscribe
  • Loading branch information
sacOO7 authored Sep 18, 2024
2 parents e0e2317 + 3ecab48 commit 7a360ae
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 3 deletions.
23 changes: 20 additions & 3 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,17 @@ public synchronized void unsubscribe() {
eventListeners.clear();
}

/**
* <p>
* Checks if {@link io.ably.lib.types.ChannelOptions#attachOnSubscribe} is true.
* </p>
* Defaults to {@code true} when {@link io.ably.lib.realtime.ChannelBase#options} is null.
* <p>Spec: TB4, RTL7g, RTL7gh, RTP6d, RTP6e</p>
*/
protected boolean attachOnSubscribeEnabled() {
return options == null || options.attachOnSubscribe;
}

/**
* Registers a listener for messages on this channel.
* The caller supplies a listener function, which is called each time one or more messages arrives on the channel.
Expand All @@ -704,7 +715,9 @@ public synchronized void unsubscribe() {
public synchronized void subscribe(MessageListener listener) throws AblyException {
Log.v(TAG, "subscribe(); channel = " + this.name);
listeners.add(listener);
attach();
if (attachOnSubscribeEnabled()) {
attach();
}
}

/**
Expand Down Expand Up @@ -739,7 +752,9 @@ public synchronized void unsubscribe(MessageListener listener) {
public synchronized void subscribe(String name, MessageListener listener) throws AblyException {
Log.v(TAG, "subscribe(); channel = " + this.name + "; event = " + name);
subscribeImpl(name, listener);
attach();
if (attachOnSubscribeEnabled()) {
attach();
}
}

/**
Expand Down Expand Up @@ -773,7 +788,9 @@ public synchronized void subscribe(String[] names, MessageListener listener) thr
Log.v(TAG, "subscribe(); channel = " + this.name + "; (multiple events)");
for(String name : names)
subscribeImpl(name, listener);
attach();
if (attachOnSubscribeEnabled()) {
attach();
}
}

/**
Expand Down
6 changes: 6 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/Presence.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ public void unsubscribe() {
* @throws AblyException
*/
private void implicitAttachOnSubscribe(CompletionListener completionListener) throws AblyException {
if (!channel.attachOnSubscribeEnabled()) {
if (completionListener != null) {
completionListener.onSuccess();
}
return;
}
if (channel.state == ChannelState.failed) {
String errorString = String.format(Locale.ROOT, "Channel %s: subscribe in FAILED channel state", channel.name);
Log.v(TAG, errorString);
Expand Down
11 changes: 11 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ChannelOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ public class ChannelOptions {
*/
public boolean encrypted;

/**
* <p>
* Determines whether calling {@link io.ably.lib.realtime.Channel#subscribe Channel.subscribe} or
* {@link io.ably.lib.realtime.Presence#subscribe Presence.subscribe} method
* should trigger an implicit attach.
* </p>
* <p>Defaults to {@code true}.</p>
* <p>Spec: TB4, RTL7g, RTL7gh, RTP6d, RTP6e</p>
*/
public boolean attachOnSubscribe = true;

public boolean hasModes() {
return null != modes && 0 != modes.length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,71 @@ public void onMessage(Message message) {
}
}

/**
* <p>
* Validates a client can subscribe to messages without implicit channel attach
* Refer Spec TB4, RTL7g, RTL7gh
* </p>
* @throws AblyException
*/
@Test
public void subscribe_without_implicit_attach() {
String channelName = "subscribe_" + testParams.name;
AblyRealtime ably = null;
try {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
ably = new AblyRealtime(opts);

/* create a channel and set attachOnSubscribe to false */
final Channel channel = ably.channels.get(channelName);
ChannelOptions chOpts = new ChannelOptions();
chOpts.attachOnSubscribe = false;
channel.setOptions(chOpts);

List<Boolean> receivedMsg = Collections.synchronizedList(new ArrayList<>());

/* Check for all subscriptions without ATTACHING state */
channel.subscribe(message -> receivedMsg.add(true));
assertEquals(ChannelState.initialized, channel.state);

channel.subscribe("test_event", message -> receivedMsg.add(true));
assertEquals(ChannelState.initialized, channel.state);

channel.subscribe(new String[]{"test_event1", "test_event2"}, message -> receivedMsg.add(true));
assertEquals(ChannelState.initialized, channel.state);

channel.attach();
(new ChannelWaiter(channel)).waitFor(ChannelState.attached);

channel.publish("test_event", "hi there");
// Expecting two msg: one from the wildcard subscription and one from test_event subscription
Exception conditionError = new Helpers.ConditionalWaiter().
wait(() -> receivedMsg.size() == 2, 5000);
assertNull(conditionError);

receivedMsg.clear();
channel.publish("test_event1", "hi there");
// Expecting two msg: one from the wildcard subscription and one from test_event1 subscription
conditionError = new Helpers.ConditionalWaiter().
wait(() -> receivedMsg.size() == 2, 5000);
assertNull(conditionError);

receivedMsg.clear();
channel.publish("test_event2", "hi there");
// Expecting two msg: one from the wildcard subscription and one from test_event2 subscription
conditionError = new Helpers.ConditionalWaiter().
wait(() -> receivedMsg.size() == 2, 5000);
assertNull(conditionError);

} catch (AblyException e) {
e.printStackTrace();
fail("subscribe_without_implicit_attach: Unexpected exception");
} finally {
if(ably != null)
ably.close();
}
}

/**
* <p>
* Verifies that unsubscribe call with no argument removes all listeners,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,68 @@ public void onPresenceMessage(PresenceMessage message) {
}
}

/**
* <p>
* Validates a client can subscribe to presence without implicit channel attach
* Refer Spec TB4, RTP6d, RTP6e
* </p>
* @throws AblyException
*/
@Test
public void presence_subscribe_without_implicit_attach() {
String ablyChannel = "subscribe_" + testParams.name;
AblyRealtime ably = null;
try {
ClientOptions option1 = createOptions(testVars.keys[0].keyStr);
option1.clientId = "client1";
ably = new AblyRealtime(option1);

/* create a channel and set attachOnSubscribe to false */
final Channel channel = ably.channels.get(ablyChannel);
ChannelOptions chOpts = new ChannelOptions();
chOpts.attachOnSubscribe = false;
channel.setOptions(chOpts);

List<Boolean> receivedPresenceMsg = Collections.synchronizedList(new ArrayList<>());
CompletionWaiter completionWaiter = new CompletionWaiter();

/* Check for all subscriptions without ATTACHING state */
channel.presence.subscribe(m -> receivedPresenceMsg.add(true), completionWaiter);
assertEquals(1, completionWaiter.successCount);
assertEquals(ChannelState.initialized, channel.state);

channel.presence.subscribe(Action.enter, m -> receivedPresenceMsg.add(true), completionWaiter);
assertEquals(2, completionWaiter.successCount);
assertEquals(ChannelState.initialized, channel.state);

channel.presence.subscribe(EnumSet.of(Action.enter, Action.leave),m -> receivedPresenceMsg.add(true));
assertEquals(ChannelState.initialized, channel.state);

channel.attach();
(new ChannelWaiter(channel)).waitFor(ChannelState.attached);

channel.presence.enter("enter client1", null);
// Expecting 3 msg: one from the wildcard subscription and two from specific event subscription
Exception conditionError = new Helpers.ConditionalWaiter().
wait(() -> receivedPresenceMsg.size() == 3, 5000);
assertNull(conditionError);

receivedPresenceMsg.clear();
channel.presence.leave(null);
// Expecting 2 msg: one from the wildcard subscription and one from specific event subscription
conditionError = new Helpers.ConditionalWaiter().
wait(() -> receivedPresenceMsg.size() == 2, 5000);
assertNull(conditionError);

} catch (AblyException e) {
e.printStackTrace();
fail("presence_subscribe_without_implicit_attach: Unexpected exception");
} finally {
if(ably != null)
ably.close();
}
}

/**
* <p>
* Validates a client sending multiple presence updates when the channel is in the attaching
Expand Down

0 comments on commit 7a360ae

Please sign in to comment.