Skip to content

Commit

Permalink
realtime: more cleanup (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcom committed Dec 16, 2023
1 parent b1c646e commit d49fa27
Showing 1 changed file with 62 additions and 49 deletions.
111 changes: 62 additions & 49 deletions src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
package ru.org.linux.realtime

import akka.Done
import akka.actor.typed.Scheduler
import akka.actor.typed.{ActorRef, Behavior, PostStop, Scheduler}
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, PoisonPill, Props, SupervisorStrategy, Terminated, Timers}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.{Actor, ActorLogging, ActorSystem, Props, SupervisorStrategy, Terminated, Timers}
import akka.util.Timeout
import com.typesafe.scalalogging.StrictLogging
import org.springframework.beans.factory.annotation.Qualifier
Expand All @@ -41,21 +42,24 @@ import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*
import scala.util.control.NonFatal


// TODO ignore list support
// TODO fix face conditions on simultaneous posting comment, subscription and missing processing
class RealtimeEventHub extends Actor with ActorLogging with Timers {
private val topicSubscriptions: mutable.MultiDict[Int, ActorRef] = mutable.MultiDict[Int, ActorRef]()
private val userSubscriptions: mutable.MultiDict[Int, ActorRef] = mutable.MultiDict[Int, ActorRef]()
private val sessions = new mutable.HashMap[String, ActorRef]
private val topicSubscriptions: mutable.MultiDict[Int, ActorRef[SessionProtocol]] = mutable.MultiDict[Int, ActorRef[SessionProtocol]]()
private val userSubscriptions: mutable.MultiDict[Int, ActorRef[SessionProtocol]] = mutable.MultiDict[Int, ActorRef[SessionProtocol]]()
private val sessions = new mutable.HashMap[String, ActorRef[SessionProtocol]]
private var maxDataSize: Int = 0

timers.startTimerWithFixedDelay(Tick, Tick, 5.minutes)

import akka.actor.typed.scaladsl.adapter.*

override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

override def receive: Receive = {
case SessionStarted(session, user, replyTo) if !sessions.contains(session.getId) =>
val actor = context.actorOf(RealtimeSessionActor.props(session))
val actor: ActorRef[SessionProtocol] = context.spawnAnonymous(RealtimeSessionActor.behavior(session))
context.watch(actor)

sessions += (session.getId -> actor)
Expand All @@ -77,7 +81,8 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
topicSubscriptions += (topic -> actor)

replyTo ! Done
case Terminated(actorRef) =>
case Terminated(untypedRef) =>
val actorRef: ActorRef[SessionProtocol] = untypedRef
log.debug(s"RealtimeSessionActor $actorRef terminated")

topicSubscriptions.sets.find(_._2.contains(actorRef)).foreach { case (msgid, _) =>
Expand All @@ -96,7 +101,7 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
sessions.get(id) foreach { actor =>
log.debug("Session was terminated, stopping actor")

actor ! PoisonPill
actor ! TerminateSession
}
case msg@NewComment(msgid, _) =>
log.debug(s"New comment in topic $msgid")
Expand All @@ -117,16 +122,20 @@ class RealtimeEventHub extends Actor with ActorLogging with Timers {
}

object RealtimeEventHub {
sealed trait SessionProtocol

sealed trait Protocol

case class NewComment(msgid: Int, cid: Int) extends Protocol
case class RefreshEvents(users: Set[Int]) extends Protocol
private[realtime] case object Tick extends Protocol
case class NewComment(msgid: Int, cid: Int) extends SessionProtocol with Protocol
case class RefreshEvents(users: Set[Int]) extends SessionProtocol with Protocol
private[realtime] case object Tick extends SessionProtocol with Protocol

private[realtime] case class SessionStarted(session: WebSocketSession, user: Option[Int], replyTo: akka.actor.typed.ActorRef[Done.type]) extends Protocol
private[realtime] case class SubscribeTopic(session: WebSocketSession, topic: Int, replyTo: akka.actor.typed.ActorRef[Done.type]) extends Protocol
private[realtime] case class SessionStarted(session: WebSocketSession, user: Option[Int], replyTo: ActorRef[Done.type]) extends Protocol
private[realtime] case class SubscribeTopic(session: WebSocketSession, topic: Int, replyTo: ActorRef[Done.type]) extends Protocol
private[realtime] case class SessionTerminated(session: String) extends Protocol

private[realtime] case object TerminateSession extends SessionProtocol

def props: Props = Props(new RealtimeEventHub())

private[realtime] def notifyComment(session: WebSocketSession, comment: Int): Unit = {
Expand All @@ -137,49 +146,53 @@ object RealtimeEventHub {
session.sendMessage(new TextMessage(s"events-refresh"))
}

def notifyEvents(realtimeEventHub: akka.actor.typed.ActorRef[RefreshEvents], users: java.lang.Iterable[Integer]): Unit = {
def notifyEvents(realtimeEventHub: ActorRef[RefreshEvents], users: java.lang.Iterable[Integer]): Unit = {
realtimeEventHub ! RefreshEvents(users.asScala.map(_.toInt).toSet)
}
}

class RealtimeSessionActor(session: WebSocketSession) extends Actor with ActorLogging with Timers {
timers.startTimerWithFixedDelay(Tick, Tick, initialDelay = 5.seconds, delay = 1.minute)

override def receive: Receive = {
case NewComment(_, cid) =>
try {
notifyComment(session, cid)
} catch handleExceptions

case RefreshEvents(_) =>
try {
notifyEvent(session)
} catch handleExceptions
case Tick =>
// log.debug("Sending keepalive")
try {
session.sendMessage(new PingMessage())
} catch handleExceptions
}

private def handleExceptions: PartialFunction[Throwable, Unit] = {
case ex: IOException =>
log.debug(s"Terminated by IOException ${ex.toString}")
context.stop(self)
}
object RealtimeSessionActor {
def behavior(session: WebSocketSession): Behavior[SessionProtocol] = Behaviors.setup { context =>
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(Tick, Tick, initialDelay = 5.seconds, delay = 1.minute)

def handleExceptions: PartialFunction[Throwable, Behavior[SessionProtocol]] = {
case ex: IOException =>
context.log.debug(s"Terminated by IOException ${ex.toString}")
Behaviors.stopped
}

@scala.throws[Exception](classOf[Exception])
override def postStop(): Unit = {
session.close()
Behaviors.receiveMessage[SessionProtocol] {
case NewComment(_, cid) =>
try {
notifyComment(session, cid)
Behaviors.same
} catch handleExceptions

case RefreshEvents(_) =>
try {
notifyEvent(session)
Behaviors.same
} catch handleExceptions
case Tick =>
// log.debug("Sending keepalive")
try {
session.sendMessage(new PingMessage())
Behaviors.same
} catch handleExceptions
case TerminateSession =>
Behaviors.stopped
}.receiveSignal {
case (_, PostStop) =>
session.close()
Behaviors.same
}
}
}
}

object RealtimeSessionActor {
def props(session: WebSocketSession): Props = Props(new RealtimeSessionActor(session))
}

@Service
class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: akka.actor.typed.ActorRef[Protocol],
class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef[Protocol],
topicDao: TopicDao, commentService: CommentReadService,
actorSystem: ActorSystem) extends TextWebSocketHandler
with StrictLogging {
Expand Down Expand Up @@ -259,10 +272,10 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: akka.actor.typed
@Configuration
class RealtimeConfigurationBeans(actorSystem: ActorSystem) {
@Bean(Array("realtimeHubWS"))
def hub: akka.actor.typed.ActorRef[Protocol] = {
def hub: ActorRef[Protocol] = {
import akka.actor.typed.scaladsl.adapter.*

actorSystem.actorOf(RealtimeEventHub.props)
actorSystem.actorOf(RealtimeEventHub.props, "realtimeHubWS")
}
}

Expand Down

0 comments on commit d49fa27

Please sign in to comment.