Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4858] TO3g compliance #1941

Merged
merged 7 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 32 additions & 35 deletions Source/ARTRealtime.m
Original file line number Diff line number Diff line change
Expand Up @@ -742,18 +742,13 @@ - (void)performTransitionToState:(ARTRealtimeConnectionState)state withParams:(A
}

if ([self shouldSendEvents]) {
[self sendQueuedMessages];

// Channels
for (ARTRealtimeChannelInternal *channel in channels) {
if (stateChange.previous == ARTRealtimeInitialized ||
stateChange.previous == ARTRealtimeConnecting ||
stateChange.previous == ARTRealtimeDisconnected) {
// RTL4i
[channel _attach:nil];
}
ARTAttachRequestParams *const params = [[ARTAttachRequestParams alloc] initWithReason:stateChange.reason];
[channel proceedAttachDetachWithParams:params];
}
} else if (![self shouldQueueEvents]) {
[self sendQueuedMessages];
}
else if (!self.isActive) {
if (!channelStateChangeParams) {
if (stateChange.reason) {
channelStateChangeParams = [[ARTChannelStateChangeParams alloc] initWithState:ARTStateError
Expand Down Expand Up @@ -867,12 +862,6 @@ - (void)onConnected:(ARTProtocolMessage *)message {
else {
ARTLogWarn(self.logger, @"RT:%p connection \"%@\" has reconnected, but resume failed. Error: \"%@\"", self, message.connectionId, message.error.message);
}
// Reattach all channels regardless resume success - RTN15c6, RTN15c7
for (ARTRealtimeChannelInternal *channel in self.channels.nosyncIterable) {
ARTAttachRequestParams *const params = [[ARTAttachRequestParams alloc] initWithReason:message.error];
[channel reattachWithParams:params];
}
_resuming = false;
}
// If there's no previous connectionId, then don't reset the msgSerial
//as it may have been set by recover data (unless the recover failed).
Expand Down Expand Up @@ -1218,9 +1207,9 @@ - (BOOL)shouldSendEvents {
}
}

- (BOOL)shouldQueueEvents {
if(!self.options.queueMessages) {
return false;
- (BOOL)isActive {
if (self.shouldSendEvents) {
return true;
}
switch (self.connection.state_nosync) {
case ARTRealtimeInitialized:
Expand All @@ -1234,10 +1223,6 @@ - (BOOL)shouldQueueEvents {
}
}

- (BOOL)isActive {
return [self shouldQueueEvents] || [self shouldSendEvents];
}

- (void)sendImpl:(ARTProtocolMessage *)pm reuseMsgSerial:(BOOL)reuseMsgSerial sentCallback:(ARTCallback)sentCallback ackCallback:(ARTStatusCallback)ackCallback {
if (pm.ackRequired) {
if (!reuseMsgSerial) { // RTN19a2
Expand Down Expand Up @@ -1284,22 +1269,34 @@ - (void)send:(ARTProtocolMessage *)msg reuseMsgSerial:(BOOL)reuseMsgSerial sentC
if ([self shouldSendEvents]) {
[self sendImpl:msg reuseMsgSerial:reuseMsgSerial sentCallback:sentCallback ackCallback:ackCallback];
}
else if ([self shouldQueueEvents]) {
ARTQueuedMessage *lastQueuedMessage = self.queuedMessages.lastObject; //RTL6d5
BOOL merged = [lastQueuedMessage mergeFrom:msg sentCallback:nil ackCallback:ackCallback];
if (!merged) {
ARTQueuedMessage *qm = [[ARTQueuedMessage alloc] initWithProtocolMessage:msg sentCallback:sentCallback ackCallback:ackCallback];
[self.queuedMessages addObject:qm];
ARTLogDebug(self.logger, @"RT:%p (channel: %@) protocol message with action '%lu - %@' has been queued (%@)", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), msg.messages);
// see RTL6c2, RTN19, RTN7 and TO3g
else if (msg.ackRequired) {
if (self.isActive && self.options.queueMessages) {
ARTQueuedMessage *lastQueuedMessage = self.queuedMessages.lastObject; //RTL6d5
BOOL merged = [lastQueuedMessage mergeFrom:msg sentCallback:nil ackCallback:ackCallback];
if (!merged) {
ARTQueuedMessage *qm = [[ARTQueuedMessage alloc] initWithProtocolMessage:msg sentCallback:sentCallback ackCallback:ackCallback];
[self.queuedMessages addObject:qm];
ARTLogDebug(self.logger, @"RT:%p (channel: %@) protocol message with action '%lu - %@' has been queued (%@)", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), msg.messages);
}
else {
ARTLogVerbose(self.logger, @"RT:%p (channel: %@) message %@ has been bundled to %@", self, msg.channel, msg, lastQueuedMessage.msg);
}
}
// RTL6c4
else {
ARTLogVerbose(self.logger, @"RT:%p (channel: %@) message %@ has been bundled to %@", self, msg.channel, msg, lastQueuedMessage.msg);
ARTErrorInfo *error = self.connection.error_nosync;
ARTLogDebug(self.logger, @"RT:%p (channel: %@) protocol message with action '%lu - %@' can't be sent or queued: %@", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), error);
if (sentCallback) {
sentCallback(error);
}
if (ackCallback) {
ackCallback([ARTStatus state:ARTStateError info:error]);
}
}
}
else if (ackCallback) {
ARTErrorInfo *error = self.connection.errorReason_nosync;
if (!error) error = [ARTErrorInfo createWithCode:ARTErrorChannelOperationFailed status:400 message:[NSString stringWithFormat:@"not possile to send message (state is %@)", ARTRealtimeConnectionStateToStr(self.connection.state_nosync)]];
ackCallback([ARTStatus state:ARTStateError info:error]);
else {
ARTLogDebug(self.logger, @"RT:%p (channel: %@) sending protocol message with action '%lu - %@' was ignored: %@", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), self.connection.error_nosync);
}
}

Expand Down
65 changes: 38 additions & 27 deletions Source/ARTRealtimeChannel.m
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,15 @@ - (void)reattachWithParams:(ARTAttachRequestParams *)params {
}
}

- (void)proceedAttachDetachWithParams:(ARTAttachRequestParams *)params {
if (self.state_nosync == ARTChannelEventDetaching) {
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) %@ proceeding with detach", _realtime, self, self.name, ARTRealtimeChannelStateToStr(self.state_nosync));
[self internalDetach:nil];
} else {
[self reattachWithParams:params];
}
}

- (void)internalAttach:(ARTCallback)callback withParams:(ARTAttachRequestParams *)params {
switch (self.state_nosync) {
case ARTRealtimeChannelDetaching: {
Expand All @@ -896,9 +905,11 @@ - (void)internalAttach:(ARTCallback)callback withParams:(ARTAttachRequestParams

if (callback) [_attachedEventEmitter once:callback];
// Set state: Attaching
const ARTState state = params.reason ? ARTStateError : ARTStateOk;
ARTChannelStateChangeParams *const stateChangeParams = [[ARTChannelStateChangeParams alloc] initWithState:state errorInfo:params.reason storeErrorInfo:NO retryAttempt:params.retryAttempt];
[self performTransitionToState:ARTRealtimeChannelAttaching withParams:stateChangeParams];
if (self.state_nosync != ARTRealtimeChannelAttaching) {
const ARTState state = params.reason ? ARTStateError : ARTStateOk;
ARTChannelStateChangeParams *const stateChangeParams = [[ARTChannelStateChangeParams alloc] initWithState:state errorInfo:params.reason storeErrorInfo:NO retryAttempt:params.retryAttempt];
[self performTransitionToState:ARTRealtimeChannelAttaching withParams:stateChangeParams];
}
[self attachAfterChecks];
}

Expand Down Expand Up @@ -949,17 +960,6 @@ - (void)_detach:(ARTCallback)callback {
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) can't detach when not attached", _realtime, self, self.name);
if (callback) callback(nil);
return;
case ARTRealtimeChannelAttaching: {
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) waiting for the completion of the attaching operation", _realtime, self, self.name);
[_attachedEventEmitter once:^(ARTErrorInfo *errorInfo) {
if (callback && errorInfo) {
callback(errorInfo);
return;
}
[self _detach:callback];
}];
return;
}
case ARTRealtimeChannelDetaching:
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) already detaching", _realtime, self, self.name);
if (callback) [_detachedEventEmitter once:callback];
Expand All @@ -982,6 +982,27 @@ - (void)_detach:(ARTCallback)callback {
default:
break;
}
[self internalDetach:callback];
}

- (void)internalDetach:(ARTCallback)callback {
switch (self.state_nosync) {
case ARTRealtimeChannelAttaching: {
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) waiting for the completion of the attaching operation", _realtime, self, self.name);
[_attachedEventEmitter once:^(ARTErrorInfo *errorInfo) {
if (callback && errorInfo) {
callback(errorInfo);
return;
}
[self _detach:callback];
}];
return;
}
default:
break;
}

_errorReason = nil;

if (![self.realtime isActive]) {
ARTLogDebug(self.logger, @"RT:%p C:%p (%@) can't detach when not in an active state", _realtime, self, self.name);
Expand All @@ -994,10 +1015,10 @@ - (void)_detach:(ARTCallback)callback {
ARTChannelStateChangeParams *const params = [[ARTChannelStateChangeParams alloc] initWithState:ARTStateOk];
[self performTransitionToState:ARTRealtimeChannelDetaching withParams:params];

[self detachAfterChecks:callback];
[self detachAfterChecks];
}

- (void)detachAfterChecks:(ARTCallback)callback {
- (void)detachAfterChecks {
ARTProtocolMessage *detachMessage = [[ARTProtocolMessage alloc] init];
detachMessage.action = ARTProtocolMessageDetach;
detachMessage.channel = self.name;
Expand All @@ -1015,17 +1036,7 @@ - (void)detachAfterChecks:(ARTCallback)callback {
[self performTransitionToState:ARTRealtimeChannelAttached withParams:params];
[self->_detachedEventEmitter emit:nil with:errorInfo];
}] startTimer];

if (![self.realtime shouldQueueEvents]) {
ARTEventListener *reconnectedListener = [self.realtime.connectedEventEmitter once:^(NSNull *n) {
// Disconnected and connected while detaching, re-detach.
[self detachAfterChecks:callback];
}];
[_detachedEventEmitter once:^(ARTErrorInfo *err) {
[self.realtime.connectedEventEmitter off:reconnectedListener];
}];
}


if (self.presence.syncInProgress_nosync) {
[self.presence failsSync:[ARTErrorInfo createWithCode:ARTErrorChannelOperationFailed message:@"channel is being DETACHED"]];
}
Expand Down
1 change: 0 additions & 1 deletion Source/PrivateHeaders/Ably/ARTRealtime+Private.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ NS_ASSUME_NONNULL_BEGIN

// State properties
- (BOOL)shouldSendEvents;
- (BOOL)shouldQueueEvents;

// Message sending
- (void)sendQueuedMessages;
Expand Down
2 changes: 1 addition & 1 deletion Source/PrivateHeaders/Ably/ARTRealtimeChannel+Private.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ NS_ASSUME_NONNULL_BEGIN

- (instancetype)initWithRealtime:(ARTRealtimeInternal *)realtime andName:(NSString *)name withOptions:(ARTRealtimeChannelOptions *)options logger:(ARTInternalLog *)logger;

- (void)reattachWithParams:(ARTAttachRequestParams *)params;
- (void)proceedAttachDetachWithParams:(ARTAttachRequestParams *)params;

- (void)_attach:(nullable ARTCallback)callback;
- (void)_detach:(nullable ARTCallback)callback;
Expand Down
31 changes: 13 additions & 18 deletions Test/Tests/RealtimeClientChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ class RealtimeClientChannelTests: XCTestCase {
XCTAssertFalse(stateChange.resumed)
}
}
channel.attach()
channel.publish(nil, data: "A message")
}

Expand All @@ -583,6 +584,7 @@ class RealtimeClientChannelTests: XCTestCase {
XCTAssertFalse(stateChange.resumed)
}
}
recoveredChannel.attach()
}
}

Expand Down Expand Up @@ -712,7 +714,6 @@ class RealtimeClientChannelTests: XCTestCase {
done()
}
}
expect(channel.state).toEventually(equal(.attached), timeout: testTimeout)
}

// TO3g and https://github.com/ably/ably-cocoa/issues/1004
Expand All @@ -727,28 +728,20 @@ class RealtimeClientChannelTests: XCTestCase {
defer { client.dispose(); client.close() }
client.internal.setReachabilityClass(TestReachability.self)
let channel = client.channels.get(test.uniqueChannelName())

waitUntil(timeout: testTimeout) { done in
client.connection.once(.connected) { _ in
done()
}
client.connect()
}


client.connect()
expect(client.connection.state).toEventually(equal(.connected), timeout: testTimeout)

channel.attach()
expect(channel.state).toEventually(equal(.attached), timeout: testTimeout)

waitUntil(timeout: testTimeout) { done in
channel.publish(nil, data: "message") { error in
XCTAssertNil(error)
done()
}
}

XCTAssertEqual(channel.state, .attached)
channel.on { stateChange in
if stateChange.current != .attached {
fail("Channel state should not change")
}
}

waitUntil(timeout: testTimeout) { done in
client.connection.once(.disconnected) { stateChange in
expect(stateChange.reason?.message).to(satisfyAnyOf(contain("unreachable host"), contain("network is down")))
Expand All @@ -766,7 +759,7 @@ class RealtimeClientChannelTests: XCTestCase {
}

channel.off()
XCTAssertEqual(channel.state, .attached)
expect(channel.state).toEventually(equal(ARTRealtimeChannelState.attached), timeout: testTimeout)
}

// RTL3b
Expand Down Expand Up @@ -1216,8 +1209,8 @@ class RealtimeClientChannelTests: XCTestCase {
XCTAssertNil(stateChange.reason)
done()
}

client.connect()
channel.attach()
}
XCTAssertEqual(channel.state, .attached)
}
Expand Down Expand Up @@ -2868,6 +2861,7 @@ class RealtimeClientChannelTests: XCTestCase {
XCTAssertNil(stateChange.reason)
done()
}
channel.attach()
}

let expectationEvent0 = XCTestExpectation(description: "event0")
Expand Down Expand Up @@ -4614,6 +4608,7 @@ class RealtimeClientChannelTests: XCTestCase {
partialDone()
}
}
channel.attach()
}

waitUntil(timeout: testTimeout) { done in
Expand Down
10 changes: 7 additions & 3 deletions Test/Tests/RealtimeClientConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2565,7 +2565,8 @@ class RealtimeClientConnectionTests: XCTestCase {
let client = AblyTests.newRealtime(options).client
defer { client.dispose(); client.close() }
let channel = client.channels.get(test.uniqueChannelName())

channel.attach()

XCTAssertTrue(client.waitUntilConnected())
let expectedConnectionId = client.connection.id

Expand Down Expand Up @@ -2601,7 +2602,9 @@ class RealtimeClientConnectionTests: XCTestCase {
defer { client.dispose(); client.close() }
let channel1 = client.channels.get(test.uniqueChannelName())
let channel2 = client.channels.get(test.uniqueChannelName(prefix: "second_"))

channel1.attach()
channel2.attach()

XCTAssertTrue(client.waitUntilConnected())
expect(channel1.state).toEventually(equal(ARTRealtimeChannelState.attached), timeout: testTimeout)
expect(channel2.state).toEventually(equal(ARTRealtimeChannelState.attached), timeout: testTimeout)
Expand Down Expand Up @@ -2642,7 +2645,8 @@ class RealtimeClientConnectionTests: XCTestCase {
let client = AblyTests.newRealtime(options).client
defer { client.dispose(); client.close() }
let channel = client.channels.get(test.uniqueChannelName())

channel.attach()

XCTAssertTrue(client.waitUntilConnected())
let expectedConnectionId = client.connection.id

Expand Down
Loading
Loading