Skip to content

Commit

Permalink
Merge pull request #155 from Grupo-Abraxas/chore/migrate-to-cs3
Browse files Browse the repository at this point in the history
Chore/migrate to cs3
  • Loading branch information
krabbit93 authored May 9, 2022
2 parents daeb4be + e941fc9 commit 5df9122
Show file tree
Hide file tree
Showing 15 changed files with 364 additions and 571 deletions.
12 changes: 6 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ lazy val cypher = (project in file("cypher"))
Dependencies.`cats-free`,
Dependencies.`cats-effect`,
Dependencies.`fs2-core`,
Dependencies.`fs2-re`,
Dependencies.`neo4j-driver`,
Dependencies.Test.scalatest
Dependencies.Test.scalatest,
Dependencies.Test.`slf4j-simple`
),
console / initialCommands :=
"""
Expand All @@ -72,12 +74,9 @@ lazy val opentracingNeo4j = (project in file("opentracing-neo4j"))
docSettings,
name := "slothql-opentracing-neo4j",
Compile / scalacOptions ++= Seq("-Ymacro-annotations", "-Wunused:imports"),
libraryDependencies ++= Seq(
Dependencies.`opentracing-effect`,
Dependencies.`opentracing-fs2`
)
libraryDependencies ++= Seq(Dependencies.`natchez`, Dependencies.Test.`natchez-jaeger`)
)
.dependsOn(cypher)
.dependsOn(cypher % "compile -> compile; test -> test")

// // // Scaladoc // // //

Expand Down Expand Up @@ -124,3 +123,4 @@ inThisBuild(
semanticdbVersion := scalafixSemanticdb.revision
)
)
addCommandAlias("ff", "Test/scalafix;Test/scalafmt;scalafix;scalafmt")
6 changes: 6 additions & 0 deletions cicd/docker-compose.deps.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,9 @@ services:
retries: 10
volumes:
- ../neo4j/conf:/conf
jaegertracing:
image: jaegertracing/all-in-one:1.17
ports:
- ${JAEGERTRACING_COMPACT_PORT-6833}:6831/udp
- ${JAEGERTRACING_BINARY_PORT-6834}:6832/udp
- ${JAEGERTRACING_FRONT_PORT-16688}:16686
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.arkondata.slothql

import scala.concurrent.duration.DurationInt

import cats.syntax.monadError._
import org.scalatest.EitherValues
import org.scalatest.matchers.should.Matchers
Expand All @@ -20,7 +22,7 @@ class ApocExceptionTest extends AnyWordSpec with Matchers with Neo4jUsingTest wi
val message = "Test Assertion Failed"
val query = APOC.assert(lit(false), lit(message))(lit(true))
val io = tx
.runRead(tx.query(query))
.runRead(tx.query(query), 10.seconds)
.compile
.drain
.adaptError(ApocException.adapt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import cats.syntax.apply._
import cats.syntax.foldable._
import cats.syntax.functor._
import cats.syntax.semigroupk._
import cats.{ catsInstancesForId, Applicative, Eval, Foldable, Functor, Monad, MonoidK, Semigroupal, StackSafeMonad }
import cats.{ Applicative, Eval, Foldable, Functor, Monad, MonoidK, Semigroupal, StackSafeMonad }

sealed trait CypherStatement {
val template: String
Expand Down
Original file line number Diff line number Diff line change
@@ -1,157 +1,142 @@
package com.arkondata.slothql.neo4j

import scala.annotation.implicitNotFound
import scala.concurrent.duration.{ DurationInt, FiniteDuration }
import scala.jdk.CollectionConverters._
import scala.language.existentials
import scala.jdk.DurationConverters.ScalaDurationOps

import cats.arrow.{ Arrow, FunctionK }
import cats.data.StateT
import cats.effect.concurrent.MVar
import cats.effect.syntax.bracket._
import cats.effect.syntax.effect._
import cats.effect.{ Blocker, Concurrent, ConcurrentEffect, ContextShift, ExitCase, Resource, Sync }
import cats.effect.Resource
import cats.effect.kernel.{ Async, Deferred }
import cats.effect.std.Dispatcher
import cats.instances.function._
import cats.syntax.applicative._
import cats.syntax.apply._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import cats.{ ~>, Applicative, Monad, StackSafeMonad }
import cats.{ ~>, Applicative, Monad }
import fs2.interop.reactivestreams._
import org.neo4j.driver.internal.types.InternalTypeSystem
import org.neo4j.driver.reactive.{ RxResult, RxSession, RxTransaction, RxTransactionWork }
import org.neo4j.driver.types.{ Node => NNode, Path => NPath, Relationship => NRelationship, Type }
import org.neo4j.driver.{ Driver, Record, Result, Session, Transaction, TransactionWork, Value }
import org.neo4j.driver.{ Driver, Record, TransactionConfig, Value }
import org.reactivestreams.Publisher
import shapeless._

import com.arkondata.slothql.cypher
import com.arkondata.slothql.cypher.CypherTransactor._
import com.arkondata.slothql.cypher.{ CypherStatement, CypherTransactor }
import com.arkondata.slothql.neo4j.util.{ fs2StreamTxCMonad, javaStreamToFs2 }

class Neo4jCypherTransactor[F[_]](protected val session: F[Session])(
implicit protected val ce: ConcurrentEffect[F],
implicit protected val cs: ContextShift[F]
import com.arkondata.slothql.neo4j.util.fs2StreamTxCMonad

class Neo4jCypherTransactor[F[_]](
driver: Driver,
completion: Deferred[F, Unit],
val defaultTimeout: FiniteDuration,
chunkSize: Int
)(implicit
dispatcher: Dispatcher[F],
F: Async[F]
) extends Neo4jCypherTransactor.Syntax[F]
with CypherTransactor[F, Record, fs2.Stream[F, *]] {
import ce.delay

override type Tx[R] = CypherTransactor.Tx[F, Record, fs2.Stream[F, *], R]

type Out[R] = fs2.Stream[F, R]
type Op[R] = Operation[Record, Out, R]

object readers extends Neo4jCypherTransactor.Readers

def runRead[A](tx: Tx[A]): Out[A] = run(tx, _.readTransaction)
def runWrite[A](tx: Tx[A]): Out[A] = run(tx, _.writeTransaction)

// // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // //

protected def run[A](tx: Tx[A], txWork0: Session => (TransactionWork[B] => B) forSome { type B }): Out[A] = {
val txWork = txWork0.asInstanceOf[Session => TransactionWork[Unit] => Unit]
def outT[R](fr: F[R]): OutT[R] = _ => fs2.Stream.eval(fr)
val r: Resource[F, Out[A]] = for {
session <- sessionResource
blocker <- blockerResource
exec = tx.mapK(FunctionK.lift(outT))
.foldMap(λ[Op ~> OutT](runOperation(blocker, _)))
tx <- transactionResource(txWork(session))
} yield exec(tx)
fs2.Stream.resource(r).flatten
}

protected def blockerResource: Resource[F, Blocker] = Blocker[F]

protected def sessionResource: Resource[F, Session] = Resource.liftF(session)
// TODO: `Resource.make(session)(s => delay(s.close()))` fails; maybe fs2.Stream.resourceWeak should be used at `run`?

// // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // //

protected def transactionResource(run: TransactionWork[Unit] => Unit): Resource[F, Transaction] =
for {
txVar <- transactionMVarResource
cLock <- closeLockMVarResource
eLock <- execLockMVarResource(cLock)
runTx = delay(run { tx =>
(
for {
_ <- txVar.put(tx)
b <- eLock.read
_ <- if (b) commitTransaction(tx) else rollbackTransaction(tx)
} yield ()
).guarantee(closeTransaction(tx))
.guaranteeCase {
case ExitCase.Completed => cLock.put(None)
case ExitCase.Error(e) => cLock.put(Some(e))
case ExitCase.Canceled => cLock.put(Some(new Exception("Canceled")))
}
.toIO
.unsafeRunSync()
})
_ <- backgroundWorkResource(runTx)
tx <- readTxAsResource(txVar)
} yield tx

protected def transactionMVarResource: Resource[F, MVar[F, Transaction]] = Resource liftF MVar.empty[F, Transaction]

protected def execLockMVarResource(lock1: MVar[F, Option[Throwable]]): Resource[F, MVar[F, Boolean]] = {
def waitClose = lock1.read.map(_.toLeft(())).rethrow
Resource.makeCase(MVar.empty[F, Boolean]) {
case (v, ExitCase.Completed) => v.put(true) >> waitClose
case (v, _) => v.put(false) >> waitClose
}
override type Out[R] = fs2.Stream[F, R]
override type Tx[R] = CypherTransactor.Tx[F, Record, Out, R]
final type TxS[R] = CypherTransactor.Tx[Out, Record, Out, R]
final type OpS[R] = Operation[Record, Out, R]

protected def unwind[A](out: Out[A]): Out[A] = out

protected def query[A](
transactor: RxTransaction,
query: CypherStatement.Prepared[A],
read: CypherTransactor.Reader[Record, A]
): Out[A] = queryWithSummary(transactor, query, read)._1

protected def queryWithSummary[A](
transactor: RxTransaction,
query: CypherStatement.Prepared[A],
read: CypherTransactor.Reader[Record, A]
): (Out[A], RxResult) = {
val rx = transactor.run(query.template, query.params.asJava)

(
rx.records()
.toStreamBuffered(chunkSize)
.evalMap(r => F.delay(read(r))),
rx
)
}

protected def closeLockMVarResource: Resource[F, MVar[F, Option[Throwable]]] =
Resource.liftF(MVar.empty[F, Option[Throwable]])
protected def gather[U, A](runOp: OpS ~> Out, value: Operation[Record, Out, U], fn: Out[U] => A): Out[A] =
fs2.Stream.emit(fn(runOp(value)))

protected def commitTransaction(tx: Transaction): F[Unit] = delay(tx.commit())
protected def rollbackTransaction(tx: Transaction): F[Unit] = delay(tx.rollback())
protected def closeTransaction(tx: Transaction): F[Unit] = delay(tx.close())
final object Tx {

protected def backgroundWorkResource(work: F[Unit]): Resource[F, F[Unit]] = Concurrent[F].background(work)

protected def readTxAsResource(txVar: MVar[F, Transaction]): Resource[F, Transaction] = Resource.liftF(txVar.read)

// // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // //
// Tx[A] ~> TxS[A]
def streamK: F ~> Out = new ~>[F, Out] {
override def apply[A](fa: F[A]): Out[A] = fs2.Stream.eval(fa)
}

protected type OutT[R] = Transaction => Out[R]
def runOp(transactor: RxTransaction): OpS ~> Out =
new ~>[OpS, Out] {

implicit protected lazy val outTMonad: Monad[OutT] = new Monad[OutT] with StackSafeMonad[OutT] {
def pure[A](x: A): OutT[A] = _ => fs2.Stream.emit(x)
def flatMap[A, B](fa: OutT[A])(f: A => OutT[B]): OutT[B] = tx => fa(tx).flatMap(f andThen (_(tx)))
override def apply[A](fa: OpS[A]): Out[A] = fa match {
case CypherTransactor.Unwind(values) => unwind(values)
case CypherTransactor.Query(query0, read) => query(transactor, query0, read)
case CypherTransactor.Gather(value, fn) => gather(this, value, fn)
}
}
}

// // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // // //
override def runRead[R](tx: Tx[R]): fs2.Stream[F, R] =
apply(tx, defaultTimeout, write = false)

protected def runOperation[A](blocker: Blocker, op: Op[A]): OutT[A] = op match {
case Unwind(out) => runUnwind(out)
case Gather(op, f) => runGather(blocker, op, f)
case Query(q, r) => runQuery(blocker, q, r)
}
override def runWrite[R](tx: Tx[R]): fs2.Stream[F, R] =
apply(tx, defaultTimeout, write = true)

protected def runUnwind[A](out: Out[A])(tx: Transaction): Out[A] = out
@inline def runRead[R](tx: Tx[R], timeout: FiniteDuration): fs2.Stream[F, R] =
apply(tx, timeout, write = false)

protected def runGather[A, B](blocker: Blocker, op: Op[B], gather: Out[B] => A)(tx: Transaction): Out[A] =
fs2.Stream.emit(gather(runOperation(blocker, op)(tx)))
@inline def runWrite[R](tx: Tx[R], timeout: FiniteDuration): fs2.Stream[F, R] =
apply(tx, timeout, write = true)

protected def runQuery[A](blocker: Blocker, q: CypherStatement.Prepared[A], reader: Reader[A])(
tx: Transaction
): fs2.Stream[F, A] =
fs2.Stream force runQuery0(tx, q).fproduct { result =>
javaStreamToFs2(blocker, ce.delay(result.stream())).evalMap(readRecord(_, reader))
}.map((runningQuery[A] _).tupled)
def apply[R](tx: Tx[R], timeout: FiniteDuration, write: Boolean): fs2.Stream[F, R] =
fs2.Stream
.resource(sessionResource)
.flatMap(
sessionFn[R](_, write)(
rx => StreamUnicastPublisher(tx.mapK(Tx.streamK).foldMap(Tx.runOp(rx)), dispatcher),
TransactionConfig.builder().withTimeout(timeout.toJava).build()
).toStreamBuffered(chunkSize)
)

protected def runQuery0(tx: Transaction, q: CypherStatement.Prepared[_]): F[Result] =
delay(tx.run(q.template, q.params.asJava))
protected def sessionFn[A](
session: RxSession,
write: Boolean
): (RxTransactionWork[Publisher[A]], TransactionConfig) => Publisher[A] = (fn, cfg) =>
if (write) session.writeTransaction(fn, cfg) else session.readTransaction(fn, cfg)

protected def runningQuery[A](result: Result, stream: fs2.Stream[F, A]): fs2.Stream[F, A] = stream
protected def sessionResource: Resource[F, RxSession] = Resource.makeCase(
completion.tryGet.map(_.isDefined).flatMap(F.raiseError(new IllegalStateException("Driver is closed")).whenA) *>
F.delay(driver.rxSession())
)((s, _) => s.close().toStreamBuffered(chunkSize).compile.drain)

protected def readRecord[A](record: Record, reader: Reader[A]): F[A] = ce.catchNonFatal(reader(record))
}

object Neo4jCypherTransactor {
type Tx[F[_], R] = CypherTransactor.Tx[F, Record, fs2.Stream[F, *], R]

def apply[F[_]: ConcurrentEffect: ContextShift](driver: Driver): Neo4jCypherTransactor[F] =
new Neo4jCypherTransactor(Sync[F].delay(driver.session()))
def apply[F[_]: Async](
driver: Driver,
defaultTimeout: FiniteDuration = 10.seconds,
chunkSize: Int = 1024
)(implicit
dispatcher: Dispatcher[F]
): F[(Neo4jCypherTransactor[F], Deferred[F, Unit])] =
Deferred[F, Unit].map(defer => (new Neo4jCypherTransactor[F](driver, defer, defaultTimeout, chunkSize), defer))

def imapK[F[_], G[_]: Monad](f: F ~> G, g: G ~> F): Tx[F, *] ~> Tx[G, *] =
λ[Tx[F, *] ~> Tx[G, *]](
Expand Down Expand Up @@ -341,7 +326,7 @@ object Neo4jCypherTransactor {
// // // Syntax // // //
// // // // // // // // //

class Syntax[F[_]: Applicative](implicit compiler: fs2.Stream.Compiler[F, F])
class Syntax[F[_]: Applicative](implicit compiler: fs2.Compiler[F, F])
extends CypherTransactor.Syntax[F, Record, fs2.Stream[F, *]] {
syntax =>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.util.stream.{ Stream => JStream }

import scala.jdk.CollectionConverters._

import cats.effect.{ Blocker, ContextShift, Resource, Sync }
import cats.effect.{ Resource, Sync }
import cats.syntax.applicative._
import cats.{ Applicative, Apply, Monad }

Expand All @@ -14,7 +14,7 @@ package object util {

implicit def fs2StreamTxCMonad[F[_], Src](implicit
A: Applicative[F],
compiler: fs2.Stream.Compiler[F, F]
compiler: fs2.Compiler[F, F]
): Monad[TxC0[F, Src, fs2.Stream[F, *], *]] =
new Monad[TxC0[F, Src, fs2.Stream[F, *], *]] {
type TxC[A] = TxC0[F, Src, fs2.Stream[F, *], A]
Expand All @@ -38,9 +38,9 @@ package object util {
}
}

def javaStreamToFs2[F[_]: Sync: ContextShift, A](blocker: Blocker, fj: F[JStream[A]]): fs2.Stream[F, A] =
def javaStreamToFs2[F[_]: Sync, A](fj: F[JStream[A]], chunkSize: Int = 1024): fs2.Stream[F, A] =
fs2.Stream
.resource(Resource.fromAutoCloseable(fj))
.flatMap(jStream => fs2.Stream.fromBlockingIterator[F](blocker, jStream.iterator().asScala))
.flatMap(jStream => fs2.Stream.fromBlockingIterator[F](jStream.iterator().asScala, chunkSize))

}
Loading

0 comments on commit 5df9122

Please sign in to comment.