Skip to content

Commit

Permalink
reduce actor-hopping my moving Monitor to @OcaConnection actor
Browse files Browse the repository at this point in the history
  • Loading branch information
lhoward committed Nov 27, 2024
1 parent 4126764 commit 0f9d4c6
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
1 change: 1 addition & 0 deletions Sources/SwiftOCA/OCP.1/Ocp1Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ open class Ocp1Connection: CustomStringConvertible, ObservableObject {
}

/// Monitor structure for matching requests and responses
@OcaConnection
final class Monitor: @unchecked Sendable, CustomStringConvertible {
typealias Continuation = CheckedContinuation<Ocp1Response, Error>

Expand Down
26 changes: 12 additions & 14 deletions Sources/SwiftOCA/OCP.1/Ocp1ConnectionMonitor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ extension Ocp1Connection.Monitor {
/// just parse enough of the protocol in order to read rest of message
/// `syncVal: OcaUint8` || `protocolVersion: OcaUint16` || `pduSize: OcaUint32`
guard messagePduData.count >= Ocp1Connection.MinimumPduSize else {
await connection.logger.warning("PDU of size \(messagePduData.count) is too short")
connection.logger.warning("PDU of size \(messagePduData.count) is too short")
throw Ocp1Error.pduTooShort
}
guard messagePduData[0] == Ocp1SyncValue else {
await connection.logger.warning(
connection.logger.warning(
"PDU has invalid sync value \(messagePduData.prefix(1).hexEncodedString())"
)
throw Ocp1Error.invalidSyncValue
Expand All @@ -45,7 +45,7 @@ extension Ocp1Connection.Monitor {
let pduSize: OcaUint32 = messagePduData.decodeInteger(index: 3)
guard pduSize >= (Ocp1Connection.MinimumPduSize - 1)
else { // doesn't include sync byte
await connection.logger.warning("PDU size \(pduSize) is less than minimum PDU size")
connection.logger.warning("PDU size \(pduSize) is less than minimum PDU size")
throw Ocp1Error.invalidPduSize
}

Expand All @@ -66,10 +66,10 @@ extension Ocp1Connection.Monitor {
) async throws {
switch message {
case is Ocp1Command:
await connection.logger.warning("device sent unexpected command \(message); ignoring")
connection.logger.warning("device sent unexpected command \(message); ignoring")
case let notification as Ocp1Notification1:
if notification.parameters.parameterCount == 2 {
await connection.notifySubscribers(
connection.notifySubscribers(
of: notification.parameters.eventData.event,
with: notification.parameters.eventData.eventParameters
)
Expand All @@ -82,13 +82,12 @@ extension Ocp1Connection.Monitor {
break
case let notification as Ocp1Notification2:
try notification.throwIfException()
await connection.notifySubscribers(of: notification.event, with: notification.data)
connection.notifySubscribers(of: notification.event, with: notification.data)
default:
throw Ocp1Error.unknownPduType
}
}

@OcaConnection
private func _markDatagramConnectionConnected(_ connection: Ocp1Connection) async {
if connection.isDatagram, connection.isConnecting {
connection.markConnectionConnected()
Expand All @@ -111,22 +110,21 @@ extension Ocp1Connection.Monitor {
}

private func keepAlive(_ connection: Ocp1Connection) async throws {
let heartbeatTime = await connection.heartbeatTime
let keepAliveThreshold = heartbeatTime * 3
let keepAliveThreshold = connection.heartbeatTime * 3

repeat {
let now = Self.now
if now - lastMessageReceivedTime >= keepAliveThreshold.seconds {
await connection.logger
connection.logger
.info(
"\(connection): no heartbeat packet received in past \(keepAliveThreshold)"
)
throw Ocp1Error.missingKeepalive
}

let timeSinceLastMessageSent = await now - connection.lastMessageSentTime
var sleepTime = heartbeatTime
if timeSinceLastMessageSent >= heartbeatTime.seconds {
let timeSinceLastMessageSent = now - connection.lastMessageSentTime
var sleepTime = connection.heartbeatTime
if timeSinceLastMessageSent >= connection.heartbeatTime.seconds {
try await connection.sendKeepAlive()
} else {
sleepTime -= .seconds(timeSinceLastMessageSent)
Expand Down Expand Up @@ -159,7 +157,7 @@ extension Ocp1Connection.Monitor {
// if we're not already in the middle of connecting or re-connecting,
// possibly trigger a reconnect depending on the autoReconnect policy
// and the nature of the error
if await !connection.isConnecting {
if !connection.isConnecting {
try? await connection.onMonitorError(id: _connectionID, error)
}
throw error
Expand Down

0 comments on commit 0f9d4c6

Please sign in to comment.