diff --git a/Sources/SwiftOCA/OCF/Errors.swift b/Sources/SwiftOCA/OCF/Errors.swift index 0d224d44..85390d7b 100644 --- a/Sources/SwiftOCA/OCF/Errors.swift +++ b/Sources/SwiftOCA/OCF/Errors.swift @@ -34,6 +34,7 @@ public enum Ocp1Error: Error, Equatable { case invalidProtocolVersion case invalidProxyMethodResponse case invalidSyncValue + case missingKeepalive case noConnectionDelegate case noInitialValue case noMatchingTypeForClass diff --git a/Sources/SwiftOCA/OCP.1/Backend/Ocp1CFSocketConnection.swift b/Sources/SwiftOCA/OCP.1/Backend/Ocp1CFSocketConnection.swift index 35caad7f..80908724 100644 --- a/Sources/SwiftOCA/OCP.1/Backend/Ocp1CFSocketConnection.swift +++ b/Sources/SwiftOCA/OCP.1/Backend/Ocp1CFSocketConnection.swift @@ -401,14 +401,14 @@ public class Ocp1CFSocketConnection: Ocp1Connection { deviceAddress.family } - override func connectDevice() async throws { + override public func connectDevice() async throws { socket = try await _CFSocketWrapper(address: deviceAddress, type: type) try await super.connectDevice() } - override public func disconnectDevice(clearObjectCache: Bool) async throws { + override public func disconnectDevice() async throws { socket = nil - try await super.disconnectDevice(clearObjectCache: clearObjectCache) + try await super.disconnectDevice() } } diff --git a/Sources/SwiftOCA/OCP.1/Backend/Ocp1FlyingSocksConnection.swift b/Sources/SwiftOCA/OCP.1/Backend/Ocp1FlyingSocksConnection.swift index d2b502e8..46b9ee7a 100644 --- a/Sources/SwiftOCA/OCP.1/Backend/Ocp1FlyingSocksConnection.swift +++ b/Sources/SwiftOCA/OCP.1/Backend/Ocp1FlyingSocksConnection.swift @@ -123,7 +123,7 @@ public class Ocp1FlyingSocksConnection: Ocp1Connection { try? asyncSocket?.close() } - override func connectDevice() async throws { + override public func connectDevice() async throws { let socket = try Socket(domain: Int32(deviceAddress.family), type: socketType) try? setSocketOptions(socket) // also connect UDP sockets to ensure we do not receive unsolicited replies @@ -135,13 +135,13 @@ public class Ocp1FlyingSocksConnection: Ocp1Connection { try await super.connectDevice() } - override public func disconnectDevice(clearObjectCache: Bool) async throws { + override public func disconnectDevice() async throws { await AsyncSocketPoolMonitor.shared.stop() if let asyncSocket { try asyncSocket.close() self.asyncSocket = nil } - try await super.disconnectDevice(clearObjectCache: clearObjectCache) + try await super.disconnectDevice() } public convenience init( diff --git a/Sources/SwiftOCA/OCP.1/Backend/Ocp1IORingConnection.swift b/Sources/SwiftOCA/OCP.1/Backend/Ocp1IORingConnection.swift index 9aeb4334..396eefeb 100644 --- a/Sources/SwiftOCA/OCP.1/Backend/Ocp1IORingConnection.swift +++ b/Sources/SwiftOCA/OCP.1/Backend/Ocp1IORingConnection.swift @@ -79,7 +79,7 @@ public class Ocp1IORingConnection: Ocp1Connection { ) } - override func connectDevice() async throws { + override public func connectDevice() async throws { let socket = try Socket( ring: IORing.shared, domain: deviceAddress.family, @@ -94,9 +94,9 @@ public class Ocp1IORingConnection: Ocp1Connection { try await super.connectDevice() } - override public func disconnectDevice(clearObjectCache: Bool) async throws { + override public func disconnectDevice() async throws { socket = nil - try await super.disconnectDevice(clearObjectCache: clearObjectCache) + try await super.disconnectDevice() } fileprivate func withMappedError( diff --git a/Sources/SwiftOCA/OCP.1/Ocp1Connection+Connect.swift b/Sources/SwiftOCA/OCP.1/Ocp1Connection+Connect.swift new file mode 100644 index 00000000..56a7c714 --- /dev/null +++ b/Sources/SwiftOCA/OCP.1/Ocp1Connection+Connect.swift @@ -0,0 +1,349 @@ +// +// Copyright (c) 2023-2024 PADL Software Pty Ltd +// +// Licensed under the Apache License, Version 2.0 (the License); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import AsyncAlgorithms +import AsyncExtensions +@preconcurrency +import Foundation +import Logging +#if canImport(Combine) +import Combine +#elseif canImport(OpenCombine) +import OpenCombine +#endif +import SystemPackage + +private extension Ocp1Error { + var connectionState: Ocp1ConnectionState? { + switch self { + case .notConnected: + .notConnected + case .connectionTimeout: + .connectionTimedOut + default: + nil + } + } +} + +private extension Error { + var ocp1ConnectionState: Ocp1ConnectionState { + (self as? Ocp1Error)?.connectionState ?? .connectionFailed + } + + var isRecoverableConnectionError: Bool { + if let error = self as? Ocp1Error { + switch error { + case .missingKeepalive: + fallthrough + case .connectionTimeout: + fallthrough + case .notConnected: + return true + default: + return false + } + } else if let error = self as? Errno { + switch error { + case .connectionAbort: + fallthrough + case .connectionReset: + fallthrough + case .connectionRefused: + return true + default: + return false + } + } else { + return false + } + } +} + +private extension Ocp1ConnectionState { + var error: Ocp1Error? { + switch self { + case .notConnected: + fallthrough + case .connectionFailed: + return .notConnected + case .connectionTimedOut: + return .connectionTimeout + default: + return nil + } + } +} + +// MARK: - monitor task management + +extension Ocp1Connection { + /// start receiveMessages/keepAlive monitor task + private func _startMonitor() { + let monitor = Monitor(self) + monitorTask = Task { + try await monitor.run() + } + self.monitor = monitor + } + + /// stop receiveMessages/keepAlive monitor task + private func _stopMonitor() { + if let monitor { + monitor.stop() + self.monitor = nil + } + if let monitorTask { + monitorTask.cancel() + self.monitorTask = nil + } + } +} + +// MARK: - connection handling + +extension Ocp1Connection { + /// connect to the OCA device, throwing `Ocp1Error.connectionTimeout` if it times out + private func _connectDeviceWithTimeout() async throws { + do { + try await withThrowingTimeout(of: options.connectionTimeout) { + try await self.connectDevice() + } + } catch Ocp1Error.responseTimeout { + throw Ocp1Error.connectionTimeout + } catch { + throw error + } + } + + private func _refreshDeviceTreeWithPolicy() async { + if options.flags.contains(.refreshDeviceTreeOnConnection) { + logger.trace("refreshing device tree") + try? await refreshDeviceTree() + } + } + + /// functions with `_` prefix (with the exception of `_updateConnectionState()`) expect the + /// caller to update the connection state + + private func _updateConnectionState(_ connectionState: Ocp1ConnectionState) { + _connectionState.send(connectionState) + } + + private func _didConnectDevice() async throws { + _updateConnectionState(.connected) + + _startMonitor() + + if heartbeatTime > .zero { + // send keepalive to open UDP connection + try await sendKeepAlive() + } + + #if canImport(Combine) || canImport(OpenCombine) + objectWillChange.send() + #endif + + await refreshSubscriptions() + await refreshCachedObjectProperties() + await _refreshDeviceTreeWithPolicy() + + logger.info("connected to \(self)") + } + + public func connect() async throws { + logger.trace("connecting...") + + _updateConnectionState(.connecting) + + do { + try await _connectDeviceWithTimeout() + } catch { + logger.trace("connection failed: \(error)") + _updateConnectionState(error.ocp1ConnectionState) + throw error + } + + let connectionState = _connectionState.value + if connectionState == .connecting { + try await _didConnectDevice() + } else if connectionState != .connected { + logger.trace("connection failed whilst attempting to connect: \(connectionState)") + throw connectionState.error ?? .notConnected + } + } +} + +// MARK: - disconnection handling + +extension Ocp1Connection { + private func _disconnectDevice(clearObjectCache: Bool) async throws { + try await disconnectDevice() + + if clearObjectCache { + await self.clearObjectCache() + } + + #if canImport(Combine) || canImport(OpenCombine) + objectWillChange.send() + #endif + + logger.info("disconnected from \(self)") + } + + /// disconnect from the OCA device, retaining the object cache + private func _disconnectDeviceAfterConnectionFailure() async throws { + try await _disconnectDevice(clearObjectCache: false) + } + + public func disconnect() async throws { + await removeSubscriptions() + + _updateConnectionState(.notConnected) + + let clearObjectCache = !options.flags.contains(.retainObjectCacheAfterDisconnect) + try await _disconnectDevice(clearObjectCache: clearObjectCache) + } +} + +// MARK: - reconnection handling + +extension Ocp1Connection { + enum ReconnectionPolicy { + /// do not try to automatically reconnect on connection failure + case noReconnect + /// try to reconnect in the keepAlive monitor task + case reconnectInMonitor + /// try to reconnect before sending the next message + case reconnectOnSend + } + + /// + /// Re-connection logic is as follows: + /// + /// * If the connection has a heartbeat, then automatic reconnection is only + /// managed in the / heartbeat task + /// + /// * If the connection does not have a heartbeat, than automatic + /// reconnection is managed when / sending a PDU + /// + private var _reconnectionPolicy: ReconnectionPolicy { + if !options.flags.contains(.automaticReconnect) { + .noReconnect + } else if heartbeatTime == .zero { + .reconnectOnSend + } else { + .reconnectInMonitor + } + } + + /// reconnect to the OCA device with exponential backoff, updating + /// connectionState + func reconnectDeviceWithBackoff() async throws { + var lastError: Error? + var backoff: Duration = options.reconnectPauseInterval + + _updateConnectionState(.reconnecting) + + logger + .trace( + "reconnecting: pauseInterval \(options.reconnectPauseInterval) maxTries \(options.reconnectMaxTries) exponentialBackoffThreshold \(options.reconnectExponentialBackoffThreshold)" + ) + + for i in 0.. OcaClassIdentification { guard objectNumber != OcaInvalidONo else { throw Ocp1Error.status(.badONo) } diff --git a/Sources/SwiftOCA/OCP.1/Ocp1Connection.swift b/Sources/SwiftOCA/OCP.1/Ocp1Connection.swift index d55e93b6..66c20799 100644 --- a/Sources/SwiftOCA/OCP.1/Ocp1Connection.swift +++ b/Sources/SwiftOCA/OCP.1/Ocp1Connection.swift @@ -20,11 +20,6 @@ import Atomics @preconcurrency import Foundation import Logging -#if canImport(Combine) -import Combine -#elseif canImport(OpenCombine) -import OpenCombine -#endif package let Ocp1MaximumDatagramPduSize = 1500 @@ -129,11 +124,18 @@ public struct Ocp1ConnectionOptions: Sendable { } public enum Ocp1ConnectionState: OcaUint8, Codable, Sendable { + /// controller has not been connected, or was explicitly disconnected case notConnected + /// controller is connecting case connecting + /// controller is connected case connected + /// controller is reconnecting (only if `automaticReconnect` flag is set) case reconnecting - case timedOut + /// missed heartbeat and `automaticReconnect` flag unset + case connectionTimedOut + /// connection failed + case connectionFailed } public struct Ocp1ConnectionStatistics: Sendable { @@ -176,7 +178,7 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject { .seconds(1) } - private let _connectionState = AsyncCurrentValueSubject(.notConnected) + let _connectionState = AsyncCurrentValueSubject(.notConnected) public let connectionState: AnyAsyncSequence /// Object interning @@ -248,11 +250,6 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject { try await receiveMessages(connection) } catch Ocp1Error.notConnected { resumeAllNotConnected() - if await connection.options.flags.contains(.automaticReconnect) { - try await connection.reconnectDevice() - } else { - throw Ocp1Error.notConnected - } } } @@ -308,8 +305,7 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject { /// actor for monitoring response and matching them with requests var monitor: Monitor? - - private var monitorTask: Task<(), Error>? + var monitorTask: Task<(), Error>? private func _configureTracing() { if options.flags.contains(.enableTracing) { @@ -334,100 +330,14 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject { return handle } - func reconnectDevice() async throws { - try await disconnectDevice(clearObjectCache: false) - - var lastError: Error? - var backoff: Duration = options.reconnectPauseInterval - - for i in 0.. .zero { - // send keepalive to open UDP connection - try await sendKeepAlive() - } + open func connectDevice() async throws {} - // refresh all objects - for (_, object) in objects { - await object.refresh() - } - - #if canImport(Combine) || canImport(OpenCombine) - objectWillChange.send() - #endif - - logger.info("Connected to \(self)") - } - - open func clearObjectCache() async { + public func clearObjectCache() async { objects = [:] await rootBlock.refreshAll() } - open func disconnectDevice(clearObjectCache: Bool) async throws { - if let monitor { - monitor.stop() - self.monitor = nil - } - monitorTask = nil - _connectionState.send(.notConnected) - - if clearObjectCache { - await self.clearObjectCache() - } - - #if canImport(Combine) || canImport(OpenCombine) - objectWillChange.send() - #endif - - logger.info("Disconnected from \(self)") - } - - public var isConnected: Bool { - _connectionState.value == .connected - } + open func disconnectDevice() async throws {} public nonisolated var description: String { connectionPrefix @@ -449,34 +359,6 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject { } } -/// Public API -public extension Ocp1Connection { - func connect() async throws { - _connectionState.send(.connecting) - do { - try await connectDeviceWithTimeout() - } catch Ocp1Error.connectionTimeout { - _connectionState.send(.timedOut) - throw Ocp1Error.connectionTimeout - } catch { - _connectionState.send(.notConnected) - throw error - } - _connectionState.send(.connected) - if options.flags.contains(.refreshDeviceTreeOnConnection) { - try? await refreshDeviceTree() - } - } - - func disconnect() async throws { - await removeSubscriptions() - try await disconnectDevice( - clearObjectCache: !options.flags - .contains(.retainObjectCacheAfterDisconnect) - ) - } -} - extension Ocp1Connection: Equatable { public nonisolated static func == (lhs: Ocp1Connection, rhs: Ocp1Connection) -> Bool { lhs.connectionPrefix == rhs.connectionPrefix diff --git a/Sources/SwiftOCA/OCP.1/Ocp1ConnectionMonitor.swift b/Sources/SwiftOCA/OCP.1/Ocp1ConnectionMonitor.swift index 0d659585..163f1282 100644 --- a/Sources/SwiftOCA/OCP.1/Ocp1ConnectionMonitor.swift +++ b/Sources/SwiftOCA/OCP.1/Ocp1ConnectionMonitor.swift @@ -113,7 +113,7 @@ extension Ocp1Connection.Monitor { .info( "\(connection): no heartbeat packet received in past \(keepAliveThreshold)" ) - throw Ocp1Error.responseTimeout + throw Ocp1Error.missingKeepalive } let timeSinceLastMessageSent = await now - connection.lastMessageSentTime @@ -135,12 +135,21 @@ extension Ocp1Connection.Monitor { do { try await receiveMessage(connection) } catch Ocp1Error.unknownPduType { - } catch Ocp1Error.invalidHandle {} + } catch Ocp1Error.invalidHandle { + } catch { + try await connection.onMonitorError(error) + throw error + } } while true } if connection.heartbeatTime > .zero { - group.addTask(priority: .background) { - try await self.keepAlive(connection) + group.addTask(priority: .background) { [self] in + do { + try await keepAlive(connection) + } catch { + try await connection.onMonitorError(error) + throw error + } } } try await group.next() diff --git a/Sources/SwiftOCADevice/OCC/ControlClasses/Agents/Grouper.swift b/Sources/SwiftOCADevice/OCC/ControlClasses/Agents/Grouper.swift index e9b1b601..942d1222 100644 --- a/Sources/SwiftOCADevice/OCC/ControlClasses/Agents/Grouper.swift +++ b/Sources/SwiftOCADevice/OCC/ControlClasses/Agents/Grouper.swift @@ -62,7 +62,7 @@ private actor OcaConnectionBroker { for addr in sequence(first: firstAddr, next: { $0.pointee.ai_next }) { let connection: Ocp1Connection let data = Data(bytes: addr.pointee.ai_addr, count: Int(addr.pointee.ai_addrlen)) - let options = Ocp1ConnectionOptions() + let options = Ocp1ConnectionOptions(flags: [.retainObjectCacheAfterDisconnect]) switch addr.pointee.ai_socktype { case SwiftOCA.SOCK_STREAM: @@ -452,7 +452,7 @@ open class OcaGrouper: OcaAgent { citizen: citizen, changeType: .citizenConnectionLost ) - try await connection.disconnectDevice(clearObjectCache: false) + try await connection.disconnect() Task { @OcaConnection in if !connection.isConnected { // check connected in case we are racing diff --git a/Sources/SwiftOCADevice/OCP.1/Backend/Local/LocalConnection.swift b/Sources/SwiftOCADevice/OCP.1/Backend/Local/LocalConnection.swift index 0c0de502..33b5caef 100644 --- a/Sources/SwiftOCADevice/OCP.1/Backend/Local/LocalConnection.swift +++ b/Sources/SwiftOCADevice/OCP.1/Backend/Local/LocalConnection.swift @@ -37,8 +37,8 @@ public final class OcaLocalConnection: Ocp1Connection { .zero } - override public func disconnectDevice(clearObjectCache: Bool) async throws { - try await super.disconnectDevice(clearObjectCache: clearObjectCache) + override public func disconnectDevice() async throws { + try await super.disconnectDevice() endpoint.responseChannel.finish() endpoint.requestChannel.finish() } diff --git a/SwiftOCA.xcodeproj/project.pbxproj b/SwiftOCA.xcodeproj/project.pbxproj index 18555b6b..455375f6 100644 --- a/SwiftOCA.xcodeproj/project.pbxproj +++ b/SwiftOCA.xcodeproj/project.pbxproj @@ -102,6 +102,8 @@ D39542302BA8FFC0003A025C /* Root+JSON.swift in Sources */ = {isa = PBXBuildFile; fileRef = D395422F2BA8FFC0003A025C /* Root+JSON.swift */; }; D3A0BE7C2B717BEC00A5AA05 /* Group.swift in Sources */ = {isa = PBXBuildFile; fileRef = D3A0BE7B2B717BEC00A5AA05 /* Group.swift */; }; D3A0BE7E2B71921900A5AA05 /* Group.swift in Sources */ = {isa = PBXBuildFile; fileRef = D3A0BE7D2B71921900A5AA05 /* Group.swift */; }; + D3AB030C2CF6768D00967322 /* Ocp1Connection+Connect.swift in Sources */ = {isa = PBXBuildFile; fileRef = D3AB030B2CF6768D00967322 /* Ocp1Connection+Connect.swift */; }; + D3AB030E2CF6BD8F00967322 /* Ocp1IORingConnection.swift in Sources */ = {isa = PBXBuildFile; fileRef = D3AB030D2CF6BD8F00967322 /* Ocp1IORingConnection.swift */; }; D3AD37792BF089CA00F3F713 /* SystemPackage in Frameworks */ = {isa = PBXBuildFile; productRef = D3AD37782BF089CA00F3F713 /* SystemPackage */; }; D3AFD3342B2A505D00A72E9A /* AudioProcessingManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = D3AFD3332B2A505D00A72E9A /* AudioProcessingManager.swift */; }; D3AFD3362B2A516C00A72E9A /* DeviceTimeManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = D3AFD3352B2A516C00A72E9A /* DeviceTimeManager.swift */; }; @@ -413,6 +415,8 @@ D395422F2BA8FFC0003A025C /* Root+JSON.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Root+JSON.swift"; sourceTree = ""; }; D3A0BE7B2B717BEC00A5AA05 /* Group.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Group.swift; sourceTree = ""; }; D3A0BE7D2B71921900A5AA05 /* Group.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Group.swift; sourceTree = ""; }; + D3AB030B2CF6768D00967322 /* Ocp1Connection+Connect.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Ocp1Connection+Connect.swift"; sourceTree = ""; }; + D3AB030D2CF6BD8F00967322 /* Ocp1IORingConnection.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Ocp1IORingConnection.swift; sourceTree = ""; }; D3AFD3332B2A505D00A72E9A /* AudioProcessingManager.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AudioProcessingManager.swift; sourceTree = ""; }; D3AFD3352B2A516C00A72E9A /* DeviceTimeManager.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DeviceTimeManager.swift; sourceTree = ""; }; D3AFD3372B2A52D500A72E9A /* TimeDataTypes.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TimeDataTypes.swift; sourceTree = ""; }; @@ -914,6 +918,7 @@ D3E6E1B12A3ACA6C00BF7095 /* Ocp1Connection.swift */, D3E6E1A92A3ACA6C00BF7095 /* Ocp1ConnectionMonitor.swift */, D3E6E1AE2A3ACA6C00BF7095 /* Ocp1Connection+Codable.swift */, + D3AB030B2CF6768D00967322 /* Ocp1Connection+Connect.swift */, D3E6E1AA2A3ACA6C00BF7095 /* Ocp1Connection+Objects.swift */, D3E6E1AB2A3ACA6C00BF7095 /* Ocp1Connection+Messages.swift */, D3E6E1AC2A3ACA6C00BF7095 /* Ocp1Connection+Subscribe.swift */, @@ -927,6 +932,7 @@ D37F7E192A6BC015005F035F /* Backend */ = { isa = PBXGroup; children = ( + D3AB030D2CF6BD8F00967322 /* Ocp1IORingConnection.swift */, D36ECA2B2A5B24B30015174A /* Ocp1CFSocketConnection.swift */, D36ECA202A5AEB9B0015174A /* Ocp1FlyingSocksConnection.swift */, ); @@ -1860,6 +1866,7 @@ D3E6E1CB2A3ACA9000BF7095 /* Switch.swift in Sources */, D3E6E1B72A3ACA7B00BF7095 /* Ocp1Connection+Messages.swift in Sources */, D3AFD3592B2D226200A72E9A /* NetworkInterface.swift in Sources */, + D3AB030C2CF6768D00967322 /* Ocp1Connection+Connect.swift in Sources */, D3DA69EE2A3DD350001250A1 /* Task+Timeout.swift in Sources */, D3AFD3462B2CED0800A72E9A /* DiagnosticManager.swift in Sources */, D3E6E1BA2A3ACA7B00BF7095 /* Ocp1ConnectionMonitor.swift in Sources */, @@ -1903,6 +1910,7 @@ D3AFD3422B2CE91B00A72E9A /* MediaClockManager.swift in Sources */, D3E6E1BB2A3ACA7B00BF7095 /* Ocp1Connection.swift in Sources */, D3E6E1D22A3ACA9E00BF7095 /* SubscriptionManager.swift in Sources */, + D3AB030E2CF6BD8F00967322 /* Ocp1IORingConnection.swift in Sources */, D3AFD33A2B2A558700A72E9A /* MediaClockDataTypes.swift in Sources */, D3E6E1BC2A3ACA7E00BF7095 /* Errors.swift in Sources */, D3AFD3542B2D082B00A72E9A /* ControlNetwork.swift in Sources */,