Skip to content

Conversation

@C0urante
Copy link
Contributor

@C0urante C0urante commented Feb 17, 2022

Implements a source task wrapper (ExactlyOnceWorkerSourceTask) that follows the behavior described in KIP-618 for writing source records and their offsets in transactions with user-configurable (and sometimes connector-defined) boundaries.

Relies on changes from:

@C0urante C0urante changed the title KAFKA-10000: Exactly-once source tasks KAFKA-10000: Exactly-once source tasks (KIP-618) Feb 17, 2022
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a placeholder implementation. This class is fully implemented in the downstream PR #11781

@C0urante
Copy link
Contributor Author

Jenkins build failures appear to be due to the issue addressed by #10702. I've pushed a change that applies the same fix from that PR to the newly-added failing test classes for this PR.

@C0urante C0urante marked this pull request as draft February 18, 2022 06:35
@C0urante
Copy link
Contributor Author

Converting to draft until upstream PRs are reviewed.

@C0urante C0urante force-pushed the kafka-10000-source-task-split branch 2 times, most recently from ce75140 to 1837704 Compare March 3, 2022 17:09
@C0urante C0urante force-pushed the kafka-10000-source-task-split branch from 1837704 to 48f01b1 Compare May 10, 2022 02:47
@C0urante
Copy link
Contributor Author

Given that all merge conflicts have been resolved and #11775 has already been approved, marking this as ready for review.

@C0urante C0urante marked this pull request as ready for review May 10, 2022 02:48
@C0urante C0urante force-pushed the kafka-10000-source-task-split branch from 48f01b1 to 1d95571 Compare May 10, 2022 02:53
Copy link
Member

@mimaison mimaison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @C0urante for the PR. I've made a first pass over the main (I've not looked at tests yet) and it looks pretty good. I left a few minor comments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we undo this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies, will revert and remove similar changes from other PRs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you planning to address this TODO in one of the other PRs? Same below in ExactlyOnceSourceTaskBuilder

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; per-connector offsets topics are implemented in #11781.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment ? so it's clear to anyone that stumbles on this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can we move this block above the admin block above? so it follows the same flow than SourceTaskBuilder.doBuild()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: All other messages are capitalized

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point; given that Utils::closeQuietly uses the second parameter in the middle of a log message I'm inclined to lowercase each of these.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Let's keep the new line

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use DistributedConfig.transactionalProducerId() here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the two serve different purposes.

  • DistributedConfig::transactionalProducerId computes the transactional ID that the leader should use when writing to the config topic
  • Worker::transactionalId computes the transactional ID that exactly-once source tasks should use when writing to Kafka

Copy link
Member

@mimaison mimaison May 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that makes sense. I thought we wanted to use DistributedConfig::transactionalProducerId as a prefix in Worker::transactionalId but that's not necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth a comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 added a comment and updated the method name.

@mimaison
Copy link
Member

@rhauch @kkonstantine This is a pretty important feature, can you take a look? The first few PRs were relatively simple but in the remaining PRs things are getting a bit more involved so it would be good to get another pair of eyes on it.

@C0urante C0urante force-pushed the kafka-10000-source-task-split branch 3 times, most recently from a36b039 to ca5e253 Compare May 17, 2022 13:16
@C0urante
Copy link
Contributor Author

Thanks @mimaison. I've addressed your comments and this is ready for another round

@mimaison
Copy link
Member

Thanks for the updates @C0urante ! I've on PTO next week, I'll take a look at the tests when I'm back.

@C0urante C0urante force-pushed the kafka-10000-source-task-split branch from ca5e253 to 5f16a11 Compare May 31, 2022 14:55
@C0urante
Copy link
Contributor Author

C0urante commented Jun 1, 2022

@mimaison I'm hoping you can take a look sometime this week. There's still a lot left to review for this KIP and it's worrying that the 3.3.0 feature freeze deadline is creeping closer with little progress being made.

Copy link
Member

@tombentley tombentley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @C0urante, I've made an initial pass.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should they be completed by this point? I.e. what in the Java memory model guarantees it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a fair point. Right now this is a result of the threading model of the producer, which blocks on the completion of all in-flight batches and their callbacks before sending a request to the broker to end a transaction. This is not technically guaranteed by the documented behavior for Producer::commitTransaction, though; the only guarantees we get from the Javadocs are:

This method will flush any unsent records before actually committing the transaction. Further, if any of the send(ProducerRecord) calls which were part of the transaction hit irrecoverable errors, this method will throw the last received exception immediately and the transaction will not be committed.

If we don't want to rely on this technically-undocumented behavior, we can add some more bookkeeping to ensure this ourselves. Alternatively, we can rely on this behavior and I can remove the comment above about producer callback guarantees and the bookkeeping we already do on that front (which I've done for now).

@hachikuji @guozhangwang do you have thoughts about whether it should be part of the contract for Producer::commitTransaction that all user-supplied callbacks for records within the transaction are fired before the method returns? The use case is tracking record metadata that's only available after the record is ack'd and then reporting it to source tasks, before clearing a local cache of that metadata.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@C0urante thanks for the explanation. I don't think a revised Producer contract would be sufficient though. It's not the blocking per-se, it's the visibility of mutations to commitableRecords done on those threads being correctly synchronized with reads on this line. Producer can't help with that because it doesn't know about commitableRecords, and so the writes and the read could be re-ordered anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all we need is a syncrhonized(commitableRecords) around these accesses.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, fair. I was taking a charitable interpretation of the Javadocs, which state:

If multiple threads access a linked hash map concurrently, and at least one of the threads modifies the map structurally, it must be synchronized externally.

In this case, by the time we get to this line, we don't technically have multiple threads accessing the map concurrently; it's guaranteed that this will be the only thread reading from or writing to the map. But it costs very little to add the synchronization barrier, so I've done that.

Also emailed Jason and Guozhang for their thoughts on callbacks with transactional producers. Hoping to hear back soon 🤞

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it's a documentation gap. It is intended for callbacks to be invoked before EndTxn is sent. It would be a little strange otherwise. Worth noting that the callback is just for notification. Even if the callback raises an exception, we won't stop an active commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Jason, filed https://issues.apache.org/jira/browse/KAFKA-13967 to track the producer docs improvement and will have a PR out for it shortly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadocs PR: #12264

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth a comment?

@C0urante C0urante force-pushed the kafka-10000-source-task-split branch from 5f16a11 to 0a9f7c1 Compare June 2, 2022 14:34
@C0urante
Copy link
Contributor Author

C0urante commented Jun 2, 2022

Thanks Tom. Useful comments RE thread safety and an interesting point about transactional producer callbacks. Hoping we can do things the easy way on that front and let the clients library do some of the lifting for us, but if we can't settle on that by mid next week or actively decide against it before then, can do that manually in the ExactlyOnceWorkerSourceTask class instead.

@C0urante C0urante force-pushed the kafka-10000-source-task-split branch from 0a9f7c1 to 32e0333 Compare June 7, 2022 04:29
@C0urante C0urante force-pushed the kafka-10000-source-task-split branch from 32e0333 to a0e5a64 Compare June 8, 2022 04:50
Copy link
Member

@tombentley tombentley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks again @C0urante.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Producer has it's own TransactionManager. Maybe we should call this TransactionBoundaryManager to avoid confusion. It's also a better description of what this is seeking to abstract over imho.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, no need to complicate IDE class lookups. Changed to TransactionBoundaryManager

@C0urante C0urante force-pushed the kafka-10000-source-task-split branch from a0e5a64 to 14a107e Compare June 10, 2022 02:51
Copy link
Member

@mimaison mimaison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @C0urante

@mimaison
Copy link
Member

@C0urante I merged #11779 so this needs a rebase before merging

@C0urante C0urante force-pushed the kafka-10000-source-task-split branch from 14a107e to 87ccec4 Compare June 10, 2022 16:48
@C0urante
Copy link
Contributor Author

Thanks Mickael, finished the rebase and handled the TODO from #11779 (comment).

@mimaison mimaison merged commit 9e8ef8b into apache:trunk Jun 13, 2022
@C0urante C0urante deleted the kafka-10000-source-task-split branch June 13, 2022 15:04
ableegoldman added a commit to confluentinc/kafka that referenced this pull request Jun 14, 2022
CONFLUENT: Sync from apache/kafka trunk to confluentinc/kafka master (13 Jun 2022)

apache/trunk: (7 commits)
KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN…(apache#12140)
KAFKA-10000: Exactly-once source tasks (apache#11780)
KAFKA-13436: Omitted BrokerTopicMetrics metrics in the documentation (apache#11473)
MINOR: Use Exit.addShutdownHook instead of directly adding hooks to R…(apache#12283)
KAFKA-13846: Adding overloaded metricOrElseCreate method (apache#12121)
KAFKA-13935 Fix static usages of IBP in KRaft mode (apache#12250)
HOTFIX: null check keys of ProducerRecord when computing sizeInBytes (apache#12288)


Conflicts:
None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants