Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Add additional factory methods to Kinesis to create Akka Source #61

Merged
merged 5 commits into from
Apr 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ An Akka `Source` is provided that can be used with streams. It is possible to cr
directly from the consumer name that is defined in the configuration.
Every message that is emitted to the stream is of type `CommitableEvent[ConsumerEvent]` and has to be committed
explicitly downstream with a call to `event.commit()`. It is possible to map to a different type of `CommittableEvent`
via the `map` and `mapAsync` functionality.
via the `map` and `mapAsync` functionality. A `KinesisConsumer` is created internally for the `Kinesis.source`, when the factory method isn't defined.

```scala
import com.weightwatchers.reactive.kinesis.stream._
Expand All @@ -325,7 +325,29 @@ Kinesis
.runWith(Sink.seq)
```

A `KinesisConsumer` is used internally for the `Kinesis.source`. All rules described here for the `KinesisConsumer` also apply for the stream source.
Or you can explicitly path a lambda, to create the `KinesisConsumer`.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.Sink
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
import com.weightwatchers.reactive.kinesis.stream._

val sys = ActorSystem("kinesis-consumer-system")

Kinesis
.source(
"consumer-name",
(conf: KinesisConsumer.ConsumerConf, eventProcessor: ActorRef) => KinesisConsumer(conf, eventProcessor, sys)
)
.take(100)
.map(event => event.map(_.payloadAsString())) // read and map payload as string
.mapAsync(10)(event => event.mapAsync(Downloader.download(event.payload))) // handle an async message
.map(event => event.commit()) // mark the event as handled by calling commit
.runWith(Sink.seq)
```

All rules described here for the `KinesisConsumer` also apply for the stream source.

<a name="usage-usage-consumer-graceful-shutdown"></a>
### Graceful Shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package com.weightwatchers.reactive.kinesis.stream

import akka.{Done, NotUsed}
import akka.actor.{ActorSystem, Props}
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.scaladsl.{Sink, Source}
import com.amazonaws.auth.AWSCredentialsProvider
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import com.weightwatchers.reactive.kinesis.consumer.{ConsumerService, KinesisConsumer}
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import com.weightwatchers.reactive.kinesis.models.{ConsumerEvent, ProducerEvent}
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf}
Expand All @@ -33,6 +34,23 @@ import scala.concurrent.Future
*/
object Kinesis extends LazyLogging {

/**
* Create a source, that provides KinesisEvents.
* Please note: every KinesisEvent has to be committed during the user flow!
* Uncommitted events will be retransmitted after a timeout.
*
* @param consumerConf the configuration to connect to Kinesis.
* @param createConsumer factory function to create ConsumerService from eventProcessor ActorRef.
* @param system the actor system.
* @return A source of KinesisEvent objects.
*/
def source(
consumerConf: ConsumerConf,
createConsumer: ActorRef => ConsumerService
)(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
Source.fromGraph(new KinesisSourceGraphStage(consumerConf, createConsumer, system))
}

/**
* Create a source, that provides KinesisEvents.
* Please note: every KinesisEvent has to be committed during the user flow!
Expand All @@ -45,7 +63,7 @@ object Kinesis extends LazyLogging {
def source(
consumerConf: ConsumerConf
)(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
Source.fromGraph(new KinesisSourceGraphStage(consumerConf, system))
source(consumerConf, KinesisConsumer(consumerConf, _, system))
}

/**
Expand All @@ -69,12 +87,69 @@ object Kinesis extends LazyLogging {
* @param system the actor system to use.
* @return A source of KinesisEvent objects.
*/
def source(consumerName: String, inConfig: String = "kinesis")(
def source(consumerName: String, inConfig: String)(
implicit system: ActorSystem
): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
source(ConsumerConf(system.settings.config.getConfig(inConfig), consumerName))
}

/**
* Create a source by using the actor system configuration, that provides KinesisEvents.
* Please note: every KinesisEvent has to be committed during the user flow!
* Uncommitted events will be retransmitted after a timeout.
*
* A minimal application conf file should look like this:
* {{{
* kinesis {
* application-name = "SampleService"
* consumer-name {
* stream-name = "sample-stream"
* }
* }
* }}}
* See kinesis reference.conf for a list of all available config options.
*
* @param consumerName the name of the consumer in the application.conf.
* @param system the actor system to use.
* @return A source of KinesisEvent objects.
*/
def source(consumerName: String)(
implicit system: ActorSystem
): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
source(consumerName, "kinesis")
}

/**
* Create a source by using the actor system configuration, that provides KinesisEvents.
* Please note: every KinesisEvent has to be committed during the user flow!
* Uncommitted events will be retransmitted after a timeout.
*
* A minimal application conf file should look like this:
* {{{
* kinesis {
* application-name = "SampleService"
* consumer-name {
* stream-name = "sample-stream"
* }
* }
* }}}
* See kinesis reference.conf for a list of all available config options.
*
* @param consumerName the name of the consumer in the application.conf.
* @param createConsumer factory function to create ConsumerService from eventProcessor ActorRef.
* @param inConfig the name of the sub-config for kinesis.
* @param system the actor system to use.
* @return A source of KinesisEvent objects.
*/
def source(
consumerName: String,
createConsumer: (ConsumerConf, ActorRef) => ConsumerService,
inConfig: String = "kinesis"
)(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
val consumerConf = ConsumerConf(system.settings.config.getConfig(inConfig), consumerName)
source(consumerConf, createConsumer(consumerConf, _))
}

/**
* Create a Sink that accepts ProducerEvents, which get published to Kinesis.
*
Expand All @@ -91,9 +166,10 @@ object Kinesis extends LazyLogging {
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(props: => Props, maxOutStanding: Int)(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
def sink(
props: => Props,
maxOutStanding: Int
)(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
Sink.fromGraph(new KinesisSinkGraphStage(props, maxOutStanding, system))
}

Expand Down Expand Up @@ -143,11 +219,11 @@ object Kinesis extends LazyLogging {
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(kinesisConfig: Config,
producerName: String,
credentialsProvider: Option[AWSCredentialsProvider])(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
def sink(
kinesisConfig: Config,
producerName: String,
credentialsProvider: Option[AWSCredentialsProvider]
)(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
sink(
ProducerConf(kinesisConfig, producerName, credentialsProvider)
)
Expand Down Expand Up @@ -183,11 +259,11 @@ object Kinesis extends LazyLogging {
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(producerName: String,
inConfig: String = "kinesis",
credentialsProvider: Option[AWSCredentialsProvider] = None)(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
def sink(
producerName: String,
inConfig: String = "kinesis",
credentialsProvider: Option[AWSCredentialsProvider] = None
)(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
sink(system.settings.config.getConfig(inConfig), producerName, credentialsProvider)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,6 @@ class KinesisSourceGraphStage(config: ConsumerConf,
extends GraphStage[SourceShape[CommittableEvent[ConsumerEvent]]]
with LazyLogging {

/**
* Ctor that uses the KinesisConsumer as ConsumerService implementation.
*/
def this(config: ConsumerConf, actorSystem: ActorSystem) = {
this(config, KinesisConsumer(config, _, actorSystem), actorSystem)
}

private[this] val out: Outlet[CommittableEvent[ConsumerEvent]] = Outlet("KinesisSource.out")
override val shape: SourceShape[CommittableEvent[ConsumerEvent]] = SourceShape.of(out)

Expand Down