Skip to content

Commit

Permalink
Redis Client monitoring (#522)
Browse files Browse the repository at this point in the history
* redis cluster monitoring

* redis master slave monitoring

---------

Co-authored-by: Mikołaj Bul <m.bul@avsystem.com>
Co-authored-by: pzareba <p.zareba@avsystem.com>
  • Loading branch information
3 people authored Dec 20, 2023
1 parent 54c276e commit 6c254f4
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import com.avsystem.commons.redis.actor.RedisConnectionActor.PacksResult
import com.avsystem.commons.redis.commands.{Asking, SlotRange}
import com.avsystem.commons.redis.config.{ClusterConfig, ExecutionConfig}
import com.avsystem.commons.redis.exception._
import com.avsystem.commons.redis.monitoring.ClusterStateObserver
import com.avsystem.commons.redis.protocol._
import com.avsystem.commons.redis.util.DelayedFuture

Expand Down Expand Up @@ -41,10 +42,13 @@ import scala.concurrent.duration._
*
* @param seedNodes nodes used to fetch initial cluster state from. You don't need to list all cluster nodes, it is
* only required that at least one of the seed nodes is available during startup.
* @param config client configuration - [[ClusterConfig]]
* @param clusterStateObserver optional observer for monitoring client's state and connections - [[ClusterStateObserver]]
*/
final class RedisClusterClient(
val seedNodes: Seq[NodeAddress] = List(NodeAddress.Default),
val config: ClusterConfig = ClusterConfig()
val config: ClusterConfig = ClusterConfig(),
val clusterStateObserver: OptArg[ClusterStateObserver] = OptArg.Empty,
)(implicit system: ActorSystem) extends RedisClient with RedisKeyedExecutor {

require(seedNodes.nonEmpty, "No seed nodes provided")
Expand All @@ -59,7 +63,14 @@ final class RedisClusterClient(
@volatile private[this] var failure = Opt.empty[Throwable]

private val initPromise = Promise[Unit]()
initPromise.future.foreachNow(_ => initSuccess = true)

initPromise.future.onCompleteNow {
case Success(_) =>
clusterStateObserver.foreach(_.onClusterInitialized())
initSuccess = true
case Failure(_) =>
clusterStateObserver.foreach(_.onClusterInitFailure())
}

private def ifReady[T](code: => Future[T]): Future[T] = failure match {
case Opt.Empty if initSuccess => code
Expand Down Expand Up @@ -95,7 +106,7 @@ final class RedisClusterClient(
}

private val monitoringActor =
system.actorOf(Props(new ClusterMonitoringActor(seedNodes, config, initPromise.failure, onNewState, onTemporaryClient)))
system.actorOf(Props(new ClusterMonitoringActor(seedNodes, config, initPromise.failure, onNewState, onTemporaryClient, clusterStateObserver)))

private def determineSlot(pack: RawCommandPack): Int = {
var slot = -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.avsystem.commons.redis.exception.ClientStoppedException
*/
final class RedisConnectionClient(
val address: NodeAddress = NodeAddress.Default,
val config: ConnectionConfig = ConnectionConfig()
val config: ConnectionConfig = ConnectionConfig(),
)
(implicit system: ActorSystem) extends RedisClient with RedisConnectionExecutor { self =>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,24 @@ import com.avsystem.commons.redis.actor.RedisConnectionActor.PacksResult
import com.avsystem.commons.redis.actor.SentinelsMonitoringActor
import com.avsystem.commons.redis.config.{ExecutionConfig, MasterSlaveConfig}
import com.avsystem.commons.redis.exception.{ClientStoppedException, NoMasterException, NodeRemovedException}
import com.avsystem.commons.redis.monitoring.SentinelStateObserver
import com.avsystem.commons.redis.protocol.{ErrorMsg, RedisReply, TransactionReply}
import com.avsystem.commons.redis.util.DelayedFuture

/**
* Redis client implementation for master-slave installations with Redis Sentinels.
* [[RedisMasterSlaveClient]] is able to execute the same set of commands as [[RedisNodeClient]].
*
* @param masterName name of the master, as configured in the sentinels
* @param seedSentinels sentinel seed addresses - must point to at least one reachable sentinel
* @param masterName name of the master, as configured in the sentinels
* @param seedSentinels sentinel seed addresses - must point to at least one reachable sentinel
* @param config client configuration - [[MasterSlaveConfig]]
* @param sentinelStateObserver optional observer of client's state and connections - [[SentinelStateObserver]]
*/
final class RedisMasterSlaveClient(
val masterName: String,
val seedSentinels: Seq[NodeAddress] = Seq(NodeAddress.DefaultSentinel),
val config: MasterSlaveConfig = MasterSlaveConfig()
val config: MasterSlaveConfig = MasterSlaveConfig(),
val sentinelStateObserver: OptArg[SentinelStateObserver] = OptArg.Empty,
)(implicit system: ActorSystem) extends RedisClient with RedisNodeExecutor {

require(seedSentinels.nonEmpty, "No seed sentinel nodes provided")
Expand All @@ -45,6 +49,7 @@ final class RedisMasterSlaveClient(
private def onNewMaster(newMaster: RedisNodeClient): Unit = {
master = newMaster
masterListener(master)
sentinelStateObserver.foreach(_.onMasterChange(newMaster.address))
if (!initSuccess) {
import system.dispatcher
newMaster.initialized.onComplete { result =>
Expand All @@ -58,7 +63,7 @@ final class RedisMasterSlaveClient(
}

private val monitoringActor =
system.actorOf(Props(new SentinelsMonitoringActor(masterName, seedSentinels, config, initPromise.failure, onNewMaster)))
system.actorOf(Props(new SentinelsMonitoringActor(masterName, seedSentinels, config, initPromise.failure, onNewMaster, sentinelStateObserver)))

def setMasterListener(listener: RedisNodeClient => Unit)(implicit executor: ExecutionContext): Unit =
masterListener = newMaster => executor.execute(jRunnable(listener(newMaster)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.avsystem.commons.redis.actor.RedisConnectionActor.PacksResult
import com.avsystem.commons.redis.commands.{NodeInfo, SlotRange, SlotRangeMapping}
import com.avsystem.commons.redis.config.ClusterConfig
import com.avsystem.commons.redis.exception.{ClusterInitializationException, ErrorReplyException}
import com.avsystem.commons.redis.monitoring.ClusterStateObserver
import com.avsystem.commons.redis.util.{ActorLazyLogging, SingletonSeq}

import scala.collection.mutable
Expand All @@ -19,26 +20,23 @@ final class ClusterMonitoringActor(
config: ClusterConfig,
onClusterInitFailure: Throwable => Any,
onNewClusterState: ClusterState => Any,
onTemporaryClient: RedisNodeClient => Any
onTemporaryClient: RedisNodeClient => Any,
clusterStateObserver: OptArg[ClusterStateObserver] = OptArg.Empty,
) extends Actor with ActorLazyLogging {

import ClusterMonitoringActor._
import context._

private def createConnection(addr: NodeAddress): ActorRef =
actorOf(Props(new RedisConnectionActor(addr, config.monitoringConnectionConfigs(addr))))
actorOf(Props(new RedisConnectionActor(addr, config.monitoringConnectionConfigs(addr), clusterStateObserver)))

private def getConnection(addr: NodeAddress, seed: Boolean): ActorRef =
connections.getOrElse(addr, {
openConnection(addr, seed)
getConnection(addr, seed)
})
connections.getOrElse(addr, openConnection(addr, seed))

private def openConnection(addr: NodeAddress, seed: Boolean): Future[Unit] = {
val initPromise = Promise[Unit]()
private def openConnection(addr: NodeAddress, seed: Boolean): ActorRef = {
val connection = connections.getOrElseUpdate(addr, createConnection(addr))
connection ! RedisConnectionActor.Open(seed, initPromise)
initPromise.future
connection ! RedisConnectionActor.Open(seed, Promise[Unit]())
connection
}

private def createClient(addr: NodeAddress, clusterNode: Boolean = true) =
Expand Down Expand Up @@ -93,6 +91,7 @@ final class ClusterMonitoringActor(
case pr: PacksResult => Try(StateRefresh.decodeReplies(pr)) match {
case Success((slotRangeMapping, NodeInfosWithMyself(nodeInfos, thisNodeInfo)))
if thisNodeInfo.configEpoch >= lastEpoch =>
clusterStateObserver.foreach(_.onClusterRefresh())

lastEpoch = thisNodeInfo.configEpoch

Expand Down Expand Up @@ -137,7 +136,7 @@ final class ClusterMonitoringActor(
}

case Success(_) =>
// obsolete cluster state, ignore
// obsolete cluster state, ignore

case Failure(err: ErrorReplyException)
if state.isEmpty && seedNodes.size == 1 && config.fallbackToSingleNode &&
Expand All @@ -157,6 +156,7 @@ final class ClusterMonitoringActor(

case Failure(cause) =>
log.error("Failed to refresh cluster state", cause)
clusterStateObserver.foreach(_.onClusterRefreshFailure())
if (state.isEmpty) {
seedFailures += cause
if (seedFailures.size == seedNodes.size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.avsystem.commons.redis._
import com.avsystem.commons.redis.commands.{PubSubCommand, PubSubEvent, ReplyDecoders}
import com.avsystem.commons.redis.config.ConnectionConfig
import com.avsystem.commons.redis.exception._
import com.avsystem.commons.redis.monitoring.{ConnectionState, ConnectionStateObserver}
import com.avsystem.commons.redis.protocol.{RedisMsg, RedisReply, ValidRedisMsg}
import com.avsystem.commons.redis.util.ActorLazyLogging

Expand All @@ -20,8 +21,11 @@ import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.util.Random

final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
extends Actor with ActorLazyLogging { actor =>
final class RedisConnectionActor(
address: NodeAddress,
config: ConnectionConfig,
connectionStateObserver: OptArg[ConnectionStateObserver] = OptArg.Empty,
) extends Actor with ActorLazyLogging { actor =>

import RedisConnectionActor._
import context._
Expand Down Expand Up @@ -56,7 +60,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
handlePacks(packs)
case open: Open =>
onOpen(open)
become(connecting(config.reconnectionStrategy, Opt.Empty))
become(watchedConnecting(config.reconnectionStrategy, Opt.Empty))
self ! Connect
}

Expand Down Expand Up @@ -95,6 +99,11 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
}
}

private def watchedConnecting(retryStrategy: RetryStrategy, readInitSender: Opt[ActorRef]): Receive = {
connectionStateObserver.foreach(_.onConnectionStateChange(address, ConnectionState.Connecting))
connecting(retryStrategy, readInitSender)
}

private def connecting(retryStrategy: RetryStrategy, readInitSender: Opt[ActorRef]): Receive = {
case open: Open =>
onOpen(open)
Expand Down Expand Up @@ -128,7 +137,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
case ReadInit =>
// not sure if it's possible to receive ReadInit before Connected but just to be safe
// delay replying with ReadAck until Connected is received
become(connecting(retryStrategy, Opt(sender())))
become(watchedConnecting(retryStrategy, Opt(sender())))
case _: TcpEvent => //ignore, this is from previous connection
}

Expand Down Expand Up @@ -197,7 +206,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
if (delay > Duration.Zero) {
log.info(s"Next reconnection attempt to $address in $delay")
}
become(connecting(nextStrategy, Opt.Empty))
become(watchedConnecting(nextStrategy, Opt.Empty))
system.scheduler.scheduleOnce(delay, self, Connect)
case Opt.Empty =>
close(failureCause, stopSelf = false)
Expand Down Expand Up @@ -282,7 +291,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
config.initCommands.decodeReplies(packsResult)
log.debug(s"Successfully initialized Redis connection $localAddr->$remoteAddr")
initPromise.trySuccess(())
become(ready)
become(watchedReady)
writeIfPossible()
} catch {
// https://github.com/antirez/redis/issues/4624
Expand All @@ -301,6 +310,11 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
close(new ConnectionInitializationFailure(cause), stopSelf = false)
}

def watchedReady: Receive = {
connectionStateObserver.foreach(_.onConnectionStateChange(address, ConnectionState.Connected))
ready
}

def ready: Receive = {
case open: Open =>
onOpen(open)
Expand Down Expand Up @@ -338,7 +352,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
case open: Open =>
onOpen(open)
initPromise.success(())
become(ready)
become(watchedReady)
case IncomingPacks(packs) =>
packs.reply(PacksResult.Failure(cause))
case Release => //ignore
Expand Down Expand Up @@ -512,7 +526,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
if (stopSelf) {
stop(self)
} else {
become(closed(cause, tcpConnecting))
become(watchedClosed(cause, tcpConnecting))
}
}

Expand All @@ -522,11 +536,16 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
drain(queuedToReserve)(_.reply(failure))
}

private def watchedClosed(cause: Throwable, tcpConnecting: Boolean): Receive = {
connectionStateObserver.foreach(_.onConnectionStateChange(address, ConnectionState.Closed))
closed(cause, tcpConnecting)
}

private def closed(cause: Throwable, tcpConnecting: Boolean): Receive = {
case open: Open =>
onOpen(open)
incarnation = 0
become(connecting(config.reconnectionStrategy, Opt.Empty))
become(watchedConnecting(config.reconnectionStrategy, Opt.Empty))
if (!tcpConnecting) {
self ! Connect
}
Expand All @@ -536,11 +555,19 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
case Connected(connection, _, _, _) if tcpConnecting =>
// failure may have happened while connecting, simply close the connection
connection ! CloseConnection(immediate = true)
become(closed(cause, tcpConnecting = false))
become(watchedClosed(cause, tcpConnecting = false))
case _: TcpEvent => // ignore
case Close(_, true) =>
stop(self)
}

override def preStart(): Unit = {
connectionStateObserver.foreach(_.onConnectionStateChange(address, ConnectionState.Created))
}

override def postStop(): Unit = {
connectionStateObserver.foreach(_.onConnectionStateChange(address, ConnectionState.Removed))
}
}

object RedisConnectionActor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.avsystem.commons.redis.actor.RedisConnectionActor.PacksResult
import com.avsystem.commons.redis.commands.{PubSubEvent, Subscribe}
import com.avsystem.commons.redis.config.MasterSlaveConfig
import com.avsystem.commons.redis.exception.MasterSlaveInitializationException
import com.avsystem.commons.redis.monitoring.SentinelStateObserver
import com.avsystem.commons.redis.protocol.BulkStringMsg
import com.avsystem.commons.redis.util.ActorLazyLogging
import com.avsystem.commons.redis.{NodeAddress, RedisApi, RedisBatch, RedisNodeClient}
Expand All @@ -15,7 +16,8 @@ final class SentinelsMonitoringActor(
seedSentinels: Seq[NodeAddress],
config: MasterSlaveConfig,
onInitFailure: Throwable => Unit,
onMasterChange: RedisNodeClient => Unit
onMasterChange: RedisNodeClient => Unit,
stateObserver: OptArg[SentinelStateObserver] = OptArg.Empty,
) extends Actor with ActorLazyLogging {

import RedisApi.Batches.StringTyped._
Expand All @@ -40,7 +42,7 @@ final class SentinelsMonitoringActor(

private def openConnection(addr: NodeAddress, seed: Boolean): ActorRef = {
log.debug(s"Opening monitoring connection to sentinel $addr")
val conn = actorOf(Props(new RedisConnectionActor(addr, config.sentinelConnectionConfigs(addr))))
val conn = actorOf(Props(new RedisConnectionActor(addr, config.sentinelConnectionConfigs(addr), stateObserver)))
sentinels(addr) = conn
conn ! RedisConnectionActor.Open(seed, Promise[Unit]())
onReconnection(conn)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.avsystem.commons
package redis.monitoring

import com.avsystem.commons.redis.RedisClusterClient

/**
* Intended for monitoring [[RedisClusterClient]]'s connections.
* Should be non-blocking and handle internal exceptions by itself.
*/
trait ClusterStateObserver extends ConnectionStateObserver {
def onClusterRefresh(): Unit
def onClusterRefreshFailure(): Unit
def onClusterInitialized(): Unit
def onClusterInitFailure(): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.avsystem.commons
package redis.monitoring

import com.avsystem.commons.misc.{AbstractValueEnum, AbstractValueEnumCompanion, EnumCtx}
import com.avsystem.commons.redis.NodeAddress

/**
* Intended for monitoring the state of a single Redis connection.
* Should be non-blocking and handle internal exceptions by itself.
*/
trait ConnectionStateObserver {
def onConnectionStateChange(addr: NodeAddress, state: ConnectionState): Unit
}

final class ConnectionState(implicit enumCtx: EnumCtx) extends AbstractValueEnum

object ConnectionState extends AbstractValueEnumCompanion[ConnectionState] {
final val Created, Connecting, Connected, Closed, Removed: ConnectionState = new ConnectionState
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.avsystem.commons
package redis.monitoring

import com.avsystem.commons.redis.NodeAddress
import com.avsystem.commons.redis.RedisMasterSlaveClient

/**
* Intended for monitoring [[RedisMasterSlaveClient]]'s state and connections.
* Should be non-blocking and handle internal exceptions by itself.
*/
trait SentinelStateObserver extends ConnectionStateObserver {
def onMasterChange(master: NodeAddress): Unit
}

0 comments on commit 6c254f4

Please sign in to comment.