Skip to content

Conversation

@gharris1727
Copy link
Contributor

These tests contain typos that leak sockets, mostly through clients that are left open.

The WorkerTest had rather involved resource leaks, which arose because the typical lifetime of the WorkerTask was partially or totally mocked. The WorkerTask subclass constructors accept clients as arguments, and take ownership of those clients, and the partial mocking prevented those clients from being closed appropriately.

Now, all of (WorkerSourceTask, ExactlyOnceWorkerSourceTask, WorkerSinkTask) have their constructors mocked, and those constructors close the passed-in resources immediately. This also allows the test to avoid waiting for some cancellation timeouts to expire, making the test faster to run.

The other systematic typo that was present was the OffsetStore not being configured or closed, which caused a consumer to be leaked. This is because the constructor is instantiated in the factory method (e.g. offsetStoreForExactlyOnceSourceTask), but the KafkaOffsetBackingStore.offsetLog field field is only initialized inside of configure(WorkerConfig). Rather than making OffsetBackingStore implementations close the consumer even when configure is not called, I made the test use a more realistic lifecycle and actually call configure(WorkerConfig) and stop().

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Copy link
Member

@mimaison mimaison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

…est cleanup

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Signed-off-by: Greg Harris <greg.harris@aiven.io>
@divijvaidya
Copy link
Member

Unrelated test failures, all tests modified in this PR are successful.

[Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_8_and_Scala_2_12___testResetSinkConnectorOffsetsZombieSinkTasks/)
[Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsBounceTest.testWithGroupMetadata()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_8_and_Scala_2_12___testWithGroupMetadata__/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testConfigurationOperations__/)
[Build / JDK 21 and Scala 2.13 / org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_21_and_Scala_2_13___testResetSinkConnectorOffsetsZombieSinkTasks/)
[Build / JDK 21 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testConfigResourceExistenceChecker()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_21_and_Scala_2_13___testConfigResourceExistenceChecker__/)
[Build / JDK 21 and Scala 2.13 / org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.tools/TopicCommandIntegrationTest/Build___JDK_21_and_Scala_2_13___testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress_String__kraft/)
[Build / JDK 21 and Scala 2.13 / org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.tools/TopicCommandIntegrationTest/Build___JDK_21_and_Scala_2_13___testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress_String__kraft_2/)
[Build / JDK 11 and Scala 2.13 / kafka.api.PlaintextConsumerTest.testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/PlaintextConsumerTest/Build___JDK_11_and_Scala_2_13___testCoordinatorFailover_String__String__quorum_zk_groupProtocol_generic/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
[Build / JDK 17 and Scala 2.13 / kafka.api.SaslPlaintextConsumerTest.testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/SaslPlaintextConsumerTest/Build___JDK_17_and_Scala_2_13___testCoordinatorFailover_String__String__quorum_zk_groupProtocol_generic/)
[Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_17_and_Scala_2_13___testWithGroupId__/)
[Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_17_and_Scala_2_13___testWithGroupId___2/)

@divijvaidya divijvaidya merged commit cd1eb63 into apache:trunk Jan 1, 2024
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
This commit fixes to prevent socket leaks, mostly through clients that are left open.

Reviewers: Mickael Maison <mickael.maison@gmail.com>
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024
This commit fixes to prevent socket leaks, mostly through clients that are left open.

Reviewers: Mickael Maison <mickael.maison@gmail.com>
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
This commit fixes to prevent socket leaks, mostly through clients that are left open.

Reviewers: Mickael Maison <mickael.maison@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

connect tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants