diff --git a/Simperium/SPWebSocketChannel.m b/Simperium/SPWebSocketChannel.m index 6360035d..9839cf55 100644 --- a/Simperium/SPWebSocketChannel.m +++ b/Simperium/SPWebSocketChannel.m @@ -283,84 +283,82 @@ - (void)handleRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket { self.changesBatch = [NSMutableArray arrayWithCapacity:SPWebsocketChangesBatchSize]; self.started = YES; - dispatch_async(dispatch_get_main_queue(), ^{ - [self processBatchChanges:receivedBatch bucket:bucket]; - }); + [self processBatchChanges:receivedBatch bucket:bucket]; }); } - (void)processBatchChanges:(NSArray *)changes bucket:(SPBucket *)bucket { - NSAssert([NSThread isMainThread], @"This should get called on the main thread!"); + NSAssert([NSThread isMainThread] == false, @"This should NOT get called on the main thread!"); SPLogVerbose(@"Simperium handling changes (%@) %@", bucket.name ,changes); SPChangeProcessor *changeProcessor = bucket.changeProcessor; SPIndexProcessor *indexProcessor = bucket.indexProcessor; + __weak __typeof(self) weakSelf = self; - // Changing entities and saving the context will clear Core Data's updatedObjects. Stash them so - // sync will still work for any unsaved changes. - [bucket.storage stashUnsavedObjects]; - - // Notify the delegates on the main thread that we're about to apply remote changes - [changeProcessor notifyOfRemoteChanges:changes bucket:bucket]; + dispatch_sync(dispatch_get_main_queue(), ^{ + // Changing entities and saving the context will clear Core Data's updatedObjects. Stash them so + // sync will still work for any unsaved changes. + [bucket.storage stashUnsavedObjects]; + + // Notify the delegates on the main thread that we're about to apply remote changes + [changeProcessor notifyOfRemoteChanges:changes bucket:bucket]; + }); + // Failsafe: Don't proceed if we just got deauthenticated + if (!self.authenticated) { + return; + } - __weak __typeof(self) weakSelf = self; + // Process the changes! + SPChangeSuccessHandlerBlockType successHandler = ^(NSString *simperiumKey, NSString *version) { + + [indexProcessor enableRebaseForObjectWithKey:simperiumKey]; + }; - dispatch_async(bucket.processorQueue, ^{ - if (!self.authenticated) { - return; - } - - SPChangeSuccessHandlerBlockType successHandler = ^(NSString *simperiumKey, NSString *version) { - - [indexProcessor enableRebaseForObjectWithKey:simperiumKey]; - }; + SPChangeErrorHandlerBlockType errorHandler = ^(NSString *simperiumKey, NSString *version, NSError *error) { + + SPLogError(@"Simperium Received Error [%@] for object with key [%@]", error.localizedDescription, simperiumKey); - SPChangeErrorHandlerBlockType errorHandler = ^(NSString *simperiumKey, NSString *version, NSError *error) { + if (error.code == SPProcessorErrorsClientOutOfSync) { + dispatch_async(dispatch_get_main_queue(), ^{ + [weakSelf requestLatestVersionsForBucket:bucket]; + }); - SPLogError(@"Simperium Received Error [%@] for object with key [%@]", error.localizedDescription, simperiumKey); + } else if (error.code == SPProcessorErrorsSentDuplicateChange) { + [changeProcessor discardPendingChanges:simperiumKey bucket:bucket]; - if (error.code == SPProcessorErrorsClientOutOfSync) { + } else if (error.code == SPProcessorErrorsSentInvalidChange) { + [changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:YES]; + [indexProcessor disableRebaseForObjectWithKey:simperiumKey]; + + } else if (error.code == SPProcessorErrorsServerError) { + [changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:NO]; + + } else if (error.code == SPProcessorErrorsClientError) { + [changeProcessor discardPendingChanges:simperiumKey bucket:bucket]; + + } else if (error.code == SPProcessorErrorsReceivedInvalidChange) { + + // Do not generate reentrant calls: let the index processor handle the reload + if (weakSelf.indexing) { + [indexProcessor enableReloadForObjectWithKey:simperiumKey]; + } else { dispatch_async(dispatch_get_main_queue(), ^{ - [weakSelf requestLatestVersionsForBucket:bucket]; + [weakSelf requestVersion:version forObjectWithKey:simperiumKey]; }); - - } else if (error.code == SPProcessorErrorsSentDuplicateChange) { - [changeProcessor discardPendingChanges:simperiumKey bucket:bucket]; - - } else if (error.code == SPProcessorErrorsSentInvalidChange) { - [changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:YES]; - [indexProcessor disableRebaseForObjectWithKey:simperiumKey]; - - } else if (error.code == SPProcessorErrorsServerError) { - [changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:NO]; - - } else if (error.code == SPProcessorErrorsClientError) { - [changeProcessor discardPendingChanges:simperiumKey bucket:bucket]; - - } else if (error.code == SPProcessorErrorsReceivedInvalidChange) { - - // Do not generate reentrant calls: let the index processor handle the reload - if (weakSelf.indexing) { - [indexProcessor enableReloadForObjectWithKey:simperiumKey]; - } else { - dispatch_async(dispatch_get_main_queue(), ^{ - [weakSelf requestVersion:version forObjectWithKey:simperiumKey]; - }); - } } - }; - - [changeProcessor processRemoteChanges:changes bucket:bucket successHandler:successHandler errorHandler:errorHandler]; - + } + }; + + [changeProcessor processRemoteChanges:changes bucket:bucket successHandler:successHandler errorHandler:errorHandler]; + - // After remote changes have been processed, check to see if any local changes were attempted (and queued) - // in the meantime, and send them. - dispatch_async(dispatch_get_main_queue(), ^{ - [self sendChangesForBucket:bucket]; - }); + // After remote changes have been processed, check to see if any local changes were attempted (and queued) + // in the meantime, and send them. + dispatch_async(dispatch_get_main_queue(), ^{ + [self sendChangesForBucket:bucket]; }); }