6 changes: 5 additions & 1 deletion clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -285,11 +285,15 @@ public interface Listener {

private Cluster getClusterForCurrentTopics(Cluster cluster) {
Set<String> unauthorizedTopics = new HashSet<>();
Set<String> unknownTopics = new HashSet<>();
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
if (cluster != null) {
unauthorizedTopics.addAll(cluster.unauthorizedTopics());
unauthorizedTopics.retainAll(this.topics.keySet());

unknownTopics.addAll(cluster.unknownTopics());
unknownTopics.retainAll(this.topics.keySet());

for (String topic : this.topics.keySet()) {
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
@@ -299,6 +303,6 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) {
}
nodes = cluster.nodes();
}
return new Cluster(nodes, partitionInfos, unauthorizedTopics);
return new Cluster(nodes, partitionInfos, unauthorizedTopics, unknownTopics);
}
}
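Reviewer note: the intent of the retainAll calls above is easy to miss, so here is a toy, self-contained illustration (topic names invented): the cluster-wide unknown set is narrowed to only the topics this Metadata instance is tracking.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RetainAllDemo {
    public static void main(String[] args) {
        Set<String> unknownTopics = new HashSet<>(Arrays.asList("ghost", "phantom")); // cluster.unknownTopics()
        Set<String> trackedTopics = new HashSet<>(Arrays.asList("ghost", "orders"));  // this.topics.keySet()
        unknownTopics.retainAll(trackedTopics);
        System.out.println(unknownTopics); // [ghost] -- errors for topics we never asked about are dropped
    }
}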
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1375,6 +1375,10 @@ public void wakeup() {
this.client.wakeup();
}

/**
 * Register a listener to be notified of exceptions raised while fetching,
 * e.g. UnknownTopicOrPartitionException.
 */
public void addListener(KafkaConsumerListener listener) {
fetcher.addListener(listener);
}

private void close(boolean swallowException) {
log.trace("Closing the Kafka consumer.");
AtomicReference<Throwable> firstException = new AtomicReference<>();
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumerListener.java
@@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.consumer;

/**
 * Callback through which the consumer reports exceptions encountered while
 * fetching, e.g. UnknownTopicOrPartitionException.
 */
public interface KafkaConsumerListener {

void onException(Exception e);

}
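A minimal sketch of how an application might use the new hook; the consumer configuration and the handler body are illustrative only, not part of this patch.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.KafkaConsumerListener;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class ListenerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative config
        props.put("group.id", "demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Register the new callback: unknown topics/partitions now surface here
        // instead of being retried silently inside poll().
        consumer.addListener(new KafkaConsumerListener() {
            @Override
            public void onException(Exception e) {
                if (e instanceof UnknownTopicOrPartitionException)
                    System.err.println("Missing topic/partition: " + e.getMessage());
            }
        });
    }
}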
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -16,6 +16,7 @@
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumerListener;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -30,6 +31,7 @@
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -90,7 +92,11 @@ public class Fetcher<K, V> {
private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
private final Set<String> unauthorizedTopics;
private final Map<TopicPartition, Long> recordTooLargePartitions;
private final Set<TopicPartition> unknownTopicOrPartitions;

private final List<KafkaConsumerListener> listeners = new ArrayList<>();

public Fetcher(ConsumerNetworkClient client,
int minBytes,
int maxWaitMs,
@@ -123,6 +129,7 @@ public Fetcher(ConsumerNetworkClient client,
this.offsetOutOfRangePartitions = new HashMap<>();
this.unauthorizedTopics = new HashSet<>();
this.recordTooLargePartitions = new HashMap<>();
this.unknownTopicOrPartitions = new HashSet<>();

this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
this.retryBackoffMs = retryBackoffMs;
@@ -203,6 +210,7 @@ public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest request, long timeout) {
do {
RequestFuture<ClientResponse> future = sendMetadataRequest(request);
client.poll(future, remaining);
fireListeners(future);

if (future.failed() && !future.isRetriable())
throw future.exception();
@@ -308,8 +316,11 @@ else if (strategy == OffsetResetStrategy.LATEST)
private long listOffset(TopicPartition partition, long timestamp) {
while (true) {
RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);

client.poll(future);

fireListeners(future);

if (future.succeeded())
return future.value();

@@ -379,6 +390,15 @@ private void throwIfRecordTooLarge() throws RecordTooLargeException {
copiedRecordTooLargePartitions);
}

/**
 * Unlike the throwIf*() checks above, this notifies registered listeners rather than throwing.
 */
private void fireIfUnknownTopicOrPartition() {
Set<TopicPartition> copiedUnknownTopicOrPartitions = new HashSet<>(this.unknownTopicOrPartitions);
this.unknownTopicOrPartitions.clear();

if (!copiedUnknownTopicOrPartitions.isEmpty()) {
fireListeners(new UnknownTopicOrPartitionException(copiedUnknownTopicOrPartitions));
}
}

/**
* Return the fetched records, empty the record buffer and update the consumed position.
*
@@ -396,6 +416,7 @@ public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
throwIfOffsetOutOfRange();
throwIfUnauthorizedTopics();
throwIfRecordTooLarge();
fireIfUnknownTopicOrPartition();

int maxRecords = maxPollRecords;
Iterator<PartitionRecords<K, V>> iterator = records.iterator();
@@ -462,9 +483,23 @@ private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
PartitionInfo info = metadata.fetch().partition(topicPartition);
if (info == null) {
metadata.add(topicPartition.topic());
log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
return RequestFuture.staleMetadata();
boolean partitionExists = false;
List<PartitionInfo> availablePartitionsForTopic = metadata.fetch().availablePartitionsForTopic(topicPartition.topic());
if (availablePartitionsForTopic != null) {
for (PartitionInfo partitionInfo : availablePartitionsForTopic) {
if (partitionInfo.partition() == topicPartition.partition()) {
partitionExists = true;
break;
}
}
}
if (metadata.fetch().unknownTopics().contains(topicPartition.topic()) || !partitionExists) {
return RequestFuture.failure(new UnknownTopicOrPartitionException(topicPartition));
} else {
metadata.add(topicPartition.topic());
log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
return RequestFuture.staleMetadata();
}
} else if (info.leader() == null) {
log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
return RequestFuture.leaderNotAvailable();
@@ -486,7 +521,7 @@ public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
* @param topicPartition The partition that was fetched
* @param clientResponse The response from the server.
*/
private void handleListOffsetResponse(TopicPartition topicPartition,
private void handleListOffsetResponse(final TopicPartition topicPartition,
ClientResponse clientResponse,
RequestFuture<Long> future) {
ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
@@ -499,11 +534,12 @@ private void handleListOffsetResponse(TopicPartition topicPartition,
log.debug("Fetched offset {} for partition {}", offset, topicPartition);

future.complete(offset);
} else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
|| errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
} else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()) {
log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
topicPartition);
future.raise(Errors.forCode(errorCode));
} else if (errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
future.raise(new UnknownTopicOrPartitionException(topicPartition));
} else {
log.warn("Attempt to fetch offsets for partition {} failed due to: {}",
topicPartition, Errors.forCode(errorCode).message());
@@ -612,9 +648,11 @@ private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
totalBytes += bytes;
totalCount += parsed.size();
} else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
|| partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
} else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()) {
this.metadata.requestUpdate();
} else if (partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
this.metadata.requestUpdate();
unknownTopicOrPartitions.add(tp);
} else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
long fetchOffset = request.fetchData().get(tp).offset;
if (subscriptions.hasDefaultOffsetResetPolicy())
@@ -813,4 +851,21 @@ public void recordTopicFetchMetrics(String topic, int bytes, int records) {
recordsFetched.record(records);
}
}

private <T> void fireListeners(RequestFuture<T> future) {
if (future.exception() != null)
fireListeners(future.exception());
}

private void fireListeners(Exception e) {
for (KafkaConsumerListener listener : listeners)
listener.onException(e);
}

public void addListener(KafkaConsumerListener listener) {
this.listeners.add(listener);
// Potentially we could also hook in at a lower level, e.g. connection refused:
// this.client.add(listener);
}

}
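As I read the Fetcher changes, the flow is: handleFetchResponse() records the partition in unknownTopicOrPartitions, the next fetchedRecords() call drains that set through fireIfUnknownTopicOrPartition(), and each registered listener receives one exception carrying the whole batch. A test-style sketch of a listener waiting on that signal; the class name and wiring are mine, illustrative only.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.KafkaConsumerListener;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class UnknownTopicProbe {
    // Returns true if polling surfaced an unknown topic/partition within the timeout.
    static boolean pollForUnknown(KafkaConsumer<?, ?> consumer, long timeoutMs) throws InterruptedException {
        final CountDownLatch sawUnknown = new CountDownLatch(1);
        consumer.addListener(new KafkaConsumerListener() {
            @Override
            public void onException(Exception e) {
                // Fired on the polling thread from fetchedRecords(); keep handlers cheap.
                if (e instanceof UnknownTopicOrPartitionException)
                    sawUnknown.countDown();
            }
        });
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline && sawUnknown.getCount() > 0)
            consumer.poll(100); // drives fetches; unknown-partition errors are dispatched here
        return sawUnknown.await(0, TimeUnit.MILLISECONDS);
    }
}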
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -42,6 +42,7 @@
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
@@ -533,6 +534,10 @@ private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (metadata.fetch().unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
if (metadata.fetch().unknownTopics().contains(topic)) {
// Topic does not exist; there is no partition to point at, so report just the topic name.
throw new UnknownTopicOrPartitionException(topic);
}
remainingWaitMs = maxWaitMs - elapsed;
}
return time.milliseconds() - begin;
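With this hunk, waitOnMetadata() fails as soon as refreshed metadata marks the topic unknown, instead of blocking for up to the full metadata wait. A sketch of the caller-visible behaviour, assuming the broker has auto.create.topics.enable=false (otherwise the topic would simply be created); whether the exception arrives synchronously or through the returned future depends on the send path, so the sketch checks both. Topic name and config are invented.

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class FailFastDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative config
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            producer.send(new ProducerRecord<String, String>("no-such-topic", "k", "v")).get();
        } catch (UnknownTopicOrPartitionException e) {
            System.err.println("Failed fast, thrown from send(): " + e.getMessage());
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException)
                System.err.println("Failed fast, via the future: " + e.getCause().getMessage());
        } finally {
            producer.close();
        }
    }
}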
25 changes: 20 additions & 5 deletions clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -32,6 +32,7 @@ public final class Cluster {
private final boolean isBootstrapConfigured;
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
private final Set<String> unknownTopics;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
@@ -43,16 +44,25 @@ public final class Cluster {
* @param nodes The nodes in the cluster
* @param partitions Information about a subset of the topic-partitions this cluster hosts
*/
@Deprecated
public Cluster(Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics) {
this(false, nodes, partitions, unauthorizedTopics);
this(nodes, partitions, unauthorizedTopics, Collections.<String>emptySet());
}

public Cluster(Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> unknownTopics) {
this(false, nodes, partitions, unauthorizedTopics, unknownTopics);
}

private Cluster(boolean isBootstrapConfigured,
Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics) {
Set<String> unauthorizedTopics,
Set<String> unknownTopics) {
this.isBootstrapConfigured = isBootstrapConfigured;

// make a randomized, unmodifiable copy of the nodes
@@ -105,13 +115,14 @@ private Cluster(boolean isBootstrapConfigured,
this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));

this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
this.unknownTopics = Collections.unmodifiableSet(unknownTopics);
}

/**
* Create an empty cluster instance with no nodes and no topic-partitions.
*/
public static Cluster empty() {
return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet());
}

@@ -124,7 +135,7 @@ public static Cluster bootstrap(List<InetSocketAddress> addresses) {
int nodeId = -1;
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet());
}

@@ -133,7 +144,7 @@ public static Cluster bootstrap(List<InetSocketAddress> addresses) {
public Cluster withPartitions(Map<TopicPartition, PartitionInfo> partitions) {
Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
combinedPartitions.putAll(partitions);
return new Cluster(this.nodes, combinedPartitions.values(), new HashSet<>(this.unauthorizedTopics));
return new Cluster(this.nodes, combinedPartitions.values(), new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.unknownTopics));
}

@@ -227,6 +238,10 @@ public boolean isBootstrapConfigured() {
return isBootstrapConfigured;
}

public Set<String> unknownTopics() {
return unknownTopics;
}

@Override
public String toString() {
return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
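Reviewer note: the new unknownTopics field deliberately mirrors unauthorizedTopics, so a single metadata response can flag both error classes. A toy construction for illustration; all names are invented.

import java.util.Arrays;
import java.util.Collections;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

public class ClusterDemo {
    public static void main(String[] args) {
        Cluster cluster = new Cluster(
                Arrays.asList(new Node(0, "localhost", 9092)),
                Collections.<PartitionInfo>emptyList(),
                Collections.singleton("secrets"), // unauthorizedTopics: exists, but we may not read it
                Collections.singleton("ghost"));  // unknownTopics: no broker hosts it
        System.out.println(cluster.unknownTopics());      // [ghost]
        System.out.println(cluster.unauthorizedTopics()); // [secrets]
    }
}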
clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
@@ -12,26 +12,33 @@
*/
package org.apache.kafka.common.errors;

import java.util.Collections;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

/**
* This topic/partition doesn't exist
*/
public class UnknownTopicOrPartitionException extends InvalidMetadataException {

private static final long serialVersionUID = 1L;

public UnknownTopicOrPartitionException() {
public UnknownTopicOrPartitionException(final String message) {
super(message);
}

public UnknownTopicOrPartitionException(String message) {
super(message);
public UnknownTopicOrPartitionException(TopicPartition unknownTopicOrPartition) {
this(Collections.singleton(unknownTopicOrPartition));
}

public UnknownTopicOrPartitionException(Throwable throwable) {
super(throwable);
public UnknownTopicOrPartitionException(Set<TopicPartition> unknownTopicOrPartitions) {
this(unknownTopicOrPartitions, null);
}

public UnknownTopicOrPartitionException(String message, Throwable throwable) {
super(message, throwable);
public UnknownTopicOrPartitionException(Set<TopicPartition> unknownTopicOrPartitions, Throwable throwable) {
super(org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION.message() + " " + unknownTopicOrPartitions, throwable);
}

}
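The reworked constructors bake the offending partitions into the message; a quick sketch of what a caller now sees. The wording comes from Errors.UNKNOWN_TOPIC_OR_PARTITION.message() plus the partition set, so the exact output may differ.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class ExceptionDemo {
    public static void main(String[] args) {
        UnknownTopicOrPartitionException e =
                new UnknownTopicOrPartitionException(new TopicPartition("ghost", 0));
        // Prints something like: "This server does not host this topic-partition. [ghost-0]"
        System.out.println(e.getMessage());
    }
}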
clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -265,7 +265,7 @@ public Cluster cluster() {
}
}

return new Cluster(this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED));
return new Cluster(this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED), topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION));
}
