From 70d6ff42debf7e17478beb899fb5756bfbdbfbb5 Mon Sep 17 00:00:00 2001 From: Josep Prat Date: Thu, 17 Oct 2024 17:52:35 +0200 Subject: [PATCH 1/4] Bump version to 3.8.1 --- docs/js/templateData.js | 2 +- gradle.properties | 2 +- streams/quickstart/java/pom.xml | 2 +- .../java/src/main/resources/archetype-resources/pom.xml | 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/js/templateData.js b/docs/js/templateData.js index 813a04ecad1e..fb49ca9b1847 100644 --- a/docs/js/templateData.js +++ b/docs/js/templateData.js @@ -19,6 +19,6 @@ limitations under the License. var context={ "version": "38", "dotVersion": "3.8", - "fullDotVersion": "3.8.1-SNAPSHOT", + "fullDotVersion": "3.8.1", "scalaVersion": "2.13" }; diff --git a/gradle.properties b/gradle.properties index 1b402a27a736..eb4113576016 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ group=org.apache.kafka # - streams/quickstart/pom.xml # - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml # - streams/quickstart/java/pom.xml -version=3.8.1-SNAPSHOT +version=3.8.1 scalaVersion=2.13.14 # Adding swaggerVersion in gradle.properties to have a single version in place for swagger # New version of Swagger 2.2.14 requires minimum JDK 11. diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index b28e9e1cc701..124e19872841 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart - 3.8.1-SNAPSHOT + 3.8.1 .. diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index 8933bd35d679..086a01839fa6 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 - 3.8.1-SNAPSHOT + 3.8.1 1.7.36 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index 8859807dfbd8..dac39dbb8f20 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom - 3.8.1-SNAPSHOT + 3.8.1 Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index f41f1c06cb2a..9941ad4db5ab 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in development branches, the version should have a suffix of the form ".devN" # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '3.8.1.dev0' +__version__ = '3.8.1' From 129c4cc29f080bf3f39cf5ca218ba525ca6999fc Mon Sep 17 00:00:00 2001 From: Josep Prat Date: Tue, 29 Oct 2024 13:31:09 +0100 Subject: [PATCH 2/4] MINOR: Update 3.8 branch to 3.8.2-SNAPSHOT Signed-off-by: Josep Prat --- docs/js/templateData.js | 2 +- gradle.properties | 2 +- kafka-merge-pr.py | 2 +- streams/quickstart/java/pom.xml | 2 +- .../java/src/main/resources/archetype-resources/pom.xml | 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py | 2 +- tests/kafkatest/version.py | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/js/templateData.js b/docs/js/templateData.js index fb49ca9b1847..d5a1a729674a 100644 --- a/docs/js/templateData.js +++ b/docs/js/templateData.js @@ -19,6 +19,6 @@ limitations under the License. var context={ "version": "38", "dotVersion": "3.8", - "fullDotVersion": "3.8.1", + "fullDotVersion": "3.8.2-SNAPSHOT", "scalaVersion": "2.13" }; diff --git a/gradle.properties b/gradle.properties index eb4113576016..7c2480a654b5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ group=org.apache.kafka # - streams/quickstart/pom.xml # - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml # - streams/quickstart/java/pom.xml -version=3.8.1 +version=3.8.2-SNAPSHOT scalaVersion=2.13.14 # Adding swaggerVersion in gradle.properties to have a single version in place for swagger # New version of Swagger 2.2.14 requires minimum JDK 11. diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py index 05d192ac18d3..45da88e89ad3 100755 --- a/kafka-merge-pr.py +++ b/kafka-merge-pr.py @@ -70,7 +70,7 @@ DEV_BRANCH_NAME = "trunk" -DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "3.8.1-SNAPSHOT") +DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "3.8.2-SNAPSHOT") ORIGINAL_HEAD = "" diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index 124e19872841..8e089d535b10 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart - 3.8.1 + 3.8.2-SNAPSHOT .. diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index 086a01839fa6..8e6f27346fc6 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 - 3.8.1 + 3.8.2-SNAPSHOT 1.7.36 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index dac39dbb8f20..2bc72814d92a 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom - 3.8.1 + 3.8.2-SNAPSHOT Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index 9941ad4db5ab..d9e6b1829b8b 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in development branches, the version should have a suffix of the form ".devN" # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '3.8.1' +__version__ = '3.8.2-SNAPSHOT' diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 93dbcc5524fd..5d69e0e13a93 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -122,7 +122,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("3.8.1-SNAPSHOT") +DEV_VERSION = KafkaVersion("3.8.2-SNAPSHOT") # This should match the LATEST_PRODUCTION version defined in MetadataVersion.java LATEST_STABLE_METADATA_VERSION = "3.8" From b2638a4533109ccf2829a16c36325fa6c874ea0f Mon Sep 17 00:00:00 2001 From: Jonah Hooper Date: Thu, 31 Oct 2024 16:54:03 -0400 Subject: [PATCH 3/4] [KAFKA-17870] Fail CreateTopicsRequest if total number of partitions exceeds 10k (#17604) We fail the entire CreateTopicsRequest action if there are more than 10k total partitions being created in this topic for this specific request. The usual pattern for this API to try and succeed with some topics. Since the 10k limit applies to all topics then no topic should be created if they all exceede it. Reviewers: Colin P. McCabe --- .../kafka/server/KRaftClusterTest.scala | 2 +- .../controller/ReplicationControlManager.java | 31 +++++++++++++++ .../ReplicationControlManagerTest.java | 38 +++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 8be2b994116f..59e51d8505f9 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -1288,7 +1288,7 @@ class KRaftClusterTest { () => admin.createTopics(newTopics).all().get()) assertNotNull(executionException.getCause) assertEquals(classOf[PolicyViolationException], executionException.getCause.getClass) - assertEquals("Unable to perform excessively large batch operation.", + assertEquals("Excessively large number of partitions per request.", executionException.getCause.getMessage) } finally { admin.close() diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 9b412ad105da..482ac2c4aee9 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -151,6 +151,7 @@ */ public class ReplicationControlManager { static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000; + static final int MAX_PARTITIONS_PER_BATCH = 10_000; static class Builder { private SnapshotRegistry snapshotRegistry = null; @@ -605,6 +606,8 @@ ControllerResult createTopics( Map topicErrors = new HashMap<>(); List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); + validateTotalNumberOfPartitions(request, defaultNumPartitions); + // Check the topic names. validateNewTopicNames(topicErrors, request.topics(), topicsWithCollisionChars); @@ -1150,6 +1153,34 @@ ControllerResult alterPartition( return ControllerResult.of(records, response); } + /** + * Validates that a batch of topics will create less than {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch + * has led to out-of-memory exceptions. We use this validation to fail earlier to avoid allocating the memory. + * Validates an upper bound number of partitions. The actual number may be smaller if some topics are misconfigured. + * + * @param request a batch of topics to create. + * @param defaultNumPartitions default number of partitions to assign if unspecified. + * @throws PolicyViolationException if total number of partitions exceeds {@value MAX_PARTITIONS_PER_BATCH}. + */ + static void validateTotalNumberOfPartitions(CreateTopicsRequestData request, int defaultNumPartitions) { + int totalPartitions = 0; + for (CreatableTopic topic: request.topics()) { + if (topic.assignments().isEmpty()) { + if (topic.numPartitions() == -1) { + totalPartitions += defaultNumPartitions; + } else if (topic.numPartitions() > 0) { + totalPartitions += topic.numPartitions(); + } + } else { + totalPartitions += topic.assignments().size(); + } + + } + if (totalPartitions > MAX_PARTITIONS_PER_BATCH) { + throw new PolicyViolationException("Excessively large number of partitions per request."); + } + } + /** * Validate the partition information included in the alter partition request. * diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index a5cfcce07b00..27c82caeab3c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -556,6 +556,44 @@ public void configure(Map configs) { } } + @Test + public void testExcessiveNumberOfTopicsCannotBeCreated() { + // number of partitions is explicitly set without assignments + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("foo"). + setNumPartitions(5000).setReplicationFactor((short) 1)); + request.topics().add(new CreatableTopic().setName("bar"). + setNumPartitions(5000).setReplicationFactor((short) 1)); + request.topics().add(new CreatableTopic().setName("baz"). + setNumPartitions(1).setReplicationFactor((short) 1)); + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + PolicyViolationException error = assertThrows( + PolicyViolationException.class, + () -> replicationControl.createTopics(requestContext, request, Set.of("foo", "bar", "baz"))); + assertEquals(error.getMessage(), "Excessively large number of partitions per request."); + } + + @Test + public void testExcessiveNumberOfTopicsCannotBeCreatedWithAssignments() { + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("foo"). + setNumPartitions(-1).setReplicationFactor((short) 1)); + CreateTopicsRequestData.CreatableReplicaAssignmentCollection assignments = + new CreateTopicsRequestData.CreatableReplicaAssignmentCollection(); + assignments.add(new CreatableReplicaAssignment().setPartitionIndex(1)); + assignments.add(new CreatableReplicaAssignment().setPartitionIndex(2)); + request.topics().add(new CreatableTopic() + .setName("baz") + .setAssignments(assignments)); + PolicyViolationException error = assertThrows( + PolicyViolationException.class, + () -> ReplicationControlManager.validateTotalNumberOfPartitions(request, 9999) + ); + assertEquals(error.getMessage(), "Excessively large number of partitions per request."); + } + @Test public void testCreateTopics() { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); From 14bd65149e353164b8750d306a3da49f1e5b8ce5 Mon Sep 17 00:00:00 2001 From: ShivsundarR Date: Wed, 6 Nov 2024 15:13:01 -0500 Subject: [PATCH 4/4] Removed Set.of usage (#17703) Reviewers: Lianet Magrans , Federico Valeri , Chia-Ping Tsai --- .../apache/kafka/controller/ReplicationControlManagerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 27c82caeab3c..924763dc7b14 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -119,6 +119,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -571,7 +572,7 @@ public void testExcessiveNumberOfTopicsCannotBeCreated() { ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); PolicyViolationException error = assertThrows( PolicyViolationException.class, - () -> replicationControl.createTopics(requestContext, request, Set.of("foo", "bar", "baz"))); + () -> replicationControl.createTopics(requestContext, request, Stream.of("foo", "bar", "baz").collect(Collectors.toSet()))); assertEquals(error.getMessage(), "Excessively large number of partitions per request."); }