diff --git a/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheLoader.kt b/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheLoader.kt index a51b9fb79..93dd13533 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheLoader.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheLoader.kt @@ -34,8 +34,8 @@ class TestCacheLoader( private lateinit var cacheCheckCompleted: Deferred - fun initialize(scope: CoroutineScope) = with(scope) { - cacheCheckCompleted = async { + fun initialize(scope: CoroutineScope) { + cacheCheckCompleted = scope.async { // TODO: check concurrently for (test in testsToCheck) { var result: CacheResult? = null diff --git a/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheSaver.kt b/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheSaver.kt index 842c3335b..6d01b9a74 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheSaver.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/cache/test/TestCacheSaver.kt @@ -21,8 +21,8 @@ class TestCacheSaver( private val tasks: Channel = unboundedChannel() private lateinit var completableDeferred: Deferred - fun initialize(scope: CoroutineScope) = with(scope) { - completableDeferred = async { + fun initialize(scope: CoroutineScope) { + completableDeferred = scope.async { for (task in tasks) { val cacheKey = testCacheKeyProvider.getCacheKey(task.poolId, task.result.test) cache.store(cacheKey, task.result) diff --git a/core/src/main/kotlin/com/malinskiy/marathon/device/Device.kt b/core/src/main/kotlin/com/malinskiy/marathon/device/Device.kt index 52d4adaa8..e63ecd6a3 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/device/Device.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/device/Device.kt @@ -25,6 +25,4 @@ interface Device { ) suspend fun prepare(configuration: Configuration) - fun dispose() } - diff --git a/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt b/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt index a2a7b06af..5f050fd33 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt @@ -57,7 +57,7 @@ class Scheduler( private val logger = MarathonLogging.logger("Scheduler") - private val scope: CoroutineScope = CoroutineScope(context) + private val scope = CoroutineScope(context) suspend fun initialize() { logger.debug { "Initializing scheduler" } @@ -124,8 +124,8 @@ class Scheduler( } } - private fun subscribeOnDevices(job: Job): Job { - return scope.launch { + private fun subscribeOnDevices(job: Job) { + scope.launch { logger.debug { "Reading messages from device provider" } for (msg in deviceProvider.subscribe()) { diff --git a/core/src/test/kotlin/com/malinskiy/marathon/device/DeviceStub.kt b/core/src/test/kotlin/com/malinskiy/marathon/device/DeviceStub.kt index 09b156320..d5be6cabe 100644 --- a/core/src/test/kotlin/com/malinskiy/marathon/device/DeviceStub.kt +++ b/core/src/test/kotlin/com/malinskiy/marathon/device/DeviceStub.kt @@ -26,6 +26,4 @@ class DeviceStub( } override suspend fun prepare(configuration: Configuration) {} - - override fun dispose() {} } diff --git a/vendor/vendor-android/base/src/main/kotlin/com/malinskiy/marathon/android/AndroidDevice.kt b/vendor/vendor-android/base/src/main/kotlin/com/malinskiy/marathon/android/AndroidDevice.kt index ac6fe0e5e..98462ed1b 100644 --- a/vendor/vendor-android/base/src/main/kotlin/com/malinskiy/marathon/android/AndroidDevice.kt +++ b/vendor/vendor-android/base/src/main/kotlin/com/malinskiy/marathon/android/AndroidDevice.kt @@ -7,7 +7,7 @@ import com.malinskiy.marathon.device.Device import java.awt.image.BufferedImage import java.util.concurrent.TimeUnit -interface AndroidDevice : Device { +interface AndroidDevice : Device, AutoCloseable { val apiLevel: Int val version: AndroidVersion @@ -27,6 +27,4 @@ interface AndroidDevice : Device { remoteFilePath: String, options: ScreenRecorderOptions ) - - fun waitForAsyncWork() } diff --git a/vendor/vendor-android/base/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/screenshot/ScreenCapturerTestRunListener.kt b/vendor/vendor-android/base/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/screenshot/ScreenCapturerTestRunListener.kt index 76f363c6a..68f48ab1c 100644 --- a/vendor/vendor-android/base/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/screenshot/ScreenCapturerTestRunListener.kt +++ b/vendor/vendor-android/base/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/screenshot/ScreenCapturerTestRunListener.kt @@ -9,36 +9,30 @@ import com.malinskiy.marathon.report.attachment.AttachmentProvider import com.malinskiy.marathon.test.Test import com.malinskiy.marathon.test.toSimpleSafeTestName import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.async -import kotlinx.coroutines.newFixedThreadPoolContext -import kotlin.coroutines.CoroutineContext class ScreenCapturerTestRunListener( private val attachmentManager: AttachmentManager, - private val device: AndroidDevice -) : TestRunListener, CoroutineScope, AttachmentProvider { + private val device: AndroidDevice, + private val coroutineScope: CoroutineScope +) : TestRunListener, AttachmentProvider { private val attachmentListeners = mutableListOf() - - override fun registerListener(listener: AttachmentListener) { - attachmentListeners.add(listener) - } - private var screenCapturerJob: Job? = null private var screenCapturer: ScreenCapturer? = null private val logger = MarathonLogging.logger(ScreenCapturerTestRunListener::class.java.simpleName) + private val dispatcher = Dispatchers.IO.limitedParallelism(1) - @OptIn(DelicateCoroutinesApi::class) - private val threadPoolDispatcher = newFixedThreadPoolContext(1, "ScreenCapturer - ${device.serialNumber}") - override val coroutineContext: CoroutineContext - get() = threadPoolDispatcher + override fun registerListener(listener: AttachmentListener) { + attachmentListeners.add(listener) + } override fun testStarted(test: Test) { logger.debug { "Starting recording for ${test.toSimpleSafeTestName()}" } screenCapturer = ScreenCapturer(device, attachmentManager, test) - screenCapturerJob = async { + screenCapturerJob = coroutineScope.async(dispatcher) { screenCapturer?.start() } } @@ -46,7 +40,6 @@ class ScreenCapturerTestRunListener( override fun testEnded(test: Test, testMetrics: Map) { logger.debug { "Finished recording for ${test.toSimpleSafeTestName()}" } screenCapturerJob?.cancel() - threadPoolDispatcher.close() screenCapturer?.attachment?.let { attachment -> attachmentListeners.forEach { diff --git a/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibAndroidDevice.kt b/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibAndroidDevice.kt index fb4f07245..34a3f4715 100644 --- a/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibAndroidDevice.kt +++ b/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibAndroidDevice.kt @@ -53,17 +53,16 @@ import com.malinskiy.marathon.test.TestBatch import com.malinskiy.marathon.time.Timer import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async -import kotlinx.coroutines.newFixedThreadPoolContext -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.cancel import java.awt.image.BufferedImage import java.io.File import java.io.IOException import java.util.UUID import java.util.concurrent.TimeUnit -import kotlin.coroutines.CoroutineContext class DdmlibAndroidDevice( val ddmsDevice: IDevice, @@ -75,15 +74,18 @@ class DdmlibAndroidDevice( private val reportsFileManager: FileManager, private val serialStrategy: SerialStrategy, private val logcatListener: LogcatListener, - private val strictRunChecker: StrictRunChecker -) : Device, CoroutineScope, AndroidDevice { + private val strictRunChecker: StrictRunChecker, + parentJob: Job = Job(), +) : Device, AndroidDevice { override val fileManager = RemoteFileManager(this) override val version: AndroidVersion by lazy { ddmsDevice.version } - private val nullOutputReceiver = NullOutputReceiver() - private val parentJob: Job = Job() - private var logcatReceiver: CliLogcatReceiver? = null + private val dispatcher = Dispatchers.IO.limitedParallelism(1) + private val job = SupervisorJob(parentJob) + private val coroutineScope = CoroutineScope(job + dispatcher) + private val logger = MarathonLogging.logger(DdmlibAndroidDevice::class.java.simpleName) + private val logMessagesListener: (List) -> Unit = { it.forEach { msg -> logcatListener.onMessage(this, msg.toMarathonLogcatMessage()) @@ -102,7 +104,7 @@ class DdmlibAndroidDevice( override fun executeCommand(command: String, errorMessage: String) { try { - ddmsDevice.safeExecuteShellCommand(command, nullOutputReceiver) + ddmsDevice.safeExecuteShellCommand(command, NullOutputReceiver()) } catch (e: TimeoutException) { logger.error("$errorMessage while executing $command", e) } catch (e: AdbCommandRejectedException) { @@ -147,14 +149,6 @@ class DdmlibAndroidDevice( ) } - override fun waitForAsyncWork() { - runBlocking(context = coroutineContext) { - parentJob.children.forEach { - it.join() - } - } - } - private fun bufferedImageFrom(rawImage: RawImage): BufferedImage { val image = BufferedImage(rawImage.width, rawImage.height, BufferedImage.TYPE_INT_ARGB) @@ -169,15 +163,6 @@ class DdmlibAndroidDevice( return image } - @OptIn(DelicateCoroutinesApi::class) - private val dispatcher by lazy { - newFixedThreadPoolContext(1, "AndroidDevice - execution - ${ddmsDevice.serialNumber}") - } - - override val coroutineContext: CoroutineContext = dispatcher - - private val logger = MarathonLogging.logger(DdmlibAndroidDevice::class.java.simpleName) - override val abi: String by lazy { ddmsDevice.getProperty("ro.product.cpu.abi") ?: "Unknown" } @@ -231,6 +216,7 @@ class DdmlibAndroidDevice( ?: serialNumber.takeIf { it.isNotEmpty() } ?: UUID.randomUUID().toString() } + SerialStrategy.MARATHON_PROPERTY -> marathonSerialProp SerialStrategy.BOOT_PROPERTY -> serialProp SerialStrategy.HOSTNAME -> hostName @@ -276,7 +262,7 @@ class DdmlibAndroidDevice( val androidComponentInfo = testBatch.componentInfo as AndroidComponentInfo try { - async { ensureInstalled(androidComponentInfo) }.await() + coroutineScope.async { ensureInstalled(androidComponentInfo) }.await() } catch (@Suppress("TooGenericExceptionCaught") e: Throwable) { logger.error(e) { "Terminating device $serialNumber due to installation failures" } throw DeviceLostException(e) @@ -284,7 +270,7 @@ class DdmlibAndroidDevice( safePrintToLogcat(SERVICE_LOGS_TAG, "\"batch_started: {${testBatch.id}}\"") - val deferredResult = async { + val deferredResult = coroutineScope.async { val listeners = createListeners(configuration, devicePoolId, testBatch, deferred, progressReporter) val listener = DdmlibTestRunListener(testBatch.componentInfo, listeners) AndroidDeviceTestRunner(this@DdmlibAndroidDevice).execute(configuration, testBatch, listener) @@ -334,21 +320,16 @@ class DdmlibAndroidDevice( override suspend fun prepare(configuration: Configuration) { track.trackDevicePreparing(this) { - val deferred = async { + val logcatReceiver = CliLogcatReceiver(adbPath, reportsFileManager, ddmsDevice, logMessagesListener) + val deferred = coroutineScope.async { clearLogcat(ddmsDevice) - - logcatReceiver = CliLogcatReceiver(adbPath, reportsFileManager, ddmsDevice, logMessagesListener) - logcatReceiver?.start() + logcatReceiver.start() } + job.invokeOnCompletion { logcatReceiver.close() } deferred.await() } } - override fun dispose() { - logcatReceiver?.dispose() - dispatcher.close() - } - private fun selectRecorderType(preferred: DeviceFeature?, features: Collection) = when { features.contains(preferred) -> preferred features.contains(DeviceFeature.VIDEO) -> DeviceFeature.VIDEO @@ -370,6 +351,10 @@ class DdmlibAndroidDevice( return receiver.output() } + override fun close() { + coroutineScope.cancel() + } + private fun prepareRecorderListener( feature: DeviceFeature, attachmentProviders: MutableList @@ -381,7 +366,7 @@ class DdmlibAndroidDevice( } DeviceFeature.SCREENSHOT -> { - ScreenCapturerTestRunListener(attachmentManager, this) + ScreenCapturerTestRunListener(attachmentManager, this, coroutineScope) .also { attachmentProviders.add(it) } } } diff --git a/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibDeviceProvider.kt b/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibDeviceProvider.kt index cf7b497d6..1ff5b2e64 100644 --- a/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibDeviceProvider.kt +++ b/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/DdmlibDeviceProvider.kt @@ -19,19 +19,21 @@ import com.malinskiy.marathon.io.AttachmentManager import com.malinskiy.marathon.io.FileManager import com.malinskiy.marathon.log.MarathonLogging import com.malinskiy.marathon.time.Timer +import kotlinx.coroutines.CompletableJob import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch -import kotlinx.coroutines.newFixedThreadPoolContext import java.time.Duration import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean -import kotlin.coroutines.CoroutineContext class DdmlibDeviceProvider( private val track: Track, @@ -42,18 +44,15 @@ class DdmlibDeviceProvider( private val strictRunChecker: StrictRunChecker, private val logcatListener: LogcatListener, private val attachmentManager: AttachmentManager -) : DeviceProvider, AndroidDebugBridge.IDeviceChangeListener, CoroutineScope { +) : DeviceProvider, AndroidDebugBridge.IDeviceChangeListener { private val logger = MarathonLogging.logger("AndroidDeviceProvider") - private lateinit var adb: AndroidDebugBridge - private val channel: Channel = unboundedChannel() private val devices: ConcurrentMap = ConcurrentHashMap() - @OptIn(DelicateCoroutinesApi::class) - private val bootWaitContext = newFixedThreadPoolContext(4, "AndroidDeviceProvider-BootWait") - override val coroutineContext: CoroutineContext - get() = bootWaitContext + private val dispatcher = Dispatchers.IO.limitedParallelism(4) + private val job = SupervisorJob() + private val coroutineScope = CoroutineScope(job + dispatcher) override val deviceInitializationTimeoutMillis: Long = 180_000 @@ -63,7 +62,7 @@ class DdmlibDeviceProvider( AndroidDebugBridge.initIfNeeded(false) AndroidDebugBridge.addDeviceChangeListener(this) - adb = AndroidDebugBridge.createBridge(vendorConfiguration.adbPath.absolutePath, false, ADB_INIT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) + val adb = AndroidDebugBridge.createBridge(vendorConfiguration.adbPath.absolutePath, false, ADB_INIT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) logger.debug { "Created ADB bridge" } var getDevicesCountdown = config.noDevicesTimeoutMillis @@ -123,9 +122,9 @@ class DdmlibDeviceProvider( } if (newAndroidDevice != androidDevice) { - logger.debug { "There was a device with the same serial number as the new device ($newAndroidDevice), disposing old device" } - androidDevice.dispose() - logger.debug { "Old device disposed ($androidDevice)" } + logger.debug { "There was a device with the same serial number as the new device ($newAndroidDevice), closing the old device" } + androidDevice.close() + logger.debug { "Old device closed ($androidDevice)" } } return newAndroidDevice @@ -142,17 +141,15 @@ class DdmlibDeviceProvider( private fun AndroidDebugBridge.hasDevices(): Boolean = devices.isNotEmpty() override suspend fun terminate() { - devices.values.forEach { - it.waitForAsyncWork() - } + job.completeRecursively() + job.join() channel.close() } override fun close() { - channel.close() - bootWaitContext.close() - AndroidDebugBridge.removeDeviceChangeListener(this) + channel.close() + coroutineScope.cancel() AndroidDebugBridge.terminate() } @@ -161,20 +158,20 @@ class DdmlibDeviceProvider( override fun deviceChanged(device: IDevice, changeMask: Int) { logger.debug { "Device changed: $device" } - launch(context = bootWaitContext) { - val maybeNewAndroidDevice = - DdmlibAndroidDevice( - ddmsDevice = device, - adbPath = vendorConfiguration.adbPath, - track = track, - timer = timer, - androidAppInstaller = androidAppInstaller, - attachmentManager = attachmentManager, - reportsFileManager = fileManager, - serialStrategy = vendorConfiguration.serialStrategy, - logcatListener = logcatListener, - strictRunChecker = strictRunChecker - ) + coroutineScope.launch { + val maybeNewAndroidDevice = DdmlibAndroidDevice( + ddmsDevice = device, + adbPath = vendorConfiguration.adbPath, + track = track, + timer = timer, + androidAppInstaller = androidAppInstaller, + attachmentManager = attachmentManager, + reportsFileManager = fileManager, + serialStrategy = vendorConfiguration.serialStrategy, + logcatListener = logcatListener, + strictRunChecker = strictRunChecker, + parentJob = job + ) val healthy = maybeNewAndroidDevice.healthy logger.debug { "Device ${device.serialNumber} changed state. Healthy = $healthy" } @@ -186,6 +183,7 @@ class DdmlibDeviceProvider( // This shouldn't have any side effects even if device was previously removed logger.debug { "Device is not healthy, notifying disconnected $device" } notifyDisconnected(maybeNewAndroidDevice) + maybeNewAndroidDevice.close() } } } @@ -193,7 +191,7 @@ class DdmlibDeviceProvider( override fun deviceConnected(device: IDevice) { logger.debug { "Device connected: $device" } - launch { + coroutineScope.launch { val maybeNewAndroidDevice = DdmlibAndroidDevice( ddmsDevice = device, track = track, @@ -204,7 +202,8 @@ class DdmlibDeviceProvider( reportsFileManager = fileManager, adbPath = vendorConfiguration.adbPath, logcatListener = logcatListener, - strictRunChecker = strictRunChecker + strictRunChecker = strictRunChecker, + parentJob = job ) val healthy = maybeNewAndroidDevice.healthy @@ -220,10 +219,10 @@ class DdmlibDeviceProvider( override fun deviceDisconnected(device: IDevice) { logger.debug { "Device ${device.serialNumber} disconnected" } - launch { + coroutineScope.launch { matchDdmsToDevice(device)?.let { notifyDisconnected(it) - it.dispose() + it.close() devices.remove(it.serialNumber) } } @@ -248,7 +247,7 @@ class DdmlibDeviceProvider( logger.debug { "Device ${device.serialNumber} is still booting..." } } - if (Thread.interrupted() || !isActive) { + if (Thread.interrupted() || !currentCoroutineContext().isActive) { booted = true break } @@ -273,6 +272,13 @@ class DdmlibDeviceProvider( private val vendorConfiguration: AndroidConfiguration get() = config.vendorConfiguration as AndroidConfiguration + private fun CompletableJob.completeRecursively(): Boolean { + job.children + .filterIsInstance() + .forEach { it.complete() } + return complete() + } + companion object { private val ADB_INIT_TIMEOUT = Duration.ofSeconds(60) private const val DEFAULT_DDM_LIB_TIMEOUT = 30000 diff --git a/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/shell/CliLogcatReceiver.kt b/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/shell/CliLogcatReceiver.kt index d7e4f7c5c..33e901868 100644 --- a/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/shell/CliLogcatReceiver.kt +++ b/vendor/vendor-android/ddmlib/src/main/kotlin/com/malinskiy/marathon/android/ddmlib/shell/CliLogcatReceiver.kt @@ -10,12 +10,12 @@ import java.text.DateFormat import java.text.SimpleDateFormat import java.util.Date -class CliLogcatReceiver( +internal class CliLogcatReceiver( private val adbPath: File, private val fileManager: FileManager, private val device: IDevice, private val listener: (List) -> Unit -) { +) : AutoCloseable { private var tailer: Tailer? = null private var process: Process? = null @@ -33,7 +33,7 @@ class CliLogcatReceiver( ) } - fun dispose() { + override fun close() { tailer?.stop() process?.destroyForcibly() } diff --git a/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/StubDevice.kt b/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/StubDevice.kt index 8ecf1e44d..03ab5d155 100644 --- a/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/StubDevice.kt +++ b/vendor/vendor-test/src/main/kotlin/com/malinskiy/marathon/test/StubDevice.kt @@ -72,8 +72,4 @@ class StubDevice( logger.debug { "Preparing" } delay(prepareTimeMillis) } - - override fun dispose() { - logger.debug { "Disposing" } - } }