Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import cats.syntax.all._
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.effect._
import dev.profunktor.redis4cats.pubsub.internals.{ LivePubSubCommands, Publisher, Subscriber }
import dev.profunktor.redis4cats.pubsub.internals.{ LivePubSubCommands, PubSubState, Publisher, Subscriber }
import fs2.Stream
import dev.profunktor.redis4cats.pubsub.internals.PubSubState
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection

object PubSub {
Expand Down Expand Up @@ -58,7 +57,7 @@ object PubSub {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
// One exclusive connection for subscriptions and another connection for publishing / stats
for {
state <- Resource.eval(Ref.of[F, PubSubState[F, K, V]](PubSubState(Map.empty, Map.empty)))
state <- Resource.eval(PubSubState.make[F, K, V])
sConn <- Resource.make(acquire)(release)
pConn <- Resource.make(acquire)(release)
} yield new LivePubSubCommands[F, K, V](state, sConn, pConn)
Expand Down Expand Up @@ -88,7 +87,7 @@ object PubSub {
): Resource[F, SubscribeCommands[F, Stream[F, *], K, V]] = {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
for {
state <- Resource.eval(Ref.of[F, PubSubState[F, K, V]](PubSubState(Map.empty, Map.empty)))
state <- Resource.eval(PubSubState.make[F, K, V])
conn <- Resource.make(acquire)(release)
} yield new Subscriber(state, conn)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ trait PubSubStats[F[_], K] {
* @tparam V the value type
*/
trait PublishCommands[F[_], S[_], K, V] extends PubSubStats[F, K] {
def publish(channel: RedisChannel[K]): S[V] => S[Unit]
def publish(channel: RedisChannel[K], value: V): F[Unit]

/** @return The number of clients that received the message. */
def publish(channel: RedisChannel[K]): S[V] => S[Long]

/** @return The number of clients that received the message. */
def publish(channel: RedisChannel[K], value: V): F[Long]
}

/**
Expand All @@ -51,17 +55,40 @@ trait SubscribeCommands[F[_], S[_], K, V] {

/**
* Subscribes to a channel.
*
* @note If you invoke `subscribe` multiple times for the same channel, we will not call 'SUBSCRIBE' in Redis multiple
* times but instead will return a stream that will use the existing subscription to that channel. The underlying
* subscription is cleaned up when all the streams terminate or when `unsubscribe` is invoked.
*/
def subscribe(channel: RedisChannel[K]): S[V]

/** Terminates all streams that are subscribed to the channel. */
def unsubscribe(channel: RedisChannel[K]): F[Unit]

/**
* Subscribes to a pattern.
*
* @note If you invoke `subscribe` multiple times for the same pattern, we will not call 'SUBSCRIBE' in Redis multiple
* times but instead will return a stream that will use the existing subscription to that pattern. The underlying
* subscription is cleaned up when all the streams terminate or when `unsubscribe` is invoked.
*/
def psubscribe(channel: RedisPattern[K]): S[RedisPatternEvent[K, V]]

/** Terminates all streams that are subscribed to the pattern. */
def punsubscribe(channel: RedisPattern[K]): F[Unit]

/** Returns the channel subscriptions that the library keeps of.
*
* @return how many streams are subscribed to each channel.
* @see [[SubscribeCommands.subscribe]] for more information.
* */
def internalChannelSubscriptions: F[Map[RedisChannel[K], Long]]

/** Returns the pattern subscriptions that the library keeps of.
*
* @return how many streams are subscribed to each pattern.
* @see [[SubscribeCommands.psubscribe]] for more information. */
def internalPatternSubscriptions: F[Map[RedisPattern[K], Long]]
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import fs2.Stream
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection

private[pubsub] class LivePubSubCommands[F[_]: Async: Log, K, V](
state: Ref[F, PubSubState[F, K, V]],
state: PubSubState[F, K, V],
subConnection: StatefulRedisPubSubConnection[K, V],
pubConnection: StatefulRedisPubSubConnection[K, V]
) extends PubSubCommands[F, Stream[F, *], K, V] {
Expand All @@ -50,13 +50,17 @@ private[pubsub] class LivePubSubCommands[F[_]: Async: Log, K, V](
override def punsubscribe(pattern: RedisPattern[K]): F[Unit] =
subCommands.punsubscribe(pattern)

override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] =
override def internalChannelSubscriptions: F[Map[RedisChannel[K], Long]] =
subCommands.internalChannelSubscriptions

override def internalPatternSubscriptions: F[Map[RedisPattern[K], Long]] =
subCommands.internalPatternSubscriptions

override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Long] =
_.evalMap(publish(channel, _))

override def publish(channel: RedisChannel[K], message: V): F[Unit] = {
val resource = Resource.eval(state.get) >>= PubSubInternals.channel[F, K, V](state, subConnection).apply(channel)
resource.use(_ => FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void)
}
override def publish(channel: RedisChannel[K], message: V): F[Long] =
FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).map(l => l: Long)

override def numPat: F[Long] =
pubSubStats.numPat
Expand All @@ -78,5 +82,4 @@ private[pubsub] class LivePubSubCommands[F[_]: Async: Log, K, V](

override def shardNumSub(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
pubSubStats.shardNumSub(channels)

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ private[pubsub] class LivePubSubStats[F[_]: FlatMap: FutureLift, K, V](
FutureLift[F]
.lift(pubConnection.async().pubsubShardNumsub(channels.map(_.underlying): _*))
.map(toSubscription[K])

}
object LivePubSubStats {
private def toSubscription[K](map: ju.Map[K, JLong]): List[Subscription[K]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,26 @@
package dev.profunktor.redis4cats.pubsub.internals

import scala.util.control.NoStackTrace

import cats.effect.kernel.{ Async, Ref, Resource, Sync }
import cats.effect.std.Dispatcher
import cats.syntax.all._
import cats.effect.std.{ Dispatcher }
import dev.profunktor.redis4cats.data.RedisChannel
import dev.profunktor.redis4cats.data.RedisPattern
import dev.profunktor.redis4cats.data.RedisPatternEvent
import dev.profunktor.redis4cats.effect.Log
import fs2.concurrent.Topic
import io.lettuce.core.pubsub.{ RedisPubSubListener, StatefulRedisPubSubConnection }
import io.lettuce.core.pubsub.{ RedisPubSubListener }
import io.lettuce.core.pubsub.RedisPubSubAdapter

object PubSubInternals {
case class DispatcherAlreadyShutdown() extends NoStackTrace

private[redis4cats] def channelListener[F[_]: Async, K, V](
private[redis4cats] def channelListener[F[_], K, V](
channel: RedisChannel[K],
topic: Topic[F, Option[V]],
publish: V => F[Unit],
dispatcher: Dispatcher[F]
): RedisPubSubListener[K, V] =
new RedisPubSubAdapter[K, V] {
override def message(ch: K, msg: V): Unit =
if (ch == channel.underlying) {
try {
dispatcher.unsafeRunSync(topic.publish1(Option(msg)).void)
dispatcher.unsafeRunSync(publish(msg))
} catch {
case _: IllegalStateException => throw DispatcherAlreadyShutdown()
}
Expand All @@ -50,65 +45,19 @@ object PubSubInternals {
// Do not uncomment this, as if you will do this the channel listener will get a message twice
// override def message(pattern: K, channel: K, message: V): Unit = {}
}
private[redis4cats] def patternListener[F[_]: Async, K, V](
private[redis4cats] def patternListener[F[_], K, V](
redisPattern: RedisPattern[K],
topic: Topic[F, Option[RedisPatternEvent[K, V]]],
publish: RedisPatternEvent[K, V] => F[Unit],
dispatcher: Dispatcher[F]
): RedisPubSubListener[K, V] =
new RedisPubSubAdapter[K, V] {
override def message(pattern: K, channel: K, message: V): Unit =
if (pattern == redisPattern.underlying) {
try {
dispatcher.unsafeRunSync(topic.publish1(Option(RedisPatternEvent(pattern, channel, message))).void)
dispatcher.unsafeRunSync(publish(RedisPatternEvent(pattern, channel, message)))
} catch {
case _: IllegalStateException => throw DispatcherAlreadyShutdown()
}
}
}

private[redis4cats] def channel[F[_]: Async: Log, K, V](
state: Ref[F, PubSubState[F, K, V]],
subConnection: StatefulRedisPubSubConnection[K, V]
): GetOrCreateTopicListener[F, K, V] = { channel => st =>
st.channels
.get(channel.underlying)
.fold {
for {
dispatcher <- Dispatcher.parallel[F]
topic <- Resource.eval(Topic[F, Option[V]])
_ <- Resource.eval(Log[F].info(s"Creating listener for channel: $channel"))
listener = channelListener(channel, topic, dispatcher)
_ <- Resource.make {
Sync[F].delay(subConnection.addListener(listener)) *>
state.update(s => s.copy(channels = s.channels.updated(channel.underlying, topic)))
} { _ =>
Sync[F].delay(subConnection.removeListener(listener)) *>
state.update(s => s.copy(channels = s.channels - channel.underlying))
}
} yield topic
}(Resource.pure)
}

private[redis4cats] def pattern[F[_]: Async: Log, K, V](
state: Ref[F, PubSubState[F, K, V]],
subConnection: StatefulRedisPubSubConnection[K, V]
): GetOrCreatePatternListener[F, K, V] = { channel => st =>
st.patterns
.get(channel.underlying)
.fold {
for {
dispatcher <- Dispatcher.parallel[F]
topic <- Resource.eval(Topic[F, Option[RedisPatternEvent[K, V]]])
_ <- Resource.eval(Log[F].info(s"Creating listener for pattern: $channel"))
listener = patternListener(channel, topic, dispatcher)
_ <- Resource.make {
Sync[F].delay(subConnection.addListener(listener)) *>
state.update(s => s.copy(patterns = s.patterns.updated(channel.underlying, topic)))
} { _ =>
Sync[F].delay(subConnection.removeListener(listener)) *>
state.update(s => s.copy(patterns = s.patterns - channel.underlying))
}
} yield topic
}(Resource.pure)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,21 @@

package dev.profunktor.redis4cats.pubsub.internals

import dev.profunktor.redis4cats.data.RedisPatternEvent
import fs2.concurrent.Topic
import cats.syntax.all._
import cats.effect.kernel.Concurrent
import cats.effect.std.AtomicCell
import dev.profunktor.redis4cats.data.{ RedisChannel, RedisPattern, RedisPatternEvent }

final case class PubSubState[F[_], K, V](
channels: Map[K, Topic[F, Option[V]]],
patterns: Map[K, Topic[F, Option[RedisPatternEvent[K, V]]]]
/** We use `AtomicCell` instead of `Ref` because we need locking while side-effecting. */
case class PubSubState[F[_], K, V](
channelSubs: AtomicCell[F, Map[RedisChannel[K], Redis4CatsSubscription[F, V]]],
patternSubs: AtomicCell[F, Map[RedisPattern[K], Redis4CatsSubscription[F, RedisPatternEvent[K, V]]]]
)
object PubSubState {
def make[F[_]: Concurrent, K, V]: F[PubSubState[F, K, V]] =
for {
channelSubs <- AtomicCell[F].of(Map.empty[RedisChannel[K], Redis4CatsSubscription[F, V]])
patternSubs <- AtomicCell[F].of(Map.empty[RedisPattern[K], Redis4CatsSubscription[F, RedisPatternEvent[K, V]]])
} yield apply(channelSubs, patternSubs)

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ private[pubsub] class Publisher[F[_]: FlatMap: FutureLift, K, V](

private[redis4cats] val pubSubStats: PubSubStats[F, K] = new LivePubSubStats(pubConnection)

override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] =
override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Long] =
_.evalMap(publish(channel, _))

override def publish(channel: RedisChannel[K], message: V): F[Unit] =
FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void
override def publish(channel: RedisChannel[K], message: V): F[Long] =
FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).map(l => l: Long)

override def pubSubChannels: F[List[RedisChannel[K]]] =
pubSubStats.pubSubChannels
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2018-2021 ProfunKtor
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.profunktor.redis4cats.pubsub.internals

import cats.Applicative
import fs2.concurrent.Topic

/**
* Stores an ongoing subscription.
*
* @param topic single-publisher, multiple-subscribers. The same topic is reused if `subscribe` is invoked more than
* once. The subscribers' streams are terminated when `None` is published.
* @param subscribers subscriber count, when `subscribers` reaches 0 `cleanup` is called and `None` is published
* to the topic.
*/
final private[redis4cats] case class Redis4CatsSubscription[F[_], V](
topic: Topic[F, Option[V]],
subscribers: Long,
cleanup: F[Unit]
) {
assert(subscribers > 0, s"subscribers must be > 0, was $subscribers")

def addSubscriber: Redis4CatsSubscription[F, V] = copy(subscribers = subscribers + 1)
def removeSubscriber: Redis4CatsSubscription[F, V] = copy(subscribers = subscribers - 1)
def isLastSubscriber: Boolean = subscribers == 1

def stream(onTermination: F[Unit])(implicit F: Applicative[F]): fs2.Stream[F, V] =
topic.subscribe(500).unNoneTerminate.onFinalize(onTermination)
}
Loading