Skip to content

Commit

Permalink
catch connection monitor error from taskGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
lhoward committed Nov 27, 2024
1 parent 283b866 commit 1087b6b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 34 deletions.
12 changes: 2 additions & 10 deletions Sources/SwiftOCA/OCP.1/Ocp1Connection+Connect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -301,21 +301,13 @@ extension Ocp1Connection {
}
}

private func _onMonitorError(_ error: Error) async throws {
func onMonitorError(id: Int, _ error: Error) async throws {
_updateConnectionState(error.ocp1ConnectionState)

if _automaticReconnect, error._isRecoverableConnectionError {
logger.trace("monitor task disconnecting: \(error)")
logger.trace("monitor task \(id) disconnecting: \(error)")
try await _disconnectDeviceAfterConnectionFailure()
Task.detached { try await self.reconnectDeviceWithBackoff() }
}
}

func onKeepAliveMonitorError(_ error: Error) async throws {
try await _onMonitorError(error)
}

func onReceiveMessageMonitorError(_ error: Error) async throws {
try await _onMonitorError(error)
}
}
2 changes: 1 addition & 1 deletion Sources/SwiftOCA/OCP.1/Ocp1Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject {
typealias Continuation = CheckedContinuation<Ocp1Response, Error>

private let _connection: Weak<Ocp1Connection>
private let _connectionID: Int
let _connectionID: Int
private let _continuations = ManagedCriticalState<[OcaUint32: Continuation]>([:])
private var _lastMessageReceivedTime = ManagedAtomic<UInt64>(0)

Expand Down
41 changes: 18 additions & 23 deletions Sources/SwiftOCA/OCP.1/Ocp1ConnectionMonitor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -136,33 +136,28 @@ extension Ocp1Connection.Monitor {
}

func receiveMessages(_ connection: Ocp1Connection) async throws {
try await withThrowingTaskGroup(of: Void.self) { @OcaConnection group in
group.addTask { [self] in
repeat {
try Task.checkCancellation()
do {
try await receiveMessage(connection)
} catch Ocp1Error.unknownPduType {
} catch Ocp1Error.invalidHandle {
} catch {
guard await !connection.isConnecting else { continue }
try await connection.onReceiveMessageMonitorError(error)
throw error
}
} while true
}
if connection.heartbeatTime > .zero {
group.addTask(priority: .background) { [self] in
do {
do {
try await withThrowingTaskGroup(of: Void.self) { @OcaConnection group in
group.addTask { [self] in
repeat {
try Task.checkCancellation()
do {
try await receiveMessage(connection)
} catch Ocp1Error.unknownPduType {
} catch Ocp1Error.invalidHandle {}
} while true
}
if connection.heartbeatTime > .zero {
group.addTask(priority: .background) { [self] in
try await keepAlive(connection)
} catch {
try await connection.onKeepAliveMonitorError(error)
throw error
}
}
try await group.next()
group.cancelAll()
}
try await group.next()
group.cancelAll()
} catch {
try? await connection.onMonitorError(id: _connectionID, error)
throw error
}
}
}

0 comments on commit 1087b6b

Please sign in to comment.