diff --git a/project/Version.scala b/project/Version.scala index 283f4cc..58f9b9a 100644 --- a/project/Version.scala +++ b/project/Version.scala @@ -1,8 +1,8 @@ object Version { val Akka = "2.6.9" val Camel = "2.20.4" - val CatsEffect = "2.2.0" - val Fs2 = "2.4.4" + val CatsEffect = "3.0.0-M1" + val Fs2 = "3.0.0-M1" val Log4j = "2.13.0" val JUnitInterface = "0.11" val Scalatest = "3.2.0" diff --git a/streamz-camel-fs2/README.md b/streamz-camel-fs2/README.md index 759d01d..4d47d84 100644 --- a/streamz-camel-fs2/README.md +++ b/streamz-camel-fs2/README.md @@ -2,7 +2,7 @@ Camel DSL for FS2 ----------------- [Apache Camel endpoints](http://camel.apache.org/components.html) can be integrated into [FS2](https://github.com/functional-streams-for-scala/fs2) applications with a [DSL](#dsl). - + ### Dependencies The DSL is provided by the `streamz-camel-fs2` artifact which is available for Scala 2.11 and 2.12: @@ -10,7 +10,7 @@ The DSL is provided by the `streamz-camel-fs2` artifact which is available for S resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven" libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.10-M2" - + ### Configuration The consumer receive timeout on Camel endpoints defaults to 500 ms. If you need to change that, you can do so in `application.conf`: @@ -31,11 +31,11 @@ Its usage requires an implicit [`StreamContext`](http://krasserm.github.io/strea ```scala import streamz.camel.StreamContext -// contains an internally managed CamelContext +// contains an internally managed CamelContext implicit val streamContext: StreamContext = StreamContext() ``` -Applications that want to re-use an existing, externally managed `CamelContext` should create a `StreamContext` with `StreamContext(camelContext: CamelContext)`: +Applications that want to re-use an existing, externally managed `CamelContext` should create a `StreamContext` with `StreamContext(camelContext: CamelContext)`: ```scala import org.apache.camel.CamelContext @@ -49,7 +49,7 @@ implicit val streamContext: StreamContext = StreamContext(camelContext) ``` A `StreamContext` internally manages an `executorService` for running blocking endpoint operations. Applications can configure a custom executor service by providing an `executorServiceFactory` during `StreamContext` creation. See [API docs](http://krasserm.github.io/streamz/scala-2.12/unidoc/streamz/camel/StreamContext$.html) for details. -After usage, a `StreamContext` should be stopped with `streamContext.stop()`. +After usage, a `StreamContext` should be stopped with `streamContext.stop()`. #### Receiving in-only message exchanges from an endpoint @@ -58,8 +58,8 @@ An FS2 stream that emits messages consumed from a Camel endpoint can be created ```scala import cats.effect.IO import fs2.Stream -import streamz.camel.StreamContext -import streamz.camel.StreamMessage +import streamz.camel.StreamContext +import streamz.camel.StreamMessage import streamz.camel.fs2.dsl._ val s1: Stream[IO, StreamMessage[String]] = receive[IO, String]("seda:q1") @@ -73,7 +73,7 @@ val s1b: Stream[IO, String] = receiveBody[IO, String]("seda:q1") This is equivalent to `receive[IO, String]("seda:q1").map(_.body)`. -`receive` and `receiveBody` can only be used with endpoints that create [in-only message exchanges](http://camel.apache.org/exchange-pattern.html). +`receive` and `receiveBody` can only be used with endpoints that create [in-only message exchanges](http://camel.apache.org/exchange-pattern.html). #### Receiving in-out message exchanges from an endpoint @@ -87,9 +87,7 @@ For sending a `StreamMessage` to a Camel endpoint, the `send` combinator should val s2: Stream[IO, StreamMessage[String]] = s1.send("seda:q2") ``` -This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`. - -The `send` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any `Stream[F, A]` where `F: ContextShift: Async`. +This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`. ```scala val s2b: Stream[IO, String] = s1b.send("seda:q2") @@ -105,9 +103,7 @@ For sending a request `StreamMessage` to an endpoint and obtaining a reply, the val s3: Stream[IO, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight") ``` -This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example). - -The `sendRequest` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any `Stream[F, A]` where `F: ContextShift: Async`. +This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example). ```scala val s3b: Stream[IO, Int] = s2b.sendRequest[Int]("bean:service?method=weight") diff --git a/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala b/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala index 8cbdae8..006f4a4 100644 --- a/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala +++ b/streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala @@ -18,8 +18,7 @@ package streamz.camel.fs2 import java.util.concurrent.TimeUnit -import cats.effect.{ Async, ContextShift } -import cats.implicits._ +import cats.effect.Async import fs2._ import org.apache.camel.spi.Synchronization import org.apache.camel.{ Exchange, ExchangePattern } @@ -33,7 +32,7 @@ package object dsl { /** * Camel endpoint combinators for [[StreamMessage]] streams of type `Stream[F, StreamMessage[A]]`. */ - implicit class SendDsl[F[_]: ContextShift: Async, A](self: Stream[F, StreamMessage[A]]) { + implicit class SendDsl[F[_]: Async, A](self: Stream[F, StreamMessage[A]]) { /** * @see [[dsl.send]] */ @@ -50,7 +49,7 @@ package object dsl { /** * Camel endpoint combinators for [[StreamMessage]] body streams of type `Stream[F, A]`. */ - implicit class SendBodyDsl[F[_]: ContextShift: Async, A](self: Stream[F, A]) { + implicit class SendBodyDsl[F[_]: Async, A](self: Stream[F, A]) { /** * @see [[dsl.sendBody]] */ @@ -71,13 +70,13 @@ package object dsl { /** * @see [[dsl.send]] */ - def send[F[_]](uri: String)(implicit context: StreamContext, contextShift: ContextShift[F], async: Async[F]): Stream[F, StreamMessage[A]] = + def send[F[_]](uri: String)(implicit context: StreamContext, async: Async[F]): Stream[F, StreamMessage[A]] = new SendDsl[F, A](self.covary[F]).send(uri) /** * @see [[dsl.sendRequest()]] */ - def sendRequest[F[_], B](uri: String)(implicit context: StreamContext, tag: ClassTag[B], contextShift: ContextShift[F], async: Async[F]): Stream[F, StreamMessage[B]] = + def sendRequest[F[_], B](uri: String)(implicit context: StreamContext, tag: ClassTag[B], async: Async[F]): Stream[F, StreamMessage[B]] = new SendDsl[F, A](self.covary[F]).sendRequest(uri) } @@ -88,13 +87,13 @@ package object dsl { /** * @see [[dsl.sendBody]] */ - def send[F[_]: ContextShift: Async](uri: String)(implicit context: StreamContext): Stream[F, A] = + def send[F[_]: Async](uri: String)(implicit context: StreamContext): Stream[F, A] = new SendBodyDsl[F, A](self.covary[F]).send(uri) /** * @see [[dsl.sendRequestBody]] */ - def sendRequest[F[_]: ContextShift: Async, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[F, B] = + def sendRequest[F[_]: Async, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[F, B] = new SendBodyDsl[F, A](self.covary[F]).sendRequest(uri) } @@ -110,7 +109,7 @@ package object dsl { * @param uri Camel endpoint URI. * @throws org.apache.camel.TypeConversionException if type conversion fails. */ - def receive[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, StreamMessage[A]] = { + def receive[F[_]: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, StreamMessage[A]] = { consume(uri).filter(_ != null) } @@ -126,7 +125,7 @@ package object dsl { * @param uri Camel endpoint URI. * @throws org.apache.camel.TypeConversionException if type conversion fails. */ - def receiveBody[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, A] = + def receiveBody[F[_]: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, A] = receive(uri).map(_.body) /** @@ -136,7 +135,7 @@ package object dsl { * * @param uri Camel endpoint URI. */ - def send[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, StreamMessage[A], StreamMessage[A]] = + def send[F[_]: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, StreamMessage[A], StreamMessage[A]] = produce[F, A, A](uri, ExchangePattern.InOnly, (message, _) => message) /** @@ -146,7 +145,7 @@ package object dsl { * * @param uri Camel endpoint URI. */ - def sendBody[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, A, A] = + def sendBody[F[_]: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, A, A] = s => s.map(StreamMessage(_)).through(send(uri)).map(_.body) /** @@ -158,7 +157,7 @@ package object dsl { * @param uri Camel endpoint URI. * @throws org.apache.camel.TypeConversionException if type conversion fails. */ - def sendRequest[F[_]: ContextShift: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, StreamMessage[A], StreamMessage[B]] = + def sendRequest[F[_]: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, StreamMessage[A], StreamMessage[B]] = produce[F, A, B](uri, ExchangePattern.InOut, (_, exchange) => StreamMessage.from[B](exchange.getOut)) /** @@ -170,13 +169,13 @@ package object dsl { * @param uri Camel endpoint URI. * @throws org.apache.camel.TypeConversionException if type conversion fails. */ - def sendRequestBody[F[_]: ContextShift: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, A, B] = + def sendRequestBody[F[_]: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, A, B] = s => s.map(StreamMessage(_)).through(sendRequest[F, A, B](uri)).map(_.body) - private def consume[F[_], A](uri: String)(implicit context: StreamContext, tag: ClassTag[A], contextShift: ContextShift[F], F: Async[F]): Stream[F, StreamMessage[A]] = { + private def consume[F[_], A](uri: String)(implicit context: StreamContext, tag: ClassTag[A], F: Async[F]): Stream[F, StreamMessage[A]] = { val timeout = context.config.getDuration("streamz.camel.consumer.receive.timeout", TimeUnit.MILLISECONDS) Stream.repeatEval { - contextShift.shift >> F.async[StreamMessage[A]] { callback => + F.async_[StreamMessage[A]] { callback => Try(context.consumerTemplate.receive(uri, timeout)) match { case Success(null) => callback(Right(null)) @@ -199,10 +198,10 @@ package object dsl { } } - private def produce[F[_], A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext, contextShift: ContextShift[F], F: Async[F]): Pipe[F, StreamMessage[A], StreamMessage[B]] = { s => + private def produce[F[_], A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext, F: Async[F]): Pipe[F, StreamMessage[A], StreamMessage[B]] = { s => s.flatMap { message => Stream.eval { - contextShift.shift >> F.async[StreamMessage[B]] { callback => + F.async_[StreamMessage[B]] { callback => context.producerTemplate.asyncCallback(uri, context.createExchange(message, pattern), new Synchronization { override def onFailure(exchange: Exchange): Unit = callback(Left(exchange.getException)) diff --git a/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala b/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala index 4e3690d..ced718e 100644 --- a/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala +++ b/streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala @@ -30,6 +30,8 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec class DslSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll { + import cats.effect.unsafe.implicits.global + val camelRegistry = new SimpleRegistry val camelContext = new DefaultCamelContext() @@ -37,7 +39,6 @@ class DslSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll { camelRegistry.put("service", new Service) implicit val streamContext = new StreamContext(camelContext) - implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global) import streamContext._ diff --git a/streamz-converter/src/main/scala/streamz/converter/Converter.scala b/streamz-converter/src/main/scala/streamz/converter/Converter.scala index 0bfb7b2..f3d3620 100644 --- a/streamz-converter/src/main/scala/streamz/converter/Converter.scala +++ b/streamz-converter/src/main/scala/streamz/converter/Converter.scala @@ -23,10 +23,11 @@ import akka.stream._ import akka.stream.scaladsl.{ Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource, _ } import akka.{ Done, NotUsed } import cats.effect._ -import cats.effect.concurrent.Deferred -import cats.effect.implicits._ +import cats.effect.kernel.Deferred import cats.implicits._ import fs2._ +import cats.effect.unsafe.UnsafeRun +import cats.effect.Resource.ExitCase trait Converter { @@ -34,9 +35,9 @@ trait Converter { * Converts an Akka Stream [[Graph]] of [[SourceShape]] to an FS2 [[Stream]]. * If the materialized value needs be obtained, use [[akkaSourceToFs2StreamMat]]. */ - def akkaSourceToFs2Stream[F[_]: Async: ContextShift, A](source: Graph[SourceShape[A], NotUsed])(implicit materializer: Materializer): Stream[F, A] = + def akkaSourceToFs2Stream[F[_]: Async, A](source: Graph[SourceShape[A], NotUsed])(implicit materializer: Materializer): Stream[F, A] = Stream.force { - Async[F].delay { + Sync[F].delay { val subscriber = AkkaSource.fromGraph(source).toMat(AkkaSink.queue[A]())(Keep.right).run() subscriberStream[F, A](subscriber) } @@ -46,8 +47,8 @@ trait Converter { * Converts an Akka Stream [[Graph]] of [[SourceShape]] to an FS2 [[Stream]]. This method returns the FS2 [[Stream]] * and the materialized value of the [[Graph]]. */ - def akkaSourceToFs2StreamMat[F[_]: Async: ContextShift, A, M](source: Graph[SourceShape[A], M])(implicit materializer: Materializer): F[(Stream[F, A], M)] = - Async[F].delay { + def akkaSourceToFs2StreamMat[F[_]: Async, A, M](source: Graph[SourceShape[A], M])(implicit materializer: Materializer): F[(Stream[F, A], M)] = + Sync[F].delay { val (mat, subscriber) = AkkaSource.fromGraph(source).toMat(AkkaSink.queue[A]())(Keep.both).run() (subscriberStream[F, A](subscriber), mat) } @@ -56,10 +57,10 @@ trait Converter { * Converts an Akka Stream [[Graph]] of [[SinkShape]] to an FS2 [[Pipe]]. * If the materialized value needs be obtained, use [[akkaSinkToFs2PipeMat]]. */ - def akkaSinkToFs2Pipe[F[_]: Concurrent: ContextShift, A](sink: Graph[SinkShape[A], NotUsed])(implicit materializer: Materializer): Pipe[F, A, Unit] = + def akkaSinkToFs2Pipe[F[_]: Async, A](sink: Graph[SinkShape[A], NotUsed])(implicit materializer: Materializer): Pipe[F, A, Unit] = (s: Stream[F, A]) => Stream.force { - Async[F].delay { + Sync[F].delay { val publisher = AkkaSource.queue[A](0, OverflowStrategy.backpressure).toMat(sink)(Keep.left).run() publisherStream[F, A](publisher, s) } @@ -69,8 +70,8 @@ trait Converter { * Converts an Akka Stream [[Graph]] of [[SinkShape]] to an FS2 [[Pipe]]. This method returns the FS2 [[Pipe]] * and the materialized value of the [[Graph]]. */ - def akkaSinkToFs2PipeMat[F[_]: Concurrent: ContextShift, A, M](sink: Graph[SinkShape[A], M])(implicit materializer: Materializer): F[(Pipe[F, A, Unit], M)] = - Concurrent[F].delay { + def akkaSinkToFs2PipeMat[F[_]: Async, A, M](sink: Graph[SinkShape[A], M])(implicit materializer: Materializer): F[(Pipe[F, A, Unit], M)] = + Sync[F].delay { val (publisher, mat) = AkkaSource.queue[A](0, OverflowStrategy.backpressure).toMat(sink)(Keep.both).run() ((s: Stream[F, A]) => publisherStream[F, A](publisher, s), mat) } @@ -81,7 +82,7 @@ trait Converter { * The stream returned by this will emit the Future's value one time at the end, * then terminate. */ - def akkaSinkToFs2PipeMat[F[_]: ConcurrentEffect: ContextShift, A, M](akkaSink: Graph[SinkShape[A], Future[M]])( + def akkaSinkToFs2PipeMat[F[_]: Async: UnsafeRun, A, M](akkaSink: Graph[SinkShape[A], Future[M]])( implicit ec: ExecutionContext, m: Materializer): F[Pipe[F, A, Either[Throwable, M]]] = @@ -90,10 +91,10 @@ trait Converter { fs2Sink <- akkaSinkToFs2PipeMat[F, A, Future[M]](akkaSink).flatMap { case (stream, mat) => // This callback tells the akka materialized future to store its result status into the Promise - val callback = ConcurrentEffect[F].delay( + val callback = Sync[F].delay( mat.onComplete { - case Failure(ex) => promise.complete(ex.asLeft).toIO.unsafeRunSync() - case Success(value) => promise.complete(value.asRight).toIO.unsafeRunSync() + case Failure(ex) => UnsafeRun[F].unsafeRunSync(promise.complete(ex.asLeft)) + case Success(value) => UnsafeRun[F].unsafeRunSync(promise.complete(value.asRight)) }) callback.map(_ => stream) } @@ -113,10 +114,10 @@ trait Converter { * Converts an Akka Stream [[Graph]] of [[FlowShape]] to an FS2 [[Pipe]]. * If the materialized value needs be obtained, use [[akkaSinkToFs2PipeMat]]. */ - def akkaFlowToFs2Pipe[F[_]: Concurrent: ContextShift, A, B](flow: Graph[FlowShape[A, B], NotUsed])(implicit materializer: Materializer): Pipe[F, A, B] = + def akkaFlowToFs2Pipe[F[_]: Async, A, B](flow: Graph[FlowShape[A, B], NotUsed])(implicit materializer: Materializer): Pipe[F, A, B] = (s: Stream[F, A]) => Stream.force { - Concurrent[F].delay { + Sync[F].delay { val src = AkkaSource.queue[A](0, OverflowStrategy.backpressure) val snk = AkkaSink.queue[B]() val (publisher, subscriber) = src.viaMat(flow)(Keep.left).toMat(snk)(Keep.both).run() @@ -128,8 +129,8 @@ trait Converter { * Converts an Akka Stream [[Graph]] of [[FlowShape]] to an FS2 [[Pipe]]. This method returns the FS2 [[Pipe]] * and the materialized value of the [[Graph]]. */ - def akkaFlowToFs2PipeMat[F[_]: Concurrent: ContextShift, A, B, M](flow: Graph[FlowShape[A, B], M])(implicit materializer: Materializer): F[(Pipe[F, A, B], M)] = - Concurrent[F].delay { + def akkaFlowToFs2PipeMat[F[_]: Async, A, B, M](flow: Graph[FlowShape[A, B], M])(implicit materializer: Materializer): F[(Pipe[F, A, B], M)] = + Sync[F].delay { val src = AkkaSource.queue[A](0, OverflowStrategy.backpressure) val snk = AkkaSink.queue[B]() val ((publisher, mat), subscriber) = src.viaMat(flow)(Keep.both).toMat(snk)(Keep.both).run() @@ -140,12 +141,12 @@ trait Converter { * Converts an FS2 [[Stream]] to an Akka Stream [[Graph]] of [[SourceShape]]. The [[Stream]] is run when the * [[Graph]] is materialized. */ - def fs2StreamToAkkaSource[F[_]: ConcurrentEffect: ContextShift, A](stream: Stream[F, A]): Graph[SourceShape[A], NotUsed] = { + def fs2StreamToAkkaSource[F[_]: Async: UnsafeRun, A](stream: Stream[F, A]): Graph[SourceShape[A], NotUsed] = { val source = AkkaSource.queue[A](0, OverflowStrategy.backpressure) // A sink that runs an FS2 publisherStream when consuming the publisher actor (= materialized value) of source val sink = AkkaSink.foreach[SourceQueueWithComplete[A]] { p => // Fire and forget Future so it runs in the background - publisherStream[F, A](p, stream).compile.drain.toIO.unsafeToFuture() + UnsafeRun[F].unsafeRunAndForget(publisherStream[F, A](p, stream).compile.drain) () } @@ -160,15 +161,15 @@ trait Converter { * Converts an FS2 [[Pipe]] to an Akka Stream [[Graph]] of [[SinkShape]]. The [[Sink]] is run when the * [[Graph]] is materialized. */ - def fs2PipeToAkkaSink[F[_]: ContextShift: Effect, A](sink: Pipe[F, A, Unit]): Graph[SinkShape[A], Future[Done]] = { + def fs2PipeToAkkaSink[F[_]: UnsafeRun, A](sink: Pipe[F, A, Unit])(implicit F: Async[F]): Graph[SinkShape[A], Future[Done]] = { val sink1: AkkaSink[A, SinkQueueWithCancel[A]] = AkkaSink.queue[A]() // A sink that runs an FS2 subscriberStream when consuming the subscriber actor (= materialized value) of sink1. // The future returned from unsafeToFuture() completes when the subscriber stream completes and is made // available as materialized value of this sink. val sink2: AkkaSink[SinkQueueWithCancel[A], Future[Done]] = AkkaFlow[SinkQueueWithCancel[A]] - .map(s => subscriberStream[F, A](s).through(sink).compile.drain.toIO.as(Done: Done).unsafeToFuture()) + .map(s => UnsafeRun[F].unsafeRunFutureCancelable(subscriberStream[F, A](s).through(sink).compile.drain.as(Done))) .toMat(AkkaSink.head)(Keep.right) - .mapMaterializedValue(ffd => Async.fromFuture(Async.fromFuture(Effect[F].pure(ffd))).toIO.unsafeToFuture()) + .mapMaterializedValue(ffd => UnsafeRun[F].unsafeRunFutureCancelable(F.fromFuture(F.fromFuture(F.pure(ffd)).map(_._1)))._1) // fromFuture dance above is because scala 2.11 lacks Future#flatten. `pure` instead of `delay` // because the future value is already strict by the time we get it. @@ -183,15 +184,14 @@ trait Converter { * Converts an FS2 [[Pipe]] to an Akka Stream [[Graph]] of [[FlowShape]]. The [[Pipe]] is run when the * [[Graph]] is materialized. */ - def fs2PipeToAkkaFlow[F[_]: ConcurrentEffect: ContextShift, A, B](pipe: Pipe[F, A, B]): Graph[FlowShape[A, B], NotUsed] = { + def fs2PipeToAkkaFlow[F[_]: Async: UnsafeRun, A, B](pipe: Pipe[F, A, B]): Graph[FlowShape[A, B], NotUsed] = { val source = AkkaSource.queue[B](0, OverflowStrategy.backpressure) val sink1: AkkaSink[A, SinkQueueWithCancel[A]] = AkkaSink.queue[A]() // A sink that runs an FS2 transformerStream when consuming the publisher actor (= materialized value) of source // and the subscriber actor (= materialized value) of sink1 val sink2 = AkkaSink.foreach[(SourceQueueWithComplete[B], SinkQueueWithCancel[A])] { ps => // Fire and forget Future so it runs in the background - ConcurrentEffect[F].toIO(transformerStream(ps._2, ps._1, pipe).compile.drain).unsafeToFuture() - () + UnsafeRun[F].unsafeRunAndForget(transformerStream(ps._2, ps._1, pipe).compile.drain) } AkkaFlow.fromGraph(GraphDSL.create(source, sink1)(Keep.both) { implicit builder => (source, sink1) => @@ -201,14 +201,14 @@ trait Converter { }).mapMaterializedValue(_ => NotUsed) } - private def subscriberStream[F[_]: Async: ContextShift, A](subscriber: SinkQueueWithCancel[A]): Stream[F, A] = { - val pull = Async.fromFuture(Async[F].delay(subscriber.pull())) - val cancel = Async[F].delay(subscriber.cancel()) + private def subscriberStream[F[_]: Async, A](subscriber: SinkQueueWithCancel[A]): Stream[F, A] = { + val pull = Async[F].fromFuture(Sync[F].delay(subscriber.pull())) + val cancel = Sync[F].delay(subscriber.cancel()) Stream.repeatEval(pull).unNoneTerminate.onFinalize(cancel) } - private def publisherStream[F[_]: Concurrent: ContextShift, A](publisher: SourceQueueWithComplete[A], stream: Stream[F, A]): Stream[F, Unit] = { - def publish(a: A): F[Option[Unit]] = Async.fromFuture(Concurrent[F].delay(publisher.offer(a))).flatMap { + private def publisherStream[F[_]: Async, A](publisher: SourceQueueWithComplete[A], stream: Stream[F, A]): Stream[F, Unit] = { + def publish(a: A): F[Option[Unit]] = Async[F].fromFuture(Sync[F].delay(publisher.offer(a))).flatMap { case QueueOfferResult.Enqueued => ().some.pure[F] case QueueOfferResult.Failure(cause) => Concurrent[F].raiseError[Option[Unit]](cause) case QueueOfferResult.QueueClosed => none[Unit].pure[F] @@ -221,21 +221,21 @@ trait Converter { case _: StreamDetachedException => none[Unit] } - def watchCompletion: F[Unit] = Async.fromFuture(Concurrent[F].delay(publisher.watchCompletion())).void - def fail(e: Throwable): F[Unit] = Concurrent[F].delay(publisher.fail(e)) >> watchCompletion - def complete: F[Unit] = Concurrent[F].delay(publisher.complete()) >> watchCompletion + def watchCompletion: F[Unit] = Async[F].fromFuture(Sync[F].delay(publisher.watchCompletion())).void + def fail(e: Throwable): F[Unit] = Sync[F].delay(publisher.fail(e)) >> watchCompletion + def complete: F[Unit] = Sync[F].delay(publisher.complete()) >> watchCompletion stream.interruptWhen(watchCompletion.attempt).evalMap(publish).unNoneTerminate .onFinalizeCase { - case ExitCase.Completed | ExitCase.Canceled => complete - case ExitCase.Error(e) => fail(e) + case ExitCase.Succeeded | ExitCase.Canceled => complete + case ExitCase.Errored(e) => fail(e) } } - private def transformerStream[F[_]: ContextShift: Concurrent, A, B](subscriber: SinkQueueWithCancel[B], publisher: SourceQueueWithComplete[A], stream: Stream[F, A]): Stream[F, B] = + private def transformerStream[F[_]: Async, A, B](subscriber: SinkQueueWithCancel[B], publisher: SourceQueueWithComplete[A], stream: Stream[F, A]): Stream[F, B] = subscriberStream[F, B](subscriber).concurrently(publisherStream[F, A](publisher, stream)) - private def transformerStream[F[_]: ContextShift: Concurrent, A, B](subscriber: SinkQueueWithCancel[A], publisher: SourceQueueWithComplete[B], pipe: Pipe[F, A, B]): Stream[F, Unit] = + private def transformerStream[F[_]: Async, A, B](subscriber: SinkQueueWithCancel[A], publisher: SourceQueueWithComplete[B], pipe: Pipe[F, A, B]): Stream[F, Unit] = subscriberStream[F, A](subscriber).through(pipe).through(s => publisherStream(publisher, s)) } @@ -244,27 +244,19 @@ trait ConverterDsl extends Converter { implicit class AkkaSourceDsl[A, M](source: Graph[SourceShape[A], M]) { /** @see [[Converter#akkaSourceToFs2Stream]] */ - def toStream[F[_]: ContextShift: Async](implicit materializer: Materializer): Stream[F, A] = + def toStream[F[_]: Async](implicit materializer: Materializer): Stream[F, A] = akkaSourceToFs2Stream(source.asInstanceOf[Graph[SourceShape[A], NotUsed]]) /** @see [[Converter#akkaSourceToFs2StreamMat]] */ - def toStreamMat[F[_]: ContextShift: Async](implicit materializer: Materializer): F[(Stream[F, A], M)] = + def toStreamMat[F[_]: Async](implicit materializer: Materializer): F[(Stream[F, A], M)] = akkaSourceToFs2StreamMat(source) - @deprecated(message = "Use `.toStream[F]` for M=NotUsed; use `.toStreamMat[F]` for other M. This version relies on side effects.", since = "0.11") - def toStream[F[_]: ContextShift: Async](onMaterialization: M => Unit = _ => ())(implicit materializer: Materializer): Stream[F, A] = - Stream.force( - akkaSourceToFs2StreamMat(source).map { - case (akkaStream, mat) => - onMaterialization(mat) - akkaStream - }) } implicit class AkkaSinkFutureDsl[A, M](sink: Graph[SinkShape[A], Future[M]]) { /** @see [[Converter#akkaSinkToFs2SinkMat]] */ - def toPipeMatWithResult[F[_]: ConcurrentEffect: ContextShift]( + def toPipeMatWithResult[F[_]: Async: UnsafeRun]( implicit ec: ExecutionContext, m: Materializer): F[Pipe[F, A, Either[Throwable, M]]] = @@ -275,77 +267,41 @@ trait ConverterDsl extends Converter { implicit class AkkaSinkDsl[A, M](sink: Graph[SinkShape[A], M]) { /** @see [[Converter#akkaSinkToFs2Sink]] */ - def toPipe[F[_]: ContextShift: Concurrent](implicit materializer: Materializer): Pipe[F, A, Unit] = + def toPipe[F[_]: Async](implicit materializer: Materializer): Pipe[F, A, Unit] = akkaSinkToFs2Pipe(sink.asInstanceOf[Graph[SinkShape[A], NotUsed]]) /** @see [[Converter#akkaSinkToFs2SinkMat]] */ - def toPipeMat[F[_]: ContextShift: Concurrent](implicit materializer: Materializer): F[(Pipe[F, A, Unit], M)] = + def toPipeMat[F[_]: Async](implicit materializer: Materializer): F[(Pipe[F, A, Unit], M)] = akkaSinkToFs2PipeMat(sink) - @deprecated(message = "Use `.toSink[F]` for M=NotUsed; use `.toSinkMat[F]` for other M. This version relies on side effects.", since = "0.11") - def toSink[F[_]: ContextShift: Concurrent](onMaterialization: M => Unit)(implicit materializer: Materializer): Pipe[F, A, Unit] = - (s: Stream[F, A]) => - Stream.force { - akkaSinkToFs2PipeMat(sink).map { - case (fs2Sink, mat) => - onMaterialization(mat) - s.through(fs2Sink) - } - } - } implicit class AkkaFlowDsl[A, B, M](flow: Graph[FlowShape[A, B], M]) { /** @see [[Converter#akkaFlowToFs2Pipe]] */ - def toPipe[F[_]: ContextShift: ConcurrentEffect](implicit materializer: Materializer): Pipe[F, A, B] = + def toPipe[F[_]: Async](implicit materializer: Materializer): Pipe[F, A, B] = akkaFlowToFs2Pipe(flow.asInstanceOf[Graph[FlowShape[A, B], NotUsed]]) /** @see [[Converter#akkaFlowToFs2PipeMat]] */ - def toPipeMat[F[_]: ContextShift: ConcurrentEffect](implicit materializer: Materializer): F[(Pipe[F, A, B], M)] = + def toPipeMat[F[_]: Async](implicit materializer: Materializer): F[(Pipe[F, A, B], M)] = akkaFlowToFs2PipeMat(flow) - @deprecated(message = "Use `.toPipe[F]` for M=NotUsed; use `.toPipeMat[F]` for other M. This version relies on side effects.", since = "0.11") - def toPipe[F[_]: ContextShift: ConcurrentEffect](onMaterialization: M => Unit = _ => ())(implicit materializer: Materializer): Pipe[F, A, B] = - (s: Stream[F, A]) => Stream.force { - akkaFlowToFs2PipeMat(flow).map { - case (fs2Pipe, mat) => - onMaterialization(mat) - s.through(fs2Pipe) - } - } - } - - implicit class FS2StreamNothingDsl[A](stream: Stream[Nothing, A]) { - - /** @see [[Converter#fs2StreamToAkkaSource]] */ - @deprecated("Use `stream.covary[F].toSource` instead", "0.10") - def toSource(implicit contextShift: ContextShift[IO]): Graph[SourceShape[A], NotUsed] = - fs2StreamToAkkaSource(stream: Stream[IO, A]) - } - - implicit class FS2StreamPureDsl[A](stream: Stream[Pure, A]) { - - /** @see [[Converter#fs2StreamToAkkaSource]] */ - @deprecated("Use `stream.covary[F].toSource` instead", "0.10") - def toSource(implicit contextShift: ContextShift[IO]): Graph[SourceShape[A], NotUsed] = - fs2StreamToAkkaSource(stream: Stream[IO, A]) } - implicit class FS2StreamIODsl[F[_]: ContextShift: ConcurrentEffect, A](stream: Stream[F, A]) { + implicit class FS2StreamIODsl[F[_]: Async: UnsafeRun, A](stream: Stream[F, A]) { /** @see [[Converter#fs2StreamToAkkaSource]] */ def toSource: Graph[SourceShape[A], NotUsed] = fs2StreamToAkkaSource(stream) } - implicit class FS2SinkIODsl[F[_]: Effect: ContextShift, A](sink: Pipe[F, A, Unit]) { + implicit class FS2SinkIODsl[F[_]: Async: UnsafeRun, A](sink: Pipe[F, A, Unit]) { /** @see [[Converter#fs2PipeToAkkaSink]] */ def toSink: Graph[SinkShape[A], Future[Done]] = fs2PipeToAkkaSink(sink) } - implicit class FS2PipeIODsl[F[_]: ContextShift: ConcurrentEffect, A, B](pipe: Pipe[F, A, B]) { + implicit class FS2PipeIODsl[F[_]: Async: UnsafeRun, A, B](pipe: Pipe[F, A, B]) { /** @see [[Converter#fs2PipeToAkkaFlow]] */ def toFlow: Graph[FlowShape[A, B], NotUsed] = diff --git a/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala b/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala index dac9aab..e27f34c 100644 --- a/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala +++ b/streamz-converter/src/test/scala/streamz/converter/ConverterSpec.scala @@ -18,7 +18,6 @@ package streamz.converter import akka.Done import akka.actor.ActorSystem -import akka.stream.Materializer import akka.stream.scaladsl.{ Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource, _ } import akka.testkit._ import cats.effect.IO @@ -42,13 +41,11 @@ object ConverterSpec { class ConverterSpec extends TestKit(ActorSystem("test")) with AnyWordSpecLike with Matchers with BeforeAndAfterAll { import ConverterSpec._ + import cats.effect.unsafe.implicits.global - private implicit val materializer = Materializer.createMaterializer(system) private implicit val dispatcher = system.dispatcher - private implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global) override def afterAll(): Unit = { - materializer.shutdown() TestKit.shutdownActorSystem(system) super.afterAll() } @@ -235,7 +232,7 @@ class ConverterSpec extends TestKit(ActorSystem("test")) with AnyWordSpecLike wi def seqSink(probe: TestProbe): Pipe[IO, Int, Unit] = s => s.fold(Seq.empty[Int])(_ :+ _).map(probe.ref ! Success(_)) - .handleErrorWith(err => Stream.eval_(IO(probe.ref ! Failure(err))) ++ Stream.raiseError[IO](err)) + .handleErrorWith(err => Stream.exec(IO(probe.ref ! Failure(err))) ++ Stream.raiseError[IO](err)) .onFinalize(IO(probe.ref ! Success(Done))) "propagate elements and completion from AS sink to FS2 sink" in { diff --git a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala index b33df99..edcfa10 100644 --- a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala +++ b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Example.scala @@ -20,24 +20,27 @@ import cats.effect.IO import fs2.{ Stream, text } import streamz.camel.fs2.dsl._ import streamz.examples.camel.ExampleContext +import cats.effect.{ ExitCode, IOApp } -object Example extends ExampleContext with App { - implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global) +object Example extends IOApp with ExampleContext { - val tcpLineStream: Stream[IO, String] = - receiveBody[IO, String](tcpEndpointUri) + def run(args: List[String]) = { - val fileLineStream: Stream[IO, String] = - receiveBody[IO, String](fileEndpointUri).through(text.lines) + val tcpLineStream: Stream[IO, String] = + receiveBody[IO, String](tcpEndpointUri) - val linePrefixStream: Stream[IO, String] = - Stream.iterate(1)(_ + 1).sendRequest[IO, String](serviceEndpointUri) + val fileLineStream: Stream[IO, String] = + receiveBody[IO, String](fileEndpointUri).through(text.lines) - val stream: Stream[IO, String] = - tcpLineStream - .merge(fileLineStream) - .zipWith(linePrefixStream)((l, n) => n concat l) - .send(printerEndpointUri) + val linePrefixStream: Stream[IO, String] = + Stream.iterate(1)(_ + 1).sendRequest[IO, String](serviceEndpointUri) - stream.compile.drain.unsafeRunSync() + val stream: Stream[IO, String] = + tcpLineStream + .merge(fileLineStream) + .zipWith(linePrefixStream)((l, n) => n concat l) + .send(printerEndpointUri) + + stream.compile.drain.as(ExitCode.Success) + } } diff --git a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala index 6906916..1cea821 100644 --- a/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala +++ b/streamz-examples/src/main/scala/streamz/examples/camel/fs2/Snippets.scala @@ -23,7 +23,6 @@ import streamz.camel.fs2.dsl._ object Snippets { implicit val context = StreamContext() - implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global) val s: Stream[IO, StreamMessage[Int]] = // receive stream message from endpoint @@ -37,6 +36,7 @@ object Snippets { val t: IO[Unit] = s.compile.drain // run IO (side effects only here) ... + import cats.effect.unsafe.implicits.global // Normally taken as implicit param t.unsafeRunSync() val s1: Stream[IO, StreamMessage[String]] = receive[IO, String]("seda:q1") diff --git a/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala b/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala index f108d6f..d0b1576 100644 --- a/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala +++ b/streamz-examples/src/main/scala/streamz/examples/converter/Example.scala @@ -20,61 +20,75 @@ import akka.actor.{ ActorRefFactory, ActorSystem } import akka.stream.Materializer import akka.stream.scaladsl.{ Keep, Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource } import akka.{ Done, NotUsed } -import cats.effect.{ ContextShift, IO } +import cats.effect.IO +import cats.implicits._ import fs2.{ Pipe, Pure, Stream } import streamz.converter._ import scala.collection.immutable.Seq import scala.concurrent._ import scala.concurrent.duration._ +import cats.effect.IOApp +import cats.effect.kernel.Resource +import cats.effect.ExitCode -object Example extends App { - val system: ActorSystem = ActorSystem("example") - val factory: ActorRefFactory = system +object Example extends IOApp { - implicit val executionContext: ExecutionContext = factory.dispatcher - implicit val materializer: Materializer = Materializer.createMaterializer(system) - implicit val contextShift: ContextShift[IO] = IO.contextShift(materializer.executionContext) + val mkSystem: Resource[IO, ActorSystem] = Resource.make(IO(ActorSystem("example")))(s => IO.fromFuture(IO(s.terminate())).map(_ => ())) - val numbers: Seq[Int] = 1 to 10 + def run(args: List[String]) = { + mkSystem.use { implicit system: ActorSystem => - // -------------------------------- - // Akka Stream to FS2 conversions - // -------------------------------- + val numbers: List[Int] = (1 to 10).toList - def f(i: Int) = List(s"$i-1", s"$i-2") + // -------------------------------- + // Akka Stream to FS2 conversions + // -------------------------------- - val aSink1: AkkaSink[Int, Future[Done]] = AkkaSink.foreach[Int](println) - val fSink1: Pipe[IO, Int, Unit] = aSink1.toPipe[IO] + def f(i: Int) = List(s"$i-1", s"$i-2") - val aSource1: AkkaSource[Int, NotUsed] = AkkaSource(numbers) - val fStream1: Stream[IO, Int] = aSource1.toStream[IO] + val aSink1: AkkaSink[Int, Future[Done]] = AkkaSink.foreach[Int](println) + val fSink1: Pipe[IO, Int, Unit] = aSink1.toPipe[IO] - val aFlow1: AkkaFlow[Int, String, NotUsed] = AkkaFlow[Int].mapConcat(f) - val fPipe1: Pipe[IO, Int, String] = aFlow1.toPipe[IO] + val aSource1: AkkaSource[Int, NotUsed] = AkkaSource(numbers) + val fStream1: Stream[IO, Int] = aSource1.toStream[IO] - fStream1.through(fSink1).compile.drain.unsafeRunSync() // prints numbers - assert(fStream1.compile.toVector.unsafeRunSync() == numbers) - assert(fStream1.through(fPipe1).compile.toVector.unsafeRunSync() == numbers.flatMap(f)) + val aFlow1: AkkaFlow[Int, String, NotUsed] = AkkaFlow[Int].mapConcat(f) + val fPipe1: Pipe[IO, Int, String] = aFlow1.toPipe[IO] - // -------------------------------- - // FS2 to Akka Stream conversions - // -------------------------------- + val akkaToFs2Example = for { + _ <- fStream1.through(fSink1).compile.drain + numbersResult <- fStream1.compile.toList + numbersPipeResult <- fStream1.through(fPipe1).compile.toList + } yield { + assert(numbersResult === numbers) + assert(numbersPipeResult === numbers.flatMap(f)) + } - def g(i: Int) = i + 10 + // -------------------------------- + // FS2 to Akka Stream conversions + // -------------------------------- - val fSink2: Pipe[IO, Int, Unit] = s => s.map(g).evalMap(i => IO(println(i))) - val aSink2: AkkaSink[Int, Future[Done]] = AkkaSink.fromGraph(fSink2.toSink) + def g(i: Int) = i + 10 - val fStream2: Stream[Pure, Int] = Stream.emits(numbers) - val aSource2: AkkaSource[Int, NotUsed] = AkkaSource.fromGraph(fStream2.covary[IO].toSource) + val fSink2: Pipe[IO, Int, Unit] = s => s.map(g).evalMap(i => IO(println(i))) + val aSink2: AkkaSink[Int, Future[Done]] = AkkaSink.fromGraph(fSink2.toSink) - val fpipe2: Pipe[IO, Int, Int] = s => s.map(g) - val aFlow2: AkkaFlow[Int, Int, NotUsed] = AkkaFlow.fromGraph(fpipe2.toFlow) + val fStream2: Stream[Pure, Int] = Stream.emits(numbers) + val aSource2: AkkaSource[Int, NotUsed] = AkkaSource.fromGraph(fStream2.covary[IO].toSource) - aSource2.toMat(aSink2)(Keep.right).run() // prints numbers - assert(Await.result(aSource2.toMat(AkkaSink.seq)(Keep.right).run(), 5.seconds) == numbers) - assert(Await.result(aSource2.via(aFlow2).toMat(AkkaSink.seq)(Keep.right).run(), 5.seconds) == numbers.map(g)) + val fpipe2: Pipe[IO, Int, Int] = s => s.map(g) + val aFlow2: AkkaFlow[Int, Int, NotUsed] = AkkaFlow.fromGraph(fpipe2.toFlow) - materializer.shutdown() - system.terminate() -} + val fs2ToAkkaExample = for { + _ <- IO.fromFuture(IO(aSource2.toMat(aSink2)(Keep.right).run())) // prints numbers + numbersResult <- IO.fromFuture(IO(aSource2.toMat(AkkaSink.seq)(Keep.right).run())) + numbersFlowResult <- IO.fromFuture(IO(aSource2.via(aFlow2).toMat(AkkaSink.seq)(Keep.right).run())) + } yield { + assert(numbersResult.toList === numbers) + assert(numbersFlowResult.toList === numbers.map(g)) + } + + akkaToFs2Example >> fs2ToAkkaExample.as(ExitCode.Success) + } + } +} \ No newline at end of file