Skip to content

Commit

Permalink
Expose device events as a flow rather than channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Chelombitko committed Dec 17, 2024
1 parent 3bf38c3 commit c536f82
Show file tree
Hide file tree
Showing 15 changed files with 108 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.malinskiy.marathon.scenario
import com.google.gson.JsonParser
import com.malinskiy.marathon.cache.config.RemoteCacheConfiguration
import com.malinskiy.marathon.cache.gradle.GradleCacheContainer
import com.malinskiy.marathon.device.DeviceProvider
import com.malinskiy.marathon.device.DeviceEvent
import com.malinskiy.marathon.execution.CacheConfiguration
import com.malinskiy.marathon.execution.TestStatus
import com.malinskiy.marathon.test.StubDevice
Expand Down Expand Up @@ -115,7 +115,7 @@ class CacheScenarios {

devices {
delay(1000)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device))
it.send(DeviceEvent.DeviceConnected(device))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.malinskiy.marathon.device

sealed class DeviceEvent {
abstract val device: Device

class DeviceConnected(override val device: Device) : DeviceEvent()
class DeviceDisconnected(override val device: Device) : DeviceEvent()
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package com.malinskiy.marathon.device

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow

interface DeviceProvider : AutoCloseable {
sealed class DeviceEvent {
class DeviceConnected(val device: Device) : DeviceEvent()
class DeviceDisconnected(val device: Device) : DeviceEvent()
}

val deviceInitializationTimeoutMillis: Long
suspend fun initialize()
suspend fun terminate()
fun subscribe(): Channel<DeviceEvent>

val deviceEvents: Flow<DeviceEvent>
}
53 changes: 16 additions & 37 deletions core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.malinskiy.marathon.cache.test.CacheTestReporter
import com.malinskiy.marathon.cache.test.TestCacheLoader
import com.malinskiy.marathon.cache.test.TestCacheSaver
import com.malinskiy.marathon.device.Device
import com.malinskiy.marathon.device.DeviceEvent
import com.malinskiy.marathon.device.DevicePoolId
import com.malinskiy.marathon.device.DeviceProvider
import com.malinskiy.marathon.device.toDeviceInfo
Expand All @@ -25,6 +26,7 @@ import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -59,7 +61,7 @@ class Scheduler(
private val scope = CoroutineScope(context)

suspend fun initialize() {
subscribeOnDevices()
subscribeToDeviceProvider()
initializeCache()

try {
Expand Down Expand Up @@ -130,64 +132,41 @@ class Scheduler(
}
}

private fun subscribeOnDevices() {
logger.debug("Subscribing to devices")
private fun subscribeToDeviceProvider() {
logger.debug("Subscribing to device provider")

scope.launch {
logger.debug("Reading messages from device provider")

for (msg in deviceProvider.subscribe()) {
when (msg) {
is DeviceProvider.DeviceEvent.DeviceConnected -> {
onDeviceConnected(msg, job, coroutineContext)
}

is DeviceProvider.DeviceEvent.DeviceDisconnected -> {
onDeviceDisconnected(msg)
deviceProvider.deviceEvents
.filter { isAllowedByConfiguration(it.device) }
.collect { event ->
when (event) {
is DeviceEvent.DeviceConnected -> onDeviceConnected(event.device, coroutineContext)
is DeviceEvent.DeviceDisconnected -> onDeviceDisconnected(event.device)
}
}
}

logger.debug("Finished reading messages from device provider")
}
}

private suspend fun onDeviceDisconnected(item: DeviceProvider.DeviceEvent.DeviceDisconnected) {
val device = item.device
if (filteredByConfiguration(device)) {
logger.debug("[{}] Filtered out by configuration. Skipping disconnection", device.serialNumber)
return
}

private suspend fun onDeviceDisconnected(device: Device) {
logger.debug("[{}] Disconnected", device.serialNumber)
pools.values.forEach {
it.send(RemoveDevice(device))
}
}

private suspend fun onDeviceConnected(
item: DeviceProvider.DeviceEvent.DeviceConnected,
parent: Job,
context: CoroutineContext
) {
val device = item.device
if (filteredByConfiguration(device)) {
logger.debug("[{}] Filtered out by configuration. Skipping connection", device.serialNumber)
return
}

private suspend fun onDeviceConnected(device: Device, context: CoroutineContext) {
val poolId = poolingStrategy.associate(device)
logger.debug("[{}] Associated with pool {}", device.serialNumber, poolId)
pools.computeIfAbsent(poolId) { id ->
logger.debug("Creating pool actor {}", id)
DevicePoolActor(id, configuration, analytics, progressReporter, track, timer, logsProvider, strictRunChecker, parent, context)
DevicePoolActor(id, configuration, analytics, progressReporter, track, timer, logsProvider, strictRunChecker, job, context)
}
pools[poolId]?.send(AddDevice(device))
?: logger.debug("[{}] Not sending AddDevice event to device pool {}", device.serialNumber, poolId)
track.deviceConnected(poolId, device.toDeviceInfo())
}

private fun filteredByConfiguration(device: Device): Boolean {
private fun isAllowedByConfiguration(device: Device): Boolean {
val whiteListAccepted = when {
configuration.includeSerialRegexes.isEmpty() -> true
else -> configuration.includeSerialRegexes.any { it.matches(device.serialNumber) }
Expand All @@ -197,6 +176,6 @@ class Scheduler(
else -> configuration.excludeSerialRegexes.none { it.matches(device.serialNumber) }
}

return !(whiteListAccepted && blacklistAccepted)
return whiteListAccepted && blacklistAccepted
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.malinskiy.marathon.scenario

import com.malinskiy.marathon.device.DeviceProvider
import com.malinskiy.marathon.device.DeviceEvent
import com.malinskiy.marathon.execution.TestStatus
import com.malinskiy.marathon.test.StubDevice
import com.malinskiy.marathon.test.Test
Expand Down Expand Up @@ -53,8 +53,8 @@ class DeviceFilteringScenario : Spek({

devices {
delay(1000)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1))
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device2))
it.send(DeviceEvent.DeviceConnected(device1))
it.send(DeviceEvent.DeviceConnected(device2))
}
}

Expand Down Expand Up @@ -104,8 +104,8 @@ class DeviceFilteringScenario : Spek({

devices {
delay(1000)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1))
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device2))
it.send(DeviceEvent.DeviceConnected(device1))
it.send(DeviceEvent.DeviceConnected(device2))
}
}

Expand Down Expand Up @@ -156,9 +156,9 @@ class DeviceFilteringScenario : Spek({

devices {
delay(1000)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1))
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device2))
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device3))
it.send(DeviceEvent.DeviceConnected(device1))
it.send(DeviceEvent.DeviceConnected(device2))
it.send(DeviceEvent.DeviceConnected(device3))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.malinskiy.marathon.scenario

import com.malinskiy.marathon.device.DeviceProvider
import com.malinskiy.marathon.device.DeviceEvent
import com.malinskiy.marathon.execution.TestStatus
import com.malinskiy.marathon.test.StubDevice
import com.malinskiy.marathon.test.Test
Expand Down Expand Up @@ -55,11 +55,11 @@ class DisconnectingScenarios : Spek({

devices {
delay(1000)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1))
it.send(DeviceEvent.DeviceConnected(device1))
delay(100)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device2))
it.send(DeviceEvent.DeviceConnected(device2))
delay(5000)
it.send(DeviceProvider.DeviceEvent.DeviceDisconnected(device1))
it.send(DeviceEvent.DeviceDisconnected(device1))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.malinskiy.marathon.scenario

import com.malinskiy.marathon.device.DeviceProvider
import com.malinskiy.marathon.device.DeviceEvent
import com.malinskiy.marathon.exceptions.ConfigurationException
import com.malinskiy.marathon.execution.TestStatus
import com.malinskiy.marathon.execution.strategy.impl.flakiness.ProbabilityBasedFlakinessStrategy
Expand Down Expand Up @@ -44,7 +44,7 @@ class InvalidConfigScenarios : Spek({

devices {
delay(1000)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device))
it.send(DeviceEvent.DeviceConnected(device))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.malinskiy.marathon.scenario

import com.malinskiy.marathon.device.DeviceProvider
import com.malinskiy.marathon.device.DeviceEvent
import com.malinskiy.marathon.execution.TestStatus
import com.malinskiy.marathon.test.StubDevice
import com.malinskiy.marathon.test.Test
Expand Down Expand Up @@ -48,7 +48,7 @@ class SuccessScenarios : Spek({

devices {
delay(1000)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device))
it.send(DeviceEvent.DeviceConnected(device))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.malinskiy.marathon.scenario

import com.malinskiy.marathon.device.DeviceProvider
import com.malinskiy.marathon.device.DeviceEvent
import com.malinskiy.marathon.execution.TestStatus
import com.malinskiy.marathon.execution.strategy.impl.retry.fixedquota.FixedQuotaRetryStrategy
import com.malinskiy.marathon.test.StubDevice
Expand Down Expand Up @@ -54,7 +54,7 @@ class UncompletedScenarios : Spek({

devices {
delay(1000)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1))
it.send(DeviceEvent.DeviceConnected(device1))
}
}

Expand Down Expand Up @@ -99,7 +99,7 @@ class UncompletedScenarios : Spek({

devices {
delay(1000)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1))
it.send(DeviceEvent.DeviceConnected(device1))
}
}

Expand Down Expand Up @@ -150,7 +150,7 @@ class UncompletedScenarios : Spek({

devices {
delay(1000)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1))
it.send(DeviceEvent.DeviceConnected(device1))
}
}

Expand Down Expand Up @@ -194,7 +194,7 @@ class UncompletedScenarios : Spek({

devices {
delay(1000)
it.send(DeviceProvider.DeviceEvent.DeviceConnected(device1))
it.send(DeviceEvent.DeviceConnected(device1))
}
}

Expand Down
1 change: 1 addition & 0 deletions vendor/vendor-android/ddmlib/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ dependencies {
implementation(libs.slf4j.api)

testImplementation(project(":vendor:vendor-test"))
testImplementation(libs.kotlinx.coroutines.test)
testImplementation(libs.koin.test)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import com.malinskiy.marathon.analytics.internal.pub.Track
import com.malinskiy.marathon.android.AndroidAppInstaller
import com.malinskiy.marathon.android.AndroidConfiguration
import com.malinskiy.marathon.android.executor.logcat.LogcatListener
import com.malinskiy.marathon.device.DeviceEvent
import com.malinskiy.marathon.device.DeviceEvent.DeviceConnected
import com.malinskiy.marathon.device.DeviceEvent.DeviceDisconnected
import com.malinskiy.marathon.device.DeviceProvider
import com.malinskiy.marathon.device.DeviceProvider.DeviceEvent.DeviceConnected
import com.malinskiy.marathon.device.DeviceProvider.DeviceEvent.DeviceDisconnected
import com.malinskiy.marathon.exceptions.NoDevicesException
import com.malinskiy.marathon.execution.Configuration
import com.malinskiy.marathon.execution.StrictRunChecker
Expand All @@ -27,6 +28,8 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.time.Duration
Expand All @@ -47,7 +50,7 @@ class DdmlibDeviceProvider(
) : DeviceProvider, AndroidDebugBridge.IDeviceChangeListener {

private val logger = MarathonLogging.getLogger(DdmlibDeviceProvider::class.java)
private val channel: Channel<DeviceProvider.DeviceEvent> = unboundedChannel()
private val channel: Channel<DeviceEvent> = unboundedChannel()
private val devices: ConcurrentMap<String, DdmlibAndroidDevice> = ConcurrentHashMap()

private val dispatcher = Dispatchers.IO.limitedParallelism(4)
Expand All @@ -56,6 +59,9 @@ class DdmlibDeviceProvider(

override val deviceInitializationTimeoutMillis: Long = 180_000

override val deviceEvents: Flow<DeviceEvent>
get() = channel.consumeAsFlow()

override suspend fun initialize() {
DdmPreferences.setTimeOut(DEFAULT_DDM_LIB_TIMEOUT)
@Suppress("DEPRECATION")
Expand Down Expand Up @@ -146,8 +152,6 @@ class DdmlibDeviceProvider(
AndroidDebugBridge.terminate()
}

override fun subscribe() = channel

override fun deviceChanged(device: IDevice, changeMask: Int) {
logger.debug("Device {} changed, mask = {}", device, changeMask)

Expand Down Expand Up @@ -256,7 +260,7 @@ class DdmlibDeviceProvider(
get() = config.vendorConfiguration as AndroidConfiguration

private fun CompletableJob.completeRecursively(): Boolean {
job.children
children
.filterIsInstance<CompletableJob>()
.forEach { it.complete() }
return complete()
Expand Down

This file was deleted.

Loading

0 comments on commit c536f82

Please sign in to comment.