@C0urante
Contributor

Implements support for per-connector offsets topics as described in KIP-618.

Relies on changes from:

@C0urante
Contributor Author

Converting to draft until upstream PRs are reviewed.

@C0urante C0urante force-pushed the kafka-10000-per-connector-offsets-topics branch 2 times, most recently from 0ce5eea to 93760f5 Compare March 3, 2022 17:25
@C0urante C0urante force-pushed the kafka-10000-per-connector-offsets-topics branch 4 times, most recently from 58956c3 to 05ca505 Compare June 10, 2022 17:14
@C0urante C0urante marked this pull request as ready for review June 10, 2022 17:14
@C0urante
Contributor Author

Given that all merge conflicts have been resolved and #11780 has already been approved, marking this as ready for review.

@C0urante C0urante force-pushed the kafka-10000-per-connector-offsets-topics branch 2 times, most recently from 11ed861 to fe67568 Compare June 13, 2022 15:31
Member

@tombentley tombentley left a comment

Thanks @C0urante! This is a first pass. I haven't looked at tests yet.

Member

I think this call to get could result in a leaked TopicAdmin, and thus a leaked Admin, because offsetStoreForRegularSourceTask isn't guaranteed to invoke topicAdminCreator (which it calls topicAdminSupplier). When it does, the Admin will eventually be closed via ConnectorOffsetBackingStore.stop.

In any case I think the ownership of the Admin here (responsibility for ultimately closing it) is pretty unclear, but it's not clear to me why it needs to be. So a comment to explain would be great, if a cleanup of the logic is not possible.

Contributor Author

This call won't result in a leak because the AbstractWorkerSourceTask class will always close its TopicAdmin during shutdown; see AbstractWorkerSourceTask::close. And, given the idempotent nature of Admin::close, it's fine to close it from both the AbstractWorkerSourceTask and ConnectorOffsetBackingStore classes.

The underlying point about ownership being unclear is fair, though, and the control flow that creates the topic admin in this method is especially convoluted. I've added a couple comments that hopefully clear things up. I wish the logic itself could be better but as it is, things are easy to test and that's taken advantage of by adding a ton of coverage in the WorkerTest suite. Happy to refactor if there's an easily-testable approach that's clearer, though.

Member

The conditions under which we actually end up calling topicAdminCreator.get() are:

sourceConfig.offsetsTopic() != null && config.connectorOffsetsTopicsPermitted() // connector-specific store
|| config.topicCreationEnable() && sourceConfig.usesTopicCreation() // task needs for topic creation

These are the only circumstances in which topicAdmin.get() would return non-null.

So I wonder if we should just put that logic directly in doBuild(), and have offsetStoreForRegularSourceTask() accept a nullable topicAdminSupplier (which will be guaranteed non-null in the case where it's actually called).

Wdyt?
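
A minimal sketch of that suggestion, not the actual Worker code: the class and method names below are hypothetical, and the two boolean parameters stand in for the two conditions listed above. The creator is only invoked when one of the conditions holds; otherwise no admin is ever instantiated, so there is nothing to leak.

```java
import java.util.function.Supplier;

class ConditionalAdminCreation {
    /**
     * Hypothetical helper: invokes the creator only if the task actually needs an admin.
     * Returns null when neither condition holds, so callers must tolerate a null admin.
     */
    static <T> T createIfNeeded(
            boolean usesConnectorSpecificOffsetsStore, // stands in for the first condition
            boolean needsTopicCreation,                 // stands in for the second condition
            Supplier<T> topicAdminCreator
    ) {
        if (usesConnectorSpecificOffsetsStore || needsTopicCreation) {
            return topicAdminCreator.get();
        }
        // No admin was created, so no one has to take responsibility for closing one
        return null;
    }

    public static void main(String[] args) {
        Supplier<String> creator = () -> "admin";
        System.out.println(createIfNeeded(false, false, creator)); // creator is never invoked
        System.out.println(createIfNeeded(true, false, creator));
    }
}
```

With this shape, whoever receives a non-null result is the only party that can be responsible for closing it.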

Member

Looking at the tests, it doesn't seem we have any that assert that the Admin actually gets closed. WorkerSourceTaskTest asserts this, but for this code we rely on every invocation of doBuild returning a WorkerSourceTask that is always stopped. I can see this is pretty hard to test, though.

Contributor Author

👍 I added the logic for checking those conditions to doBuild. It's still a little clunky since we're checking regularSourceTaskUsesConnectorSpecificOffsetsStore in two places now (doBuild and offsetStoreForRegularSourceTask), but for testing purposes it's easier to work with since we can continue to unit test offsetStoreForRegularSourceTask independently without having to go through doBuild. And it's certainly more readable than the caching Supplier logic that it replaced.

Good point about ensuring that we stop WorkerSourceTask instances, we definitely do a poor job of that when we fail to build a task. In fact, this has already been documented as a bug with an accompanying fix PR. If that's worth pursuing at some point let me know and I can rebase onto the latest trunk.

Member

Thanks! When you have time it would be great if you could rebase that PR.

Contributor Author

👍 done.

Member

Is it correct that if workerGetFuture.cancel returns true, then we won't try to cancel connectorGetFuture?

Contributor Author

Ah yes, good point. Switched to | instead of ||, which disables short-circuiting so that it's guaranteed that cancel is called on both futures.

Contributor Author

Also added a comment to this part just in case someone sees that and thinks that a single pipe instead of a double pipe is just a typo.
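
For anyone following along, here is a small standalone sketch of the difference. The helper name cancelBoth is hypothetical and the futures are plain CompletableFuture instances, not the ones from this PR; the point is only that | evaluates both operands while || would skip the second call whenever the first returns true.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class CancelBothFutures {
    /** Hypothetical helper: cancels both futures and reports whether either cancellation succeeded. */
    static boolean cancelBoth(Future<?> workerGetFuture, Future<?> connectorGetFuture, boolean mayInterruptIfRunning) {
        // Deliberately | and not ||: both operands are always evaluated, so cancel is
        // invoked on the second future even when the first call has already returned true
        return workerGetFuture.cancel(mayInterruptIfRunning)
                | connectorGetFuture.cancel(mayInterruptIfRunning);
    }

    public static void main(String[] args) {
        CompletableFuture<String> worker = new CompletableFuture<>();
        CompletableFuture<String> connector = new CompletableFuture<>();
        boolean cancelled = cancelBoth(worker, connector, true);
        System.out.println(cancelled + " " + worker.isCancelled() + " " + connector.isCancelled());
    }
}
```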

Member

It's quite confusing that timeout is initially in unit, but in this reassignment becomes ms. I think maybe a new timeoutRemainingMs would be clearer.

Contributor Author

👍 done.
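
A rough sketch of the naming pattern discussed here, assuming a get-with-timeout that waits on two futures in sequence. The class and method names are hypothetical and this is not the code from the PR; it only shows the caller-supplied timeout staying in its original unit while the leftover budget gets its own variable with the unit in its name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TwoStageGet {
    /** Hypothetical helper: waits on two futures in sequence under a single overall timeout. */
    static String getBoth(Future<String> first, Future<String> second, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        long startNanos = System.nanoTime();
        // The caller-supplied timeout is used exactly as given, in the caller's unit
        String firstResult = first.get(timeout, unit);

        // The remaining budget lives in a separate variable whose name states its unit,
        // instead of overwriting timeout with a value measured in milliseconds
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        long timeoutRemainingMs = Math.max(0, unit.toMillis(timeout) - elapsedMs);
        String secondResult = second.get(timeoutRemainingMs, TimeUnit.MILLISECONDS);

        return firstResult + "," + secondResult;
    }

    public static void main(String[] args) throws Exception {
        Future<String> first = CompletableFuture.completedFuture("worker");
        Future<String> second = CompletableFuture.completedFuture("connector");
        System.out.println(getBoth(first, second, 1, TimeUnit.SECONDS));
    }
}
```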

@C0urante C0urante force-pushed the kafka-10000-per-connector-offsets-topics branch 3 times, most recently from 631f08b to ceb7c82 Compare June 14, 2022 13:17
Member

@showuon showuon left a comment

I've made a pass, left some comments. Thanks.

Comment on lines 79 to 81
Member

nit: indent

Contributor Author

Ack, addressed

Comment on lines 106 to 107
Member

nit: indent

Contributor Author

Ack, addressed

Comment on lines +438 to +456
Member

Should we set admin = null in this case, like the comment said:
// Forget the reference to the admin so that we won't even try to use the admin the next time this method is called ?

Contributor Author

We want to keep throwing the exception here if the method is called again; nullifying the admin field would stop that from happening. That does remind me though, we should probably copy the same error message from here into the code path for starting connector offset stores. I've done that in my latest push; LMKWYT.

Member

SGTM. Thanks.

Member

Could a user override transactional.id to null? Should we add a test case for this?

Contributor Author

I don't think that can happen without causing other issues, since the user-supplied configs have come from either the JSON REST API or the Java properties worker file. The former doesn't deserialize things into a literal null, and although the latter does, as of #11333, we fail validation for connector configs in that case.

But, given how little additional effort it is, I don't see a problem with adding that check. At the very least, it helps prevent a 500 instead of a 400 response during config validation.

Member

This constructor kinda confuses the ownership of the Admin client. I think things are cleaner when the TopicAdmin instantiates (and thus owns) the admin. Note, it looks like there are no callers for TopicAdmin.admin. It seems that the call sites in doBuild could simply pass the map of configs (and the bootstrapServers looked up from that), rather than instantiating the admin and then passing it to the TopicAdmin.

Obviously the test code has slightly different requirements, meaning we still need this constructor. I did also wonder whether we could get rid of bootstrapServers by defining toString on KafkaAdminClient and using that for the logging and exceptions here in TopicAdmin. Perhaps that's worth a followup PR at some point (though perhaps there are benefits to hiding bootstrap servers from receivers of clients).

Contributor Author

Good points. I've:

  • Demoted the visibility of the constructor that accepts an Admin instance back to package-private
  • Documented that it's the caller's responsibility to close the topic admin in order to release the resources allocated for its Admin
  • Removed the unused admin method
  • Switched over to the TopicAdmin(Map<String, Object>) constructor in the Worker class

I originally did this refactoring because I was managing the lifecycle of the Admin differently, but kept part of it because it made testing much easier when all you had to pass in was an Admin and a String instead of an Admin and an entire config map. It also seemed like an anti-pattern to have a constructor that accepted both an already-instantiated client object, and the configuration for that client object.
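
To make the resulting ownership model concrete, here is a simplified sketch. FakeClient and ClientWrapper are hypothetical stand-ins for Admin and TopicAdmin, and the constructor bodies are assumptions rather than the real TopicAdmin code: the public constructor creates and owns the client, while the package-private one exists for tests and leaves it to the caller to close the wrapper.

```java
import java.util.Collections;
import java.util.Map;

class OwnershipSketch {
    /** Stand-in for a client such as Admin; not a Kafka class. */
    static class FakeClient implements AutoCloseable {
        final Map<String, Object> config;
        boolean closed = false;

        FakeClient(Map<String, Object> config) {
            this.config = config;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Stand-in for a wrapper such as TopicAdmin; not a Kafka class. */
    static class ClientWrapper implements AutoCloseable {
        private final FakeClient client;

        // Public constructor: the wrapper instantiates the client itself and therefore owns it
        public ClientWrapper(Map<String, Object> config) {
            this(new FakeClient(config));
        }

        // Package-private constructor, intended for tests: the caller is responsible for
        // closing the wrapper in order to release the resources held by the given client
        ClientWrapper(FakeClient client) {
            this.client = client;
        }

        @Override
        public void close() {
            client.close();
        }
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient(Collections.<String, Object>emptyMap());
        try (ClientWrapper wrapper = new ClientWrapper(client)) {
            // use the wrapper here
        }
        System.out.println(client.closed);
    }
}
```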

@C0urante
Contributor Author

Hey guys--thanks for the reviews, really appreciate the rapid responses here. I found a bug that's been a bit trickier to solve than expected and have had little time to work on it this week. I plan to push the next draft by Friday at the very latest.

If it matters, the bug is that the offset stores for regular (non-exactly-once) source tasks, and source connectors, are never started. I'm planning on fixing that first, then adding an integration test case to #11782 to simulate a soft downgrade where someone disables exactly-once support on their worker after creating a connector and letting it run for a bit, and finally, manually auditing the changes for KIP-618 to catch any other potential bugs related to improper initialization or cleanup of resources.

@C0urante C0urante force-pushed the kafka-10000-per-connector-offsets-topics branch from ceb7c82 to 0a6f219 Compare June 17, 2022 02:20
@C0urante
Contributor Author

Pushed the next draft which should fix the offset store startup bug and address outstanding review comments.

@C0urante C0urante force-pushed the kafka-10000-per-connector-offsets-topics branch from 0a6f219 to 64e9628 Compare June 17, 2022 21:00
@C0urante
Contributor Author

I've completed the remaining follow-up tasks.

Audit of resource allocation and cleanup:

  • Documented ownership expectations more clearly with the KafkaBasedLog and KafkaOffsetBackingStore classes, especially with regards to their underlying Kafka clients
  • Found and fixed a small bug in ExactlyOnceWorkerSourceTask where its transactionMetrics field was never closed
  • Found a long-lived bug where HeaderConverter instances are never closed; given that this was not introduced by any of the changes for KIP-618, I've filed https://issues.apache.org/jira/browse/KAFKA-14007 and would like to address it separately
  • Found no other resource allocation, initialization, or cleanup bugs in the code paths related to KIP-618

Integration testing

Contributor Author

Given that we close the TopicAdmin for source tasks in AbstractWorkerSourceTask::close here, this may seem redundant. However, I've chosen to keep it in for two reasons:

  1. The WorkerConnector class has no access to the TopicAdmin used for its offset store; we'd have to add another constructor parameter to that class and manually close the TopicAdmin during shutdown if we removed this line
  2. It seems safer to leave this in. Although AutoCloseable::close is not required to be idempotent, it is strongly recommended, and since all current calls to ConnectorOffsetBackingStore::stop are wrapped in Utils::closeQuietly anyway, the risks are low. On the other hand, by leaving this out, we would make it easier to introduce a resource leak in the future, which would have more serious consequences.
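
As an illustration of why closing from two places is harmless when close is idempotent, here is a minimal sketch. IdempotentResource is a hypothetical class, not TopicAdmin or Admin, and the release counter exists only to make the behavior observable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentResource implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger releases = new AtomicInteger();

    @Override
    public void close() {
        // compareAndSet guarantees that the release logic runs at most once,
        // no matter how many owners call close or from which threads
        if (closed.compareAndSet(false, true)) {
            releases.incrementAndGet();
        }
    }

    /** Number of times the underlying resources were actually released. */
    int releaseCount() {
        return releases.get();
    }

    public static void main(String[] args) {
        IdempotentResource resource = new IdempotentResource();
        resource.close(); // e.g. closed by the task during shutdown
        resource.close(); // e.g. closed again when the offset store is stopped
        System.out.println(resource.releaseCount());
    }
}
```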

Member

@tombentley tombentley left a comment

Thanks @C0urante, I left a few more comments, but they're nits at this stage.

Member

I guess we could improve this slightly by not comparing as strings, but as Set<String>. That way at least the order wouldn't matter, which it looks like it does right now.

I've wondered whether we should add some kind of cluster.id to the client configs which could be used to assert that the client was connected to the expected cluster. If we did do that, and they were specified in the configs here, then you could safely know at this point whether they were the same cluster without needing to connect at this point.

Contributor Author

True, we can make this order-independent by comparing sets. Not too much work, so I've done that.

I have a long-term vision of relocating this logic into the WorkerTask class (or its subclasses) so that we can ping the remote Kafka cluster, grab its ID there, and decide at that point whether or not we need to construct an additional offsets store, all without having to worry about blocking up the herder tick thread. This would obviate the need for users to do any additional work at all, but it would require a significant refactoring of this part of the code base, so I've held off on it for now.
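
For reference, the order-independent comparison can be sketched roughly as follows. The class and method names are hypothetical and the parsing is an assumption (comma-separated list, whitespace trimmed), not the code in this PR. It remains a heuristic: two different server lists can still point at the same cluster.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

class BootstrapServersComparison {
    /** Splits a comma-separated bootstrap servers string into a set of trimmed entries. */
    static Set<String> parse(String bootstrapServers) {
        return Arrays.stream(bootstrapServers.split(","))
                .map(String::trim)
                .filter(server -> !server.isEmpty())
                .collect(Collectors.toSet());
    }

    /** Compares two bootstrap servers strings without regard to the order of their entries. */
    static boolean sameServers(String first, String second) {
        return parse(first).equals(parse(second));
    }

    public static void main(String[] args) {
        // Same entries in a different order compare as equal
        System.out.println(sameServers("a:9092,b:9092", "b:9092, a:9092"));
        // A plain string comparison of the same inputs would not
        System.out.println("a:9092,b:9092".equals("b:9092, a:9092"));
    }
}
```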

Member

Might newTopicSpecification be a better name? It's just that "description" is a bit ambiguous: it could be describing an existing topic.

Contributor Author

Ack, done

Comment on lines +262 to +281
Member

Great doc, but maybe we should also have some on those constructors themselves?

Contributor Author

👍 good point, done

@C0urante C0urante force-pushed the kafka-10000-per-connector-offsets-topics branch from 02cf6c6 to ef5a43b Compare June 18, 2022 19:24
Contributor Author

Discovered this bug while working on #12307 for https://issues.apache.org/jira/browse/KAFKA-14006.

Member

@showuon showuon left a comment

Made another pass, left some minor comments. Overall LGTM. During the review, I found a bug, KAFKA-14012, but it's not related to this change and can be fixed in another PR. Thanks.

Member

We've already added all the bootstrap servers to the producerBootstrapServers set in L1672; why should we create another HashSet here?

Contributor Author

Good catch, left in accidentally. Removed

Member

OffsetBackingStore#start() -> OffsetBackingStore#stop() ?

Contributor Author

Good catch, done

Member

and [close] the {@link TopicAdmin} used by that store.

Contributor Author

Ack, done

Member

Why do we log this primary store error at trace level, but the secondary store error in L290 at warn level?

Contributor Author

The failure to write to the primary store will probably be logged elsewhere at a higher level, since we report it to the callback. We swallow failures to write to the secondary store, so this is the only chance we get to log those.
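
A simplified sketch of that reasoning, with heavy assumptions: Store and the write method below are hypothetical, the writes are synchronous, and java.util.logging levels stand in for trace and warn. The real ConnectorOffsetBackingStore is asynchronous and structured differently; this only shows which failures reach the callback and which are swallowed.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

class DualStoreWrite {
    private static final Logger LOG = Logger.getLogger(DualStoreWrite.class.getName());

    /** Hypothetical stand-in for an offset store; not a Kafka Connect interface. */
    interface Store {
        void write(String value) throws Exception;
    }

    /** Writes to the primary store, then to the secondary store, reporting only primary failures. */
    static void write(Store primary, Store secondary, String value, Consumer<Exception> callback) {
        try {
            primary.write(value);
        } catch (Exception e) {
            // Low-level log only: the failure is handed to the callback,
            // so whoever supplied the callback can log it more prominently
            LOG.log(Level.FINEST, "Failed to write to primary store", e);
            callback.accept(e);
            return;
        }
        try {
            secondary.write(value);
        } catch (Exception e) {
            // This failure is swallowed, so this is the only place it will ever be logged
            LOG.log(Level.WARNING, "Failed to write to secondary store", e);
        }
        // A null argument signals success to the callback in this sketch
        callback.accept(null);
    }

    public static void main(String[] args) {
        Store working = value -> { };
        Store failing = value -> { throw new Exception("secondary unavailable"); };
        write(working, failing, "offset", error -> System.out.println("callback error: " + error));
    }
}
```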

Member

I see. Thanks.

Member

unused variable?

Contributor Author

Used here.

Added this. on writes to this field and others in the configure method to make it easier to identify instance vs. function variables.

@C0urante C0urante force-pushed the kafka-10000-per-connector-offsets-topics branch from ef5a43b to c6918fe Compare June 20, 2022 13:47
Member

@showuon showuon left a comment

LGTM!

@showuon
Member

showuon commented Jun 21, 2022

Failed tests are unrelated:

Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testSnapshotOnlyAfterConfiguredMinBytes()

@showuon showuon merged commit b7b7615 into apache:trunk Jun 21, 2022
@C0urante C0urante deleted the kafka-10000-per-connector-offsets-topics branch June 21, 2022 11:51
mjsax pushed a commit to confluentinc/kafka that referenced this pull request Jun 30, 2022
Implements support for per-connector offsets topics as described in KIP-618.

Reviewers: Luke Chen <showuon@gmail.com>, Tom Bentley <tbentley@redhat.com>