Add new transactional producer #130

Merged: 28 commits, May 14, 2019

@danxmoran (Contributor)

Fixes #128.

Here's an implementation which (mostly) avoids touching existing classes. There's a bunch of code copy-pasted from the existing KafkaProducer and ProducerMessage which might be nice to consolidate 😄

@danxmoran (Contributor Author) left a comment

Some design questions I could use feedback on.

src/main/scala/fs2/kafka/ConsumerSettings.scala (outdated, resolved)
src/main/scala/fs2/kafka/TransactionalMessage.scala (outdated, resolved)
src/main/scala/fs2/kafka/package.scala (outdated, resolved)
src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala (outdated, resolved)
src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala (outdated, resolved)

@vlovgr (Contributor) left a comment

Thanks a lot for this, already looking really good!

src/main/scala/fs2/kafka/ConsumerSettings.scala (outdated, resolved)
src/main/scala/fs2/kafka/ConsumerSettings.scala (outdated, resolved)
src/main/scala/fs2/kafka/TransactionalMessage.scala (outdated, resolved)
src/main/scala/fs2/kafka/TransactionalMessage.scala (outdated, resolved)
src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala (outdated, resolved)
src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala (outdated, resolved)

@danxmoran changed the base branch from master to 0.20.x on May 9, 2019, 18:34

@danxmoran (Contributor Author)

@vlovgr I think I covered everything

codecov bot commented May 9, 2019

Codecov Report

Merging #130 into 0.20.x will decrease coverage by 0.12%.
The diff coverage is 94.2%.

@@            Coverage Diff             @@
##           0.20.x     #130      +/-   ##
==========================================
- Coverage   93.75%   93.62%   -0.13%     
==========================================
  Files          40       48       +8     
  Lines        1153     1208      +55     
  Branches       78       93      +15     
==========================================
+ Hits         1081     1131      +50     
- Misses         72       77       +5

| Impacted Files | Coverage Δ |
|---|---|
| ...c/main/scala/fs2/kafka/internal/WithConsumer.scala | 100% <ø> (ø) ⬆️ |
| .../main/scala/fs2/kafka/ConsumerGroupException.scala | 0% <0%> (ø) |
| ...c/main/scala/fs2/kafka/internal/WithProducer.scala | 100% <100%> (ø) |
| src/main/scala/fs2/kafka/package.scala | 100% <100%> (ø) ⬆️ |
| ...n/scala/fs2/kafka/CommittableProducerRecords.scala | 100% <100%> (ø) |
| src/main/scala/fs2/kafka/KafkaConsumer.scala | 99.16% <100%> (-0.84%) ⬇️ |
| src/main/scala/fs2/kafka/IsolationLevel.scala | 100% <100%> (ø) |
| ...scala/fs2/kafka/TransactionalProducerMessage.scala | 100% <100%> (ø) |
| src/main/scala/fs2/kafka/CommittableOffset.scala | 100% <100%> (ø) ⬆️ |
| ...cala/fs2/kafka/TransactionalProducerResource.scala | 100% <100%> (ø) |

... and 15 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 75ef99f...720ce88.

@danxmoran (Contributor Author)

⚠️ Scope creep ⚠️

I've been working to get a 0.19.x-compatible version of this PR set up in my project at work as a stop-gap while we work out the rough edges of the API here. The first problem I hit was picking a transactional ID. Alpakka asks for a user-defined ID, but I found that Kafka Streams auto-generates IDs for users. When I dug into the implementation I found that Kafka Streams uses a unique transactional ID per "task" (topic/partition). Other projects like spring-kafka have followed the same pattern. From what I've read, it's effectively the only way to ensure messages don't get double-processed when partitions get rebalanced.
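To make the per-partition idea concrete, here is a minimal sketch of deriving a transactional ID from the consumer group and topic-partition. The `TransactionalIdSketch` object, its `TopicPartition` type, and the `<groupId>-<topic>-<partition>` naming scheme are all hypothetical and chosen for illustration; this is not the exact format Kafka Streams or spring-kafka uses.

```scala
object TransactionalIdSketch {
  // Hypothetical stand-in for Kafka's TopicPartition type.
  final case class TopicPartition(topic: String, partition: Int)

  // Assumed naming scheme: "<groupId>-<topic>-<partition>".
  // The ID is a pure function of the partition, so every instance in the
  // group derives the same ID for the same partition. Whichever instance
  // owns the partition after a rebalance initializes transactions with
  // that ID, which lets the broker fence the previous owner's producer.
  def forPartition(groupId: String, tp: TopicPartition): String =
    s"$groupId-${tp.topic}-${tp.partition}"
}
```

For example, `TransactionalIdSketch.forPartition("orders-group", TransactionalIdSketch.TopicPartition("orders", 3))` yields `"orders-group-orders-3"` on any instance.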

Do you think this should change our design here at all? I could see a setup where we remove the transactionalId field from ProducerSettings and have TransactionalKafkaProducer manage the lifecycles of several "raw" Kafka producers under the hood. It'd be more complicated, but I think it'd be a big win. Without baked-in support, users who care about zombie-fencing guarantees will need to spin up a fresh TransactionalKafkaProducer every time they want to publish a new message.

@vlovgr (Contributor) commented May 13, 2019

If I understand correctly, TransactionalKafkaProducer in its current form is only really useful if you have a single instance in the consumer group. With multiple instances, you need a stable transactional.id across instances per topic-partition, which is not trivial to implement yourself, and would fit better in the library. I definitely agree this is something we should handle in the library.

The mentioned topic-partition strategy also means one producer per topic-partition, right? That might be a bit much in the single instance scenario, but for multiple instances it sounds acceptable. Perhaps this is even behaviour we can toggle in ProducerSettings, while the remaining API stays as is? If you agree, then we can finish this up and merge it, and then do the remaining work in a separate pull request.
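One way such a toggle might be modelled is as a small strategy type that decides which transactional ID (and therefore how many underlying producers) a set of assigned partitions needs. This is only a sketch: `TransactionalIdStrategy`, `Fixed`, `PerTopicPartition`, and the ID format are hypothetical names, not part of ProducerSettings or of this pull request.

```scala
object TransactionalIdStrategySketch {
  // Hypothetical stand-in for Kafka's TopicPartition type.
  final case class TopicPartition(topic: String, partition: Int)

  sealed trait TransactionalIdStrategy
  // One user-supplied ID for everything: a single producer,
  // suitable for the single-instance scenario.
  final case class Fixed(transactionalId: String) extends TransactionalIdStrategy
  // One ID, and so one producer, per assigned topic-partition.
  final case class PerTopicPartition(prefix: String) extends TransactionalIdStrategy

  // Resolve the transactional ID to use when producing for a partition.
  def idFor(strategy: TransactionalIdStrategy, tp: TopicPartition): String =
    strategy match {
      case Fixed(id)                 => id
      case PerTopicPartition(prefix) => s"$prefix-${tp.topic}-${tp.partition}"
    }

  // Number of distinct producers the strategy implies for an assignment.
  def producersNeeded(strategy: TransactionalIdStrategy, assigned: Set[TopicPartition]): Int =
    assigned.map(tp => idFor(strategy, tp)).size
}
```

With three assigned partitions, `Fixed` resolves to a single ID while `PerTopicPartition` resolves to three, which is the cost difference between the two scenarios.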

(I think the trickiest part of producer-per-topic-partition is managing the creation and closing of producers as partitions are assigned and revoked. Maybe we could do something clever with rebalance listeners to get this working nicely.)
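A rough sketch of that lifecycle, assuming callbacks shaped like Kafka's `ConsumerRebalanceListener` (`onPartitionsAssigned` / `onPartitionsRevoked`), could look like the following. `ProducerRegistry` and `StubProducer` are hypothetical; the stub stands in for a real transactional producer, and plain mutable state is used for brevity where a library implementation would more likely use effect-safe state and resource handling.

```scala
object ProducerLifecycleSketch {
  // Hypothetical stand-in for Kafka's TopicPartition type.
  final case class TopicPartition(topic: String, partition: Int)

  // Stub in place of a real transactional producer, so the sketch
  // runs without a broker. It only records whether it was closed.
  final class StubProducer(val transactionalId: String) {
    private var closed = false
    def close(): Unit = { closed = true }
    def isClosed: Boolean = closed
  }

  // Not thread-safe: assumes all callbacks run on a single thread.
  final class ProducerRegistry(idPrefix: String) {
    private var producers = Map.empty[TopicPartition, StubProducer]

    // Intended to be called from the "partitions assigned" callback:
    // create a producer for each newly assigned partition.
    def onAssigned(assigned: Set[TopicPartition]): Unit =
      assigned.diff(producers.keySet).foreach { tp =>
        val id = s"$idPrefix-${tp.topic}-${tp.partition}"
        producers = producers + (tp -> new StubProducer(id))
      }

    // Intended to be called from the "partitions revoked" callback:
    // close and forget the producers for partitions we no longer own.
    def onRevoked(revoked: Set[TopicPartition]): Unit =
      revoked.foreach { tp =>
        producers.get(tp).foreach(_.close())
        producers = producers - tp
      }

    def producerFor(tp: TopicPartition): Option[StubProducer] = producers.get(tp)
    def size: Int = producers.size
  }
}
```

The registry only ever holds producers for currently owned partitions, so a revoked partition's producer is closed before another instance takes over its transactional ID.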

@danxmoran (Contributor Author)

👍 I'm ok with this merging and setting up a new PR for improvements. Thanks for all the help and review! Is there anything I can do to help with adding docs & tests on this one?

@vlovgr (Contributor) commented May 13, 2019

Great! If there's anything you feel is missing in terms of docs or tests, then feel free to add it. Otherwise, I'll just have a final look through this tomorrow and then merge it. 👍

@vlovgr (Contributor) commented May 14, 2019

Thanks a lot for this @danxmoran! 👍
