Skip to content

Commit

Permalink
fix: Bug where subscription connections happen at the same time (#389)
Browse files Browse the repository at this point in the history
  • Loading branch information
wooj2 authored Apr 22, 2020
1 parent ca15e88 commit 81e6111
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
let awsMutationEventPublisher = AWSMutationEventPublisher(eventSource: mutationDatabaseAdapter)
let outgoingMutationQueue = outgoingMutationQueue ?? OutgoingMutationQueue()
let reconciliationQueueFactory = reconciliationQueueFactory ??
AWSIncomingEventReconciliationQueue.init(modelTypes:api:storageAdapter:)
AWSIncomingEventReconciliationQueue.init(modelTypes:api:storageAdapter:modelReconciliationQueueFactory:)
let initialSyncOrchestratorFactory = initialSyncOrchestratorFactory ??
AWSInitialSyncOrchestrator.init(dataStoreConfiguration:api:reconciliationQueue:storageAdapter:)
let stateMachine = stateMachine ?? StateMachine(initialState: .notStarted,
Expand Down Expand Up @@ -231,7 +231,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
storageAdapter: StorageEngineAdapter) {
log.debug(#function)
let syncableModelTypes = ModelRegistry.models.filter { $0.schema.isSyncable }
reconciliationQueue = reconciliationQueueFactory(syncableModelTypes, api, storageAdapter)
reconciliationQueue = reconciliationQueueFactory(syncableModelTypes, api, storageAdapter, nil)
reconciliationQueueSink = reconciliationQueue?.publisher.sink(
receiveCompletion: onReceiveCompletion(receiveCompletion:),
receiveValue: onReceive(receiveValue:))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import Foundation
//Used for testing:
@available(iOS 13.0, *)
typealias IncomingEventReconciliationQueueFactory =
([Model.Type], APICategoryGraphQLBehavior, StorageEngineAdapter) -> IncomingEventReconciliationQueue
([Model.Type], APICategoryGraphQLBehavior, StorageEngineAdapter, ModelReconciliationQueueFactory?) -> IncomingEventReconciliationQueue

@available(iOS 13.0, *)
final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueue {

static let factory: IncomingEventReconciliationQueueFactory = { modelTypes, api, storageAdapter in
AWSIncomingEventReconciliationQueue(modelTypes: modelTypes, api: api, storageAdapter: storageAdapter)
static let factory: IncomingEventReconciliationQueueFactory = { modelTypes, api, storageAdapter, _ in
AWSIncomingEventReconciliationQueue(modelTypes: modelTypes, api: api, storageAdapter: storageAdapter, modelReconciliationQueueFactory: nil)
}
private var modelReconciliationQueueSinks: [String: AnyCancellable]

Expand All @@ -28,21 +28,27 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
return eventReconciliationQueueTopic.eraseToAnyPublisher()
}

private let connectionStatusSerialQueue: DispatchQueue
private var reconciliationQueues: [String: ModelReconciliationQueue]
private var reconciliationQueueConnectionStatus: [String: Bool]
private var modelReconciliationQueueFactory: ModelReconciliationQueueFactory

init(modelTypes: [Model.Type],
api: APICategoryGraphQLBehavior,
storageAdapter: StorageEngineAdapter) {
storageAdapter: StorageEngineAdapter,
modelReconciliationQueueFactory: ModelReconciliationQueueFactory? = nil) {
self.modelReconciliationQueueSinks = [:]
self.eventReconciliationQueueTopic = PassthroughSubject<IncomingEventReconciliationQueueEvent, DataStoreError>()
self.reconciliationQueues = [:]
self.reconciliationQueueConnectionStatus = [:]
self.modelReconciliationQueueFactory = modelReconciliationQueueFactory ??
AWSModelReconciliationQueue.init(modelType:storageAdapter:api:incomingSubscriptionEvents:)
//TODO: Add target for SyncEngine system to help prevent thread explosion and increase performance
// https://github.com/aws-amplify/amplify-ios/issues/399
self.connectionStatusSerialQueue = DispatchQueue(label: "com.amazonaws.DataStore.AWSIncomingEventReconciliationQueue")
for modelType in modelTypes {
let modelName = modelType.modelName
let queue = AWSModelReconciliationQueue(modelType: modelType,
storageAdapter: storageAdapter,
api: api)
let queue = self.modelReconciliationQueueFactory(modelType, storageAdapter, api, nil)
guard reconciliationQueues[modelName] == nil else {
Amplify.DataStore.log
.warn("Duplicate model name found: \(modelName), not subscribing")
Expand Down Expand Up @@ -75,7 +81,9 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
}

private func onReceiveCompletion(completed: Subscribers.Completion<DataStoreError>) {
reconciliationQueueConnectionStatus = [:]
connectionStatusSerialQueue.async {
self.reconciliationQueueConnectionStatus = [:]
}
switch completed {
case .failure(let error):
eventReconciliationQueueTopic.send(completion: .failure(error))
Expand All @@ -89,9 +97,11 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
case .mutationEvent(let event):
eventReconciliationQueueTopic.send(.mutationEvent(event))
case .connected(let modelName):
reconciliationQueueConnectionStatus[modelName] = true
if reconciliationQueueConnectionStatus.count == reconciliationQueues.count {
eventReconciliationQueueTopic.send(.initialized)
connectionStatusSerialQueue.async {
self.reconciliationQueueConnectionStatus[modelName] = true
if self.reconciliationQueueConnectionStatus.count == self.reconciliationQueues.count {
self.eventReconciliationQueueTopic.send(.initialized)
}
}
default:
break
Expand All @@ -101,8 +111,10 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
func cancel() {
modelReconciliationQueueSinks.values.forEach { $0.cancel() }
reconciliationQueues.values.forEach { $0.cancel()}
reconciliationQueues = [:]
modelReconciliationQueueSinks = [:]
connectionStatusSerialQueue.async {
self.reconciliationQueues = [:]
self.modelReconciliationQueueSinks = [:]
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import AWSPluginsCore
import Combine
import Foundation

//Used for testing:
@available(iOS 13.0, *)
typealias ModelReconciliationQueueFactory =
(Model.Type, StorageEngineAdapter, APICategoryGraphQLBehavior, IncomingSubscriptionEventPublisher?) -> ModelReconciliationQueue

/// A queue of reconciliation operations, merged from incoming subscription events and responses to locally-sourced
/// mutations for a single model type.
///
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//
// Copyright 2018-2020 Amazon.com,
// Inc. or its affiliates. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation
import XCTest

@testable import Amplify
@testable import AmplifyTestCommon
@testable import AWSPluginsCore
@testable import AWSDataStoreCategoryPlugin


class AWSIncomingEventReconciliationQueueTests: XCTestCase {
var storageAdapter: MockSQLiteStorageEngineAdapter!
var apiPlugin: MockAPICategoryPlugin!

override func setUp() {
MockModelReconciliationQueue.reset()
storageAdapter = MockSQLiteStorageEngineAdapter()
storageAdapter.returnOnQuery(dataStoreResult: .none)
storageAdapter.returnOnSave(dataStoreResult: .none)

apiPlugin = MockAPICategoryPlugin()

}
var operationQueue: OperationQueue!

//This test case attempts to hit a race condition, and may be required to execute multiple times
// in order to demonstrate the bug
func testTwoConnectionStatusUpdatesAtSameTime() {
let expectInitialized = expectation(description: "eventQueue expected to send out initialized state")

let modelReconciliationQueueFactory
= MockModelReconciliationQueue.init(modelType:storageAdapter:api:incomingSubscriptionEvents:)
let eventQueue = AWSIncomingEventReconciliationQueue(
modelTypes: [Post.self, Comment.self],
api: apiPlugin,
storageAdapter: storageAdapter,
modelReconciliationQueueFactory: modelReconciliationQueueFactory)
eventQueue.start()

let eventSync = eventQueue.publisher.sink(receiveCompletion: { _ in
XCTFail("Not expecting this to call")
}, receiveValue: { event in
switch event {
case .initialized:
expectInitialized.fulfill()
default:
XCTFail("Should not expect any other state")
}
})

operationQueue = OperationQueue()
operationQueue.name = "com.amazonaws.DataStore.UnitTestQueue"
operationQueue.maxConcurrentOperationCount = 2
operationQueue.underlyingQueue = DispatchQueue.global()
operationQueue.isSuspended = true

let reconciliationQueues = MockModelReconciliationQueue.mockModelReconciliationQueues
for (queueName, queue) in reconciliationQueues {
let cancellableOperation = CancelAwareBlockOperation {
queue.modelReconciliationQueueSubject.send(.connected(queueName))
}
operationQueue.addOperation(cancellableOperation)
}
operationQueue.isSuspended = false
waitForExpectations(timeout: 2)

}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// Copyright 2018-2020 Amazon.com,
// Inc. or its affiliates. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation
import Combine

@testable import Amplify
@testable import AmplifyTestCommon
@testable import AWSPluginsCore
@testable import AWSDataStoreCategoryPlugin
class MockModelReconciliationQueue: ModelReconciliationQueue {

public static var mockModelReconciliationQueues: [String: MockModelReconciliationQueue] = [:]

private let modelType: Model.Type
let modelReconciliationQueueSubject: PassthroughSubject<ModelReconciliationQueueEvent, DataStoreError>
var publisher: AnyPublisher<ModelReconciliationQueueEvent, DataStoreError> {
return modelReconciliationQueueSubject.eraseToAnyPublisher()
}

init(modelType: Model.Type,
storageAdapter: StorageEngineAdapter?,
api: APICategoryGraphQLBehavior,
incomingSubscriptionEvents: IncomingSubscriptionEventPublisher? = nil) {
self.modelReconciliationQueueSubject = PassthroughSubject<ModelReconciliationQueueEvent, DataStoreError>()
self.modelType = modelType
MockModelReconciliationQueue.mockModelReconciliationQueues[modelType.modelName] = self
}

func start() {
//no-op
}
func pause() {
//no-op
}

func cancel() {
//no-op
}

func enqueue(_ remoteModel: MutationSync<AnyModel>) {
//no-op
}

static func reset() {
MockModelReconciliationQueue.mockModelReconciliationQueues = [:]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import Combine
@testable import AWSDataStoreCategoryPlugin

class MockAWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueue {
static let factory: IncomingEventReconciliationQueueFactory = { modelTypes, api, storageAdapter in
static let factory: IncomingEventReconciliationQueueFactory = { modelTypes, api, storageAdapter, _ in
MockAWSIncomingEventReconciliationQueue(modelTypes: modelTypes, api: api, storageAdapter: storageAdapter)
}
let incomingEventSubject: PassthroughSubject<IncomingEventReconciliationQueueEvent, DataStoreError>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
6B4693E923A5645F006BE2C5 /* MutationRetryNotifier.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B4693E823A5645F006BE2C5 /* MutationRetryNotifier.swift */; };
6B4E3DF42397269C00AD962B /* OutgoingMutationQueueTestsWithMockStateMachine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B4E3DF32397269C00AD962B /* OutgoingMutationQueueTestsWithMockStateMachine.swift */; };
6B4E3DF62397327E00AD962B /* MockStateMachine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B4E3DF52397327E00AD962B /* MockStateMachine.swift */; };
6B52DC94244784FC007F5AD3 /* AWSIncomingEventReconciliationQueueTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B52DC93244784FC007F5AD3 /* AWSIncomingEventReconciliationQueueTests.swift */; };
6B52DC9624478E75007F5AD3 /* MockModelReconciliatinQueue.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B52DC9524478E75007F5AD3 /* MockModelReconciliatinQueue.swift */; };
6B64027923E3584300001FD7 /* MockAWSIncomingEventReconciliationQueue.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B64027823E3584300001FD7 /* MockAWSIncomingEventReconciliationQueue.swift */; };
6B64027B23E38B9900001FD7 /* MockOutgoingMutationQueue.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B64027A23E38B9900001FD7 /* MockOutgoingMutationQueue.swift */; };
6B91DEBF238B9CE0004D6BEE /* ReconcileAndLocalSaveOperationTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B91DEBE238B9CE0004D6BEE /* ReconcileAndLocalSaveOperationTests.swift */; };
Expand Down Expand Up @@ -241,6 +243,8 @@
6B4693E823A5645F006BE2C5 /* MutationRetryNotifier.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MutationRetryNotifier.swift; sourceTree = "<group>"; };
6B4E3DF32397269C00AD962B /* OutgoingMutationQueueTestsWithMockStateMachine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OutgoingMutationQueueTestsWithMockStateMachine.swift; sourceTree = "<group>"; };
6B4E3DF52397327E00AD962B /* MockStateMachine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockStateMachine.swift; sourceTree = "<group>"; };
6B52DC93244784FC007F5AD3 /* AWSIncomingEventReconciliationQueueTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSIncomingEventReconciliationQueueTests.swift; sourceTree = "<group>"; };
6B52DC9524478E75007F5AD3 /* MockModelReconciliatinQueue.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockModelReconciliatinQueue.swift; sourceTree = "<group>"; };
6B64027823E3584300001FD7 /* MockAWSIncomingEventReconciliationQueue.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockAWSIncomingEventReconciliationQueue.swift; sourceTree = "<group>"; };
6B64027A23E38B9900001FD7 /* MockOutgoingMutationQueue.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockOutgoingMutationQueue.swift; sourceTree = "<group>"; };
6B91DEBE238B9CE0004D6BEE /* ReconcileAndLocalSaveOperationTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReconcileAndLocalSaveOperationTests.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -732,6 +736,7 @@
FAE01F9B23997C3700B468DA /* SubscriptionSync */ = {
isa = PBXGroup;
children = (
6B52DC93244784FC007F5AD3 /* AWSIncomingEventReconciliationQueueTests.swift */,
FA4A9556239ACAD7008E876E /* ModelReconciliationQueueBehaviorTests.swift */,
FAABC224239B1B3500740F9F /* ModelReconciliationDeleteTests.swift */,
6B91DEBE238B9CE0004D6BEE /* ReconcileAndLocalSaveOperationTests.swift */,
Expand All @@ -756,6 +761,7 @@
FAE4146B239AA40600CE94C2 /* MockSQLiteStorageEngineAdapter.swift */,
FA4A955C239AD810008E876E /* MockSQLiteStorageEngineAdapterResponders.swift */,
FAABC226239B1B9100740F9F /* ReconciliationQueueTestBase.swift */,
6B52DC9524478E75007F5AD3 /* MockModelReconciliatinQueue.swift */,
);
path = Support;
sourceTree = "<group>";
Expand Down Expand Up @@ -1359,10 +1365,12 @@
FAE4146A239AA2B700CE94C2 /* RemoteSyncReconcilerTests.swift in Sources */,
2149E5FE238869CF00873955 /* RemoteSyncAPIInvocationTests.swift in Sources */,
B9FAA142238C6082009414B4 /* BaseDataStoreTests.swift in Sources */,
6B52DC9624478E75007F5AD3 /* MockModelReconciliatinQueue.swift in Sources */,
FA4A955B239AD3F4008E876E /* Foundation+TestExtensions.swift in Sources */,
6B64027B23E38B9900001FD7 /* MockOutgoingMutationQueue.swift in Sources */,
2149E600238869CF00873955 /* SQLiteStorageEngineAdapterTests.swift in Sources */,
21B3AD27242BB58000C7E1DA /* ProcessMutationErrorFromCloudOperationTests.swift in Sources */,
6B52DC94244784FC007F5AD3 /* AWSIncomingEventReconciliationQueueTests.swift in Sources */,
6B4E3DF62397327E00AD962B /* MockStateMachine.swift in Sources */,
FAF5287B2399814F0053A717 /* MockReconciliationQueue.swift in Sources */,
);
Expand Down

0 comments on commit 81e6111

Please sign in to comment.