Skip to content

Commit

Permalink
Only queue publish and presence messages (TO3g, see also ably/specifi…
Browse files Browse the repository at this point in the history
  • Loading branch information
maratal committed Jun 30, 2024
1 parent 675f294 commit 16b3806
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 25 deletions.
36 changes: 24 additions & 12 deletions Source/ARTRealtime.m
Original file line number Diff line number Diff line change
Expand Up @@ -1275,22 +1275,34 @@ - (void)send:(ARTProtocolMessage *)msg reuseMsgSerial:(BOOL)reuseMsgSerial sentC
if ([self shouldSendEvents]) {
[self sendImpl:msg reuseMsgSerial:reuseMsgSerial sentCallback:sentCallback ackCallback:ackCallback];
}
else if ([self shouldQueueEvents]) {
ARTQueuedMessage *lastQueuedMessage = self.queuedMessages.lastObject; //RTL6d5
BOOL merged = [lastQueuedMessage mergeFrom:msg sentCallback:nil ackCallback:ackCallback];
if (!merged) {
ARTQueuedMessage *qm = [[ARTQueuedMessage alloc] initWithProtocolMessage:msg sentCallback:sentCallback ackCallback:ackCallback];
[self.queuedMessages addObject:qm];
ARTLogDebug(self.logger, @"RT:%p (channel: %@) protocol message with action '%lu - %@' has been queued (%@)", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), msg.messages);
// see RTL6c2, RTN19, RTN7 and TO3g
else if (msg.ackRequired) {
if ([self shouldQueueEvents]) {
ARTQueuedMessage *lastQueuedMessage = self.queuedMessages.lastObject; //RTL6d5
BOOL merged = [lastQueuedMessage mergeFrom:msg sentCallback:nil ackCallback:ackCallback];
if (!merged) {
ARTQueuedMessage *qm = [[ARTQueuedMessage alloc] initWithProtocolMessage:msg sentCallback:sentCallback ackCallback:ackCallback];
[self.queuedMessages addObject:qm];
ARTLogDebug(self.logger, @"RT:%p (channel: %@) protocol message with action '%lu - %@' has been queued (%@)", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), msg.messages);
}
else {
ARTLogVerbose(self.logger, @"RT:%p (channel: %@) message %@ has been bundled to %@", self, msg.channel, msg, lastQueuedMessage.msg);
}
}
// RTL6c4
else {
ARTLogVerbose(self.logger, @"RT:%p (channel: %@) message %@ has been bundled to %@", self, msg.channel, msg, lastQueuedMessage.msg);
ARTErrorInfo *error = self.connection.error_nosync;
ARTLogDebug(self.logger, @"RT:%p (channel: %@) protocol message with action '%lu - %@' can't be sent or queued: %@", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), error);
if (sentCallback) {
sentCallback(error);
}
if (ackCallback) {
ackCallback([ARTStatus state:ARTStateError info:error]);
}
}
}
else if (ackCallback) {
ARTErrorInfo *error = self.connection.errorReason_nosync;
if (!error) error = [ARTErrorInfo createWithCode:ARTErrorChannelOperationFailed status:400 message:[NSString stringWithFormat:@"not possile to send message (state is %@)", ARTRealtimeConnectionStateToStr(self.connection.state_nosync)]];
ackCallback([ARTStatus state:ARTStateError info:error]);
else {
ARTLogDebug(self.logger, @"RT:%p (channel: %@) sending protocol message with action '%lu - %@' was ignored: %@", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), self.connection.error_nosync);
}
}

Expand Down
10 changes: 7 additions & 3 deletions Test/Tests/RealtimeClientConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2565,7 +2565,8 @@ class RealtimeClientConnectionTests: XCTestCase {
let client = AblyTests.newRealtime(options).client
defer { client.dispose(); client.close() }
let channel = client.channels.get(test.uniqueChannelName())

channel.attach()

XCTAssertTrue(client.waitUntilConnected())
let expectedConnectionId = client.connection.id

Expand Down Expand Up @@ -2601,7 +2602,9 @@ class RealtimeClientConnectionTests: XCTestCase {
defer { client.dispose(); client.close() }
let channel1 = client.channels.get(test.uniqueChannelName())
let channel2 = client.channels.get(test.uniqueChannelName(prefix: "second_"))

channel1.attach()
channel2.attach()

XCTAssertTrue(client.waitUntilConnected())
expect(channel1.state).toEventually(equal(ARTRealtimeChannelState.attached), timeout: testTimeout)
expect(channel2.state).toEventually(equal(ARTRealtimeChannelState.attached), timeout: testTimeout)
Expand Down Expand Up @@ -2642,7 +2645,8 @@ class RealtimeClientConnectionTests: XCTestCase {
let client = AblyTests.newRealtime(options).client
defer { client.dispose(); client.close() }
let channel = client.channels.get(test.uniqueChannelName())

channel.attach()

XCTAssertTrue(client.waitUntilConnected())
let expectedConnectionId = client.connection.id

Expand Down
20 changes: 10 additions & 10 deletions Test/Tests/RealtimeClientPresenceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -647,12 +647,6 @@ class RealtimeClientPresenceTests: XCTestCase {

waitUntil(timeout: testTimeout) { done in
let partialDone = AblyTests.splitDone(2, done: done)
channel2.presence.enterClient("Client 2", data: nil) { error in
XCTAssertNil(error)
XCTAssertEqual(client2.internal.queuedMessages.count, 0)
XCTAssertEqual(channel2.state, ARTRealtimeChannelState.attached)
partialDone()
}
channel2.presence.subscribe(.enter) { _ in
if channel2.presence.syncComplete {
XCTAssertEqual(channel2.internal.presence.members.count, 2)
Expand All @@ -662,8 +656,13 @@ class RealtimeClientPresenceTests: XCTestCase {
channel2.presence.unsubscribe()
partialDone()
}

XCTAssertEqual(client2.internal.queuedMessages.count, 1)
channel2.presence.enterClient("Client 2", data: nil) { error in
XCTAssertNil(error)
XCTAssertEqual(client2.internal.queuedMessages.count, 0)
XCTAssertEqual(channel2.state, ARTRealtimeChannelState.attached)
partialDone()
}
XCTAssertEqual(channel2.internal.presence.pendingPresence.count, 1)
XCTAssertFalse(channel2.presence.syncComplete)
XCTAssertEqual(channel2.internal.presence.members.count, 0)
}
Expand Down Expand Up @@ -2997,7 +2996,7 @@ class RealtimeClientPresenceTests: XCTestCase {
done()
}
XCTAssertEqual(client.connection.state, ARTRealtimeConnectionState.connecting)
XCTAssertEqual(client.internal.queuedMessages.count, 1)
XCTAssertEqual(channel.internal.presence.pendingPresence.count, 1)
}
}

Expand Down Expand Up @@ -3071,7 +3070,7 @@ class RealtimeClientPresenceTests: XCTestCase {

waitUntil(timeout: testTimeout) { done in
channel.presence.enterClient("user", data: nil) { error in
XCTAssertEqual(error?.code, ARTErrorCode.invalidTransportHandle.intValue)
XCTAssertEqual(error?.code, ARTErrorCode.unableToEnterPresenceChannelInvalidState.intValue)
XCTAssertEqual(channel.presence.internal.pendingPresence.count, 0)
done()
}
Expand Down Expand Up @@ -3355,6 +3354,7 @@ class RealtimeClientPresenceTests: XCTestCase {
expect(client.connection.state).toEventually(equal(.closed), timeout: testTimeout)
}
let channel = client.channels.get(channelName)
channel.attach()
expect(channel.internal.presence.syncInProgress).toEventually(beTrue(), timeout: testTimeout)

waitUntil(timeout: testTimeout) { done in
Expand Down
4 changes: 4 additions & 0 deletions Test/Tests/RestClientChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ class RestClientChannelTests: XCTestCase {
let chanelName = test.uniqueChannelName(prefix: "ch1")

let subscriber = realtime.channels.get(chanelName)
subscriber.attach()
waitUntil(timeout: testTimeout) { done in
subscriber.once(.attached) { _ in
done()
Expand Down Expand Up @@ -423,6 +424,7 @@ class RestClientChannelTests: XCTestCase {
let chanelName = test.uniqueChannelName(prefix: "ch1")

let subscriber = realtime.channels.get(chanelName)
subscriber.attach()
waitUntil(timeout: testTimeout) { done in
subscriber.once(.attached) { _ in
done()
Expand Down Expand Up @@ -453,6 +455,7 @@ class RestClientChannelTests: XCTestCase {
let chanelName = test.uniqueChannelName(prefix: "ch1")

let subscriber = realtime.channels.get(chanelName)
subscriber.attach()
waitUntil(timeout: testTimeout) { done in
subscriber.once(.attached) { _ in
done()
Expand Down Expand Up @@ -484,6 +487,7 @@ class RestClientChannelTests: XCTestCase {
let chanelName = test.uniqueChannelName(prefix: "ch1")

let subscriber = realtime.channels.get(chanelName)
subscriber.attach()
waitUntil(timeout: testTimeout) { done in
subscriber.once(.attached) { _ in
done()
Expand Down

0 comments on commit 16b3806

Please sign in to comment.