-
Notifications
You must be signed in to change notification settings - Fork 386
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add partitioned transactional source phase 1 #930
Conversation
I received confirmation that Kafka Streams EoS creates one producer per source partition. This will introduce performance implications which will be addressed in KIP-447, Producer scalability for exactly once semantics. This work is in progress and will land in a future version of Kafka (not 2.4.0).
In the meantime we can proceed with our current implementation to have one producer per partitioned source. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Many places would improve with named classes.
core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/CommittableSources.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
Outdated
Show resolved
Hide resolved
6f77477
to
405687b
Compare
6364a0a
to
209b5b7
Compare
b42561a
to
44cb47d
Compare
core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
Outdated
Show resolved
Hide resolved
7934d20
to
fad9b2f
Compare
fad9b2f
to
e394850
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quite a chunk of code.
I'm not sure what is tested and what is not. We should find a way to document that, especially when we move things across integration tests and "fast" tests.
Sprinkle more final
on classes.
core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/DefaultProducerStage.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/TransactionalSources.scala
Outdated
Show resolved
Hide resolved
tests/src/it/scala/akka/kafka/TransactionsPartitionedSourceSpec.scala
Outdated
Show resolved
Hide resolved
I commented it out and added |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
5dd9017
to
bb5394b
Compare
bb5394b
to
4617e3e
Compare
Co-authored-by: Sean Glover <sean@seanglover.com>
4617e3e
to
df5bf2e
Compare
Purpose
This PR adds a new partitioned source with support for Kafka transactions,
Transactions.partitionedSource
. ASource
is created per partition in a consumer group subscription. EachSource
must have a downstreamTransactional.sink|flow
to commit messages transactionally. A derivedtransactional.id
is used perTransactional.sink|flow
that's based on a user-providedtransactional.id
along with thegroup.id
, topic and partition of the originally consumed message.Transactions.partitionedSource
will make it easier to run transactional workloads across consumer groups with multiple members.This PR is based on the extensive work already completed by @charlibot earlier this year. It was rebased by @raboof and updated to accommodate transactional consistency work completed by @2m and @szymonm. This new branch will serve as the integration point to get this functionality into Alpakka Kafka 2.0.0.
Update 27/11/2019
Due to the large number of changes introduced by this PR it has been repurposed as a "phase 1" to lay the groundwork to continue this work. This PR will not expose the
Transactional.partitionedSource
to the end user.References
Phase 1 tasks
Phase 2 tasks (preliminary)
ConsumerSettings.stopTimeout
to 0 to allow for faster shutdowns when a stream completes/fails.