diff --git a/README.md b/README.md index 3f1d288..2d0eec7 100644 --- a/README.md +++ b/README.md @@ -311,7 +311,7 @@ An Akka `Source` is provided that can be used with streams. It is possible to cr directly from the consumer name that is defined in the configuration. Every message that is emitted to the stream is of type `CommitableEvent[ConsumerEvent]` and has to be committed explicitly downstream with a call to `event.commit()`. It is possible to map to a different type of `CommittableEvent` -via the `map` and `mapAsync` functionality. +via the `map` and `mapAsync` functionality. A `KinesisConsumer` is created internally for the `Kinesis.source`, when the factory method isn't defined. ```scala import com.weightwatchers.reactive.kinesis.stream._ @@ -325,7 +325,29 @@ Kinesis .runWith(Sink.seq) ``` -A `KinesisConsumer` is used internally for the `Kinesis.source`. All rules described here for the `KinesisConsumer` also apply for the stream source. +Or you can explicitly path a lambda, to create the `KinesisConsumer`. + +```scala +import akka.actor.{ActorRef, ActorSystem} +import akka.stream.scaladsl.Sink +import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer +import com.weightwatchers.reactive.kinesis.stream._ + +val sys = ActorSystem("kinesis-consumer-system") + +Kinesis + .source( + "consumer-name", + (conf: KinesisConsumer.ConsumerConf, eventProcessor: ActorRef) => KinesisConsumer(conf, eventProcessor, sys) + ) + .take(100) + .map(event => event.map(_.payloadAsString())) // read and map payload as string + .mapAsync(10)(event => event.mapAsync(Downloader.download(event.payload))) // handle an async message + .map(event => event.commit()) // mark the event as handled by calling commit + .runWith(Sink.seq) +``` + +All rules described here for the `KinesisConsumer` also apply for the stream source. ### Graceful Shutdown diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala index a74ab06..b06c55b 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala @@ -17,11 +17,12 @@ package com.weightwatchers.reactive.kinesis.stream import akka.{Done, NotUsed} -import akka.actor.{ActorSystem, Props} +import akka.actor.{ActorRef, ActorSystem, Props} import akka.stream.scaladsl.{Sink, Source} import com.amazonaws.auth.AWSCredentialsProvider import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging +import com.weightwatchers.reactive.kinesis.consumer.{ConsumerService, KinesisConsumer} import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf import com.weightwatchers.reactive.kinesis.models.{ConsumerEvent, ProducerEvent} import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf} @@ -33,6 +34,23 @@ import scala.concurrent.Future */ object Kinesis extends LazyLogging { + /** + * Create a source, that provides KinesisEvents. + * Please note: every KinesisEvent has to be committed during the user flow! + * Uncommitted events will be retransmitted after a timeout. + * + * @param consumerConf the configuration to connect to Kinesis. + * @param createConsumer factory function to create ConsumerService from eventProcessor ActorRef. + * @param system the actor system. + * @return A source of KinesisEvent objects. + */ + def source( + consumerConf: ConsumerConf, + createConsumer: ActorRef => ConsumerService + )(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = { + Source.fromGraph(new KinesisSourceGraphStage(consumerConf, createConsumer, system)) + } + /** * Create a source, that provides KinesisEvents. * Please note: every KinesisEvent has to be committed during the user flow! @@ -45,7 +63,7 @@ object Kinesis extends LazyLogging { def source( consumerConf: ConsumerConf )(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = { - Source.fromGraph(new KinesisSourceGraphStage(consumerConf, system)) + source(consumerConf, KinesisConsumer(consumerConf, _, system)) } /** @@ -69,12 +87,69 @@ object Kinesis extends LazyLogging { * @param system the actor system to use. * @return A source of KinesisEvent objects. */ - def source(consumerName: String, inConfig: String = "kinesis")( + def source(consumerName: String, inConfig: String)( implicit system: ActorSystem ): Source[CommittableEvent[ConsumerEvent], NotUsed] = { source(ConsumerConf(system.settings.config.getConfig(inConfig), consumerName)) } + /** + * Create a source by using the actor system configuration, that provides KinesisEvents. + * Please note: every KinesisEvent has to be committed during the user flow! + * Uncommitted events will be retransmitted after a timeout. + * + * A minimal application conf file should look like this: + * {{{ + * kinesis { + * application-name = "SampleService" + * consumer-name { + * stream-name = "sample-stream" + * } + * } + * }}} + * See kinesis reference.conf for a list of all available config options. + * + * @param consumerName the name of the consumer in the application.conf. + * @param system the actor system to use. + * @return A source of KinesisEvent objects. + */ + def source(consumerName: String)( + implicit system: ActorSystem + ): Source[CommittableEvent[ConsumerEvent], NotUsed] = { + source(consumerName, "kinesis") + } + + /** + * Create a source by using the actor system configuration, that provides KinesisEvents. + * Please note: every KinesisEvent has to be committed during the user flow! + * Uncommitted events will be retransmitted after a timeout. + * + * A minimal application conf file should look like this: + * {{{ + * kinesis { + * application-name = "SampleService" + * consumer-name { + * stream-name = "sample-stream" + * } + * } + * }}} + * See kinesis reference.conf for a list of all available config options. + * + * @param consumerName the name of the consumer in the application.conf. + * @param createConsumer factory function to create ConsumerService from eventProcessor ActorRef. + * @param inConfig the name of the sub-config for kinesis. + * @param system the actor system to use. + * @return A source of KinesisEvent objects. + */ + def source( + consumerName: String, + createConsumer: (ConsumerConf, ActorRef) => ConsumerService, + inConfig: String = "kinesis" + )(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = { + val consumerConf = ConsumerConf(system.settings.config.getConfig(inConfig), consumerName) + source(consumerConf, createConsumer(consumerConf, _)) + } + /** * Create a Sink that accepts ProducerEvents, which get published to Kinesis. * @@ -91,9 +166,10 @@ object Kinesis extends LazyLogging { * @param system the actor system. * @return A sink that accepts ProducerEvents. */ - def sink(props: => Props, maxOutStanding: Int)( - implicit system: ActorSystem - ): Sink[ProducerEvent, Future[Done]] = { + def sink( + props: => Props, + maxOutStanding: Int + )(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = { Sink.fromGraph(new KinesisSinkGraphStage(props, maxOutStanding, system)) } @@ -143,11 +219,11 @@ object Kinesis extends LazyLogging { * @param system the actor system. * @return A sink that accepts ProducerEvents. */ - def sink(kinesisConfig: Config, - producerName: String, - credentialsProvider: Option[AWSCredentialsProvider])( - implicit system: ActorSystem - ): Sink[ProducerEvent, Future[Done]] = { + def sink( + kinesisConfig: Config, + producerName: String, + credentialsProvider: Option[AWSCredentialsProvider] + )(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = { sink( ProducerConf(kinesisConfig, producerName, credentialsProvider) ) @@ -183,11 +259,11 @@ object Kinesis extends LazyLogging { * @param system the actor system. * @return A sink that accepts ProducerEvents. */ - def sink(producerName: String, - inConfig: String = "kinesis", - credentialsProvider: Option[AWSCredentialsProvider] = None)( - implicit system: ActorSystem - ): Sink[ProducerEvent, Future[Done]] = { + def sink( + producerName: String, + inConfig: String = "kinesis", + credentialsProvider: Option[AWSCredentialsProvider] = None + )(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = { sink(system.settings.config.getConfig(inConfig), producerName, credentialsProvider) } } diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStage.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStage.scala index feb7d75..279dbc5 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStage.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStage.scala @@ -143,13 +143,6 @@ class KinesisSourceGraphStage(config: ConsumerConf, extends GraphStage[SourceShape[CommittableEvent[ConsumerEvent]]] with LazyLogging { - /** - * Ctor that uses the KinesisConsumer as ConsumerService implementation. - */ - def this(config: ConsumerConf, actorSystem: ActorSystem) = { - this(config, KinesisConsumer(config, _, actorSystem), actorSystem) - } - private[this] val out: Outlet[CommittableEvent[ConsumerEvent]] = Outlet("KinesisSource.out") override val shape: SourceShape[CommittableEvent[ConsumerEvent]] = SourceShape.of(out)