diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Action.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Action.swift new file mode 100644 index 0000000000..8342a4c10d --- /dev/null +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Action.swift @@ -0,0 +1,57 @@ +// +// Copyright 2018-2020 Amazon.com, +// Inc. or its affiliates. All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import Amplify +import Combine + +@available(iOS 13.0, *) +extension RemoteSyncEngine { + + /// Actions are declarative, they say what I just did + enum Action { + // Startup/config actions + case receivedStart + + case pausedSubscriptions + case pausedMutationQueue(APICategoryGraphQLBehavior, StorageEngineAdapter) + case initializedSubscriptions + case performedInitialSync + case activatedCloudSubscriptions(APICategoryGraphQLBehavior, MutationEventPublisher) + case activatedMutationQueue + case notifiedSyncStarted + + // Terminal actions + case receivedCancel + case errored(AmplifyError) + + var displayName: String { + switch self { + case .receivedStart: + return "receivedStart" + case .pausedSubscriptions: + return "pausedSubscriptions" + case .pausedMutationQueue: + return "pausedMutationQueue" + case .initializedSubscriptions: + return "initializedSubscriptions" + case .performedInitialSync: + return "performedInitialSync" + case .activatedCloudSubscriptions: + return "activatedCloudSubscriptions" + case .activatedMutationQueue: + return "activatedMutationQueue" + case .notifiedSyncStarted: + return "notifiedSyncStarted" + case .receivedCancel: + return "receivedCancel" + case .errored: + return "errored" + } + + } + } +} diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift new file mode 100644 index 0000000000..4f0f2de5b2 --- /dev/null +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift @@ -0,0 +1,41 @@ +// +// Copyright 2018-2020 Amazon.com, +// Inc. or its affiliates. All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import Amplify +import Combine +import Foundation + +@available(iOS 13.0, *) +extension RemoteSyncEngine { + @available(iOS 13.0, *) + func onReceiveCompletion(receiveCompletion: Subscribers.Completion) { + if case .failure(let error) = receiveCompletion { + remoteSyncTopicPublisher.send(completion: .failure(error)) + } + if case .finished = receiveCompletion { + let unexpectedFinishError = DataStoreError.unknown("ReconcilationQueue sent .finished message", + AmplifyErrorMessages.shouldNotHappenReportBugToAWS(), + nil) + remoteSyncTopicPublisher.send(completion: .failure(unexpectedFinishError)) + } + } + + @available(iOS 13.0, *) + func onReceive(receiveValue: IncomingEventReconciliationQueueEvent) { + switch receiveValue { + case .started: + remoteSyncTopicPublisher.send(.subscriptionsActivated) + if let api = self.api { + stateMachine.notify(action: .activatedCloudSubscriptions(api, mutationEventPublisher)) + } + case .paused: + remoteSyncTopicPublisher.send(.subscriptionsPaused) + case .mutationEvent(let mutationEvent): + remoteSyncTopicPublisher.send(.mutationEvent(mutationEvent)) + } + } +} diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Resolver.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Resolver.swift new file mode 100644 index 0000000000..39ce0ec5f0 --- /dev/null +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+Resolver.swift @@ -0,0 +1,53 @@ +// +// Copyright 2018-2020 Amazon.com, +// Inc. or its affiliates. All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import Amplify +import Combine + +@available(iOS 13.0, *) +extension RemoteSyncEngine { + struct Resolver { + static func resolve(currentState: State, action: Action) -> State { + switch (currentState, action) { + case (.notStarted, .receivedStart): + return .pauseSubscriptions + + case (.pauseSubscriptions, .pausedSubscriptions): + return .pauseMutationQueue + + case (.pauseMutationQueue, .pausedMutationQueue(let api, let storageEngineAdapter)): + return .initializeSubscriptions(api, storageEngineAdapter) + + case (.initializeSubscriptions, .initializedSubscriptions): + return .performInitialSync + + case (.performInitialSync, .performedInitialSync): + return .activateCloudSubscriptions + case (.performInitialSync, .errored(let error)): + return .cleanup(error) + + case (.activateCloudSubscriptions, .activatedCloudSubscriptions(let api, let mutationEventPublisher)): + return .activateMutationQueue(api, mutationEventPublisher) + case (.activateCloudSubscriptions, .errored(let error)): + return .cleanup(error) + + case (.activateMutationQueue, .activatedMutationQueue): + return .notifySyncStarted + case (.activateMutationQueue, .errored(let error)): + return .cleanup(error) + + case (.notifySyncStarted, .notifiedSyncStarted): + return .syncEngineActive + + default: + log.warn("Unexpected state transition. In \(currentState.displayName), got \(action.displayName)") + log.verbose("Unexpected state transition. In \(currentState), got \(action)") + return currentState + } + } + } +} diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+State.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+State.swift new file mode 100644 index 0000000000..de50e12725 --- /dev/null +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+State.swift @@ -0,0 +1,53 @@ +// +// Copyright 2018-2020 Amazon.com, +// Inc. or its affiliates. All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import Amplify +import Combine +@available(iOS 13.0, *) +extension RemoteSyncEngine { + + /// States are descriptive, they say what is happening in the system right now + enum State { + case notStarted + + case pauseSubscriptions + case pauseMutationQueue + case initializeSubscriptions(APICategoryGraphQLBehavior, StorageEngineAdapter) + case performInitialSync + case activateCloudSubscriptions + case activateMutationQueue(APICategoryGraphQLBehavior, MutationEventPublisher) + case notifySyncStarted + + case syncEngineActive + + case cleanup(AmplifyError) + var displayName: String { + switch self { + case .notStarted: + return "notStarted" + case .pauseSubscriptions: + return "pauseCloudSubscriptions" + case .pauseMutationQueue: + return "pauseMutationQueue" + case .initializeSubscriptions: + return "initializeSubscriptions" + case .performInitialSync: + return "performInitialSync" + case .activateCloudSubscriptions: + return "activateCloudSubscriptions" + case .activateMutationQueue: + return "activateMutationQueue" + case .notifySyncStarted: + return "notifySyncStarted" + case .syncEngineActive: + return "syncEngineActive" + case .cleanup: + return "cleanup" + } + } + } +} diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine.swift index 0249acf610..0ee2049454 100644 --- a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine.swift +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine.swift @@ -15,37 +15,43 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { private weak var storageAdapter: StorageEngineAdapter? // Assigned at `start` - private weak var api: APICategoryGraphQLBehavior? + weak var api: APICategoryGraphQLBehavior? // Assigned and released inside `performInitialQueries`, but we maintain a reference so we can `reset` private var initialSyncOrchestrator: InitialSyncOrchestrator? private let initialSyncOrchestratorFactory: InitialSyncOrchestratorFactory private let mutationEventIngester: MutationEventIngester - private let mutationEventPublisher: MutationEventPublisher + let mutationEventPublisher: MutationEventPublisher private let outgoingMutationQueue: OutgoingMutationQueueBehavior private var reconciliationQueueSink: AnyCancellable? - private let remoteSyncTopicPublisher: PassthroughSubject + let remoteSyncTopicPublisher: PassthroughSubject var publisher: AnyPublisher { return remoteSyncTopicPublisher.eraseToAnyPublisher() } /// Synchronizes startup operations - let syncQueue: OperationQueue + private let syncQueue: OperationQueue + private let workQueue = DispatchQueue(label: "com.amazonaws.RemoteSyncEngineOperationQueue", + target: DispatchQueue.global()) // Assigned at `setUpCloudSubscriptions` var reconciliationQueue: IncomingEventReconciliationQueue? var reconciliationQueueFactory: IncomingEventReconciliationQueueFactory + let stateMachine: StateMachine + private var stateMachineSink: AnyCancellable? + /// Initializes the CloudSyncEngine with the specified storageAdapter as the provider for persistence of /// MutationEvents, sync metadata, and conflict resolution metadata. Immediately initializes the incoming mutation /// queue so it can begin accepting incoming mutations from DataStore. convenience init(storageAdapter: StorageEngineAdapter, outgoingMutationQueue: OutgoingMutationQueueBehavior? = nil, initialSyncOrchestratorFactory: InitialSyncOrchestratorFactory? = nil, - reconciliationQueueFactory: IncomingEventReconciliationQueueFactory? = nil) throws { + reconciliationQueueFactory: IncomingEventReconciliationQueueFactory? = nil, + stateMachine: StateMachine? = nil) throws { let mutationDatabaseAdapter = try AWSMutationDatabaseAdapter(storageAdapter: storageAdapter) let awsMutationEventPublisher = AWSMutationEventPublisher(eventSource: mutationDatabaseAdapter) let outgoingMutationQueue = outgoingMutationQueue ?? OutgoingMutationQueue() @@ -53,13 +59,16 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { AWSIncomingEventReconciliationQueue.init(modelTypes:api:storageAdapter:) let initialSyncOrchestratorFactory = initialSyncOrchestratorFactory ?? AWSInitialSyncOrchestrator.init(api:reconciliationQueue:storageAdapter:) + let stateMachine = stateMachine ?? StateMachine(initialState: .notStarted, + resolver: RemoteSyncEngine.Resolver.resolve(currentState:action:)) self.init(storageAdapter: storageAdapter, outgoingMutationQueue: outgoingMutationQueue, mutationEventIngester: mutationDatabaseAdapter, mutationEventPublisher: awsMutationEventPublisher, initialSyncOrchestratorFactory: initialSyncOrchestratorFactory, - reconciliationQueueFactory: reconciliationQueueFactory) + reconciliationQueueFactory: reconciliationQueueFactory, + stateMachine: stateMachine) } init(storageAdapter: StorageEngineAdapter, @@ -67,7 +76,8 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { mutationEventIngester: MutationEventIngester, mutationEventPublisher: MutationEventPublisher, initialSyncOrchestratorFactory: @escaping InitialSyncOrchestratorFactory, - reconciliationQueueFactory: @escaping IncomingEventReconciliationQueueFactory) { + reconciliationQueueFactory: @escaping IncomingEventReconciliationQueueFactory, + stateMachine: StateMachine) { self.storageAdapter = storageAdapter self.mutationEventIngester = mutationEventIngester self.mutationEventPublisher = mutationEventPublisher @@ -79,64 +89,62 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { self.syncQueue = OperationQueue() syncQueue.name = "com.amazonaws.Amplify.\(AWSDataStorePlugin.self).CloudSyncEngine" syncQueue.maxConcurrentOperationCount = 1 + + self.stateMachine = stateMachine + self.stateMachineSink = self.stateMachine + .$state + .sink { [weak self] newState in + guard let self = self else { + return + } + self.log.verbose("New state: \(newState)") + self.workQueue.async { + self.respond(to: newState) + } + } } - func start(api: APICategoryGraphQLBehavior = Amplify.API) { + /// Listens to incoming state changes and invokes the appropriate asynchronous methods in response. + private func respond(to newState: State) { + log.verbose("\(#function): \(newState)") + + switch newState { + case .notStarted: + break + case .pauseSubscriptions: + pauseSubscriptions() + case .pauseMutationQueue: + pauseMutations() + case .initializeSubscriptions(let api, let storageAdapter): + initializeSubscriptions(api: api, storageAdapter: storageAdapter) + case .performInitialSync: + performInitialSync() + case .activateCloudSubscriptions: + activateCloudSubscriptions() + case .activateMutationQueue(let api, let mutationEventPublisher): + startMutationQueue(api: api, mutationEventPublisher: mutationEventPublisher) + case .notifySyncStarted: + notifySyncStarted() + case .syncEngineActive: + break + + case .cleanup(let amplifyError): + //todo + print("error: \(amplifyError)") - self.api = api + } + } - guard let storageAdapter = storageAdapter else { + func start(api: APICategoryGraphQLBehavior = Amplify.API) { + self.api = api + guard storageAdapter != nil else { log.error(error: DataStoreError.nilStorageAdapter()) remoteSyncTopicPublisher.send(completion: .failure(DataStoreError.nilStorageAdapter())) return } - remoteSyncTopicPublisher.send(.storageAdapterAvailable) - - let pauseSubscriptionsOp = CancelAwareBlockOperation { - self.pauseSubscriptions() - } - - let pauseMutationsOp = CancelAwareBlockOperation { - self.pauseMutations() - } - pauseMutationsOp.addDependency(pauseSubscriptionsOp) - let setUpCloudSubscriptionsOp = CancelAwareBlockOperation { - self.setUpCloudSubscriptions(api: api, storageAdapter: storageAdapter) - } - setUpCloudSubscriptionsOp.addDependency(pauseMutationsOp) - - let performInitialQueriesOp = CancelAwareBlockOperation { - self.performInitialQueries() - } - performInitialQueriesOp.addDependency(setUpCloudSubscriptionsOp) - - let activateCloudSubscriptionsOp = CancelAwareBlockOperation { - self.activateCloudSubscriptions() - } - activateCloudSubscriptionsOp.addDependency(performInitialQueriesOp) - - let startMutationQueueOp = CancelAwareBlockOperation { - self.startMutationQueue(api: api, mutationEventPublisher: self.mutationEventPublisher) - } - startMutationQueueOp.addDependency(activateCloudSubscriptionsOp) - - let updateStateOp = CancelAwareBlockOperation { - Amplify.Hub.dispatch(to: .dataStore, - payload: HubPayload(eventName: HubPayload.EventName.DataStore.syncStarted)) - self.remoteSyncTopicPublisher.send(.syncStarted) - } - updateStateOp.addDependency(startMutationQueueOp) - - syncQueue.addOperations([ - pauseSubscriptionsOp, - pauseMutationsOp, - setUpCloudSubscriptionsOp, - performInitialQueriesOp, - activateCloudSubscriptionsOp, - startMutationQueueOp, - updateStateOp - ], waitUntilFinished: false) + remoteSyncTopicPublisher.send(.storageAdapterAvailable) + stateMachine.notify(action: .receivedStart) } func submit(_ mutationEvent: MutationEvent) -> Future { @@ -144,19 +152,25 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { } // MARK: - Startup sequence - private func pauseSubscriptions() { log.debug(#function) reconciliationQueue?.pause() + + remoteSyncTopicPublisher.send(.subscriptionsPaused) + stateMachine.notify(action: .pausedSubscriptions) } private func pauseMutations() { log.debug(#function) outgoingMutationQueue.pauseSyncingToCloud() + remoteSyncTopicPublisher.send(.mutationsPaused) + if let api = self.api, let storageAdapter = self.storageAdapter { + stateMachine.notify(action: .pausedMutationQueue(api, storageAdapter)) + } } - private func setUpCloudSubscriptions(api: APICategoryGraphQLBehavior, + private func initializeSubscriptions(api: APICategoryGraphQLBehavior, storageAdapter: StorageEngineAdapter) { log.debug(#function) let syncableModelTypes = ModelRegistry.models.filter { $0.schema.isSyncable } @@ -164,35 +178,12 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { reconciliationQueueSink = reconciliationQueue?.publisher.sink( receiveCompletion: onReceiveCompletion(receiveCompletion:), receiveValue: onReceive(receiveValue:)) - remoteSyncTopicPublisher.send(.subscriptionsInitialized) - } - - @available(iOS 13.0, *) - private func onReceiveCompletion(receiveCompletion: Subscribers.Completion) { - if case .failure(let error) = receiveCompletion { - self.remoteSyncTopicPublisher.send(completion: .failure(error)) - } - if case .finished = receiveCompletion { - let unexpectedFinishError = DataStoreError.unknown("ReconcilationQueue sent .finished message", - AmplifyErrorMessages.shouldNotHappenReportBugToAWS(), - nil) - self.remoteSyncTopicPublisher.send(completion: .failure(unexpectedFinishError)) - } - } - @available(iOS 13.0, *) - private func onReceive(receiveValue: IncomingEventReconciliationQueueEvent) { - switch receiveValue { - case .started: - remoteSyncTopicPublisher.send(.subscriptionsActivated) - case .paused: - remoteSyncTopicPublisher.send(.subscriptionsPaused) - case .mutationEvent(let mutationEvent): - remoteSyncTopicPublisher.send(.mutationEvent(mutationEvent)) - } + remoteSyncTopicPublisher.send(.subscriptionsInitialized) + stateMachine.notify(action: .initializedSubscriptions) } - private func performInitialQueries() { + private func performInitialSync() { log.debug(#function) let initialSyncOrchestrator = initialSyncOrchestratorFactory(api, reconciliationQueue, storageAdapter) @@ -220,18 +211,30 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { semaphore.wait() self.initialSyncOrchestrator = nil + stateMachine.notify(action: .performedInitialSync) } private func activateCloudSubscriptions() { log.debug(#function) reconciliationQueue?.start() + + //Notifying the publisher & state machine are handled in: + // RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift } private func startMutationQueue(api: APICategoryGraphQLBehavior, mutationEventPublisher: MutationEventPublisher) { log.debug(#function) outgoingMutationQueue.startSyncingToCloud(api: api, mutationEventPublisher: mutationEventPublisher) + remoteSyncTopicPublisher.send(.mutationQueueStarted) + stateMachine.notify(action: .activatedMutationQueue) + } + + private func notifySyncStarted() { + Amplify.Hub.dispatch(to: .dataStore, + payload: HubPayload(eventName: HubPayload.EventName.DataStore.syncStarted)) + remoteSyncTopicPublisher.send(.syncStarted) } func reset(onComplete: () -> Void) { diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/LocalSubscriptionTests.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/LocalSubscriptionTests.swift index a6655ac1e1..c84930a411 100644 --- a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/LocalSubscriptionTests.swift +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/LocalSubscriptionTests.swift @@ -24,6 +24,7 @@ class LocalSubscriptionTests: XCTestCase { let storageAdapter: SQLiteStorageEngineAdapter let storageEngine: StorageEngine + var stateMachine: MockStateMachine! do { let connection = try Connection(.inMemory) storageAdapter = try SQLiteStorageEngineAdapter(connection: connection) @@ -32,13 +33,15 @@ class LocalSubscriptionTests: XCTestCase { let outgoingMutationQueue = NoOpMutationQueue() let mutationDatabaseAdapter = try AWSMutationDatabaseAdapter(storageAdapter: storageAdapter) let awsMutationEventPublisher = AWSMutationEventPublisher(eventSource: mutationDatabaseAdapter) + stateMachine = MockStateMachine(initialState: .notStarted, resolver: RemoteSyncEngine.Resolver.resolve(currentState:action:)) let syncEngine = RemoteSyncEngine(storageAdapter: storageAdapter, outgoingMutationQueue: outgoingMutationQueue, mutationEventIngester: mutationDatabaseAdapter, mutationEventPublisher: awsMutationEventPublisher, initialSyncOrchestratorFactory: NoOpInitialSyncOrchestrator.factory, - reconciliationQueueFactory: MockAWSIncomingEventReconciliationQueue.factory) + reconciliationQueueFactory: MockAWSIncomingEventReconciliationQueue.factory, + stateMachine: stateMachine) storageEngine = StorageEngine(storageAdapter: storageAdapter, syncEngine: syncEngine) diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/RemoteEngineSyncTests.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/RemoteEngineSyncTests.swift index 0de1b16988..e14b25d8ca 100644 --- a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/RemoteEngineSyncTests.swift +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/RemoteEngineSyncTests.swift @@ -56,6 +56,7 @@ class RemoteEngineSyncTests: XCTestCase { func testFailureOnInitialSync() throws { let storageAdapterAvailable = expectation(description: "storageAdapterAvailable") + let subscriptionsPaused = expectation(description: "subscriptionsPaused") let mutationsPaused = expectation(description: "mutationsPaused") let subscriptionsInitialized = expectation(description: "subscriptionsInitialized") let failureOnInitialSync = expectation(description: "failureOnInitialSync") @@ -69,7 +70,7 @@ class RemoteEngineSyncTests: XCTestCase { case .storageAdapterAvailable: storageAdapterAvailable.fulfill() case .subscriptionsPaused: - XCTFail("subscriptions have not been created, so they are not paused") + subscriptionsPaused.fulfill() case .mutationsPaused: mutationsPaused.fulfill() case .subscriptionsInitialized: @@ -86,12 +87,15 @@ class RemoteEngineSyncTests: XCTestCase { remoteSyncEngine.start() wait(for: [storageAdapterAvailable, - mutationsPaused, subscriptionsInitialized, + subscriptionsPaused, + mutationsPaused, + subscriptionsInitialized, failureOnInitialSync], timeout: defaultAsyncWaitTimeout) } func testRemoteSyncEngineHappyPath() throws { let storageAdapterAvailable = expectation(description: "storageAdapterAvailable") + let subscriptionsPaused = expectation(description: "subscriptionsPaused") let mutationsPaused = expectation(description: "mutationsPaused") let subscriptionsInitialized = expectation(description: "subscriptionsInitialized") let performedInitialSync = expectation(description: "performedInitialSync") @@ -108,7 +112,7 @@ class RemoteEngineSyncTests: XCTestCase { case .storageAdapterAvailable: storageAdapterAvailable.fulfill() case .subscriptionsPaused: - XCTFail("subscriptions have not been created, so they are not paused") + subscriptionsPaused.fulfill() case .mutationsPaused: mutationsPaused.fulfill() case .subscriptionsInitialized: @@ -129,6 +133,7 @@ class RemoteEngineSyncTests: XCTestCase { remoteSyncEngine.start() wait(for: [storageAdapterAvailable, + subscriptionsPaused, mutationsPaused, subscriptionsInitialized, performedInitialSync, @@ -139,6 +144,7 @@ class RemoteEngineSyncTests: XCTestCase { func testFailsAfterSyncStarted() throws { let storageAdapterAvailable = expectation(description: "storageAdapterAvailable") + let subscriptionsPaused = expectation(description: "subscriptionsPaused") let mutationsPaused = expectation(description: "mutationsPaused") let subscriptionsInitialized = expectation(description: "subscriptionsInitialized") let performedInitialSync = expectation(description: "performedInitialSync") @@ -156,7 +162,7 @@ class RemoteEngineSyncTests: XCTestCase { case .storageAdapterAvailable: storageAdapterAvailable.fulfill() case .subscriptionsPaused: - XCTFail("subscriptions have not been created, so they are not paused") + subscriptionsPaused.fulfill() case .mutationsPaused: mutationsPaused.fulfill() case .subscriptionsInitialized: @@ -178,6 +184,7 @@ class RemoteEngineSyncTests: XCTestCase { remoteSyncEngine.start() wait(for: [storageAdapterAvailable, + subscriptionsPaused, mutationsPaused, subscriptionsInitialized, performedInitialSync, diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/TestSupport/SyncEngineTestBase.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/TestSupport/SyncEngineTestBase.swift index 0bd1402863..6bb2a4a60b 100644 --- a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/TestSupport/SyncEngineTestBase.swift +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/TestSupport/SyncEngineTestBase.swift @@ -24,6 +24,8 @@ class SyncEngineTestBase: XCTestCase { /// Used for DB manipulation to mock starting data for tests var storageAdapter: SQLiteStorageEngineAdapter! + var stateMachine: StateMachine! + // MARK: - Setup override func setUp() { @@ -67,13 +69,16 @@ class SyncEngineTestBase: XCTestCase { ) throws { let mutationDatabaseAdapter = try AWSMutationDatabaseAdapter(storageAdapter: storageAdapter) let awsMutationEventPublisher = AWSMutationEventPublisher(eventSource: mutationDatabaseAdapter) + stateMachine = StateMachine(initialState: .notStarted, + resolver: RemoteSyncEngine.Resolver.resolve(currentState:action:)) let syncEngine = RemoteSyncEngine(storageAdapter: storageAdapter, outgoingMutationQueue: mutationQueue, mutationEventIngester: mutationDatabaseAdapter, mutationEventPublisher: awsMutationEventPublisher, initialSyncOrchestratorFactory: initialSyncOrchestratorFactory, - reconciliationQueueFactory: AWSIncomingEventReconciliationQueue.factory) + reconciliationQueueFactory: AWSIncomingEventReconciliationQueue.factory, + stateMachine: stateMachine) let storageEngine = StorageEngine(storageAdapter: storageAdapter, syncEngine: syncEngine) diff --git a/AmplifyPlugins/DataStore/DataStoreCategoryPlugin.xcodeproj/project.pbxproj b/AmplifyPlugins/DataStore/DataStoreCategoryPlugin.xcodeproj/project.pbxproj index 1db499dae9..c1aea40cca 100644 --- a/AmplifyPlugins/DataStore/DataStoreCategoryPlugin.xcodeproj/project.pbxproj +++ b/AmplifyPlugins/DataStore/DataStoreCategoryPlugin.xcodeproj/project.pbxproj @@ -51,6 +51,10 @@ 6B01B72023A4672500AD0E97 /* RequestRetryable.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B01B71E23A4672500AD0E97 /* RequestRetryable.swift */; }; 6B01B72223A4672500AD0E97 /* RequestRetryablePolicy.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B01B71F23A4672500AD0E97 /* RequestRetryablePolicy.swift */; }; 6B3381BC239778E90036F046 /* AWSMutationDatabaseAdapterTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B3381BB239778E90036F046 /* AWSMutationDatabaseAdapterTests.swift */; }; + 6B3CC61123F5E6210008ECBC /* RemoteSyncEngine+Action.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B3CC61023F5E6210008ECBC /* RemoteSyncEngine+Action.swift */; }; + 6B3CC61523F5E63F0008ECBC /* RemoteSyncEngine+Resolver.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B3CC61423F5E63F0008ECBC /* RemoteSyncEngine+Resolver.swift */; }; + 6B3CC61923F5E64F0008ECBC /* RemoteSyncEngine+State.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B3CC61823F5E64F0008ECBC /* RemoteSyncEngine+State.swift */; }; + 6B3CC67C23F86D680008ECBC /* RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B3CC67B23F86D680008ECBC /* RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift */; }; 6B4693E923A5645F006BE2C5 /* MutationRetryNotifier.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B4693E823A5645F006BE2C5 /* MutationRetryNotifier.swift */; }; 6B4E3DF42397269C00AD962B /* OutgoingMutationQueueTestsWithMockStateMachine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B4E3DF32397269C00AD962B /* OutgoingMutationQueueTestsWithMockStateMachine.swift */; }; 6B4E3DF62397327E00AD962B /* MockStateMachine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B4E3DF52397327E00AD962B /* MockStateMachine.swift */; }; @@ -210,6 +214,10 @@ 6B01B71E23A4672500AD0E97 /* RequestRetryable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RequestRetryable.swift; sourceTree = ""; }; 6B01B71F23A4672500AD0E97 /* RequestRetryablePolicy.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RequestRetryablePolicy.swift; sourceTree = ""; }; 6B3381BB239778E90036F046 /* AWSMutationDatabaseAdapterTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSMutationDatabaseAdapterTests.swift; sourceTree = ""; }; + 6B3CC61023F5E6210008ECBC /* RemoteSyncEngine+Action.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "RemoteSyncEngine+Action.swift"; sourceTree = ""; }; + 6B3CC61423F5E63F0008ECBC /* RemoteSyncEngine+Resolver.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "RemoteSyncEngine+Resolver.swift"; sourceTree = ""; }; + 6B3CC61823F5E64F0008ECBC /* RemoteSyncEngine+State.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "RemoteSyncEngine+State.swift"; sourceTree = ""; }; + 6B3CC67B23F86D680008ECBC /* RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift"; sourceTree = ""; }; 6B4693E823A5645F006BE2C5 /* MutationRetryNotifier.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MutationRetryNotifier.swift; sourceTree = ""; }; 6B4E3DF32397269C00AD962B /* OutgoingMutationQueueTestsWithMockStateMachine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OutgoingMutationQueueTestsWithMockStateMachine.swift; sourceTree = ""; }; 6B4E3DF52397327E00AD962B /* MockStateMachine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockStateMachine.swift; sourceTree = ""; }; @@ -429,6 +437,10 @@ isa = PBXGroup; children = ( 2149E5BE2388684F00873955 /* RemoteSyncEngine.swift */, + 6B3CC61023F5E6210008ECBC /* RemoteSyncEngine+Action.swift */, + 6B3CC61423F5E63F0008ECBC /* RemoteSyncEngine+Resolver.swift */, + 6B3CC61823F5E64F0008ECBC /* RemoteSyncEngine+State.swift */, + 6B3CC67B23F86D680008ECBC /* RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift */, FAF7CECE238C93A50095547B /* RemoteSyncEngineBehavior.swift */, 6B01B71E23A4672500AD0E97 /* RequestRetryable.swift */, 6B01B71F23A4672500AD0E97 /* RequestRetryablePolicy.swift */, @@ -1176,6 +1188,7 @@ files = ( 2149E5DE2388684F00873955 /* AWSDataStorePlugin.swift in Sources */, FAC010EC23956D5800FCE7BB /* ReconcileAndLocalSaveOperation+Resolver.swift in Sources */, + 6B3CC61923F5E64F0008ECBC /* RemoteSyncEngine+State.swift in Sources */, FA6C3FEC23988D0900A73110 /* AWSIncomingEventReconciliationQueue.swift in Sources */, 6B01B72023A4672500AD0E97 /* RequestRetryable.swift in Sources */, 6B01B72223A4672500AD0E97 /* RequestRetryablePolicy.swift in Sources */, @@ -1196,6 +1209,7 @@ FA55A54B2391EAB5002AFF2D /* MutationEventSubscriber.swift in Sources */, FA3B3F07238F23CA002EFDB3 /* OutgoingMutationQueue+Resolver.swift in Sources */, 2149E5D22388684F00873955 /* SQLStatement+Condition.swift in Sources */, + 6B3CC61123F5E6210008ECBC /* RemoteSyncEngine+Action.swift in Sources */, FAED573C238B4B03008EBED8 /* Statement+AnyModel.swift in Sources */, FA1AE9BC23988BE100DE396D /* AWSModelReconciliationQueue.swift in Sources */, 2149E5C52388684F00873955 /* StorageEngineBehavior.swift in Sources */, @@ -1209,6 +1223,7 @@ FACBA78B2394950C006349C8 /* MutationEventSource.swift in Sources */, FAC010E823956CF400FCE7BB /* ReconcileAndLocalSaveOperation+State.swift in Sources */, FA60461323980A75009E4B97 /* InitialSyncOperation.swift in Sources */, + 6B3CC67C23F86D680008ECBC /* RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift in Sources */, 2149E5C72388684F00873955 /* StorageEngine.swift in Sources */, FA3B3F05238F22F5002EFDB3 /* OutgoingMutationQueue+Action.swift in Sources */, FAED573E238B4C2F008EBED8 /* StorageEngineAdapter+UntypedModel.swift in Sources */, @@ -1234,6 +1249,7 @@ FA55A54D2391F96E002AFF2D /* AWSMutationDatabaseAdapter+MutationEventSource.swift in Sources */, 2149E5CE2388684F00873955 /* SQLStatement+CreateTable.swift in Sources */, FACBB2AE238AFAE800C29602 /* AWSDataStorePlugin+DataStoreBaseBehavior.swift in Sources */, + 6B3CC61523F5E63F0008ECBC /* RemoteSyncEngine+Resolver.swift in Sources */, 2149E5D32388684F00873955 /* Statement+Model.swift in Sources */, ); runOnlyForDeploymentPostprocessing = 0;