Skip to content

Commit

Permalink
Upgrade to cats-effect, fs2 3.0.0-M1 builds
Browse files Browse the repository at this point in the history
  • Loading branch information
Daenyth committed Oct 8, 2020
1 parent e9a4496 commit 5d5983e
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 188 deletions.
4 changes: 2 additions & 2 deletions project/Version.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
object Version {
val Akka = "2.6.9"
val Camel = "2.20.4"
val CatsEffect = "2.2.0"
val Fs2 = "2.4.4"
val CatsEffect = "3.0.0-M1"
val Fs2 = "3.0.0-M1"
val Log4j = "2.13.0"
val JUnitInterface = "0.11"
val Scalatest = "3.2.0"
Expand Down
24 changes: 10 additions & 14 deletions streamz-camel-fs2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ Camel DSL for FS2
-----------------

[Apache Camel endpoints](http://camel.apache.org/components.html) can be integrated into [FS2](https://github.com/functional-streams-for-scala/fs2) applications with a [DSL](#dsl).

### Dependencies

The DSL is provided by the `streamz-camel-fs2` artifact which is available for Scala 2.11 and 2.12:

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.10-M2"

### Configuration

The consumer receive timeout on Camel endpoints defaults to 500 ms. If you need to change that, you can do so in `application.conf`:
Expand All @@ -31,11 +31,11 @@ Its usage requires an implicit [`StreamContext`](http://krasserm.github.io/strea
```scala
import streamz.camel.StreamContext

// contains an internally managed CamelContext
// contains an internally managed CamelContext
implicit val streamContext: StreamContext = StreamContext()
```

Applications that want to re-use an existing, externally managed `CamelContext` should create a `StreamContext` with `StreamContext(camelContext: CamelContext)`:
Applications that want to re-use an existing, externally managed `CamelContext` should create a `StreamContext` with `StreamContext(camelContext: CamelContext)`:

```scala
import org.apache.camel.CamelContext
Expand All @@ -49,7 +49,7 @@ implicit val streamContext: StreamContext = StreamContext(camelContext)
```
A `StreamContext` internally manages an `executorService` for running blocking endpoint operations. Applications can configure a custom executor service by providing an `executorServiceFactory` during `StreamContext` creation. See [API docs](http://krasserm.github.io/streamz/scala-2.12/unidoc/streamz/camel/StreamContext$.html) for details.

After usage, a `StreamContext` should be stopped with `streamContext.stop()`.
After usage, a `StreamContext` should be stopped with `streamContext.stop()`.

#### Receiving in-only message exchanges from an endpoint

Expand All @@ -58,8 +58,8 @@ An FS2 stream that emits messages consumed from a Camel endpoint can be created
```scala
import cats.effect.IO
import fs2.Stream
import streamz.camel.StreamContext
import streamz.camel.StreamMessage
import streamz.camel.StreamContext
import streamz.camel.StreamMessage
import streamz.camel.fs2.dsl._

val s1: Stream[IO, StreamMessage[String]] = receive[IO, String]("seda:q1")
Expand All @@ -73,7 +73,7 @@ val s1b: Stream[IO, String] = receiveBody[IO, String]("seda:q1")

This is equivalent to `receive[IO, String]("seda:q1").map(_.body)`.

`receive` and `receiveBody` can only be used with endpoints that create [in-only message exchanges](http://camel.apache.org/exchange-pattern.html).
`receive` and `receiveBody` can only be used with endpoints that create [in-only message exchanges](http://camel.apache.org/exchange-pattern.html).

#### Receiving in-out message exchanges from an endpoint

Expand All @@ -87,9 +87,7 @@ For sending a `StreamMessage` to a Camel endpoint, the `send` combinator should
val s2: Stream[IO, StreamMessage[String]] = s1.send("seda:q2")
```

This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`.

The `send` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any `Stream[F, A]` where `F: ContextShift: Async`.
This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`.

```scala
val s2b: Stream[IO, String] = s1b.send("seda:q2")
Expand All @@ -105,9 +103,7 @@ For sending a request `StreamMessage` to an endpoint and obtaining a reply, the
val s3: Stream[IO, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight")
```

This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example).

The `sendRequest` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any `Stream[F, A]` where `F: ContextShift: Async`.
This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example).

```scala
val s3b: Stream[IO, Int] = s2b.sendRequest[Int]("bean:service?method=weight")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ package streamz.camel.fs2

import java.util.concurrent.TimeUnit

import cats.effect.{ Async, ContextShift }
import cats.implicits._
import cats.effect.Async
import fs2._
import org.apache.camel.spi.Synchronization
import org.apache.camel.{ Exchange, ExchangePattern }
Expand All @@ -33,7 +32,7 @@ package object dsl {
/**
* Camel endpoint combinators for [[StreamMessage]] streams of type `Stream[F, StreamMessage[A]]`.
*/
implicit class SendDsl[F[_]: ContextShift: Async, A](self: Stream[F, StreamMessage[A]]) {
implicit class SendDsl[F[_]: Async, A](self: Stream[F, StreamMessage[A]]) {
/**
* @see [[dsl.send]]
*/
Expand All @@ -50,7 +49,7 @@ package object dsl {
/**
* Camel endpoint combinators for [[StreamMessage]] body streams of type `Stream[F, A]`.
*/
implicit class SendBodyDsl[F[_]: ContextShift: Async, A](self: Stream[F, A]) {
implicit class SendBodyDsl[F[_]: Async, A](self: Stream[F, A]) {
/**
* @see [[dsl.sendBody]]
*/
Expand All @@ -71,13 +70,13 @@ package object dsl {
/**
* @see [[dsl.send]]
*/
def send[F[_]](uri: String)(implicit context: StreamContext, contextShift: ContextShift[F], async: Async[F]): Stream[F, StreamMessage[A]] =
def send[F[_]](uri: String)(implicit context: StreamContext, async: Async[F]): Stream[F, StreamMessage[A]] =
new SendDsl[F, A](self.covary[F]).send(uri)

/**
* @see [[dsl.sendRequest()]]
*/
def sendRequest[F[_], B](uri: String)(implicit context: StreamContext, tag: ClassTag[B], contextShift: ContextShift[F], async: Async[F]): Stream[F, StreamMessage[B]] =
def sendRequest[F[_], B](uri: String)(implicit context: StreamContext, tag: ClassTag[B], async: Async[F]): Stream[F, StreamMessage[B]] =
new SendDsl[F, A](self.covary[F]).sendRequest(uri)
}

Expand All @@ -88,13 +87,13 @@ package object dsl {
/**
* @see [[dsl.sendBody]]
*/
def send[F[_]: ContextShift: Async](uri: String)(implicit context: StreamContext): Stream[F, A] =
def send[F[_]: Async](uri: String)(implicit context: StreamContext): Stream[F, A] =
new SendBodyDsl[F, A](self.covary[F]).send(uri)

/**
* @see [[dsl.sendRequestBody]]
*/
def sendRequest[F[_]: ContextShift: Async, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[F, B] =
def sendRequest[F[_]: Async, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[F, B] =
new SendBodyDsl[F, A](self.covary[F]).sendRequest(uri)
}

Expand All @@ -110,7 +109,7 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws org.apache.camel.TypeConversionException if type conversion fails.
*/
def receive[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, StreamMessage[A]] = {
def receive[F[_]: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, StreamMessage[A]] = {
consume(uri).filter(_ != null)
}

Expand All @@ -126,7 +125,7 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws org.apache.camel.TypeConversionException if type conversion fails.
*/
def receiveBody[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, A] =
def receiveBody[F[_]: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, A] =
receive(uri).map(_.body)

/**
Expand All @@ -136,7 +135,7 @@ package object dsl {
*
* @param uri Camel endpoint URI.
*/
def send[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, StreamMessage[A], StreamMessage[A]] =
def send[F[_]: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, StreamMessage[A], StreamMessage[A]] =
produce[F, A, A](uri, ExchangePattern.InOnly, (message, _) => message)

/**
Expand All @@ -146,7 +145,7 @@ package object dsl {
*
* @param uri Camel endpoint URI.
*/
def sendBody[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, A, A] =
def sendBody[F[_]: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, A, A] =
s => s.map(StreamMessage(_)).through(send(uri)).map(_.body)

/**
Expand All @@ -158,7 +157,7 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws org.apache.camel.TypeConversionException if type conversion fails.
*/
def sendRequest[F[_]: ContextShift: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, StreamMessage[A], StreamMessage[B]] =
def sendRequest[F[_]: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, StreamMessage[A], StreamMessage[B]] =
produce[F, A, B](uri, ExchangePattern.InOut, (_, exchange) => StreamMessage.from[B](exchange.getOut))

/**
Expand All @@ -170,13 +169,13 @@ package object dsl {
* @param uri Camel endpoint URI.
* @throws org.apache.camel.TypeConversionException if type conversion fails.
*/
def sendRequestBody[F[_]: ContextShift: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, A, B] =
def sendRequestBody[F[_]: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, A, B] =
s => s.map(StreamMessage(_)).through(sendRequest[F, A, B](uri)).map(_.body)

private def consume[F[_], A](uri: String)(implicit context: StreamContext, tag: ClassTag[A], contextShift: ContextShift[F], F: Async[F]): Stream[F, StreamMessage[A]] = {
private def consume[F[_], A](uri: String)(implicit context: StreamContext, tag: ClassTag[A], F: Async[F]): Stream[F, StreamMessage[A]] = {
val timeout = context.config.getDuration("streamz.camel.consumer.receive.timeout", TimeUnit.MILLISECONDS)
Stream.repeatEval {
contextShift.shift >> F.async[StreamMessage[A]] { callback =>
F.async_[StreamMessage[A]] { callback =>
Try(context.consumerTemplate.receive(uri, timeout)) match {
case Success(null) =>
callback(Right(null))
Expand All @@ -199,10 +198,10 @@ package object dsl {
}
}

private def produce[F[_], A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext, contextShift: ContextShift[F], F: Async[F]): Pipe[F, StreamMessage[A], StreamMessage[B]] = { s =>
private def produce[F[_], A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext, F: Async[F]): Pipe[F, StreamMessage[A], StreamMessage[B]] = { s =>
s.flatMap { message =>
Stream.eval {
contextShift.shift >> F.async[StreamMessage[B]] { callback =>
F.async_[StreamMessage[B]] { callback =>
context.producerTemplate.asyncCallback(uri, context.createExchange(message, pattern), new Synchronization {
override def onFailure(exchange: Exchange): Unit =
callback(Left(exchange.getException))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class DslSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll {
import cats.effect.unsafe.implicits.global

val camelRegistry = new SimpleRegistry
val camelContext = new DefaultCamelContext()

camelContext.setRegistry(camelRegistry)
camelRegistry.put("service", new Service)

implicit val streamContext = new StreamContext(camelContext)
implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global)

import streamContext._

Expand Down
Loading

0 comments on commit 5d5983e

Please sign in to comment.