Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reactivestreams to flow #16

Merged
merged 6 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [2.12.14, 2.13.7]
scala: [2.12.14, 2.13.12]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't have to do it in this PR, but we can drop 2.12 support.

java: [amazon-corretto@1.17]
runs-on: ${{ matrix.os }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import ReleaseTransformations._
import microsites.CdnDirectives

lazy val scala212 = "2.12.14"
lazy val scala213 = "2.13.7"
lazy val scala213 = "2.13.12"
lazy val supportedScalaVersions = List(scala212, scala213)

ThisBuild / scalaVersion := scala213
Expand Down
8 changes: 5 additions & 3 deletions circe/src/main/scala/mongo4cats/circe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object circe extends JsonCodecs {
}

object implicits {
implicit def circeEncoderToEncoder[A: Encoder] = new BsonEncoder[A] {
implicit def circeEncoderToEncoder[A: Encoder]: BsonEncoder[A] = new BsonEncoder[A] {
def apply(a: A): BsonValue = {
val json = a.asJson
val wrapped = Json.obj(RootTag := json)
Expand All @@ -48,12 +48,14 @@ object circe extends JsonCodecs {
}
}

implicit def circeDecoderToDecoder[A: Decoder] = new BsonDecoder[A] {
implicit def circeDecoderToDecoder[A: Decoder]: BsonDecoder[A] = new BsonDecoder[A] {

val decoder = Decoder.instance[A](_.as[A])

def apply(b: BsonValue) = {
val doc = BsonDocument(RootTag -> (if (b == null) new BsonNull else b)).toJson()
val json = parser.parse(doc)
val jsonWithoutRoot = json.flatMap(_.hcursor.get[Json](RootTag))
val decoder = Decoder.instance[A](_.as[A])
jsonWithoutRoot
.flatMap(decoder.decodeJson(_))
.leftMap(x =>
Expand Down
5 changes: 3 additions & 2 deletions circe/src/test/scala/mongo4cats/MongoCollectionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ import java.time.{Instant, LocalDate}
import java.time.temporal.ChronoField.MILLI_OF_SECOND
import java.time.temporal.ChronoUnit
import scala.concurrent.Future
import mongo4cats.bson.BsonDocumentEncoder

class MongoCollectionSpec extends AsyncWordSpec with Matchers with EmbeddedMongo {

import MongoCollectionSpec._

implicit val personEnc = unsafe.circeDocumentEncoder[Person]
implicit val paymentEnc = unsafe.circeDocumentEncoder[Payment]
implicit val personEnc: BsonDocumentEncoder[Person] = unsafe.circeDocumentEncoder[Person]
implicit val paymentEnc: BsonDocumentEncoder[Payment] = unsafe.circeDocumentEncoder[Payment]

override val mongoPort: Int = 12348

Expand Down
15 changes: 8 additions & 7 deletions circe/src/test/scala/mongo4cats/circe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ class CirceSpec extends AnyWordSpec with Matchers with EitherValues {

"circe conversions" should {
"decode null as if it was Json.null" in {
circe.implicits.circeDecoderToDecoder[Unit](Decoder.instance { c =>
c.value.asNull.toRight(DecodingFailure("wasn't null!", Nil))
}).apply(null) shouldBe Right(())
circe.implicits
.circeDecoderToDecoder[Unit](Decoder.instance { c =>
c.value.asNull.toRight(DecodingFailure("wasn't null!", Nil))
})
.apply(null) shouldBe Right(())
}

"not report the internal root tag in history when reporting errors" in {

val deco = Decoder.instance(h => {
h.get[String]("hek")(Decoder.failedWithMessage("Bad!"))
})
val deco =
Decoder.instance(h => h.get[String]("hek")(Decoder.failedWithMessage("Bad!")))

val res = circe.implicits.circeDecoderToDecoder[String](deco).apply(new BsonString("hek"))

res.left.value.msg shouldBe "An error occured during decoding BsonValue BsonString{value='hek'}: DecodingFailure(Attempt to decode value on failed cursor, List(DownField(hek)))"
res.left.value.msg shouldBe "An error occured during decoding BsonValue BsonString{value='hek'}: DecodingFailure at .hek: Missing required field"

}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/mongo4cats/bson/BsonDecoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ object BsonDocumentDecoder extends LowLevelDocumentDecoder {
}

trait LowLevelDocumentDecoder {
implicit def narrowDecoder[A: BsonDecoder] = BsonDocumentDecoder.instance[A] {
(b: BsonDocument) =>
implicit def narrowDecoder[A: BsonDecoder]: BsonDocumentDecoder[A] =
BsonDocumentDecoder.instance[A] { (b: BsonDocument) =>
BsonDecoder[A].apply(b: BsonValue)
}
}
}

object BsonDecoder {
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/mongo4cats/helpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ package mongo4cats

import cats.effect.Async
import fs2.Stream
import fs2.interop.reactivestreams
import fs2.interop.flow
import org.reactivestreams.Publisher
import org.reactivestreams.FlowAdapters

object helpers {

Expand All @@ -36,9 +37,9 @@ object helpers {
boundedStream(1).compile.drain

def stream[F[_]: Async]: Stream[F, T] =
reactivestreams.fromPublisher(publisher, DefaultStreamChunkSize)
flow.fromPublisher(FlowAdapters.toFlowPublisher(publisher), DefaultStreamChunkSize)

def boundedStream[F[_]: Async](chunkSize: Int): Stream[F, T] =
reactivestreams.fromPublisher(publisher, chunkSize)
flow.fromPublisher(FlowAdapters.toFlowPublisher(publisher), chunkSize)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import mongo4cats.circe.unsafe
import mongo4cats.embedded.EmbeddedMongo

import java.time.Instant
import mongo4cats.bson.BsonDocumentEncoder

object CaseClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMongo {

Expand All @@ -35,8 +36,8 @@ object CaseClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMongo {
registrationDate: Instant
)

implicit val addressEnc = unsafe.circeDocumentEncoder[Address]
implicit val personEnc = unsafe.circeDocumentEncoder[Person]
implicit val addressEnc: BsonDocumentEncoder[Address] = unsafe.circeDocumentEncoder[Address]
implicit val personEnc: BsonDocumentEncoder[Person] = unsafe.circeDocumentEncoder[Person]

override val run: IO[Unit] =
withRunningEmbeddedMongo("localhost", 27017) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import mongo4cats.client.MongoClient
import mongo4cats.embedded.EmbeddedMongo

import java.time.Instant
import mongo4cats.bson.BsonDocumentEncoder

object DistinctNestedClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMongo {

Expand All @@ -35,8 +36,8 @@ object DistinctNestedClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMo
registrationDate: Instant
)

implicit val addressEnc = unsafe.circeDocumentEncoder[Address]
implicit val personEnc = unsafe.circeDocumentEncoder[Person]
implicit val addressEnc: BsonDocumentEncoder[Address] = unsafe.circeDocumentEncoder[Address]
implicit val personEnc: BsonDocumentEncoder[Person] = unsafe.circeDocumentEncoder[Person]

override val run: IO[Unit] =
withRunningEmbeddedMongo("localhost", 27017) {
Expand Down
11 changes: 4 additions & 7 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ import sbt._

object Dependencies {
private object Versions {
val mongodb = "4.4.0"
val fs2 = "3.2.4"
val mongodb = "4.11.1"
val fs2 = "3.10.2"
val scalaCompat = "2.6.0"
val circe = "0.14.1"
val circe = "0.14.6"
val findbugsJsr305Version = "1.3.9"

val logback = "1.2.10"
val scalaTest = "3.2.10"

val testContainers = "0.39.12"
val testContainers = "0.40.10"

val embeddedMongo = "3.2.5"
val immutableValue = "2.8.8"
Expand All @@ -25,7 +25,6 @@ object Dependencies {
val findbugsJsr305Version = "com.google.code.findbugs" % "jsr305" % Versions.findbugsJsr305Version % Provided

val fs2Core = "co.fs2" %% "fs2-core" % Versions.fs2
val fs2RS = "co.fs2" %% "fs2-reactive-streams" % Versions.fs2
val scalaCompat = "org.scala-lang.modules" %% "scala-collection-compat" % Versions.scalaCompat

val circeCore = "io.circe" %% "circe-core" % Versions.circe
Expand All @@ -49,7 +48,6 @@ object Dependencies {
Libraries.mongodbDriverStreams,
Libraries.findbugsJsr305Version,
Libraries.fs2Core,
Libraries.fs2RS,
Libraries.scalaCompat
)

Expand All @@ -70,7 +68,6 @@ object Dependencies {

lazy val embedded = Seq(
Libraries.fs2Core,
Libraries.fs2RS,
Libraries.embeddedMongo,
Libraries.immutableValue,
Libraries.commonsCompress
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.7.1"
version in ThisBuild := "0.7.2"
Loading