Skip to content

Commit

Permalink
Fix: Realtime transport should fire events on a serial queue (#578)
Browse files Browse the repository at this point in the history
* Realtime: transport should fire events on a serial queue

* Rest: should fire events on background queues

* EventEmitter: accept a custom queue

* Specs: call dispose and other enhancements

* Fix: Timeouts are doubles, not integers

* Fix: Realtime should handle internal events first

* Fix: store immediately presence members in the presence map

* Fix: defer until next event loop execution using a CFRunLoopObserverRef observing the kCFRunLoopExit

* Fix: avoid run loop's traditional dispatching machinery

 - could get corrupted by other application code, i.e. the test suite
blocks the current runloop. Using a `dispatch_semaphore` instead.

* Add test

* Test suite: timings

* Enhance: RTN5, better reliability
  • Loading branch information
ricardopereira authored Feb 5, 2017
1 parent ea83396 commit f81703c
Show file tree
Hide file tree
Showing 19 changed files with 244 additions and 147 deletions.
18 changes: 18 additions & 0 deletions Examples/Tests/TestsTests/TestsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,23 @@ class TestsTests: XCTestCase {
client.channels.get("test").publish(nil, data: "Get this!")

self.waitForExpectationsWithTimeout(10, handler: nil)

let backgroundRealtimeExpectation = self.expectationWithDescription("Realtime in a Background Queue")
NSURLSession.sharedSession().dataTaskWithURL(NSURL(string:"https://ably.io")!) { _ in
let realtime = ARTRealtime(key: key as String)
realtime.channels.get("foo").attach { _ in
defer { backgroundRealtimeExpectation.fulfill() }
}
}.resume()
self.waitForExpectationsWithTimeout(10, handler: nil)

let backgroundRestExpectation = self.expectationWithDescription("Rest in a Background Queue")
NSURLSession.sharedSession().dataTaskWithURL(NSURL(string:"https://ably.io")!) { _ in
let rest = ARTRest(key: key as String)
rest.channels.get("foo").history { _ in
defer { backgroundRestExpectation.fulfill() }
}
}.resume()
self.waitForExpectationsWithTimeout(10, handler: nil)
}
}
7 changes: 5 additions & 2 deletions Source/ARTConnection.m
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ @interface ARTConnection ()

@end

@implementation ARTConnection
@implementation ARTConnection {
_Nonnull dispatch_queue_t _queue;
}

- (instancetype)initWithRealtime:(ARTRealtime *)realtime {
if (self == [super init]) {
_eventEmitter = [[ARTEventEmitter alloc] init];
_queue = dispatch_queue_create("io.ably.realtime.connection", DISPATCH_QUEUE_SERIAL);
_eventEmitter = [[ARTEventEmitter alloc] initWithQueue:_queue];
_realtime = realtime;
_serial = -1;
}
Expand Down
4 changes: 2 additions & 2 deletions Source/ARTDefault.m
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ @implementation ARTDefault
NSString *const ARTDefault_libraryVersion = @"0.8.9";
NSString *const ARTDefault_platform = @"ios-";

static int _realtimeRequestTimeout = 10.0;
static int _connectionStateTtl = 60.0;
static NSTimeInterval _realtimeRequestTimeout = 10.0;
static NSTimeInterval _connectionStateTtl = 60.0;

+ (NSArray*)fallbackHosts {
return @[@"a.ably-realtime.com", @"b.ably-realtime.com", @"c.ably-realtime.com", @"d.ably-realtime.com", @"e.ably-realtime.com"];
Expand Down
3 changes: 3 additions & 0 deletions Source/ARTEventEmitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ ART_ASSUME_NONNULL_BEGIN

@interface __GENERIC(ARTEventEmitter, EventType, ItemType) : NSObject

- (instancetype)init;
- (instancetype)initWithQueue:(dispatch_queue_t)queue;

- (__GENERIC(ARTEventListener, ItemType) *)on:(EventType)event callback:(void (^)(ItemType __art_nullable))cb;
- (__GENERIC(ARTEventListener, ItemType) *)on:(void (^)(ItemType __art_nullable))cb;

Expand Down
45 changes: 25 additions & 20 deletions Source/ARTEventEmitter.m
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,23 @@ - (void)artRemoveWhere:(BOOL (^)(id))cond {

@interface ARTEventListener ()

- (instancetype)initWithBlock:(void (^)(id __art_nonnull))block;
- (instancetype)initWithBlock:(void (^)(id __art_nonnull))block queue:(dispatch_queue_t)queue;
- (void)setTimerWithDeadline:(NSTimeInterval)deadline onTimeout:(void (^)())onTimeout;
- (void)off;

@end

@implementation ARTEventListener {
void (^_block)(id __art_nonnull);
CFRunLoopTimerRef _timer;
_Nonnull dispatch_queue_t _queue;
_Nullable dispatch_block_t _timerBlock;
}

- (instancetype)initWithBlock:(void (^)(id __art_nonnull))block {
- (instancetype)initWithBlock:(void (^)(id __art_nonnull))block queue:(dispatch_queue_t)queue {
self = [self init];
if (self) {
_block = block;
_queue = queue;
}
return self;
}
Expand All @@ -55,19 +57,15 @@ - (void)call:(id)argument {

- (void)setTimerWithDeadline:(NSTimeInterval)deadline onTimeout:(void (^)())onTimeout {
[self cancelTimer];
CFAbsoluteTime timeoutDate = CFAbsoluteTimeGetCurrent() + deadline;

CFRunLoopRef rl = CFRunLoopGetCurrent();
_timer = CFRunLoopTimerCreateWithHandler(kCFAllocatorDefault, timeoutDate, 0, 0, 0, onTimeout);
CFRunLoopAddTimer(rl, _timer, kCFRunLoopDefaultMode);
_timerBlock = dispatch_block_create(0, ^{
onTimeout();
});
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, deadline * NSEC_PER_SEC), _queue, _timerBlock);
}

- (void)cancelTimer {
if (_timer) {
CFRunLoopTimerInvalidate(_timer);
CFRunLoopRemoveTimer(CFRunLoopGetCurrent(), _timer, kCFRunLoopDefaultMode);
CFRelease(_timer);
_timer = nil;
if (_timerBlock) {
dispatch_block_cancel(_timerBlock);
}
}

Expand All @@ -79,7 +77,7 @@ - (void)off {

@implementation ARTEventEmitterEntry

-(instancetype)initWithListener:(ARTEventListener *)listener once:(BOOL)once {
- (instancetype)initWithListener:(ARTEventListener *)listener once:(BOOL)once {
self = [self init];
if (self) {
_listener = listener;
Expand All @@ -97,23 +95,31 @@ - (BOOL)isEqual:(id)object {

@end

@implementation ARTEventEmitter
@implementation ARTEventEmitter {
_Nonnull dispatch_queue_t _queue;
}

- (instancetype)init {
return [self initWithQueue:dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0)];
}

- (instancetype)initWithQueue:(dispatch_queue_t)queue {
self = [super init];
if (self) {
_queue = queue;
[self resetListeners];
}
return self;
}

- (ARTEventListener *)on:(id)event callback:(void (^)(id __art_nonnull))cb {
ARTEventListener *listener = [[ARTEventListener alloc] initWithBlock:cb];
ARTEventListener *listener = [[ARTEventListener alloc] initWithBlock:cb queue:_queue];
[self addOnEntry:[[ARTEventEmitterEntry alloc] initWithListener:listener once:false] event:event];
return listener;
}

- (ARTEventListener *)once:(id)event callback:(void (^)(id __art_nonnull))cb {
ARTEventListener *listener = [[ARTEventListener alloc] initWithBlock:cb];
ARTEventListener *listener = [[ARTEventListener alloc] initWithBlock:cb queue:_queue];
[self addOnEntry:[[ARTEventEmitterEntry alloc] initWithListener:listener once:true] event:event];
return listener;
}
Expand All @@ -123,13 +129,13 @@ - (void)addOnEntry:(ARTEventEmitterEntry *)entry event:(id)event {
}

- (ARTEventListener *)on:(void (^)(id __art_nonnull))cb {
ARTEventListener *listener = [[ARTEventListener alloc] initWithBlock:cb];
ARTEventListener *listener = [[ARTEventListener alloc] initWithBlock:cb queue:_queue];
[self addOnAllEntry:[[ARTEventEmitterEntry alloc] initWithListener:listener once:false]];
return listener;
}

- (ARTEventListener *)once:(void (^)(id __art_nonnull))cb {
ARTEventListener *listener = [[ARTEventListener alloc] initWithBlock:cb];
ARTEventListener *listener = [[ARTEventListener alloc] initWithBlock:cb queue:_queue];
[self addOnAllEntry:[[ARTEventEmitterEntry alloc] initWithListener:listener once:true]];
return listener;
}
Expand Down Expand Up @@ -182,7 +188,6 @@ - (void)emit:(id)event with:(id)data {
if (entry.once) {
[toRemoveFromListeners addObject:entry];
}

[toCall addObject:entry];
}

Expand Down
3 changes: 0 additions & 3 deletions Source/ARTHttp.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ ART_ASSUME_NONNULL_BEGIN
@end

@interface ARTHttp : NSObject<ARTHTTPExecutor>
{

}

@property (nonatomic, weak) ARTLog *logger;

Expand Down
23 changes: 10 additions & 13 deletions Source/ARTHttp.m
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,14 @@ - (void)cancel {

#pragma mark - ARTHttp

@implementation ARTHttp
@implementation ARTHttp {
_Nullable dispatch_queue_t _queue;
}

- (instancetype)init {
self = [super init];
if (self) {
_queue = dispatch_queue_create("io.ably.rest.http", DISPATCH_QUEUE_SERIAL);
_urlSession = [[ARTURLSessionServerTrust alloc] init];
_baseUrl = nil;
}
Expand All @@ -172,12 +175,10 @@ - (void)executeRequest:(NSMutableURLRequest *)request completion:(void (^)(NSHTT
[self.logger debug:@"%@ %@", request.HTTPMethod, request.URL.absoluteString];
[self.logger verbose:@"Headers %@", request.allHTTPHeaderFields];

CFRunLoopRef currentRunloop = CFRunLoopGetCurrent();

[_urlSession get:request completion:^(NSHTTPURLResponse *response, NSData *data, NSError *error) {
NSHTTPURLResponse *httpResponse = (NSHTTPURLResponse *)response;
CFRunLoopPerformBlock(currentRunloop, kCFRunLoopCommonModes, ^{

dispatch_async(_queue, ^{
if (error) {
[self.logger error:@"%@ %@: error %@", request.HTTPMethod, request.URL.absoluteString, error];
} else {
Expand All @@ -190,7 +191,6 @@ - (void)executeRequest:(NSMutableURLRequest *)request completion:(void (^)(NSHTT
}
callback(httpResponse, data, error);
});
CFRunLoopWakeUp(currentRunloop);
}];
}

Expand All @@ -216,12 +216,10 @@ - (void)executeRequest:(NSMutableURLRequest *)request completion:(void (^)(NSHTT
request.HTTPBody = artRequest.body;
[self.logger debug:@"ARTHttp: makeRequest %@", [request allHTTPHeaderFields]];

__block CFRunLoopRef rl = CFRunLoopGetCurrent();

[self.urlSession get:request completion:^(NSHTTPURLResponse *response, NSData *data, NSError *error) {
NSHTTPURLResponse *httpResponse = (NSHTTPURLResponse *)response;
[self.logger verbose:@"ARTHttp: Got response %@, err %@", response, error];

if(error) {
[self.logger error:@"ARTHttp receieved error: %@", error];
cb([ARTHttpResponse responseWithStatus:500 headers:nil body:nil]);
Expand All @@ -231,17 +229,16 @@ - (void)executeRequest:(NSMutableURLRequest *)request completion:(void (^)(NSHTT
int status = (int)httpResponse.statusCode;
[self.logger debug:@"ARTHttp response status is %d", status];
[self.logger verbose:@"ARTHttp received response %@",[NSJSONSerialization JSONObjectWithData:data options:0 error:nil]];
CFRunLoopPerformBlock(rl, kCFRunLoopDefaultMode, ^{

dispatch_async(_queue, ^{
cb([ARTHttpResponse responseWithStatus:status headers:httpResponse.allHeaderFields body:data]);
});
} else {
CFRunLoopPerformBlock(rl, kCFRunLoopDefaultMode, ^{
dispatch_async(_queue, ^{
cb([ARTHttpResponse response]);
});
}
}
CFRunLoopWakeUp(rl);
}];
return nil;
}
Expand Down
3 changes: 3 additions & 0 deletions Source/ARTPresenceMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ ART_ASSUME_NONNULL_BEGIN
@property (readonly, nonatomic, assign) BOOL syncComplete;
@property (readonly, nonatomic, getter=getSyncInProgress) BOOL syncInProgress;

- (instancetype)init UNAVAILABLE_ATTRIBUTE;
- (instancetype)initWithQueue:(_Nonnull dispatch_queue_t)queue;

- (void)put:(ARTPresenceMessage *)message;
- (void)clean;

Expand Down
4 changes: 2 additions & 2 deletions Source/ARTPresenceMap.m
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ @interface ARTPresenceMap () {

@implementation ARTPresenceMap

- (id)init {
- (instancetype)initWithQueue:(_Nonnull dispatch_queue_t)queue {
self = [super init];
if(self) {
_recentMembers = [NSMutableDictionary dictionary];
_syncStarted = false;
_syncComplete = false;
_syncEndedEventEmitter = [[ARTEventEmitter alloc] init];
_syncEndedEventEmitter = [[ARTEventEmitter alloc] initWithQueue:queue];
}
return self;
}
Expand Down
Loading

0 comments on commit f81703c

Please sign in to comment.