From be98c3361316a421e21e0badad274a3d9c497e6c Mon Sep 17 00:00:00 2001 From: Saeed Rezaee Date: Mon, 29 Jul 2024 18:05:51 +0200 Subject: [PATCH] Fix crash when stopping haraClient in Downloading/Updating state This commit fixes the crash when haraClient is stopped during the Downloading/Updating state due to actors were not closed properly. (Reported in #54) UF-930 Signed-off-by: Saeed Rezaee --- .../ddiclient/api/actors/AbstractActor.kt | 49 +++- .../ddiclient/api/actors/ActionManager.kt | 2 +- .../ddiclient/api/actors/ConnectionManager.kt | 15 +- .../ddiclient/api/actors/DeploymentManager.kt | 6 +- .../ddiclient/api/actors/DownloadManager.kt | 12 +- .../ddiclient/api/actors/FileDownloader.kt | 49 +++- .../api/actors/NotificationManager.kt | 2 +- .../hara/ddiclient/api/actors/RootActor.kt | 7 +- .../ddiclient/api/actors/UpdateManager.kt | 14 +- .../integrationtest/HaraClientStoppingTest.kt | 277 ++++++++++++++++++ .../abstractions/AbstractHaraMessageTest.kt | 5 +- .../abstractions/AbstractTest.kt | 18 +- 12 files changed, 383 insertions(+), 73 deletions(-) create mode 100644 src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/HaraClientStoppingTest.kt diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/AbstractActor.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/AbstractActor.kt index 97a3d5b..08ba1f0 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/AbstractActor.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/AbstractActor.kt @@ -41,15 +41,24 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor private var __receive__: Receive = EmptyReceive - private val childs: MutableMap = emptyMap().toMutableMap() + private val children: MutableMap = emptyMap().toMutableMap() - protected fun child(name: String) = childs[name] + protected fun child(name: String) = children[name] protected fun become(receive: Receive) { __receive__ = receive } protected val LOG = LoggerFactory.getLogger(this::class.java) - protected fun unhandled(msg: Any) { + protected suspend fun handleMsgDefault(msg: Any) { + when (msg) { + is ConnectionManager.Companion.Message.In.Stop -> { + stopActor() + } + else -> unhandled(msg) + } + } + + private fun unhandled(msg: Any) { if (LOG.isWarnEnabled) { LOG.warn("received unexpected message $msg in ${coroutineContext[CoroutineName]} actor") } @@ -59,22 +68,22 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor protected val name: String = coroutineContext[CoroutineName]!!.name + protected open suspend fun stopActor() { + forEachActorNode { it.send(ConnectionManager.Companion.Message.In.Stop) } + channel.close() + } + protected open fun beforeCloseChannel() { - childs.forEach { (_, c) -> c.close() } + children.forEach { (_, c) -> c.close() } } - protected fun forEachActorNode(ope: (ActorRef) -> Unit) { - childs.forEach { (_, actorRef) -> ope(actorRef) } + protected suspend fun forEachActorNode(ope: suspend (ActorRef) -> Unit) { + children.forEach { (_, actorRef) -> ope(actorRef) } } override val channel: Channel = object : Channel by actorScope.channel { override suspend fun send(element: Any) { - if(actorScope.channel.isClosedForSend){ - LOG.debug("Channel is close for send. Message {} isn't sent to actor {}.", element.javaClass.simpleName, name) - } else { - LOG.debug("Send message {} to actor {}.", element.javaClass.simpleName, name) - actorScope.channel.send(element) - } + actorScope.channel.sendMessageToChannelIfOpen(element, name) } override fun close(cause: Throwable?): Boolean { @@ -94,7 +103,7 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor val childRef = actorScope.actor( Dispatchers.IO.plus(CoroutineName(name)).plus(ParentActor(this.channel)).plus(context), capacity, start, onCompletion) { __workflow__(LOG, block)() } - childs.put(name, childRef) + children.put(name, childRef) return childRef } @@ -166,3 +175,17 @@ data class ParentActor(val ref: ActorRef) : AbstractCoroutineContextElement(Pare class ActorException(val actorName: String, val actorRef: ActorRef, throwable: Throwable) : Exception(throwable) class ActorCreationException(val actorName: String, val actorRef: ActorRef, throwable: Throwable) : Exception(throwable) + + +@OptIn(ExperimentalCoroutinesApi::class) +suspend fun SendChannel.sendMessageToChannelIfOpen(message: Any, + name: String = this.toString()) { + val logger = LoggerFactory.getLogger(this::class.java) + if (isClosedForSend) { + logger.debug("Channel is close for send. Message {} isn't sent to actor {}.", + message.javaClass.simpleName, name) + } else { + logger.debug("Send message {} to actor {}.", message.javaClass.simpleName, name) + send(message) + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ActionManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ActionManager.kt index bdb143b..7b2f6b8 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ActionManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ActionManager.kt @@ -79,7 +79,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { LOG.warn("ErrMsg. Not yet implemented") } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ConnectionManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ConnectionManager.kt index f8c02a7..e011a4f 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ConnectionManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ConnectionManager.kt @@ -46,7 +46,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { } init{ - CoroutineScope(Dispatchers.IO).launch{ + scope.launch{ val isSuccessful: (Response) -> Boolean = { response -> when(response.code()){ @@ -72,7 +72,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { channel.send(In.Ping) } if (msg is In.DeploymentFeedback) { - notificationManager.send(MessageListener.Message.Event + notificationManager.sendMessageToChannelIfOpen(MessageListener.Message.Event .DeployFeedbackRequestResult(success, msg.feedback.id, msg.closeAction, msg.feedback.status.details)) } @@ -113,15 +113,13 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { is In.Start -> become(runningReceive(startPing(state))) - is In.Stop -> {} - is In.Register -> become(stoppedReceive(state.withReceiver(msg.listener))) is In.Unregister -> become(stoppedReceive(state.withoutReceiver(msg.listener))) is In.SetPing -> become(stoppedReceive(state.copy(clientPingInterval = msg.duration, lastPing = Instant.EPOCH))) - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -130,7 +128,10 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { is In.Start -> {} - is In.Stop -> become(stoppedReceive(stopPing(state))) + is In.Stop -> { + become(stoppedReceive(stopPing(state))) + stopActor() + } is In.Register -> become(runningReceive(state.withReceiver(msg.listener))) @@ -158,7 +159,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { } else -> { - unhandled(msg) + handleMsgDefault(msg) } } } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DeploymentManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DeploymentManager.kt index 59a1268..3280761 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DeploymentManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DeploymentManager.kt @@ -36,7 +36,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { stopUpdateAndNotify(msg) } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -61,7 +61,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { is CancelForced -> { stopUpdate() } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -95,7 +95,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { is CancelForced -> { LOG.info("Force cancel ignored") } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DownloadManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DownloadManager.kt index 6375e35..f3fde5b 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DownloadManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DownloadManager.kt @@ -72,7 +72,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { } } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -196,7 +196,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { stopUpdate() } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -221,7 +221,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { Status.ERROR, "Failed to download file with md5 ${msg.md5} due to ${msg.message}", msg.message) } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -323,12 +323,6 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { } } - //todo remove FileDownloader.Companion.Message.Stop message and use default implementation of beforeCloseChannel - @OptIn(ExperimentalCoroutinesApi::class) - override fun beforeCloseChannel() { - forEachActorNode { actorRef -> if(!actorRef.isClosedForSend) launch { actorRef.send(FileDownloader.Companion.Message.Stop) } } - } - init { become(beforeStartReceive()) } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/FileDownloader.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/FileDownloader.kt index aa237c4..8527e11 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/FileDownloader.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/FileDownloader.kt @@ -25,7 +25,6 @@ import java.lang.StringBuilder import java.security.DigestInputStream import java.security.MessageDigest import kotlin.time.DurationUnit -import kotlin.time.ExperimentalTime import kotlin.time.toDuration @OptIn(ObsoleteCoroutinesApi::class) @@ -40,6 +39,8 @@ private constructor( private val downloadBehavior: DownloadBehavior = coroutineContext[HaraClientContext]!!.downloadBehavior private val notificationManager = coroutineContext[NMActor]!!.ref private val connectionManager = coroutineContext[CMActor]!!.ref + private var downloadJob: Job? = null + private var downloadScope: CoroutineScope = CoroutineScope(Dispatchers.IO) private fun beforeStart(state: State): Receive = { msg -> when (msg) { @@ -54,9 +55,7 @@ private constructor( } } - is Message.Stop -> this.cancel() - - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -84,21 +83,18 @@ private constructor( tryDownload(newState, msg.cause) } - is Message.Stop -> this.cancel() - - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } - @OptIn(ExperimentalTime::class) - private suspend fun tryDownload(state: State, error:Throwable? = null) = withContext(Dispatchers.IO){ + private suspend fun tryDownload(state: State, error:Throwable? = null) { when(val tryDownload = downloadBehavior.onAttempt(state.currentAttempt, "${state.actionId}-${fileToDownload.md5}", error)){ is DownloadBehavior.Try.Stop -> channel.send(Message.TrialExhausted) is DownloadBehavior.Try.After -> { - launch { + downloadJob = downloadScope.launch { if(error != null){ val errorMessage = "Retry download of ${fileToDownload.fileName} due to: $error. The download will start in ${tryDownload.seconds.toDuration(DurationUnit.SECONDS)}." parent!!.send(Message.Info(channel, fileToDownload.md5, errorMessage)) @@ -136,14 +132,27 @@ private constructor( val timer = checkDownloadProgress(inputStream, queue, actionId) runCatching { - file.outputStream().use { - inputStream.copyTo(it) + inputStream.use { inputStream -> + file.outputStream().use { outputStream -> + val buffer = ByteArray(DEFAULT_BUFFER_SIZE) + var progressBytes = 0L + var bytes = inputStream.read(buffer) + while (bytes >= 0) { + if (!downloadScope.isActive) { + LOG.info("Download of ${fileToDownload.fileName} was cancelled") + return + } + outputStream.write(buffer, 0, bytes) + progressBytes += bytes + bytes = inputStream.read(buffer) + } + } } }.also { timer.purge() timer.cancel() }.onFailure { - throw it + throw it } } @@ -209,6 +218,19 @@ private constructor( } } + override suspend fun stopActor() { + runCatching { + downloadJob?.let { + LOG.debug("Cancelling download job $it") + if (it.isActive) { + it.cancel() + downloadScope.cancel() + } + } + } + super.stopActor() + } + private fun State.nextAttempt():Int = if (currentAttempt == Int.MAX_VALUE) currentAttempt else currentAttempt + 1 init { @@ -244,7 +266,6 @@ private constructor( sealed class Message { object Start : Message() - object Stop : Message() object FileDownloaded : Message() object FileChecked : Message() diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/NotificationManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/NotificationManager.kt index cfeb41e..b23056d 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/NotificationManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/NotificationManager.kt @@ -25,7 +25,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { is MessageListener.Message -> listeners.forEach { it.onMessage(msg) } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/RootActor.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/RootActor.kt index a571238..708f15d 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/RootActor.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/RootActor.kt @@ -23,12 +23,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { when (msg) { is In.Start, In.ForcePing -> child("connectionManager")!!.send(msg) - is In.Stop -> { - child("connectionManager")!!.send(msg) - channel.close() - } - - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/UpdateManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/UpdateManager.kt index 464b07f..cfc2891 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/UpdateManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/UpdateManager.kt @@ -62,7 +62,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { LOG.info(message) } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -89,7 +89,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { stopUpdate() } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -146,7 +146,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { sendFeedback(msg.info.id, closed, Progress(0,0), success, "No update applied" ) - notificationManager.send(MessageListener.Message.Event.NoUpdate) + notificationManager.sendMessageToChannelIfOpen(MessageListener.Message.Event.NoUpdate) } updaterError.isNotEmpty() -> { @@ -154,14 +154,14 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { parent!!.send(DeploymentManager.Companion.Message.UpdateFailed) sendFeedback(msg.info.id, closed, Progress(updaters.size, updaterError[0].first), failure, *details.toTypedArray()) - notificationManager.send(MessageListener.Message.Event.UpdateFinished(successApply = false, details = details)) + notificationManager.sendMessageToChannelIfOpen(MessageListener.Message.Event.UpdateFinished(successApply = false, details = details)) } else -> { parent!!.send(DeploymentManager.Companion.Message.UpdateFinished) sendFeedback(msg.info.id, closed, Progress(updaters.size, updaters.size), success, *details.toTypedArray()) - notificationManager.send(MessageListener.Message.Event.UpdateFinished(successApply = true, details = details)) + notificationManager.sendMessageToChannelIfOpen(MessageListener.Message.Event.UpdateFinished(successApply = true, details = details)) } } } @@ -201,7 +201,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { vararg messages: String ) { val request = DeploymentFeedbackRequest.newInstance(id, execution, progress, finished, *messages) - connectionManager.send(DeploymentFeedback(request)) + connectionManager.sendMessageToChannelIfOpen(DeploymentFeedback(request)) } private fun convert(swModule: Updater.SwModule, pathCalculator: (Updater.SwModule.Artifact) -> String): Updater.SwModuleWithPath = @@ -225,7 +225,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { private suspend fun stopUpdate() { LOG.info("Stopping update") channel.cancel() - notificationManager.send(MessageListener.Message.State.CancellingUpdate) + notificationManager.sendMessageToChannelIfOpen(MessageListener.Message.State.CancellingUpdate) parent!!.send(ActionManager.Companion.Message.UpdateStopped) } diff --git a/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/HaraClientStoppingTest.kt b/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/HaraClientStoppingTest.kt new file mode 100644 index 0000000..20e98c0 --- /dev/null +++ b/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/HaraClientStoppingTest.kt @@ -0,0 +1,277 @@ +/* + * + * * Copyright © 2017-2024 Kynetics LLC + * * + * * This program and the accompanying materials are made + * * available under the terms of the Eclipse Public License 2.0 + * * which is available at https://www.eclipse.org/legal/epl-2.0/ + * * + * * SPDX-License-Identifier: EPL-2.0 + * + */ +package org.eclipse.hara.ddiclient.integrationtest + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.eclipse.hara.ddiclient.api.DownloadBehavior +import org.eclipse.hara.ddiclient.api.HaraClient +import org.eclipse.hara.ddiclient.api.MessageListener +import org.eclipse.hara.ddiclient.api.Updater +import org.eclipse.hara.ddiclient.integrationtest.abstractions.AbstractHaraMessageTest.ExpectedMessage +import org.eclipse.hara.ddiclient.integrationtest.abstractions.AbstractTest +import org.eclipse.hara.ddiclient.integrationtest.api.management.AssignDistributionType +import org.eclipse.hara.ddiclient.integrationtest.api.management.HawkbitAssignDistributionBody +import org.eclipse.hara.ddiclient.integrationtest.utils.TestUtils +import org.eclipse.hara.ddiclient.integrationtest.utils.internalLog +import org.eclipse.hara.ddiclient.integrationtest.utils.logCurrentFunctionName +import org.testng.annotations.AfterClass +import org.testng.annotations.Test +import kotlin.coroutines.cancellation.CancellationException + +class HaraClientStoppingTest : AbstractTest() { + + companion object { + const val TARGET_ID = "HaraClientStoppingTest" + } + + private val testScope = CoroutineScope(Dispatchers.Default) + private var haraScope = CoroutineScope(Dispatchers.Default) + + private val fiveSecondsDelayDownloadBehavior = object : DownloadBehavior { + override fun onAttempt(attempt: Int, artifactId: String, + previousError: Throwable?): DownloadBehavior.Try { + return DownloadBehavior.Try.After(5) + } + } + + private fun createMessageListener(channel: Channel): MessageListener { + return object : MessageListener { + override fun onMessage(message: MessageListener.Message) { + runBlocking { + "Received message: $message".internalLog() + channel.send(ExpectedMessage.HaraMessage(message)) + } + } + } + } + + + @AfterClass + override fun afterTest() { + super.afterTest() + setPollingTime("00:00:10") + } + + @Test(enabled = true, priority = 41, timeOut = 45_000, invocationCount = 1) + fun haraClientShouldStopPollingAfterBeingStoppedInDownloadingState() { + logCurrentFunctionName() + stoppingHaraClientWhileInDownloadingStateTestTemplate(TestUtils.OS_DISTRIBUTION_ID) + } + + @Test(enabled = true, priority = 42, timeOut = 45_000, invocationCount = 1) + fun haraClientShouldStopPollingAfterBeingStoppedInDownloadingStateForMultipleArtifacts() { + logCurrentFunctionName() + stoppingHaraClientWhileInDownloadingStateTestTemplate( + TestUtils.OS_WITH_APPS_DISTRIBUTION_ID) + } + + @Test(enabled = true, priority = 43, timeOut = 60_000, invocationCount = 1) + fun haraClientShouldStopPollingAfterBeingStoppedInUpdatingState() { + logCurrentFunctionName() + runTest { testJob -> + setPollingTime("00:00:05") + val expectedMessageChannel = Channel(5, BufferOverflow.DROP_OLDEST) + + val updater = object : Updater { + override fun apply(modules: Set, + messenger: Updater.Messenger): Updater.UpdateResult { + haraScope.launch { + "Applying long running fake update (8 sec) for modules: $modules".internalLog() + delay(8_000) + "Update applied".internalLog() + messenger.sendMessageToServer("Update applied") + } + return Updater.UpdateResult(true) + } + } + + haraClientStopTestTemplate( + updater = updater, + expectedMessageChannel = expectedMessageChannel, + ) { msg, testClient -> + if (msg is MessageListener.Message.State.Updating) { + testClient.stop() + "Client stopped".internalLog() + runBlockWhileEnsuringPollingIsNotDetected( + expectedMessageChannel = expectedMessageChannel) { + "waiting for 10 seconds to ensure that updating and polling is stopped".internalLog() + delay(10_000) // wait for update to finish + testJob?.cancel() + } + } + } + } + } + + @Test(enabled = true, priority = 44, timeOut = 45_000, invocationCount = 1) + fun haraClientShouldPollAfterRestartedInDownloadingState() { + logCurrentFunctionName() + runTest { testJob -> + + setPollingTime("00:00:10") + reCreateTestTargetOnServer(TARGET_ID) + assignHeavyOTAUpdateToTheTarget(TestUtils.OS_DISTRIBUTION_ID) + + val expectedMessage = mutableListOf() + val expectedMessageChannel = Channel(5, BufferOverflow.DROP_OLDEST) + + var testClient = createClient( + expectedMessageChannel, downloadBehavior = fiveSecondsDelayDownloadBehavior) + + testClient.startAsync() + + listenToMessages(expectedMessageChannel, testClient) { msg, _ -> + if (expectedMessage.isNotEmpty()) { + assertEquals(msg, expectedMessage.removeFirst().message) + testJob?.cancel() + } else if (msg is MessageListener.Message.Event.StartDownloadFile) { + testClient.stop() + "Client stopped".internalLog() + delay(2_000) + // The client should start polling after restarting the HaraClient + testClient = createClient(expectedMessageChannel) + expectedMessage.add( + ExpectedMessage.HaraMessage(MessageListener.Message.Event.Polling)) + testClient.startAsync() + } + } + } + } + + private fun stoppingHaraClientWhileInDownloadingStateTestTemplate(distributionId: Int) { + runTest { testJob -> + setPollingTime("00:00:05") + val expectedMessageChannel = Channel(5, BufferOverflow.DROP_OLDEST) + + haraClientStopTestTemplate( + downloadBehavior = fiveSecondsDelayDownloadBehavior, + expectedMessageChannel = expectedMessageChannel, + distributionId = distributionId + ) { msg, testClient -> + if (msg is MessageListener.Message.Event.StartDownloadFile) { + testClient.stop() + "Client stopped".internalLog() + runBlockWhileEnsuringPollingIsNotDetected( + expectedMessageChannel = expectedMessageChannel) { + "waiting for 10 seconds to ensure that download and polling is stopped".internalLog() + delay(10_000) + testJob?.cancel() + } + } + } + } + } + + private suspend fun haraClientStopTestTemplate( + downloadBehavior: DownloadBehavior = TestUtils.downloadBehavior, + expectedMessageChannel: Channel = + Channel(5, BufferOverflow.DROP_OLDEST), + updater: Updater = TestUtils.updater, + distributionId: Int = TestUtils.OS_DISTRIBUTION_ID, + onMessageReceive: suspend (MessageListener.Message, HaraClient) -> Unit + ) { + + reCreateTestTargetOnServer(TARGET_ID) + assignHeavyOTAUpdateToTheTarget(distributionId) + + val client = createClient( + expectedMessageChannel, downloadBehavior = downloadBehavior, updater = updater) + + client.startAsync() + + listenToMessages(expectedMessageChannel, client, onMessageReceive) + } + + private suspend fun listenToMessages( + expectedMessageChannel: Channel, + client: HaraClient, + onMessageReceive: suspend (MessageListener.Message, HaraClient) -> Unit) { + for (msg in expectedMessageChannel) { + if (msg is ExpectedMessage.HaraMessage) { + onMessageReceive(msg.message, client) + } + } + } + + private fun runTest(testBlock: suspend (Deferred?) -> Unit) { + runBlocking { + try { + var testJob: Deferred? = null + testJob = testScope.async(start = CoroutineStart.LAZY) { + launch { + // Catching exceptions in haraClient scope by checking if it is still active + while (haraScope.isActive) { + delay(100) + } + throw RuntimeException( + "Test failed: HaraClient scope is closed before the test is finished") + } + testBlock(testJob) + } + testJob.await() + } catch (ignored: CancellationException) { + } + } + } + + private suspend fun runBlockWhileEnsuringPollingIsNotDetected( + delay: Long = 1_000, + expectedMessageChannel: Channel, + block: suspend () -> Unit) { + testScope.launch { + block() + } + val job = testScope.async { + delay(delay) + for (msg in expectedMessageChannel) { + if (msg is ExpectedMessage.HaraMessage && msg.message is MessageListener.Message.Event.Polling) { + throw IllegalStateException( + "Test failed: Client is polling after being stopped") + } + } + } + try { + job.await() + } catch (ignored: CancellationException) { + } + } + + private fun createClient( + expectedMessageChannel: Channel, + downloadBehavior: DownloadBehavior = TestUtils.downloadBehavior, + updater: Updater = TestUtils.updater): HaraClient { + val messageListener = createMessageListener(expectedMessageChannel) + haraScope = CoroutineScope(Dispatchers.Default) + return clientFromTargetId( + downloadBehavior = downloadBehavior, updater = updater, + messageListeners = listOf(messageListener), + scope = haraScope).invoke(TARGET_ID) + } + + private suspend fun assignHeavyOTAUpdateToTheTarget( + distributionId: Int) { + val distribution = HawkbitAssignDistributionBody( + distributionId, AssignDistributionType.FORCED, 0) + assignDistributionToTheTarget(TARGET_ID, distribution) + } + +} diff --git a/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractHaraMessageTest.kt b/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractHaraMessageTest.kt index 6f75d21..f2439d5 100644 --- a/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractHaraMessageTest.kt +++ b/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractHaraMessageTest.kt @@ -114,13 +114,14 @@ abstract class AbstractHaraMessageTest : AbstractTest() { downloadBehavior: DownloadBehavior, okHttpClientBuilder: OkHttpClient.Builder, targetToken: String?, - gatewayToken: String?): (String) -> HaraClient { + gatewayToken: String?, + scope: CoroutineScope): (String) -> HaraClient { return super.clientFromTargetId( directoryDataProvider, configDataProvider, updater, listOf(messageListener), deploymentPermitProvider, downloadBehavior, okHttpClientBuilder, - targetToken, gatewayToken) + targetToken, gatewayToken, scope) } protected fun expectMessages(vararg messages: MessageListener.Message) { diff --git a/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractTest.kt b/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractTest.kt index cfe68d6..f53b7d4 100644 --- a/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractTest.kt +++ b/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractTest.kt @@ -13,7 +13,6 @@ package org.eclipse.hara.ddiclient.integrationtest.abstractions import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.runBlocking @@ -44,10 +43,6 @@ abstract class AbstractTest { protected lateinit var managementApi: ManagementApi - private val throwableScope = CoroutineScope(Dispatchers.Default) - - private var throwableJob: Deferred? = null - protected var client: HaraClient? = null set(value) { safeStopClient() @@ -104,12 +99,13 @@ abstract class AbstractTest { } } - protected suspend fun assert(assertionBlock: () -> Unit) { - throwableJob = throwableScope.async { + protected suspend fun assert(assertionBlock: suspend () -> Unit) { + val throwableScope = CoroutineScope(Dispatchers.Default) + val throwableJob = throwableScope.async { assertionBlock() } try { - throwableJob?.await() + throwableJob.await() } catch (ignored: CancellationException) { } } @@ -124,7 +120,8 @@ abstract class AbstractTest { downloadBehavior: DownloadBehavior = TestUtils.downloadBehavior, okHttpClientBuilder: OkHttpClient.Builder = OkHttpClient.Builder().addOkhttpLogger(), targetToken: String? = "", - gatewayToken: String? = TestUtils.gatewayToken): (String) -> HaraClient = + gatewayToken: String? = TestUtils.gatewayToken, + scope: CoroutineScope = CoroutineScope(Dispatchers.Default)): (String) -> HaraClient = { targetId -> val clientData = HaraClientData( TestUtils.tenantName, @@ -142,7 +139,8 @@ abstract class AbstractTest { listOf(*messageListeners.toTypedArray()), listOf(updater), downloadBehavior, - httpBuilder = okHttpClientBuilder + httpBuilder = okHttpClientBuilder, + scope = scope ) client