
Fix failing auto-commit check for kafka-clients >= v3.7.0 #762

Merged: 9 commits into confluentinc:master on May 16, 2024

Conversation

@nscuro (Contributor) commented May 3, 2024:

This PR includes the changes made by @ondryaso in #746 to resolve the failing checkAutoCommitIsDisabled for kafka-clients >= v3.7.0 (#721).

It further includes the changes requested by @rkolesnev in his review of #746.

Checklist

  • Documentation (if applicable)
  • Changelog

cla-assistant bot commented May 3, 2024:

CLA assistant check: all committers have signed the CLA.

@nscuro (Contributor, Author) commented May 3, 2024:

@rkolesnev One thing about this solution is that it raises the baseline kafka-clients version to 3.7.0. Earlier versions will break, requiring users to enable the new ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck option.

This should probably be part of the changelog and would likely even justify a version bump to 0.6.x. I didn't see any examples of breaking-change entries in the changelog; is there a preferred place to put those?
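
For context, a rough sketch of what enabling that escape hatch could look like for users on older clients. The builder wiring here is assumed; only the option name comes from this PR:

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import org.apache.kafka.clients.consumer.Consumer;

// Hypothetical usage: opt out of the reflective auto-commit check when the
// kafka-clients internals don't match what the check expects.
ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
        .consumer(consumer) // an existing Consumer<String, String>
        .ignoreReflectiveAccessExceptionsForAutoCommitDisabledCheck(true)
        .build();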

@nscuro mentioned this pull request May 3, 2024
@rkolesnev (Contributor) commented:

/sem-approve

@rkolesnev (Contributor) commented:

@nscuro - hmm, I haven't dug too deeply into it, but I was under the assumption that there aren't any breaking public-interface changes, and that for the checks on internals - like the auto-commit check - it should be possible to support both versions with reflection checks, I reckon.
If you have time and are willing to see if it's possible, please have a look. Otherwise I will try to spend some time on it towards the end of this week.

@nscuro (Contributor, Author) commented May 7, 2024:

[...] it should be possible to support both versions with reflection checks, I reckon.

Yes, it definitely is. However, since certain classes exist only in one version and not the other, these reflection checks will be quite extensive / ugly. I can try to look into it.

Do you have any guidance on how compatibility with both versions can be tested in CI? It seems the GitHub Actions workflow had a matrix configuration to cover multiple Kafka versions; the same does not appear to be true for the Semaphore pipeline.

@nscuro (Contributor, Author) commented May 7, 2024:

Seems like a test is failing, but clicking on the Details link of the Semaphore check gives me a 404.

@rkolesnev (Contributor) commented May 8, 2024:

[...] it should be possible to support both versions with reflection checks, I reckon.

Yes, it definitely is. However, since certain classes exist only in one version and not the other, these reflection checks will be quite extensive / ugly. I can try to look into it.

Yeah - I understand the ugliness. Actually, it may be possible to still compile against the 3.7+ version but have a branching-logic check that only casts to / uses the new classes if the supplied Consumer object is 3.7+. So a reflection check would still be required, but only a top-level one, to decide whether to take the 3.7+ checks or the <3.7 checks (a rough sketch of that shape follows).
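
An illustration of that idea, not the PR's actual code; the probed class name is an assumption for a type present only in kafka-clients 3.7+:

import org.apache.kafka.clients.consumer.Consumer;

final class AutoCommitCheck {

    // Single top-level probe: does the classpath carry the 3.7+ consumer internals?
    private static final boolean KAFKA_37_PLUS =
            classPresent("org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer");

    private static boolean classPresent(String className) {
        try {
            Class.forName(className, false, AutoCommitCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    static void checkAutoCommitIsDisabled(Consumer<?, ?> consumer) {
        if (KAFKA_37_PLUS) {
            // cast to / walk the 3.7+ internal consumer structure
        } else {
            // fall back to the pre-3.7 reflective field access
        }
    }
}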

Do you have any guidance on how compatibility with both versions can be tested in CI? It seems the GitHub Actions workflow had a matrix configuration to cover multiple Kafka versions; the same does not appear to be true for the Semaphore pipeline.

Nope, not really - I am not sure about those GH Actions workflows; I do not have access to them anymore.
The only quick suggestion I can think of is to test twice locally, once with pre-3.7 clients and once with 3.7+. Or test with pre-3.7 locally and 3.7+ on Semaphore as the main test.
I will do the same before merging, as regression testing for this.

@rkolesnev (Contributor) commented:

Seems like a test is failing, but clicking on the Details link of the Semaphore check gives me a 404.

Hmm, yep - I tested anonymous / public access to those jobs just yesterday, so something must be off today. I've raised it with internal IT to check.

@rkolesnev (Contributor) commented:

/sem-approve

@rkolesnev (Contributor) commented:

Regarding the CI jobs - there seems to be something off, either in permissions or in how the link is generated; it's being looked at.
In the meantime, changing the link to remove /summary works, i.e. instead of
https://confluentinc.semaphoreci.com/workflows/c3c0985c-3ee4-4f76-a468-fe09a12fb763/summary?pipeline_id=7ac730b5-b10a-417a-9b53-92670ddb6982
edit it to
https://confluentinc.semaphoreci.com/workflows/c3c0985c-3ee4-4f76-a468-fe09a12fb763
or
https://confluentinc.semaphoreci.com/workflows/c3c0985c-3ee4-4f76-a468-fe09a12fb763?pipeline_id=7ac730b5-b10a-417a-9b53-92670ddb6982
Both open up.

@nscuro (Contributor, Author) commented May 13, 2024:

Thanks @rkolesnev! I also saw your message in the Confluent Slack; I can confirm the Details link works now. :)

Will revise the PR to support both < 3.7.0 and >= 3.7.0 versions of kafka-clients.

@nscuro (Contributor, Author) commented May 13, 2024:

The code now supports both versions of kafka-clients. I restructured it into multiple methods because the deep nesting got unbearable with the additional checks.

The PCMetricsTest fails for both 3.6.0 and 3.7.0. Appears to be related to these warnings:

35:52.990  WARN  [ForkJoinPool-1-worker-1]  (PartitionStateManager.java:113)#onPartitionsAssigned New assignment of partition which already exists and isn't recorded as removed in partition state. Could be a state bug - was the partition revocation somehow missed, or is this a race? Please file a GH issue. Partition: input-0.6491912947494951-0, state: PartitionState(module=io.confluent.parallelconsumer.internal.PCModule@6aba781f, tp=input-0.6491912947494951-0, incompleteOffsets={}, bootstrapPhase=true, dirty=false, offsetHighestSeen=-1, offsetHighestSucceeded=-1, allowedMoreRecords=true, partitionsAssignmentEpoch=0, lastCommittedOffset=0, lastCommittedOffsetGauge=io.micrometer.core.instrument.internal.DefaultGauge@4937313c, highestSeenOffsetGauge=io.micrometer.core.instrument.internal.DefaultGauge@2b75c86c, highestCompletedOffsetGauge=io.micrometer.core.instrument.internal.DefaultGauge@e33fe00a, highestSequentialSucceededOffsetGauge=io.micrometer.core.instrument.internal.DefaultGauge@1239e523, numberOfIncompletesGauge=io.micrometer.core.instrument.internal.DefaultGauge@587996ba, ephochGauge=io.micrometer.core.instrument.internal.DefaultGauge@ca6bc982, ratioPayloadUsedDistributionSummary=io.micrometer.core.instrument.cumulative.CumulativeDistributionSummary@1a1997ee, ratioMetadataSpaceUsedDistributionSummary=io.micrometer.core.instrument.cumulative.CumulativeDistributionSummary@b5f6ba60, pcMetrics=io.confluent.parallelconsumer.metrics.PCMetrics@575b2833, stateChangedSinceCommitStart=false)
35:52.997  WARN  [ForkJoinPool-1-worker-1]  (PartitionStateManager.java:113)#onPartitionsAssigned New assignment of partition which already exists and isn't recorded as removed in partition state. Could be a state bug - was the partition revocation somehow missed, or is this a race? Please file a GH issue. Partition: input-0.6491912947494951-1, state: PartitionState(module=io.confluent.parallelconsumer.internal.PCModule@6aba781f, tp=input-0.6491912947494951-1, incompleteOffsets={}, bootstrapPhase=true, dirty=false, offsetHighestSeen=-1, offsetHighestSucceeded=-1, allowedMoreRecords=true, partitionsAssignmentEpoch=0, lastCommittedOffset=0, lastCommittedOffsetGauge=io.micrometer.core.instrument.internal.DefaultGauge@494548bd, highestSeenOffsetGauge=io.micrometer.core.instrument.internal.DefaultGauge@2b83dfed, highestCompletedOffsetGauge=io.micrometer.core.instrument.internal.DefaultGauge@e34df78b, highestSequentialSucceededOffsetGauge=io.micrometer.core.instrument.internal.DefaultGauge@1247fca4, numberOfIncompletesGauge=io.micrometer.core.instrument.internal.DefaultGauge@5887ae3b, ephochGauge=io.micrometer.core.instrument.internal.DefaultGauge@ca79e103, ratioPayloadUsedDistributionSummary=io.micrometer.core.instrument.cumulative.CumulativeDistributionSummary@1a27af6f, ratioMetadataSpaceUsedDistributionSummary=io.micrometer.core.instrument.cumulative.CumulativeDistributionSummary@b604d1e1, pcMetrics=io.confluent.parallelconsumer.metrics.PCMetrics@575b2833, stateChangedSinceCommitStart=false)

I am unsure why this is happening. The ConsumerRebalanceListener interface has not changed in five years, so we're definitely not missing any new callbacks. Any tips as to where I should look to debug this?

@rkolesnev (Contributor) commented May 14, 2024:

@nscuro - I've noticed this as well.
In master we have the Kafka version at 3.5.0 - I wonder if there is some behaviour change even between 3.5 and 3.6 that is causing this... Maybe not the actual interface, but behaviour, or the default rebalance protocol, has changed... I want to dig into it when I get a bit of time, as I think there is an issue when the cooperative rebalance strategy is used.
And AFAIK there is a net-new consumer rebalance protocol in the works, with some of the rebalance logic moving from consumers into brokers - KIP-848.

@nscuro (Contributor, Author) commented May 14, 2024:

@rkolesnev The rebalance protocol was a good hint! I checked the release notes but didn't find anything related.

I ended up comparing stack dumps when using Kafka v3.5.x vs 3.6.x. It turns out the issue is much simpler:

The failing test uses LongPollingMockConsumer, which extends Kafka's MockConsumer. Before Kafka 3.6.x, MockConsumer#rebalance did not invoke the rebalance listener. Hence LongPollingMockConsumer#rebalance did it:

@Override
public synchronized void rebalance(final Collection<TopicPartition> newAssignment) {
    super.rebalance(newAssignment);
    ConsumerRebalanceListener rebalanceListeners = getRebalanceListener();
    if (rebalanceListeners != null) {
        rebalanceListeners.onPartitionsAssigned(newAssignment);
    }
}

Since Kafka 3.6.x, MockConsumer#rebalance does invoke the rebalance listener: apache/kafka@3.5...3.6#diff-0ebcab2d2727abe3f0f1e3a94321b068ef2478911a33861d072a7a2ecbb1cf1f

This caused PC's onPartitionsAssigned callback to be erroneously invoked twice when used with LongPollingMockConsumer.
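
For reference, a minimal sketch of one way the override above could avoid the double invocation once the baseline is kafka-clients >= 3.6 (the actual change landed separately; this is only an illustration of the direction):

@Override
public synchronized void rebalance(final Collection<TopicPartition> newAssignment) {
    // Since kafka-clients 3.6, MockConsumer#rebalance fires the rebalance
    // listener itself, so the override must not notify it a second time.
    super.rebalance(newAssignment);
}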

To summarize, the issue was limited to test code; there is no impact on production logic. I'll raise a separate PR to bump the Kafka version to 3.6.2 and resolve the duplicate rebalance-listener invocation.

@nscuro (Contributor, Author) commented May 14, 2024:

Raised #765.

I think it would make sense to merge that one first; I can then rebase this PR, which should yield a clean test run.

@rkolesnev (Contributor) commented:

@nscuro - good find!
Sure - I will review #765 and get it merged today.

@rkolesnev (Contributor) commented:

/sem-approve

rkolesnev merged commit bafad0b into confluentinc:master May 16, 2024 (2 checks passed)
nscuro deleted the issue-721 branch May 16, 2024 15:53