Skip to content

Commit

Permalink
fix: Clear inProcess state on startup of outgoing mutation queue
Browse files Browse the repository at this point in the history
  • Loading branch information
wooj2 committed Apr 21, 2020
1 parent 2c9bc07 commit 3436602
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//
// Copyright 2018-2020 Amazon.com,
// Inc. or its affiliates. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Amplify
import Foundation

final class MutationEventClearState {

let storageAdapter: StorageEngineAdapter
init(storageAdapter: StorageEngineAdapter) {
self.storageAdapter = storageAdapter
}

func clearStateOutgoingMutations(completion: @escaping BasicClosure) {
let fields = MutationEvent.keys
let predicate = fields.inProcess == true
let orderClause = """
ORDER BY \(MutationEvent.keys.createdAt.stringValue) ASC
"""

storageAdapter.query(MutationEvent.self,
predicate: predicate,
paginationInput: nil,
additionalStatements: orderClause) { result in
switch result {
case .failure(let dataStoreError):
log.error("Failed on clearStateOutgoingMutations: \(dataStoreError)")
case .success(let mutationEvents):
if !mutationEvents.isEmpty {
updateMutationsState(mutationEvents: mutationEvents,
completion: completion)
} else {
completion()
}
}
}
}

private func updateMutationsState(mutationEvents: [MutationEvent], completion: @escaping BasicClosure) {
var numMutationEventsUpdated = 0
for mutationEvent in mutationEvents {
var inProcessEvent = mutationEvent
inProcessEvent.inProcess = false
storageAdapter.save(inProcessEvent, condition: nil, completion: { result in
switch result {
case .success:
numMutationEventsUpdated += 1
if numMutationEventsUpdated >= mutationEvents.count {
completion()
}
case .failure(let error):
self.log.error("Failed to update mutationEvent:\(error)")
}
})
}
}

}

@available(iOS 13.0, *)
extension MutationEventClearState: DefaultLogger { }
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ extension RemoteSyncEngine {
case receivedStart

case pausedSubscriptions
case pausedMutationQueue(APICategoryGraphQLBehavior, StorageEngineAdapter)
case pausedMutationQueue(StorageEngineAdapter)
case clearedStateOutgoingMutations(APICategoryGraphQLBehavior, StorageEngineAdapter)
case initializedSubscriptions
case performedInitialSync
case activatedCloudSubscriptions(APICategoryGraphQLBehavior, MutationEventPublisher)
Expand All @@ -41,6 +42,8 @@ extension RemoteSyncEngine {
return "pausedSubscriptions"
case .pausedMutationQueue:
return "pausedMutationQueue"
case .clearedStateOutgoingMutations:
return "resetStateOutgoingMutations"
case .initializedSubscriptions:
return "initializedSubscriptions"
case .performedInitialSync:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ extension RemoteSyncEngine {
case (.pausingSubscriptions, .pausedSubscriptions):
return .pausingMutationQueue

case (.pausingMutationQueue, .pausedMutationQueue(let api, let storageEngineAdapter)):
case (.pausingMutationQueue, .pausedMutationQueue(let storageEngineAdapter)):
return .clearingStateOutgoingMutations(storageEngineAdapter)

case (.clearingStateOutgoingMutations, .clearedStateOutgoingMutations(let api, let storageEngineAdapter)):
return .initializingSubscriptions(api, storageEngineAdapter)

case (.initializingSubscriptions, .initializedSubscriptions):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ extension RemoteSyncEngine {

case pausingSubscriptions
case pausingMutationQueue
case clearingStateOutgoingMutations(StorageEngineAdapter)
case initializingSubscriptions(APICategoryGraphQLBehavior, StorageEngineAdapter)
case performingInitialSync
case activatingCloudSubscriptions
Expand All @@ -38,6 +39,8 @@ extension RemoteSyncEngine {
return "pausingSubscriptions"
case .pausingMutationQueue:
return "pausingMutationQueue"
case .clearingStateOutgoingMutations:
return "clearingStateOutgoingMutations"
case .initializingSubscriptions:
return "initializingSubscriptions"
case .performingInitialSync:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
pauseSubscriptions()
case .pausingMutationQueue:
pauseMutations()
case .clearingStateOutgoingMutations(let storageAdapter):
clearStateOutgoingMutations(storageAdapter: storageAdapter)
case .initializingSubscriptions(let api, let storageAdapter):
initializeSubscriptions(api: api, storageAdapter: storageAdapter)
case .performingInitialSync:
Expand Down Expand Up @@ -209,8 +211,19 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
outgoingMutationQueue.pauseSyncingToCloud()

remoteSyncTopicPublisher.send(.mutationsPaused)
if let api = self.api, let storageAdapter = self.storageAdapter {
stateMachine.notify(action: .pausedMutationQueue(api, storageAdapter))
if let storageAdapter = self.storageAdapter {
stateMachine.notify(action: .pausedMutationQueue(storageAdapter))
}
}

private func clearStateOutgoingMutations(storageAdapter: StorageEngineAdapter) {
log.debug(#function)
let mutationEventClearState = MutationEventClearState(storageAdapter: storageAdapter)
mutationEventClearState.clearStateOutgoingMutations {
if let api = self.api {
self.remoteSyncTopicPublisher.send(.clearedStateOutgoingMutations)
self.stateMachine.notify(action: .clearedStateOutgoingMutations(api, storageAdapter))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum RemoteSyncEngineEvent {
case storageAdapterAvailable
case subscriptionsPaused
case mutationsPaused
case clearedStateOutgoingMutations
case subscriptionsInitialized
case performedInitialSync
case subscriptionsActivated
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//
// Copyright 2018-2020 Amazon.com,
// Inc. or its affiliates. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation
import XCTest

@testable import Amplify
@testable import AmplifyTestCommon
@testable import AWSPluginsCore
@testable import AWSDataStoreCategoryPlugin

class MutationEventClearStateTests: XCTestCase {
var mockStorageAdapter: MockSQLiteStorageEngineAdapter!
var mutationEventClearState: MutationEventClearState!

override func setUp() {
mockStorageAdapter = MockSQLiteStorageEngineAdapter()
mutationEventClearState = MutationEventClearState(storageAdapter: mockStorageAdapter)
}

func testInProcessIsSetFromTrueToFalse() {
let queryExpectation = expectation(description: "query is called")
let saveExpectation = expectation(description: "save is Called")
let completionExpectation = expectation(description: "completion handler is called")

let queryResponder = QueryModelTypePredicateAdditionalStatementsResponder<MutationEvent> { _, _, _ in
queryExpectation.fulfill()
var mutationEvent = MutationEvent(modelId: "1111-22",
modelName: "Post",
json: "{}",
mutationType: .create)
mutationEvent.inProcess = true
return .success([mutationEvent])
}
mockStorageAdapter.responders[.queryModelTypePredicateAdditionalStatements] = queryResponder

let saveResponder = SaveModelCompletionResponder<MutationEvent> { model, completion in
XCTAssertEqual("1111-22", model.modelId)
XCTAssertFalse(model.inProcess)
saveExpectation.fulfill()
completion(.success(model))
}
mockStorageAdapter.responders[.saveModelCompletion] = saveResponder

mutationEventClearState.clearStateOutgoingMutations {
completionExpectation.fulfill()
}
wait(for: [queryExpectation,
saveExpectation,
completionExpectation], timeout: 1.0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -622,9 +622,9 @@ class MutationIngesterConflictResolutionTests: SyncEngineTestBase {

tryOrFail {
try setUpStorageAdapter(preCreating: [Post.self, Comment.self])
try saveMutationEvent(of: .create, for: post, inProcess: true)
try setUpDataStore()
try startAmplifyAndWaitForSync()
try saveMutationEvent(of: .create, for: post, inProcess: true)
}

let saveResultReceived = expectation(description: "Save result received")
Expand Down Expand Up @@ -671,10 +671,10 @@ class MutationIngesterConflictResolutionTests: SyncEngineTestBase {

tryOrFail {
try setUpStorageAdapter(preCreating: [Post.self, Comment.self])
try saveMutationEvent(of: .create, for: post, inProcess: true)
try savePost(post)
try setUpDataStore()
try startAmplifyAndWaitForSync()
try savePost(post)
try saveMutationEvent(of: .create, for: post, inProcess: true)
}

var mutatedPost = post
Expand Down Expand Up @@ -725,9 +725,9 @@ class MutationIngesterConflictResolutionTests: SyncEngineTestBase {

tryOrFail {
try setUpStorageAdapter(preCreating: [Post.self, Comment.self])
try saveMutationEvent(of: .create, for: post, inProcess: true)
try setUpDataStore()
try startAmplifyAndWaitForSync()
try saveMutationEvent(of: .create, for: post, inProcess: true)
}

let deleteResultReceived = expectation(description: "Delete result received")
Expand Down
Loading

0 comments on commit 3436602

Please sign in to comment.