Skip to content

Commit

Permalink
Merge pull request #1941 from ably/fix/1940-TO3g-violation
Browse files Browse the repository at this point in the history
[ECO-4858] TO3g compliance
  • Loading branch information
maratal authored Jul 3, 2024
2 parents da113e8 + 3fbd977 commit d1458ad
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 95 deletions.
67 changes: 32 additions & 35 deletions Source/ARTRealtime.m
Original file line number Diff line number Diff line change
Expand Up @@ -742,18 +742,13 @@ - (void)performTransitionToState:(ARTRealtimeConnectionState)state withParams:(A
}

if ([self shouldSendEvents]) {
[self sendQueuedMessages];

// Channels
for (ARTRealtimeChannelInternal *channel in channels) {
if (stateChange.previous == ARTRealtimeInitialized ||
stateChange.previous == ARTRealtimeConnecting ||
stateChange.previous == ARTRealtimeDisconnected) {
// RTL4i
[channel _attach:nil];
}
ARTAttachRequestParams *const params = [[ARTAttachRequestParams alloc] initWithReason:stateChange.reason];
[channel proceedAttachDetachWithParams:params];
}
} else if (![self shouldQueueEvents]) {
[self sendQueuedMessages];
}
else if (!self.isActive) {
if (!channelStateChangeParams) {
if (stateChange.reason) {
channelStateChangeParams = [[ARTChannelStateChangeParams alloc] initWithState:ARTStateError
Expand Down Expand Up @@ -867,12 +862,6 @@ - (void)onConnected:(ARTProtocolMessage *)message {
else {
ARTLogWarn(self.logger, @"RT:%p connection \"%@\" has reconnected, but resume failed. Error: \"%@\"", self, message.connectionId, message.error.message);
}
// Reattach all channels regardless resume success - RTN15c6, RTN15c7
for (ARTRealtimeChannelInternal *channel in self.channels.nosyncIterable) {
ARTAttachRequestParams *const params = [[ARTAttachRequestParams alloc] initWithReason:message.error];
[channel reattachWithParams:params];
}
_resuming = false;
}
// If there's no previous connectionId, then don't reset the msgSerial
//as it may have been set by recover data (unless the recover failed).
Expand Down Expand Up @@ -1218,9 +1207,9 @@ - (BOOL)shouldSendEvents {
}
}

- (BOOL)shouldQueueEvents {
if(!self.options.queueMessages) {
return false;
- (BOOL)isActive {
if (self.shouldSendEvents) {
return true;
}
switch (self.connection.state_nosync) {
case ARTRealtimeInitialized:
Expand All @@ -1234,10 +1223,6 @@ - (BOOL)shouldQueueEvents {
}
}

- (BOOL)isActive {
return [self shouldQueueEvents] || [self shouldSendEvents];
}

- (void)sendImpl:(ARTProtocolMessage *)pm reuseMsgSerial:(BOOL)reuseMsgSerial sentCallback:(ARTCallback)sentCallback ackCallback:(ARTStatusCallback)ackCallback {
if (pm.ackRequired) {
if (!reuseMsgSerial) { // RTN19a2
Expand Down Expand Up @@ -1284,22 +1269,34 @@ - (void)send:(ARTProtocolMessage *)msg reuseMsgSerial:(BOOL)reuseMsgSerial sentC
if ([self shouldSendEvents]) {
[self sendImpl:msg reuseMsgSerial:reuseMsgSerial sentCallback:sentCallback ackCallback:ackCallback];
}
else if ([self shouldQueueEvents]) {
ARTQueuedMessage *lastQueuedMessage = self.queuedMessages.lastObject; //RTL6d5
BOOL merged = [lastQueuedMessage mergeFrom:msg sentCallback:nil ackCallback:ackCallback];
if (!merged) {
ARTQueuedMessage *qm = [[ARTQueuedMessage alloc] initWithProtocolMessage:msg sentCallback:sentCallback ackCallback:ackCallback];
[self.queuedMessages addObject:qm];
ARTLogDebug(self.logger, @"RT:%p (channel: %@) protocol message with action '%lu - %@' has been queued (%@)", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), msg.messages);
// see RTL6c2, RTN19, RTN7 and TO3g
else if (msg.ackRequired) {
if (self.isActive && self.options.queueMessages) {
ARTQueuedMessage *lastQueuedMessage = self.queuedMessages.lastObject; //RTL6d5
BOOL merged = [lastQueuedMessage mergeFrom:msg sentCallback:nil ackCallback:ackCallback];
if (!merged) {
ARTQueuedMessage *qm = [[ARTQueuedMessage alloc] initWithProtocolMessage:msg sentCallback:sentCallback ackCallback:ackCallback];
[self.queuedMessages addObject:qm];
ARTLogDebug(self.logger, @"RT:%p (channel: %@) protocol message with action '%lu - %@' has been queued (%@)", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), msg.messages);
}
else {
ARTLogVerbose(self.logger, @"RT:%p (channel: %@) message %@ has been bundled to %@", self, msg.channel, msg, lastQueuedMessage.msg);
}
}
// RTL6c4
else {
ARTLogVerbose(self.logger, @"RT:%p (channel: %@) message %@ has been bundled to %@", self, msg.channel, msg, lastQueuedMessage.msg);
ARTErrorInfo *error = self.connection.error_nosync;
ARTLogDebug(self.logger, @"RT:%p (channel: %@) protocol message with action '%lu - %@' can't be sent or queued: %@", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), error);
if (sentCallback) {
sentCallback(error);
}
if (ackCallback) {
ackCallback([ARTStatus state:ARTStateError info:error]);
}
}
}
else if (ackCallback) {
ARTErrorInfo *error = self.connection.errorReason_nosync;
if (!error) error = [ARTErrorInfo createWithCode:ARTErrorChannelOperationFailed status:400 message:[NSString stringWithFormat:@"not possile to send message (state is %@)", ARTRealtimeConnectionStateToStr(self.connection.state_nosync)]];
ackCallback([ARTStatus state:ARTStateError info:error]);
else {
ARTLogDebug(self.logger, @"RT:%p (channel: %@) sending protocol message with action '%lu - %@' was ignored: %@", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), self.connection.error_nosync);
}
}

Expand Down
65 changes: 38 additions & 27 deletions Source/ARTRealtimeChannel.m
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,15 @@ - (void)reattachWithParams:(ARTAttachRequestParams *)params {
}
}

- (void)proceedAttachDetachWithParams:(ARTAttachRequestParams *)params {
if (self.state_nosync == ARTChannelEventDetaching) {
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) %@ proceeding with detach", _realtime, self, self.name, ARTRealtimeChannelStateToStr(self.state_nosync));
[self internalDetach:nil];
} else {
[self reattachWithParams:params];
}
}

- (void)internalAttach:(ARTCallback)callback withParams:(ARTAttachRequestParams *)params {
switch (self.state_nosync) {
case ARTRealtimeChannelDetaching: {
Expand All @@ -907,9 +916,11 @@ - (void)internalAttach:(ARTCallback)callback withParams:(ARTAttachRequestParams

if (callback) [_attachedEventEmitter once:callback];
// Set state: Attaching
const ARTState state = params.reason ? ARTStateError : ARTStateOk;
ARTChannelStateChangeParams *const stateChangeParams = [[ARTChannelStateChangeParams alloc] initWithState:state errorInfo:params.reason storeErrorInfo:NO retryAttempt:params.retryAttempt];
[self performTransitionToState:ARTRealtimeChannelAttaching withParams:stateChangeParams];
if (self.state_nosync != ARTRealtimeChannelAttaching) {
const ARTState state = params.reason ? ARTStateError : ARTStateOk;
ARTChannelStateChangeParams *const stateChangeParams = [[ARTChannelStateChangeParams alloc] initWithState:state errorInfo:params.reason storeErrorInfo:NO retryAttempt:params.retryAttempt];
[self performTransitionToState:ARTRealtimeChannelAttaching withParams:stateChangeParams];
}
[self attachAfterChecks];
}

Expand Down Expand Up @@ -960,17 +971,6 @@ - (void)_detach:(ARTCallback)callback {
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) can't detach when not attached", _realtime, self, self.name);
if (callback) callback(nil);
return;
case ARTRealtimeChannelAttaching: {
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) waiting for the completion of the attaching operation", _realtime, self, self.name);
[_attachedEventEmitter once:^(ARTErrorInfo *errorInfo) {
if (callback && errorInfo) {
callback(errorInfo);
return;
}
[self _detach:callback];
}];
return;
}
case ARTRealtimeChannelDetaching:
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) already detaching", _realtime, self, self.name);
if (callback) [_detachedEventEmitter once:callback];
Expand All @@ -993,6 +993,27 @@ - (void)_detach:(ARTCallback)callback {
default:
break;
}
[self internalDetach:callback];
}

- (void)internalDetach:(ARTCallback)callback {
switch (self.state_nosync) {
case ARTRealtimeChannelAttaching: {
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) waiting for the completion of the attaching operation", _realtime, self, self.name);
[_attachedEventEmitter once:^(ARTErrorInfo *errorInfo) {
if (callback && errorInfo) {
callback(errorInfo);
return;
}
[self _detach:callback];
}];
return;
}
default:
break;
}

_errorReason = nil;

if (![self.realtime isActive]) {
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) can't detach when not in an active state", _realtime, self, self.name);
Expand All @@ -1005,10 +1026,10 @@ - (void)_detach:(ARTCallback)callback {
ARTChannelStateChangeParams *const params = [[ARTChannelStateChangeParams alloc] initWithState:ARTStateOk];
[self performTransitionToState:ARTRealtimeChannelDetaching withParams:params];

[self detachAfterChecks:callback];
[self detachAfterChecks];
}

- (void)detachAfterChecks:(ARTCallback)callback {
- (void)detachAfterChecks {
ARTProtocolMessage *detachMessage = [[ARTProtocolMessage alloc] init];
detachMessage.action = ARTProtocolMessageDetach;
detachMessage.channel = self.name;
Expand All @@ -1026,17 +1047,7 @@ - (void)detachAfterChecks:(ARTCallback)callback {
[self performTransitionToState:ARTRealtimeChannelAttached withParams:params];
[self->_detachedEventEmitter emit:nil with:errorInfo];
}] startTimer];

if (![self.realtime shouldQueueEvents]) {
ARTEventListener *reconnectedListener = [self.realtime.connectedEventEmitter once:^(NSNull *n) {
// Disconnected and connected while detaching, re-detach.
[self detachAfterChecks:callback];
}];
[_detachedEventEmitter once:^(ARTErrorInfo *err) {
[self.realtime.connectedEventEmitter off:reconnectedListener];
}];
}


if (self.presence.syncInProgress_nosync) {
[self.presence failsSync:[ARTErrorInfo createWithCode:ARTErrorChannelOperationFailed message:@"channel is being DETACHED"]];
}
Expand Down
1 change: 0 additions & 1 deletion Source/PrivateHeaders/Ably/ARTRealtime+Private.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ NS_ASSUME_NONNULL_BEGIN

// State properties
- (BOOL)shouldSendEvents;
- (BOOL)shouldQueueEvents;

// Message sending
- (void)sendQueuedMessages;
Expand Down
2 changes: 1 addition & 1 deletion Source/PrivateHeaders/Ably/ARTRealtimeChannel+Private.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ NS_ASSUME_NONNULL_BEGIN

- (instancetype)initWithRealtime:(ARTRealtimeInternal *)realtime andName:(NSString *)name withOptions:(ARTRealtimeChannelOptions *)options logger:(ARTInternalLog *)logger;

- (void)reattachWithParams:(ARTAttachRequestParams *)params;
- (void)proceedAttachDetachWithParams:(ARTAttachRequestParams *)params;

- (void)_attach:(nullable ARTCallback)callback;
- (void)_detach:(nullable ARTCallback)callback;
Expand Down
31 changes: 13 additions & 18 deletions Test/Tests/RealtimeClientChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ class RealtimeClientChannelTests: XCTestCase {
XCTAssertFalse(stateChange.resumed)
}
}
channel.attach()
channel.publish(nil, data: "A message")
}

Expand All @@ -583,6 +584,7 @@ class RealtimeClientChannelTests: XCTestCase {
XCTAssertFalse(stateChange.resumed)
}
}
recoveredChannel.attach()
}
}

Expand Down Expand Up @@ -712,7 +714,6 @@ class RealtimeClientChannelTests: XCTestCase {
done()
}
}
expect(channel.state).toEventually(equal(.attached), timeout: testTimeout)
}

// TO3g and https://github.com/ably/ably-cocoa/issues/1004
Expand All @@ -727,28 +728,20 @@ class RealtimeClientChannelTests: XCTestCase {
defer { client.dispose(); client.close() }
client.internal.setReachabilityClass(TestReachability.self)
let channel = client.channels.get(test.uniqueChannelName())

waitUntil(timeout: testTimeout) { done in
client.connection.once(.connected) { _ in
done()
}
client.connect()
}


client.connect()
expect(client.connection.state).toEventually(equal(.connected), timeout: testTimeout)

channel.attach()
expect(channel.state).toEventually(equal(.attached), timeout: testTimeout)

waitUntil(timeout: testTimeout) { done in
channel.publish(nil, data: "message") { error in
XCTAssertNil(error)
done()
}
}

XCTAssertEqual(channel.state, .attached)
channel.on { stateChange in
if stateChange.current != .attached {
fail("Channel state should not change")
}
}

waitUntil(timeout: testTimeout) { done in
client.connection.once(.disconnected) { stateChange in
expect(stateChange.reason?.message).to(satisfyAnyOf(contain("unreachable host"), contain("network is down")))
Expand All @@ -766,7 +759,7 @@ class RealtimeClientChannelTests: XCTestCase {
}

channel.off()
XCTAssertEqual(channel.state, .attached)
expect(channel.state).toEventually(equal(ARTRealtimeChannelState.attached), timeout: testTimeout)
}

// RTL3b
Expand Down Expand Up @@ -1216,8 +1209,8 @@ class RealtimeClientChannelTests: XCTestCase {
XCTAssertNil(stateChange.reason)
done()
}

client.connect()
channel.attach()
}
XCTAssertEqual(channel.state, .attached)
}
Expand Down Expand Up @@ -2868,6 +2861,7 @@ class RealtimeClientChannelTests: XCTestCase {
XCTAssertNil(stateChange.reason)
done()
}
channel.attach()
}

let expectationEvent0 = XCTestExpectation(description: "event0")
Expand Down Expand Up @@ -4614,6 +4608,7 @@ class RealtimeClientChannelTests: XCTestCase {
partialDone()
}
}
channel.attach()
}

waitUntil(timeout: testTimeout) { done in
Expand Down
10 changes: 7 additions & 3 deletions Test/Tests/RealtimeClientConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2565,7 +2565,8 @@ class RealtimeClientConnectionTests: XCTestCase {
let client = AblyTests.newRealtime(options).client
defer { client.dispose(); client.close() }
let channel = client.channels.get(test.uniqueChannelName())

channel.attach()

XCTAssertTrue(client.waitUntilConnected())
let expectedConnectionId = client.connection.id

Expand Down Expand Up @@ -2601,7 +2602,9 @@ class RealtimeClientConnectionTests: XCTestCase {
defer { client.dispose(); client.close() }
let channel1 = client.channels.get(test.uniqueChannelName())
let channel2 = client.channels.get(test.uniqueChannelName(prefix: "second_"))

channel1.attach()
channel2.attach()

XCTAssertTrue(client.waitUntilConnected())
expect(channel1.state).toEventually(equal(ARTRealtimeChannelState.attached), timeout: testTimeout)
expect(channel2.state).toEventually(equal(ARTRealtimeChannelState.attached), timeout: testTimeout)
Expand Down Expand Up @@ -2642,7 +2645,8 @@ class RealtimeClientConnectionTests: XCTestCase {
let client = AblyTests.newRealtime(options).client
defer { client.dispose(); client.close() }
let channel = client.channels.get(test.uniqueChannelName())

channel.attach()

XCTAssertTrue(client.waitUntilConnected())
let expectedConnectionId = client.connection.id

Expand Down
Loading

0 comments on commit d1458ad

Please sign in to comment.