KAFKA-10000: Use transactional producer for config topic (KIP-618) #11778
Converting to draft until upstream PRs are reviewed.
cb77582 to 0085eee
Moved to a different package to leverage package-private fields and methods while testing other classes in this package (such as the KafkaConfigBackingStore class).
org.apache.kafka.connect.storage is a public package, and AFAICS this change in public API wasn't mentioned in KIP-618.
That package contains a mix of public and private API. The public parts are all documented in the Javadocs for the package and include the Converter interface and the SimpleHeaderConverter class; the private parts (which include everything in the connect/runtime module) contain the ClusterConfigState class.
OK. I missed that this package move on its own won't make this class a public API.
0085eee to 4599afe
Given that all merge conflicts have been resolved and #11775 has already been approved, marking this as ready for review.
4599afe to 65fddc7
65fddc7 to a9bf688
@tombentley the 3.3.0 feature freeze is a little over a month away and there are still seven open pull requests for this feature. Do you have time to take a look?
@tombentley If you do not have time to take a look, is there anyone you can recommend in your place?
@tombentley hello? Is anyone actually maintaining Kafka Connect anymore or should we all just stop contributing to it?
tombentley left a comment
Thanks @C0urante! I left a few comments, but mostly this looks good.
Note that it is not necessary to wrap every write to the config topic in this method, only the writes that should be performed exclusively by the leader.
Should we make this part of the method name, e.g. writeToConfigTopicAsLeader?
Fair enough, done 👍
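For readers following the naming discussion, here is a rough sketch of what a leader-only write wrapper could look like. It is not the code from this PR: TransactionalWriter is a hypothetical stand-in for the transactional methods of a producer (Kafka's Producer interface does expose beginTransaction() and commitTransaction()), and the body of the method is an assumption about the general shape only.

```java
// Hypothetical stand-in for the transactional subset of a producer; not the PR's code.
interface TransactionalWriter {
    void beginTransaction();
    void commitTransaction();
}

class LeaderWrites {
    // Wraps a single write in a transaction. Only writes that must be performed
    // exclusively by the leader need to go through this method; other writes to
    // the config topic can bypass it.
    static void writeToConfigTopicAsLeader(TransactionalWriter writer, Runnable write) {
        writer.beginTransaction();
        write.run();
        // If this worker has been fenced by a newer leader, a real transactional
        // producer would fail here instead of committing the write.
        writer.commitTransaction();
    }
}
```

Putting "AsLeader" in the name makes the restriction visible at each call site, which is the point of the suggestion above.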
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java (outdated)
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java (outdated)
If we're talking about a "fencable" producer, then shouldn't this be "fencably", rather than "fencibly"?
I realise that we're not trying to support a thread cancellation mechanism here, but shouldn't we set the thread's interrupted status when catching InterruptedException?
This part isn't new, it's just an inlining of a private method that was only being used in one place. Considering it follows the logic used in all similar methods for this class and there haven't been any bugs reported related to it that I'm aware of, I'd like to err on the side of caution here and keep this logic as-is for now, though we can address in a follow-up if this is a genuine issue.
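For context on the question above, this is the usual pattern for restoring a thread's interrupted status after catching InterruptedException. It is a generic illustration using only the JDK; InterruptExample and awaitQuietly are made-up names and do not correspond to anything in KafkaConfigBackingStore.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class InterruptExample {
    // Waits for the latch to reach zero. Returns false on timeout or interruption.
    // On interruption, the interrupt flag is set again so that code further up
    // the call stack can still observe that the thread was interrupted.
    static boolean awaitQuietly(CountDownLatch latch, long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Throwing InterruptedException clears the flag; restore it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Without the call to interrupt(), the interruption is silently swallowed, which is the concern raised in the review comment.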
335991c to 6c94f37
Thanks Tom. I hope the package shuffling with
6c94f37 to c312555
I'll take a look this week.
Thanks, Luke.
showuon left a comment
Reviewed the implementation code. Left some comments. Will continue with the test code tomorrow. Thanks.
nit: [Please] disable the worker's exactly-once support....
Is this expected even when we enable idempotence? I thought the idempotent producer guarantees ordering when max in-flight requests <= 5. Could we set it to 5 here?
Good point, changed it to 5. I considered an alternative where we don't explicitly set anything and let the default (5) take effect unless the user overrides it in their worker config file. But we'd have to lower the value to 5 if it were higher than that in order to prevent reordering, and there doesn't seem to be enough value in that flexibility to justify the extra work of adding the necessary safeguards.
I agree that we should default to 5 for fencableProducer for now, and allow customization to values <= 5 in the future if necessary.
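As a rough illustration of the outcome of this thread, the sketch below builds producer properties that always force idempotence on and pin the in-flight limit to 5, regardless of what the worker config contains. The property keys enable.idempotence and max.in.flight.requests.per.connection are real Kafka producer settings; the FencableProducerConfig class and its build method are hypothetical and are not taken from this PR.

```java
import java.util.HashMap;
import java.util.Map;

class FencableProducerConfig {
    // Highest in-flight limit at which an idempotent producer still preserves ordering.
    static final int MAX_IN_FLIGHT_WITH_ORDERING = 5;

    // Copies the worker's producer overrides, then unconditionally sets the two
    // properties discussed above. A user-supplied in-flight value is overwritten
    // rather than clamped, which avoids the extra safeguards mentioned in the thread.
    static Map<String, Object> build(Map<String, Object> workerOverrides) {
        Map<String, Object> props = new HashMap<>(workerOverrides);
        props.put("enable.idempotence", "true");
        props.put("max.in.flight.requests.per.connection", MAX_IN_FLIGHT_WITH_ORDERING);
        return props;
    }
}
```

Supporting user-provided values below 5 would need a check before the second put call; that is the future improvement mentioned above.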
c312555 to 7d84e77
Thanks Luke, appreciate it.
showuon left a comment
Thanks for the update. Left some comments.
Should we close the fencableProducer when a ProducerFencedException is thrown? It looks like we only create a new one afterwards, right? And what about other exceptions? Should we also close the fencableProducer in those cases?
Yes, we should close it in this case. I originally wanted to be more conservative about other cases, but after thinking it over, I'd rather not risk getting into a bad state with a transactional producer that can't be fixed and requires a new one. Given the expected low frequency of writes to the config topic, it's probably fine to also close the producer and then construct a new one in claimWritePrivileges in this case. Good catch!
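The lifecycle agreed on here can be sketched roughly as follows: any failed leader write closes and discards the producer, and the next call to claimWritePrivileges constructs a fresh one. SketchProducer and ConfigWriter are hypothetical names used only for illustration; the real KafkaConfigBackingStore logic may differ in its details.

```java
import java.util.function.Supplier;

// Hypothetical minimal producer abstraction; not a Kafka interface.
interface SketchProducer extends AutoCloseable {
    void initTransactions();
    void write(String record);
    @Override
    void close();
}

class ConfigWriter {
    private final Supplier<SketchProducer> factory;
    private SketchProducer fencableProducer; // null until write privileges are claimed

    ConfigWriter(Supplier<SketchProducer> factory) {
        this.factory = factory;
    }

    // Creates and initializes a producer if none is currently held.
    void claimWritePrivileges() {
        if (fencableProducer != null) {
            return;
        }
        SketchProducer candidate = factory.get();
        try {
            candidate.initTransactions();
        } catch (RuntimeException e) {
            candidate.close(); // do not keep a producer that failed to initialize
            throw e;
        }
        fencableProducer = candidate;
    }

    // Performs a leader-only write. On any failure the producer is closed and
    // dropped, so a later claimWritePrivileges() call starts from a clean state.
    void writeAsLeader(String record) {
        if (fencableProducer == null) {
            throw new IllegalStateException("claimWritePrivileges() must be called first");
        }
        try {
            fencableProducer.write(record);
        } catch (RuntimeException e) {
            fencableProducer.close();
            fencableProducer = null;
            throw e;
        }
    }
}
```

Recreating the producer on every failure is more expensive than reusing it, but as noted above, writes to the config topic are expected to be infrequent.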
7d84e77 to ac9674c
Failed tests are unrelated:
Thanks Luke, and thanks Tom!
Implements the behavior described in KIP-618: using a transactional producer for writes to the config topic that should only be performed by the leader of the cluster.
Relies on changes from: