Skip to content

Commit

Permalink
simplify reconnection logic by always reconnecting from Monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
lhoward committed Nov 27, 2024
1 parent 660dd34 commit 3ff7417
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 128 deletions.
114 changes: 23 additions & 91 deletions Sources/SwiftOCA/OCP.1/Ocp1Connection+Connect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import OpenCombine
import SystemPackage

private extension Ocp1Error {
var connectionState: Ocp1ConnectionState? {
var ocp1ConnectionState: Ocp1ConnectionState? {
switch self {
case .notConnected:
.notConnected
Expand Down Expand Up @@ -75,7 +75,7 @@ private extension Errno {

private extension Error {
var ocp1ConnectionState: Ocp1ConnectionState {
(self as? Ocp1Error)?.connectionState ?? .connectionFailed
(self as? Ocp1Error)?.ocp1ConnectionState ?? .connectionFailed
}

var _isRecoverableConnectionError: Bool {
Expand Down Expand Up @@ -109,7 +109,8 @@ private extension Ocp1ConnectionState {
extension Ocp1Connection {
/// start receiveMessages/keepAlive monitor task
private func _startMonitor() {
let monitor = Monitor(self)
connectionID &+= 1
let monitor = Monitor(self, id: connectionID)
monitorTask = Task {
try await monitor.run()
}
Expand Down Expand Up @@ -160,18 +161,20 @@ extension Ocp1Connection {
}
}

func updateConnectionState(_ connectionState: Ocp1ConnectionState) {
private func _updateConnectionState(_ connectionState: Ocp1ConnectionState) {
logger.trace("_updateConnectionState: \(_connectionState.value) => \(connectionState)")
if connectionState == .connected {
logger.info("connected to \(self)")
}
_connectionState.send(connectionState)
}

func markConnectionConnected() {
logger.info("connected to \(self)")
_updateConnectionState(.connected)
}

private func _didConnectDevice() async throws {
if !isDatagram {
// otherwise, set connected state when we receive first keepAlive PDU
updateConnectionState(.connected)
markConnectionConnected()
}

_startMonitor()
Expand All @@ -190,13 +193,13 @@ extension Ocp1Connection {
}

public func connect() async throws {
updateConnectionState(.connecting)
_updateConnectionState(.connecting)

do {
try await _connectDeviceWithTimeout()
} catch {
logger.debug("connection failed: \(error)")
updateConnectionState(error.ocp1ConnectionState)
_updateConnectionState(error.ocp1ConnectionState)
throw error
}

Expand Down Expand Up @@ -243,7 +246,7 @@ extension Ocp1Connection {
public func disconnect() async throws {
await removeSubscriptions()

updateConnectionState(.notConnected)
_updateConnectionState(.notConnected)

let clearObjectCache = !options.flags.contains(.retainObjectCacheAfterDisconnect)
try await _disconnectDevice(clearObjectCache: clearObjectCache)
Expand All @@ -253,32 +256,8 @@ extension Ocp1Connection {
// MARK: - reconnection handling

extension Ocp1Connection {
enum ReconnectionPolicy {
/// do not try to automatically reconnect on connection failure
case noReconnect
/// try to reconnect in the keepAlive monitor task
case reconnectInMonitor
/// try to reconnect before sending the next message
case reconnectOnSend
}

///
/// Re-connection logic is as follows:
///
/// * If the connection has a heartbeat, then automatic reconnection is only
/// managed in the / heartbeat task
///
/// * If the connection does not have a heartbeat, than automatic
/// reconnection is managed when / sending a PDU
///
private var _reconnectionPolicy: ReconnectionPolicy {
if !options.flags.contains(.automaticReconnect) {
.noReconnect
} else if heartbeatTime == .zero {
.reconnectOnSend
} else {
.reconnectInMonitor
}
private var _automaticReconnect: Bool {
options.flags.contains(.automaticReconnect)
}

/// reconnect to the OCA device with exponential backoff, updating
Expand All @@ -287,7 +266,7 @@ extension Ocp1Connection {
var lastError: Error?
var backoff: Duration = options.reconnectPauseInterval

updateConnectionState(.reconnecting)
_updateConnectionState(.reconnecting)

logger
.trace(
Expand All @@ -313,69 +292,22 @@ extension Ocp1Connection {

if let lastError {
logger.debug("reconnection abandoned: \(lastError)")
updateConnectionState(lastError.ocp1ConnectionState)
_updateConnectionState(lastError.ocp1ConnectionState)
throw lastError
} else if !isDatagram && !isConnected {
logger.trace("reconnection abandoned after too many tries")
updateConnectionState(.notConnected)
_updateConnectionState(.notConnected)
throw Ocp1Error.notConnected
}
}

private var _needsReconnectOnSend: Bool {
guard _reconnectionPolicy == .reconnectOnSend else { return false }
func onMonitorError(id: Int, _ error: Error) async throws {
_updateConnectionState(error.ocp1ConnectionState)

switch _connectionState.value {
case .notConnected:
fallthrough
case .connectionTimedOut:
fallthrough
case .connectionFailed:
return true
default:
return false
}
}

func willSendMessage() async throws {
guard _needsReconnectOnSend else { return }
try await reconnectDeviceWithBackoff()
}

func didSendMessage(error: Ocp1Error? = nil) async throws {
if error == nil {
lastMessageSentTime = Monitor.now
}

if _reconnectionPolicy != .reconnectInMonitor, let error,
let connectionState = error.connectionState
{
logger
.debug(
"failed to send message: error \(error), new connection state \(connectionState); disconnecting"
)
if isConnected {
updateConnectionState(connectionState)
try await _disconnectDeviceAfterConnectionFailure()
}
}
}

private func _onMonitorError(_ error: Error) async throws {
updateConnectionState(error.ocp1ConnectionState)

if error._isRecoverableConnectionError, _reconnectionPolicy == .reconnectInMonitor {
logger.trace("expiring connection after receiving error \(error)")
if _automaticReconnect, error._isRecoverableConnectionError {
logger.trace("monitor task \(id) disconnecting: \(error)")
try await _disconnectDeviceAfterConnectionFailure()
Task.detached { try await self.reconnectDeviceWithBackoff() }
}
}

func onKeepAliveMonitorError(_ error: Error) async throws {
try await _onMonitorError(error)
}

func onReceiveMessageMonitorError(_ error: Error) async throws {
try await _onMonitorError(error)
}
}
13 changes: 2 additions & 11 deletions Sources/SwiftOCA/OCP.1/Ocp1Connection+Messages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,9 @@ extension Ocp1Connection {
) async throws {
let messagePduData = try Self.encodeOcp1MessagePdu(messages, type: messageType)

try await willSendMessage()

do {
guard try await write(messagePduData) == messagePduData.count else {
throw Ocp1Error.pduSendingFailed
}
} catch let error as Ocp1Error {
try await didSendMessage(error: error)
throw error
guard try await write(messagePduData) == messagePduData.count else {
throw Ocp1Error.pduSendingFailed
}

try await didSendMessage()
}

private func sendMessage(
Expand Down
16 changes: 14 additions & 2 deletions Sources/SwiftOCA/OCP.1/Ocp1Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public enum Ocp1ConnectionState: OcaUint8, Codable, Sendable {

public struct Ocp1ConnectionStatistics: Sendable {
public let connectionState: Ocp1ConnectionState
public let connectionID: Int
public var isConnected: Bool { connectionState == .connected }
public let requestCount: Int
public let outstandingRequests: [OcaUint32]
Expand Down Expand Up @@ -200,6 +201,7 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject {

var subscriptions = [OcaEvent: EventSubscriptions]()
var logger = Logger(label: "com.padl.SwiftOCA")
var connectionID = 0

private var nextCommandHandle = CommandHandleBase

Expand All @@ -214,6 +216,7 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject {
public var statistics: Ocp1ConnectionStatistics {
Ocp1ConnectionStatistics(
connectionState: _connectionState.value,
connectionID: connectionID,
requestCount: Int(nextCommandHandle - CommandHandleBase),
outstandingRequests: monitor?.outstandingRequests ?? [],
cachedObjectCount: objects.count,
Expand All @@ -225,19 +228,21 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject {
}

/// Monitor structure for matching requests and responses
final class Monitor: @unchecked Sendable {
final class Monitor: @unchecked Sendable, CustomStringConvertible {
typealias Continuation = CheckedContinuation<Ocp1Response, Error>

private let _connection: Weak<Ocp1Connection>
let _connectionID: Int
private let _continuations = ManagedCriticalState<[OcaUint32: Continuation]>([:])
private var _lastMessageReceivedTime = ManagedAtomic<UInt64>(0)

static var now: UInt64 {
UInt64(time(nil))
}

init(_ connection: Ocp1Connection) {
init(_ connection: Ocp1Connection, id: Int) {
_connection = Weak(connection)
_connectionID = id
updateLastMessageReceivedTime()
}

Expand Down Expand Up @@ -302,6 +307,13 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject {
var lastMessageReceivedTime: UInt64 {
_lastMessageReceivedTime.load(ordering: .relaxed)
}

var description: String {
let connectionString: String = if let connection { connection.description }
else { "<null>" }

return "\(connectionString)[\(_connectionID)]"
}
}

/// actor for monitoring response and matching them with requests
Expand Down
48 changes: 24 additions & 24 deletions Sources/SwiftOCA/OCP.1/Ocp1ConnectionMonitor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ extension Ocp1Connection.Monitor {
@OcaConnection
private func _markDatagramConnectionConnected(_ connection: Ocp1Connection) async {
if connection.isDatagram, connection.isConnecting {
connection.updateConnectionState(.connected)
connection.markConnectionConnected()
}
}

Expand Down Expand Up @@ -136,33 +136,33 @@ extension Ocp1Connection.Monitor {
}

func receiveMessages(_ connection: Ocp1Connection) async throws {
try await withThrowingTaskGroup(of: Void.self) { @OcaConnection group in
group.addTask { [self] in
repeat {
try Task.checkCancellation()
do {
try await receiveMessage(connection)
} catch Ocp1Error.unknownPduType {
} catch Ocp1Error.invalidHandle {
} catch {
guard await !connection.isConnecting else { continue }
try await connection.onReceiveMessageMonitorError(error)
throw error
}
} while true
}
if connection.heartbeatTime > .zero {
group.addTask(priority: .background) { [self] in
do {
do {
try await withThrowingTaskGroup(of: Void.self) { @OcaConnection group in
group.addTask { [self] in
repeat {
try Task.checkCancellation()
do {
try await receiveMessage(connection)
} catch Ocp1Error.unknownPduType {
} catch Ocp1Error.invalidHandle {}
} while true
}
if connection.heartbeatTime > .zero {
group.addTask(priority: .background) { [self] in
try await keepAlive(connection)
} catch {
try await connection.onKeepAliveMonitorError(error)
throw error
}
}
try await group.next()
group.cancelAll()
}
} catch {
// if we're not already in the middle of connecting or re-connecting,
// possibly trigger a reconnect depending on the autoReconnect policy
// and the nature of the error
if await !connection.isConnecting {
try? await connection.onMonitorError(id: _connectionID, error)
}
try await group.next()
group.cancelAll()
throw error
}
}
}

0 comments on commit 3ff7417

Please sign in to comment.