Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import microsites.ExtraMdFileConfig
ThisBuild / scalaVersion := "2.13.15"
ThisBuild / crossScalaVersions := Seq("2.12.20", "2.13.15", "3.3.4")
ThisBuild / evictionErrorLevel := Level.Info
ThisBuild / mimaBaseVersion := "1.7.0"
ThisBuild / mimaBaseVersion := "1.8.0"
Test / parallelExecution := false

val blue = "\u001b[34m"
Expand Down Expand Up @@ -142,7 +142,7 @@ lazy val `redis4cats-effects` = project
.settings(
isMimaEnabled := true,
mimaPreviousArtifacts ~= { prev =>
prev.filter(artifact => VersionNumber(artifact.revision).matchesSemVer(SemanticSelector(">=1.7.2")))
prev.filter(artifact => VersionNumber(artifact.revision).matchesSemVer(SemanticSelector(">=1.8.0")))
}
)
.settings(Test / parallelExecution := false)
Expand All @@ -155,7 +155,7 @@ lazy val `redis4cats-streams` = project
.settings(
isMimaEnabled := true,
mimaPreviousArtifacts ~= { prev =>
prev.filter(artifact => VersionNumber(artifact.revision).matchesSemVer(SemanticSelector(">=1.7.2")))
prev.filter(artifact => VersionNumber(artifact.revision).matchesSemVer(SemanticSelector(">=1.8.0")))
}
)
.settings(libraryDependencies += Libraries.fs2Core)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ object PubSubDemo extends LoggerIOApp {
sub3.through(sink("#tx-ps")),
Stream.awakeEvery[IO](3.seconds) >> Stream.eval(IO(Random.nextInt(100).toString)).through(pub1),
Stream.awakeEvery[IO](5.seconds) >> Stream.emit("Pac-Man!").through(pub2),
Stream.awakeDelay[IO](11.seconds) >> pubSub.unsubscribe(gamesChannel),
Stream.awakeEvery[IO](6.seconds) >> pubSub
.pubSubSubscriptions(List(eventsChannel, gamesChannel, txChannel))
Stream.awakeDelay[IO](11.seconds) >> Stream.eval(pubSub.unsubscribe(gamesChannel)),
Stream.awakeEvery[IO](6.seconds) >> Stream
.eval(pubSub.pubSubSubscriptions(List(eventsChannel, gamesChannel, txChannel)))
.evalMap(IO.println),
Stream.sleep[IO](1.second) ++ Stream.exec(redis.transact_(ops))
).parJoinUnbounded.drain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ object PublisherDemo extends LoggerIOApp {
pub1 = pubSub.publish(eventsChannel)
} yield Stream(
Stream.awakeEvery[IO](3.seconds) >> Stream.eval(IO(Random.nextInt(100).toString)).through(pub1),
Stream.awakeEvery[IO](6.seconds) >> pubSub
.pubSubSubscriptions(eventsChannel)
.evalMap(IO.println)
Stream.awakeEvery[IO](6.seconds) >> Stream.eval(pubSub.pubSubSubscriptions(eventsChannel)).evalMap(IO.println)
).parJoin(2).drain).flatten

val program: IO[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object PubSub {
def mkPubSubConnection[F[_]: Async: FutureLift: Log, K, V](
client: RedisClient,
codec: RedisCodec[K, V]
): Resource[F, PubSubCommands[Stream[F, *], K, V]] = {
): Resource[F, PubSubCommands[F, Stream[F, *], K, V]] = {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
// One exclusive connection for subscriptions and another connection for publishing / stats
for {
Expand All @@ -72,7 +72,7 @@ object PubSub {
def mkPublisherConnection[F[_]: FlatMap: FutureLift: Log, K, V](
client: RedisClient,
codec: RedisCodec[K, V]
): Resource[F, PublishCommands[Stream[F, *], K, V]] = {
): Resource[F, PublishCommands[F, Stream[F, *], K, V]] = {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
Resource.make(acquire)(release).map(new Publisher[F, K, V](_))
}
Expand All @@ -85,7 +85,7 @@ object PubSub {
def mkSubscriberConnection[F[_]: Async: FutureLift: Log, K, V](
client: RedisClient,
codec: RedisCodec[K, V]
): Resource[F, SubscribeCommands[Stream[F, *], K, V]] = {
): Resource[F, SubscribeCommands[F, Stream[F, *], K, V]] = {
val (acquire, release) = acquireAndRelease[F, K, V](client, codec)
for {
state <- Resource.eval(Ref.of[F, PubSubState[F, K, V]](PubSubState(Map.empty, Map.empty)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,39 @@ trait PubSubStats[F[_], K] {
def numSub: F[List[Subscription[K]]]
def pubSubChannels: F[List[RedisChannel[K]]]
def pubSubShardChannels: F[List[RedisChannel[K]]]
def pubSubSubscriptions(channel: RedisChannel[K]): F[Subscription[K]]
def pubSubSubscriptions(channel: RedisChannel[K]): F[Option[Subscription[K]]]
def pubSubSubscriptions(channels: List[RedisChannel[K]]): F[List[Subscription[K]]]
def shardNumSub(channels: List[RedisChannel[K]]): F[List[Subscription[K]]]
}

trait PublishCommands[F[_], K, V] extends PubSubStats[F, K] {
def publish(channel: RedisChannel[K]): F[V] => F[Unit]
/**
* @tparam F the effect type
* @tparam S the stream type
* @tparam K the channel key type
* @tparam V the value type
*/
trait PublishCommands[F[_], S[_], K, V] extends PubSubStats[F, K] {
def publish(channel: RedisChannel[K]): S[V] => S[Unit]
def publish(channel: RedisChannel[K], value: V): F[Unit]
}

trait SubscribeCommands[F[_], K, V] {
def subscribe(channel: RedisChannel[K]): F[V]
/**
* @tparam F the effect type
* @tparam S the stream type
* @tparam K the channel key type
* @tparam V the value type
*/
trait SubscribeCommands[F[_], S[_], K, V] {
def subscribe(channel: RedisChannel[K]): S[V]
def unsubscribe(channel: RedisChannel[K]): F[Unit]
def psubscribe(channel: RedisPattern[K]): F[RedisPatternEvent[K, V]]
def psubscribe(channel: RedisPattern[K]): S[RedisPatternEvent[K, V]]
def punsubscribe(channel: RedisPattern[K]): F[Unit]
}

trait PubSubCommands[F[_], K, V] extends PublishCommands[F, K, V] with SubscribeCommands[F, K, V]
/**
* @tparam F the effect type
* @tparam S the stream type
* @tparam K the channel key type
* @tparam V the value type
*/
trait PubSubCommands[F[_], S[_], K, V] extends PublishCommands[F, S, K, V] with SubscribeCommands[F, S, K, V]
Original file line number Diff line number Diff line change
Expand Up @@ -32,51 +32,51 @@ private[pubsub] class LivePubSubCommands[F[_]: Async: Log, K, V](
state: Ref[F, PubSubState[F, K, V]],
subConnection: StatefulRedisPubSubConnection[K, V],
pubConnection: StatefulRedisPubSubConnection[K, V]
) extends PubSubCommands[Stream[F, *], K, V] {
) extends PubSubCommands[F, Stream[F, *], K, V] {

private[redis4cats] val subCommands: SubscribeCommands[Stream[F, *], K, V] =
private[redis4cats] val subCommands: SubscribeCommands[F, Stream[F, *], K, V] =
new Subscriber[F, K, V](state, subConnection)
private[redis4cats] val pubSubStats: PubSubStats[Stream[F, *], K] = new LivePubSubStats(pubConnection)
private[redis4cats] val pubSubStats: PubSubStats[F, K] = new LivePubSubStats(pubConnection)

override def subscribe(channel: RedisChannel[K]): Stream[F, V] =
subCommands.subscribe(channel)

override def unsubscribe(channel: RedisChannel[K]): Stream[F, Unit] =
override def unsubscribe(channel: RedisChannel[K]): F[Unit] =
subCommands.unsubscribe(channel)

override def psubscribe(pattern: RedisPattern[K]): Stream[F, RedisPatternEvent[K, V]] =
subCommands.psubscribe(pattern)

override def punsubscribe(pattern: RedisPattern[K]): Stream[F, Unit] =
override def punsubscribe(pattern: RedisPattern[K]): F[Unit] =
subCommands.punsubscribe(pattern)

override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] =
_.flatMap { message =>
Stream.resource(
Resource.eval(state.get) >>= PubSubInternals.channel[F, K, V](state, subConnection).apply(channel)
) >>
Stream.eval(FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void)
}

override def numPat: Stream[F, Long] =
_.evalMap(publish(channel, _))

override def publish(channel: RedisChannel[K], message: V): F[Unit] = {
val resource = Resource.eval(state.get) >>= PubSubInternals.channel[F, K, V](state, subConnection).apply(channel)
resource.use(_ => FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void)
}

override def numPat: F[Long] =
pubSubStats.numPat

override def numSub: Stream[F, List[Subscription[K]]] =
override def numSub: F[List[Subscription[K]]] =
pubSubStats.numSub

override def pubSubChannels: Stream[F, List[RedisChannel[K]]] =
override def pubSubChannels: F[List[RedisChannel[K]]] =
pubSubStats.pubSubChannels

override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] =
override def pubSubShardChannels: F[List[RedisChannel[K]]] =
pubSubStats.pubSubShardChannels

override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] =
override def pubSubSubscriptions(channel: RedisChannel[K]): F[Option[Subscription[K]]] =
pubSubStats.pubSubSubscriptions(channel)

override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
override def pubSubSubscriptions(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
pubSubStats.pubSubSubscriptions(channels)

override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
override def shardNumSub(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
pubSubStats.shardNumSub(channels)

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import cats.syntax.all._
import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.effect.FutureLift
import dev.profunktor.redis4cats.pubsub.data.Subscription
import fs2.Stream
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
import dev.profunktor.redis4cats.JavaConversions._
import dev.profunktor.redis4cats.pubsub.internals.LivePubSubStats.toSubscription
Expand All @@ -32,51 +31,37 @@ import java.{ util => ju }
import java.lang.{ Long => JLong }
private[pubsub] class LivePubSubStats[F[_]: FlatMap: FutureLift, K, V](
pubConnection: StatefulRedisPubSubConnection[K, V]
) extends PubSubStats[Stream[F, *], K] {
) extends PubSubStats[F, K] {

override def numPat: Stream[F, Long] =
Stream
.eval {
FutureLift[F].lift(pubConnection.async().pubsubNumpat())
}
.map(Long.unbox)
override def numPat: F[Long] =
FutureLift[F].lift(pubConnection.async().pubsubNumpat()).map(Long.unbox)

override def numSub: Stream[F, List[Subscription[K]]] =
Stream
.eval {
FutureLift[F].lift(pubConnection.async().pubsubNumsub())
}
override def numSub: F[List[Subscription[K]]] =
FutureLift[F]
.lift(pubConnection.async().pubsubNumsub())
.map(toSubscription[K])

override def pubSubChannels: Stream[F, List[RedisChannel[K]]] =
Stream
.eval {
FutureLift[F].lift(pubConnection.async().pubsubChannels())
}
override def pubSubChannels: F[List[RedisChannel[K]]] =
FutureLift[F]
.lift(pubConnection.async().pubsubChannels())
.map(_.asScala.toList.map(RedisChannel[K]))

override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] =
Stream
.eval {
FutureLift[F].lift(pubConnection.async().pubsubShardChannels())
}
override def pubSubShardChannels: F[List[RedisChannel[K]]] =
FutureLift[F]
.lift(pubConnection.async().pubsubShardChannels())
.map(_.asScala.toList.map(RedisChannel[K]))

override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] =
pubSubSubscriptions(List(channel)).map(_.headOption).unNone
override def pubSubSubscriptions(channel: RedisChannel[K]): F[Option[Subscription[K]]] =
pubSubSubscriptions(List(channel)).map(_.headOption)

override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
Stream.eval {
FutureLift[F]
.lift(pubConnection.async().pubsubNumsub(channels.map(_.underlying): _*))
.map(toSubscription[K])
}
override def pubSubSubscriptions(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
FutureLift[F]
.lift(pubConnection.async().pubsubNumsub(channels.map(_.underlying): _*))
.map(toSubscription[K])

override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
Stream
.eval {
FutureLift[F].lift(pubConnection.async().pubsubShardNumsub(channels.map(_.underlying): _*))
}
override def shardNumSub(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
FutureLift[F]
.lift(pubConnection.async().pubsubShardNumsub(channels.map(_.underlying): _*))
.map(toSubscription[K])

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,34 @@ import io.lettuce.core.pubsub.StatefulRedisPubSubConnection

private[pubsub] class Publisher[F[_]: FlatMap: FutureLift, K, V](
pubConnection: StatefulRedisPubSubConnection[K, V]
) extends PublishCommands[Stream[F, *], K, V] {
) extends PublishCommands[F, Stream[F, *], K, V] {

private[redis4cats] val pubSubStats: PubSubStats[Stream[F, *], K] = new LivePubSubStats(pubConnection)
private[redis4cats] val pubSubStats: PubSubStats[F, K] = new LivePubSubStats(pubConnection)

override def publish(channel: RedisChannel[K]): Stream[F, V] => Stream[F, Unit] =
_.evalMap(message => FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void)
_.evalMap(publish(channel, _))

override def pubSubChannels: Stream[F, List[RedisChannel[K]]] =
override def publish(channel: RedisChannel[K], message: V): F[Unit] =
FutureLift[F].lift(pubConnection.async().publish(channel.underlying, message)).void

override def pubSubChannels: F[List[RedisChannel[K]]] =
pubSubStats.pubSubChannels

override def pubSubSubscriptions(channel: RedisChannel[K]): Stream[F, Subscription[K]] =
override def pubSubSubscriptions(channel: RedisChannel[K]): F[Option[Subscription[K]]] =
pubSubStats.pubSubSubscriptions(channel)

override def pubSubSubscriptions(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
override def pubSubSubscriptions(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
pubSubStats.pubSubSubscriptions(channels)

override def numPat: Stream[F, Long] =
override def numPat: F[Long] =
pubSubStats.numPat

override def numSub: Stream[F, List[Subscription[K]]] =
override def numSub: F[List[Subscription[K]]] =
pubSubStats.numSub

override def pubSubShardChannels: Stream[F, List[RedisChannel[K]]] =
override def pubSubShardChannels: F[List[RedisChannel[K]]] =
pubSubStats.pubSubShardChannels

override def shardNumSub(channels: List[RedisChannel[K]]): Stream[F, List[Subscription[K]]] =
override def shardNumSub(channels: List[RedisChannel[K]]): F[List[Subscription[K]]] =
pubSubStats.shardNumSub(channels)
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,41 +32,37 @@ import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
private[pubsub] class Subscriber[F[_]: Async: FutureLift: Log, K, V](
state: Ref[F, PubSubState[F, K, V]],
subConnection: StatefulRedisPubSubConnection[K, V]
) extends SubscribeCommands[Stream[F, *], K, V] {
) extends SubscribeCommands[F, Stream[F, *], K, V] {

override def subscribe(channel: RedisChannel[K]): Stream[F, V] =
Stream
.resource(Resource.eval(state.get) >>= PubSubInternals.channel[F, K, V](state, subConnection).apply(channel))
.evalTap(_ => FutureLift[F].lift(subConnection.async().subscribe(channel.underlying)))
.flatMap(_.subscribe(500).unNone)

override def unsubscribe(channel: RedisChannel[K]): Stream[F, Unit] =
Stream.eval {
FutureLift[F]
.lift(subConnection.async().unsubscribe(channel.underlying))
.void
.guarantee(state.get.flatMap { st =>
st.channels.get(channel.underlying).fold(Applicative[F].unit)(_.publish1(none[V]).void) *> state
.update(s => s.copy(channels = s.channels - channel.underlying))
})
}
override def unsubscribe(channel: RedisChannel[K]): F[Unit] =
FutureLift[F]
.lift(subConnection.async().unsubscribe(channel.underlying))
.void
.guarantee(state.get.flatMap { st =>
st.channels.get(channel.underlying).fold(Applicative[F].unit)(_.publish1(none[V]).void) *> state
.update(s => s.copy(channels = s.channels - channel.underlying))
})

override def psubscribe(pattern: RedisPattern[K]): Stream[F, RedisPatternEvent[K, V]] =
Stream
.resource(Resource.eval(state.get) >>= PubSubInternals.pattern[F, K, V](state, subConnection).apply(pattern))
.evalTap(_ => FutureLift[F].lift(subConnection.async().psubscribe(pattern.underlying)))
.flatMap(_.subscribe(500).unNone)

override def punsubscribe(pattern: RedisPattern[K]): Stream[F, Unit] =
Stream.eval {
FutureLift[F]
.lift(subConnection.async().punsubscribe(pattern.underlying))
.void
.guarantee(state.get.flatMap { st =>
st.patterns
.get(pattern.underlying)
.fold(Applicative[F].unit)(_.publish1(none[RedisPatternEvent[K, V]]).void) *> state
.update(s => s.copy(patterns = s.patterns - pattern.underlying))
})
}
override def punsubscribe(pattern: RedisPattern[K]): F[Unit] =
FutureLift[F]
.lift(subConnection.async().punsubscribe(pattern.underlying))
.void
.guarantee(state.get.flatMap { st =>
st.patterns
.get(pattern.underlying)
.fold(Applicative[F].unit)(_.publish1(none[RedisPatternEvent[K, V]]).void) *> state
.update(s => s.copy(patterns = s.patterns - pattern.underlying))
})
}
Loading