
KAFKA-12558: Do not prematurely mutate partition state and provide con… #11818

Merged

C0urante merged 6 commits into apache:trunk from emilnkrastev:KAFKA-12558 on Jan 10, 2023

Conversation

@emilnkrastev
Contributor

This PR addresses the issue described in KAFKA-12558.

Additionally, the PR allows configuring the maximum number of outstanding offset syncs in MirrorSourceTask, which is currently hardcoded.
Many offset sync messages are lost during bursts of messages in the source cluster, or when MirrorMaker has a lot to catch up on (first run, or after being inactive for a while). In such a scenario it takes a long time to sync the offsets of partitions without regular activity in the destination cluster, even at maximum parallelism (one task per partition).

The PR tries to mitigate the issue by providing a way to change the maximum number of allowed concurrent offset syncs, so that fewer offset syncs are lost.
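
For context, the balking logic in MirrorSourceTask looks roughly like this (paraphrased sketch, not the exact trunk code; the hardcoded limit of 10 is what this PR makes configurable):

import java.util.concurrent.Semaphore;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Paraphrased sketch of MirrorSourceTask's offset-sync path, not the exact code.
class OffsetSyncSketch {
    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10; // the hardcoded limit
    private final Semaphore outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
    private KafkaProducer<byte[], byte[]> offsetProducer; // created in start()
    private String offsetSyncsTopic;

    // key/value encode the topic partition and upstream/downstream offsets
    void sendOffsetSync(byte[] key, byte[] value) {
        if (!outstandingOffsetSyncs.tryAcquire()) {
            // Too many in-flight syncs: this sync is silently dropped.
            return;
        }
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0, key, value);
        // The permit is released only when the send completes, so during bursts the
        // semaphore stays exhausted and subsequent syncs are dropped.
        offsetProducer.send(record, (metadata, error) -> outstandingOffsetSyncs.release());
    }
}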

Here are my steps to reproduce the offset sync issue caused by the max outstanding syncs being limited to 10:

  1. Source topic with 12 partitions and 1400 messages with minimal activity. Messages are produced on a daily basis.
  2. Run a MirrorMaker 2 process within the destination cluster's network, with the offset syncs topic location set to target and 5 tasks.
  3. 372 offset sync messages arrive in the destination cluster's offset syncs topic.
  4. 9 out of 12 partitions are not synced correctly in the destination cluster.
  5. Hours or more pass waiting for new messages to arrive in the source Kafka cluster, which would sync the correct offsets.

@gharris1727
Contributor

gharris1727 commented Dec 19, 2022

@emilnkrastev Thanks for the fix! Are you still interested in making this change?

If so, I would suggest removing the configuration changes, as those would require a KIP. We can much more easily merge a targeted bug fix.

Also I noticed that your name and email are not set in your commit. You may want to configure these so that the change can be attributed to you.
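
For example:

git config user.name "Your Name"
git config user.email "you@example.com"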

Thanks!

@emilnkrastev
Contributor Author

@gharris1727 I'm still interested in the change and will update the PR in the next couple of days.

Thanks for the reply!

@emilnkrastev
Contributor Author

@gharris1727 The PR is updated. Could you please take a look?

@gharris1727
Contributor

Thanks for the update @emilnkrastev.

It appears there are some CI failures mentioning offset translation that we need to look into, especially in MirrorConnectorsIntegrationSSLTest.

Additionally, I'm trying to think of ways to reliably test this behavior. You've translated the existing test to work with the new update signature, but we haven't really targeted the semaphore balking behavior in a regression test. The lack of code coverage is partly why we didn't notice this before.

Here are a couple of ideas that might work; I'll let you choose whichever one you think makes more sense. A rough sketch of the first idea follows the list.

  • Mock the semaphore acquire/release in a unit test to simulate balking in MirrorSourceTaskTest.
  • Mock the producer and delay the send callback to cause the real semaphore to balk in MirrorSourceTaskTest.
  • Refactor the MirrorSourceTask semaphore to make it more unit-testable.
  • Add a test to MirrorConnectorsIntegrationTestBase with heavy contention on this semaphore and assert that the checkpoint offsets are sensible.
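
For the first idea, something along these lines might work (rough, untested sketch; injecting the semaphore into MirrorSourceTask is hypothetical and would need a test-visible constructor):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.concurrent.Semaphore;

import org.junit.jupiter.api.Test;

// Rough, untested sketch of the first idea above; the wiring of the mocked
// semaphore into MirrorSourceTask is hypothetical.
public class SemaphoreBalkingSketchTest {

    @Test
    public void testBalkedSyncIsNotForgotten() {
        Semaphore outstandingOffsetSyncs = mock(Semaphore.class);
        // Simulate too many in-flight syncs so that every send balks.
        when(outstandingOffsetSyncs.tryAcquire()).thenReturn(false);

        // Hypothetical wiring, since the real task creates its own semaphore today:
        // MirrorSourceTask task = new MirrorSourceTask(..., outstandingOffsetSyncs, ...);
        // task.commitRecord(record, metadata);
        // Then assert that no offset sync was produced, and that a later commitRecord
        // with a permit available still sends the sync for this partition.
    }
}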

Thanks!

@emilnkrastev
Contributor Author

@gharris1727 The PR has been updated: I added an additional unit test covering partition state mutation and changed the update logic. The MirrorConnectorsIntegrationSSLTest integration test was failing due to missing synced offsets; it was fixed by always updating the previous upstream/downstream offset (as the original logic does).

There are still test failures, but I believe they are not related to the PR changes. I can see similar failures in other PRs' CI pipelines.

@gharris1727 left a comment
Contributor

Thanks so much @emilnkrastev, this LGTM once some small nits are resolved.
The test looks great, and I'm glad that we're actually covering this code path. If all of the other tests are fine with those three instance variables being null, I'm wondering how much test coverage we're really missing right now.


assertTrue(backupOffsets.containsKey(
        new TopicPartition("primary.test-topic-1", 0)),
        "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
for (int i = 0; i < NUM_PARTITIONS; i++) {

Nice change!

@gharris1727
Contributor

@C0urante could you take a look at this, please?

@C0urante
Contributor

C0urante commented Dec 29, 2022

Thanks for this fix, @emilnkrastev. I want to make sure I understand the goal here before commenting on the actual code changes (although I've made sure to read through them carefully before writing this).

It looks like incorrect updates to the partitionStates field (or its contents) might result in the starvation of offset syncs for some topic partitions. The task will believe that it has performed syncs that it hasn't, and as a result, may never see that the lag between the (true) last-synced offset and the offset for the last-replicated record has exceeded the value for the user-configured offset.lag.max property.
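
Roughly, the relevant update path looks like this (paraphrased sketch, not the exact code):

// Paraphrased sketch of MirrorSourceTask.PartitionState, not the exact code:
// the last-synced offset is mutated *before* we know whether the sync will
// actually be sent, so a balked send leaves the state claiming a sync happened.
class PartitionState {
    long previousUpstreamOffset = -1L;
    long previousDownstreamOffset = -1L;
    long lastSyncDownstreamOffset = -1L;
    final long maxOffsetLag;

    PartitionState(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    // true if the caller should attempt an offset sync
    boolean update(long upstreamOffset, long downstreamOffset) {
        boolean shouldSync = false;
        if (lastSyncDownstreamOffset == -1L
                || downstreamOffset - lastSyncDownstreamOffset >= maxOffsetLag
                || downstreamOffset < previousDownstreamOffset) {
            // Premature mutation: if the semaphore later balks, no sync is sent,
            // but this field already says one was.
            lastSyncDownstreamOffset = downstreamOffset;
            shouldSync = true;
        }
        previousUpstreamOffset = upstreamOffset;
        previousDownstreamOffset = downstreamOffset;
        return shouldSync;
    }
}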

However, it looks like KAFKA-12468 and possibly KAFKA-12558 describe a different problem: it's not just a matter of offset syncs being delayed for longer than expected or even indefinitely; instead, the offset syncs that do make it to the downstream cluster actually contain incorrect values, which in some extreme cases can result in things like negative consumer lag.

I don't see how this PR would address an issue with correctness in the actual offsets used for syncs; instead, it seems like the value here is that it will allow offset syncs to take place in cases where they should be but are currently not.

Is that a fair assessment of the goal here? If so, I'm happy to review/merge a fix with that goal, and if not, would you mind shedding some light on what the goal is and where I might be missing something?

@emilnkrastev
Contributor Author

@C0urante your assessment of the goal is correct: the PR changes aim to allow syncs to take place where they should.

I believe there are two different problems: one results in delayed offset syncs, and the other results in negative lag in the destination cluster. I'm trying to mitigate the first one (delayed offset syncs).

@C0urante
Contributor

Thanks @emilnkrastev, good to know we're on the same page!

Thinking about this a little more, I wonder if a slightly more-involved approach is warranted. With the current proposal, it looks like we might still miss some cases that are predicated on the last-seen (not last-synced) upstream/downstream offsets.

For example, if the downstream topic is deleted and then recreated, the downstreamOffset < previousDownstreamOffset part of the condition in shouldSyncOffsets will evaluate to true, and we'll try to send an offset sync as a result. If there are too many in-flight offset syncs already and we can't acquire access to the semaphore, we'll skip that sync; but since we'll also have already updated the previousDownstreamOffset field, the next call to shouldSyncOffsets won't (necessarily) return true, even though we should still attempt a sync in that case.

For a short-term fix, I think the PartitionState class could be adapted to "remember" whether syncs are necessary, and allow external callers to clear that state whenever an offset sync has actually been performed. That way, we never run the risk of "dropping" offset syncs that were supposed to be performed but were blocked by access to the semaphore.
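
To sketch the idea (names approximate, not a final implementation):

// Approximate sketch of the proposed short-term fix: PartitionState latches
// "a sync is due", and only the caller clears it once the sync was really sent.
class PartitionState {
    long previousUpstreamOffset = -1L;
    long previousDownstreamOffset = -1L;
    long lastSyncDownstreamOffset = -1L;
    final long maxOffsetLag;
    boolean shouldSyncOffsets = false;

    PartitionState(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    void update(long upstreamOffset, long downstreamOffset) {
        if (lastSyncDownstreamOffset == -1L
                || downstreamOffset - lastSyncDownstreamOffset >= maxOffsetLag
                || downstreamOffset < previousDownstreamOffset) {
            lastSyncDownstreamOffset = downstreamOffset;
            shouldSyncOffsets = true; // latched until reset() after a real sync
        }
        previousUpstreamOffset = upstreamOffset;
        previousDownstreamOffset = downstreamOffset;
    }

    void reset() {
        // Called only once an offset sync has actually been sent, so a balked
        // send can no longer "drop" a sync that was supposed to happen.
        shouldSyncOffsets = false;
    }
}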

How does that sound?

@emilnkrastev
Contributor Author

@C0urante Your suggestion sounds perfect.

I have updated the PR.

@C0urante left a comment
Contributor

Thanks @emilnkrastev! The functional parts LGTM; just a few thoughts about the testing strategy.

@C0urante left a comment
Contributor

Thanks @emilnkrastev! One final round of comments and this should be good to go.

@C0urante left a comment
Contributor

LGTM, thanks @emilnkrastev!

@C0urante C0urante merged commit 6e7e2e0 into apache:trunk Jan 10, 2023
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Jan 12, 2023
…rror Maker 2 (apache#11818)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
ijuma added a commit to fvaleri/kafka that referenced this pull request Jan 13, 2023
* apache-github/trunk:
  KAFKA-14601: Improve exception handling in KafkaEventQueue apache#13089
  KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886)
  KAFKA-14530: Check state updater more often (apache#13017)
  KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (apache#13103)
  KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301)
  KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092)
  KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870)
  KAFKA-14557; Lock metadata log dir (apache#13058)
  MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101)
  KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818)
  KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032)
  KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087)
  KAFKA-14279: Add 3.3.x streams system tests (apache#13077)
  MINOR: bump streams quickstart pom versions and add to list in gradle.properties (apache#13064)
  MINOR: Update KRaft cluster upgrade documentation for 3.4 (apache#13063)
  KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller. (apache#12998)
  KAFKA-14570: Fix parenthesis in verifyFullFetchResponsePartitions output (apache#13072)
  MINOR: Remove public mutable fields from ProducerAppendInfo (apache#13091)
ijuma added a commit to confluentinc/kafka that referenced this pull request Jan 17, 2023
…master

* apache-github/trunk: (23 commits)
  MINOR: Include the inner exception stack trace when re-throwing an exception (apache#12229)
  MINOR: Fix docs to state that sendfile implemented in `TransferableRecords` instead of `MessageSet` (apache#13109)
  Update ProducerConfig.java (apache#13115)
  KAFKA-14618; Fix off by one error in snapshot id (apache#13108)
  KAFKA-13709 (follow-up): Avoid mention of 'exactly-once delivery' or 'delivery guarantees' in Connect (apache#13106)
  KAFKA-14367; Add `TxnOffsetCommit` to the new `GroupCoordinator` interface (apache#12901)
  KAFKA-14568: Move FetchDataInfo and related to storage module (apache#13085)
  KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created (apache#13104)
  KAFKA-14601: Improve exception handling in KafkaEventQueue apache#13089
  KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886)
  KAFKA-14530: Check state updater more often (apache#13017)
  KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (apache#13103)
  KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301)
  KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092)
  KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870)
  KAFKA-14557; Lock metadata log dir (apache#13058)
  MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101)
  KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818)
  KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032)
  KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087)
  ...
guozhangwang pushed a commit to guozhangwang/kafka that referenced this pull request Jan 25, 2023
…rror Maker 2 (apache#11818)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 22, 2023
…rror Maker 2 (apache#11818)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
C0urante pushed a commit that referenced this pull request Feb 23, 2023
…rror Maker 2 (#11818)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
C0urante pushed a commit that referenced this pull request Feb 23, 2023
…rror Maker 2 (#11818)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Mar 24, 2023
…rror Maker 2 (apache#11818)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
giuseppelillo pushed a commit to aiven/kafka that referenced this pull request Mar 29, 2023
…rror Maker 2 (apache#11818)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
giuseppelillo pushed a commit to aiven/kafka that referenced this pull request Apr 6, 2023
…rror Maker 2 (apache#11818)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
jeqo pushed a commit to aiven/kafka that referenced this pull request Sep 28, 2023
…rror Maker 2 (apache#11818)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>