Skip to content

Commit

Permalink
Use AsyncStream for device recordings, add biopot recording tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Supereg committed Apr 18, 2024
1 parent d418858 commit bed35cd
Show file tree
Hide file tree
Showing 16 changed files with 397 additions and 247 deletions.
22 changes: 0 additions & 22 deletions NAMS.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,6 @@
653A2549283387FE005D4D48 /* Sources */,
653A254A283387FE005D4D48 /* Frameworks */,
653A254B283387FE005D4D48 /* Resources */,
2F5B528D29BD237B002020B7 /* ShellScript */,
);
buildRules = (
);
Expand Down Expand Up @@ -1281,27 +1280,6 @@
};
/* End PBXResourcesBuildPhase section */

/* Begin PBXShellScriptBuildPhase section */
2F5B528D29BD237B002020B7 /* ShellScript */ = {
isa = PBXShellScriptBuildPhase;
alwaysOutOfDate = 1;
buildActionMask = 2147483647;
files = (
);
inputFileListPaths = (
);
inputPaths = (
);
outputFileListPaths = (
);
outputPaths = (
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "if [ \"${CONFIGURATION}\" = \"Debug\" ]; then\n export PATH=\"$PATH:/opt/homebrew/bin\"\n if which swiftlint > /dev/null; then\n swiftlint\n else\n echo \"warning: SwiftLint not installed, download from https://github.com/realm/SwiftLint\"\n fi\nfi\n";
};
/* End PBXShellScriptBuildPhase section */

/* Begin PBXSourcesBuildPhase section */
653A2549283387FE005D4D48 /* Sources */ = {
isa = PBXSourcesBuildPhase;
Expand Down
14 changes: 3 additions & 11 deletions NAMS/Devices/Biopot/BiopotDevice.swift
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,8 @@ class BiopotDevice: BluetoothDevice, Identifiable, NAMSDevice {
}
}

func prepareRecording() async throws {
logger.debug("Preparing to record for biopot \(self.name ?? "")")
try await service.prepareRecording()
}

func startRecording(_ session: EEGRecordingSession) async throws {
try await service.startRecording(session)
}

func stopRecording() async throws {
try await service.stopRecording()
func startRecording() async throws -> AsyncStream<CombinedEEGSample> {
try await service.startRecording()
}

Check warning on line 117 in NAMS/Devices/Biopot/BiopotDevice.swift

View check run for this annotation

Codecov / codecov/patch

NAMS/Devices/Biopot/BiopotDevice.swift#L115-L117

Added lines #L115 - L117 were not covered by tests
}

Expand Down Expand Up @@ -167,6 +158,7 @@ extension BiopotDevice {
batteryCharging: true
))
biopot.service.$samplingConfiguration.inject(SamplingConfiguration())
biopot.service.$deviceConfiguration.inject(DeviceConfiguration())
biopot.deviceInformation.$firmwareRevision.inject("1.2.3")
biopot.deviceInformation.$serialNumber.inject(serial)
biopot.deviceInformation.$hardwareRevision.inject("3.1")
Expand Down
53 changes: 41 additions & 12 deletions NAMS/Devices/Biopot/BiopotService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ class BiopotService: BluetoothService {
var dataAcquisition: Data? // either `DataAcquisition10` or `DataAcquisition11` depending on the configuration.


@EEGProcessing private var recordingSession: EEGRecordingSession?
@EEGProcessing private var recordingStream: AsyncStream<CombinedEEGSample>.Continuation? {
willSet {
recordingStream?.finish()
}
}

@EEGProcessing private var nextExpectedSampleCount: UInt32 = 0

Expand All @@ -66,7 +70,7 @@ class BiopotService: BluetoothService {
}

@EEGProcessing
func prepareRecording() async throws {
private func prepareRecording() async throws {
do {
// make sure the value is up to date before the recording session is created
try await $deviceConfiguration.read()
Expand All @@ -85,17 +89,42 @@ class BiopotService: BluetoothService {
}

Check warning on line 89 in NAMS/Devices/Biopot/BiopotService.swift

View check run for this annotation

Codecov / codecov/patch

NAMS/Devices/Biopot/BiopotService.swift#L73-L89

Added lines #L73 - L89 were not covered by tests

@EEGProcessing
func startRecording(_ session: EEGRecordingSession) async throws {
recordingSession = session
func startRecording() async throws -> AsyncStream<CombinedEEGSample> {
try await self.prepareRecording()
try await self.enableRecording()
return _makeStream()
}

Check warning on line 96 in NAMS/Devices/Biopot/BiopotService.swift

View check run for this annotation

Codecov / codecov/patch

NAMS/Devices/Biopot/BiopotService.swift#L92-L96

Added lines #L92 - L96 were not covered by tests

@EEGProcessing
func stopRecording() async throws {
try await $dataControl.write(.paused)
recordingSession = nil
func _makeStream() -> AsyncStream<CombinedEEGSample> { // swiftlint:disable:this identifier_name
AsyncStream { continuation in
continuation.onTermination = { [weak self] termination in
guard case .cancelled = termination else {
return // we don't care about finished sequences!

Check warning on line 103 in NAMS/Devices/Biopot/BiopotService.swift

View check run for this annotation

Codecov / codecov/patch

NAMS/Devices/Biopot/BiopotService.swift#L103

Added line #L103 was not covered by tests
}

clearProcessing()
Task { @EEGProcessing [weak self] in
do {
try await self?.stopRecording()
} catch {
self?.logger.error("Failed to stop recording for device: \(error)")
}
}
}
recordingStream = continuation
}
}

@EEGProcessing
private func stopRecording() async throws {
defer {
recordingStream?.finish() // might already be cancelled, but just to be safe
recordingStream = nil

clearProcessing()
}

try await $dataControl.write(.paused)
}

@EEGProcessing
Expand Down Expand Up @@ -180,7 +209,7 @@ class BiopotService: BluetoothService {
return

Check warning on line 209 in NAMS/Devices/Biopot/BiopotService.swift

View check run for this annotation

Codecov / codecov/patch

NAMS/Devices/Biopot/BiopotService.swift#L208-L209

Added lines #L208 - L209 were not covered by tests
}

guard recordingSession != nil else {
guard recordingStream != nil else {
logger.warning("Received incoming data acquisition \(acquisition.totalSampleCount) while recording session was not present anymore!")
return

Check warning on line 214 in NAMS/Devices/Biopot/BiopotService.swift

View check run for this annotation

Codecov / codecov/patch

NAMS/Devices/Biopot/BiopotService.swift#L213-L214

Added lines #L213 - L214 were not covered by tests
}
Expand All @@ -193,7 +222,7 @@ class BiopotService: BluetoothService {


// See explanation below, this eventually clears the first received packet from the buffer
if nextExpectedSampleCount == 0 && acquisition.totalSampleCount != 0 && packetBuffer.first?.totalSampleCount == 0 {
if nextExpectedSampleCount == 0 && acquisition.totalSampleCount > 0 && packetBuffer.first?.totalSampleCount == 0 {
processBufferedPackets()
}

Expand Down Expand Up @@ -254,14 +283,14 @@ class BiopotService: BluetoothService {

@EEGProcessing
private func processAcquisition(_ acquisition: SomeDataAcquisition) {
guard let recordingSession else {
guard let recordingStream else {
return // we checked that earlier, if it is gone now, something went completely wrong

Check warning on line 287 in NAMS/Devices/Biopot/BiopotService.swift

View check run for this annotation

Codecov / codecov/patch

NAMS/Devices/Biopot/BiopotService.swift#L287

Added line #L287 was not covered by tests
}


for sample in acquisition.samples {
let combinedSample = CombinedEEGSample(channels: sample.channels)
recordingSession.append(combinedSample)
recordingStream.yield(combinedSample)
}
}
}
6 changes: 6 additions & 0 deletions NAMS/Devices/Biopot/Characteristics/DataAcquisition.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ struct DataAcquisition10: DataAcquisition {
let samples: [BiopotSample] // 10 samples

let receivedDate: Date

init(totalSampleCount: UInt32, samples: [BiopotSample], receivedDate: Date = .now) {
self.totalSampleCount = totalSampleCount
self.samples = samples
self.receivedDate = receivedDate
}

Check warning on line 30 in NAMS/Devices/Biopot/Characteristics/DataAcquisition.swift

View check run for this annotation

Codecov / codecov/patch

NAMS/Devices/Biopot/Characteristics/DataAcquisition.swift#L26-L30

Added lines #L26 - L30 were not covered by tests
}


Expand Down
20 changes: 20 additions & 0 deletions NAMS/Devices/Biopot/Characteristics/DeviceConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,26 @@ struct DeviceConfiguration {
let dataSize: UInt8
let syncEnabled: Bool
let serialNumber: UInt32

init(
channelCount: UInt8 = 8,
accelerometerStatus: AccelerometerStatus = .off,
impedanceStatus: Bool = false,
memoryStatus: Bool = false,
samplesPerChannel: UInt8 = 9,
dataSize: UInt8 = 24,
syncEnabled: Bool = false,
serialNumber: UInt32 = 127
) {
self.channelCount = channelCount
self.accelerometerStatus = accelerometerStatus
self.impedanceStatus = impedanceStatus
self.memoryStatus = memoryStatus
self.samplesPerChannel = samplesPerChannel
self.dataSize = dataSize
self.syncEnabled = syncEnabled
self.serialNumber = serialNumber
}
}


Expand Down
12 changes: 2 additions & 10 deletions NAMS/Devices/ConnectedDevice.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,8 @@ extension ConnectedDevice: NAMSDevice {
await underlyingDevice.disconnect()
}

func prepareRecording() async throws {
try await underlyingDevice.prepareRecording()
}

func startRecording(_ session: EEGRecordingSession) async throws {
try await underlyingDevice.startRecording(session)
}

func stopRecording() async throws {
try await underlyingDevice.stopRecording()
func startRecording() async throws -> AsyncStream<CombinedEEGSample> {
try await underlyingDevice.startRecording()
}

@MainActor
Expand Down
54 changes: 40 additions & 14 deletions NAMS/Devices/Mock/MockDevice.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@
import BluetoothViews
import EDFFormat
import Foundation
import OSLog
import SpeziBluetooth


@Observable
class MockDevice: NAMSDevice {
private static let sampleRate = 60

private let logger = Logger(subsystem: "edu.stanford.nams", category: "MockDevice")

let id: UUID
let name: String

private var measurementGenerator: MockMeasurementGenerator

var state: PeripheralState
var deviceInformation: MuseDeviceInformation? // we are just reusing muse data model
@MainActor private var disconnectHandler: ((ConnectedDevice) -> Void)?

var equipmentCode: String {
if let deviceInformation {
Expand All @@ -32,10 +36,6 @@ class MockDevice: NAMSDevice {
}
}

/// The currently associated recording session.
@MainActor private var recordingSession: EEGRecordingSession?
@MainActor private var disconnectHandler: ((ConnectedDevice) -> Void)?

var connectionState: ConnectionState {
switch state {
case .disconnected:
Expand All @@ -49,6 +49,13 @@ class MockDevice: NAMSDevice {
}
}

/// The currently associated recording session.
@MainActor private var recordingStream: AsyncStream<CombinedEEGSample>.Continuation? {
willSet {
recordingStream?.finish()
}
}

@ObservationIgnored private var eegTimer: Timer? {
willSet {
eegTimer?.invalidate()
Expand Down Expand Up @@ -157,12 +164,8 @@ class MockDevice: NAMSDevice {
}
}

func prepareRecording() async throws {}

@MainActor
func startRecording(_ session: EEGRecordingSession) throws {
self.recordingSession = session

func startRecording() throws -> AsyncStream<CombinedEEGSample> {
// schedule timer to generate fake EEG data
let timer = Timer(timeInterval: 0.1, repeats: true) { timer in
// its running on the main RunLoop so this is safe to assume
Expand All @@ -173,19 +176,42 @@ class MockDevice: NAMSDevice {
RunLoop.main.add(timer, forMode: .common)
self.eegTimer = timer

generateRecording(timer: timer) // make sure there is data instantly
defer {
generateRecording(timer: timer) // make sure there is data instantly
}

logger.info("Started recording for mock device.")

return makeStream()
}

@MainActor
private func makeStream() -> AsyncStream<CombinedEEGSample> {
AsyncStream { continuation in
continuation.onTermination = { [weak self] termination in
guard case .cancelled = termination else {
return // we don't care about finished sequences!

Check warning on line 193 in NAMS/Devices/Mock/MockDevice.swift

View check run for this annotation

Codecov / codecov/patch

NAMS/Devices/Mock/MockDevice.swift#L193

Added line #L193 was not covered by tests
}

Task { @EEGProcessing [weak self] in
try await self?.stopRecording()
}
}
recordingStream = continuation
}
}

@MainActor
func stopRecording() throws {
private func stopRecording() throws {
logger.debug("Stopping recording for mock device ...")
self.eegTimer = nil
self.recordingSession = nil
self.recordingStream = nil
self.measurementGenerator = MockMeasurementGenerator(sampleRate: Self.sampleRate)
}

@MainActor
private func generateRecording(timer: Timer) {
guard let recordingSession,
guard let recordingStream,
state == .connected else {
timer.invalidate()
return

Check warning on line 217 in NAMS/Devices/Mock/MockDevice.swift

View check run for this annotation

Codecov / codecov/patch

NAMS/Devices/Mock/MockDevice.swift#L216-L217

Added lines #L216 - L217 were not covered by tests
Expand All @@ -194,7 +220,7 @@ class MockDevice: NAMSDevice {
let samples = measurementGenerator.next()
Task { @EEGProcessing in
for sample in samples {
recordingSession.append(sample)
recordingStream.yield(sample)
}
}
}
Expand Down
Loading

0 comments on commit bed35cd

Please sign in to comment.