Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip zio module changes #82

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ addCommandAlias("fmt", "scalafmtAll")
val testcontainersVersion = "0.41.4"
val circeVersion = "0.14.10"

lazy val root = project
lazy val magnumRoot = project
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the root name isn't necessary for sbt to understand that it's your root module 🤔

.in(file("."))
.aggregate(magnum, magnumPg, magnumZio)
.aggregate(magnum, magnumPg, magnumZio, magnumZioStreams)

lazy val magnum = project
.in(file("magnum"))
Expand Down Expand Up @@ -90,6 +90,17 @@ lazy val magnumZio = project
Test / fork := true,
publish / skip := false,
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.1.12" % Provided
"dev.zio" %% "zio" % "2.1.13" % Provided
)
)

lazy val magnumZioStreams = project
.in(file("magnum-zio-streams"))
.dependsOn(magnum)
.settings(
Test / fork := true,
publish / skip := false,
libraryDependencies ++= Seq(
"dev.zio" %% "zio-streams" % "2.1.13" % Provided
)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.augustnagro.magnum.magzio.streams

import com.augustnagro.magnum.Query
import zio.stream.ZStream

extension [E](query: Query[e])
def stream(fetchSize: Int = 0): ZStream

def stream(): Unit = ZStream.acquireReleaseWith(acquire = )
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.augustnagro.magnum.magzio

import com.augustnagro.magnum.{Transactor as _, *}

import scala.util.control.NoStackTrace

private class Break[E](val value: E) extends NoStackTrace
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private class Break[E](val value: E) extends NoStackTrace
private final class Break[E](val value: E) extends NoStackTrace

Copy link
Contributor

@guizmaii guizmaii Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, why not use ControlThrowable here?


/** Abort this SQL Transaction or Connection and become a failed ZIO */
def fail[E](error: E)(using DbCon): Nothing = throw Break(error)
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package com.augustnagro.magnum.magzio

import com.augustnagro.magnum.{DbCon, DbTx, SqlLogger}
import zio.{IO, Semaphore, Task, Trace, UIO, ZIO}

import java.sql.Connection
import javax.sql.DataSource
import scala.reflect.ClassTag
import scala.util.control.{ControlThrowable, NonFatal}

/** Transactor lets you customize a transaction or connection's behavior. It is
* a parameter to the [[transact]] and [[connect]] methods.
*/
class Transactor private (
/** Datasource to be used */
private val dataSource: DataSource,
/** Logging configuration */
private val sqlLogger: SqlLogger,
/** Customize the underlying JDBC Connections */
private val connectionConfig: Connection => Unit,
/** Number of threads in your connection pool. This helps magzio be more
* memory efficient by limiting the number of blocking pool threads used.
* Not needed if using the ZIO virtual-thread based blocking executor
*/
private val semaphore: Option[Semaphore]
):

def withSqlLogger(sqlLogger: SqlLogger): Transactor =
new Transactor(
dataSource,
sqlLogger,
connectionConfig,
semaphore
)

def withConnectionConfig(connectionConfig: Connection => Unit): Transactor =
new Transactor(
dataSource,
sqlLogger,
connectionConfig,
semaphore
)

/** Executes a given query on a given Transactor, returning UIO.
*
* Re-implementation for ZIO of
* [[com.augustnagro.magnum.connect(dataSource: DataSource)]]
*
* Usage:
* {{{
* import com.augustnagro.magnum.magzio.*
*
* connectUIO(transactor):
* repo.findById(id)
* }}}
*/
def connectUIO[A](q: DbCon ?=> A)(using Trace): UIO[A] = connectTask(q).orDie

/** Executes a given query on a given Transactor, returning IO.
*
* Re-implementation for ZIO of
* [[com.augustnagro.magnum.connect(dataSource: DataSource)]]
*
* Usage:
* {{{
* import com.augustnagro.magnum.magzio.*
*
* connectIO(transactor):
* repo.findById(id)
* }}}
*/
def connectIO[E: ClassTag, A](q: DbCon ?=> A)(using Trace): IO[E, A] =
Copy link
Contributor

@guizmaii guizmaii Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ClassTag might become annoying for the user at the use-site.

connectTask(q).catchAll:
case break: Break[?] => ZIO.fail(break.value.asInstanceOf[E])
case error: E => ZIO.fail(error)
case NonFatal(defect) => ZIO.die(defect)

private def connectTask[A](q: DbCon ?=> A)(using Trace): Task[A] =
val task = ZIO.blocking(
ZIO.acquireReleaseWith(
acquire = ZIO.attempt(dataSource.getConnection())
)(release = conn => ZIO.attempt(conn.close()).orDie): cn =>
ZIO.attempt:
connectionConfig(cn)
q(using DbCon(cn, sqlLogger))
)
semaphore match
case Some(sem) => sem.withPermit(task)
case None => task

/** Executes a given transaction on a given Transactor
*
* Re-implementation for ZIO of
* [[com.augustnagro.magnum.transact(transactor: Transactor)]]
*
* Usage:
* {{{
* import com.augustnagro.magnum.magzio.*
*
* transact(transactor) { tx ?=> repo.insertReturning(creator) }
* }}}
*/
def transactUIO[A](q: DbTx ?=> A)(using Trace): UIO[A] =
transactTask(q).orDie

def transactIO[E: ClassTag, A](q: DbTx ?=> A)(using Trace): IO[E, A] =
transactTask(q).catchAll:
case break: Break[?] => ZIO.fail(break.value.asInstanceOf[E])
case error: E => ZIO.fail(error)
case NonFatal(defect) => ZIO.die(defect)

private def transactTask[A](q: DbTx ?=> A)(using Trace): Task[A] =
val task = ZIO.blocking {
ZIO.acquireReleaseWith(
acquire = ZIO.attempt(dataSource.getConnection())
)(release =
conn => if (conn ne null) ZIO.attempt(conn.close()).orDie else ZIO.unit
) { cn =>
ZIO.uninterruptible {
ZIO.attempt {
connectionConfig(cn)
cn.setAutoCommit(false)
try {
val res = q(using DbTx(cn, sqlLogger))
cn.commit()
res
} catch {
case NonFatal(t) =>
cn.rollback()
throw t
}
}
}
}
}
semaphore match
case Some(sem) => sem.withPermit(task)
case None => task
end transactTask
end Transactor

object Transactor:
def apply(
dataSource: DataSource,
sqlLogger: SqlLogger,
connectionConfig: Connection => Unit,
maxBlockingThreads: Option[Int]
): UIO[Transactor] =
ZIO
.fromOption(maxBlockingThreads)
.flatMap(threads => Semaphore.make(threads))
.unsome
.map(semaphoreOpt =>
new Transactor(
dataSource,
sqlLogger,
connectionConfig,
semaphoreOpt
)
)

def apply(
dataSource: DataSource,
sqlLogger: SqlLogger,
connectionConfig: Connection => Unit
): UIO[Transactor] =
apply(
dataSource,
sqlLogger,
connectionConfig,
None
)

def apply(dataSource: DataSource, sqlLogger: SqlLogger): UIO[Transactor] =
apply(
dataSource,
sqlLogger,
_ => (),
None
)

def apply(dataSource: DataSource): UIO[Transactor] =
apply(
dataSource,
SqlLogger.Default,
_ => (),
None
)

def apply(
dataSource: DataSource,
connectionConfig: Connection => Unit
): UIO[Transactor] =
apply(
dataSource,
SqlLogger.Default,
connectionConfig,
None
)
end Transactor
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.augustnagro.magnum.magzio

export com.augustnagro.magnum.{
sql,
batchUpdate,
DbCon,
DbTx,
DbType,
Id,
ImmutableRepo,
NullOrder,
Repo,
SeekDir,
SortOrder,
Spec,
SqlName,
SqlNameMapper,
Table,
TableInfo,
DbCodec
}
Loading
Loading