Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hara client stopping crash #54 #55

Merged
merged 2 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ To build this project from source:
./gradlew assemble
```

to build this project and run the tests (`docker 19.03.0+` and `docker-compose 1.27.0+` required):
to build this project and run the tests (`docker compose v2` required):

```shell
./gradlew build
Expand Down
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ task stopHawkbitServer() {
if (!keep_test_container_alive) {
exec {
workingDir 'docker/test/'
commandLine 'docker-compose', 'down'
commandLine 'docker', 'compose', 'down'
}
}
}
Expand All @@ -253,14 +253,14 @@ task restartHawkbitServer() {
doFirst {
exec {
workingDir 'docker/test/'
commandLine 'docker-compose', 'down'
commandLine 'docker', 'compose', 'down'
}
}

doLast{
exec {
workingDir 'docker/test/'
commandLine 'docker-compose', 'up', '--detach'
commandLine 'docker', 'compose', 'up', '--detach'
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion docker/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# LAUNCH UPDATE-SERVER

```$shell
$docker-compose up
$docker compose up
```

# DATA
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import org.eclipse.hara.ddiclient.api.actors.ConnectionManager
import org.eclipse.hara.ddiclient.api.actors.RootActor
import org.eclipse.hara.ddiclient.api.actors.HaraClientContext
import okhttp3.OkHttpClient
import org.eclipse.hara.ddiclient.api.actors.sendMessageToChannelIfOpen
import org.slf4j.LoggerFactory

class HaraClientDefaultImpl : HaraClient {

private val LOG = LoggerFactory.getLogger(this::class.java)
private var rootActor: ActorRef? = null

private val debouncingForcePingChannel: Channel<ConnectionManager.Companion.Message.In.ForcePing> =
Expand Down Expand Up @@ -66,13 +69,20 @@ class HaraClientDefaultImpl : HaraClient {

@OptIn(ExperimentalCoroutinesApi::class)
override fun stop() = runBlocking {
rootActor!!.send(ConnectionManager.Companion.Message.In.Stop)
rootActor!!.sendMessageToChannelIfOpen(ConnectionManager.Companion.Message.In.Stop)
if(!debouncingForcePingChannel.isClosedForSend){
debouncingForcePingChannel.close()
}
}

override fun forcePing() = runBlocking { debouncingForcePingChannel.send(ConnectionManager.Companion.Message.In.ForcePing) }
@OptIn(ExperimentalCoroutinesApi::class)
override fun forcePing() = runBlocking {
if (!debouncingForcePingChannel.isClosedForSend) {
debouncingForcePingChannel.send(ConnectionManager.Companion.Message.In.ForcePing)
} else {
LOG.warn("HaraClient is stopped. Cannot force ping.")
}
}

companion object{
const val FORCE_PING_DEBOUNCING_TIME = 30_000L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,24 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor

private var __receive__: Receive = EmptyReceive

private val childs: MutableMap<String, ActorRef> = emptyMap<String, ActorRef>().toMutableMap()
private val children: MutableMap<String, ActorRef> = emptyMap<String, ActorRef>().toMutableMap()

protected fun child(name: String) = childs[name]
protected fun child(name: String) = children[name]

protected fun become(receive: Receive) { __receive__ = receive }

protected val LOG = LoggerFactory.getLogger(this::class.java)

protected fun unhandled(msg: Any) {
protected suspend fun handleMsgDefault(msg: Any) {
when (msg) {
is ConnectionManager.Companion.Message.In.Stop -> {
stopActor()
}
else -> unhandled(msg)
}
}

private fun unhandled(msg: Any) {
if (LOG.isWarnEnabled) {
LOG.warn("received unexpected message $msg in ${coroutineContext[CoroutineName]} actor")
}
Expand All @@ -59,22 +68,22 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor

protected val name: String = coroutineContext[CoroutineName]!!.name

protected open suspend fun stopActor() {
forEachActorNode { it.send(ConnectionManager.Companion.Message.In.Stop) }
channel.close()
}

protected open fun beforeCloseChannel() {
childs.forEach { (_, c) -> c.close() }
children.forEach { (_, c) -> c.close() }
}

protected fun forEachActorNode(ope: (ActorRef) -> Unit) {
childs.forEach { (_, actorRef) -> ope(actorRef) }
protected suspend fun forEachActorNode(ope: suspend (ActorRef) -> Unit) {
children.forEach { (_, actorRef) -> ope(actorRef) }
}

override val channel: Channel<Any> = object : Channel<Any> by actorScope.channel {
override suspend fun send(element: Any) {
if(actorScope.channel.isClosedForSend){
LOG.debug("Channel is close for send. Message {} isn't sent to actor {}.", element.javaClass.simpleName, name)
} else {
LOG.debug("Send message {} to actor {}.", element.javaClass.simpleName, name)
actorScope.channel.send(element)
}
actorScope.channel.sendMessageToChannelIfOpen(element, name)
}

override fun close(cause: Throwable?): Boolean {
Expand All @@ -94,7 +103,7 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor
val childRef = actorScope.actor<Any>(
Dispatchers.IO.plus(CoroutineName(name)).plus(ParentActor(this.channel)).plus(context),
capacity, start, onCompletion) { __workflow__(LOG, block)() }
childs.put(name, childRef)
children.put(name, childRef)
return childRef
}

Expand Down Expand Up @@ -166,3 +175,17 @@ data class ParentActor(val ref: ActorRef) : AbstractCoroutineContextElement(Pare

class ActorException(val actorName: String, val actorRef: ActorRef, throwable: Throwable) : Exception(throwable)
class ActorCreationException(val actorName: String, val actorRef: ActorRef, throwable: Throwable) : Exception(throwable)


@OptIn(ExperimentalCoroutinesApi::class)
suspend fun SendChannel<Any>.sendMessageToChannelIfOpen(message: Any,
name: String = this.toString()) {
val logger = LoggerFactory.getLogger(this::class.java)
if (isClosedForSend) {
logger.debug("Channel is close for send. Message {} isn't sent to actor {}.",
message.javaClass.simpleName, name)
} else {
logger.debug("Send message {} to actor {}.", message.javaClass.simpleName, name)
send(message)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
LOG.warn("ErrMsg. Not yet implemented")
}

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
}

init{
CoroutineScope(Dispatchers.IO).launch{
scope.launch{

val isSuccessful: (Response<Unit>) -> Boolean = { response ->
when(response.code()){
Expand All @@ -72,7 +72,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
channel.send(In.Ping)
}
if (msg is In.DeploymentFeedback) {
notificationManager.send(MessageListener.Message.Event
notificationManager.sendMessageToChannelIfOpen(MessageListener.Message.Event
.DeployFeedbackRequestResult(success, msg.feedback.id,
msg.closeAction, msg.feedback.status.details))
}
Expand Down Expand Up @@ -113,15 +113,13 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {

is In.Start -> become(runningReceive(startPing(state)))

is In.Stop -> {}

is In.Register -> become(stoppedReceive(state.withReceiver(msg.listener)))

is In.Unregister -> become(stoppedReceive(state.withoutReceiver(msg.listener)))

is In.SetPing -> become(stoppedReceive(state.copy(clientPingInterval = msg.duration, lastPing = Instant.EPOCH)))

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand All @@ -130,7 +128,10 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {

is In.Start -> {}

is In.Stop -> become(stoppedReceive(stopPing(state)))
is In.Stop -> {
become(stoppedReceive(stopPing(state)))
stopActor()
}

is In.Register -> become(runningReceive(state.withReceiver(msg.listener)))

Expand Down Expand Up @@ -158,7 +159,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
}

else -> {
unhandled(msg)
handleMsgDefault(msg)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
stopUpdateAndNotify(msg)
}

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand All @@ -61,7 +61,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
is CancelForced -> {
stopUpdate()
}
else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down Expand Up @@ -95,7 +95,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
is CancelForced -> {
LOG.info("Force cancel ignored")
}
else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
}
}

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down Expand Up @@ -196,7 +196,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
stopUpdate()
}

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand All @@ -221,7 +221,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
Status.ERROR, "Failed to download file with md5 ${msg.md5} due to ${msg.message}", msg.message)
}

else -> unhandled(msg)
else -> handleMsgDefault(msg)
}
}

Expand Down Expand Up @@ -323,12 +323,6 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
}
}

//todo remove FileDownloader.Companion.Message.Stop message and use default implementation of beforeCloseChannel
@OptIn(ExperimentalCoroutinesApi::class)
override fun beforeCloseChannel() {
forEachActorNode { actorRef -> if(!actorRef.isClosedForSend) launch { actorRef.send(FileDownloader.Companion.Message.Stop) } }
}

init {
become(beforeStartReceive())
}
Expand Down
Loading
Loading