diff --git a/Simperium/SPChangeProcessor.h b/Simperium/SPChangeProcessor.h index 7938231d..a55e9a45 100644 --- a/Simperium/SPChangeProcessor.h +++ b/Simperium/SPChangeProcessor.h @@ -16,6 +16,7 @@ #pragma mark Constants #pragma mark ==================================================================================== +typedef void(^SPChangeSuccessHandlerBlockType)(NSString *simperiumKey, NSString *version); typedef void(^SPChangeErrorHandlerBlockType)(NSString *simperiumKey, NSString *version, NSError *error); typedef void(^SPChangeEnumerationBlockType)(NSDictionary *change); @@ -48,10 +49,10 @@ typedef NS_ENUM(NSInteger, SPProcessorErrors) { - (void)reset; - (void)notifyOfRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket; -- (void)processRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket errorHandler:(SPChangeErrorHandlerBlockType)errorHandler; +- (void)processRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket successHandler:(SPChangeSuccessHandlerBlockType)successHandler errorHandler:(SPChangeErrorHandlerBlockType)errorHandler; - (void)enqueueObjectForMoreChanges:(NSString *)key bucket:(SPBucket *)bucket; -- (void)enqueueObjectDeletion:(NSString *)key bucket:(SPBucket *)bucket; +- (void)enqueueObjectForDeletion:(NSString *)key bucket:(SPBucket *)bucket; - (void)enqueueObjectForRetry:(NSString *)key bucket:(SPBucket *)bucket overrideRemoteData:(BOOL)overrideRemoteData; - (void)discardPendingChanges:(NSString *)key bucket:(SPBucket *)bucket; diff --git a/Simperium/SPChangeProcessor.m b/Simperium/SPChangeProcessor.m index cc2f409c..da666aaa 100644 --- a/Simperium/SPChangeProcessor.m +++ b/Simperium/SPChangeProcessor.m @@ -450,10 +450,11 @@ - (void)notifyOfRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket { [[NSNotificationCenter defaultCenter] postNotificationName:ProcessorWillChangeObjectsNotification object:bucket userInfo:userInfo]; } -- (void)processRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket errorHandler:(SPChangeErrorHandlerBlockType)errorHandler { +- (void)processRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket successHandler:(SPChangeSuccessHandlerBlockType)successHandler errorHandler:(SPChangeErrorHandlerBlockType)errorHandler { NSAssert([NSThread isMainThread] == NO, @"This should get called on the processor's queue!"); NSAssert([bucket isKindOfClass:[SPBucket class]], @"Invalid Bucket"); + NSAssert(successHandler, @"Please, provide a success handler!"); NSAssert(errorHandler, @"Please, provide an error handler!"); @autoreleasepool { @@ -476,6 +477,9 @@ - (void)processRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket errorH continue; } + // Signal Success + successHandler(key, version); + // Persist LastChangeSignature: do it inside the loop in case something happens to abort the loop NSString *changeVersion = change[CH_CHANGE_VERSION]; @@ -511,7 +515,7 @@ - (void)enqueueObjectForMoreChanges:(NSString *)key bucket:(SPBucket *)bucket { [self.keysForObjectsWithMoreChanges save]; } -- (void)enqueueObjectDeletion:(NSString *)key bucket:(SPBucket *)bucket { +- (void)enqueueObjectForDeletion:(NSString *)key bucket:(SPBucket *)bucket { NSAssert( [key isKindOfClass:[NSString class]], @"Missing key" ); NSAssert( [bucket isKindOfClass:[SPBucket class]], @"Missing Bucket"); diff --git a/Simperium/SPIndexProcessor.h b/Simperium/SPIndexProcessor.h index 8c86e477..609c6825 100644 --- a/Simperium/SPIndexProcessor.h +++ b/Simperium/SPIndexProcessor.h @@ -24,7 +24,14 @@ typedef void(^SPChangeHandlerBlockType)(NSString *key); #pragma mark ==================================================================================== @interface SPIndexProcessor : NSObject + - (void)processIndex:(NSArray *)indexArray bucket:(SPBucket *)bucket versionHandler:(SPVersionHandlerBlockType)versionHandler; - (void)processVersions:(NSArray *)versions bucket:(SPBucket *)bucket changeHandler:(SPChangeHandlerBlockType)changeHandler; + +- (void)enableRebaseForAllObjects; +- (void)enableRebaseForObjectWithKey:(NSString *)simperiumKey; +- (void)disableRebaseForObjectWithKey:(NSString *)simperiumKey; + - (NSArray*)exportIndexStatus:(SPBucket *)bucket; + @end diff --git a/Simperium/SPIndexProcessor.m b/Simperium/SPIndexProcessor.m index 68990cbb..f3847753 100644 --- a/Simperium/SPIndexProcessor.m +++ b/Simperium/SPIndexProcessor.m @@ -35,12 +35,29 @@ typedef NS_ENUM(NSInteger, SPVersion) { }; +#pragma mark ==================================================================================== +#pragma mark Private +#pragma mark ==================================================================================== + +@interface SPIndexProcessor () +@property (nonatomic, strong) NSMutableSet *keysForObjectsWithRebaseDisabled; +@end + + #pragma mark ==================================================================================== #pragma mark SPIndexProcessor #pragma mark ==================================================================================== @implementation SPIndexProcessor +- (instancetype)init { + self = [super init]; + if (self) { + self.keysForObjectsWithRebaseDisabled = [NSMutableSet set]; + } + return self; +} + // Process an index of keys from the Simperium service for a particular bucket - (void)processIndex:(NSArray *)indexArray bucket:(SPBucket *)bucket versionHandler:(SPVersionHandlerBlockType)versionHandler { @@ -205,7 +222,9 @@ - (void)processVersions:(NSArray *)versions bucket:(SPBucket *)bucket changeHand SPLogWarn(@"Simperium successfully reloaded local entity (%@): %@", bucket.name, key); // 3. Rebase + apply localDiff - if (localDiff.count) { + BOOL isRebaseDisabled = [self.keysForObjectsWithRebaseDisabled containsObject:key]; + + if (localDiff.count && !isRebaseDisabled) { // 3.1. Calculate Delta: LocalGhost > RemoteMembers NSDictionary *remoteDiff = [bucket.differ diffFromDictionary:localGhost.memberData toObject:object]; @@ -237,7 +256,13 @@ - (void)processVersions:(NSArray *)versions bucket:(SPBucket *)bucket changeHand [rebasedKeys addObject:key]; } + // 4. Keep track of changed Keys [changedKeys addObject:key]; + + // 5. Cleanup + if (isRebaseDisabled) { + [self.keysForObjectsWithRebaseDisabled removeObject:key]; + } } // 4. Update the ghost with the remote member data + version @@ -286,6 +311,18 @@ - (void)processVersions:(NSArray *)versions bucket:(SPBucket *)bucket changeHand } } +- (void)enableRebaseForAllObjects { + [self.keysForObjectsWithRebaseDisabled removeAllObjects]; +} + +- (void)enableRebaseForObjectWithKey:(NSString *)simperiumKey { + [self.keysForObjectsWithRebaseDisabled removeObject:simperiumKey]; +} + +- (void)disableRebaseForObjectWithKey:(NSString *)simperiumKey { + [self.keysForObjectsWithRebaseDisabled addObject:simperiumKey]; +} + - (NSArray*)exportIndexStatus:(SPBucket *)bucket { // This routine shall be used for debugging purposes! diff --git a/Simperium/SPWebSocketChannel.m b/Simperium/SPWebSocketChannel.m index 220fe75c..84d0c621 100644 --- a/Simperium/SPWebSocketChannel.m +++ b/Simperium/SPWebSocketChannel.m @@ -158,7 +158,7 @@ - (void)sendObjectDeletion:(id)object { SPChangeProcessor *processor = object.bucket.changeProcessor; if (_indexing || !_authenticated || processor.reachedMaxPendings) { - [processor enqueueObjectDeletion:key bucket:object.bucket]; + [processor enqueueObjectForDeletion:key bucket:object.bucket]; } else { NSSet *wrappedKey = [NSSet setWithObject:key]; NSArray *changes = [processor processLocalDeletionsWithKeys:wrappedKey]; @@ -250,6 +250,12 @@ - (void)handleAuthResponse:(NSString *)responseString bucket:(SPBucket *)bucket self.onLocalChangesSent = nil; self.objectVersionsPending = 0; + // Reset disable-rebase mechanism + dispatch_async(bucket.processorQueue, ^{ + [bucket.indexProcessor enableRebaseForAllObjects]; + }); + + // Download the index, on the 1st sync if (bucket.lastChangeSignature == nil) { [self requestLatestVersionsForBucket:bucket]; } else { @@ -277,14 +283,15 @@ - (void)processBatchChanges:(NSArray *)changes bucket:(SPBucket *)bucket { SPLogVerbose(@"Simperium handling changes %@", changes); - SPChangeProcessor *processor = bucket.changeProcessor; + SPChangeProcessor *changeProcessor = bucket.changeProcessor; + SPIndexProcessor *indexProcessor = bucket.indexProcessor; // Changing entities and saving the context will clear Core Data's updatedObjects. Stash them so // sync will still work for any unsaved changes. [bucket.storage stashUnsavedObjects]; // Notify the delegates on the main thread that we're about to apply remote changes - [bucket.changeProcessor notifyOfRemoteChanges:changes bucket:bucket]; + [changeProcessor notifyOfRemoteChanges:changes bucket:bucket]; __weak __typeof(self) weakSelf = self; @@ -294,21 +301,27 @@ - (void)processBatchChanges:(NSArray *)changes bucket:(SPBucket *)bucket { return; } - [processor processRemoteChanges:changes bucket:bucket errorHandler:^(NSString *simperiumKey, NSString *version, NSError *error) { + SPChangeSuccessHandlerBlockType successHandler = ^(NSString *simperiumKey, NSString *version) { + + [indexProcessor enableRebaseForObjectWithKey:simperiumKey]; + }; + + SPChangeErrorHandlerBlockType errorHandler = ^(NSString *simperiumKey, NSString *version, NSError *error) { SPLogError(@"Simperium Received Error [%@] for object with key [%@]", error.localizedDescription, simperiumKey); - if (error.code == SPProcessorErrorsSentDuplicateChange) { - [processor discardPendingChanges:simperiumKey bucket:bucket]; + if (error.code == SPProcessorErrorsSentDuplicateChange) { + [changeProcessor discardPendingChanges:simperiumKey bucket:bucket]; } else if (error.code == SPProcessorErrorsSentInvalidChange) { - [processor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:YES]; + [changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:YES]; + [indexProcessor disableRebaseForObjectWithKey:simperiumKey]; } else if (error.code == SPProcessorErrorsServerError) { - [processor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:NO]; + [changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:NO]; } else if (error.code == SPProcessorErrorsClientError) { - [processor discardPendingChanges:simperiumKey bucket:bucket]; + [changeProcessor discardPendingChanges:simperiumKey bucket:bucket]; } else if (error.code == SPProcessorErrorsReceivedInvalidChange) { @@ -316,7 +329,9 @@ - (void)processBatchChanges:(NSArray *)changes bucket:(SPBucket *)bucket { [weakSelf requestVersion:version forObjectWithKey:simperiumKey]; }); } - }]; + }; + + [changeProcessor processRemoteChanges:changes bucket:bucket successHandler:successHandler errorHandler:errorHandler]; // After remote changes have been processed, check to see if any local changes were attempted (and queued) diff --git a/SimperiumTests/SPChangeProcessorTests.m b/SimperiumTests/SPChangeProcessorTests.m index e5699cc9..4dbece9a 100644 --- a/SimperiumTests/SPChangeProcessorTests.m +++ b/SimperiumTests/SPChangeProcessorTests.m @@ -126,6 +126,9 @@ - (void)testProcessRemoteChangeWithInvalidDelta { __block NSInteger errorCount = 0; [bucket.changeProcessor processRemoteChanges:changes.allValues bucket:bucket + successHandler:^(NSString *simperiumKey, NSString *version) { + XCTAssertFalse(true, @"This should not get executed"); + } errorHandler:^(NSString *simperiumKey, NSString *version, NSError *error) { XCTAssertTrue(error.code == SPProcessorErrorsReceivedInvalidChange, @"Invalid error code"); ++errorCount; diff --git a/SimperiumTests/SPIndexProcessorTests.m b/SimperiumTests/SPIndexProcessorTests.m index ea201f88..887d1cf8 100644 --- a/SimperiumTests/SPIndexProcessorTests.m +++ b/SimperiumTests/SPIndexProcessorTests.m @@ -468,4 +468,116 @@ - (void)testProcessVersionsWithExistingObjectsAndLocalPendingChangesFailsRebasin XCTAssertEqualObjects(config.ghost.memberData, data, @"Invalid Ghost MemberData"); } +- (void)testProcessVersionsWithExistingObjectsAndLocalPendingChangesWithRebaseDisabled { + + // =================================================================================================== + // Testing values! + // =================================================================================================== + // + NSString *originalLog = @"Original Captains Log"; + NSString *localPendingLog = @"Local Captains Log"; + NSString *newRemoteLog = @"Remote Captains Log"; + NSString *expectedLog = newRemoteLog; + + + // =================================================================================================== + // Helpers + // =================================================================================================== + // + MockSimperium* s = [MockSimperium mockSimperium]; + SPBucket* bucket = [s bucketForName:NSStringFromClass([Config class])]; + id storage = bucket.storage; + + + // =================================================================================================== + // Insert Config + // =================================================================================================== + // + Config* config = [storage insertNewObjectForBucketName:bucket.name simperiumKey:nil]; + config.captainsLog = originalLog; + + + // =================================================================================================== + // Manually Intialize SPGhost: we're not relying on the backend to confirm these additions! + // =================================================================================================== + // + NSMutableDictionary *memberData = [config.dictionary mutableCopy]; + SPGhost *ghost = [[SPGhost alloc] initWithKey:config.simperiumKey memberData:memberData]; + ghost.version = @"1"; + config.ghost = ghost; + config.ghostData = [memberData sp_JSONString]; + + [storage save]; + + NSLog(@"<> Successfully inserted Config object"); + + + // =================================================================================================== + // Prepare Remote Entity Message + // =================================================================================================== + // + NSString *endVersion = [NSString stringWithFormat:@"%d", config.ghost.version.intValue + 1]; + + NSDictionary *data = @{ + NSStringFromSelector(@selector(captainsLog)) : newRemoteLog, + }; + + NSArray *versions = @[ @[ config.simperiumKey, endVersion, data ] ]; + + NSLog(@"<> Successfully generated versions"); + + + // =================================================================================================== + // Add local pending changes + // =================================================================================================== + // + config.captainsLog = localPendingLog; + + [storage save]; + + + // =================================================================================================== + // Disable Rebase + // =================================================================================================== + // + dispatch_async(bucket.processorQueue, ^{ + [bucket.indexProcessor disableRebaseForObjectWithKey:config.simperiumKey]; + }); + + + // =================================================================================================== + // Process remote changes + // =================================================================================================== + // + StartBlock(); + + dispatch_async(bucket.processorQueue, ^{ + + [bucket.indexProcessor processVersions:versions bucket:bucket changeHandler:^(NSString *key) { + XCTAssertTrue(true, @"This should not get called"); + }]; + + dispatch_async(dispatch_get_main_queue(), ^{ + EndBlock(); + }); + }); + + WaitUntilBlockCompletes(); + + NSLog(@"<> Finished processing versions"); + + + // =================================================================================================== + // Verify if the indexProcessor actually did its job + // =================================================================================================== + // + + [storage refaultObjects:@[config]]; + + XCTAssertEqualObjects(config.captainsLog, expectedLog, @"Invalid Log"); + XCTAssertEqualObjects(config.ghost.version, endVersion, @"Invalid Ghost Version"); + XCTAssertEqualObjects(config.ghost.memberData, data, @"Invalid Ghost MemberData"); +} + + @end