From c536f82515d1ab6e7d6983fdb06a566a78ab3d20 Mon Sep 17 00:00:00 2001 From: Sergey Chelombitko Date: Tue, 17 Dec 2024 16:11:25 +0000 Subject: [PATCH] Expose device events as a flow rather than channel --- .../marathon/scenario/CacheScenarios.kt | 4 +- .../malinskiy/marathon/device/DeviceEvent.kt | 8 +++ .../marathon/device/DeviceProvider.kt | 10 ++-- .../malinskiy/marathon/execution/Scheduler.kt | 53 ++++++------------- .../scenario/DeviceFilteringScenario.kt | 16 +++--- .../scenario/DisconnectingScenarios.kt | 8 +-- .../scenario/InvalidConfigScenarios.kt | 4 +- .../marathon/scenario/SuccessScenarios.kt | 4 +- .../marathon/scenario/UncompletedScenarios.kt | 10 ++-- vendor/vendor-android/ddmlib/build.gradle.kts | 1 + .../android/ddmlib/DdmlibDeviceProvider.kt | 16 +++--- .../android/AndroidDeviceProviderSpek.kt | 30 ----------- .../android/DdmlibDeviceProviderTest.kt | 32 +++++++++++ .../marathon/test/StubDeviceProvider.kt | 22 ++++---- .../test/factory/ConfigurationFactory.kt | 4 +- 15 files changed, 108 insertions(+), 114 deletions(-) create mode 100644 core/src/main/kotlin/com/malinskiy/marathon/device/DeviceEvent.kt delete mode 100644 vendor/vendor-android/ddmlib/src/test/kotlin/com/malinskiy/marathon/android/AndroidDeviceProviderSpek.kt create mode 100644 vendor/vendor-android/ddmlib/src/test/kotlin/com/malinskiy/marathon/android/DdmlibDeviceProviderTest.kt diff --git a/core/src/integrationTest/kotlin/com/malinskiy/marathon/scenario/CacheScenarios.kt b/core/src/integrationTest/kotlin/com/malinskiy/marathon/scenario/CacheScenarios.kt index c3a350906..c06664230 100644 --- a/core/src/integrationTest/kotlin/com/malinskiy/marathon/scenario/CacheScenarios.kt +++ b/core/src/integrationTest/kotlin/com/malinskiy/marathon/scenario/CacheScenarios.kt @@ -3,7 +3,7 @@ package com.malinskiy.marathon.scenario import com.google.gson.JsonParser import com.malinskiy.marathon.cache.config.RemoteCacheConfiguration import com.malinskiy.marathon.cache.gradle.GradleCacheContainer -import com.malinskiy.marathon.device.DeviceProvider +import com.malinskiy.marathon.device.DeviceEvent import com.malinskiy.marathon.execution.CacheConfiguration import com.malinskiy.marathon.execution.TestStatus import com.malinskiy.marathon.test.StubDevice @@ -115,7 +115,7 @@ class CacheScenarios { devices { delay(1000) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device)) + it.send(DeviceEvent.DeviceConnected(device)) } } diff --git a/core/src/main/kotlin/com/malinskiy/marathon/device/DeviceEvent.kt b/core/src/main/kotlin/com/malinskiy/marathon/device/DeviceEvent.kt new file mode 100644 index 000000000..f9953da7c --- /dev/null +++ b/core/src/main/kotlin/com/malinskiy/marathon/device/DeviceEvent.kt @@ -0,0 +1,8 @@ +package com.malinskiy.marathon.device + +sealed class DeviceEvent { + abstract val device: Device + + class DeviceConnected(override val device: Device) : DeviceEvent() + class DeviceDisconnected(override val device: Device) : DeviceEvent() +} diff --git a/core/src/main/kotlin/com/malinskiy/marathon/device/DeviceProvider.kt b/core/src/main/kotlin/com/malinskiy/marathon/device/DeviceProvider.kt index 808efbed4..e87736b3a 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/device/DeviceProvider.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/device/DeviceProvider.kt @@ -1,15 +1,11 @@ package com.malinskiy.marathon.device -import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow interface DeviceProvider : AutoCloseable { - sealed class DeviceEvent { - class DeviceConnected(val device: Device) : DeviceEvent() - class DeviceDisconnected(val device: Device) : DeviceEvent() - } - val deviceInitializationTimeoutMillis: Long suspend fun initialize() suspend fun terminate() - fun subscribe(): Channel + + val deviceEvents: Flow } diff --git a/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt b/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt index e6115dcb6..dbfa231fe 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt @@ -7,6 +7,7 @@ import com.malinskiy.marathon.cache.test.CacheTestReporter import com.malinskiy.marathon.cache.test.TestCacheLoader import com.malinskiy.marathon.cache.test.TestCacheSaver import com.malinskiy.marathon.device.Device +import com.malinskiy.marathon.device.DeviceEvent import com.malinskiy.marathon.device.DevicePoolId import com.malinskiy.marathon.device.DeviceProvider import com.malinskiy.marathon.device.toDeviceInfo @@ -25,6 +26,7 @@ import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout import java.util.concurrent.ConcurrentHashMap @@ -59,7 +61,7 @@ class Scheduler( private val scope = CoroutineScope(context) suspend fun initialize() { - subscribeOnDevices() + subscribeToDeviceProvider() initializeCache() try { @@ -130,64 +132,41 @@ class Scheduler( } } - private fun subscribeOnDevices() { - logger.debug("Subscribing to devices") + private fun subscribeToDeviceProvider() { + logger.debug("Subscribing to device provider") scope.launch { - logger.debug("Reading messages from device provider") - - for (msg in deviceProvider.subscribe()) { - when (msg) { - is DeviceProvider.DeviceEvent.DeviceConnected -> { - onDeviceConnected(msg, job, coroutineContext) - } - - is DeviceProvider.DeviceEvent.DeviceDisconnected -> { - onDeviceDisconnected(msg) + deviceProvider.deviceEvents + .filter { isAllowedByConfiguration(it.device) } + .collect { event -> + when (event) { + is DeviceEvent.DeviceConnected -> onDeviceConnected(event.device, coroutineContext) + is DeviceEvent.DeviceDisconnected -> onDeviceDisconnected(event.device) } } - } - - logger.debug("Finished reading messages from device provider") } } - private suspend fun onDeviceDisconnected(item: DeviceProvider.DeviceEvent.DeviceDisconnected) { - val device = item.device - if (filteredByConfiguration(device)) { - logger.debug("[{}] Filtered out by configuration. Skipping disconnection", device.serialNumber) - return - } - + private suspend fun onDeviceDisconnected(device: Device) { logger.debug("[{}] Disconnected", device.serialNumber) pools.values.forEach { it.send(RemoveDevice(device)) } } - private suspend fun onDeviceConnected( - item: DeviceProvider.DeviceEvent.DeviceConnected, - parent: Job, - context: CoroutineContext - ) { - val device = item.device - if (filteredByConfiguration(device)) { - logger.debug("[{}] Filtered out by configuration. Skipping connection", device.serialNumber) - return - } - + private suspend fun onDeviceConnected(device: Device, context: CoroutineContext) { val poolId = poolingStrategy.associate(device) logger.debug("[{}] Associated with pool {}", device.serialNumber, poolId) pools.computeIfAbsent(poolId) { id -> logger.debug("Creating pool actor {}", id) - DevicePoolActor(id, configuration, analytics, progressReporter, track, timer, logsProvider, strictRunChecker, parent, context) + DevicePoolActor(id, configuration, analytics, progressReporter, track, timer, logsProvider, strictRunChecker, job, context) } pools[poolId]?.send(AddDevice(device)) ?: logger.debug("[{}] Not sending AddDevice event to device pool {}", device.serialNumber, poolId) track.deviceConnected(poolId, device.toDeviceInfo()) } - private fun filteredByConfiguration(device: Device): Boolean { + private fun isAllowedByConfiguration(device: Device): Boolean { val whiteListAccepted = when { configuration.includeSerialRegexes.isEmpty() -> true else -> configuration.includeSerialRegexes.any { it.matches(device.serialNumber) } @@ -197,6 +176,6 @@ class Scheduler( else -> configuration.excludeSerialRegexes.none { it.matches(device.serialNumber) } } - return !(whiteListAccepted && blacklistAccepted) + return whiteListAccepted && blacklistAccepted } } diff --git a/core/src/test/kotlin/com/malinskiy/marathon/scenario/DeviceFilteringScenario.kt b/core/src/test/kotlin/com/malinskiy/marathon/scenario/DeviceFilteringScenario.kt index 6018742dd..0e21e749c 100644 --- a/core/src/test/kotlin/com/malinskiy/marathon/scenario/DeviceFilteringScenario.kt +++ b/core/src/test/kotlin/com/malinskiy/marathon/scenario/DeviceFilteringScenario.kt @@ -1,6 +1,6 @@ package com.malinskiy.marathon.scenario -import com.malinskiy.marathon.device.DeviceProvider +import com.malinskiy.marathon.device.DeviceEvent import com.malinskiy.marathon.execution.TestStatus import com.malinskiy.marathon.test.StubDevice import com.malinskiy.marathon.test.Test @@ -53,8 +53,8 @@ class DeviceFilteringScenario : Spek({ devices { delay(1000) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1)) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device2)) + it.send(DeviceEvent.DeviceConnected(device1)) + it.send(DeviceEvent.DeviceConnected(device2)) } } @@ -104,8 +104,8 @@ class DeviceFilteringScenario : Spek({ devices { delay(1000) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1)) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device2)) + it.send(DeviceEvent.DeviceConnected(device1)) + it.send(DeviceEvent.DeviceConnected(device2)) } } @@ -156,9 +156,9 @@ class DeviceFilteringScenario : Spek({ devices { delay(1000) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1)) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device2)) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device3)) + it.send(DeviceEvent.DeviceConnected(device1)) + it.send(DeviceEvent.DeviceConnected(device2)) + it.send(DeviceEvent.DeviceConnected(device3)) } } diff --git a/core/src/test/kotlin/com/malinskiy/marathon/scenario/DisconnectingScenarios.kt b/core/src/test/kotlin/com/malinskiy/marathon/scenario/DisconnectingScenarios.kt index d493c55ca..fa19e028f 100644 --- a/core/src/test/kotlin/com/malinskiy/marathon/scenario/DisconnectingScenarios.kt +++ b/core/src/test/kotlin/com/malinskiy/marathon/scenario/DisconnectingScenarios.kt @@ -1,6 +1,6 @@ package com.malinskiy.marathon.scenario -import com.malinskiy.marathon.device.DeviceProvider +import com.malinskiy.marathon.device.DeviceEvent import com.malinskiy.marathon.execution.TestStatus import com.malinskiy.marathon.test.StubDevice import com.malinskiy.marathon.test.Test @@ -55,11 +55,11 @@ class DisconnectingScenarios : Spek({ devices { delay(1000) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1)) + it.send(DeviceEvent.DeviceConnected(device1)) delay(100) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device2)) + it.send(DeviceEvent.DeviceConnected(device2)) delay(5000) - it.send(DeviceProvider.DeviceEvent.DeviceDisconnected(device1)) + it.send(DeviceEvent.DeviceDisconnected(device1)) } } diff --git a/core/src/test/kotlin/com/malinskiy/marathon/scenario/InvalidConfigScenarios.kt b/core/src/test/kotlin/com/malinskiy/marathon/scenario/InvalidConfigScenarios.kt index 9752902d8..c6c9f9b84 100644 --- a/core/src/test/kotlin/com/malinskiy/marathon/scenario/InvalidConfigScenarios.kt +++ b/core/src/test/kotlin/com/malinskiy/marathon/scenario/InvalidConfigScenarios.kt @@ -1,6 +1,6 @@ package com.malinskiy.marathon.scenario -import com.malinskiy.marathon.device.DeviceProvider +import com.malinskiy.marathon.device.DeviceEvent import com.malinskiy.marathon.exceptions.ConfigurationException import com.malinskiy.marathon.execution.TestStatus import com.malinskiy.marathon.execution.strategy.impl.flakiness.ProbabilityBasedFlakinessStrategy @@ -44,7 +44,7 @@ class InvalidConfigScenarios : Spek({ devices { delay(1000) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device)) + it.send(DeviceEvent.DeviceConnected(device)) } } diff --git a/core/src/test/kotlin/com/malinskiy/marathon/scenario/SuccessScenarios.kt b/core/src/test/kotlin/com/malinskiy/marathon/scenario/SuccessScenarios.kt index b360214d5..8144b2ce2 100644 --- a/core/src/test/kotlin/com/malinskiy/marathon/scenario/SuccessScenarios.kt +++ b/core/src/test/kotlin/com/malinskiy/marathon/scenario/SuccessScenarios.kt @@ -1,6 +1,6 @@ package com.malinskiy.marathon.scenario -import com.malinskiy.marathon.device.DeviceProvider +import com.malinskiy.marathon.device.DeviceEvent import com.malinskiy.marathon.execution.TestStatus import com.malinskiy.marathon.test.StubDevice import com.malinskiy.marathon.test.Test @@ -48,7 +48,7 @@ class SuccessScenarios : Spek({ devices { delay(1000) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device)) + it.send(DeviceEvent.DeviceConnected(device)) } } diff --git a/core/src/test/kotlin/com/malinskiy/marathon/scenario/UncompletedScenarios.kt b/core/src/test/kotlin/com/malinskiy/marathon/scenario/UncompletedScenarios.kt index 04322908d..a8fc72d79 100644 --- a/core/src/test/kotlin/com/malinskiy/marathon/scenario/UncompletedScenarios.kt +++ b/core/src/test/kotlin/com/malinskiy/marathon/scenario/UncompletedScenarios.kt @@ -1,6 +1,6 @@ package com.malinskiy.marathon.scenario -import com.malinskiy.marathon.device.DeviceProvider +import com.malinskiy.marathon.device.DeviceEvent import com.malinskiy.marathon.execution.TestStatus import com.malinskiy.marathon.execution.strategy.impl.retry.fixedquota.FixedQuotaRetryStrategy import com.malinskiy.marathon.test.StubDevice @@ -54,7 +54,7 @@ class UncompletedScenarios : Spek({ devices { delay(1000) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1)) + it.send(DeviceEvent.DeviceConnected(device1)) } } @@ -99,7 +99,7 @@ class UncompletedScenarios : Spek({ devices { delay(1000) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1)) + it.send(DeviceEvent.DeviceConnected(device1)) } } @@ -150,7 +150,7 @@ class UncompletedScenarios : Spek({ devices { delay(1000) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1)) + it.send(DeviceEvent.DeviceConnected(device1)) } } @@ -194,7 +194,7 @@ class UncompletedScenarios : Spek({ devices { delay(1000) - it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1)) + it.send(DeviceEvent.DeviceConnected(device1)) } } diff --git a/vendor/vendor-android/ddmlib/build.gradle.kts b/vendor/vendor-android/ddmlib/build.gradle.kts index 7cc7b01ab..adf7728fc 100644 --- a/vendor/vendor-android/ddmlib/build.gradle.kts +++ b/vendor/vendor-android/ddmlib/build.gradle.kts @@ -18,5 +18,6 @@ dependencies { implementation(libs.slf4j.api) testImplementation(project(":vendor:vendor-test")) + testImplementation(libs.kotlinx.coroutines.test) testImplementation(libs.koin.test) } diff --git a/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibDeviceProvider.kt b/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibDeviceProvider.kt index 071200dfb..7f57bd5b3 100644 --- a/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibDeviceProvider.kt +++ b/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibDeviceProvider.kt @@ -9,9 +9,10 @@ import com.malinskiy.marathon.analytics.internal.pub.Track import com.malinskiy.marathon.android.AndroidAppInstaller import com.malinskiy.marathon.android.AndroidConfiguration import com.malinskiy.marathon.android.executor.logcat.LogcatListener +import com.malinskiy.marathon.device.DeviceEvent +import com.malinskiy.marathon.device.DeviceEvent.DeviceConnected +import com.malinskiy.marathon.device.DeviceEvent.DeviceDisconnected import com.malinskiy.marathon.device.DeviceProvider -import com.malinskiy.marathon.device.DeviceProvider.DeviceEvent.DeviceConnected -import com.malinskiy.marathon.device.DeviceProvider.DeviceEvent.DeviceDisconnected import com.malinskiy.marathon.exceptions.NoDevicesException import com.malinskiy.marathon.execution.Configuration import com.malinskiy.marathon.execution.StrictRunChecker @@ -27,6 +28,8 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import java.time.Duration @@ -47,7 +50,7 @@ class DdmlibDeviceProvider( ) : DeviceProvider, AndroidDebugBridge.IDeviceChangeListener { private val logger = MarathonLogging.getLogger(DdmlibDeviceProvider::class.java) - private val channel: Channel = unboundedChannel() + private val channel: Channel = unboundedChannel() private val devices: ConcurrentMap = ConcurrentHashMap() private val dispatcher = Dispatchers.IO.limitedParallelism(4) @@ -56,6 +59,9 @@ class DdmlibDeviceProvider( override val deviceInitializationTimeoutMillis: Long = 180_000 + override val deviceEvents: Flow + get() = channel.consumeAsFlow() + override suspend fun initialize() { DdmPreferences.setTimeOut(DEFAULT_DDM_LIB_TIMEOUT) @Suppress("DEPRECATION") @@ -146,8 +152,6 @@ class DdmlibDeviceProvider( AndroidDebugBridge.terminate() } - override fun subscribe() = channel - override fun deviceChanged(device: IDevice, changeMask: Int) { logger.debug("Device {} changed, mask = {}", device, changeMask) @@ -256,7 +260,7 @@ class DdmlibDeviceProvider( get() = config.vendorConfiguration as AndroidConfiguration private fun CompletableJob.completeRecursively(): Boolean { - job.children + children .filterIsInstance() .forEach { it.complete() } return complete() diff --git a/vendor/vendor-android/ddmlib/src/test/kotlin/com/malinskiy/marathon/android/AndroidDeviceProviderSpek.kt b/vendor/vendor-android/ddmlib/src/test/kotlin/com/malinskiy/marathon/android/AndroidDeviceProviderSpek.kt deleted file mode 100644 index f6f7d793e..000000000 --- a/vendor/vendor-android/ddmlib/src/test/kotlin/com/malinskiy/marathon/android/AndroidDeviceProviderSpek.kt +++ /dev/null @@ -1,30 +0,0 @@ -package com.malinskiy.marathon.android - -import com.malinskiy.marathon.analytics.internal.pub.Track -import com.malinskiy.marathon.android.ddmlib.DdmlibDeviceProvider -import com.malinskiy.marathon.test.factory.ConfigurationFactory -import com.malinskiy.marathon.time.SystemTimer -import kotlinx.coroutines.DelicateCoroutinesApi -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.junit.jupiter.api.Assertions.assertTrue -import org.mockito.kotlin.mock -import java.time.Clock - -@OptIn(DelicateCoroutinesApi::class) -class AndroidDeviceProviderSpek : Spek({ - given("A provider") { - group("terminate") { - it("should close the channel") { - val config = ConfigurationFactory().build() - val provider = DdmlibDeviceProvider(Track(), SystemTimer(Clock.systemDefaultZone()), config, mock(), mock(), mock(), mock(), mock()) - - provider.close() - - assertTrue(provider.subscribe().isClosedForReceive) - assertTrue(provider.subscribe().isClosedForSend) - } - } - } -}) diff --git a/vendor/vendor-android/ddmlib/src/test/kotlin/com/malinskiy/marathon/android/DdmlibDeviceProviderTest.kt b/vendor/vendor-android/ddmlib/src/test/kotlin/com/malinskiy/marathon/android/DdmlibDeviceProviderTest.kt new file mode 100644 index 000000000..957f4b329 --- /dev/null +++ b/vendor/vendor-android/ddmlib/src/test/kotlin/com/malinskiy/marathon/android/DdmlibDeviceProviderTest.kt @@ -0,0 +1,32 @@ +package com.malinskiy.marathon.android + +import com.malinskiy.marathon.analytics.internal.pub.Track +import com.malinskiy.marathon.android.ddmlib.DdmlibDeviceProvider +import com.malinskiy.marathon.test.factory.ConfigurationFactory +import com.malinskiy.marathon.time.SystemTimer +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.mockito.kotlin.mock +import java.time.Clock + +class DdmlibDeviceProviderTest { + @Test + fun `GIVEN device provider WHEN terminating device provider THEN device events flow gets stopped`() = runTest { + val config = ConfigurationFactory().build() + val provider = DdmlibDeviceProvider(Track(), SystemTimer(Clock.systemDefaultZone()), config, mock(), mock(), mock(), mock(), mock()) + + val eventReceiver = backgroundScope.launch { + provider.deviceEvents.collect() + } + + provider.terminate() + + advanceTimeBy(1L) + + assertTrue(eventReceiver.isCompleted) + } +} diff --git a/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/StubDeviceProvider.kt b/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/StubDeviceProvider.kt index 8795dac77..aed711455 100644 --- a/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/StubDeviceProvider.kt +++ b/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/StubDeviceProvider.kt @@ -1,29 +1,33 @@ package com.malinskiy.marathon.test import com.malinskiy.marathon.actor.unboundedChannel +import com.malinskiy.marathon.device.DeviceEvent import com.malinskiy.marathon.device.DeviceProvider import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.launch class StubDeviceProvider : DeviceProvider { lateinit var coroutineScope: CoroutineScope - private val channel: Channel = unboundedChannel() - var providingLogic: (suspend (Channel) -> Unit)? = null + private val channel: Channel = unboundedChannel() + var providingLogic: (suspend (Channel) -> Unit)? = null override val deviceInitializationTimeoutMillis: Long = 180_000 override suspend fun initialize() = Unit - override fun subscribe(): Channel { - providingLogic?.let { - coroutineScope.launch { - providingLogic?.invoke(channel) + override val deviceEvents: Flow + get() { + providingLogic?.let { + coroutineScope.launch { + providingLogic?.invoke(channel) + } } - } - return channel - } + return channel.consumeAsFlow() + } override suspend fun terminate() { channel.close() diff --git a/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/factory/ConfigurationFactory.kt b/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/factory/ConfigurationFactory.kt index b09b7899c..e1e2125e0 100644 --- a/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/factory/ConfigurationFactory.kt +++ b/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/factory/ConfigurationFactory.kt @@ -1,7 +1,7 @@ package com.malinskiy.marathon.test.factory import com.malinskiy.marathon.analytics.internal.pub.Tracker -import com.malinskiy.marathon.device.DeviceProvider +import com.malinskiy.marathon.device.DeviceEvent import com.malinskiy.marathon.execution.CacheConfiguration import com.malinskiy.marathon.execution.Configuration import com.malinskiy.marathon.execution.FilteringConfiguration @@ -51,7 +51,7 @@ class ConfigurationFactory { testParser.tests = block.invoke() } - fun devices(f: suspend (Channel) -> Unit) { + fun devices(f: suspend (Channel) -> Unit) { val stubDeviceProvider = vendorConfiguration.deviceProvider stubDeviceProvider.providingLogic = f }