Skip to content

Commit

Permalink
Gracefully handle build cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Chelombitko committed Dec 11, 2024
1 parent e6aa69a commit 497d2a3
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class TestCacheLoader(

private lateinit var cacheCheckCompleted: Deferred<Unit>

fun initialize(scope: CoroutineScope) = with(scope) {
cacheCheckCompleted = async {
fun initialize(scope: CoroutineScope) {
cacheCheckCompleted = scope.async {
// TODO: check concurrently
for (test in testsToCheck) {
var result: CacheResult? = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class TestCacheSaver(
private val tasks: Channel<SaveTask> = unboundedChannel()
private lateinit var completableDeferred: Deferred<Unit>

fun initialize(scope: CoroutineScope) = with(scope) {
completableDeferred = async {
fun initialize(scope: CoroutineScope) {
completableDeferred = scope.async {
for (task in tasks) {
val cacheKey = testCacheKeyProvider.getCacheKey(task.poolId, task.result.test)
cache.store(cacheKey, task.result)
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/kotlin/com/malinskiy/marathon/device/Device.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,4 @@ interface Device {
)

suspend fun prepare(configuration: Configuration)
fun dispose()
}

Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class Scheduler(

private val logger = MarathonLogging.logger("Scheduler")

private val scope: CoroutineScope = CoroutineScope(context)
private val scope = CoroutineScope(context)

suspend fun initialize() {
logger.debug { "Initializing scheduler" }
Expand Down Expand Up @@ -124,8 +124,8 @@ class Scheduler(
}
}

private fun subscribeOnDevices(job: Job): Job {
return scope.launch {
private fun subscribeOnDevices(job: Job) {
scope.launch {
logger.debug { "Reading messages from device provider" }

for (msg in deviceProvider.subscribe()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,4 @@ class DeviceStub(
}

override suspend fun prepare(configuration: Configuration) {}

override fun dispose() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.malinskiy.marathon.device.Device
import java.awt.image.BufferedImage
import java.util.concurrent.TimeUnit

interface AndroidDevice : Device {
interface AndroidDevice : Device, AutoCloseable {
val apiLevel: Int
val version: AndroidVersion

Expand All @@ -27,6 +27,4 @@ interface AndroidDevice : Device {
remoteFilePath: String,
options: ScreenRecorderOptions
)

fun waitForAsyncWork()
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,37 @@ import com.malinskiy.marathon.report.attachment.AttachmentProvider
import com.malinskiy.marathon.test.Test
import com.malinskiy.marathon.test.toSimpleSafeTestName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.newFixedThreadPoolContext
import kotlin.coroutines.CoroutineContext

class ScreenCapturerTestRunListener(
private val attachmentManager: AttachmentManager,
private val device: AndroidDevice
) : TestRunListener, CoroutineScope, AttachmentProvider {
private val device: AndroidDevice,
private val coroutineScope: CoroutineScope
) : TestRunListener, AttachmentProvider {

private val attachmentListeners = mutableListOf<AttachmentListener>()

override fun registerListener(listener: AttachmentListener) {
attachmentListeners.add(listener)
}

private var screenCapturerJob: Job? = null
private var screenCapturer: ScreenCapturer? = null
private val logger = MarathonLogging.logger(ScreenCapturerTestRunListener::class.java.simpleName)
private val dispatcher = Dispatchers.IO.limitedParallelism(1)

@OptIn(DelicateCoroutinesApi::class)
private val threadPoolDispatcher = newFixedThreadPoolContext(1, "ScreenCapturer - ${device.serialNumber}")
override val coroutineContext: CoroutineContext
get() = threadPoolDispatcher
override fun registerListener(listener: AttachmentListener) {
attachmentListeners.add(listener)
}

override fun testStarted(test: Test) {
logger.debug { "Starting recording for ${test.toSimpleSafeTestName()}" }
screenCapturer = ScreenCapturer(device, attachmentManager, test)
screenCapturerJob = async {
screenCapturerJob = coroutineScope.async(dispatcher) {
screenCapturer?.start()
}
}

override fun testEnded(test: Test, testMetrics: Map<String, String>) {
logger.debug { "Finished recording for ${test.toSimpleSafeTestName()}" }
screenCapturerJob?.cancel()
threadPoolDispatcher.close()

screenCapturer?.attachment?.let { attachment ->
attachmentListeners.forEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,16 @@ import com.malinskiy.marathon.test.TestBatch
import com.malinskiy.marathon.time.Timer
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.newFixedThreadPoolContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.cancel
import java.awt.image.BufferedImage
import java.io.File
import java.io.IOException
import java.util.UUID
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext

class DdmlibAndroidDevice(
val ddmsDevice: IDevice,
Expand All @@ -75,15 +74,18 @@ class DdmlibAndroidDevice(
private val reportsFileManager: FileManager,
private val serialStrategy: SerialStrategy,
private val logcatListener: LogcatListener,
private val strictRunChecker: StrictRunChecker
) : Device, CoroutineScope, AndroidDevice {
private val strictRunChecker: StrictRunChecker,
parentJob: Job = Job(),
) : Device, AndroidDevice {
override val fileManager = RemoteFileManager(this)

override val version: AndroidVersion by lazy { ddmsDevice.version }
private val nullOutputReceiver = NullOutputReceiver()
private val parentJob: Job = Job()

private var logcatReceiver: CliLogcatReceiver? = null
private val dispatcher = Dispatchers.IO.limitedParallelism(1)
private val job = SupervisorJob(parentJob)
private val coroutineScope = CoroutineScope(job + dispatcher)
private val logger = MarathonLogging.logger(DdmlibAndroidDevice::class.java.simpleName)

private val logMessagesListener: (List<LogCatMessage>) -> Unit = {
it.forEach { msg ->
logcatListener.onMessage(this, msg.toMarathonLogcatMessage())
Expand All @@ -102,7 +104,7 @@ class DdmlibAndroidDevice(

override fun executeCommand(command: String, errorMessage: String) {
try {
ddmsDevice.safeExecuteShellCommand(command, nullOutputReceiver)
ddmsDevice.safeExecuteShellCommand(command, NullOutputReceiver())
} catch (e: TimeoutException) {
logger.error("$errorMessage while executing $command", e)
} catch (e: AdbCommandRejectedException) {
Expand Down Expand Up @@ -147,14 +149,6 @@ class DdmlibAndroidDevice(
)
}

override fun waitForAsyncWork() {
runBlocking(context = coroutineContext) {
parentJob.children.forEach {
it.join()
}
}
}

private fun bufferedImageFrom(rawImage: RawImage): BufferedImage {
val image = BufferedImage(rawImage.width, rawImage.height, BufferedImage.TYPE_INT_ARGB)

Expand All @@ -169,15 +163,6 @@ class DdmlibAndroidDevice(
return image
}

@OptIn(DelicateCoroutinesApi::class)
private val dispatcher by lazy {
newFixedThreadPoolContext(1, "AndroidDevice - execution - ${ddmsDevice.serialNumber}")
}

override val coroutineContext: CoroutineContext = dispatcher

private val logger = MarathonLogging.logger(DdmlibAndroidDevice::class.java.simpleName)

override val abi: String by lazy {
ddmsDevice.getProperty("ro.product.cpu.abi") ?: "Unknown"
}
Expand Down Expand Up @@ -231,6 +216,7 @@ class DdmlibAndroidDevice(
?: serialNumber.takeIf { it.isNotEmpty() }
?: UUID.randomUUID().toString()
}

SerialStrategy.MARATHON_PROPERTY -> marathonSerialProp
SerialStrategy.BOOT_PROPERTY -> serialProp
SerialStrategy.HOSTNAME -> hostName
Expand Down Expand Up @@ -276,15 +262,15 @@ class DdmlibAndroidDevice(
val androidComponentInfo = testBatch.componentInfo as AndroidComponentInfo

try {
async { ensureInstalled(androidComponentInfo) }.await()
coroutineScope.async { ensureInstalled(androidComponentInfo) }.await()
} catch (@Suppress("TooGenericExceptionCaught") e: Throwable) {
logger.error(e) { "Terminating device $serialNumber due to installation failures" }
throw DeviceLostException(e)
}

safePrintToLogcat(SERVICE_LOGS_TAG, "\"batch_started: {${testBatch.id}}\"")

val deferredResult = async {
val deferredResult = coroutineScope.async {
val listeners = createListeners(configuration, devicePoolId, testBatch, deferred, progressReporter)
val listener = DdmlibTestRunListener(testBatch.componentInfo, listeners)
AndroidDeviceTestRunner(this@DdmlibAndroidDevice).execute(configuration, testBatch, listener)
Expand Down Expand Up @@ -334,21 +320,16 @@ class DdmlibAndroidDevice(

override suspend fun prepare(configuration: Configuration) {
track.trackDevicePreparing(this) {
val deferred = async {
val logcatReceiver = CliLogcatReceiver(adbPath, reportsFileManager, ddmsDevice, logMessagesListener)
val deferred = coroutineScope.async {
clearLogcat(ddmsDevice)

logcatReceiver = CliLogcatReceiver(adbPath, reportsFileManager, ddmsDevice, logMessagesListener)
logcatReceiver?.start()
logcatReceiver.start()
}
job.invokeOnCompletion { logcatReceiver.close() }
deferred.await()
}
}

override fun dispose() {
logcatReceiver?.dispose()
dispatcher.close()
}

private fun selectRecorderType(preferred: DeviceFeature?, features: Collection<DeviceFeature>) = when {
features.contains(preferred) -> preferred
features.contains(DeviceFeature.VIDEO) -> DeviceFeature.VIDEO
Expand All @@ -370,6 +351,10 @@ class DdmlibAndroidDevice(
return receiver.output()
}

override fun close() {
coroutineScope.cancel()
}

private fun prepareRecorderListener(
feature: DeviceFeature,
attachmentProviders: MutableList<AttachmentProvider>
Expand All @@ -381,7 +366,7 @@ class DdmlibAndroidDevice(
}

DeviceFeature.SCREENSHOT -> {
ScreenCapturerTestRunListener(attachmentManager, this)
ScreenCapturerTestRunListener(attachmentManager, this, coroutineScope)
.also { attachmentProviders.add(it) }
}
}
Expand Down
Loading

0 comments on commit 497d2a3

Please sign in to comment.