Skip to content

Commit

Permalink
prepareAndCache methods to get F[StatementId]
Browse files Browse the repository at this point in the history
  • Loading branch information
vbergeron-ledger committed Nov 28, 2022
1 parent 423b4c9 commit 9c6d435
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 51 deletions.
57 changes: 30 additions & 27 deletions modules/core/shared/src/main/scala/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ trait Session[F[_]] {
* @group Commands
*/
def prepare[A](command: Command[A]): Resource[F, PreparedCommand[F, A]]

/**
* Prepares then cache a query, yielding a `PreparedQuery` which can be executed multiple
* times with different arguments.
* @group Queries
*/
def prepareAndCache[A, B](query: Query[A, B]): F[PreparedQuery[F, A, B]]

/**
* Prepares then cache an `INSERT`, `UPDATE`, or `DELETE` command that returns no rows. The resulting
* `PreparedCommand` can be executed multiple times with different arguments.
* @group Commands
*/
def prepareAndCache[A](command: Command[A]): F[PreparedCommand[F, A]]

/**
* Transform a `Command` into a `Pipe` from inputs to `Completion`s.
Expand Down Expand Up @@ -285,22 +299,14 @@ object Session {
socketOptions: List[SocketOption] = Session.DefaultSocketOptions,
commandCache: Int = 1024,
queryCache: Int = 1024,
<<<<<<< HEAD
parseCache: Int = 1024,
readTimeout: Duration = Duration.Inf,
): Resource[F, Resource[F, Session[F]]] = {

def session(socketGroup: SocketGroup[F], sslOp: Option[SSLNegotiation.Options[F]], cache: Describe.Cache[F]): Resource[F, Session[F]] =
fromSocketGroup[F](socketGroup, host, port, user, database, password, debug, strategy, socketOptions, sslOp, parameters, cache, readTimeout)
=======
parseCache: Int = 1024
): Resource[F, Resource[F, Session[F]]] = {

def session(socketGroup: SocketGroup[F], sslOp: Option[SSLNegotiation.Options[F]], cache: Describe.Cache[F]): Resource[F, Session[F]] =
for {
pc <- Resource.eval(Parse.Cache.empty[F](parseCache))
s <- fromSocketGroup[F](socketGroup, host, port, user, database, password, debug, strategy, socketOptions, sslOp, parameters, cache, pc)
s <- fromSocketGroup[F](socketGroup, host, port, user, database, password, debug, strategy, socketOptions, sslOp, parameters, cache, pc, readTimeout)
} yield s
>>>>>>> Add parse step caching + boilerplate

val logger: String => F[Unit] = s => Console[F].println(s"TLS: $s")

Expand Down Expand Up @@ -330,11 +336,8 @@ object Session {
parameters: Map[String, String] = Session.DefaultConnectionParameters,
commandCache: Int = 1024,
queryCache: Int = 1024,
<<<<<<< HEAD
parseCache: Int = 1024,
readTimeout: Duration = Duration.Inf,
=======
parseCache: Int = 1024
>>>>>>> Add parse step caching + boilerplate
): Resource[F, Session[F]] =
pooled(
host = host,
Expand All @@ -349,11 +352,8 @@ object Session {
parameters = parameters,
commandCache = commandCache,
queryCache = queryCache,
<<<<<<< HEAD
parseCache = parseCache,
readTimeout = readTimeout
=======
parseCache = parseCache
>>>>>>> Add parse step caching + boilerplate
).flatten

def fromSocketGroup[F[_]: Temporal: Trace: Console](
Expand All @@ -369,19 +369,12 @@ object Session {
sslOptions: Option[SSLNegotiation.Options[F]],
parameters: Map[String, String],
describeCache: Describe.Cache[F],
<<<<<<< HEAD
parseCache: Parse.Cache[F],
readTimeout: Duration = Duration.Inf,
): Resource[F, Session[F]] =
for {
namer <- Resource.eval(Namer[F])
proto <- Protocol[F](host, port, debug, namer, socketGroup, socketOptions, sslOptions, describeCache, readTimeout)
=======
parseCache: Parse.Cache[F]
): Resource[F, Session[F]] =
for {
namer <- Resource.eval(Namer[F])
proto <- Protocol[F](host, port, debug, namer, socketGroup, socketOptions, sslOptions, describeCache, parseCache)
>>>>>>> Add parse step caching + boilerplate
proto <- Protocol[F](host, port, debug, namer, socketGroup, socketOptions, sslOptions, describeCache, parseCache, readTimeout)
_ <- Resource.eval(proto.startup(user, database, password, parameters))
sess <- Resource.make(fromProtocol(proto, namer, strategy))(_ => proto.cleanup)
} yield sess
Expand Down Expand Up @@ -448,6 +441,12 @@ object Session {

override def prepare[A](command: Command[A]): Resource[F, PreparedCommand[F, A]] =
proto.prepare(command, typer).map(PreparedCommand.fromProto(_))

override def prepareAndCache[A, B](query: Query[A, B]): F[PreparedQuery[F, A, B]] =
proto.prepareAndCache(query, typer).map(PreparedQuery.fromProto(_))

override def prepareAndCache[A](command: Command[A]): F[PreparedCommand[F, A]] =
proto.prepareAndCache(command, typer).map(PreparedCommand.fromProto(_))

override def transaction[A]: Resource[F, Transaction[F]] =
Transaction.fromSession(this, namer, none, none)
Expand Down Expand Up @@ -506,6 +505,10 @@ object Session {
override def prepare[A, B](query: Query[A,B]): Resource[G,PreparedQuery[G,A,B]] = outer.prepare(query).mapK(fk).map(_.mapK(fk))

override def prepare[A](command: Command[A]): Resource[G,PreparedCommand[G,A]] = outer.prepare(command).mapK(fk).map(_.mapK(fk))

override def prepareAndCache[A, B](query: Query[A,B]): G[PreparedQuery[G,A,B]] = fk(outer.prepareAndCache(query)).map(_.mapK(fk))

override def prepareAndCache[A](command: Command[A]): G[PreparedCommand[G,A]] = fk(outer.prepareAndCache(command)).map(_.mapK(fk))

override def transaction[A]: Resource[G,Transaction[G]] = outer.transaction[A].mapK(fk).map(_.mapK(fk))

Expand Down
18 changes: 18 additions & 0 deletions modules/core/shared/src/main/scala/net/Protocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ trait Protocol[F[_]] {
* which will be closed after use.
*/
def prepare[A, B](query: Query[A, B], ty: Typer): Resource[F, Protocol.PreparedQuery[F, A, B]]

/**
* Prepare a command (a statement that produces no rows), yielding a `Protocol.PreparedCommand`
* which will cached per session and closed on session close.
*/
def prepareAndCache[A](command: Command[A], ty: Typer): F[Protocol.PreparedCommand[F, A]]

/**
* Prepare a query (a statement that produces rows), yielding a `Protocol.PreparedCommand` which
* which will cached per session and closed on session close.
*/
def prepareAndCache[A, B](query: Query[A, B], ty: Typer): F[Protocol.PreparedQuery[F, A, B]]

/**
* Execute a non-parameterized command (a statement that produces no rows), yielding a
Expand Down Expand Up @@ -240,6 +252,12 @@ object Protocol {

override def prepare[A, B](query: Query[A, B], ty: Typer): Resource[F, PreparedQuery[F, A, B]] =
protocol.Prepare[F](describeCache, parseCache).apply(query, ty)

override def prepareAndCache[A](command: Command[A], ty: Typer): F[PreparedCommand[F, A]] =
protocol.Prepare[F](describeCache, parseCache).prepareAndCache(command, ty)

override def prepareAndCache[A, B](query: Query[A, B], ty: Typer): F[PreparedQuery[F, A, B]] =
protocol.Prepare[F](describeCache, parseCache).prepareAndCache(query, ty)

override def execute(command: Command[Void]): F[Completion] =
protocol.Query[F].apply(command)
Expand Down
45 changes: 22 additions & 23 deletions modules/core/shared/src/main/scala/net/protocol/Parse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import natchez.Trace
import cats.data.OptionT

trait Parse[F[_]] {
def apply[A](statement: Statement[A], ty: Typer): Resource[F, StatementId]
def prepareAndCache[A](statement: Statement[A], ty: Typer): F[StatementId]
def apply[A](statement: Statement[A], ty: Typer): Resource[F, StatementId] =
Resource.eval(prepareAndCache(statement, ty))
}

object Parse {
Expand All @@ -30,37 +32,34 @@ object Parse {
implicit ev: MonadError[F, Throwable]
): Parse[F] =
new Parse[F] {

override def apply[A](statement: Statement[A], ty: Typer): Resource[F, StatementId] =
override def prepareAndCache[A](statement: Statement[A], ty: Typer): F[StatementId] =
statement.encoder.oids(ty) match {

case Right(os) if os.length > Short.MaxValue =>
Resource.eval(TooManyParametersException(statement).raiseError[F, StatementId])
TooManyParametersException(statement).raiseError[F, StatementId]

case Right(os) =>
Resource.eval {
OptionT(cache.value.get(statement)).getOrElseF {
exchange("parse") {
for {
id <- nextName("statement").map(StatementId(_))
_ <- Trace[F].put(
"statement-name" -> id.value,
"statement-sql" -> statement.sql,
"statement-parameter-types" -> os.map(n => ty.typeForOid(n, -1).getOrElse(n)).mkString("[", ", ", "]")
)
_ <- send(ParseMessage(id.value, statement.sql, os))
_ <- send(Flush)
_ <- flatExpect {
case ParseComplete => ().pure[F]
case ErrorResponse(info) => syncAndFail(statement, info)
}
} yield id
}
OptionT(cache.value.get(statement)).getOrElseF {
exchange("parse") {
for {
id <- nextName("statement").map(StatementId(_))
_ <- Trace[F].put(
"statement-name" -> id.value,
"statement-sql" -> statement.sql,
"statement-parameter-types" -> os.map(n => ty.typeForOid(n, -1).getOrElse(n)).mkString("[", ", ", "]")
)
_ <- send(ParseMessage(id.value, statement.sql, os))
_ <- send(Flush)
_ <- flatExpect {
case ParseComplete => ().pure[F]
case ErrorResponse(info) => syncAndFail(statement, info)
}
} yield id
}
}

case Left(err) =>
Resource.eval(UnknownTypeException(statement, err, ty.strategy).raiseError[F, StatementId])
UnknownTypeException(statement, err, ty.strategy).raiseError[F, StatementId]

}

Expand Down
33 changes: 33 additions & 0 deletions modules/core/shared/src/main/scala/net/protocol/Prepare.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package skunk.net.protocol

import cats.effect.Resource
import cats.MonadError
import cats.syntax.flatMap._
import cats.syntax.functor._
import skunk.~
import skunk.data.Completion
import skunk.net.MessageSocket
Expand All @@ -17,6 +19,9 @@ import natchez.Trace
trait Prepare[F[_]] {
def apply[A](command: skunk.Command[A], ty: Typer): Resource[F, PreparedCommand[F, A]]
def apply[A, B](query: skunk.Query[A, B], ty: Typer): Resource[F, PreparedQuery[F, A, B]]

def prepareAndCache[A](command: skunk.Command[A], ty: Typer): F[PreparedCommand[F, A]]
def prepareAndCache[A, B](query: skunk.Query[A, B], ty: Typer): F[PreparedQuery[F, A, B]]
}

object Prepare {
Expand Down Expand Up @@ -53,6 +58,34 @@ object Prepare {
}
}
}

override def prepareAndCache[A](command: skunk.Command[A], ty: Typer): F[PreparedCommand[F, A]] =
for {
id <- Parse[F](parseCache).prepareAndCache(command, ty)
_ <- Describe[F](describeCache).apply(command, id, ty)
} yield new PreparedCommand[F, A](id, command) { pc =>
def bind(args: A, origin: Origin): Resource[F, CommandPortal[F, A]] =
Bind[F].apply(this, args, origin).map {
new CommandPortal[F, A](_, pc, args, origin) {
val execute: F[Completion] =
Execute[F].apply(this)
}
}
}

override def prepareAndCache[A, B](query: skunk.Query[A, B], ty: Typer): F[PreparedQuery[F, A, B]] =
for {
id <- Parse[F](parseCache).prepareAndCache(query, ty)
rd <- Describe[F](describeCache).apply(query, id, ty)
} yield new PreparedQuery[F, A, B](id, query, rd) { pq =>
def bind(args: A, origin: Origin): Resource[F, QueryPortal[F, A, B]] =
Bind[F].apply(this, args, origin).map {
new QueryPortal[F, A, B](_, pq, args, origin) {
def execute(maxRows: Int): F[List[B] ~ Boolean] =
Execute[F].apply(this, maxRows, ty)
}
}
}

}

Expand Down
4 changes: 3 additions & 1 deletion modules/tests/shared/src/test/scala/simulation/SimTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import skunk.util.Namer
import skunk.util.Origin
import skunk.util.Typer
import skunk.net.protocol.Describe
import skunk.net.protocol.Parse

trait SimTest extends FTest with SimMessageSocket.DSL {

Expand All @@ -42,7 +43,8 @@ trait SimTest extends FTest with SimMessageSocket.DSL {
bms <- SimMessageSocket(sim).map(new SimulatedBufferedMessageSocket(_))
nam <- Namer[IO]
dc <- Describe.Cache.empty[IO](1024, 1024)
pro <- Protocol.fromMessageSocket(bms, nam, dc)
pc <- Parse.Cache.empty[IO](1024)
pro <- Protocol.fromMessageSocket(bms, nam, dc, pc)
_ <- pro.startup(user, database, password, Session.DefaultConnectionParameters)
ses <- Session.fromProtocol(pro, nam, Typer.Strategy.BuiltinsOnly)
} yield ses
Expand Down

0 comments on commit 9c6d435

Please sign in to comment.