36 changes: 36 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -33,6 +33,7 @@
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
@@ -251,6 +252,41 @@ default DeleteTopicsResult deleteTopics(Collection<String> topics) {
* @return The DeleteTopicsResult.
*/
DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);

/**
* This is a convenience method for {@link #deleteTopicsWithIds(Collection, DeleteTopicsOptions)}
* with default options. See the overload for more details.
* <p>
* This operation is supported by brokers with version 2.8.0 or higher.
*
* @param topics The topic IDs for the topics to delete.
* @return The DeleteTopicsWithIdsResult.
*/
default DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection<Uuid> topics) {
return deleteTopicsWithIds(topics, new DeleteTopicsOptions());
}

/**
* Delete a batch of topics.
* <p>
* This operation is not transactional, so it may succeed for some topics while failing for others.
* <p>
* It may take several seconds after the {@link DeleteTopicsWithIdsResult} returns
* success for all the brokers to become aware that the topics are gone.
* During this time, {@link #listTopics()} and {@link #describeTopics(Collection)}
* may continue to return information about the deleted topics.
* <p>
* If delete.topic.enable is false on the brokers, deleteTopicsWithIds will mark
* the topics for deletion, but not actually delete them. The futures will
* return successfully in this case.
* <p>
* This operation is supported by brokers with version 2.8.0 or higher.
*
* @param topics The topic IDs for the topics to delete.
* @param options The options to use when deleting the topics.
* @return The DeleteTopicsWithIdsResult.
*/
DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection<Uuid> topics, DeleteTopicsOptions options);

/**
* List the topics available in the cluster with the default options.
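For illustration (not part of the diff): a minimal sketch of calling the new API, assuming a reachable 2.8.0+ broker. The bootstrap address and topic ID below are hypothetical placeholders.

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteTopicsWithIdsResult;
import org.apache.kafka.common.Uuid;

public class DeleteTopicByIdExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Hypothetical bootstrap address; deleteTopicsWithIds needs brokers at 2.8.0 or higher.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Uuid topicId = Uuid.fromString("b8tRS7h4TJ2Vt43Dp85v2A"); // hypothetical topic ID
            DeleteTopicsWithIdsResult result = admin.deleteTopicsWithIds(Collections.singleton(topicId));
            result.all().get(); // fails if any individual deletion failed
        }
    }
}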
clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.ApiException;

@@ -68,6 +69,19 @@ public KafkaFuture<Config> config(String topic) {
return futures.get(topic).thenApply(TopicMetadataAndConfig::config);
}

/**
* Returns a future that provides the topic ID for the topic when the request completes.
* <p>
* If the broker version doesn't support topic IDs in the response, an
* {@link org.apache.kafka.common.errors.UnsupportedVersionException} is thrown.
* If the broker returned an error for topic configs, the appropriate exception is thrown; for example,
* {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if the user does not
* have permission to describe topic configs.
*/
public KafkaFuture<Uuid> topicId(String topic) {
return futures.get(topic).thenApply(TopicMetadataAndConfig::topicId);
}

/**
* Returns a future that provides the number of partitions in the topic when the request completes.
Member Author:
I noticed all these comments say "If broker version doesn't support replication factor in the response..." — each should actually say whatever the method is returning (numPartitions, topicId, etc.).

Contributor:
In the create result, we wouldn't see UnsupportedVersionException for topicId() just because topic ids are not enabled, would we?

Member Author:
I think we could either do that or just return Uuid.ZERO_UUID.

Contributor:
Returning Uuid.ZERO_UUID should be fine. The comment just seems odd, I am not sure why we have it that way in the other methods.

Member Author:
I thought it was odd too.

* <p>
@@ -96,23 +110,31 @@ public KafkaFuture<Integer> replicationFactor(String topic) {

public static class TopicMetadataAndConfig {
private final ApiException exception;
private final Uuid topicId;
private final int numPartitions;
private final int replicationFactor;
private final Config config;

TopicMetadataAndConfig(int numPartitions, int replicationFactor, Config config) {
TopicMetadataAndConfig(Uuid topicId, int numPartitions, int replicationFactor, Config config) {
this.exception = null;
this.topicId = topicId;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
this.config = config;
}

TopicMetadataAndConfig(ApiException exception) {
this.exception = exception;
this.topicId = Uuid.ZERO_UUID;
this.numPartitions = UNKNOWN;
this.replicationFactor = UNKNOWN;
this.config = null;
}

public Uuid topicId() {
ensureSuccess();
return topicId;
}

public int numPartitions() {
ensureSuccess();
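Following the review discussion above, a caller could treat Uuid.ZERO_UUID as "no topic ID returned" rather than expecting an exception. A sketch only, assuming an existing Admin client; the topic name and settings are hypothetical.

static void createAndReportId(Admin admin) throws ExecutionException, InterruptedException {
    CreateTopicsResult created = admin.createTopics(
            Collections.singleton(new NewTopic("my-topic", 3, (short) 1))); // hypothetical topic
    Uuid id = created.topicId("my-topic").get();
    if (Uuid.ZERO_UUID.equals(id)) {
        // Per the discussion above, brokers that don't return topic IDs yield ZERO_UUID.
        System.out.println("Broker did not return a topic ID");
    } else {
        System.out.println("Created topic with ID " + id);
    }
}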
clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.Map;

/**
* The result of the {@link Admin#deleteTopicsWithIds(Collection)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteTopicsWithIdsResult {
final Map<Uuid, KafkaFuture<Void>> futures;

DeleteTopicsWithIdsResult(Map<Uuid, KafkaFuture<Void>> futures) {
this.futures = futures;
}

/**
* Return a map from topic IDs to futures which can be used to check the status of
* individual deletions.
*/
public Map<Uuid, KafkaFuture<Void>> values() {
return futures;
}

/**
* Return a future which succeeds only if all the topic deletions succeed.
*/
public KafkaFuture<Void> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
}
}
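A sketch of consuming this result per topic instead of failing on the first error; the result argument comes from a prior deleteTopicsWithIds call, and the method name is illustrative.

static void reportDeletions(DeleteTopicsWithIdsResult result) throws InterruptedException {
    for (Map.Entry<Uuid, KafkaFuture<Void>> entry : result.values().entrySet()) {
        try {
            entry.getValue().get(); // blocks until this topic's deletion completes
            System.out.println("Deleted topic " + entry.getKey());
        } catch (ExecutionException e) {
            System.out.println("Failed to delete topic " + entry.getKey() + ": " + e.getCause());
        }
    }
}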
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -49,6 +49,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
@@ -67,6 +68,7 @@
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnacceptableCredentialException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -108,6 +110,7 @@
import org.apache.kafka.common.message.DeleteRecordsResponseData;
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsTopicResult;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState;
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.message.DescribeClusterRequestData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
@@ -1554,7 +1557,7 @@ public void handleResponse(AbstractResponse abstractResponse) {
Config topicConfig = new Config(configs.stream()
.map(this::configEntry)
.collect(Collectors.toSet()));
topicMetadataAndConfig = new TopicMetadataAndConfig(result.numPartitions(),
topicMetadataAndConfig = new TopicMetadataAndConfig(result.topicId(), result.numPartitions(),
result.replicationFactor(),
topicConfig);
}
@@ -1625,6 +1628,32 @@ public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
return new DeleteTopicsResult(new HashMap<>(topicFutures));
}

@Override
public DeleteTopicsWithIdsResult deleteTopicsWithIds(final Collection<Uuid> topicIds,
final DeleteTopicsOptions options) {
final Map<Uuid, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicIds.size());
final List<Uuid> validTopicIds = new ArrayList<>(topicIds.size());
for (Uuid topicId : topicIds) {
if (topicId.equals(Uuid.ZERO_UUID)) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
future.completeExceptionally(new UnknownTopicIdException("The given topic ID '" +
topicId + "' cannot be represented in a request."));
topicFutures.put(topicId, future);
} else if (!topicFutures.containsKey(topicId)) {
topicFutures.put(topicId, new KafkaFutureImpl<>());
validTopicIds.add(topicId);
}
}
if (!validTopicIds.isEmpty()) {
final long now = time.milliseconds();
final long deadline = calcDeadlineMs(now, options.timeoutMs());
final Call call = getDeleteTopicsWithIdsCall(options, topicFutures, validTopicIds,
Collections.emptyMap(), now, deadline);
runnable.call(call, now);
}
return new DeleteTopicsWithIdsResult(new HashMap<>(topicFutures));
}

private Call getDeleteTopicsCall(final DeleteTopicsOptions options,
final Map<String, KafkaFutureImpl<Void>> futures,
final List<String> topics,
@@ -1696,6 +1725,79 @@ void handleFailure(Throwable throwable) {
}
};
}

private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options,
final Map<Uuid, KafkaFutureImpl<Void>> futures,
final List<Uuid> topicIds,
final Map<Uuid, ThrottlingQuotaExceededException> quotaExceededExceptions,
final long now,
final long deadline) {
return new Call("deleteTopics", deadline, new ControllerNodeProvider()) {
@Override
DeleteTopicsRequest.Builder createRequest(int timeoutMs) {
return new DeleteTopicsRequest.Builder(
new DeleteTopicsRequestData()
.setTopics(topicIds.stream().map(
topic -> new DeleteTopicState().setTopicId(topic)).collect(Collectors.toList()))
.setTimeoutMs(timeoutMs));
}

@Override
void handleResponse(AbstractResponse abstractResponse) {
// Check for controller change
handleNotControllerError(abstractResponse);
// Handle server responses for particular topics.
final DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
final List<Uuid> retryTopics = new ArrayList<>();
final Map<Uuid, ThrottlingQuotaExceededException> retryTopicQuotaExceededExceptions = new HashMap<>();
for (DeletableTopicResult result : response.data().responses()) {
KafkaFutureImpl<Void> future = futures.get(result.topicId());
if (future == null) {
log.warn("Server response mentioned unknown topic ID {}", result.topicId());
} else {
ApiError error = new ApiError(result.errorCode(), result.errorMessage());
if (error.isFailure()) {
if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException(
response.throttleTimeMs(), error.messageWithFallback());
if (options.shouldRetryOnQuotaViolation()) {
retryTopics.add(result.topicId());
retryTopicQuotaExceededExceptions.put(result.topicId(), quotaExceededException);
} else {
future.completeExceptionally(quotaExceededException);
}
} else {
future.completeExceptionally(error.exception());
}
} else {
future.complete(null);
}
}
}
// If there are topics to retry, retry them; complete unrealized futures otherwise.
if (retryTopics.isEmpty()) {
// The server should send back a response for every topic. But do a sanity check anyway.
completeUnrealizedFutures(futures.entrySet().stream(),
topic -> "The controller response did not contain a result for topic " + topic);
} else {
final long now = time.milliseconds();
final Call call = getDeleteTopicsWithIdsCall(options, futures, retryTopics,
retryTopicQuotaExceededExceptions, now, deadline);
runnable.call(call, now);
}
}

@Override
void handleFailure(Throwable throwable) {
// If any topics were retried due to a quota exceeded exception, we propagate
// the initial error back to the caller if the request timed out.
maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
// Fail all the other remaining futures
completeAllExceptionally(futures.values(), throwable);
}
};
}

@Override
public ListTopicsResult listTopics(final ListTopicsOptions options) {
clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicIdException.java
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class UnknownTopicIdException extends InvalidMetadataException {

private static final long serialVersionUID = 1L;

public UnknownTopicIdException(String message) {
super(message);
}

}
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -111,6 +111,7 @@
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnstableOffsetCommitException;
import org.apache.kafka.common.errors.UnsupportedByAuthenticationException;
@@ -350,7 +351,8 @@ public enum Errors {
POSITION_OUT_OF_RANGE(
99,
"Requested position is not greater than or equal to zero, and less than the size of the snapshot.",
PositionOutOfRangeException::new);
PositionOutOfRangeException::new),
UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", UnknownTopicIdException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);
