Skip to content

Commit

Permalink
Fix crash when stopping haraClient in Downloading/Updating state
Browse files Browse the repository at this point in the history
This commit fixes the crash when haraClient is stopped during the
 Downloading/Updating state due to actors were not closed properly.
 (Reported in #54)

UF-930
Signed-off-by: Saeed Rezaee <saeed.rezaee@kynetics.it>
  • Loading branch information
SaeedRe committed Aug 5, 2024
1 parent 3fafdce commit be98c33
Show file tree
Hide file tree
Showing 12 changed files with 383 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,24 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor

private var __receive__: Receive = EmptyReceive

private val childs: MutableMap<String, ActorRef> = emptyMap<String, ActorRef>().toMutableMap()
private val children: MutableMap<String, ActorRef> = emptyMap<String, ActorRef>().toMutableMap()

protected fun child(name: String) = childs[name]
protected fun child(name: String) = children[name]

protected fun become(receive: Receive) { __receive__ = receive }

protected val LOG = LoggerFactory.getLogger(this::class.java)

protected fun unhandled(msg: Any) {
protected suspend fun handleMsgDefault(msg: Any) {
when (msg) {
is ConnectionManager.Companion.Message.In.Stop -> {
stopActor()
}
else -> unhandled(msg)
}
}

private fun unhandled(msg: Any) {
if (LOG.isWarnEnabled) {
LOG.warn("received unexpected message $msg in ${coroutineContext[CoroutineName]} actor")
}
Expand All @@ -59,22 +68,22 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor

protected val name: String = coroutineContext[CoroutineName]!!.name

protected open suspend fun stopActor() {
forEachActorNode { it.send(ConnectionManager.Companion.Message.In.Stop) }
channel.close()
}

protected open fun beforeCloseChannel() {
childs.forEach { (_, c) -> c.close() }
children.forEach { (_, c) -> c.close() }
}

protected fun forEachActorNode(ope: (ActorRef) -> Unit) {
childs.forEach { (_, actorRef) -> ope(actorRef) }
protected suspend fun forEachActorNode(ope: suspend (ActorRef) -> Unit) {
children.forEach { (_, actorRef) -> ope(actorRef) }
}

override val channel: Channel<Any> = object : Channel<Any> by actorScope.channel {
override suspend fun send(element: Any) {
if(actorScope.channel.isClosedForSend){
LOG.debug("Channel is close for send. Message {} isn't sent to actor {}.", element.javaClass.simpleName, name)
} else {
LOG.debug("Send message {} to actor {}.", element.javaClass.simpleName, name)
actorScope.channel.send(element)
}
actorScope.channel.sendMessageToChannelIfOpen(element, name)
}

override fun close(cause: Throwable?): Boolean {
Expand All @@ -94,7 +103,7 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor
val childRef = actorScope.actor<Any>(
Dispatchers.IO.plus(CoroutineName(name)).plus(ParentActor(this.channel)).plus(context),
capacity, start, onCompletion) { __workflow__(LOG, block)() }
childs.put(name, childRef)
children.put(name, childRef)
return childRef
}

Expand Down Expand Up @@ -166,3 +175,17 @@ data class ParentActor(val ref: ActorRef) : AbstractCoroutineContextElement(Pare

class ActorException(val actorName: String, val actorRef: ActorRef, throwable: Throwable) : Exception(throwable)
class ActorCreationException(val actorName: String, val actorRef: ActorRef, throwable: Throwable) : Exception(throwable)


@OptIn(ExperimentalCoroutinesApi::class)
suspend fun SendChannel<Any>.sendMessageToChannelIfOpen(message: Any,
name: String = this.toString()) {
val logger = LoggerFactory.getLogger(this::class.java)
if (isClosedForSend) {
logger.debug("Channel is close for send. Message {} isn't sent to actor {}.",
message.javaClass.simpleName, name)
} else {
logger.debug("Send message {} to actor {}.", message.javaClass.simpleName, name)
send(message)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
LOG.warn("ErrMsg. Not yet implemented")
}

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
}

init{
CoroutineScope(Dispatchers.IO).launch{
scope.launch{

val isSuccessful: (Response<Unit>) -> Boolean = { response ->
when(response.code()){
Expand All @@ -72,7 +72,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
channel.send(In.Ping)
}
if (msg is In.DeploymentFeedback) {
notificationManager.send(MessageListener.Message.Event
notificationManager.sendMessageToChannelIfOpen(MessageListener.Message.Event
.DeployFeedbackRequestResult(success, msg.feedback.id,
msg.closeAction, msg.feedback.status.details))
}
Expand Down Expand Up @@ -113,15 +113,13 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {

is In.Start -> become(runningReceive(startPing(state)))

is In.Stop -> {}

is In.Register -> become(stoppedReceive(state.withReceiver(msg.listener)))

is In.Unregister -> become(stoppedReceive(state.withoutReceiver(msg.listener)))

is In.SetPing -> become(stoppedReceive(state.copy(clientPingInterval = msg.duration, lastPing = Instant.EPOCH)))

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand All @@ -130,7 +128,10 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {

is In.Start -> {}

is In.Stop -> become(stoppedReceive(stopPing(state)))
is In.Stop -> {
become(stoppedReceive(stopPing(state)))
stopActor()
}

is In.Register -> become(runningReceive(state.withReceiver(msg.listener)))

Expand Down Expand Up @@ -158,7 +159,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
}

else -> {
unhandled(msg)
handleMsgDefault(msg)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
stopUpdateAndNotify(msg)
}

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand All @@ -61,7 +61,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
is CancelForced -> {
stopUpdate()
}
else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down Expand Up @@ -95,7 +95,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
is CancelForced -> {
LOG.info("Force cancel ignored")
}
else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
}
}

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down Expand Up @@ -196,7 +196,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
stopUpdate()
}

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand All @@ -221,7 +221,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
Status.ERROR, "Failed to download file with md5 ${msg.md5} due to ${msg.message}", msg.message)
}

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down Expand Up @@ -323,12 +323,6 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
}
}

//todo remove FileDownloader.Companion.Message.Stop message and use default implementation of beforeCloseChannel
@OptIn(ExperimentalCoroutinesApi::class)
override fun beforeCloseChannel() {
forEachActorNode { actorRef -> if(!actorRef.isClosedForSend) launch { actorRef.send(FileDownloader.Companion.Message.Stop) } }
}

init {
become(beforeStartReceive())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import java.lang.StringBuilder
import java.security.DigestInputStream
import java.security.MessageDigest
import kotlin.time.DurationUnit
import kotlin.time.ExperimentalTime
import kotlin.time.toDuration

@OptIn(ObsoleteCoroutinesApi::class)
Expand All @@ -40,6 +39,8 @@ private constructor(
private val downloadBehavior: DownloadBehavior = coroutineContext[HaraClientContext]!!.downloadBehavior
private val notificationManager = coroutineContext[NMActor]!!.ref
private val connectionManager = coroutineContext[CMActor]!!.ref
private var downloadJob: Job? = null
private var downloadScope: CoroutineScope = CoroutineScope(Dispatchers.IO)

private fun beforeStart(state: State): Receive = { msg ->
when (msg) {
Expand All @@ -54,9 +55,7 @@ private constructor(
}
}

is Message.Stop -> this.cancel()

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down Expand Up @@ -84,21 +83,18 @@ private constructor(
tryDownload(newState, msg.cause)
}

is Message.Stop -> this.cancel()

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

@OptIn(ExperimentalTime::class)
private suspend fun tryDownload(state: State, error:Throwable? = null) = withContext(Dispatchers.IO){
private suspend fun tryDownload(state: State, error:Throwable? = null) {

when(val tryDownload = downloadBehavior.onAttempt(state.currentAttempt, "${state.actionId}-${fileToDownload.md5}", error)){

is DownloadBehavior.Try.Stop -> channel.send(Message.TrialExhausted)

is DownloadBehavior.Try.After -> {
launch {
downloadJob = downloadScope.launch {
if(error != null){
val errorMessage = "Retry download of ${fileToDownload.fileName} due to: $error. The download will start in ${tryDownload.seconds.toDuration(DurationUnit.SECONDS)}."
parent!!.send(Message.Info(channel, fileToDownload.md5, errorMessage))
Expand Down Expand Up @@ -136,14 +132,27 @@ private constructor(
val timer = checkDownloadProgress(inputStream, queue, actionId)

runCatching {
file.outputStream().use {
inputStream.copyTo(it)
inputStream.use { inputStream ->
file.outputStream().use { outputStream ->
val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
var progressBytes = 0L
var bytes = inputStream.read(buffer)
while (bytes >= 0) {
if (!downloadScope.isActive) {
LOG.info("Download of ${fileToDownload.fileName} was cancelled")
return
}
outputStream.write(buffer, 0, bytes)
progressBytes += bytes
bytes = inputStream.read(buffer)
}
}
}
}.also {
timer.purge()
timer.cancel()
}.onFailure {
throw it
throw it
}

}
Expand Down Expand Up @@ -209,6 +218,19 @@ private constructor(
}
}

override suspend fun stopActor() {
runCatching {
downloadJob?.let {
LOG.debug("Cancelling download job $it")
if (it.isActive) {
it.cancel()
downloadScope.cancel()
}
}
}
super.stopActor()
}

private fun State.nextAttempt():Int = if (currentAttempt == Int.MAX_VALUE) currentAttempt else currentAttempt + 1

init {
Expand Down Expand Up @@ -244,7 +266,6 @@ private constructor(
sealed class Message {

object Start : Message()
object Stop : Message()

object FileDownloaded : Message()
object FileChecked : Message()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {

is MessageListener.Message -> listeners.forEach { it.onMessage(msg) }

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
when (msg) {
is In.Start, In.ForcePing -> child("connectionManager")!!.send(msg)

is In.Stop -> {
child("connectionManager")!!.send(msg)
channel.close()
}

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down
Loading

0 comments on commit be98c33

Please sign in to comment.