diff --git a/Sources/SwiftOCA/OCP.1/Ocp1Connection+Connect.swift b/Sources/SwiftOCA/OCP.1/Ocp1Connection+Connect.swift index 06a4942..a2b8515 100644 --- a/Sources/SwiftOCA/OCP.1/Ocp1Connection+Connect.swift +++ b/Sources/SwiftOCA/OCP.1/Ocp1Connection+Connect.swift @@ -27,7 +27,7 @@ import OpenCombine import SystemPackage private extension Ocp1Error { - var connectionState: Ocp1ConnectionState? { + var ocp1ConnectionState: Ocp1ConnectionState? { switch self { case .notConnected: .notConnected @@ -75,7 +75,7 @@ private extension Errno { private extension Error { var ocp1ConnectionState: Ocp1ConnectionState { - (self as? Ocp1Error)?.connectionState ?? .connectionFailed + (self as? Ocp1Error)?.ocp1ConnectionState ?? .connectionFailed } var _isRecoverableConnectionError: Bool { @@ -109,7 +109,8 @@ private extension Ocp1ConnectionState { extension Ocp1Connection { /// start receiveMessages/keepAlive monitor task private func _startMonitor() { - let monitor = Monitor(self) + connectionID &+= 1 + let monitor = Monitor(self, id: connectionID) monitorTask = Task { try await monitor.run() } @@ -160,18 +161,20 @@ extension Ocp1Connection { } } - func updateConnectionState(_ connectionState: Ocp1ConnectionState) { + private func _updateConnectionState(_ connectionState: Ocp1ConnectionState) { logger.trace("_updateConnectionState: \(_connectionState.value) => \(connectionState)") - if connectionState == .connected { - logger.info("connected to \(self)") - } _connectionState.send(connectionState) } + func markConnectionConnected() { + logger.info("connected to \(self)") + _updateConnectionState(.connected) + } + private func _didConnectDevice() async throws { if !isDatagram { // otherwise, set connected state when we receive first keepAlive PDU - updateConnectionState(.connected) + markConnectionConnected() } _startMonitor() @@ -190,13 +193,13 @@ extension Ocp1Connection { } public func connect() async throws { - updateConnectionState(.connecting) + _updateConnectionState(.connecting) do { try await _connectDeviceWithTimeout() } catch { logger.debug("connection failed: \(error)") - updateConnectionState(error.ocp1ConnectionState) + _updateConnectionState(error.ocp1ConnectionState) throw error } @@ -243,7 +246,7 @@ extension Ocp1Connection { public func disconnect() async throws { await removeSubscriptions() - updateConnectionState(.notConnected) + _updateConnectionState(.notConnected) let clearObjectCache = !options.flags.contains(.retainObjectCacheAfterDisconnect) try await _disconnectDevice(clearObjectCache: clearObjectCache) @@ -253,32 +256,8 @@ extension Ocp1Connection { // MARK: - reconnection handling extension Ocp1Connection { - enum ReconnectionPolicy { - /// do not try to automatically reconnect on connection failure - case noReconnect - /// try to reconnect in the keepAlive monitor task - case reconnectInMonitor - /// try to reconnect before sending the next message - case reconnectOnSend - } - - /// - /// Re-connection logic is as follows: - /// - /// * If the connection has a heartbeat, then automatic reconnection is only - /// managed in the / heartbeat task - /// - /// * If the connection does not have a heartbeat, than automatic - /// reconnection is managed when / sending a PDU - /// - private var _reconnectionPolicy: ReconnectionPolicy { - if !options.flags.contains(.automaticReconnect) { - .noReconnect - } else if heartbeatTime == .zero { - .reconnectOnSend - } else { - .reconnectInMonitor - } + private var _automaticReconnect: Bool { + options.flags.contains(.automaticReconnect) } /// reconnect to the OCA device with exponential backoff, updating @@ -287,7 +266,7 @@ extension Ocp1Connection { var lastError: Error? var backoff: Duration = options.reconnectPauseInterval - updateConnectionState(.reconnecting) + _updateConnectionState(.reconnecting) logger .trace( @@ -313,69 +292,22 @@ extension Ocp1Connection { if let lastError { logger.debug("reconnection abandoned: \(lastError)") - updateConnectionState(lastError.ocp1ConnectionState) + _updateConnectionState(lastError.ocp1ConnectionState) throw lastError } else if !isDatagram && !isConnected { logger.trace("reconnection abandoned after too many tries") - updateConnectionState(.notConnected) + _updateConnectionState(.notConnected) throw Ocp1Error.notConnected } } - private var _needsReconnectOnSend: Bool { - guard _reconnectionPolicy == .reconnectOnSend else { return false } + func onMonitorError(id: Int, _ error: Error) async throws { + _updateConnectionState(error.ocp1ConnectionState) - switch _connectionState.value { - case .notConnected: - fallthrough - case .connectionTimedOut: - fallthrough - case .connectionFailed: - return true - default: - return false - } - } - - func willSendMessage() async throws { - guard _needsReconnectOnSend else { return } - try await reconnectDeviceWithBackoff() - } - - func didSendMessage(error: Ocp1Error? = nil) async throws { - if error == nil { - lastMessageSentTime = Monitor.now - } - - if _reconnectionPolicy != .reconnectInMonitor, let error, - let connectionState = error.connectionState - { - logger - .debug( - "failed to send message: error \(error), new connection state \(connectionState); disconnecting" - ) - if isConnected { - updateConnectionState(connectionState) - try await _disconnectDeviceAfterConnectionFailure() - } - } - } - - private func _onMonitorError(_ error: Error) async throws { - updateConnectionState(error.ocp1ConnectionState) - - if error._isRecoverableConnectionError, _reconnectionPolicy == .reconnectInMonitor { - logger.trace("expiring connection after receiving error \(error)") + if _automaticReconnect, error._isRecoverableConnectionError { + logger.trace("monitor task \(id) disconnecting: \(error)") try await _disconnectDeviceAfterConnectionFailure() Task.detached { try await self.reconnectDeviceWithBackoff() } } } - - func onKeepAliveMonitorError(_ error: Error) async throws { - try await _onMonitorError(error) - } - - func onReceiveMessageMonitorError(_ error: Error) async throws { - try await _onMonitorError(error) - } } diff --git a/Sources/SwiftOCA/OCP.1/Ocp1Connection+Messages.swift b/Sources/SwiftOCA/OCP.1/Ocp1Connection+Messages.swift index 6ba6c1a..6116935 100644 --- a/Sources/SwiftOCA/OCP.1/Ocp1Connection+Messages.swift +++ b/Sources/SwiftOCA/OCP.1/Ocp1Connection+Messages.swift @@ -23,18 +23,9 @@ extension Ocp1Connection { ) async throws { let messagePduData = try Self.encodeOcp1MessagePdu(messages, type: messageType) - try await willSendMessage() - - do { - guard try await write(messagePduData) == messagePduData.count else { - throw Ocp1Error.pduSendingFailed - } - } catch let error as Ocp1Error { - try await didSendMessage(error: error) - throw error + guard try await write(messagePduData) == messagePduData.count else { + throw Ocp1Error.pduSendingFailed } - - try await didSendMessage() } private func sendMessage( diff --git a/Sources/SwiftOCA/OCP.1/Ocp1Connection.swift b/Sources/SwiftOCA/OCP.1/Ocp1Connection.swift index 0f31758..504d7ed 100644 --- a/Sources/SwiftOCA/OCP.1/Ocp1Connection.swift +++ b/Sources/SwiftOCA/OCP.1/Ocp1Connection.swift @@ -141,6 +141,7 @@ public enum Ocp1ConnectionState: OcaUint8, Codable, Sendable { public struct Ocp1ConnectionStatistics: Sendable { public let connectionState: Ocp1ConnectionState + public let connectionID: Int public var isConnected: Bool { connectionState == .connected } public let requestCount: Int public let outstandingRequests: [OcaUint32] @@ -200,6 +201,7 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject { var subscriptions = [OcaEvent: EventSubscriptions]() var logger = Logger(label: "com.padl.SwiftOCA") + var connectionID = 0 private var nextCommandHandle = CommandHandleBase @@ -214,6 +216,7 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject { public var statistics: Ocp1ConnectionStatistics { Ocp1ConnectionStatistics( connectionState: _connectionState.value, + connectionID: connectionID, requestCount: Int(nextCommandHandle - CommandHandleBase), outstandingRequests: monitor?.outstandingRequests ?? [], cachedObjectCount: objects.count, @@ -225,10 +228,11 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject { } /// Monitor structure for matching requests and responses - final class Monitor: @unchecked Sendable { + final class Monitor: @unchecked Sendable, CustomStringConvertible { typealias Continuation = CheckedContinuation private let _connection: Weak + let _connectionID: Int private let _continuations = ManagedCriticalState<[OcaUint32: Continuation]>([:]) private var _lastMessageReceivedTime = ManagedAtomic(0) @@ -236,8 +240,9 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject { UInt64(time(nil)) } - init(_ connection: Ocp1Connection) { + init(_ connection: Ocp1Connection, id: Int) { _connection = Weak(connection) + _connectionID = id updateLastMessageReceivedTime() } @@ -302,6 +307,13 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject { var lastMessageReceivedTime: UInt64 { _lastMessageReceivedTime.load(ordering: .relaxed) } + + var description: String { + let connectionString: String = if let connection { connection.description } + else { "" } + + return "\(connectionString)[\(_connectionID)]" + } } /// actor for monitoring response and matching them with requests diff --git a/Sources/SwiftOCA/OCP.1/Ocp1ConnectionMonitor.swift b/Sources/SwiftOCA/OCP.1/Ocp1ConnectionMonitor.swift index dc7f502..7d393e7 100644 --- a/Sources/SwiftOCA/OCP.1/Ocp1ConnectionMonitor.swift +++ b/Sources/SwiftOCA/OCP.1/Ocp1ConnectionMonitor.swift @@ -91,7 +91,7 @@ extension Ocp1Connection.Monitor { @OcaConnection private func _markDatagramConnectionConnected(_ connection: Ocp1Connection) async { if connection.isDatagram, connection.isConnecting { - connection.updateConnectionState(.connected) + connection.markConnectionConnected() } } @@ -136,33 +136,33 @@ extension Ocp1Connection.Monitor { } func receiveMessages(_ connection: Ocp1Connection) async throws { - try await withThrowingTaskGroup(of: Void.self) { @OcaConnection group in - group.addTask { [self] in - repeat { - try Task.checkCancellation() - do { - try await receiveMessage(connection) - } catch Ocp1Error.unknownPduType { - } catch Ocp1Error.invalidHandle { - } catch { - guard await !connection.isConnecting else { continue } - try await connection.onReceiveMessageMonitorError(error) - throw error - } - } while true - } - if connection.heartbeatTime > .zero { - group.addTask(priority: .background) { [self] in - do { + do { + try await withThrowingTaskGroup(of: Void.self) { @OcaConnection group in + group.addTask { [self] in + repeat { + try Task.checkCancellation() + do { + try await receiveMessage(connection) + } catch Ocp1Error.unknownPduType { + } catch Ocp1Error.invalidHandle {} + } while true + } + if connection.heartbeatTime > .zero { + group.addTask(priority: .background) { [self] in try await keepAlive(connection) - } catch { - try await connection.onKeepAliveMonitorError(error) - throw error } } + try await group.next() + group.cancelAll() + } + } catch { + // if we're not already in the middle of connecting or re-connecting, + // possibly trigger a reconnect depending on the autoReconnect policy + // and the nature of the error + if await !connection.isConnecting { + try? await connection.onMonitorError(id: _connectionID, error) } - try await group.next() - group.cancelAll() + throw error } } }