Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
typelevelGH-765 implement Channel.listenR
Browse files Browse the repository at this point in the history
  • Loading branch information
mbaechler committed Dec 20, 2022
1 parent df719d6 commit b1ff248
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 10 deletions.
35 changes: 31 additions & 4 deletions modules/core/shared/src/main/scala/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package skunk

import cats.{ Contravariant, Functor, ~> }
import cats.arrow.Profunctor
import cats.effect.Resource
import cats.effect.{ Resource, MonadCancel }
import cats.effect.kernel.MonadCancelThrow
import cats.syntax.all._
import fs2.{ Pipe, Stream }
Expand Down Expand Up @@ -39,6 +39,21 @@ trait Channel[F[_], A, B] extends Pipe[F, A, Unit] { outer =>
*/
def listen(maxQueued: Int): Stream[F, Notification[B]]

/**
* Construct a `Resource[F, Stream]` that subscribes to notifications for this Channel that emits any notifications
* that arrive (this can happen at any time) once resource is acquired and unsubscribes when the resource is released.
* Note that once such a stream is started it is important to consume all notifications as quickly
* as possible to avoid blocking message processing for other operations on the `Session`
* (although typically a dedicated `Session` will receive channel notifications so this won't be
* an issue).
*
* @param maxQueued the maximum number of notifications to hold in a queue before [semantically]
* blocking message exchange on the controlling `Session`.
* @group Notifications
* @see [[https://www.postgresql.org/docs/10/static/sql-listen.html LISTEN]]
*/
def listenR(maxQueued: Int): Resource[F, Stream[F, Notification[B]]]

/** This `Channel` acts as an fs2 `Pipe`. */
def apply(sa: Stream[F, A]): Stream[F,Unit] =
sa.evalMap(notify)
Expand Down Expand Up @@ -69,22 +84,26 @@ trait Channel[F[_], A, B] extends Pipe[F, A, Unit] { outer =>
/**
* Contramap inputs from a new type `C` and map outputs to a new type `D`, yielding a
* `Channel[F, C, D]`.
*
* @group Transformations
*/
def dimap[C, D](f: C => A)(g: B => D): Channel[F, C, D] =
new Channel[F, C, D] {
def listen(maxQueued: Int): Stream[F, Notification[D]] = outer.listen(maxQueued).map(_.map(g))
def notify(message: C): F[Unit] = outer.notify(f(message))
def listenR(maxQueued: Int): Resource[F, Stream[F, Notification[D]]] = outer.listenR(maxQueued).map(_.map(_.map(g)))
}

/**
* Transform this `Channel` by a given `FunctionK`.
*
* @group Transformations
*/
def mapK[G[_]](fk: F ~> G): Channel[G, A, B] =
def mapK[G[_]: MonadCancelThrow](fk: F ~> G)(implicit f: MonadCancel[F, _]): Channel[G, A, B] =
new Channel[G, A, B] {
def listen(maxQueued: Int): Stream[G, Notification[B]] = outer.listen(maxQueued).translate(fk)
def notify(message: A): G[Unit] = fk(outer.notify(message))
def listenR(maxQueued: Int): Resource[G, Stream[G, Notification[B]]] = outer.listenR(maxQueued).map(_.translate(fk)).mapK(fk)
}

}
Expand All @@ -109,10 +128,18 @@ object Channel {
def listen(maxQueued: Int): Stream[F, Notification[String]] =
for {
_ <- Stream.resource(Resource.make(listen)(_ => unlisten))
n <- proto.notifications(maxQueued).filter(_.channel === name)
s <- Stream.resource(proto.notifications(maxQueued))
n <- s.filter(_.channel === name)
} yield n

def notify(message: String): F[Unit] =

override def listenR(maxQueued: Int): Resource[F, Stream[F, Notification[String]]] =
Resource.make(listen)(_ => unlisten)
.flatMap(_ => proto.notifications(maxQueued))
.map(stream => stream.filter(_.channel === name))


def notify(message: String): F[Unit] =
// TODO: escape the message
proto.execute(Command(s"NOTIFY ${name.value}, '$message'", Origin.unknown, Void.codec)).void

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ trait BufferedMessageSocket[F[_]] extends MessageSocket[F] {
* blocking message exchange on the controlling `Session`.
* @see [[https://www.postgresql.org/docs/10/static/sql-listen.html LISTEN]]
*/
def notifications(maxQueued: Int): Stream[F, Notification[String]]
def notifications(maxQueued: Int): Resource[F, Stream[F, Notification[String]]]


// TODO: this is an implementation leakage, fold into the factory below
Expand Down Expand Up @@ -163,8 +163,8 @@ object BufferedMessageSocket {
override def parameters: SignallingRef[F, Map[String, String]] = paSig
override def backendKeyData: Deferred[F, BackendKeyData] = bkSig

override def notifications(maxQueued: Int): Stream[F, Notification[String]] =
noTop.subscribe(maxQueued)
override def notifications(maxQueued: Int): Resource[F, Stream[F, Notification[String]]] =
noTop.subscribeAwait(maxQueued)

override protected def terminate: F[Unit] =
fib.cancel *> // stop processing incoming messages
Expand Down
4 changes: 2 additions & 2 deletions modules/core/shared/src/main/scala/net/Protocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ trait Protocol[F[_]] {
* @see [[https://www.postgresql.org/docs/10/static/sql-listen.html LISTEN]]
* @see [[https://www.postgresql.org/docs/10/static/sql-notify.html NOTIFY]]
*/
def notifications(maxQueued: Int): Stream[F, Notification[String]]
def notifications(maxQueued: Int): Resource[F, Stream[F, Notification[String]]]

/**
* Signal representing the current state of all Postgres configuration variables announced to this
Expand Down Expand Up @@ -228,7 +228,7 @@ object Protocol {
implicit val na: Namer[F] = nam
implicit val ExchangeF: protocol.Exchange[F] = ex

override def notifications(maxQueued: Int): Stream[F, Notification[String]] =
override def notifications(maxQueued: Int): Resource[F, Stream[F, Notification[String]]] =
bms.notifications(maxQueued)

override def parameters: Signal[F, Map[String, String]] =
Expand Down
18 changes: 18 additions & 0 deletions modules/tests/shared/src/test/scala/ChannelTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,22 @@ class ChannelTest extends SkunkTest {
} yield "ok"
}

sessionTest("channel with resource (coverage)") { s =>
val data = List("foo", "bar", "baz")
val ch0 = s.channel(id"channel_test")
val ch1 = ch0.mapK(FunctionK.id)
val ch2 = Functor[Channel[IO, String, *]].map(ch1)(identity[String])
val ch3 = Contravariant[Channel[IO, *, String]].contramap(ch2)(identity[String])
val ch = Profunctor[Channel[IO, *, *]].dimap(ch3)(identity[String])(identity[String])

ch.listenR(42).use { r =>
for {
_ <- data.traverse_(ch.notify)
d <- r.map(_.value).takeThrough(_ != data.last).compile.toList
_ <- assert(s"channel data $d $data", data == d)
} yield "ok"
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ trait SimTest extends FTest with SimMessageSocket.DSL {
def transactionStatus: Signal[IO,TransactionStatus] = ???
def parameters: Signal[IO,Map[String,String]] = ???
def backendKeyData: Deferred[IO,BackendKeyData] = ???
def notifications(maxQueued: Int): fs2.Stream[IO, Notification[String]] = ???
def notifications(maxQueued: Int): Resource[IO, fs2.Stream[IO, Notification[String]]] = ???
def terminate: IO[Unit] = ???
}

Expand Down

0 comments on commit b1ff248

Please sign in to comment.