diff --git a/README.md b/README.md index e996365475b3..8423f31f13eb 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,8 @@ See our [web site](https://kafka.apache.org) for details on the project. You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/index.html) installed. -We build and test Apache Kafka with 11, 17 and 21. We set the `release` parameter in javac and scalac -to `11` to ensure the generated binaries are compatible with Java 11 or higher (independently of the Java version -used for compilation). Java 11 support for the broker and tools has been deprecated since Apache Kafka 3.7 and removal -of both is planned for Apache Kafka 4.0.([KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details). +We build and test Apache Kafka with 17 and 23. The `release` parameter in javac and scalac is set to `11` for the clients +and streams modules, and `17` for the broker and tools, ensuring compatibility with their respective minimum Java versions. Scala 2.13 is the only supported version in Apache Kafka. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java index 1ed2d49c9628..82b0f111f534 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeLogDirsResult.java @@ -19,15 +19,11 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.DescribeLogDirsResponse; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** @@ -43,36 +39,6 @@ public class DescribeLogDirsResult { this.futures = futures; } - /** - * Return a map from brokerId to future which can be used to check the information of partitions on each individual broker. - * @deprecated Deprecated Since Kafka 2.7. Use {@link #descriptions()}. - */ - @Deprecated - public Map>> values() { - return descriptions().entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> entry.getValue().thenApply(this::convertMapValues))); - } - - @SuppressWarnings("deprecation") - private Map convertMapValues(Map map) { - Stream> stream = map.entrySet().stream(); - return stream.collect(Collectors.toMap( - Map.Entry::getKey, - infoEntry -> { - LogDirDescription logDir = infoEntry.getValue(); - return new DescribeLogDirsResponse.LogDirInfo(logDir.error() == null ? Errors.NONE : Errors.forException(logDir.error()), - logDir.replicaInfos().entrySet().stream().collect(Collectors.toMap( - Map.Entry::getKey, - replicaEntry -> new DescribeLogDirsResponse.ReplicaInfo( - replicaEntry.getValue().size(), - replicaEntry.getValue().offsetLag(), - replicaEntry.getValue().isFuture()) - ))); - })); - } - /** * Return a map from brokerId to future which can be used to check the information of partitions on each individual broker. * The result of the future is a map from broker log directory path to a description of that log directory. @@ -81,18 +47,6 @@ public Map>> descriptions() return futures; } - /** - * Return a future which succeeds only if all the brokers have responded without error - * @deprecated Deprecated Since Kafka 2.7. Use {@link #allDescriptions()}. 
- */ - @Deprecated - public KafkaFuture>> all() { - return allDescriptions().thenApply(map -> map.entrySet().stream().collect(Collectors.toMap( - Map.Entry::getKey, - entry -> convertMapValues(entry.getValue()) - ))); - } - /** * Return a future which succeeds only if all the brokers have responded without error. * The result of the future is a map from brokerId to a map from broker log directory path diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index f60c97ad6fe5..a900fac95aa8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -161,13 +161,23 @@ public synchronized void subscribe(Pattern pattern) { } @Override - public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { - throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet"); + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) { + if (listener == null) + throw new IllegalArgumentException("RebalanceListener cannot be null"); + subscribe(pattern, Optional.of(listener)); } @Override public void subscribe(SubscriptionPattern pattern) { - throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet"); + subscribe(pattern, Optional.empty()); + } + + private void subscribe(SubscriptionPattern pattern, Optional listener) { + if (pattern == null || pattern.toString().isEmpty()) + throw new IllegalArgumentException("Topic pattern cannot be " + (pattern == null ? "null" : "empty")); + ensureNotClosed(); + committed.clear(); + this.subscriptions.subscribe(pattern, listener); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 04af4e450c77..32e5fe32f485 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1799,8 +1799,10 @@ public void subscribe(Pattern pattern) { } @Override - public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) { - subscribeToRegex(pattern, Optional.ofNullable(callback)); + public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) { + if (listener == null) + throw new IllegalArgumentException("RebalanceListener cannot be null"); + subscribeToRegex(pattern, Optional.of(listener)); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java index 0177b2f0f553..b2245d3edce9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java @@ -17,7 +17,6 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.DescribeLogDirsResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; @@ -69,71 +68,6 @@ public static DescribeLogDirsResponse parse(ByteBuffer buffer, short 
version) { return new DescribeLogDirsResponse(new DescribeLogDirsResponseData(new ByteBufferAccessor(buffer), version)); } - // Note this class is part of the public API, reachable from Admin.describeLogDirs() - /** - * Possible error code: - * - * KAFKA_STORAGE_ERROR (56) - * UNKNOWN (-1) - * - * @deprecated Deprecated Since Kafka 2.7. - * Use {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#descriptions()} - * and {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#allDescriptions()} to access the replacement - * class {@link org.apache.kafka.clients.admin.LogDirDescription}. - */ - @Deprecated - public static class LogDirInfo { - public final Errors error; - public final Map replicaInfos; - - public LogDirInfo(Errors error, Map replicaInfos) { - this.error = error; - this.replicaInfos = replicaInfos; - } - - @Override - public String toString() { - return "(error=" + - error + - ", replicas=" + - replicaInfos + - ")"; - } - } - - // Note this class is part of the public API, reachable from Admin.describeLogDirs() - - /** - * @deprecated Deprecated Since Kafka 2.7. - * Use {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#descriptions()} - * and {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#allDescriptions()} to access the replacement - * class {@link org.apache.kafka.clients.admin.ReplicaInfo}. - */ - @Deprecated - public static class ReplicaInfo { - - public final long size; - public final long offsetLag; - public final boolean isFuture; - - public ReplicaInfo(long size, long offsetLag, boolean isFuture) { - this.size = size; - this.offsetLag = offsetLag; - this.isFuture = isFuture; - } - - @Override - public String toString() { - return "(size=" + - size + - ", offsetLag=" + - offsetLag + - ", isFuture=" + - isFuture + - ")"; - } - } - @Override public boolean shouldClientThrottle(short version) { return version >= 1; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 17e0a20dbf90..782dd00d0a3e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2320,50 +2320,6 @@ public void testDescribeLogDirsWithVolumeBytes() throws ExecutionException, Inte } } - @SuppressWarnings("deprecation") - @Test - public void testDescribeLogDirsDeprecated() throws ExecutionException, InterruptedException { - Set brokers = singleton(0); - TopicPartition tp = new TopicPartition("topic", 12); - String logDir = "/var/data/kafka"; - Errors error = Errors.NONE; - int offsetLag = 24; - long partitionSize = 1234567890; - - try (AdminClientUnitTestEnv env = mockClientEnv()) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareResponseFrom( - prepareDescribeLogDirsResponse(error, logDir, tp, partitionSize, offsetLag), - env.cluster().nodeById(0)); - - DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers); - - Map>> deprecatedValues = result.values(); - assertEquals(brokers, deprecatedValues.keySet()); - assertNotNull(deprecatedValues.get(0)); - assertDescriptionContains(deprecatedValues.get(0).get(), logDir, tp, error, offsetLag, partitionSize); - - Map> deprecatedAll = result.all().get(); - assertEquals(brokers, deprecatedAll.keySet()); - assertDescriptionContains(deprecatedAll.get(0), logDir, tp, error, offsetLag, partitionSize); - } - } - - 
@SuppressWarnings("deprecation") - private static void assertDescriptionContains(Map descriptionsMap, - String logDir, TopicPartition tp, Errors error, - int offsetLag, long partitionSize) { - assertNotNull(descriptionsMap); - assertEquals(singleton(logDir), descriptionsMap.keySet()); - assertEquals(error, descriptionsMap.get(logDir).error); - Map allReplicaInfos = - descriptionsMap.get(logDir).replicaInfos; - assertEquals(singleton(tp), allReplicaInfos.keySet()); - assertEquals(partitionSize, allReplicaInfos.get(tp).size); - assertEquals(offsetLag, allReplicaInfos.get(tp).offsetLag); - assertFalse(allReplicaInfos.get(tp).isFuture); - } - @Test public void testDescribeLogDirsOfflineDir() throws ExecutionException, InterruptedException { Set brokers = singleton(0); @@ -2396,39 +2352,6 @@ public void testDescribeLogDirsOfflineDir() throws ExecutionException, Interrupt } } - @SuppressWarnings("deprecation") - @Test - public void testDescribeLogDirsOfflineDirDeprecated() throws ExecutionException, InterruptedException { - Set brokers = singleton(0); - String logDir = "/var/data/kafka"; - Errors error = Errors.KAFKA_STORAGE_ERROR; - - try (AdminClientUnitTestEnv env = mockClientEnv()) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareResponseFrom( - prepareDescribeLogDirsResponse(error, logDir, emptyList()), - env.cluster().nodeById(0)); - - DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers); - - Map>> deprecatedValues = result.values(); - assertEquals(brokers, deprecatedValues.keySet()); - assertNotNull(deprecatedValues.get(0)); - Map valuesMap = deprecatedValues.get(0).get(); - assertEquals(singleton(logDir), valuesMap.keySet()); - assertEquals(error, valuesMap.get(logDir).error); - assertEquals(emptySet(), valuesMap.get(logDir).replicaInfos.keySet()); - - Map> deprecatedAll = result.all().get(); - assertEquals(brokers, deprecatedAll.keySet()); - Map allMap = deprecatedAll.get(0); - assertNotNull(allMap); - assertEquals(singleton(logDir), allMap.keySet()); - assertEquals(error, allMap.get(logDir).error); - assertEquals(emptySet(), allMap.get(logDir).replicaInfos.keySet()); - } - } - @Test public void testDescribeReplicaLogDirs() throws ExecutionException, InterruptedException { TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 12, 1); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index b69a3e56955e..41c9f199d15e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -35,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class MockConsumerTest { @@ -163,5 +164,20 @@ public void onPartitionsAssigned(Collection partitions) { assertEquals(1, revoked.size()); assertTrue(revoked.contains(topicPartitionList.get(0))); } + + @Test + public void testRe2JPatternSubscription() { + assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((SubscriptionPattern) null)); + assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern(""))); + + SubscriptionPattern pattern = new SubscriptionPattern("t.*"); + 
assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(pattern, null)); + + consumer.subscribe(pattern); + assertTrue(consumer.subscription().isEmpty()); + // Check that the subscription to pattern was successfully applied in the mock consumer (using a different + // subscription type should fail) + assertThrows(IllegalStateException.class, () -> consumer.subscribe(List.of("topic1"))); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 62e3f23caf98..0f87fb9037c4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -1864,6 +1864,9 @@ public void testSubscribeToRe2JPatternValidation() { assertEquals("Topic pattern to subscribe to cannot be empty", t.getMessage()); assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*"))); + + assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern("t*"), null)); + assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*"), mock(ConsumerRebalanceListener.class))); } @Test diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala index 95c0b015ac53..e24731261ac6 100644 --- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala +++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala @@ -444,35 +444,6 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling remote storage feature on the topic level is not supported.") } - @ParameterizedTest - @ValueSource(strings = Array("zk")) - def testUpdateInvalidRemoteStorageConfigUnderZK(quorum: String): Unit = { - val admin = createAdminClient() - val errorMsg = "It is invalid to set `remote.log.delete.on.disable` or `remote.log.copy.disable` under Zookeeper's mode." 
- val topicConfig = new Properties - topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, - topicConfig = topicConfig) - - val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() - configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), - util.Arrays.asList( - new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"), - AlterConfigOp.OpType.SET), - )) - assertThrowsException(classOf[InvalidConfigurationException], - () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg) - - configs.clear() - configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), - util.Arrays.asList( - new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"), - AlterConfigOp.OpType.SET), - )) - assertThrowsException(classOf[InvalidConfigurationException], - () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg) - } - @ParameterizedTest @ValueSource(strings = Array("kraft")) def testTopicDeletion(quorum: String): Unit = { @@ -501,7 +472,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount, topicConfig = topicConfig) - val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head + val tsDisabledProps = TestUtils.createBrokerConfigs(1, null).head instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps)) recreateBrokers(startup = true) @@ -519,7 +490,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount, topicConfig = topicConfig) - val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head + val tsDisabledProps = TestUtils.createBrokerConfigs(1, null).head instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps)) recreateBrokers(startup = true) diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala index 125411168d46..fd6cac4a5328 100644 --- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala +++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala @@ -181,7 +181,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { * is thrown immediately, and is not affected by connection.failed.authentication.delay.ms. 
*/ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) def testServerAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { // Setup client with a non-existent service principal, so that server authentication fails on the client val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism, Some("another-kafka-service")) diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index e05ea8b18a06..384d067e2b6e 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -17,14 +17,13 @@ package kafka.admin -import java.util.{Collections, Optional} +import java.util.Collections import kafka.controller.ReplicaAssignment import kafka.server.{BaseRequestTest, BrokerServer} import kafka.utils.TestUtils import kafka.utils.TestUtils._ import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic} import org.apache.kafka.common.errors.InvalidReplicaAssignmentException -import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{BeforeEach, TestInfo} @@ -178,7 +177,7 @@ class AddPartitionsTest extends BaseRequestTest { } @ParameterizedTest - @ValueSource(strings = Array("zk")) // TODO: add kraft support + @ValueSource(strings = Array("kraft")) def testReplicaPlacementAllServers(quorum: String): Unit = { admin.createPartitions(Collections.singletonMap(topic3, NewPartitions.increaseTo(7))).all().get() @@ -194,17 +193,19 @@ class AddPartitionsTest extends BaseRequestTest { new MetadataRequest.Builder(Seq(topic3).asJava, false).build) assertEquals(1, response.topicMetadata.size) val topicMetadata = response.topicMetadata.asScala.head - validateLeaderAndReplicas(topicMetadata, 0, 2, Set(2, 3, 0, 1)) - validateLeaderAndReplicas(topicMetadata, 1, 3, Set(3, 2, 0, 1)) - validateLeaderAndReplicas(topicMetadata, 2, 0, Set(0, 3, 1, 2)) - validateLeaderAndReplicas(topicMetadata, 3, 1, Set(1, 0, 2, 3)) - validateLeaderAndReplicas(topicMetadata, 4, 2, Set(2, 3, 0, 1)) - validateLeaderAndReplicas(topicMetadata, 5, 3, Set(3, 0, 1, 2)) - validateLeaderAndReplicas(topicMetadata, 6, 0, Set(0, 1, 2, 3)) + + assertEquals(7, topicMetadata.partitionMetadata.size) + for (partition <- topicMetadata.partitionMetadata.asScala) { + val replicas = partition.replicaIds.asScala.toSet + assertEquals(4, replicas.size, s"Partition ${partition.partition} should have 4 replicas") + assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only include brokers 0-3") + assertTrue(partition.leaderId.isPresent, s"Partition ${partition.partition} should have a leader") + assertTrue(replicas.contains(partition.leaderId.get), "Leader should be one of the replicas") + } } @ParameterizedTest - @ValueSource(strings = Array("zk")) // TODO: add kraft support + @ValueSource(strings = Array("kraft")) def testReplicaPlacementPartialServers(quorum: String): Unit = { admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3))).all().get() @@ -216,19 +217,15 @@ class AddPartitionsTest extends BaseRequestTest { new 
MetadataRequest.Builder(Seq(topic2).asJava, false).build) assertEquals(1, response.topicMetadata.size) val topicMetadata = response.topicMetadata.asScala.head - validateLeaderAndReplicas(topicMetadata, 0, 1, Set(1, 2)) - validateLeaderAndReplicas(topicMetadata, 1, 2, Set(0, 2)) - validateLeaderAndReplicas(topicMetadata, 2, 3, Set(1, 3)) - } - - def validateLeaderAndReplicas(metadata: TopicMetadata, partitionId: Int, expectedLeaderId: Int, - expectedReplicas: Set[Int]): Unit = { - val partitionOpt = metadata.partitionMetadata.asScala.find(_.partition == partitionId) - assertTrue(partitionOpt.isDefined, s"Partition $partitionId should exist") - val partition = partitionOpt.get - assertEquals(Optional.of(expectedLeaderId), partition.leaderId, "Partition leader id should match") - assertEquals(expectedReplicas, partition.replicaIds.asScala.toSet, "Replica set should match") + assertEquals(3, topicMetadata.partitionMetadata.size) + for (partition <- topicMetadata.partitionMetadata.asScala) { + val replicas = partition.replicaIds.asScala.toSet + assertEquals(2, replicas.size, s"Partition ${partition.partition} should have 2 replicas") + assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only include brokers 0-3") + assertTrue(partition.leaderId.isPresent, s"Partition ${partition.partition} should have a leader") + assertTrue(replicas.contains(partition.leaderId.get), "Leader should be one of the replicas") + } } } diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index 74ef9d96964f..940703c828c8 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -20,7 +20,6 @@ package kafka.server import java.util import kafka.network.SocketServer import kafka.utils.{Logging, TestUtils} -import org.apache.kafka.common.Uuid import org.apache.kafka.common.message.DeleteTopicsRequestData import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState import org.apache.kafka.common.protocol.Errors @@ -125,96 +124,6 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging { } } - /* - * Only run this test against ZK cluster. The KRaft controller doesn't perform operations that have timed out. 
- */ - @ParameterizedTest - @ValueSource(strings = Array("zk")) - def testErrorDeleteTopicRequests(quorum: String): Unit = { - val timeout = 30000 - val timeoutTopic = "invalid-timeout" - - // Basic - validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder( - new DeleteTopicsRequestData() - .setTopicNames(util.Arrays.asList("invalid-topic")) - .setTimeoutMs(timeout)).build(), - Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION)) - - // Partial - createTopic("partial-topic-1") - validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder( - new DeleteTopicsRequestData() - .setTopicNames(util.Arrays.asList("partial-topic-1", "partial-invalid-topic")) - .setTimeoutMs(timeout)).build(), - Map( - "partial-topic-1" -> Errors.NONE, - "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION - ) - ) - - // Topic IDs - createTopic("topic-id-1") - val validId = getTopicIds()("topic-id-1") - val invalidId = Uuid.randomUuid - validateErrorDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder( - new DeleteTopicsRequestData() - .setTopics(util.Arrays.asList(new DeleteTopicState().setTopicId(invalidId), - new DeleteTopicState().setTopicId(validId))) - .setTimeoutMs(timeout)).build(), - Map( - invalidId -> Errors.UNKNOWN_TOPIC_ID, - validId -> Errors.NONE - ) - ) - - // Timeout - createTopic(timeoutTopic, 5, 2) - // Must be a 0ms timeout to avoid transient test failures. Even a timeout of 1ms has succeeded in the past. - validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder( - new DeleteTopicsRequestData() - .setTopicNames(util.Arrays.asList(timeoutTopic)) - .setTimeoutMs(0)).build(), - Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT)) - // The topic should still get deleted eventually - TestUtils.waitUntilTrue(() => !brokers.head.metadataCache.contains(timeoutTopic), s"Topic $timeoutTopic is never deleted") - validateTopicIsDeleted(timeoutTopic) - } - - private def validateErrorDeleteTopicRequests(request: DeleteTopicsRequest, expectedResponse: Map[String, Errors]): Unit = { - val response = sendDeleteTopicsRequest(request) - val errors = response.data.responses - - val errorCount = response.errorCounts().asScala.foldLeft(0)(_+_._2) - assertEquals(expectedResponse.size, errorCount, "The response size should match") - - expectedResponse.foreach { case (topic, expectedError) => - assertEquals(expectedResponse(topic).code, errors.find(topic).errorCode, "The response error should match") - // If no error validate the topic was deleted - if (expectedError == Errors.NONE) { - validateTopicIsDeleted(topic) - } - } - } - - private def validateErrorDeleteTopicRequestsWithIds(request: DeleteTopicsRequest, expectedResponse: Map[Uuid, Errors]): Unit = { - val response = sendDeleteTopicsRequest(request) - val responses = response.data.responses - val errors = responses.asScala.map(result => result.topicId() -> result.errorCode()).toMap - val names = responses.asScala.map(result => result.topicId() -> result.name()).toMap - - val errorCount = response.errorCounts().asScala.foldLeft(0)(_+_._2) - assertEquals(expectedResponse.size, errorCount, "The response size should match") - - expectedResponse.foreach { case (topic, expectedError) => - assertEquals(expectedResponse(topic).code, errors(topic), "The response error should match") - // If no error validate the topic was deleted - if (expectedError == Errors.NONE) { - validateTopicIsDeleted(names(topic)) - } - } - } - private def validateTopicIsDeleted(topic: String): Unit = { val metadata = connectAndReceive[MetadataResponse](new 
MetadataRequest.Builder( List(topic).asJava, true).build).topicMetadata.asScala @@ -230,7 +139,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging { } @ParameterizedTest - @ValueSource(strings = Array("zk")) + @ValueSource(strings = Array("kraft")) def testDeleteTopicsVersions(quorum: String): Unit = { // This test assumes that the current valid versions are 0-6 please adjust the test if there are changes. assertEquals(0, DeleteTopicsRequestData.LOWEST_SUPPORTED_VERSION) diff --git a/docs/upgrade.html b/docs/upgrade.html index 9cc3aca61c3a..4ed85d69c654 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -38,6 +38,12 @@
Notable changes in 4
  • The bufferpool-wait-time-total, io-waittime-total, and iotime-total metrics were removed. Please use bufferpool-wait-time-ns-total, io-wait-time-ns-total, and io-time-ns-total metrics as replacements, respectively.
  • +
  • The org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo class was removed. Please use the org.apache.kafka.clients.admin.DescribeLogDirsResult.descriptions() + and org.apache.kafka.clients.admin.DescribeLogDirsResult.allDescriptions() methods instead. +
  • +
  • The org.apache.kafka.common.requests.DescribeLogDirsResponse.ReplicaInfo class was removed. Please use the org.apache.kafka.clients.admin.DescribeLogDirsResult.descriptions() + and org.apache.kafka.clients.admin.DescribeLogDirsResult.allDescriptions() methods instead. +
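For context, a minimal sketch of the replacement Admin API that the two notes above point to. The bootstrap address, the broker id 0, and the wrapping class name are placeholder assumptions for illustration only, not part of this change:

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.LogDirDescription;

public class LogDirMigrationExample {
    public static void main(String[] args) throws Exception {
        // Placeholder bootstrap address and broker id; adjust for a real cluster.
        try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            DescribeLogDirsResult result = admin.describeLogDirs(List.of(0));
            // allDescriptions() replaces the removed all(); LogDirDescription and
            // org.apache.kafka.clients.admin.ReplicaInfo carry the same information
            // as the removed LogDirInfo/ReplicaInfo response classes.
            Map<Integer, Map<String, LogDirDescription>> all = result.allDescriptions().get();
            all.forEach((brokerId, dirs) ->
                dirs.forEach((path, dir) ->
                    dir.replicaInfos().forEach((tp, replica) ->
                        System.out.printf("broker %d, dir %s, %s: size=%d offsetLag=%d isFuture=%b%n",
                            brokerId, path, tp, replica.size(), replica.offsetLag(), replica.isFuture()))));
        }
    }
}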
  • Broker diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index d60a13915b54..d9c7c91bb5c7 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -76,6 +75,7 @@ import static org.apache.kafka.streams.KafkaStreams.State.ERROR; import static org.apache.kafka.streams.KafkaStreams.State.REBALANCING; import static org.apache.kafka.streams.KafkaStreams.State.RUNNING; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -168,17 +168,21 @@ public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationDo .to(outputTopic); final Properties streamsConfiguration = createStreamsConfig(topologyOptimization); - builder.build(streamsConfiguration); - - startStreams(builder, REBALANCING, ERROR, streamsConfiguration, (t, e) -> expectedThrowable.set(e)); - - final String expectedMsg = String.format("Number of partitions [%s] of repartition topic [%s] " + - "doesn't match number of partitions [%s] of the source topic.", - inputTopicRepartitionedNumOfPartitions, - toRepartitionTopicName(inputTopicRepartitionName), - topicBNumberOfPartitions); - assertNotNull(expectedThrowable.get()); - assertTrue(expectedThrowable.get().getMessage().contains(expectedMsg)); + try (final KafkaStreams ks = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration)) { + ks.setUncaughtExceptionHandler(exception -> { + expectedThrowable.set(exception); + return SHUTDOWN_CLIENT; + }); + ks.start(); + TestUtils.waitForCondition(() -> ks.state() == ERROR, 30_000, "Kafka Streams never went into error state"); + final String expectedMsg = String.format("Number of partitions [%s] of repartition topic [%s] " + + "doesn't match number of partitions [%s] of the source topic.", + inputTopicRepartitionedNumOfPartitions, + toRepartitionTopicName(inputTopicRepartitionName), + topicBNumberOfPartitions); + assertNotNull(expectedThrowable.get()); + assertTrue(expectedThrowable.get().getMessage().contains(expectedMsg)); + } } @ParameterizedTest @@ -723,7 +727,7 @@ public void shouldGoThroughRebalancingCorrectly(final String topologyOptimizatio ) ); - kafkaStreamsToClose.close(); + kafkaStreamsToClose.close(Duration.ofSeconds(5)); sendEvents( timestamp, @@ -814,36 +818,12 @@ private void sendEvents(final String topic, } private KafkaStreams startStreams(final StreamsBuilder builder, final Properties streamsConfiguration) throws InterruptedException { - return startStreams(builder, REBALANCING, RUNNING, streamsConfiguration, null); - } - - private KafkaStreams startStreams(final StreamsBuilder builder, - 
final State expectedOldState, - final State expectedNewState, - final Properties streamsConfiguration, - final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws InterruptedException { final CountDownLatch latch; final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration); - - if (uncaughtExceptionHandler == null) { - latch = new CountDownLatch(1); - } else { - latch = new CountDownLatch(2); - kafkaStreams.setUncaughtExceptionHandler(e -> { - uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e); - latch.countDown(); - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } else if (e instanceof Error) { - throw (Error) e; - } else { - throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", e); - } - }); - } + latch = new CountDownLatch(1); kafkaStreams.setStateListener((newState, oldState) -> { - if (expectedOldState == oldState && expectedNewState == newState) { + if (REBALANCING == oldState && RUNNING == newState) { latch.countDown(); } }); diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 2879436e5000..b9cc75b9fde2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -384,7 +384,9 @@ public synchronized GlobalKTable globalTable(final String topic, final MaterializedInternal> materializedInternal = new MaterializedInternal<>( Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), - internalStreamsBuilder, topic + "-"); + internalStreamsBuilder, + topic + "-", + true /* force materializing global tables */); return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal); } @@ -517,7 +519,7 @@ public synchronized GlobalKTable globalTable(final String topic, */ public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) { Objects.requireNonNull(builder, "builder can't be null"); - internalStreamsBuilder.addStateStore(new StoreBuilderWrapper(builder)); + internalStreamsBuilder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(builder)); return this; } @@ -556,7 +558,7 @@ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder< Objects.requireNonNull(storeBuilder, "storeBuilder can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); internalStreamsBuilder.addGlobalStore( - new StoreBuilderWrapper(storeBuilder), + StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), topic, new ConsumedInternal<>(consumed), stateUpdateSupplier, diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index 8fd34a473270..35fe13faa381 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -34,7 +34,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; -import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper; +import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier; import org.apache.kafka.streams.state.StoreBuilder; import java.util.Set; @@ -853,14 +853,13 @@ public synchronized Topology addGlobalStore(final 
StoreBuilder sto final String processorName, final ProcessorSupplier stateUpdateSupplier) { internalTopologyBuilder.addGlobalStore( - new StoreBuilderWrapper(storeBuilder), sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, - stateUpdateSupplier, + new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier, Set.of(storeBuilder)), true ); return this; @@ -899,14 +898,13 @@ public synchronized Topology addGlobalStore(final StoreBuilder sto final String processorName, final ProcessorSupplier stateUpdateSupplier) { internalTopologyBuilder.addGlobalStore( - new StoreBuilderWrapper(storeBuilder), sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, - stateUpdateSupplier, + new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier, Set.of(storeBuilder)), true ); return this; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 92dde06e9c05..1e148ac047c9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -140,7 +140,7 @@ public KTable table(final String topic, final String tableSourceName = named .orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME); - final KTableSource tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName()); + final KTableSource tableSource = new KTableSource<>(materialized); final ProcessorParameters processorParameters = new ProcessorParameters<>(tableSource, tableSourceName); final TableSourceNode tableSourceNode = TableSourceNode.tableSourceNodeBuilder() @@ -148,7 +148,6 @@ public KTable table(final String topic, .withSourceName(sourceName) .withNodeName(tableSourceName) .withConsumedInternal(consumed) - .withMaterializedInternal(materialized) .withProcessorParameters(processorParameters) .build(); tableSourceNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier); @@ -186,9 +185,7 @@ public GlobalKTable globalTable(final String topic, final String processorName = named .orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME); - // enforce store name as queryable name to always materialize global table stores - final String storeName = materialized.storeName(); - final KTableSource tableSource = new KTableSource<>(storeName, storeName); + final KTableSource tableSource = new KTableSource<>(materialized); final ProcessorParameters processorParameters = new ProcessorParameters<>(tableSource, processorName); @@ -197,12 +194,12 @@ public GlobalKTable globalTable(final String topic, .isGlobalKTable(true) .withSourceName(sourceName) .withConsumedInternal(consumed) - .withMaterializedInternal(materialized) .withProcessorParameters(processorParameters) .build(); addGraphNode(root, tableSourceNode); + final String storeName = materialized.storeName(); return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<>(storeName), materialized.queryableStoreName()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index ec2fd211efb3..b650724055b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -658,10 +658,7 @@ public KTable toTable(final Named named, subTopologySourceNodes = this.subTopologySourceNodes; } - final KTableSource tableSource = new KTableSource<>( - materializedInternal.storeName(), - materializedInternal.queryableStoreName() - ); + final KTableSource tableSource = new KTableSource<>(materializedInternal); final ProcessorParameters processorParameters = new ProcessorParameters<>(tableSource, name); final GraphNode tableNode = new StreamToTableNode<>( name, @@ -1171,7 +1168,7 @@ private KStream doStreamTableJoin(final KTable table, bufferStoreName = Optional.of(name + "-Buffer"); final RocksDBTimeOrderedKeyValueBuffer.Builder storeBuilder = new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joinedInternal.gracePeriod(), name); - builder.addStateStore(new StoreBuilderWrapper(storeBuilder)); + builder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder)); } final ProcessorSupplier processorSupplier = new KStreamKTableJoin<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java index 394c13005880..12bb6c19db8b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java @@ -262,7 +262,7 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, private static StoreFactory joinWindowStoreBuilderFromSupplier(final WindowBytesStoreSupplier storeSupplier, final Serde keySerde, final Serde valueSerde) { - return new StoreBuilderWrapper(Stores.windowStoreBuilder( + return StoreBuilderWrapper.wrapStoreBuilder(Stores.windowStoreBuilder( storeSupplier, keySerde, valueSerde diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 09efdb780069..2c75167f019e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -590,7 +590,7 @@ public KTable suppress(final Suppressed suppressed) { final ProcessorGraphNode> node = new TableSuppressNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), - new StoreBuilderWrapper(storeBuilder) + StoreBuilderWrapper.wrapStoreBuilder(storeBuilder) ); node.setOutputVersioned(false); @@ -1227,10 +1227,7 @@ private KTable doJoinOnForeignKey(final KTable forei materializedInternal.withKeySerde(keySerde); } - final KTableSource resultProcessorSupplier = new KTableSource<>( - materializedInternal.storeName(), - materializedInternal.queryableStoreName() - ); + final KTableSource resultProcessorSupplier = new KTableSource<>(materializedInternal); final StoreFactory resultStore = new KeyValueStoreMaterializer<>(materializedInternal); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java index b29a4fa51f13..e41f2bf06dd6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java @@ -17,12 +17,16 @@ package org.apache.kafka.streams.kstream.internals; import 
org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; @@ -30,6 +34,7 @@ import org.slf4j.LoggerFactory; import java.util.Objects; +import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; @@ -40,15 +45,17 @@ public class KTableSource implements ProcessorSupplier> materialized) { + this.storeName = materialized.storeName(); Objects.requireNonNull(storeName, "storeName can't be null"); - - this.storeName = storeName; - this.queryableName = queryableName; + this.queryableName = materialized.queryableStoreName(); this.sendOldValues = false; + this.storeFactory = new KeyValueStoreMaterializer<>(materialized); } public String queryableName() { @@ -60,6 +67,15 @@ public Processor> get() { return new KTableSourceProcessor(); } + @Override + public Set> stores() { + if (materialized()) { + return Set.of(new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory)); + } else { + return null; + } + } + // when source ktable requires sending old values, we just // need to set the queryable name as the store name to enforce materialization public void enableSendingOldValues() { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java index cf6ce76f8d56..d6cd130ba6db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java @@ -39,12 +39,19 @@ public MaterializedInternal(final Materialized materialized) { public MaterializedInternal(final Materialized materialized, final InternalNameProvider nameProvider, final String generatedStorePrefix) { + this(materialized, nameProvider, generatedStorePrefix, false); + } + + public MaterializedInternal(final Materialized materialized, + final InternalNameProvider nameProvider, + final String generatedStorePrefix, + final boolean forceQueryable) { super(materialized); // if storeName is not provided, the corresponding KTable would never be queryable; // but we still need to provide an internal name for it in case we materialize. 
- queryable = storeName() != null; - if (!queryable && nameProvider != null) { + queryable = forceQueryable || storeName() != null; + if (storeName() == null && nameProvider != null) { storeName = nameProvider.newStoreName(generatedStorePrefix); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java index df6e7c263e6f..a9093ad47701 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java @@ -20,8 +20,11 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier; import org.apache.kafka.streams.processor.internals.StoreFactory; +import java.util.Set; + public class GlobalStoreNode extends StateStoreNode { private final String sourceName; @@ -52,15 +55,16 @@ public GlobalStoreNode(final StoreFactory storeBuilder, @Override public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { storeBuilder.withLoggingDisabled(); - topologyBuilder.addGlobalStore(storeBuilder, - sourceName, + topologyBuilder.addGlobalStore(sourceName, consumed.timestampExtractor(), consumed.keyDeserializer(), consumed.valueDeserializer(), topic, processorName, - stateUpdateSupplier, - reprocessOnRestore); + new StoreDelegatingProcessorSupplier<>( + stateUpdateSupplier, + Set.of(new StoreFactory.FactoryWrappingStoreBuilder<>(storeBuilder)) + ), reprocessOnRestore); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java index f0f8e0dcb4a9..5e776a5c733d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java @@ -17,15 +17,10 @@ package org.apache.kafka.streams.kstream.internals.graph; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.KTableSource; -import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer; -import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.StoreFactory; -import org.apache.kafka.streams.state.KeyValueStore; import java.util.Collections; import java.util.Iterator; @@ -36,7 +31,6 @@ */ public class TableSourceNode extends SourceGraphNode { - private final MaterializedInternal materializedInternal; private final ProcessorParameters processorParameters; private final String sourceName; private final boolean isGlobalKTable; @@ -46,7 +40,6 @@ private TableSourceNode(final String nodeName, final String sourceName, final String topic, final ConsumedInternal consumedInternal, - final MaterializedInternal materializedInternal, final ProcessorParameters processorParameters, final boolean isGlobalKTable) { @@ -57,7 +50,6 @@ private TableSourceNode(final String 
nodeName, this.sourceName = sourceName; this.isGlobalKTable = isGlobalKTable; this.processorParameters = processorParameters; - this.materializedInternal = materializedInternal; } @@ -68,7 +60,6 @@ public void reuseSourceTopicForChangeLog(final boolean shouldReuseSourceTopicFor @Override public String toString() { return "TableSourceNode{" + - "materializedInternal=" + materializedInternal + ", processorParameters=" + processorParameters + ", sourceName='" + sourceName + '\'' + ", isGlobalKTable=" + isGlobalKTable + @@ -93,12 +84,8 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { throw new IllegalStateException("A table source node must have a single topic as input"); } - final StoreFactory storeFactory = - new KeyValueStoreMaterializer<>((MaterializedInternal>) materializedInternal); - if (isGlobalKTable) { topologyBuilder.addGlobalStore( - storeFactory, sourceName, consumedInternal().timestampExtractor(), consumedInternal().keyDeserializer(), @@ -116,16 +103,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { consumedInternal().valueDeserializer(), topicName); - topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName); + processorParameters.addProcessorTo(topologyBuilder, new String[] {sourceName}); - // only add state store if the source KTable should be materialized + // if the KTableSource should not be materialized, stores will be null or empty final KTableSource tableSource = (KTableSource) processorParameters.processorSupplier(); - if (tableSource.materialized()) { - topologyBuilder.addStateStore(storeFactory, nodeName()); - + if (tableSource.stores() != null) { if (shouldReuseSourceTopicForChangelog) { - storeFactory.withLoggingDisabled(); - topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName); + tableSource.stores().forEach(store -> { + store.withLoggingDisabled(); + topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName); + }); } } } @@ -138,7 +125,6 @@ public static final class TableSourceNodeBuilder { private String sourceName; private String topic; private ConsumedInternal consumedInternal; - private MaterializedInternal materializedInternal; private ProcessorParameters processorParameters; private boolean isGlobalKTable = false; @@ -155,11 +141,6 @@ public TableSourceNodeBuilder withTopic(final String topic) { return this; } - public TableSourceNodeBuilder withMaterializedInternal(final MaterializedInternal materializedInternal) { - this.materializedInternal = materializedInternal; - return this; - } - public TableSourceNodeBuilder withConsumedInternal(final ConsumedInternal consumedInternal) { this.consumedInternal = consumedInternal; return this; @@ -185,7 +166,6 @@ public TableSourceNode build() { sourceName, topic, consumedInternal, - materializedInternal, processorParameters, isGlobalKTable); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 2c5e798b62df..9f65a415d951 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyConfig; +import 
org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.processor.StateStore; @@ -607,7 +608,7 @@ public final void addProcessor(final String name, public final void addStateStore(final StoreBuilder storeBuilder, final String... processorNames) { - addStateStore(new StoreBuilderWrapper(storeBuilder), false, processorNames); + addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), false, processorNames); } public final void addStateStore(final StoreFactory storeFactory, @@ -638,8 +639,7 @@ public final void addStateStore(final StoreFactory storeFactory, nodeGroups = null; } - public final void addGlobalStore(final StoreFactory storeFactory, - final String sourceName, + public final void addGlobalStore(final String sourceName, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valueDeserializer, @@ -647,8 +647,15 @@ public final void addGlobalStore(final StoreFactory storeFactory, final String processorName, final ProcessorSupplier stateUpdateSupplier, final boolean reprocessOnRestore) { - Objects.requireNonNull(storeFactory, "store builder must not be null"); ApiUtils.checkSupplier(stateUpdateSupplier); + final Set> stores = stateUpdateSupplier.stores(); + if (stores == null || stores.size() != 1) { + throw new IllegalArgumentException( + "Global stores must pass in suppliers with exactly one store but got " + + (stores != null ? stores.size() : 0)); + } + final StoreFactory storeFactory = + StoreBuilderWrapper.wrapStoreBuilder(stores.iterator().next()); validateGlobalStoreArguments(sourceName, topic, processorName, @@ -2105,8 +2112,8 @@ public int compare(final TopologyDescription.Subtopology subtopology1, private static final SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator(); public static final class TopologyDescription implements org.apache.kafka.streams.TopologyDescription { - private final TreeSet subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); - private final TreeSet globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR); + private final TreeSet subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); + private final TreeSet globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR); private final String namedTopology; public TopologyDescription() { @@ -2117,21 +2124,21 @@ public TopologyDescription(final String namedTopology) { this.namedTopology = namedTopology; } - public void addSubtopology(final TopologyDescription.Subtopology subtopology) { + public void addSubtopology(final Subtopology subtopology) { subtopologies.add(subtopology); } - public void addGlobalStore(final TopologyDescription.GlobalStore globalStore) { + public void addGlobalStore(final GlobalStore globalStore) { globalStores.add(globalStore); } @Override - public Set subtopologies() { + public Set subtopologies() { return Collections.unmodifiableSet(subtopologies); } @Override - public Set globalStores() { + public Set globalStores() { return Collections.unmodifiableSet(globalStores); } @@ -2144,17 +2151,17 @@ public String toString() { } else { sb.append("Topology: ").append(namedTopology).append(":\n "); } - final TopologyDescription.Subtopology[] sortedSubtopologies = - subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[0]); - final TopologyDescription.GlobalStore[] sortedGlobalStores = + final Subtopology[] sortedSubtopologies = + subtopologies.descendingSet().toArray(new 
Subtopology[0]); + final GlobalStore[] sortedGlobalStores = globalStores.descendingSet().toArray(new GlobalStore[0]); int expectedId = 0; int subtopologiesIndex = sortedSubtopologies.length - 1; int globalStoresIndex = sortedGlobalStores.length - 1; while (subtopologiesIndex != -1 && globalStoresIndex != -1) { sb.append(" "); - final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; - final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex]; + final Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; + final GlobalStore globalStore = sortedGlobalStores[globalStoresIndex]; if (subtopology.id() == expectedId) { sb.append(subtopology); subtopologiesIndex--; @@ -2165,13 +2172,13 @@ public String toString() { expectedId++; } while (subtopologiesIndex != -1) { - final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; + final Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; sb.append(" "); sb.append(subtopology); subtopologiesIndex--; } while (globalStoresIndex != -1) { - final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex]; + final GlobalStore globalStore = sortedGlobalStores[globalStoresIndex]; sb.append(" "); sb.append(globalStore); globalStoresIndex--; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java index b8522b8e2cd9..4648533af1df 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java @@ -38,7 +38,15 @@ public class StoreBuilderWrapper implements StoreFactory { private final StoreBuilder builder; private final Set connectedProcessorNames = new HashSet<>(); - public StoreBuilderWrapper(final StoreBuilder builder) { + public static StoreFactory wrapStoreBuilder(final StoreBuilder builder) { + if (builder instanceof FactoryWrappingStoreBuilder) { + return ((FactoryWrappingStoreBuilder) builder).storeFactory(); + } else { + return new StoreBuilderWrapper(builder); + } + } + + private StoreBuilderWrapper(final StoreBuilder builder) { this.builder = builder; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java new file mode 100644 index 000000000000..cce8281e15eb --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.state.StoreBuilder; + +import java.util.Set; + +public class StoreDelegatingProcessorSupplier implements ProcessorSupplier { + + private final ProcessorSupplier delegate; + private final Set> stores; + + public StoreDelegatingProcessorSupplier( + final ProcessorSupplier delegate, + final Set> stores + ) { + this.delegate = delegate; + this.stores = stores; + } + + @Override + public Set> stores() { + return stores; + } + + @Override + public Processor get() { + return delegate.get(); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java index b05c334c27f5..7542f4c5bd84 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java @@ -19,6 +19,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.StoreBuilder; import java.util.Map; import java.util.Set; @@ -75,4 +76,79 @@ default void configure(final StreamsConfig config) { boolean isCompatibleWith(StoreFactory storeFactory); + class FactoryWrappingStoreBuilder implements StoreBuilder { + + private final StoreFactory storeFactory; + + public FactoryWrappingStoreBuilder(final StoreFactory storeFactory) { + this.storeFactory = storeFactory; + } + + public StoreFactory storeFactory() { + return storeFactory; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final FactoryWrappingStoreBuilder that = (FactoryWrappingStoreBuilder) o; + + return storeFactory.isCompatibleWith(that.storeFactory); + } + + @Override + public int hashCode() { + return storeFactory.hashCode(); + } + + @Override + public StoreBuilder withCachingEnabled() { + throw new IllegalStateException("Should not try to modify StoreBuilder wrapper"); + } + + @Override + public StoreBuilder withCachingDisabled() { + storeFactory.withCachingDisabled(); + return this; + } + + @Override + public StoreBuilder withLoggingEnabled(final Map config) { + throw new IllegalStateException("Should not try to modify StoreBuilder wrapper"); + } + + @Override + public StoreBuilder withLoggingDisabled() { + storeFactory.withLoggingDisabled(); + return this; + } + + @SuppressWarnings("unchecked") + @Override + public T build() { + return (T) storeFactory.build(); + } + + @Override + public Map logConfig() { + return storeFactory.logConfig(); + } + + @Override + public boolean loggingEnabled() { + return storeFactory.loggingEnabled(); + } + + @Override + public String name() { + return storeFactory.name(); + } + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index fc601d5b7376..5210dd0b3c60 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ 
-55,7 +55,7 @@ import org.apache.kafka.streams.state.internals.RocksDBStore; import org.apache.kafka.streams.state.internals.RocksDBWindowStore; import org.apache.kafka.streams.state.internals.WrappedStateStore; -import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper; +import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockPredicate; @@ -65,6 +65,7 @@ import org.apache.kafka.test.StreamsTestUtils; import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -75,12 +76,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import static java.util.Arrays.asList; @@ -1417,10 +1418,10 @@ public void shouldUseDefaultNameForGlobalStoreProcessor() { @Test public void shouldWrapProcessorsForProcess() { final Map props = dummyStreamsConfigMap(); - props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); - final AtomicInteger wrappedProcessorCount = new AtomicInteger(); - props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount); + final Set wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); @@ -1430,56 +1431,84 @@ public void shouldWrapProcessorsForProcess() { final Random random = new Random(); builder.stream("input") - .process((ProcessorSupplier) () -> record -> System.out.println("Processing: " + random.nextInt())) - .processValues(() -> record -> System.out.println("Processing: " + random.nextInt())) + .process((ProcessorSupplier) () -> record -> System.out.println("Processing: " + random.nextInt()), Named.as("processor1")) + .processValues(() -> record -> System.out.println("Processing: " + random.nextInt()), Named.as("processor2")) .to("output"); builder.build(); - assertThat(wrappedProcessorCount.get(), CoreMatchers.is(2)); + assertThat(wrappedProcessors.size(), CoreMatchers.is(2)); + assertThat(wrappedProcessors, Matchers.containsInAnyOrder("processor1", "processor2")); } @Test public void shouldWrapProcessorsForAggregationOperators() { final Map props = dummyStreamsConfigMap(); - props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); - final AtomicInteger wrappedProcessorCount = new AtomicInteger(); - props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount); + final Set wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); builder.stream("input") .groupByKey() - .count() // wrapped 1 - .toStream()// wrapped 2 + .count(Named.as("count")) // wrapped 1 + .toStream(Named.as("toStream"))// wrapped 2 .to("output"); builder.build(); - assertThat(wrappedProcessorCount.get(), CoreMatchers.is(2)); + 
assertThat(wrappedProcessors.size(), CoreMatchers.is(2)); + assertThat(wrappedProcessors, Matchers.containsInAnyOrder("count", "toStream")); } @Test public void shouldWrapProcessorsForStatelessOperators() { final Map props = dummyStreamsConfigMap(); - props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); - final AtomicInteger wrappedProcessorCount = new AtomicInteger(); - props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount); + final Set wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); builder.stream("input") - .filter((k, v) -> true) // wrapped 1 - .map(KeyValue::new) // wrapped 2 - .selectKey((k, v) -> k) // wrapped 3 - .peek((k, v) -> { }) // wrapped 4 - .flatMapValues(e -> new ArrayList<>()) // wrapped 5 - .toTable() // wrapped 6 - .toStream() // wrapped 7 + .filter((k, v) -> true, Named.as("filter")) // wrapped 1 + .map(KeyValue::new, Named.as("map")) // wrapped 2 + .selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3 + .peek((k, v) -> { }, Named.as("peek")) // wrapped 4 + .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5 + .toTable(Named.as("toTable")) // wrapped 6 (note named as toTable-repartition-filter) + .toStream(Named.as("toStream")) // wrapped 7 .to("output"); builder.build(); - assertThat(wrappedProcessorCount.get(), CoreMatchers.is(7)); + assertThat(wrappedProcessors.size(), CoreMatchers.is(7)); + assertThat(wrappedProcessors, Matchers.containsInAnyOrder( + "filter", "map", "selectKey", "peek", "flatMap", "toTable-repartition-filter", + "toStream" + )); + } + + @Test + public void shouldWrapProcessorsForTableSource() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final Set wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.table("input") // wrapped 1 (named KTABLE_SOURCE-0000000002) + .toStream(Named.as("toStream")) // wrapped 2 + .to("output"); + + builder.build(); + assertThat(wrappedProcessors.size(), CoreMatchers.is(2)); + assertThat(wrappedProcessors, Matchers.containsInAnyOrder( + "KTABLE-SOURCE-0000000002", + "toStream" + )); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index b001c98868f0..4467e252b92d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -42,7 +42,7 @@ import org.apache.kafka.streams.processor.internals.RecordCollectorTest; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers; -import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper; +import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper; import org.apache.log4j.Level; import org.junit.jupiter.api.BeforeEach; @@ -1230,13 +1230,13 @@ public void shouldReturnDefaultProcessorWrapperClass() { @Test public void shouldAllowConfiguringProcessorWrapperWithClass() { - 
props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class); + props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); new StreamsConfig(props); } @Test public void shouldAllowConfiguringProcessorWrapperWithClassName() { - props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class.getName()); + props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class.getName()); new StreamsConfig(props); } diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 461aa1a29211..833eada8d5e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -45,13 +45,14 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; -import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper; +import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.StreamsTestUtils; +import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -71,7 +72,6 @@ import java.util.Properties; import java.util.Random; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import static java.time.Duration.ofMillis; @@ -2425,10 +2425,10 @@ public void readOnlyStateStoresShouldNotLog() { @Test public void shouldWrapProcessors() { final Map props = dummyStreamsConfigMap(); - props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, CountingProcessorWrapper.class); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); - final AtomicInteger wrappedProcessorCount = new AtomicInteger(); - props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount); + final Set wrappedProcessors = Collections.synchronizedSet(new HashSet<>()); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors); final Topology topology = new Topology(new TopologyConfig(new StreamsConfig(props))); @@ -2453,7 +2453,8 @@ public void shouldWrapProcessors() { () -> (Processor) record -> System.out.println("Processing: " + random.nextInt()), "p2" ); - assertThat(wrappedProcessorCount.get(), is(3)); + assertThat(wrappedProcessors.size(), is(3)); + assertThat(wrappedProcessors, Matchers.containsInAnyOrder("p1", "p2", "p3")); } @SuppressWarnings("deprecation") diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java index 2988e14e720e..bf70d4768397 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java @@ -16,23 +16,29 @@ */ package org.apache.kafka.streams.kstream.internals.graph; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; import 
org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.KTableSource; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode.TableSourceNodeBuilder; +import org.apache.kafka.streams.processor.api.ProcessorWrapper; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.streams.state.KeyValueStore; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) @@ -43,6 +49,12 @@ public class TableSourceNodeTest { private InternalTopologyBuilder topologyBuilder = mock(InternalTopologyBuilder.class); + @BeforeEach + public void before() { + when(topologyBuilder.wrapProcessorSupplier(any(), any())) + .thenAnswer(iom -> ProcessorWrapper.asWrapped(iom.getArgument(1))); + } + @Test public void shouldConnectStateStoreToInputTopicIfInputTopicIsUsedAsChangelog() { final boolean shouldReuseSourceTopicForChangelog = true; @@ -59,12 +71,13 @@ public void shouldConnectStateStoreToChangelogTopic() { private void buildTableSourceNode(final boolean shouldReuseSourceTopicForChangelog) { final TableSourceNodeBuilder tableSourceNodeBuilder = TableSourceNode.tableSourceNodeBuilder(); + final MaterializedInternal> + materializedInternal = new MaterializedInternal<>(Materialized.as(STORE_NAME)); final TableSourceNode tableSourceNode = tableSourceNodeBuilder .withTopic(TOPIC) - .withMaterializedInternal(new MaterializedInternal<>(Materialized.as(STORE_NAME))) .withConsumedInternal(new ConsumedInternal<>(Consumed.as("node-name"))) .withProcessorParameters( - new ProcessorParameters<>(new KTableSource<>(STORE_NAME, STORE_NAME), null)) + new ProcessorParameters<>(new KTableSource<>(materializedInternal), null)) .build(); tableSourceNode.reuseSourceTopicForChangeLog(shouldReuseSourceTopicForChangelog); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index 244a246bd203..e4f78c900d10 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.TestUtils; @@ -108,15 +109,17 @@ public void process(final Record record) { } }; + final StoreFactory storeFactory = + new KeyValueStoreMaterializer<>(materialized).withLoggingDisabled(); + final StoreBuilder storeBuilder = new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory); builder.addGlobalStore( - new 
KeyValueStoreMaterializer<>(materialized).withLoggingDisabled(), "sourceName", null, null, null, GLOBAL_STORE_TOPIC_NAME, "processorName", - processorSupplier, + new StoreDelegatingProcessorSupplier<>(processorSupplier, Set.of(storeBuilder)), false ); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 9d46569c27b7..e3add9755ae8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -95,7 +95,8 @@ public class InternalTopologyBuilderTest { private final Serde stringSerde = Serdes.String(); private final InternalTopologyBuilder builder = new InternalTopologyBuilder(); - private final StoreFactory storeBuilder = new MockKeyValueStoreBuilder("testStore", false).asFactory(); + private final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("testStore", false); + private final StoreFactory storeFactory = new MockKeyValueStoreBuilder("testStore", false).asFactory(); @Test public void shouldAddSourceWithOffsetReset() { @@ -225,7 +226,6 @@ public void testAddGlobalStoreWithBadSupplier() { final IllegalArgumentException exception = assertThrows( IllegalArgumentException.class, () -> builder.addGlobalStore( - new MockKeyValueStoreBuilder("global-store", false).asFactory().withLoggingDisabled(), "globalSource", null, null, @@ -331,18 +331,20 @@ public void testPatternAndNameSourceTopics() { @Test public void testPatternSourceTopicsWithGlobalTopics() { + final StoreBuilder storeBuilder = + new MockKeyValueStoreBuilder("global-store", false) + .withLoggingDisabled(); builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, Pattern.compile("topic-1")); builder.addSource(null, "source-2", null, null, null, Pattern.compile("topic-2")); builder.addGlobalStore( - new MockKeyValueStoreBuilder("global-store", false).asFactory().withLoggingDisabled(), "globalSource", null, null, null, "globalTopic", "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(storeBuilder)), false ); builder.initializeSubscription(); @@ -356,18 +358,20 @@ public void testPatternSourceTopicsWithGlobalTopics() { @Test public void testNameSourceTopicsWithGlobalTopics() { + final StoreBuilder storeBuilder = + new MockKeyValueStoreBuilder("global-store", false) + .withLoggingDisabled(); builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addSource(null, "source-2", null, null, null, "topic-2"); builder.addGlobalStore( - new MockKeyValueStoreBuilder("global-store", false).asFactory().withLoggingDisabled(), "globalSource", null, null, null, "globalTopic", "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(storeBuilder)), false ); builder.initializeSubscription(); @@ -427,14 +431,14 @@ public void testNamedTopicMatchesAlreadyProvidedPattern() { @Test public void testAddStateStoreWithNonExistingProcessor() { - assertThrows(TopologyException.class, () -> builder.addStateStore(storeBuilder, "no-such-processor")); + assertThrows(TopologyException.class, () -> builder.addStateStore(storeFactory, "no-such-processor")); } @Test public void 
testAddStateStoreWithSource() { builder.addSource(null, "source-1", null, null, null, "topic-1"); try { - builder.addStateStore(storeBuilder, "source-1"); + builder.addStateStore(storeFactory, "source-1"); fail("Should throw TopologyException with store cannot be added to source"); } catch (final TopologyException expected) { /* ok */ } } @@ -444,7 +448,7 @@ public void testAddStateStoreWithSink() { builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addSink("sink-1", "topic-1", null, null, null, "source-1"); try { - builder.addStateStore(storeBuilder, "sink-1"); + builder.addStateStore(storeFactory, "sink-1"); fail("Should throw TopologyException with store cannot be added to sink"); } catch (final TopologyException expected) { /* ok */ } } @@ -454,7 +458,7 @@ public void shouldNotAllowToAddStoresWithSameName() { final StoreBuilder> otherBuilder = new MockKeyValueStoreBuilder("testStore", false); - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); final TopologyException exception = assertThrows( TopologyException.class, @@ -469,24 +473,23 @@ public void shouldNotAllowToAddStoresWithSameName() { @Test public void shouldNotAllowToAddStoresWithSameNameWhenFirstStoreIsGlobal() { - final StoreFactory globalBuilder = - new MockKeyValueStoreBuilder("testStore", false).asFactory().withLoggingDisabled(); + final StoreBuilder globalBuilder = + new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled(); builder.addGlobalStore( - globalBuilder, "global-store", null, null, null, "global-topic", "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(globalBuilder)), false ); final TopologyException exception = assertThrows( TopologyException.class, - () -> builder.addStateStore(storeBuilder) + () -> builder.addStateStore(storeFactory) ); assertThat( @@ -497,22 +500,21 @@ public void shouldNotAllowToAddStoresWithSameNameWhenFirstStoreIsGlobal() { @Test public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() { - final StoreFactory globalBuilder = - new MockKeyValueStoreBuilder("testStore", false).asFactory().withLoggingDisabled(); + final StoreBuilder globalBuilder = + new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled(); - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); final TopologyException exception = assertThrows( TopologyException.class, () -> builder.addGlobalStore( - globalBuilder, "global-store", null, null, null, "global-topic", "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(globalBuilder)), false ) ); @@ -525,34 +527,32 @@ public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() { @Test public void shouldNotAllowToAddGlobalStoresWithSameName() { - final StoreFactory firstGlobalBuilder = - new MockKeyValueStoreBuilder("testStore", false).asFactory().withLoggingDisabled(); - final StoreFactory secondGlobalBuilder = - new MockKeyValueStoreBuilder("testStore", false).asFactory().withLoggingDisabled(); + final StoreBuilder> firstGlobalBuilder = + new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled(); + final StoreBuilder> secondGlobalBuilder = + new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled(); builder.addGlobalStore( - firstGlobalBuilder, "global-store", null, null, null, "global-topic", "global-processor", - new MockApiProcessorSupplier<>(), + 
new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(firstGlobalBuilder)), false ); final TopologyException exception = assertThrows( TopologyException.class, () -> builder.addGlobalStore( - secondGlobalBuilder, "global-store-2", null, null, null, "global-topic", "global-processor-2", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(secondGlobalBuilder)), false ) ); @@ -565,35 +565,35 @@ public void shouldNotAllowToAddGlobalStoresWithSameName() { @Test public void testAddStateStore() { - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1"); assertEquals(0, builder.buildTopology().stateStores().size()); - builder.connectProcessorAndStateStores("processor-1", storeBuilder.name()); + builder.connectProcessorAndStateStores("processor-1", storeFactory.name()); final List suppliers = builder.buildTopology().stateStores(); assertEquals(1, suppliers.size()); - assertEquals(storeBuilder.name(), suppliers.get(0).name()); + assertEquals(storeFactory.name(), suppliers.get(0).name()); } @Test public void testStateStoreNamesForSubtopology() { - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1"); - builder.connectProcessorAndStateStores("processor-1", storeBuilder.name()); + builder.connectProcessorAndStateStores("processor-1", storeFactory.name()); builder.addSource(null, "source-2", null, null, null, "topic-2"); builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-2"); builder.buildTopology(); final Set stateStoreNames = builder.stateStoreNamesForSubtopology(0); - assertThat(stateStoreNames, equalTo(Set.of(storeBuilder.name()))); + assertThat(stateStoreNames, equalTo(Set.of(storeFactory.name()))); final Set emptyStoreNames = builder.stateStoreNamesForSubtopology(1); assertThat(emptyStoreNames, equalTo(Set.of())); @@ -607,13 +607,13 @@ public void shouldAllowAddingSameStoreBuilderMultipleTimes() { builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, "topic-1"); - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1"); - builder.connectProcessorAndStateStores("processor-1", storeBuilder.name()); + builder.connectProcessorAndStateStores("processor-1", storeFactory.name()); - builder.addStateStore(storeBuilder); + builder.addStateStore(storeFactory); builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-1"); - builder.connectProcessorAndStateStores("processor-2", storeBuilder.name()); + builder.connectProcessorAndStateStores("processor-2", storeFactory.name()); assertEquals(1, builder.buildTopology().stateStores().size()); } @@ -763,15 +763,16 @@ public void shouldAllowIncrementalBuilds() { assertNotEquals(oldNodeGroups, newNodeGroups); oldNodeGroups = newNodeGroups; + + final StoreBuilder globalBuilder = new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(); builder.addGlobalStore( - new MockKeyValueStoreBuilder("global-store", false).asFactory().withLoggingDisabled(), "globalSource", null, null, null, 
"globalTopic", "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(globalBuilder)), false ); newNodeGroups = builder.nodeGroups(); @@ -879,7 +880,7 @@ private Set nodeNames(final Collection> nodes) public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() { builder.addSource(null, "source", null, null, null, "topic"); builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source"); - builder.addStateStore(storeBuilder, "processor"); + builder.addStateStore(storeFactory, "processor"); final Map> stateStoreNameToSourceTopic = builder.stateStoreNameToFullSourceTopicNames(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("testStore")); @@ -889,7 +890,7 @@ public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() { public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() { builder.addSource(null, "source", null, null, null, "topic"); builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source"); - builder.addStateStore(storeBuilder, "processor"); + builder.addStateStore(storeFactory, "processor"); final Map> stateStoreNameToSourceTopic = builder.stateStoreNameToFullSourceTopicNames(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("testStore")); @@ -901,7 +902,7 @@ public void shouldCorrectlyMapStateStoreToInternalTopics() { builder.addInternalTopic("internal-topic", InternalTopicProperties.empty()); builder.addSource(null, "source", null, null, null, "internal-topic"); builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source"); - builder.addStateStore(storeBuilder, "processor"); + builder.addStateStore(storeFactory, "processor"); final Map> stateStoreNameToSourceTopic = builder.stateStoreNameToFullSourceTopicNames(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("testStore")); @@ -975,7 +976,7 @@ public void shouldAddInternalTopicConfigForNonWindowNonVersionedStores() { builder.setApplicationId("appId"); builder.addSource(null, "source", null, null, null, "topic"); builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source"); - builder.addStateStore(storeBuilder, "processor"); + builder.addStateStore(storeFactory, "processor"); builder.buildTopology(); final Map topicGroups = builder.subtopologyToTopicsInfo(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); @@ -1183,7 +1184,7 @@ public void shouldSortProcessorNodesCorrectly() { public void shouldConnectRegexMatchedTopicsToStateStore() { builder.addSource(null, "ingest", null, null, null, Pattern.compile("topic-\\d+")); builder.addProcessor("my-processor", new MockApiProcessorSupplier<>(), "ingest"); - builder.addStateStore(storeBuilder, "my-processor"); + builder.addStateStore(storeFactory, "my-processor"); final Set updatedTopics = new HashSet<>(); @@ -1195,7 +1196,7 @@ public void shouldConnectRegexMatchedTopicsToStateStore() { builder.setApplicationId("test-app"); final Map> stateStoreAndTopics = builder.stateStoreNameToFullSourceTopicNames(); - final List topics = stateStoreAndTopics.get(storeBuilder.name()); + final List topics = stateStoreAndTopics.get(storeFactory.name()); assertEquals(2, topics.size(), "Expected to 
contain two topics"); @@ -1208,14 +1209,13 @@ public void shouldConnectRegexMatchedTopicsToStateStore() { public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { final String sameNameForSourceAndProcessor = "sameName"; assertThrows(TopologyException.class, () -> builder.addGlobalStore( - storeBuilder, sameNameForSourceAndProcessor, null, null, null, "anyTopicName", sameNameForSourceAndProcessor, - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(storeBuilder)), false )); } @@ -1351,16 +1351,17 @@ public void shouldHaveCorrectInternalTopicConfigWhenInternalTopicPropertiesAreNo public void shouldConnectGlobalStateStoreToInputTopic() { final String globalStoreName = "global-store"; final String globalTopic = "global-topic"; + final StoreBuilder storeBuilder = + new MockKeyValueStoreBuilder(globalStoreName, false).withLoggingDisabled(); builder.setApplicationId("X"); builder.addGlobalStore( - new MockKeyValueStoreBuilder(globalStoreName, false).asFactory().withLoggingDisabled(), "globalSource", null, null, null, globalTopic, "global-processor", - new MockApiProcessorSupplier<>(), + new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(storeBuilder)), false ); builder.initializeSubscription(); diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java index 0f0bcf90698d..24ac2f2306db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java @@ -28,10 +28,12 @@ import java.lang.reflect.Method; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; @@ -115,30 +117,30 @@ public static Map dummyStreamsConfigMap() { * To retrieve the current count, pass an instance of AtomicInteger into the configs * alongside the wrapper itself. 
Use the config key defined with {@link #PROCESSOR_WRAPPER_COUNTER_CONFIG} */ - public static class CountingProcessorWrapper implements ProcessorWrapper { + public static class RecordingProcessorWrapper implements ProcessorWrapper { - private AtomicInteger wrappedProcessorCount; + private Set wrappedProcessorNames; @Override public void configure(final Map configs) { if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) { - wrappedProcessorCount = (AtomicInteger) configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG); + wrappedProcessorNames = (Set) configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG); } else { - wrappedProcessorCount = new AtomicInteger(); + wrappedProcessorNames = Collections.synchronizedSet(new HashSet<>()); } } @Override public WrappedProcessorSupplier wrapProcessorSupplier(final String processorName, final ProcessorSupplier processorSupplier) { - wrappedProcessorCount.incrementAndGet(); + wrappedProcessorNames.add(processorName); return ProcessorWrapper.asWrapped(processorSupplier); } @Override public WrappedFixedKeyProcessorSupplier wrapFixedKeyProcessorSupplier(final String processorName, final FixedKeyProcessorSupplier processorSupplier) { - wrappedProcessorCount.incrementAndGet(); + wrappedProcessorNames.add(processorName); return ProcessorWrapper.asWrappedFixedKey(processorSupplier); } } diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java index 2faf89b16223..15c896ad0762 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java @@ -39,6 +39,6 @@ public MockKeyValueStore build() { } public StoreFactory asFactory() { - return new StoreBuilderWrapper(this); + return StoreBuilderWrapper.wrapStoreBuilder(this); } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java index bb797a8887e3..86f41a729729 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java @@ -19,13 +19,14 @@ import kafka.api.AbstractSaslTest; import kafka.api.Both$; import kafka.security.JaasTestUtils; -import kafka.zk.ConfigEntityChangeNotificationZNode; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.metadata.storage.Formatter; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -38,8 +39,12 @@ import java.io.File; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import scala.Option; import scala.Some$; @@ -89,9 +94,13 @@ public int brokerCount() { @Override public void configureSecurityBeforeServersStart(TestInfo testInfo) { super.configureSecurityBeforeServersStart(testInfo); - 
zkClient().makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path()); - // Create broker credentials before starting brokers - createScramCredentials(zkConnect(), JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD); + } + + @Override + public void addFormatterSettings(Formatter formatter) { + formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ"); + formatter.setScramArguments(Arrays.asList("SCRAM-SHA-256=[name=" + JaasTestUtils.KAFKA_SCRAM_ADMIN + + ",password=" + JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD + "]")); } @Override @@ -106,13 +115,13 @@ public void setUp(TestInfo testInfo) { startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS, Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM), Both$.MODULE$, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)); super.setUp(testInfo); - createTopic( - TOPIC, - NUM_PARTITIONS, - BROKER_COUNT, - new Properties(), - listenerName(), - new Properties()); + try (Admin admin = createPrivilegedAdminClient()) { + admin.createTopics(Collections.singletonList( + new NewTopic(TOPIC, NUM_PARTITIONS, (short) BROKER_COUNT))).all(). + get(5, TimeUnit.MINUTES); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } } @AfterEach @@ -124,7 +133,7 @@ public void tearDown() { // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() in the ParameterizedTest name. @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}") - @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit") + @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly") public void testConsumerGroupServiceWithAuthenticationFailure(String quorum, String groupProtocol) throws Exception { ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService(); try (Consumer consumer = createConsumer()) { @@ -137,7 +146,7 @@ public void testConsumerGroupServiceWithAuthenticationFailure(String quorum, Str } @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}") - @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit") + @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly") public void testConsumerGroupServiceWithAuthenticationSuccess(String quorum, String groupProtocol) throws Exception { createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2, JaasTestUtils.KAFKA_SCRAM_PASSWORD_2); ConsumerGroupCommand.ConsumerGroupService consumerGroupService = prepareConsumerGroupService();
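The `addGlobalStore` change above removes the separate `StoreFactory` parameter; the global store now has to travel with the state-update supplier via `ProcessorSupplier#stores()`, and suppliers that do not already expose a store can be wrapped in the new `StoreDelegatingProcessorSupplier`. Below is a minimal sketch of the resulting call pattern, modeled on the updated tests in this patch; the store, topic, and node names are placeholders, generics are simplified, and the no-op processor stands in for a real state-update processor, so treat it as illustrative rather than as code taken from the patch.

    import java.util.Set;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
    import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class GlobalStoreWiringSketch {

        public static void main(final String[] args) {
            // Global stores restore directly from the source topic, so logging is disabled
            // (mirrors the withLoggingDisabled() calls in the updated tests).
            final StoreBuilder<?> globalStoreBuilder =
                Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("global-store"),   // placeholder store name
                    Serdes.String(),
                    Serdes.String()
                ).withLoggingDisabled();

            // Placeholder state-update processor; a real one would write into the global store.
            final ProcessorSupplier<String, String, Void, Void> stateUpdateSupplier =
                () -> (Record<String, String> record) -> { /* update the global store here */ };

            final InternalTopologyBuilder builder = new InternalTopologyBuilder();
            builder.addGlobalStore(
                "globalSource",        // source node name
                null,                  // timestamp extractor (use default)
                null, null,            // key / value deserializers (use defaults)
                "global-topic",        // input topic
                "global-processor",    // processor node name
                // addGlobalStore now derives the store from the supplier, which must
                // expose exactly one StoreBuilder via stores(); the wrapper attaches it.
                new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier, Set.of(globalStoreBuilder)),
                false                  // reprocessOnRestore
            );
        }
    }

The same pattern is what the updated `InternalTopologyBuilderTest` and `GlobalStreamThreadTest` use: instead of building a `StoreFactory` up front, the test wraps its existing mock supplier with `StoreDelegatingProcessorSupplier` and lets the topology builder validate that exactly one store is supplied.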