Skip to content

Commit

Permalink
Merge pull request #317 from /issues/316-selectively-disable-rebase-o…
Browse files Browse the repository at this point in the history
…n-error

Selectively Disable Rebase on Error
  • Loading branch information
jleandroperez committed Jul 23, 2014
2 parents 69d6479 + c70a7a9 commit 80da5de
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 15 deletions.
5 changes: 3 additions & 2 deletions Simperium/SPChangeProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma mark Constants
#pragma mark ====================================================================================

typedef void(^SPChangeSuccessHandlerBlockType)(NSString *simperiumKey, NSString *version);
typedef void(^SPChangeErrorHandlerBlockType)(NSString *simperiumKey, NSString *version, NSError *error);
typedef void(^SPChangeEnumerationBlockType)(NSDictionary *change);

Expand Down Expand Up @@ -48,10 +49,10 @@ typedef NS_ENUM(NSInteger, SPProcessorErrors) {
- (void)reset;

- (void)notifyOfRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket;
- (void)processRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket errorHandler:(SPChangeErrorHandlerBlockType)errorHandler;
- (void)processRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket successHandler:(SPChangeSuccessHandlerBlockType)successHandler errorHandler:(SPChangeErrorHandlerBlockType)errorHandler;

- (void)enqueueObjectForMoreChanges:(NSString *)key bucket:(SPBucket *)bucket;
- (void)enqueueObjectDeletion:(NSString *)key bucket:(SPBucket *)bucket;
- (void)enqueueObjectForDeletion:(NSString *)key bucket:(SPBucket *)bucket;
- (void)enqueueObjectForRetry:(NSString *)key bucket:(SPBucket *)bucket overrideRemoteData:(BOOL)overrideRemoteData;
- (void)discardPendingChanges:(NSString *)key bucket:(SPBucket *)bucket;

Expand Down
8 changes: 6 additions & 2 deletions Simperium/SPChangeProcessor.m
Original file line number Diff line number Diff line change
Expand Up @@ -450,10 +450,11 @@ - (void)notifyOfRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket {
[[NSNotificationCenter defaultCenter] postNotificationName:ProcessorWillChangeObjectsNotification object:bucket userInfo:userInfo];
}

- (void)processRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket errorHandler:(SPChangeErrorHandlerBlockType)errorHandler {
- (void)processRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket successHandler:(SPChangeSuccessHandlerBlockType)successHandler errorHandler:(SPChangeErrorHandlerBlockType)errorHandler {

NSAssert([NSThread isMainThread] == NO, @"This should get called on the processor's queue!");
NSAssert([bucket isKindOfClass:[SPBucket class]], @"Invalid Bucket");
NSAssert(successHandler, @"Please, provide a success handler!");
NSAssert(errorHandler, @"Please, provide an error handler!");

@autoreleasepool {
Expand All @@ -476,6 +477,9 @@ - (void)processRemoteChanges:(NSArray *)changes bucket:(SPBucket *)bucket errorH
continue;
}

// Signal Success
successHandler(key, version);

// Persist LastChangeSignature: do it inside the loop in case something happens to abort the loop
NSString *changeVersion = change[CH_CHANGE_VERSION];

Expand Down Expand Up @@ -511,7 +515,7 @@ - (void)enqueueObjectForMoreChanges:(NSString *)key bucket:(SPBucket *)bucket {
[self.keysForObjectsWithMoreChanges save];
}

- (void)enqueueObjectDeletion:(NSString *)key bucket:(SPBucket *)bucket {
- (void)enqueueObjectForDeletion:(NSString *)key bucket:(SPBucket *)bucket {
NSAssert( [key isKindOfClass:[NSString class]], @"Missing key" );
NSAssert( [bucket isKindOfClass:[SPBucket class]], @"Missing Bucket");

Expand Down
7 changes: 7 additions & 0 deletions Simperium/SPIndexProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@ typedef void(^SPChangeHandlerBlockType)(NSString *key);
#pragma mark ====================================================================================

@interface SPIndexProcessor : NSObject

- (void)processIndex:(NSArray *)indexArray bucket:(SPBucket *)bucket versionHandler:(SPVersionHandlerBlockType)versionHandler;
- (void)processVersions:(NSArray *)versions bucket:(SPBucket *)bucket changeHandler:(SPChangeHandlerBlockType)changeHandler;

- (void)enableRebaseForAllObjects;
- (void)enableRebaseForObjectWithKey:(NSString *)simperiumKey;
- (void)disableRebaseForObjectWithKey:(NSString *)simperiumKey;

- (NSArray*)exportIndexStatus:(SPBucket *)bucket;

@end
39 changes: 38 additions & 1 deletion Simperium/SPIndexProcessor.m
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,29 @@ typedef NS_ENUM(NSInteger, SPVersion) {
};


#pragma mark ====================================================================================
#pragma mark Private
#pragma mark ====================================================================================

@interface SPIndexProcessor ()
@property (nonatomic, strong) NSMutableSet *keysForObjectsWithRebaseDisabled;
@end


#pragma mark ====================================================================================
#pragma mark SPIndexProcessor
#pragma mark ====================================================================================

@implementation SPIndexProcessor

- (instancetype)init {
self = [super init];
if (self) {
self.keysForObjectsWithRebaseDisabled = [NSMutableSet set];
}
return self;
}

// Process an index of keys from the Simperium service for a particular bucket
- (void)processIndex:(NSArray *)indexArray bucket:(SPBucket *)bucket versionHandler:(SPVersionHandlerBlockType)versionHandler {

Expand Down Expand Up @@ -205,7 +222,9 @@ - (void)processVersions:(NSArray *)versions bucket:(SPBucket *)bucket changeHand
SPLogWarn(@"Simperium successfully reloaded local entity (%@): %@", bucket.name, key);

// 3. Rebase + apply localDiff
if (localDiff.count) {
BOOL isRebaseDisabled = [self.keysForObjectsWithRebaseDisabled containsObject:key];

if (localDiff.count && !isRebaseDisabled) {

// 3.1. Calculate Delta: LocalGhost > RemoteMembers
NSDictionary *remoteDiff = [bucket.differ diffFromDictionary:localGhost.memberData toObject:object];
Expand Down Expand Up @@ -237,7 +256,13 @@ - (void)processVersions:(NSArray *)versions bucket:(SPBucket *)bucket changeHand
[rebasedKeys addObject:key];
}

// 4. Keep track of changed Keys
[changedKeys addObject:key];

// 5. Cleanup
if (isRebaseDisabled) {
[self.keysForObjectsWithRebaseDisabled removeObject:key];
}
}

// 4. Update the ghost with the remote member data + version
Expand Down Expand Up @@ -286,6 +311,18 @@ - (void)processVersions:(NSArray *)versions bucket:(SPBucket *)bucket changeHand
}
}

- (void)enableRebaseForAllObjects {
[self.keysForObjectsWithRebaseDisabled removeAllObjects];
}

- (void)enableRebaseForObjectWithKey:(NSString *)simperiumKey {
[self.keysForObjectsWithRebaseDisabled removeObject:simperiumKey];
}

- (void)disableRebaseForObjectWithKey:(NSString *)simperiumKey {
[self.keysForObjectsWithRebaseDisabled addObject:simperiumKey];
}

- (NSArray*)exportIndexStatus:(SPBucket *)bucket {

// This routine shall be used for debugging purposes!
Expand Down
35 changes: 25 additions & 10 deletions Simperium/SPWebSocketChannel.m
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ - (void)sendObjectDeletion:(id<SPDiffable>)object {
SPChangeProcessor *processor = object.bucket.changeProcessor;

if (_indexing || !_authenticated || processor.reachedMaxPendings) {
[processor enqueueObjectDeletion:key bucket:object.bucket];
[processor enqueueObjectForDeletion:key bucket:object.bucket];
} else {
NSSet *wrappedKey = [NSSet setWithObject:key];
NSArray *changes = [processor processLocalDeletionsWithKeys:wrappedKey];
Expand Down Expand Up @@ -250,6 +250,12 @@ - (void)handleAuthResponse:(NSString *)responseString bucket:(SPBucket *)bucket
self.onLocalChangesSent = nil;
self.objectVersionsPending = 0;

// Reset disable-rebase mechanism
dispatch_async(bucket.processorQueue, ^{
[bucket.indexProcessor enableRebaseForAllObjects];
});

// Download the index, on the 1st sync
if (bucket.lastChangeSignature == nil) {
[self requestLatestVersionsForBucket:bucket];
} else {
Expand Down Expand Up @@ -277,14 +283,15 @@ - (void)processBatchChanges:(NSArray *)changes bucket:(SPBucket *)bucket {

SPLogVerbose(@"Simperium handling changes %@", changes);

SPChangeProcessor *processor = bucket.changeProcessor;
SPChangeProcessor *changeProcessor = bucket.changeProcessor;
SPIndexProcessor *indexProcessor = bucket.indexProcessor;

// Changing entities and saving the context will clear Core Data's updatedObjects. Stash them so
// sync will still work for any unsaved changes.
[bucket.storage stashUnsavedObjects];

// Notify the delegates on the main thread that we're about to apply remote changes
[bucket.changeProcessor notifyOfRemoteChanges:changes bucket:bucket];
[changeProcessor notifyOfRemoteChanges:changes bucket:bucket];


__weak __typeof(self) weakSelf = self;
Expand All @@ -294,29 +301,37 @@ - (void)processBatchChanges:(NSArray *)changes bucket:(SPBucket *)bucket {
return;
}

[processor processRemoteChanges:changes bucket:bucket errorHandler:^(NSString *simperiumKey, NSString *version, NSError *error) {
SPChangeSuccessHandlerBlockType successHandler = ^(NSString *simperiumKey, NSString *version) {

[indexProcessor enableRebaseForObjectWithKey:simperiumKey];
};

SPChangeErrorHandlerBlockType errorHandler = ^(NSString *simperiumKey, NSString *version, NSError *error) {

SPLogError(@"Simperium Received Error [%@] for object with key [%@]", error.localizedDescription, simperiumKey);

if (error.code == SPProcessorErrorsSentDuplicateChange) {
[processor discardPendingChanges:simperiumKey bucket:bucket];
if (error.code == SPProcessorErrorsSentDuplicateChange) {
[changeProcessor discardPendingChanges:simperiumKey bucket:bucket];

} else if (error.code == SPProcessorErrorsSentInvalidChange) {
[processor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:YES];
[changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:YES];
[indexProcessor disableRebaseForObjectWithKey:simperiumKey];

} else if (error.code == SPProcessorErrorsServerError) {
[processor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:NO];
[changeProcessor enqueueObjectForRetry:simperiumKey bucket:bucket overrideRemoteData:NO];

} else if (error.code == SPProcessorErrorsClientError) {
[processor discardPendingChanges:simperiumKey bucket:bucket];
[changeProcessor discardPendingChanges:simperiumKey bucket:bucket];

} else if (error.code == SPProcessorErrorsReceivedInvalidChange) {

dispatch_async(dispatch_get_main_queue(), ^{
[weakSelf requestVersion:version forObjectWithKey:simperiumKey];
});
}
}];
};

[changeProcessor processRemoteChanges:changes bucket:bucket successHandler:successHandler errorHandler:errorHandler];


// After remote changes have been processed, check to see if any local changes were attempted (and queued)
Expand Down
3 changes: 3 additions & 0 deletions SimperiumTests/SPChangeProcessorTests.m
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ - (void)testProcessRemoteChangeWithInvalidDelta {
__block NSInteger errorCount = 0;
[bucket.changeProcessor processRemoteChanges:changes.allValues
bucket:bucket
successHandler:^(NSString *simperiumKey, NSString *version) {
XCTAssertFalse(true, @"This should not get executed");
}
errorHandler:^(NSString *simperiumKey, NSString *version, NSError *error) {
XCTAssertTrue(error.code == SPProcessorErrorsReceivedInvalidChange, @"Invalid error code");
++errorCount;
Expand Down
112 changes: 112 additions & 0 deletions SimperiumTests/SPIndexProcessorTests.m
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,116 @@ - (void)testProcessVersionsWithExistingObjectsAndLocalPendingChangesFailsRebasin
XCTAssertEqualObjects(config.ghost.memberData, data, @"Invalid Ghost MemberData");
}

- (void)testProcessVersionsWithExistingObjectsAndLocalPendingChangesWithRebaseDisabled {

// ===================================================================================================
// Testing values!
// ===================================================================================================
//
NSString *originalLog = @"Original Captains Log";
NSString *localPendingLog = @"Local Captains Log";
NSString *newRemoteLog = @"Remote Captains Log";
NSString *expectedLog = newRemoteLog;


// ===================================================================================================
// Helpers
// ===================================================================================================
//
MockSimperium* s = [MockSimperium mockSimperium];
SPBucket* bucket = [s bucketForName:NSStringFromClass([Config class])];
id<SPStorageProvider> storage = bucket.storage;


// ===================================================================================================
// Insert Config
// ===================================================================================================
//
Config* config = [storage insertNewObjectForBucketName:bucket.name simperiumKey:nil];
config.captainsLog = originalLog;


// ===================================================================================================
// Manually Intialize SPGhost: we're not relying on the backend to confirm these additions!
// ===================================================================================================
//
NSMutableDictionary *memberData = [config.dictionary mutableCopy];
SPGhost *ghost = [[SPGhost alloc] initWithKey:config.simperiumKey memberData:memberData];
ghost.version = @"1";
config.ghost = ghost;
config.ghostData = [memberData sp_JSONString];

[storage save];

NSLog(@"<> Successfully inserted Config object");


// ===================================================================================================
// Prepare Remote Entity Message
// ===================================================================================================
//
NSString *endVersion = [NSString stringWithFormat:@"%d", config.ghost.version.intValue + 1];

NSDictionary *data = @{
NSStringFromSelector(@selector(captainsLog)) : newRemoteLog,
};

NSArray *versions = @[ @[ config.simperiumKey, endVersion, data ] ];

NSLog(@"<> Successfully generated versions");


// ===================================================================================================
// Add local pending changes
// ===================================================================================================
//
config.captainsLog = localPendingLog;

[storage save];


// ===================================================================================================
// Disable Rebase
// ===================================================================================================
//
dispatch_async(bucket.processorQueue, ^{
[bucket.indexProcessor disableRebaseForObjectWithKey:config.simperiumKey];
});


// ===================================================================================================
// Process remote changes
// ===================================================================================================
//
StartBlock();

dispatch_async(bucket.processorQueue, ^{

[bucket.indexProcessor processVersions:versions bucket:bucket changeHandler:^(NSString *key) {
XCTAssertTrue(true, @"This should not get called");
}];

dispatch_async(dispatch_get_main_queue(), ^{
EndBlock();
});
});

WaitUntilBlockCompletes();

NSLog(@"<> Finished processing versions");


// ===================================================================================================
// Verify if the indexProcessor actually did its job
// ===================================================================================================
//

[storage refaultObjects:@[config]];

XCTAssertEqualObjects(config.captainsLog, expectedLog, @"Invalid Log");
XCTAssertEqualObjects(config.ghost.version, endVersion, @"Invalid Ghost Version");
XCTAssertEqualObjects(config.ghost.memberData, data, @"Invalid Ghost MemberData");
}


@end

0 comments on commit 80da5de

Please sign in to comment.