Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,26 +166,41 @@ SnapshotRegistry snapshotRegistry() {
* @return The result.
*/
ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
boolean newlyCreatedResource) {
Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
boolean newlyCreatedResource
) {
List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
Map<ConfigResource, ApiError> outputResults = new HashMap<>();
for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry :
configChanges.entrySet()) {
incrementalAlterConfigResource(resourceEntry.getKey(),
ApiError apiError = incrementalAlterConfigResource(resourceEntry.getKey(),
resourceEntry.getValue(),
newlyCreatedResource,
outputRecords,
outputResults);
outputRecords);
outputResults.put(resourceEntry.getKey(), apiError);
}
return ControllerResult.atomicOf(outputRecords, outputResults);
}

private void incrementalAlterConfigResource(ConfigResource configResource,
Map<String, Entry<OpType, String>> keysToOps,
boolean newlyCreatedResource,
List<ApiMessageAndVersion> outputRecords,
Map<ConfigResource, ApiError> outputResults) {
ControllerResult<ApiError> incrementalAlterConfig(
ConfigResource configResource,
Map<String, Entry<OpType, String>> keyToOps,
boolean newlyCreatedResource
) {
List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
ApiError apiError = incrementalAlterConfigResource(configResource,
keyToOps,
newlyCreatedResource,
outputRecords);
return ControllerResult.atomicOf(outputRecords, apiError);
}

private ApiError incrementalAlterConfigResource(
ConfigResource configResource,
Map<String, Entry<OpType, String>> keysToOps,
boolean newlyCreatedResource,
List<ApiMessageAndVersion> outputRecords
) {
List<ApiMessageAndVersion> newRecords = new ArrayList<>();
for (Entry<String, Entry<OpType, String>> keysToOpsEntry : keysToOps.entrySet()) {
String key = keysToOpsEntry.getKey();
Expand All @@ -208,10 +223,9 @@ private void incrementalAlterConfigResource(ConfigResource configResource,
case APPEND:
case SUBTRACT:
if (!configSchema.isSplittable(configResource.type(), key)) {
outputResults.put(configResource, new ApiError(
return new ApiError(
INVALID_CONFIG, "Can't " + opType + " to " +
"key " + key + " because its type is not LIST."));
return;
"key " + key + " because its type is not LIST.");
}
List<String> oldValueList = getParts(newValue, key, configResource);
if (opType == APPEND) {
Expand Down Expand Up @@ -239,11 +253,10 @@ private void incrementalAlterConfigResource(ConfigResource configResource,
}
ApiError error = validateAlterConfig(configResource, newRecords, Collections.emptyList(), newlyCreatedResource);
if (error.isFailure()) {
outputResults.put(configResource, error);
return;
return error;
}
outputRecords.addAll(newRecords);
outputResults.put(configResource, ApiError.NONE);
return ApiError.NONE;
}

private ApiError validateAlterConfig(ConfigResource configResource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,25 +536,33 @@ public void replay(RemoveTopicRecord record) {
"Topic '" + t.name() + "' already exists.")));

// Verify that the configurations for the new topics are OK, and figure out what
// ConfigRecords should be created.
// configurations should be created.
Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges =
computeConfigChanges(topicErrors, request.topics());
ControllerResult<Map<ConfigResource, ApiError>> configResult =
configurationControl.incrementalAlterConfigs(configChanges, true);
for (Entry<ConfigResource, ApiError> entry : configResult.response().entrySet()) {
if (entry.getValue().isFailure()) {
topicErrors.put(entry.getKey().name(), entry.getValue());
}
}
records.addAll(configResult.records());

// Try to create whatever topics are needed.
Map<String, CreatableTopicResult> successes = new HashMap<>();
for (CreatableTopic topic : request.topics()) {
if (topicErrors.containsKey(topic.name())) continue;
// Figure out what ConfigRecords should be created, if any.
ConfigResource configResource = new ConfigResource(TOPIC, topic.name());
Map<String, Entry<OpType, String>> keyToOps = configChanges.get(configResource);
List<ApiMessageAndVersion> configRecords;
if (keyToOps != null) {
ControllerResult<ApiError> configResult =
configurationControl.incrementalAlterConfig(configResource, keyToOps, true);
if (configResult.response().isFailure()) {
topicErrors.put(topic.name(), configResult.response());
continue;
} else {
configRecords = configResult.records();
}
} else {
configRecords = Collections.emptyList();
}
ApiError error;
try {
error = createTopic(topic, records, successes, describable.contains(topic.name()));
error = createTopic(topic, records, successes, configRecords, describable.contains(topic.name()));
} catch (ApiException e) {
error = ApiError.fromThrowable(e);
}
Expand Down Expand Up @@ -597,6 +605,7 @@ public void replay(RemoveTopicRecord record) {
private ApiError createTopic(CreatableTopic topic,
List<ApiMessageAndVersion> records,
Map<String, CreatableTopicResult> successes,
List<ApiMessageAndVersion> configRecords,
boolean authorizedToReturnConfigs) {
Map<String, String> creationConfigs = translateCreationConfigs(topic.configs());
Map<Integer, PartitionRegistration> newParts = new HashMap<>();
Expand Down Expand Up @@ -725,6 +734,8 @@ private ApiError createTopic(CreatableTopic topic,
records.add(new ApiMessageAndVersion(new TopicRecord().
setName(topic.name()).
setTopicId(topicId), (short) 0));
// ConfigRecords go after TopicRecord but before PartitionRecord(s).
records.addAll(configRecords);
for (Entry<Integer, PartitionRegistration> partEntry : newParts.entrySet()) {
int partitionIndex = partEntry.getKey();
PartitionRegistration info = partEntry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.controller;

import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
Expand Down Expand Up @@ -168,6 +169,30 @@ public void testIncrementalAlterConfigs() {
true));
}

@Test
public void testIncrementalAlterConfig() {
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
setKafkaConfigSchema(SCHEMA).
build();
Map<String, Entry<AlterConfigOp.OpType, String>> keyToOps = toMap(entry("abc", entry(APPEND, "123")));

ControllerResult<ApiError> result = manager.
incrementalAlterConfig(MYTOPIC, keyToOps, true);

assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue("123"), CONFIG_RECORD.highestSupportedVersion())),
ApiError.NONE), result);

RecordTestUtils.replayAll(manager, result.records());

assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue(null), CONFIG_RECORD.highestSupportedVersion())),
ApiError.NONE),
manager.incrementalAlterConfig(MYTOPIC, toMap(entry("abc", entry(DELETE, "xyz"))), true));
}

@Test
public void testIncrementalAlterMultipleConfigValues() {
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
Expand Down Expand Up @@ -580,6 +581,17 @@ public void testCreateTopicsWithConfigs() throws Exception {
replicationControl.createTopics(request1, Collections.singleton("foo"));
assertEquals((short) 0, result1.response().topics().find("foo").errorCode());

List<ApiMessageAndVersion> records1 = result1.records();
assertEquals(3, records1.size());
ApiMessageAndVersion record0 = records1.get(0);
assertEquals(TopicRecord.class, record0.message().getClass());

ApiMessageAndVersion record1 = records1.get(1);
assertEquals(ConfigRecord.class, record1.message().getClass());

ApiMessageAndVersion lastRecord = records1.get(2);
assertEquals(PartitionRecord.class, lastRecord.message().getClass());

ctx.replay(result1.records());
assertEquals(
"notNull",
Expand All @@ -605,6 +617,49 @@ public void testCreateTopicsWithConfigs() throws Exception {
"Null value not supported for topic configs: foo",
result2.response().topics().find("bar").errorMessage()
);

CreateTopicsRequestData request3 = new CreateTopicsRequestData();
request3.topics().add(new CreatableTopic().setName("baz")
.setNumPartitions(-1).setReplicationFactor((short) -2)
.setConfigs(validConfigs));

ControllerResult<CreateTopicsResponseData> result3 =
replicationControl.createTopics(request3, Collections.singleton("baz"));
assertEquals(INVALID_REPLICATION_FACTOR.code(), result3.response().topics().find("baz").errorCode());
assertEquals(Collections.emptyList(), result3.records());

// Test request with multiple topics together.
CreateTopicsRequestData request4 = new CreateTopicsRequestData();
String batchedTopic1 = "batched-topic-1";
request4.topics().add(new CreatableTopic().setName(batchedTopic1)
.setNumPartitions(-1).setReplicationFactor((short) -1)
.setConfigs(validConfigs));
String batchedTopic2 = "batched-topic2";
request4.topics().add(new CreatableTopic().setName(batchedTopic2)
.setNumPartitions(-1).setReplicationFactor((short) -2)
.setConfigs(validConfigs));

Set<String> request4Topics = new HashSet<>();
request4Topics.add(batchedTopic1);
request4Topics.add(batchedTopic2);
ControllerResult<CreateTopicsResponseData> result4 =
replicationControl.createTopics(request4, request4Topics);

assertEquals(Errors.NONE.code(), result4.response().topics().find(batchedTopic1).errorCode());
assertEquals(INVALID_REPLICATION_FACTOR.code(), result4.response().topics().find(batchedTopic2).errorCode());

assertEquals(3, result4.records().size());
assertEquals(TopicRecord.class, result4.records().get(0).message().getClass());
TopicRecord batchedTopic1Record = (TopicRecord) result4.records().get(0).message();
assertEquals(batchedTopic1, batchedTopic1Record.name());
assertEquals(new ConfigRecord()
.setResourceName(batchedTopic1)
.setResourceType(ConfigResource.Type.TOPIC.id())
.setName("foo")
.setValue("notNull"),
result4.records().get(1).message());
assertEquals(PartitionRecord.class, result4.records().get(2).message().getClass());
assertEquals(batchedTopic1Record.topicId(), ((PartitionRecord) result4.records().get(2).message()).topicId());
}

@Test
Expand Down