Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spring Kafka Wait For Assignment Failed After Update To SpringBoot 3.2.0 #2978

Closed
dimasadryantos opened this issue Dec 12, 2023 · 10 comments · Fixed by #2983
Closed

Spring Kafka Wait For Assignment Failed After Update To SpringBoot 3.2.0 #2978

dimasadryantos opened this issue Dec 12, 2023 · 10 comments · Fixed by #2983
Assignees
Milestone

Comments

@dimasadryantos
Copy link

Problem :

  • I have test where I configure multiple topics and have test container waiting for assignments, after updating to SpringBoot 3.2.0, my integration test freezing and returning :

Caused by: java.lang.IllegalStateException: Expected 2 but got 1 partitions

    @EventListener(ContextRefreshedEvent::class)
    fun waitForAssignment() {
        for (messageListenerContainer in kafkaListenerEndpointRegistry.listenerContainers) {
            if (shouldWaitForAssignment(messageListenerContainer)) {
                ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafka.partitionsPerTopic)
            }
        }
    }
@scottfrederick
Copy link

Thanks for getting in touch. Please try downgrading your Spring Kafka dependency to 3.0.13 to match the version that was managed with Spring Boot 3.1.x. You can do this by setting the spring-kafka.version property in your build. This will help us to know if the problem is in Spring Boot or Spring Kafka.

If that doesn't change the problem, please provide a complete minimal sample that reproduces the problem. You can share it with us by pushing it to a separate repository on GitHub or by zipping it and attaching it to this issue.

@PospolitaNV
Copy link

I have the same issue

@dimasadryantos
Copy link
Author

@scottfrederick Hi Thank you for your feedback, yes I will attach sample project for reproducing the issue

@ThomHurks
Copy link

In my integration tests I had the same issue after upgrading to 3.2, I worked around it by making sure an integration test is for a single topic and has partitions set to 1 on the embedded Kafka annotation. It seems like a weird race condition, because when I attached a debugger I could see the number of expected partitions was correct, but when just running the test it would fail with the same error as reported here.

@dimasadryantos
Copy link
Author

Hi @ThomHurks could you add example project to reproduce the error? As I tried with my project the error didn't appear

@spring-projects-issues
Copy link

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

@spring-projects-issues
Copy link

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.

@spring-projects-issues spring-projects-issues closed this as not planned Won't fix, can't repro, duplicate, stale Dec 31, 2023
@rgolder1
Copy link

rgolder1 commented Jan 4, 2024

More information on my observation of this change in behaviour:
https://github.com/lydtechconsulting/introduction-to-kafka-with-spring-boot/wiki#error-running-integration-tests-with-spring-boot-32
I am unclear if the change is a bug, or a fix to what was a bug, or just a functional change in behaviour.

Steps to reproduce:

  • clone the above project
  • switch to branch 13-dlt
  • run the Spring Boot test OrderDispatchIntegrationTest - test passes
  • change the Spring Boot version in the pom.xml from 3.0.5 to 3.2.1
  • run the Spring Boot test OrderDispatchIntegrationTest - test fails with error:
    java.lang.IllegalStateException: Expected 2 but got 1 partitions

@wilkinsona
Copy link
Member

Thanks, @rgolder1. The problem does not occur with Spring Boot 3.2.1 when Spring Kafka is downgraded to 3.0.13 by adding <spring-kafka.version>3.0.13</spring-kafka.version> to the pom's <properties> so the change in behavior appears to be do to a change in Spring Kafka.

/cc @artembilan who maintains Spring Kafka

@artembilan
Copy link
Member

Thank you for great example to play with!

Although it is too complicated to digest quickly, but here is a workaround for now:

@EmbeddedKafka(controlledShutdown = true, kraft = false)

Starting with Spring for Apache Kafka 3.1, the embedded Kafka is switched to a new KRaft model: https://developer.confluent.io/learn/kraft/.

Apparently it does not take into account a number of partitions set by default in the @EmbeddedKafka.partitions attribute when those topics are created automatically.
Another workaround is like:

@EmbeddedKafka(controlledShutdown = true, partitions = 1)

or more fine-grained:

@EmbeddedKafka(controlledShutdown = true, topics = { OrderDispatchIntegrationTest.ORDER_CREATED_TOPIC, OrderDispatchIntegrationTest.DISPATCH_TRACKING_TOPIC, OrderDispatchIntegrationTest.ORDER_DISPATCHED_TOPIC, OrderDispatchIntegrationTest.ORDER_CREATED_DLT_TOPIC })

to make an embedded Kafka to create those topics for us upfront.

As for the problem with KRaft.
I see there is a logic in the EmbeddedKafkaZKBroker:

				if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
					brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
				}

So, we propagate a specific property into Kafka broker started via embedded Kafka for topics auto-creation.
Doesn't look like we have similar logic in the EmbeddedKafkaKraftBroker:

	private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) {
		brokerConfig.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true");
		brokerConfig.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
		brokerConfig.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers));
	}

So, feel free to raise a respective issue in the https://github.com/spring-projects/spring-kafka/issues.

Or, @wilkinsona , we can just transfer this issue over there.

@wilkinsona wilkinsona reopened this Jan 4, 2024
@wilkinsona wilkinsona transferred this issue from spring-projects/spring-boot Jan 4, 2024
@artembilan artembilan added this to the 3.1.2 milestone Jan 4, 2024
@artembilan artembilan self-assigned this Jan 11, 2024
artembilan added a commit to artembilan/spring-kafka that referenced this issue Jan 11, 2024
Fixes: spring-projects#2978

If we don't create topics manually, that can be done automatically on the broker side
according to its configuration.

For that goal the `EmbeddedKafkaKraftBroker` is missing to populate
`KafkaConfig.NumPartitionsProp(): "" + this.partitionsPerTopic` broker property from
`@EmbeddedKafka` configuration

* Propagate `partitionsPerTopic` option down to the embedded broker(s) in the `EmbeddedKafkaKraftBroker`
* Some other simple refactoring in the `EmbeddedKafkaKraftBroker`
* Verify the option propagated via new unit test in the `KafkaTestUtilsTests.topicAutomaticallyCreatedWithProperNumberOfPartitions()`
sobychacko pushed a commit that referenced this issue Jan 17, 2024
Fixes: #2978

If we don't create topics manually, that can be done automatically on the broker side
according to its configuration.

For that goal the `EmbeddedKafkaKraftBroker` is missing to populate
`KafkaConfig.NumPartitionsProp(): "" + this.partitionsPerTopic` broker property from
`@EmbeddedKafka` configuration

* Propagate `partitionsPerTopic` option down to the embedded broker(s) in the `EmbeddedKafkaKraftBroker`
* Some other simple refactoring in the `EmbeddedKafkaKraftBroker`
* Verify the option propagated via new unit test in the `KafkaTestUtilsTests.topicAutomaticallyCreatedWithProperNumberOfPartitions()`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

8 participants