Add partitioned transactional source phase 1 #930

Merged 2 commits on Nov 29, 2019
7 changes: 1 addition & 6 deletions .travis.yml
@@ -5,11 +5,6 @@ services:
- docker

before_install:
# upgrade to a later docker-compose which supports services.kafka.scale
- sudo rm /usr/local/bin/docker-compose
- curl -L https://github.com/docker/compose/releases/download/1.22.0/docker-compose-`uname -s`-`uname -m` > docker-compose
- chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin
# fetch full history for correct current and previous version detection
- git fetch --unshallow
# using jabba for custom jdk management
@@ -66,7 +61,7 @@ jobs:

- stage: integration
env: CMD="tests/it:test"
name: "Run multi-broker integration tests"
name: "Run multi-broker and long running integration tests"
- env: CMD="benchmarks/it:compile"
name: "Compile benchmark tests"

3 changes: 3 additions & 0 deletions build.sbt
@@ -183,6 +183,9 @@ lazy val `alpakka-kafka` =
| tests/it:test
| run integration tests backed by Docker containers
|
| tests/testOnly -- -t "A consume-transform-produce cycle must complete in happy-path scenario"
| run a single test with an exact name (use -z for partial match)
|
| benchmarks/it:testOnly *.AlpakkaKafkaPlainConsumer
| run a single benchmark backed by Docker containers
""".stripMargin
5 changes: 5 additions & 0 deletions (new MiMa problem-filter file)
@@ -0,0 +1,5 @@
ProblemFilters.exclude[MissingTypesProblem]("akka.kafka.ConsumerMessage$PartitionOffsetCommittedMarker$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ConsumerMessage#PartitionOffset.withCommittedMarker")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.unapply")
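
These exclusions cover the binary-incompatible changes this PR makes to the internal PartitionOffsetCommittedMarker (a new fromPartitionedSource constructor parameter, which changes apply/this/unapply) and the removal of PartitionOffset.withCommittedMarker; without them the MiMa check would fail even though the affected types are @InternalApi. For reference, a hedged sketch of how two of the same filters could be declared inline in build.sbt via sbt-mima-plugin (this repository keeps them in the filter file above instead):

import com.typesafe.tools.mima.core._

// Sketch only: equivalent inline declaration of two of the filters above.
mimaBinaryIssueFilters ++= Seq(
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "akka.kafka.ConsumerMessage#PartitionOffset.withCommittedMarker"
  ),
  ProblemFilters.exclude[IncompatibleSignatureProblem](
    "akka.kafka.ConsumerMessage#PartitionOffsetCommittedMarker.unapply"
  )
)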
6 changes: 2 additions & 4 deletions core/src/main/scala/akka/kafka/ConsumerMessage.scala
@@ -96,9 +96,6 @@ object ConsumerMessage {
def withMetadata(metadata: String) =
PartitionOffsetMetadata(key, offset, metadata)

@InternalApi private[kafka] def withCommittedMarker(committedMarker: CommittedMarker) =
PartitionOffsetCommittedMarker(key, offset, committedMarker)

override def toString = s"PartitionOffset(key=$key,offset=$offset)"

override def equals(other: Any): Boolean = other match {
@@ -138,7 +135,8 @@ object ConsumerMessage {
@InternalApi private[kafka] final case class PartitionOffsetCommittedMarker(
override val key: GroupTopicPartition,
override val offset: Long,
private[kafka] val committedMarker: CommittedMarker
private[kafka] val committedMarker: CommittedMarker,
private[kafka] val fromPartitionedSource: Boolean
) extends PartitionOffset(key, offset)

/**
66 changes: 55 additions & 11 deletions core/src/main/scala/akka/kafka/internal/CommittableSources.scala
@@ -10,11 +10,12 @@ import akka.annotation.InternalApi
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset, CommittableOffsetBatch}
import akka.kafka._
import akka.kafka.internal.KafkaConsumerActor.Internal.{Commit, CommitSingle, CommitWithoutReply}
import akka.kafka.internal.SubSourceLogic._
import akka.kafka.scaladsl.Consumer.Control
import akka.pattern.AskTimeoutException
import akka.stream.SourceShape
import akka.stream.scaladsl.Source
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.{AsyncCallback, GraphStageLogic}
import akka.util.Timeout
import akka.{Done, NotUsed}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetAndMetadata}
@@ -102,17 +103,34 @@ private[kafka] final class CommittableSubSource[K, V](
) {
override protected def logic(
shape: SourceShape[(TopicPartition, Source[CommittableMessage[K, V], NotUsed])]
): GraphStageLogic with Control =
new SubSourceLogic[K, V, CommittableMessage[K, V]](shape, settings, subscription, getOffsetsOnAssign, onRevoke)
with CommittableMessageBuilder[K, V]
with MetricsControl {
override def metadataFromRecord(record: ConsumerRecord[K, V]): String = _metadataFromRecord(record)
override def groupId: String = settings.properties(ConsumerConfig.GROUP_ID_CONFIG)
lazy val committer: KafkaAsyncConsumerCommitterRef = {
val ec = materializer.executionContext
new KafkaAsyncConsumerCommitterRef(consumerActor, settings.commitTimeout)(ec)
}
): GraphStageLogic with Control = {

val factory = new SubSourceStageLogicFactory[K, V, CommittableMessage[K, V]] {
def create(
shape: SourceShape[CommittableMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
actorNumber: Int
): SubSourceStageLogic[K, V, CommittableMessage[K, V]] =
new CommittableSubSourceStageLogic(shape,
tp,
consumerActor,
subSourceStartedCb,
subSourceCancelledCb,
actorNumber,
settings,
_metadataFromRecord)

}
new SubSourceLogic[K, V, CommittableMessage[K, V]](shape,
settings,
subscription,
getOffsetsOnAssign,
onRevoke,
subSourceStageLogicFactory = factory)
}
}

/**
@@ -191,3 +209,29 @@ private[kafka] class KafkaAsyncConsumerCommitterRef(private val consumerActor: A
case _ => false
}
}

@InternalApi
private final class CommittableSubSourceStageLogic[K, V](
shape: SourceShape[CommittableMessage[K, V]],
tp: TopicPartition,
consumerActor: ActorRef,
subSourceStartedCb: AsyncCallback[(TopicPartition, ControlAndStageActor)],
subSourceCancelledCb: AsyncCallback[(TopicPartition, SubSourceCancellationStrategy)],
actorNumber: Int,
consumerSettings: ConsumerSettings[K, V],
_metadataFromRecord: ConsumerRecord[K, V] => String = CommittableMessageBuilder.NoMetadataFromRecord
) extends SubSourceStageLogic[K, V, CommittableMessage[K, V]](shape,
tp,
consumerActor,
subSourceStartedCb,
subSourceCancelledCb,
actorNumber)
with CommittableMessageBuilder[K, V] {

override def metadataFromRecord(record: ConsumerRecord[K, V]): String = _metadataFromRecord(record)
override def groupId: String = consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG)
lazy val committer: KafkaAsyncConsumerCommitterRef = {
val ec = materializer.executionContext
new KafkaAsyncConsumerCommitterRef(consumerActor, consumerSettings.commitTimeout)(ec)
}
}
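
The factory indirection above lets SubSourceLogic stay generic while each partition's inner source runs its own CommittableSubSourceStageLogic with a per-partition committer. The public API is unchanged; a minimal usage sketch of the partitioned committable source (bootstrap servers, topic, and group id are placeholders):

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer

object PartitionedConsumeExample extends App {
  implicit val system: ActorSystem = ActorSystem("partitioned-example")
  implicit val mat: Materializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder
      .withGroupId("group-1")                 // placeholder

  // Each (TopicPartition, Source) pair emitted here is now backed by its own
  // CommittableSubSourceStageLogic created through the factory.
  Consumer
    .committablePartitionedSource(consumerSettings, Subscriptions.topics("topic-a"))
    .mapAsyncUnordered(8) {
      case (_, partitionSource) =>
        partitionSource
          .map(_.committableOffset)
          .runWith(Committer.sink(CommitterSettings(system)))
    }
    .runWith(Sink.ignore)
}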
@@ -73,7 +73,7 @@ private final class CommittingProducerSinkStageLogic[K, V, IN <: Envelope[K, V,
// ---- initialization
override def preStart(): Unit = {
super.preStart()
resolveProducer()
resolveProducer(stage.producerSettings)
}

/** When the producer is set up, the sink pulls and schedules the first commit. */
59 changes: 34 additions & 25 deletions core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
@@ -51,16 +51,33 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
private lazy val decider: Decider =
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
protected val awaitingConfirmation = new AtomicInteger(0)

private var inIsClosed = false
private var completionState: Option[Try[Done]] = None

override protected def logSource: Class[_] = classOf[DefaultProducerStage[_, _, _, _, _]]

final override val producerSettings: ProducerSettings[K, V] = stage.settings

protected class DefaultInHandler extends InHandler {
override def onPush(): Unit = produce(grab(stage.in))

override def onUpstreamFinish(): Unit = {
inIsClosed = true
completionState = Some(Success(Done))
checkForCompletion()
}

override def onUpstreamFailure(ex: Throwable): Unit = {
inIsClosed = true
completionState = Some(Failure(ex))
checkForCompletion()
}
}

override def preStart(): Unit = {
super.preStart()
resolveProducer()
resolveProducer(stage.settings)
}

def checkForCompletion(): Unit =
@@ -85,11 +102,12 @@
failStage(ex)
}

def postSend(msg: Envelope[K, V, P]) = ()
protected def postSend(msg: Envelope[K, V, P]): Unit = ()

override protected def producerAssigned(): Unit = resumeDemand()

protected def resumeDemand(tryToPull: Boolean = true): Unit = {
log.debug("Resume demand")
setHandler(stage.out, new OutHandler {
override def onPull(): Unit = tryPull(stage.in)
})
@@ -99,37 +117,29 @@
}
}

protected def suspendDemand(): Unit =
protected def suspendDemand(): Unit = {
log.debug("Suspend demand")
suspendDemandOutHandler()
}

// factored out of suspendDemand because logging is not permitted when called from the stage logic constructor
private def suspendDemandOutHandler(): Unit = {
setHandler(
stage.out,
new OutHandler {
override def onPull(): Unit = ()
}
)
}

// suspend demand until a Producer has been created
suspendDemand()

setHandler(
stage.in,
new InHandler {
override def onPush(): Unit = produce(grab(stage.in))

override def onUpstreamFinish(): Unit = {
inIsClosed = true
completionState = Some(Success(Done))
checkForCompletion()
}
protected def initialInHandler(): Unit = producingInHandler()
protected def producingInHandler(): Unit = setHandler(stage.in, new DefaultInHandler())

override def onUpstreamFailure(ex: Throwable): Unit = {
inIsClosed = true
completionState = Some(Failure(ex))
checkForCompletion()
}
}
)
// suspend demand until a Producer has been created
suspendDemandOutHandler()
initialInHandler()

def produce(in: Envelope[K, V, P]): Unit =
protected def produce(in: Envelope[K, V, P]): Unit =
in match {
case msg: Message[K, V, P] =>
val r = Promise[Result[K, V, P]]
@@ -183,5 +193,4 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
closeProducer()
super.postStop()
}

}
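
Two details of the refactor above are worth calling out: the anonymous handlers became the named DefaultInHandler, apparently so subclasses such as the transactional stage can install their own, and suspendDemand was split from suspendDemandOutHandler because stage logging is not permitted in the stage-logic constructor. A self-contained sketch of the suspend-then-resume-demand pattern, with all names invented for illustration:

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging}

// Sketch: ignore downstream demand until an asynchronously created resource is
// assigned, then swap in a pulling OutHandler, mirroring the
// suspendDemand/resumeDemand pair in DefaultProducerStageLogic.
final class DeferredResourceStage[T] extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet("DeferredResource.in")
  val out: Outlet[T] = Outlet("DeferredResource.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      // Constructor: logging is not permitted here, so only install handlers.
      setHandler(out, new OutHandler {
        override def onPull(): Unit = () // suspend demand
      })
      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, grab(in))
      })

      override def preStart(): Unit = {
        val resourceAssigned = getAsyncCallback[Unit] { _ =>
          log.debug("Resume demand") // safe once the stage is running
          setHandler(out, new OutHandler {
            override def onPull(): Unit = tryPull(in)
          })
          // Demand may have arrived while suspended; honour it now.
          if (isAvailable(out) && !hasBeenPulled(in)) tryPull(in)
        }
        resourceAssigned.invoke(()) // stand-in for an async creation completing
      }
    }
}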
35 changes: 32 additions & 3 deletions core/src/main/scala/akka/kafka/internal/DeferredProducer.scala
@@ -15,27 +15,49 @@ import org.apache.kafka.clients.producer.Producer
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

/**
* INTERNAL API
*/
@InternalApi
private[kafka] object DeferredProducer {

/**
* The [[ProducerAssignmentLifecycle]] allows us to track the state of the asynchronous producer assignment
* within the stage. This is useful when we need to manage different behavior during the assignment process. For
* example, in [[TransactionalProducerStageLogic]] we match on the lifecycle when extracting the transactional.id
* of the first message received from a partitioned source.
*/
sealed trait ProducerAssignmentLifecycle
case object Unassigned extends ProducerAssignmentLifecycle
case object AsyncCreateRequestSent extends ProducerAssignmentLifecycle
case object Assigned extends ProducerAssignmentLifecycle
}

/**
* INTERNAL API
*/
@InternalApi
private[kafka] trait DeferredProducer[K, V] {
self: GraphStageLogic with StageIdLogging =>

import DeferredProducer._

/** The Kafka producer may be created lazily, assigned via `preStart` in `assignProducer`. */
protected var producer: Producer[K, V] = _
protected var producerAssignmentLifecycle: ProducerAssignmentLifecycle = Unassigned

protected def producerSettings: ProducerSettings[K, V]
protected def producerAssigned(): Unit
protected def closeAndFailStageCb: AsyncCallback[Throwable]

private def assignProducer(p: Producer[K, V]): Unit = {
producer = p
changeProducerAssignmentLifecycle(Assigned)
producerAssigned()
}

final protected def resolveProducer(): Unit = {
val producerFuture = producerSettings.createKafkaProducerAsync()(materializer.executionContext)
final protected def resolveProducer(settings: ProducerSettings[K, V]): Unit = {
val producerFuture = settings.createKafkaProducerAsync()(materializer.executionContext)
producerFuture.value match {
case Some(Success(p)) => assignProducer(p)
case Some(Failure(e)) => failStage(e)
@@ -50,9 +72,16 @@ private[kafka] trait DeferredProducer[K, V] {
e
}
)(ExecutionContexts.sameThreadExecutionContext)
changeProducerAssignmentLifecycle(AsyncCreateRequestSent)
}
}

private def changeProducerAssignmentLifecycle(state: ProducerAssignmentLifecycle): Unit = {
val oldState = producerAssignmentLifecycle
producerAssignmentLifecycle = state
log.debug("Asynchronous producer assignment lifecycle changed '{} -> {}'", oldState, state)
}

protected def closeProducerImmediately(): Unit =
if (producer != null) {
// Discard unsent ProducerRecords after encountering a send-failure in ProducerStage
@@ -61,7 +90,7 @@ private[kafka] trait DeferredProducer[K, V] {
}

protected def closeProducer(): Unit =
if (producerSettings.closeProducerOnStop && producer != null) {
if (producerSettings.closeProducerOnStop && producerAssignmentLifecycle == Assigned) {
try {
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
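
With the lifecycle tracked explicitly, closeProducer() only flushes and closes once the state has reached Assigned, so a producer whose asynchronous creation is still in flight is never touched. A small illustrative model of the same three-state machine (state names match the trait above; the tracker class is invented):

// Invented for illustration: walks the same transitions DeferredProducer logs
// and guards cleanup the way the new closeProducer() does.
sealed trait ProducerAssignmentLifecycle
case object Unassigned extends ProducerAssignmentLifecycle
case object AsyncCreateRequestSent extends ProducerAssignmentLifecycle
case object Assigned extends ProducerAssignmentLifecycle

final class AssignmentTracker {
  private var state: ProducerAssignmentLifecycle = Unassigned

  def transition(next: ProducerAssignmentLifecycle): Unit = {
    println(s"Asynchronous producer assignment lifecycle changed '$state -> $next'")
    state = next
  }

  def closeIfAssigned(close: () => Unit): Unit =
    if (state == Assigned) close() // never touch an unassigned producer
}

object AssignmentTrackerExample extends App {
  val tracker = new AssignmentTracker
  tracker.transition(AsyncCreateRequestSent) // createKafkaProducerAsync() dispatched
  tracker.transition(Assigned)               // creation future completed
  tracker.closeIfAssigned(() => println("flush and close"))
}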
8 changes: 6 additions & 2 deletions core/src/main/scala/akka/kafka/internal/MessageBuilder.scala
@@ -45,6 +45,8 @@ private[kafka] trait TransactionalMessageBuilderBase[K, V, Msg] extends MessageB
def committedMarker: CommittedMarker

def onMessage(consumerMessage: ConsumerRecord[K, V]): Unit

def fromPartitionedSource: Boolean
}

/** Internal API */
@@ -60,7 +62,8 @@ private[kafka] trait TransactionalMessageBuilder[K, V]
partition = rec.partition
),
offset = rec.offset,
committedMarker
committedMarker,
fromPartitionedSource
)
ConsumerMessage.TransactionalMessage(rec, offset)
}
@@ -79,7 +82,8 @@ private[kafka] trait TransactionalOffsetContextBuilder[K, V]
partition = rec.partition
),
offset = rec.offset,
committedMarker
committedMarker,
fromPartitionedSource
)
(rec, offset)
}