Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPWebSocketChannel: Fixes Race in Batch Processing #363

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 55 additions & 57 deletions Simperium/SPWebSocketChannel.m
Original file line number Diff line number Diff line change
Expand Up @@ -283,84 +283,82 @@ - (void)handleRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket {
self.changesBatch = [NSMutableArray arrayWithCapacity:SPWebsocketChangesBatchSize];
self.started = YES;

dispatch_async(dispatch_get_main_queue(), ^{
[self processBatchChanges:receivedBatch bucket:bucket];
});
[self processBatchChanges:receivedBatch bucket:bucket];
});
}

- (void)processBatchChanges:(NSArray *)changes bucket:(SPBucket *)bucket {

NSAssert([NSThread isMainThread], @"This should get called on the main thread!");
NSAssert([NSThread isMainThread] == false, @"This should NOT get called on the main thread!");

SPLogVerbose(@"Simperium handling changes (%@) %@", bucket.name ,changes);

SPChangeProcessor *changeProcessor = bucket.changeProcessor;
SPIndexProcessor *indexProcessor = bucket.indexProcessor;
__weak __typeof(self) weakSelf = self;

// Changing entities and saving the context will clear Core Data's updatedObjects. Stash them so
// sync will still work for any unsaved changes.
[bucket.storage stashUnsavedObjects];

// Notify the delegates on the main thread that we're about to apply remote changes
[changeProcessor notifyOfRemoteChanges:changes bucket:bucket];
dispatch_sync(dispatch_get_main_queue(), ^{
// Changing entities and saving the context will clear Core Data's updatedObjects. Stash them so
// sync will still work for any unsaved changes.
[bucket.storage stashUnsavedObjects];

// Notify the delegates on the main thread that we're about to apply remote changes
[changeProcessor notifyOfRemoteChanges:changes bucket:bucket];
});

// Failsafe: Don't proceed if we just got deauthenticated
if (!self.authenticated) {
return;
}

__weak __typeof(self) weakSelf = self;
// Process the changes!
SPChangeSuccessHandlerBlockType successHandler = ^(NSString *simperiumKey, NSString *version) {

[indexProcessor enableRebaseForObjectWithKey:simperiumKey];
};

dispatch_async(bucket.processorQueue, ^{
if (!self.authenticated) {
return;
}

SPChangeSuccessHandlerBlockType successHandler = ^(NSString *simperiumKey, NSString *version) {

[indexProcessor enableRebaseForObjectWithKey:simperiumKey];
};
SPChangeErrorHandlerBlockType errorHandler = ^(NSString *simperiumKey, NSString *version, NSError *error) {

SPLogError(@"Simperium Received Error [%@] for object with key [%@]", error.localizedDescription, simperiumKey);

SPChangeErrorHandlerBlockType errorHandler = ^(NSString *simperiumKey, NSString *version, NSError *error) {
if (error.code == SPProcessorErrorsClientOutOfSync) {
dispatch_async(dispatch_get_main_queue(), ^{
[weakSelf requestLatestVersionsForBucket:bucket];
});

SPLogError(@"Simperium Received Error [%@] for object with key [%@]", error.localizedDescription, simperiumKey);
} else if (error.code == SPProcessorErrorsSentDuplicateChange) {
[changeProcessor discardPendingChanges:simperiumKey bucket:bucket];

if (error.code == SPProcessorErrorsClientOutOfSync) {
} else if (error.code == SPProcessorErrorsSentInvalidChange) {
[changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:YES];
[indexProcessor disableRebaseForObjectWithKey:simperiumKey];

} else if (error.code == SPProcessorErrorsServerError) {
[changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:NO];

} else if (error.code == SPProcessorErrorsClientError) {
[changeProcessor discardPendingChanges:simperiumKey bucket:bucket];

} else if (error.code == SPProcessorErrorsReceivedInvalidChange) {

// Do not generate reentrant calls: let the index processor handle the reload
if (weakSelf.indexing) {
[indexProcessor enableReloadForObjectWithKey:simperiumKey];
} else {
dispatch_async(dispatch_get_main_queue(), ^{
[weakSelf requestLatestVersionsForBucket:bucket];
[weakSelf requestVersion:version forObjectWithKey:simperiumKey];
});

} else if (error.code == SPProcessorErrorsSentDuplicateChange) {
[changeProcessor discardPendingChanges:simperiumKey bucket:bucket];

} else if (error.code == SPProcessorErrorsSentInvalidChange) {
[changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:YES];
[indexProcessor disableRebaseForObjectWithKey:simperiumKey];

} else if (error.code == SPProcessorErrorsServerError) {
[changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:NO];

} else if (error.code == SPProcessorErrorsClientError) {
[changeProcessor discardPendingChanges:simperiumKey bucket:bucket];

} else if (error.code == SPProcessorErrorsReceivedInvalidChange) {

// Do not generate reentrant calls: let the index processor handle the reload
if (weakSelf.indexing) {
[indexProcessor enableReloadForObjectWithKey:simperiumKey];
} else {
dispatch_async(dispatch_get_main_queue(), ^{
[weakSelf requestVersion:version forObjectWithKey:simperiumKey];
});
}
}
};

[changeProcessor processRemoteChanges:changes bucket:bucket successHandler:successHandler errorHandler:errorHandler];

}
};

[changeProcessor processRemoteChanges:changes bucket:bucket successHandler:successHandler errorHandler:errorHandler];


// After remote changes have been processed, check to see if any local changes were attempted (and queued)
// in the meantime, and send them.
dispatch_async(dispatch_get_main_queue(), ^{
[self sendChangesForBucket:bucket];
});
// After remote changes have been processed, check to see if any local changes were attempted (and queued)
// in the meantime, and send them.
dispatch_async(dispatch_get_main_queue(), ^{
[self sendChangesForBucket:bucket];
});
}

Expand Down