From d913336f5a385421121b19b61ff650e2b30e3746 Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Tue, 25 Jun 2024 23:15:06 +1000 Subject: [PATCH] fix(core): improve return batch behavior --- .../marathon/execution/DevicePoolActor.kt | 3 +- .../marathon/execution/DevicePoolMessage.kt | 8 +-- .../marathon/execution/device/DeviceActor.kt | 6 +- .../marathon/execution/queue/QueueActor.kt | 6 +- .../execution/device/DeviceActorTest.kt | 72 +++++++++++++++++++ 5 files changed, 86 insertions(+), 9 deletions(-) create mode 100644 core/src/test/kotlin/com/malinskiy/marathon/execution/device/DeviceActorTest.kt diff --git a/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolActor.kt b/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolActor.kt index cccfdb718..aaf233bdf 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolActor.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolActor.kt @@ -86,6 +86,7 @@ class DevicePoolActor( } private suspend fun deviceReturnedTestBatch(device: Device, batch: TestBatch, reason: String) { + logger.debug { "pool $poolId: device ${device.serialNumber} returned test batch" } queue.send(QueueMessage.ReturnBatch(device.toDeviceInfo(), batch, reason)) } @@ -98,7 +99,7 @@ class DevicePoolActor( } // Requests a batch of tests for a random device from the list of devices not running tests at the moment. - // When @avoidingDevice is not null, attemtps to send the request for any other device whenever available. + // When @avoidingDevice is not null, attempts to send the request for any other device whenever available. private suspend fun maybeRequestBatch(avoidingDevice: Device? = null) { val availableDevices = devices.values.asSequence() .map { it as DeviceActor } diff --git a/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolMessage.kt b/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolMessage.kt index 84ac39287..c7cc335b1 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolMessage.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolMessage.kt @@ -11,10 +11,10 @@ sealed class DevicePoolMessage { object Terminate : FromScheduler() } - sealed class FromDevice(val device: Device) : DevicePoolMessage() { - class IsReady(device: Device) : FromDevice(device) - class CompletedTestBatch(device: Device, val results: TestBatchResults) : FromDevice(device) - class ReturnTestBatch(device: Device, val batch: TestBatch, val reason: String) : FromDevice(device) + sealed class FromDevice(open val device: Device) : DevicePoolMessage() { + data class IsReady(override val device: Device) : FromDevice(device) + data class CompletedTestBatch(override val device: Device, val results: TestBatchResults) : FromDevice(device) + data class ReturnTestBatch(override val device: Device, val batch: TestBatch, val reason: String) : FromDevice(device) } sealed class FromQueue : DevicePoolMessage() { diff --git a/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt b/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt index 91e1cf0b5..7c01a7cca 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt @@ -17,10 +17,12 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CompletionHandler import kotlinx.coroutines.Job +import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.async import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import kotlin.coroutines.CoroutineContext class DeviceActor( @@ -216,7 +218,9 @@ class DeviceActor( private fun returnBatchAnd(batch: TestBatch, reason: String, completionHandler: CompletionHandler = {}): Job { return launch { - pool.send(DevicePoolMessage.FromDevice.ReturnTestBatch(device, batch, reason)) + withContext(NonCancellable) { + pool.send(DevicePoolMessage.FromDevice.ReturnTestBatch(device, batch, reason)) + } }.apply { invokeOnCompletion(completionHandler) } diff --git a/core/src/main/kotlin/com/malinskiy/marathon/execution/queue/QueueActor.kt b/core/src/main/kotlin/com/malinskiy/marathon/execution/queue/QueueActor.kt index 999e4925a..04919d76d 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/execution/queue/QueueActor.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/execution/queue/QueueActor.kt @@ -73,7 +73,7 @@ class QueueActor( } is QueueMessage.ReturnBatch -> { - onReturnBatch(msg.device, msg.batch) + onReturnBatch(msg.device, msg.batch, msg.reason) } } } @@ -124,8 +124,8 @@ class QueueActor( } } - private suspend fun onReturnBatch(device: DeviceInfo, batch: TestBatch) { - logger.debug { "onReturnBatch ${device.serialNumber}" } + private suspend fun onReturnBatch(device: DeviceInfo, batch: TestBatch, reason: String) { + logger.debug { "onReturnBatch ${device.serialNumber}. reason=$reason" } val uncompletedTests = batch.tests val results = uncompletedTests.map { diff --git a/core/src/test/kotlin/com/malinskiy/marathon/execution/device/DeviceActorTest.kt b/core/src/test/kotlin/com/malinskiy/marathon/execution/device/DeviceActorTest.kt new file mode 100644 index 000000000..1912a044a --- /dev/null +++ b/core/src/test/kotlin/com/malinskiy/marathon/execution/device/DeviceActorTest.kt @@ -0,0 +1,72 @@ +package com.malinskiy.marathon.execution.device + +import com.malinskiy.marathon.config.Configuration +import com.malinskiy.marathon.config.strategy.BatchingStrategyConfiguration +import com.malinskiy.marathon.config.vendor.VendorConfiguration +import com.malinskiy.marathon.device.DevicePoolId +import com.malinskiy.marathon.execution.DevicePoolMessage +import com.malinskiy.marathon.execution.TestStatus +import com.malinskiy.marathon.generateTest +import com.malinskiy.marathon.test.StubDevice +import com.malinskiy.marathon.test.TestBatch +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking +import org.amshove.kluent.shouldBeEqualTo +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.io.File + +class DeviceActorTest { + lateinit var job: Job + lateinit var poolChannel: Channel + + @BeforeEach + fun setup() { + poolChannel = Channel() + job = Job() + } + + @AfterEach + fun teardown() { + job.cancel() + } + + @Test + fun `terminate while in progress`() { + val devicePoolId = DevicePoolId("test") + val device = StubDevice(prepareTimeMillis = 1000L, testTimeMillis = 10000L) + val actor = DeviceActor( + devicePoolId, poolChannel, defaultConfiguration.copy( + uncompletedTestRetryQuota = 0, + batchingStrategy = BatchingStrategyConfiguration.FixedSizeBatchingStrategyConfiguration(size = 1) + ), device, job, Dispatchers.Unconfined + ) + + runBlocking { + val test1 = generateTest() + val testBatch = TestBatch(listOf(test1)) + device.executionResults = mapOf(test1 to Array(1) { TestStatus.FAILURE }) + + actor.send(DeviceEvent.Initialize) + var message = poolChannel.receive() + message.shouldBeEqualTo(DevicePoolMessage.FromDevice.IsReady(device)) + + actor.send(DeviceEvent.Execute(testBatch)) + actor.send(DeviceEvent.Terminate) + + message = poolChannel.receive() + message.shouldBeEqualTo(DevicePoolMessage.FromDevice.ReturnTestBatch(device, testBatch, "Device serial-1 terminated")) + } + } + + private val defaultConfiguration = Configuration.Builder( + name = "", + outputDir = File(""), + ).apply { + vendorConfiguration = VendorConfiguration.StubVendorConfiguration + analyticsTracking = false + }.build() +}