
Commit: With transactional partitioned source
charlibot authored and seglo committed Nov 27, 2019
1 parent 7967fb0 commit 7934d20
Showing 30 changed files with 1,643 additions and 422 deletions.
17 changes: 12 additions & 5 deletions .travis.yml
@@ -5,11 +5,6 @@ services:
- docker

before_install:
# upgrade to a later docker-compose which supports services.kafka.scale
- sudo rm /usr/local/bin/docker-compose
- curl -L https://github.com/docker/compose/releases/download/1.22.0/docker-compose-`uname -s`-`uname -m` > docker-compose
- chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin
# fetch full history for correct current and previous version detection
- git fetch --unshallow
# using jabba for custom jdk management
@@ -25,6 +20,14 @@ script:

jobs:
include:
- stage: debug
name: "debug test: TransactionsSpec x50"
script: sbt "tests/testOnly *.TransactionsSpec -DtimesToRepeat=50"
- name: "debug test: must provide consistency when multiple transactional streams are being restarted x10"
script: sbt "tests/it:testOnly *.TransactionsSourceSpec -- -z \"must provide consistency when multiple transactional streams are being restarted\" -DtimesToRepeat=10"
- name: "debug test: must provide consistency when multiple partitioned transactional streams are being restarted x10"
script: sbt "tests/it:testOnly *.TransactionsPartitionedSourceSpec -- -z \"must provide consistency when multiple partitioned transactional streams are being restarted\" -DtimesToRepeat=10"

- stage: check
script: sbt verifyCodeStyle
name: "Code style check. Run locally with: sbt verifyCodeStyle"
@@ -80,6 +83,10 @@ jobs:
name: "Publish API and reference documentation"

stages:
# runs on master commits and PRs
- name: debug
if: NOT tag =~ ^v

# runs on master commits and PRs
- name: check
if: NOT tag =~ ^v
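The debug stage above reruns known-flaky transactional tests many times per build, forwarding the repeat count as a system property. A minimal sketch of how a spec can honor -DtimesToRepeat (the property name comes from the commands above; this helper spec is hypothetical, not part of the commit):

import org.scalatest.WordSpec

class RepeatedSpec extends WordSpec {
  // repeat count forwarded by e.g. sbt "tests/testOnly *.TransactionsSpec -DtimesToRepeat=50"
  private val timesToRepeat: Int =
    sys.props.get("timesToRepeat").map(_.toInt).getOrElse(1)

  "a flaky transactional scenario" must {
    (1 to timesToRepeat).foreach { run =>
      s"provide consistency (run $run)" in {
        succeed // the real assertions under investigation would go here
      }
    }
  }
}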
3 changes: 3 additions & 0 deletions build.sbt
@@ -183,6 +183,9 @@ lazy val `alpakka-kafka` =
| tests/it:test
| run integration tests backed by Docker containers
|
| tests/testOnly -- -t "A consume-transform-produce cycle must complete in happy-path scenario"
| run a single test with an exact name (use -z for partial match)
|
| benchmarks/it:testOnly *.AlpakkaKafkaPlainConsumer
| run a single benchmark backed by Docker containers
""".stripMargin
@@ -0,0 +1,5 @@
ProblemFilters.exclude[MissingTypesProblem]("akka.kafka.ConsumerMessage$PartitionOffsetCommittedMarker$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffset.withCommittedMarker")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.unapply")
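These entries are MiMa (sbt Migration Manager) exclusions. They silence binary-compatibility findings for PartitionOffsetCommittedMarker, whose constructor gains a parameter later in this commit; that is safe because the class is marked @InternalApi. Alpakka Kafka keeps such filters in *.backwards.excludes files; the equivalent inline build.sbt wiring would look roughly like this (illustrative only, assuming sbt-mima-plugin is enabled):

import com.typesafe.tools.mima.core._

// ignore the changed signatures of the internal case class
mimaBinaryIssueFilters ++= Seq(
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.apply"
  )
)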
6 changes: 2 additions & 4 deletions core/src/main/scala/akka/kafka/ConsumerMessage.scala
@@ -96,9 +96,6 @@ object ConsumerMessage {
def withMetadata(metadata: String) =
PartitionOffsetMetadata(key, offset, metadata)

@InternalApi private[kafka] def withCommittedMarker(committedMarker: CommittedMarker) =
PartitionOffsetCommittedMarker(key, offset, committedMarker)

override def toString = s"PartitionOffset(key=$key,offset=$offset)"

override def equals(other: Any): Boolean = other match {
@@ -138,7 +135,8 @@ object ConsumerMessage {
@InternalApi private[kafka] final case class PartitionOffsetCommittedMarker(
override val key: GroupTopicPartition,
override val offset: Long,
private[kafka] val committedMarker: CommittedMarker
private[kafka] val committedMarker: CommittedMarker,
private[kafka] val fromPartitionedSource: Boolean
) extends PartitionOffset(key, offset)
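The new fromPartitionedSource flag records whether an offset originated from a partitioned sub-source, which the transactional producer stage needs in order to commit and fence on a per-partition basis. A hedged construction sketch (only internal stages build this type; the committedMarker value is assumed to be in scope):

val partitionOffset = ConsumerMessage.PartitionOffsetCommittedMarker(
  key = ConsumerMessage.GroupTopicPartition("group-1", "topic-1", partition = 0),
  offset = 42L,
  committedMarker = committedMarker, // supplied by the source stage
  fromPartitionedSource = true
)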

/**
65 changes: 54 additions & 11 deletions core/src/main/scala/akka/kafka/internal/CommittableSources.scala
@@ -14,7 +14,7 @@ import akka.kafka.scaladsl.Consumer.Control
import akka.pattern.AskTimeoutException
import akka.stream.SourceShape
import akka.stream.scaladsl.Source
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.{AsyncCallback, GraphStageLogic}
import akka.util.Timeout
import akka.{Done, NotUsed}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetAndMetadata}
@@ -102,17 +102,34 @@ private[kafka] final class CommittableSubSource[K, V](
) {
override protected def logic(
shape: SourceShape[(TopicPartition, Source[CommittableMessage[K, V], NotUsed])]
): GraphStageLogic with Control =
new SubSourceLogic[K, V, CommittableMessage[K, V]](shape, settings, subscription, getOffsetsOnAssign, onRevoke)
with CommittableMessageBuilder[K, V]
with MetricsControl {
override def metadataFromRecord(record: ConsumerRecord[K, V]): String = _metadataFromRecord(record)
override def groupId: String = settings.properties(ConsumerConfig.GROUP_ID_CONFIG)
lazy val committer: KafkaAsyncConsumerCommitterRef = {
val ec = materializer.executionContext
new KafkaAsyncConsumerCommitterRef(consumerActor, settings.commitTimeout)(ec)
}
): GraphStageLogic with Control = {

val factory = new SubSourceStageLogicFactory[K, V, CommittableMessage[K, V]] {
def create(
shape: SourceShape[CommittableMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
actorNumber: Int
): SubSourceStageLogic[K, V, CommittableMessage[K, V]] =
new CommittableSubSourceStageLogic(shape,
tp,
consumerActor,
subSourceStartedCb,
subSourceCancelledCb,
actorNumber,
settings,
_metadataFromRecord)

}
new SubSourceLogic[K, V, CommittableMessage[K, V]](shape,
settings,
subscription,
getOffsetsOnAssign,
onRevoke,
subSourceStageLogicFactory = factory)
}
}
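The factory indirection above lets the generic SubSourceLogic spawn one stage logic per assigned partition without knowing which message type that logic builds. Reconstructed from its call sites in this diff (the trait's real definition lives in the sub-source support code, outside this excerpt), its shape is roughly:

@InternalApi
private[kafka] trait SubSourceStageLogicFactory[K, V, Msg] {
  def create(
      shape: SourceShape[Msg],
      tp: TopicPartition,
      consumerActor: ActorRef,
      subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
      subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
      actorNumber: Int
  ): SubSourceStageLogic[K, V, Msg]
}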

/**
@@ -191,3 +208,29 @@ private[kafka] class KafkaAsyncConsumerCommitterRef(private val consumerActor: A
case _ => false
}
}

@InternalApi
private class CommittableSubSourceStageLogic[K, V](
shape: SourceShape[CommittableMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
actorNumber: Int,
consumerSettings: ConsumerSettings[K, V],
_metadataFromRecord: ConsumerRecord[K, V] => String = CommittableMessageBuilder.NoMetadataFromRecord
) extends SubSourceStageLogic[K, V, CommittableMessage[K, V]](shape,
tp,
consumerActor,
subSourceStartedCb,
subSourceCancelledCb,
actorNumber)
with CommittableMessageBuilder[K, V] {

override def metadataFromRecord(record: ConsumerRecord[K, V]): String = _metadataFromRecord(record)
override def groupId: String = consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG)
lazy val committer: KafkaAsyncConsumerCommitterRef = {
val ec = materializer.executionContext
new KafkaAsyncConsumerCommitterRef(consumerActor, consumerSettings.commitTimeout)(ec)
}
}
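Externally, this stage logic backs the public committablePartitionedSource. A hedged usage sketch of that API (consumerSettings, committerSettings and an implicit materializer are assumed to be in scope):

import akka.kafka.Subscriptions
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.stream.scaladsl.Sink

Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics("topic-1"))
  .mapAsyncUnordered(4) {
    case (_, source) =>
      // each inner source emits the records of exactly one partition
      source
        .map(_.committableOffset)
        .runWith(Committer.sink(committerSettings))
  }
  .runWith(Sink.ignore)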
@@ -73,7 +73,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
// ---- initialization
override def preStart(): Unit = {
super.preStart()
resolveProducer()
resolveProducer(stage.producerSettings)
}

/** When the producer is set up, the sink pulls and schedules the first commit. */
22 changes: 16 additions & 6 deletions core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
@@ -51,6 +51,7 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
private lazy val decider: Decider =
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
protected val awaitingConfirmation = new AtomicInteger(0)

private var inIsClosed = false
private var completionState: Option[Try[Done]] = None

@@ -60,7 +61,7 @@

override def preStart(): Unit = {
super.preStart()
resolveProducer()
resolveProducer(stage.settings)
}

def checkForCompletion(): Unit =
@@ -85,11 +86,14 @@
failStage(ex)
}

def postSend(msg: Envelope[K, V, P]) = ()
def filterSend(msg: Envelope[K, V, P]): Boolean = true

def postSend(msg: Envelope[K, V, P]): Unit = ()

override protected def producerAssigned(): Unit = resumeDemand()

protected def resumeDemand(tryToPull: Boolean = true): Unit = {
log.debug("Resume demand")
setHandler(stage.out, new OutHandler {
override def onPull(): Unit = tryPull(stage.in)
})
@@ -99,21 +103,28 @@
}
}

protected def suspendDemand(): Unit =
protected def suspendDemand(fromStageLogicConstructor: Boolean = false): Unit = {
// not permitted to access stage logic members from constructor
if (!fromStageLogicConstructor) log.debug("Suspend demand")
setHandler(
stage.out,
new OutHandler {
override def onPull(): Unit = ()
}
)
}

// suspend demand until a Producer has been created
suspendDemand()
suspendDemand(fromStageLogicConstructor = true)

setHandler(
stage.in,
new InHandler {
override def onPush(): Unit = produce(grab(stage.in))
override def onPush(): Unit = {
val msg = grab(stage.in)
if (filterSend(msg))
produce(msg)
}

override def onUpstreamFinish(): Unit = {
inIsClosed = true
@@ -183,5 +194,4 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
closeProducer()
super.postStop()
}

}
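Two hooks appear in the hunks above. filterSend lets a subclass veto a message before it is produced; presumably the transactional stages use it to drop messages from revoked partitions (their code is outside this excerpt). suspendDemand gains a flag because Akka Streams forbids touching stage-logic members such as log from the stage-logic constructor. A self-contained model of the new onPush path:

// minimal model of the onPush change: grab, filter, then produce
trait SendFiltering[M] {
  def filterSend(msg: M): Boolean = true // default: send everything
  def produce(msg: M): Unit

  final def onPush(msg: M): Unit =
    if (filterSend(msg)) produce(msg) // filtered messages are silently dropped
}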
28 changes: 25 additions & 3 deletions core/src/main/scala/akka/kafka/internal/DeferredProducer.scala
@@ -15,27 +15,42 @@ import org.apache.kafka.clients.producer.Producer
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

/**
* INTERNAL API
*/
@InternalApi
private[kafka] object DeferredProducer {
sealed trait ProducerAssignmentLifecycle
case object Unassigned extends ProducerAssignmentLifecycle
case object AsyncCreateRequestSent extends ProducerAssignmentLifecycle
case object Assigned extends ProducerAssignmentLifecycle
}

/**
* INTERNAL API
*/
@InternalApi
private[kafka] trait DeferredProducer[K, V] {
self: GraphStageLogic with StageLogging =>

import DeferredProducer._

/** The Kafka producer may be created lazily, assigned via `preStart` in `assignProducer`. */
protected var producer: Producer[K, V] = _
protected var producerAssignmentLifecycle: ProducerAssignmentLifecycle = Unassigned

protected def producerSettings: ProducerSettings[K, V]
protected def producerAssigned(): Unit
protected def closeAndFailStageCb: AsyncCallback[Throwable]

private def assignProducer(p: Producer[K, V]): Unit = {
producer = p
changeProducerAssignmentLifecycle(Assigned)
producerAssigned()
}

final protected def resolveProducer(): Unit = {
val producerFuture = producerSettings.createKafkaProducerAsync()(materializer.executionContext)
final protected def resolveProducer(settings: ProducerSettings[K, V]): Unit = {
val producerFuture = settings.createKafkaProducerAsync()(materializer.executionContext)
producerFuture.value match {
case Some(Success(p)) => assignProducer(p)
case Some(Failure(e)) => failStage(e)
@@ -50,9 +65,16 @@ private[kafka] trait DeferredProducer[K, V] {
e
}
)(ExecutionContexts.sameThreadExecutionContext)
changeProducerAssignmentLifecycle(AsyncCreateRequestSent)
}
}

protected def changeProducerAssignmentLifecycle(state: ProducerAssignmentLifecycle): Unit = {
val oldState = producerAssignmentLifecycle
producerAssignmentLifecycle = state
log.debug("Asynchronous producer assignment lifecycle changed '{} -> {}'", oldState, state)
}

protected def closeProducerImmediately(): Unit =
if (producer != null) {
// Discard unsent ProducerRecords after encountering a send-failure in ProducerStage
@@ -61,7 +83,7 @@ private[kafka] trait DeferredProducer[K, V] {
}

protected def closeProducer(): Unit =
if (producerSettings.closeProducerOnStop && producer != null) {
if (producerSettings.closeProducerOnStop && producerAssignmentLifecycle == Assigned) {
try {
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
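The new ProducerAssignmentLifecycle makes the previously implicit producer-creation states explicit, and closeProducer now tests for Assigned instead of producer != null, so a producer whose asynchronous creation is still in flight is never flushed or closed. The legal transitions, inferred from resolveProducer and assignProducer above, as a self-contained sketch:

sealed trait ProducerAssignmentLifecycle
case object Unassigned extends ProducerAssignmentLifecycle
case object AsyncCreateRequestSent extends ProducerAssignmentLifecycle
case object Assigned extends ProducerAssignmentLifecycle

def legalTransition(from: ProducerAssignmentLifecycle, to: ProducerAssignmentLifecycle): Boolean =
  (from, to) match {
    case (Unassigned, Assigned)               => true // producer future completed synchronously
    case (Unassigned, AsyncCreateRequestSent) => true // future still pending in resolveProducer
    case (AsyncCreateRequestSent, Assigned)   => true // async callback fired later
    case _                                    => false
  }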
8 changes: 6 additions & 2 deletions core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
@@ -45,6 +45,8 @@ private[kafka] trait TransactionalMessageBuilderBase[K, V, Msg] extends MessageB
def committedMarker: CommittedMarker

def onMessage(consumerMessage: ConsumerRecord[K, V]): Unit

def fromPartitionedSource: Boolean
}

/** Internal API */
@@ -60,7 +62,8 @@ private[kafka] trait TransactionalMessageBuilder[K, V]
partition = rec.partition
),
offset = rec.offset,
committedMarker
committedMarker,
fromPartitionedSource
)
ConsumerMessage.TransactionalMessage(rec, offset)
}
@@ -79,7 +82,8 @@
partition = rec.partition
),
offset = rec.offset,
committedMarker
committedMarker,
fromPartitionedSource
)
(rec, offset)
}
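Both transactional builders now thread fromPartitionedSource into every committed marker they emit, with each concrete stage logic answering the new abstract member. End to end, this is what enables the partitioned transactional source this commit is named for. A hedged sketch of the resulting consume-transform-produce flow (consumerSettings, producerSettings and an implicit materializer are assumed in scope; Transactional.partitionedSource is the entry point this commit introduces, and its exact signature may differ):

import akka.kafka.{ProducerMessage, Subscriptions}
import akka.kafka.scaladsl.Transactional
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.producer.ProducerRecord

Transactional
  .partitionedSource(consumerSettings, Subscriptions.topics("source-topic"))
  .mapAsyncUnordered(4) {
    case (tp, source) =>
      source
        .map { msg =>
          ProducerMessage.single(
            new ProducerRecord("sink-topic", msg.record.key, msg.record.value),
            msg.partitionOffset // carries fromPartitionedSource = true internally
          )
        }
        .runWith(Transactional.sink(producerSettings, s"transactional-id-$tp"))
  }
  .runWith(Sink.ignore)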
32 changes: 27 additions & 5 deletions core/src/main/scala/akka/kafka/internal/PlainSources.scala
@@ -11,7 +11,7 @@ import akka.kafka.scaladsl.Consumer.Control
import akka.kafka.{AutoSubscription, ConsumerSettings, ManualSubscription, Subscription}
import akka.stream.SourceShape
import akka.stream.scaladsl.Source
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.{AsyncCallback, GraphStageLogic}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

@@ -51,8 +51,30 @@ private[kafka] final class PlainSubSource[K, V](
) {
override protected def logic(
shape: SourceShape[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed])]
): GraphStageLogic with Control =
new SubSourceLogic[K, V, ConsumerRecord[K, V]](shape, settings, subscription, getOffsetsOnAssign, onRevoke)
with PlainMessageBuilder[K, V]
with MetricsControl
): GraphStageLogic with Control = {

val factory = new SubSourceStageLogicFactory[K, V, ConsumerRecord[K, V]] {
def create(
shape: SourceShape[ConsumerRecord[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
actorNumber: Int
): SubSourceStageLogic[K, V, ConsumerRecord[K, V]] =
new SubSourceStageLogic[K, V, ConsumerRecord[K, V]](shape,
tp,
consumerActor,
subSourceStartedCb,
subSourceCancelledCb,
actorNumber) with PlainMessageBuilder[K, V]
}

new SubSourceLogic[K, V, ConsumerRecord[K, V]](shape,
settings,
subscription,
getOffsetsOnAssign,
onRevoke,
subSourceStageLogicFactory = factory)
}
}
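The plain variant needs no committer, so its factory instantiates the base SubSourceStageLogic directly with just the PlainMessageBuilder mixin, where the committable variant above ships a dedicated subclass. For reference, the public API this logic backs (usage unchanged by this commit; consumerSettings and an implicit materializer are assumed in scope):

import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink

Consumer
  .plainPartitionedSource(consumerSettings, Subscriptions.topics("topic-1"))
  .flatMapMerge(4, { case (_, source) => source }) // merge the per-partition sources
  .runWith(Sink.foreach(record => println(record.value)))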