Skip to content

Commit

Permalink
Integrate retryability for outgoing mutation queue
Browse files Browse the repository at this point in the history
  • Loading branch information
wooj2 committed Dec 17, 2019
1 parent 162f46a commit 3391f2f
Show file tree
Hide file tree
Showing 19 changed files with 2,018 additions and 1,461 deletions.
4 changes: 4 additions & 0 deletions Amplify.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
2861F5F799C24671B5C4DB8D /* Pods_Amplify_AmplifyTestConfigs_AmplifyTestCommon.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = C448F4F6DD01A268675E1C68 /* Pods_Amplify_AmplifyTestConfigs_AmplifyTestCommon.framework */; };
2CFB61C7E80D065C0A885A2F /* Pods_Amplify_AWSPluginsCore_AWSPluginsTestConfigs_AWSPluginsTestCommon.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = D5363CAF9EFAA822FED56808 /* Pods_Amplify_AWSPluginsCore_AWSPluginsTestConfigs_AWSPluginsTestCommon.framework */; };
3263D332138415AF42E64FF7 /* Pods_AmplifyTestApp.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = CDC7F1C368154B364CB74742 /* Pods_AmplifyTestApp.framework */; };
6BB7441023A9954900B0EB6C /* DispatchSource+MakeOneOff.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6BB7440F23A9954900B0EB6C /* DispatchSource+MakeOneOff.swift */; };
7D5ED6C78E25246DDAF2F2EC /* Pods_Amplify.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 84F3A76FB68CEFA45F4BB1BB /* Pods_Amplify.framework */; platformFilter = ios; };
7F27B1DCE59C1E674172CCD6 /* Pods_Amplify_AmplifyTestConfigs_AmplifyTests.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 976D972EC2BBCAAD023694EB /* Pods_Amplify_AmplifyTestConfigs_AmplifyTests.framework */; };
881246F5DCC59436DC932469 /* Pods_Amplify_AWSPluginsCore.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 35D92182B8445C8F9B0FAE94 /* Pods_Amplify_AWSPluginsCore.framework */; };
Expand Down Expand Up @@ -642,6 +643,7 @@
687B09E9348F8D29979A2404 /* Pods-Amplify-AmplifyAWSPlugins-AWSPinpointAnalyticsPlugin-AWSPinpointAnalyticsPluginTests.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Amplify-AmplifyAWSPlugins-AWSPinpointAnalyticsPlugin-AWSPinpointAnalyticsPluginTests.debug.xcconfig"; path = "Target Support Files/Pods-Amplify-AmplifyAWSPlugins-AWSPinpointAnalyticsPlugin-AWSPinpointAnalyticsPluginTests/Pods-Amplify-AmplifyAWSPlugins-AWSPinpointAnalyticsPlugin-AWSPinpointAnalyticsPluginTests.debug.xcconfig"; sourceTree = "<group>"; };
6AF0E4775809F0866F9C44D9 /* Pods-AmplifyAWSPlugins-AWSPluginsCore-AWSS3StoragePlugin-AWSS3StoragePluginTests.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-AmplifyAWSPlugins-AWSPluginsCore-AWSS3StoragePlugin-AWSS3StoragePluginTests.debug.xcconfig"; path = "Target Support Files/Pods-AmplifyAWSPlugins-AWSPluginsCore-AWSS3StoragePlugin-AWSS3StoragePluginTests/Pods-AmplifyAWSPlugins-AWSPluginsCore-AWSS3StoragePlugin-AWSS3StoragePluginTests.debug.xcconfig"; sourceTree = "<group>"; };
6BAC32194A15ACB56F07DC87 /* Pods-AWSS3StoragePlugin.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-AWSS3StoragePlugin.debug.xcconfig"; path = "Target Support Files/Pods-AWSS3StoragePlugin/Pods-AWSS3StoragePlugin.debug.xcconfig"; sourceTree = "<group>"; };
6BB7440F23A9954900B0EB6C /* DispatchSource+MakeOneOff.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "DispatchSource+MakeOneOff.swift"; sourceTree = "<group>"; };
6C41D3730B7ED4FD62A43E40 /* Pods-Amplify-AmplifyAWSPlugins-AWSAPICategoryPlugin-AWSAPICategoryPluginTests.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Amplify-AmplifyAWSPlugins-AWSAPICategoryPlugin-AWSAPICategoryPluginTests.debug.xcconfig"; path = "Target Support Files/Pods-Amplify-AmplifyAWSPlugins-AWSAPICategoryPlugin-AWSAPICategoryPluginTests/Pods-Amplify-AmplifyAWSPlugins-AWSAPICategoryPlugin-AWSAPICategoryPluginTests.debug.xcconfig"; sourceTree = "<group>"; };
6D51240C78418B733FFA6829 /* Pods-Amplify-AWSPluginsCore-AWSPluginsTestConfigs-AWSDataStoreCategoryPluginTests.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Amplify-AWSPluginsCore-AWSPluginsTestConfigs-AWSDataStoreCategoryPluginTests.debug.xcconfig"; path = "Target Support Files/Pods-Amplify-AWSPluginsCore-AWSPluginsTestConfigs-AWSDataStoreCategoryPluginTests/Pods-Amplify-AWSPluginsCore-AWSPluginsTestConfigs-AWSDataStoreCategoryPluginTests.debug.xcconfig"; sourceTree = "<group>"; };
6D62C9C57736C3BEADEB1E30 /* Pods-AWSPinpointAnalyticsPlugin.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-AWSPinpointAnalyticsPlugin.debug.xcconfig"; path = "Target Support Files/Pods-AWSPinpointAnalyticsPlugin/Pods-AWSPinpointAnalyticsPlugin.debug.xcconfig"; sourceTree = "<group>"; };
Expand Down Expand Up @@ -1948,6 +1950,7 @@
FAA64FC42397344D00B9C3C6 /* AtomicValue+RangeReplaceableCollection.swift */,
FADB3A6723612940006D6FE9 /* BasicClosure.swift */,
FA56F72222B14B420039754A /* Cancellable.swift */,
6BB7440F23A9954900B0EB6C /* DispatchSource+MakeOneOff.swift */,
FA09B9402321BB78000E064D /* JSONValue.swift */,
FACD264F2386E9410068FBE6 /* JSONValue+KeyPath.swift */,
FACD264E2386E9410068FBE6 /* JSONValue+Subscript.swift */,
Expand Down Expand Up @@ -3439,6 +3442,7 @@
FAA2E8C223A00D5800E420EA /* APICategory+Resettable.swift in Sources */,
FAA2E8CC23A02A5400E420EA /* HubCategory+Resettable.swift in Sources */,
2142099823721F4400FA140C /* RESTOperationRequest.swift in Sources */,
6BB7441023A9954900B0EB6C /* DispatchSource+MakeOneOff.swift in Sources */,
21FFF994230C96CB005878EA /* StorageUploadDataOperation.swift in Sources */,
95DAAB30237E63370028544F /* IdentifyAction.swift in Sources */,
21D79FE32377F4120057D00D /* SubscriptionConnectionState.swift in Sources */,
Expand Down
42 changes: 42 additions & 0 deletions Amplify/Core/Support/DispatchSource+MakeOneOff.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//
// Copyright 2018-2019 Amazon.com,
// Inc. or its affiliates. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation

extension DispatchSource {
/// Convenience function to encapsulate creation of a one-off DispatchSourceTimer for different versions of Swift
///
/// - Parameters:
/// - interval: The future DispatchInterval at which to fire the timer
/// - queue: The queue on which the timer should perform its block
/// - block: The block to invoke when the timer is fired
/// - Returns: The unstarted timer
public static func makeOneOffDispatchSourceTimer(interval: DispatchTimeInterval,
queue: DispatchQueue,
block: @escaping () -> Void ) -> DispatchSourceTimer {
let deadline = DispatchTime.now() + interval
return makeOneOffDispatchSourceTimer(deadline: deadline, queue: queue, block: block)
}

/// Convenience function to encapsulate creation of a one-off DispatchSourceTimer for different versions of Swift
/// - Parameters:
/// - deadline: The time to fire the timer
/// - queue: The queue on which the timer should perform its block
/// - block: The block to invoke when the timer is fired
public static func makeOneOffDispatchSourceTimer(deadline: DispatchTime,
queue: DispatchQueue,
block: @escaping () -> Void ) -> DispatchSourceTimer {
let timer = DispatchSource.makeTimerSource(flags: DispatchSource.TimerFlags(rawValue: 0), queue: queue)
#if swift(>=4)
timer.schedule(deadline: deadline)
#else
timer.scheduleOneshot(deadline: deadline)
#endif
timer.setEventHandler(handler: block)
return timer
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//
// Copyright 2018-2019 Amazon.com,
// Inc. or its affiliates. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Amplify
import Foundation
import AWSPluginsCore
import Combine

@available(iOS 13.0, *)
final class MutationRetryNotifier {
private var nextSyncTimer: DispatchSourceTimer?
private var handlerQueue = DispatchQueue.global(qos: .default)
var retryMutationCallback: () -> Void
private var reachabilitySubscription: Subscription?

init(advice: RequestRetryAdvice,
networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, DataStoreError>?,
retryMutationCallback: @escaping BasicClosure) {
self.retryMutationCallback = retryMutationCallback

networkReachabilityPublisher?.subscribe(self)

let deadline = DispatchTime.now() + advice.retryInterval
scheduleTimer(at: deadline)
}

deinit {
cancel()
}

private func scheduleTimer(at deadline: DispatchTime) {
nextSyncTimer = DispatchSource.makeOneOffDispatchSourceTimer(deadline: deadline, queue: handlerQueue) {
self.notifyCallback()
}
nextSyncTimer?.resume()
}

func cancel() {
reachabilitySubscription?.cancel()
nextSyncTimer?.cancel()
}

func notifyCallback() {
// Call the cancel routine as the purpose of retry is fulfilled
cancel()
retryMutationCallback()
}
}

@available(iOS 13.0, *)
extension MutationRetryNotifier: Subscriber {
func receive(subscription: Subscription) {
log.verbose(#function)
reachabilitySubscription = subscription
subscription.request(.unlimited)
}

func receive(_ reachabilityUpdate: ReachabilityUpdate) -> Subscribers.Demand {
if reachabilityUpdate.isOnline {
notifyCallback()
return .none
}
return .unlimited
}

func receive(completion: Subscribers.Completion<DataStoreError>) {
log.verbose(#function)
reachabilitySubscription?.cancel()
}
}

@available(iOS 13.0, *)
extension MutationRetryNotifier: DefaultLogger { }
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
return
}

//TODO: Find a way to resolve networkReachabilityPublisher
let syncMutationToCloudOperation =
SyncMutationToCloudOperation(mutationEvent: mutationEvent, api: api) { result in
SyncMutationToCloudOperation(mutationEvent: mutationEvent, api: api, networkReachabilityPublisher: nil) { result in
self.log.verbose("mutationEvent finished: \(mutationEvent); result: \(result)")
self.stateMachine.notify(action: .processedEvent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,30 @@ import AWSPluginsCore
/// Publishes a mutation event to the specified Cloud API. Upon receipt of the API response, validates to ensure it is
/// not a retriable error. If it is, attempts a retry until either success or terminal failure. Upon success or
/// terminal failure, publishes the event response to the appropriate ModelReconciliationQueue subject.
@available(iOS 13.0, *)
class SyncMutationToCloudOperation: Operation {

private weak var api: APICategoryGraphQLBehavior?
private let mutationEvent: MutationEvent
private var mutationOperation: GraphQLOperation<MutationSync<AnyModel>>?
private let networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, DataStoreError>?
private let completion: GraphQLOperation<MutationSync<AnyModel>>.EventListener

init(mutationEvent: MutationEvent, api: APICategoryGraphQLBehavior,
private var mutationRetryNotifier: MutationRetryNotifier?
private var requestRetryablePolicy: RequestRetryablePolicy
private var currentAttemptNumber: Int

init(mutationEvent: MutationEvent,
api: APICategoryGraphQLBehavior,
networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, DataStoreError>?,
currentAttemptNumber: Int = 1,
requestRetryablePolicy: RequestRetryablePolicy? = RequestRetryablePolicy(),
completion: @escaping GraphQLOperation<MutationSync<AnyModel>>.EventListener) {
self.mutationEvent = mutationEvent
self.api = api
self.networkReachabilityPublisher = networkReachabilityPublisher
self.completion = completion

self.currentAttemptNumber = currentAttemptNumber
self.requestRetryablePolicy = requestRetryablePolicy ?? RequestRetryablePolicy()
super.init()
}

Expand All @@ -42,6 +53,13 @@ class SyncMutationToCloudOperation: Operation {
sendMutationToCloud()
}

override func cancel() {
mutationOperation?.cancel()
mutationRetryNotifier?.cancel()
let apiError = APIError.unknown("Operation cancelled", "")
finish(result: .failed(apiError))
}

private func sendMutationToCloud() {
guard !isCancelled else {
mutationOperation?.cancel()
Expand All @@ -51,14 +69,15 @@ class SyncMutationToCloudOperation: Operation {
}

log.debug(#function)
guard let api = api else {
// TODO: This should be part of our error handling routines
log.error("\(#function): API unexpectedly nil")
let apiError = APIError.unknown("API unexpectedly nil", "")
if let apiRequest = createAPIRequest() {
makeAPIRequest(apiRequest)
} else {
let apiError = APIError.unknown("Unable to create API request", "")
finish(result: .failed(apiError))
return
}
}

func createAPIRequest() -> GraphQLRequest<MutationSync<AnyModel>>? {
guard let mutationType = GraphQLMutationType(rawValue: mutationEvent.mutationType) else {
let dataStoreError = DataStoreError.decodingError(
"Invalid mutation type",
Expand All @@ -67,9 +86,9 @@ class SyncMutationToCloudOperation: Operation {
match any known GraphQL mutation type. Ensure you only send valid mutation types:
\(GraphQLMutationType.allCases)
"""
)
)
log.error(error: dataStoreError)
return
return nil
}

let request: GraphQLRequest<MutationSync<AnyModel>>
Expand All @@ -82,11 +101,21 @@ class SyncMutationToCloudOperation: Operation {
} catch {
let apiError = APIError.unknown("Couldn't decode model", "", error)
finish(result: .failed(apiError))
return
return nil
}
return request
}

log.verbose("\(#function) sending mutation with sync data: \(request)")
mutationOperation = api.mutate(request: request) { asyncEvent in
func makeAPIRequest(_ apiRequest: GraphQLRequest<MutationSync<AnyModel>>) {
guard let api = api else {
// TODO: This should be part of our error handling routines
log.error("\(#function): API unexpectedly nil")
let apiError = APIError.unknown("API unexpectedly nil", "")
finish(result: .failed(apiError))
return
}
log.verbose("\(#function) sending mutation with sync data: \(apiRequest)")
mutationOperation = api.mutate(request: apiRequest) { asyncEvent in
self.log.verbose("sendMutationToCloud received asyncEvent: \(asyncEvent)")
self.validateResponseFromCloud(asyncEvent: asyncEvent)
}
Expand Down Expand Up @@ -126,7 +155,17 @@ class SyncMutationToCloudOperation: Operation {
return
}

// TODO: Wire in actual event validation and retriability
if case .failed(let error) = asyncEvent {
let advice = getRetryAdviceIfRetryable(error: error)
if advice.shouldRetry {
self.scheduleRetry(advice: advice)
} else {
self.finish(result: .failed(error))
}
return
}

// TODO: Wire in actual event validation

// This doesn't belong here--need to add a `delete` API to the MutationEventSource and pass a
// reference into the mutation queue.
Expand All @@ -139,7 +178,36 @@ class SyncMutationToCloudOperation: Operation {
self.finish(result: asyncEvent)
}
}
}

private func getRetryAdviceIfRetryable(error: APIError) -> RequestRetryAdvice {
var advice = RequestRetryAdvice(shouldRetry: false, retryInterval: DispatchTimeInterval.never)

switch error {
case .networkError(_, _, let error):
//currently expecting APIOperationResponse to be an URLError
let urlError = error as? URLError
advice = requestRetryablePolicy.retryRequestAdvice(urlError: urlError,
httpURLResponse: nil,
attemptNumber: currentAttemptNumber)
case .httpStatusError(_, let httpURLResponse):
advice = requestRetryablePolicy.retryRequestAdvice(urlError: nil,
httpURLResponse: httpURLResponse,
attemptNumber: currentAttemptNumber)
default:
break
}
return advice
}

private func scheduleRetry(advice: RequestRetryAdvice) {
log.verbose("\(#function) scheduling retry for mutation")
mutationRetryNotifier = MutationRetryNotifier(advice: advice,
networkReachabilityPublisher: networkReachabilityPublisher) {
self.sendMutationToCloud()
self.mutationRetryNotifier = nil
}
currentAttemptNumber += 1
}

private func finish(result: AsyncEvent<Void, GraphQLResponse<MutationSync<AnyModel>>, APIError>) {
Expand All @@ -152,4 +220,5 @@ class SyncMutationToCloudOperation: Operation {
}
}

@available(iOS 13.0, *)
extension SyncMutationToCloudOperation: DefaultLogger { }
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ class NetworkReachabilityNotifier {
private var reachability: NetworkReachabilityProviding?
private var allowsCellularAccess = true

let reachabilityPublisher = PassthroughSubject<ReachabilityUpdate, Never>()
let reachabilityPublisher = PassthroughSubject<ReachabilityUpdate, DataStoreError>()
var publisher: AnyPublisher<ReachabilityUpdate, DataStoreError> {
return reachabilityPublisher.eraseToAnyPublisher()
}

public init(host: String,
allowsCellularAccess: Bool,
reachabilityFactory: NetworkReachabilityProvidingFactory.Type = Reachability.self) {
reachabilityFactory: NetworkReachabilityProvidingFactory.Type) {
self.reachability = reachabilityFactory.make(for: host)
self.allowsCellularAccess = allowsCellularAccess

Expand All @@ -43,12 +46,7 @@ class NetworkReachabilityNotifier {
deinit {
reachability?.stopNotifier()
NotificationCenter.default.removeObserver(self)
reachabilityPublisher.send(completion: Subscribers.Completion<Never>.finished)
}

func publisher() -> AnyPublisher<ReachabilityUpdate, Never> {
return reachabilityPublisher
.eraseToAnyPublisher()
reachabilityPublisher.send(completion: Subscribers.Completion<DataStoreError>.finished)
}

// MARK: - Notifications
Expand All @@ -67,7 +65,7 @@ class NetworkReachabilityNotifier {
isReachable = false
}

let reachabilityMessageUpdate = ReachabilityUpdate(isOnline: isReachable)
let reachabilityMessageUpdate = ReachabilityUpdate(isOnline: isReachable)
reachabilityPublisher.send(reachabilityMessageUpdate)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ import Foundation

struct RequestRetryAdvice {
let shouldRetry: Bool
let retryInterval: DispatchTimeInterval?
let retryInterval: DispatchTimeInterval
init(shouldRetry: Bool,
retryInterval: DispatchTimeInterval = .seconds(60)) {
self.shouldRetry = shouldRetry
self.retryInterval = retryInterval
}

}

Expand Down
Loading

0 comments on commit 3391f2f

Please sign in to comment.