diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index b5d71230cf473..2cbf51c9baf80 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -166,26 +166,41 @@ SnapshotRegistry snapshotRegistry() {
      * @return The result.
      */
     ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
-            Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
-            boolean newlyCreatedResource) {
+        Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
+        boolean newlyCreatedResource
+    ) {
         List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
         Map<ConfigResource, ApiError> outputResults = new HashMap<>();
         for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry :
                 configChanges.entrySet()) {
-            incrementalAlterConfigResource(resourceEntry.getKey(),
+            ApiError apiError = incrementalAlterConfigResource(resourceEntry.getKey(),
                 resourceEntry.getValue(),
                 newlyCreatedResource,
-                outputRecords,
-                outputResults);
+                outputRecords);
+            outputResults.put(resourceEntry.getKey(), apiError);
         }
         return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
-    private void incrementalAlterConfigResource(ConfigResource configResource,
-                                                Map<String, Entry<OpType, String>> keysToOps,
-                                                boolean newlyCreatedResource,
-                                                List<ApiMessageAndVersion> outputRecords,
-                                                Map<ConfigResource, ApiError> outputResults) {
+    ControllerResult<ApiError> incrementalAlterConfig(
+        ConfigResource configResource,
+        Map<String, Entry<OpType, String>> keyToOps,
+        boolean newlyCreatedResource
+    ) {
+        List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
+        ApiError apiError = incrementalAlterConfigResource(configResource,
+            keyToOps,
+            newlyCreatedResource,
+            outputRecords);
+        return ControllerResult.atomicOf(outputRecords, apiError);
+    }
+
+    private ApiError incrementalAlterConfigResource(
+        ConfigResource configResource,
+        Map<String, Entry<OpType, String>> keysToOps,
+        boolean newlyCreatedResource,
+        List<ApiMessageAndVersion> outputRecords
+    ) {
         List<ApiMessageAndVersion> newRecords = new ArrayList<>();
         for (Entry<String, Entry<OpType, String>> keysToOpsEntry : keysToOps.entrySet()) {
             String key = keysToOpsEntry.getKey();
@@ -208,10 +223,9 @@ private void incrementalAlterConfigResource(ConfigResource configResource,
                 case APPEND:
                 case SUBTRACT:
                     if (!configSchema.isSplittable(configResource.type(), key)) {
-                        outputResults.put(configResource, new ApiError(
+                        return new ApiError(
                             INVALID_CONFIG, "Can't " + opType + " to " +
-                            "key " + key + " because its type is not LIST."));
-                        return;
+                            "key " + key + " because its type is not LIST.");
                     }
                     List<String> oldValueList = getParts(newValue, key, configResource);
                     if (opType == APPEND) {
@@ -239,11 +253,10 @@ private void incrementalAlterConfigResource(ConfigResource configResource,
         }
         ApiError error = validateAlterConfig(configResource, newRecords, Collections.emptyList(), newlyCreatedResource);
         if (error.isFailure()) {
-            outputResults.put(configResource, error);
-            return;
+            return error;
         }
         outputRecords.addAll(newRecords);
-        outputResults.put(configResource, ApiError.NONE);
+        return ApiError.NONE;
     }
 
     private ApiError validateAlterConfig(ConfigResource configResource,
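Note: the refactoring above splits the old void helper into a per-resource entry point that returns its records and error instead of writing into shared output collections. A minimal sketch of how a same-package caller can use the new incrementalAlterConfig. Everything here other than the method signature itself is hypothetical (the example class, the topic name, the config key and value), and the ApiMessageAndVersion import path assumes the 3.x server-common layout:

    package org.apache.kafka.controller;

    // Sketch only: placed in org.apache.kafka.controller because
    // incrementalAlterConfig is package-private.
    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;

    import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.requests.ApiError;
    import org.apache.kafka.server.common.ApiMessageAndVersion;

    class IncrementalAlterConfigExample {
        // Validates and translates one SET op for a single topic into ConfigRecords.
        static List<ApiMessageAndVersion> configRecordsFor(ConfigurationControlManager manager,
                                                           String topicName) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
            Map<String, Entry<OpType, String>> ops = new HashMap<>();
            ops.put("cleanup.policy", new SimpleImmutableEntry<>(OpType.SET, "compact"));
            // true = the resource is being created in the same record batch.
            ControllerResult<ApiError> result = manager.incrementalAlterConfig(resource, ops, true);
            if (result.response().isFailure()) {
                // Validation failed: per the change above, no records are emitted in this case.
                throw result.response().exception();
            }
            return result.records();
        }
    }

This per-resource shape is what lets ReplicationControlManager (next file) validate and collect config records topic by topic instead of all at once.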
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 63fda45d54179..ebe2c56f57888 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -536,25 +536,33 @@ public void replay(RemoveTopicRecord record) {
                 "Topic '" + t.name() + "' already exists.")));
 
         // Verify that the configurations for the new topics are OK, and figure out what
-        // ConfigRecords should be created.
+        // configurations should be created.
         Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges =
             computeConfigChanges(topicErrors, request.topics());
-        ControllerResult<Map<ConfigResource, ApiError>> configResult =
-            configurationControl.incrementalAlterConfigs(configChanges, true);
-        for (Entry<ConfigResource, ApiError> entry : configResult.response().entrySet()) {
-            if (entry.getValue().isFailure()) {
-                topicErrors.put(entry.getKey().name(), entry.getValue());
-            }
-        }
-        records.addAll(configResult.records());
 
         // Try to create whatever topics are needed.
         Map<String, CreatableTopicResult> successes = new HashMap<>();
         for (CreatableTopic topic : request.topics()) {
             if (topicErrors.containsKey(topic.name())) continue;
+            // Figure out what ConfigRecords should be created, if any.
+            ConfigResource configResource = new ConfigResource(TOPIC, topic.name());
+            Map<String, Entry<OpType, String>> keyToOps = configChanges.get(configResource);
+            List<ApiMessageAndVersion> configRecords;
+            if (keyToOps != null) {
+                ControllerResult<ApiError> configResult =
+                    configurationControl.incrementalAlterConfig(configResource, keyToOps, true);
+                if (configResult.response().isFailure()) {
+                    topicErrors.put(topic.name(), configResult.response());
+                    continue;
+                } else {
+                    configRecords = configResult.records();
+                }
+            } else {
+                configRecords = Collections.emptyList();
+            }
             ApiError error;
             try {
-                error = createTopic(topic, records, successes, describable.contains(topic.name()));
+                error = createTopic(topic, records, successes, configRecords, describable.contains(topic.name()));
             } catch (ApiException e) {
                 error = ApiError.fromThrowable(e);
             }
@@ -597,6 +605,7 @@ public void replay(RemoveTopicRecord record) {
     private ApiError createTopic(CreatableTopic topic,
                                  List<ApiMessageAndVersion> records,
                                  Map<String, CreatableTopicResult> successes,
+                                 List<ApiMessageAndVersion> configRecords,
                                  boolean authorizedToReturnConfigs) {
         Map<String, String> creationConfigs = translateCreationConfigs(topic.configs());
         Map<Integer, PartitionRegistration> newParts = new HashMap<>();
@@ -725,6 +734,8 @@ private ApiError createTopic(CreatableTopic topic,
         records.add(new ApiMessageAndVersion(new TopicRecord().
             setName(topic.name()).
             setTopicId(topicId), (short) 0));
+        // ConfigRecords go after TopicRecord but before PartitionRecord(s).
+        records.addAll(configRecords);
         for (Entry<Integer, PartitionRegistration> partEntry : newParts.entrySet()) {
             int partitionIndex = partEntry.getKey();
             PartitionRegistration info = partEntry.getValue();
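The net effect on the record stream: each successfully created topic now contributes a contiguous group of records in the order TopicRecord, then its ConfigRecords, then its PartitionRecords, and a topic that fails validation contributes no records at all. A minimal checker for that invariant, as a sketch (the RecordOrderCheck class is hypothetical and not part of the patch; the ApiMessageAndVersion import path again assumes the 3.x server-common layout):

    package org.apache.kafka.controller;

    import java.util.List;

    import org.apache.kafka.common.metadata.ConfigRecord;
    import org.apache.kafka.common.metadata.PartitionRecord;
    import org.apache.kafka.common.metadata.TopicRecord;
    import org.apache.kafka.server.common.ApiMessageAndVersion;

    class RecordOrderCheck {
        // Returns true if, within each topic's group, ConfigRecords appear only
        // between the TopicRecord and that topic's first PartitionRecord.
        static boolean followsCreateTopicOrder(List<ApiMessageAndVersion> records) {
            int stage = -1; // -1 = before any topic, 0 = topic seen, 1 = configs, 2 = partitions
            for (ApiMessageAndVersion rec : records) {
                if (rec.message() instanceof TopicRecord) {
                    stage = 0; // a new topic group starts
                } else if (rec.message() instanceof ConfigRecord) {
                    if (stage != 0 && stage != 1) return false; // config after a partition, or before any topic
                    stage = 1;
                } else if (rec.message() instanceof PartitionRecord) {
                    if (stage < 0) return false; // partition before any topic
                    stage = 2;
                }
            }
            return true;
        }
    }

The tests below assert exactly this shape on concrete createTopics results.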
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index b4ba2e5bd6a7c..175202a8d6303 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.PolicyViolationException;
@@ -168,6 +169,30 @@ public void testIncrementalAlterConfigs() {
             true));
     }
 
+    @Test
+    public void testIncrementalAlterConfig() {
+        ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
+            setKafkaConfigSchema(SCHEMA).
+            build();
+        Map<String, Entry<AlterConfigOp.OpType, String>> keyToOps = toMap(entry("abc", entry(APPEND, "123")));
+
+        ControllerResult<ApiError> result = manager.
+            incrementalAlterConfig(MYTOPIC, keyToOps, true);
+
+        assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
+            new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
+                setName("abc").setValue("123"), CONFIG_RECORD.highestSupportedVersion())),
+            ApiError.NONE), result);
+
+        RecordTestUtils.replayAll(manager, result.records());
+
+        assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
+            new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
+                setName("abc").setValue(null), CONFIG_RECORD.highestSupportedVersion())),
+            ApiError.NONE),
+            manager.incrementalAlterConfig(MYTOPIC, toMap(entry("abc", entry(DELETE, "xyz"))), true));
+    }
+
     @Test
     public void testIncrementalAlterMultipleConfigValues() {
         ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
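For readers unfamiliar with the test utilities: `toMap` and `entry` are static helpers in this test class that just build maps and map entries. The `keyToOps` argument above is roughly equivalent to the following plain-Java construction (a sketch; the variable name is illustrative):

    Map<String, Entry<AlterConfigOp.OpType, String>> keyToOps = new HashMap<>();
    keyToOps.put("abc",
        new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.APPEND, "123"));

The second assertion also documents a subtlety of DELETE: the value supplied with the op ("xyz") is ignored, and the emitted ConfigRecord is a tombstone with a null value.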
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index d35cd6de2fe90..2d46b6ec46f29 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -61,6 +61,7 @@
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
@@ -580,6 +581,17 @@ public void testCreateTopicsWithConfigs() throws Exception {
             replicationControl.createTopics(request1, Collections.singleton("foo"));
         assertEquals((short) 0, result1.response().topics().find("foo").errorCode());
 
+        List<ApiMessageAndVersion> records1 = result1.records();
+        assertEquals(3, records1.size());
+        ApiMessageAndVersion record0 = records1.get(0);
+        assertEquals(TopicRecord.class, record0.message().getClass());
+
+        ApiMessageAndVersion record1 = records1.get(1);
+        assertEquals(ConfigRecord.class, record1.message().getClass());
+
+        ApiMessageAndVersion lastRecord = records1.get(2);
+        assertEquals(PartitionRecord.class, lastRecord.message().getClass());
+
         ctx.replay(result1.records());
         assertEquals(
             "notNull",
@@ -605,6 +617,49 @@ public void testCreateTopicsWithConfigs() throws Exception {
             "Null value not supported for topic configs: foo",
             result2.response().topics().find("bar").errorMessage()
         );
+
+        CreateTopicsRequestData request3 = new CreateTopicsRequestData();
+        request3.topics().add(new CreatableTopic().setName("baz")
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        ControllerResult<CreateTopicsResponseData> result3 =
+            replicationControl.createTopics(request3, Collections.singleton("baz"));
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result3.response().topics().find("baz").errorCode());
+        assertEquals(Collections.emptyList(), result3.records());
+
+        // Test a request with multiple topics batched together.
+        CreateTopicsRequestData request4 = new CreateTopicsRequestData();
+        String batchedTopic1 = "batched-topic-1";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic1)
+            .setNumPartitions(-1).setReplicationFactor((short) -1)
+            .setConfigs(validConfigs));
+        String batchedTopic2 = "batched-topic2";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic2)
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        Set<String> request4Topics = new HashSet<>();
+        request4Topics.add(batchedTopic1);
+        request4Topics.add(batchedTopic2);
+        ControllerResult<CreateTopicsResponseData> result4 =
+            replicationControl.createTopics(request4, request4Topics);
+
+        assertEquals(Errors.NONE.code(), result4.response().topics().find(batchedTopic1).errorCode());
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), result4.response().topics().find(batchedTopic2).errorCode());
+
+        assertEquals(3, result4.records().size());
+        assertEquals(TopicRecord.class, result4.records().get(0).message().getClass());
+        TopicRecord batchedTopic1Record = (TopicRecord) result4.records().get(0).message();
+        assertEquals(batchedTopic1, batchedTopic1Record.name());
+        assertEquals(new ConfigRecord()
+                .setResourceName(batchedTopic1)
+                .setResourceType(ConfigResource.Type.TOPIC.id())
+                .setName("foo")
+                .setValue("notNull"),
+            result4.records().get(1).message());
+        assertEquals(PartitionRecord.class, result4.records().get(2).message().getClass());
+        assertEquals(batchedTopic1Record.topicId(), ((PartitionRecord) result4.records().get(2).message()).topicId());
     }
 
     @Test
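If the hypothetical RecordOrderCheck sketch from earlier were added to the test sources, the batched assertions could also be backed by the general invariant, e.g.:

    // Only batched-topic-1 produced records, already in
    // TopicRecord -> ConfigRecord -> PartitionRecord order.
    assertTrue(RecordOrderCheck.followsCreateTopicOrder(result4.records()));
    assertEquals(1, result4.records().stream()
        .filter(r -> r.message() instanceof TopicRecord).count());

The per-topic short-circuit explains the counts: batched-topic2 fails replication-factor validation inside createTopic before any records are added, so neither its TopicRecord nor its ConfigRecords reach result4.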