From e278c70900f7b2909809b8be1d4335a04d0fc435 Mon Sep 17 00:00:00 2001 From: Tarek Belkahia Date: Tue, 27 Aug 2024 03:35:13 +0100 Subject: [PATCH] Refactor TestCommand file --- .../java/maestro/cli/command/TestCommand.kt | 396 +++++++++++------- 1 file changed, 234 insertions(+), 162 deletions(-) diff --git a/maestro-cli/src/main/java/maestro/cli/command/TestCommand.kt b/maestro-cli/src/main/java/maestro/cli/command/TestCommand.kt index e72a08e36d..2e41ab4ea1 100644 --- a/maestro-cli/src/main/java/maestro/cli/command/TestCommand.kt +++ b/maestro-cli/src/main/java/maestro/cli/command/TestCommand.kt @@ -22,13 +22,17 @@ package maestro.cli.command import io.ktor.util.collections.ConcurrentSet import kotlinx.coroutines.* import kotlinx.coroutines.sync.Semaphore +import maestro.Maestro import maestro.cli.App import maestro.cli.CliError import maestro.cli.DisableAnsiMixin import maestro.cli.ShowHelpMixin +import maestro.cli.device.Device import maestro.cli.device.DeviceCreateUtil import maestro.cli.device.DeviceService +import maestro.cli.device.PickDeviceInteractor import maestro.cli.device.PickDeviceView +import maestro.cli.model.DeviceStartOptions import maestro.cli.model.TestExecutionSummary import maestro.cli.report.ReportFormat import maestro.cli.report.ReporterFactory @@ -39,12 +43,14 @@ import maestro.cli.runner.resultview.AnsiResultView import maestro.cli.runner.resultview.PlainTextResultView import maestro.cli.session.MaestroSessionManager import maestro.cli.util.PrintUtils +import maestro.cli.view.box import maestro.orchestra.error.ValidationError import maestro.orchestra.util.Env.withInjectedShellEnvVars import maestro.orchestra.workspace.WorkspaceExecutionPlanner import maestro.orchestra.workspace.WorkspaceExecutionPlanner.ExecutionPlan import maestro.orchestra.yaml.YamlCommandReader import okio.sink +import org.slf4j.LoggerFactory import picocli.CommandLine import picocli.CommandLine.Option import java.io.File @@ -55,8 +61,6 @@ import java.util.concurrent.CountDownLatch import kotlin.io.path.absolutePathString import kotlin.system.exitProcess import kotlin.time.Duration.Companion.seconds -import maestro.cli.device.PickDeviceInteractor -import org.slf4j.LoggerFactory @CommandLine.Command( name = "test", @@ -173,18 +177,9 @@ class TestCommand : Callable { } private fun handleSessions(debugOutputPath: Path, plan: ExecutionPlan): Int = runBlocking(Dispatchers.IO) { - val sharded = shards > 1 runCatching { - val deviceIds = (if (isWebFlow()) - "chromium".also { - PrintUtils.warn("Web support is an experimental feature and may be removed in future versions.\n") - } - else parent?.deviceId) - .orEmpty() - .split(",") - .map { it.trim() } - .filter { it.isNotBlank() } + val deviceIds = getPassedOptionsDeviceIds() val connectedDevices = DeviceService.listConnectedDevices() initialActiveDevices.addAll(connectedDevices.map { it.instanceId }.toSet()) @@ -193,36 +188,15 @@ class TestCommand : Callable { // Collect device configurations for missing shards, if any val availableDevices = if (deviceIds.isNotEmpty()) deviceIds.size else initialActiveDevices.size val missingDevices = effectiveShards - availableDevices - if (missingDevices > 0) { - val message = """ - Found $availableDevices active devices. - Need to create or start $missingDevices more for $effectiveShards shards. Continue? y/n - """.trimIndent() - PrintUtils.message(message) - val str = readlnOrNull()?.lowercase() - val granted = str?.isBlank() == true || str == "y" || str == "yes" - if (!granted) { - PrintUtils.message("Continuing with only $availableDevices shards.") - effectiveShards = availableDevices - } + + if (!promptForDeviceCreation(availableDevices, effectiveShards)) { + PrintUtils.message("Continuing with only $availableDevices shards.") + effectiveShards = availableDevices } - val missingDevicesConfigs = (availableDevices until effectiveShards).mapNotNull { shardIndex -> - PrintUtils.message("Creating device for shard ${shardIndex + 1}:") - PickDeviceView.requestDeviceOptions() - }.toMutableList() - - val chunkPlans = plan.flowsToRun - .withIndex() - .groupBy { it.index % effectiveShards } - .map { (shardIndex, files) -> - ExecutionPlan( - files.map { it.value }, - plan.sequence.also { - if (it?.flows?.isNotEmpty() == true && sharded) - error("Cannot run sharded tests with sequential execution.") - } - ) - } + val missingDevicesConfigs = createMissingDevices(availableDevices, effectiveShards).toMutableList() + + val sharded = effectiveShards > 1 + val chunkPlans = makeChunkPlans(plan, effectiveShards, sharded) val barrier = CountDownLatch(effectiveShards) @@ -230,115 +204,16 @@ class TestCommand : Callable { val results = (0 until effectiveShards).map { shardIndex -> async(Dispatchers.IO) { - val driverHostPort = if (!sharded) parent?.port ?: 7001 else - (7001..7128).shuffled().find { port -> - usedPorts.putIfAbsent(port, true) == null - } ?: error("No available ports found") - - // Acquire lock to execute device creation block - deviceCreationSemaphore.acquire() - - val deviceId = - // 1. Reuse existing device if deviceId provided - deviceIds.getOrNull(shardIndex) - // 2. Reuse existing device if connected device found - ?: when { - deviceIds.isNotEmpty() -> null - missingDevices >= 0 -> initialActiveDevices.elementAtOrNull(shardIndex) - initialActiveDevices.isNotEmpty() -> PickDeviceInteractor.pickDevice().instanceId - else -> null - } - // 3. Create a new device - ?: run { - val cfg = missingDevicesConfigs.first() - missingDevicesConfigs.remove(cfg) - val deviceCreated = DeviceCreateUtil.getOrCreateDevice( - platform = cfg.platform, - osVersion = cfg.osVersion, - forceCreate = true, - shardIndex = shardIndex - ) - - val device = DeviceService.startDevice( - device = deviceCreated, - driverHostPort = driverHostPort, - connectedDevices = initialActiveDevices + currentActiveDevices - ) - device.instanceId.also { - currentActiveDevices.add(it) - delay(2.seconds) - } - } - logger.info("Selected device $deviceId for shard ${shardIndex + 1} on port $driverHostPort") - - // Release lock if device ID was obtained from the connected devices - deviceCreationSemaphore.release() - // Signal that this thread has reached the barrier - barrier.countDown() - // Wait for all threads/shards to complete device creation before proceeding - barrier.await() - - MaestroSessionManager.newSession( - host = parent?.host, - port = parent?.port, - driverHostPort = driverHostPort, - deviceId = deviceId - ) { session -> - val maestro = session.maestro - val device = session.device - - if (flowFile.isDirectory || format != ReportFormat.NOOP) { - if (continuous) { - throw CommandLine.ParameterException( - commandSpec.commandLine(), - "Continuous mode is not supported for directories. $flowFile is a directory", - ) - } - - val suiteResult = TestSuiteInteractor( - maestro = maestro, - device = device, - reporter = ReporterFactory.buildReporter(format, testSuiteName), - ).runTestSuite( - executionPlan = chunkPlans[shardIndex], - env = env, - reportOut = null, - debugOutputPath = debugOutputPath - ) - - if (!flattenDebugOutput) { - TestDebugReporter.deleteOldFiles() - } - Triple(suiteResult.passedCount, suiteResult.totalTests, suiteResult) - } else { - if (continuous) { - if (!flattenDebugOutput) { - TestDebugReporter.deleteOldFiles() - } - TestRunner.runContinuous(maestro, device, flowFile, env) - } else { - val resultView = - if (DisableAnsiMixin.ansiEnabled) AnsiResultView() - else PlainTextResultView() - val resultSingle = TestRunner.runSingle( - maestro, - device, - flowFile, - env, - resultView, - debugOutputPath - ) - if (resultSingle == 1) { - printExitDebugMessage() - } - if (!flattenDebugOutput) { - TestDebugReporter.deleteOldFiles() - } - val result = if (resultSingle == 0) 1 else 0 - return@newSession Triple(result, 1, null) - } - } - } + runShardSuite( + sharded, + deviceIds, + shardIndex, + missingDevices, + missingDevicesConfigs, + barrier, + chunkPlans, + debugOutputPath + ) } }.awaitAll() @@ -358,6 +233,209 @@ class TestCommand : Callable { }.getOrDefault(0) } + private suspend fun runShardSuite( + sharded: Boolean, + deviceIds: List, + shardIndex: Int, + missingDevices: Int, + missingDevicesConfigs: MutableList, + barrier: CountDownLatch, + chunkPlans: List, + debugOutputPath: Path + ): Triple = withContext(Dispatchers.IO) { + val driverHostPort = if (!sharded) parent?.port ?: 7001 else + (7001..7128).shuffled().find { port -> + usedPorts.putIfAbsent(port, true) == null + } ?: error("No available ports found") + + // Acquire lock to execute device creation block + deviceCreationSemaphore.acquire() + + val deviceId = assignDeviceToShard(deviceIds, shardIndex, missingDevices, missingDevicesConfigs, driverHostPort) + logger.info("Selected device $deviceId for shard ${shardIndex + 1} on port $driverHostPort") + + // Release lock if device ID was obtained from the connected devices + deviceCreationSemaphore.release() + // Signal that this thread has reached the barrier + barrier.countDown() + // Wait for all threads/shards to complete device creation before proceeding + barrier.await() + + return@withContext MaestroSessionManager.newSession( + host = parent?.host, + port = parent?.port, + driverHostPort = driverHostPort, + deviceId = deviceId + ) { session -> + val maestro = session.maestro + val device = session.device + + if (flowFile.isDirectory || format != ReportFormat.NOOP) { + if (continuous) { + throw CommandLine.ParameterException( + commandSpec.commandLine(), + "Continuous mode is not supported for directories. $flowFile is a directory", + ) + } + runMultipleFlows(maestro, device, chunkPlans, shardIndex, debugOutputPath) + } else { + if (continuous) { + if (!flattenDebugOutput) { + TestDebugReporter.deleteOldFiles() + } + TestRunner.runContinuous(maestro, device, flowFile, env) + } else { + runSingleFlow(maestro, device, debugOutputPath) + } + } + } + } + + private fun runSingleFlow( + maestro: Maestro, + device: Device?, + debugOutputPath: Path + ): Triple { + val resultView = + if (DisableAnsiMixin.ansiEnabled) AnsiResultView() + else PlainTextResultView() + val resultSingle = TestRunner.runSingle( + maestro, + device, + flowFile, + env, + resultView, + debugOutputPath + ) + if (resultSingle == 1) { + printExitDebugMessage() + } + if (!flattenDebugOutput) { + TestDebugReporter.deleteOldFiles() + } + val result = if (resultSingle == 0) 1 else 0 + return Triple(result, 1, null) + } + + private fun runMultipleFlows( + maestro: Maestro, + device: Device?, + chunkPlans: List, + shardIndex: Int, + debugOutputPath: Path + ): Triple { + val suiteResult = TestSuiteInteractor( + maestro = maestro, + device = device, + reporter = ReporterFactory.buildReporter(format, testSuiteName), + ).runTestSuite( + executionPlan = chunkPlans[shardIndex], + env = env, + reportOut = null, + debugOutputPath = debugOutputPath + ) + + if (!flattenDebugOutput) { + TestDebugReporter.deleteOldFiles() + } + return Triple(suiteResult.passedCount, suiteResult.totalTests, suiteResult) + } + + private suspend fun assignDeviceToShard( + deviceIds: List, + shardIndex: Int, + missingDevices: Int, + missingDevicesConfigs: MutableList, + driverHostPort: Int + ): String = + useDevicesPassedAsOptions(deviceIds, shardIndex) + ?: useConnectedDevices(deviceIds, missingDevices, shardIndex) + ?: createNewDevice(missingDevicesConfigs, shardIndex, driverHostPort) + + private fun useDevicesPassedAsOptions(deviceIds: List, shardIndex: Int) = + deviceIds.getOrNull(shardIndex) + + private fun useConnectedDevices( + deviceIds: List, + missingDevices: Int, + shardIndex: Int + ) = when { + deviceIds.isNotEmpty() -> null + missingDevices >= 0 -> initialActiveDevices.elementAtOrNull(shardIndex) + initialActiveDevices.isNotEmpty() -> PickDeviceInteractor.pickDevice().instanceId + else -> null + } + + private suspend fun createNewDevice( + missingDevicesConfigs: MutableList, + shardIndex: Int, + driverHostPort: Int + ): String { + val cfg = missingDevicesConfigs.first() + missingDevicesConfigs.remove(cfg) + val deviceCreated = DeviceCreateUtil.getOrCreateDevice( + platform = cfg.platform, + osVersion = cfg.osVersion, + forceCreate = true, + shardIndex = shardIndex + ) + val device = DeviceService.startDevice( + device = deviceCreated, + driverHostPort = driverHostPort, + connectedDevices = initialActiveDevices + currentActiveDevices + ) + currentActiveDevices.add(device.instanceId) + delay(2.seconds) + return device.instanceId + } + + private fun makeChunkPlans( + plan: ExecutionPlan, + effectiveShards: Int, + shared: Boolean, + ) = plan.flowsToRun + .withIndex() + .groupBy { it.index % effectiveShards } + .map { (_, files) -> + val flowsToRun = files.map { it.value } + if (plan.sequence?.flows?.isNotEmpty() == true && shared) + error("Cannot run sharded tests with sequential execution.") + ExecutionPlan(flowsToRun, plan.sequence) + } + + private fun createMissingDevices( + availableDevices: Int, + effectiveShards: Int + ) = (availableDevices until effectiveShards).map { shardIndex -> + PrintUtils.message("Creating device for shard ${shardIndex + 1}:") + PickDeviceView.requestDeviceOptions() + } + + private fun promptForDeviceCreation(availableDevices: Int, effectiveShards: Int): Boolean { + val missingDevices = effectiveShards - availableDevices + if (missingDevices <= 0) return true + val message = """ + Found $availableDevices active devices. + Need to create or start $missingDevices more for $effectiveShards shards. Continue? y/n + """.trimIndent() + PrintUtils.message(message) + val str = readlnOrNull()?.lowercase() + return str?.isBlank() == true || str == "y" || str == "yes" + } + + private fun getPassedOptionsDeviceIds(): List { + val arguments = if (isWebFlow()) { + PrintUtils.warn("Web support is an experimental feature and may be removed in future versions.\n") + "chromium" + } else parent?.deviceId + val deviceIds = arguments + .orEmpty() + .split(",") + .map { it.trim() } + .filter { it.isNotBlank() } + return deviceIds + } + private fun printExitDebugMessage() { println() println("==== Debug output (logs & screenshots) ====") @@ -365,18 +443,11 @@ class TestCommand : Callable { } private fun printShardsMessage(passedTests: Int, totalTests: Int, shardResults: List) { - val box = buildString { - val lines = listOf("Passed: $passedTests/$totalTests") + - shardResults.mapIndexed { index, result -> - "[ ${result.suites.first().deviceName} ] - ${result.passedCount ?: 0}/${result.totalTests ?: 0}" - } - - val lineWidth = lines.maxOf(String::length) - append("┌${"─".repeat(lineWidth)}┐\n") - lines.forEach { append("│${it.padEnd(lineWidth)}│\n") } - append("└${"─".repeat(lineWidth)}┘") - } - PrintUtils.message(box) + val lines = listOf("Passed: $passedTests/$totalTests") + + shardResults.mapIndexed { _, result -> + "[ ${result.suites.first().deviceName} ] - ${result.passedCount ?: 0}/${result.totalTests ?: 0}" + } + PrintUtils.message(lines.joinToString("\n").box()) } private fun TestExecutionSummary.saveReport() { @@ -392,6 +463,7 @@ class TestCommand : Callable { ) } } + private fun List.mergeSummaries(): TestExecutionSummary? = reduceOrNull { acc, summary -> TestExecutionSummary( passed = acc.passed && summary.passed,