diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientListener.java b/clients/src/main/java/org/apache/kafka/clients/ClientListener.java
new file mode 100644
index 0000000000000..8defd5fd9b0f3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientListener.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.Map;
+
+import org.apache.kafka.common.protocol.Errors;
+
+public interface ClientListener {
+
+    void onMetadataErrors(Map<String, Errors> topicErrors);
+
+    void onClientException(Exception e);
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 8c6e39a1a1c3b..b7a8e4de3b708 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -133,4 +133,6 @@ public interface KafkaClient extends Closeable {
      */
     public void wakeup();
 
+    public void addListener(ClientListener clientListener);
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index b134631bd8b11..ea60cab5cd403 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -35,6 +35,7 @@
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -79,6 +80,9 @@ public class NetworkClient implements KafkaClient {
 
     private final Time time;
 
+    private final List<ClientListener> listeners = new ArrayList<>();
+
+
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
@@ -596,9 +600,10 @@ private void handleResponse(RequestHeader header, Struct body, long now) {
             Cluster cluster = response.cluster();
             // check if any topics metadata failed to get updated
             Map<String, Errors> errors = response.errors();
-            if (!errors.isEmpty())
+            if (!errors.isEmpty()) {
                 log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), errors);
-
+                fireListeners(Collections.unmodifiableMap(errors));
+            }
             // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
             // created which means we will get errors and no nodes until it exists
             if (cluster.nodes().size() > 0) {
@@ -656,4 +661,13 @@ private void maybeUpdate(long now, Node node) {
         }
     }
 
+    private void fireListeners(Map<String, Errors> errors) {
+        for (ClientListener listener : listeners)
+            listener.onMetadataErrors(errors);
+    }
+
+    public void addListener(ClientListener listener) {
+        this.listeners.add(listener);
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 04b41ba1d13ee..ab97d1d0a9ebd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.ClientListener;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
@@ -1438,4 +1439,8 @@ private void release() {
         if (refcount.decrementAndGet() == 0)
             currentThread.set(NO_CURRENT_THREAD);
     }
+
+    public void addListener(ClientListener clientListener) {
+        fetcher.addListener(clientListener);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index b65a5b7b203e1..b43912a0fe55f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ClientListener;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
@@ -427,4 +428,8 @@ public void onComplete(ClientResponse response) {
             }
         }
     }
+
+    public void addListener(ClientListener clientListener) {
+        client.addListener(clientListener);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ddfb5841e3f92..8967e9c12ad86 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -13,6 +13,7 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ClientListener;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -30,6 +31,7 @@
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -91,6 +93,8 @@ public class Fetcher<K, V> {
     private final Set<String> unauthorizedTopics;
     private final Map<TopicPartition, Long> recordTooLargePartitions;
 
+    private final List<ClientListener> listeners = new ArrayList<>();
+
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
                    int maxWaitMs,
@@ -309,9 +313,11 @@ private long listOffset(TopicPartition partition, long timestamp) {
         while (true) {
             RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
             client.poll(future);
-
+
             if (future.succeeded())
                 return future.value();
+            else
+                fireListeners(future.exception());
 
             if (!future.isRetriable())
                 throw future.exception();
@@ -462,9 +468,14 @@ private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPart
         partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
         PartitionInfo info = metadata.fetch().partition(topicPartition);
         if (info == null) {
-            metadata.add(topicPartition.topic());
-            log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
-            return RequestFuture.staleMetadata();
+            if (metadata.fetch().topics().contains(topicPartition.topic())) {
+                log.debug("Partition {} does not exist", topicPartition);
+                return RequestFuture.failure(new UnknownTopicOrPartitionException(topicPartition.toString()));
+            } else {
+                metadata.add(topicPartition.topic());
+                log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
+                return RequestFuture.staleMetadata();
+            }
         } else if (info.leader() == null) {
             log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
             return RequestFuture.leaderNotAvailable();
@@ -813,4 +824,15 @@ public void recordTopicFetchMetrics(String topic, int bytes, int records) {
             recordsFetched.record(records);
         }
     }
+
+    private void fireListeners(Exception e) {
+        for (ClientListener listener : this.listeners)
+            listener.onClientException(e);
+    }
+
+    public void addListener(ClientListener listener) {
+        this.listeners.add(listener);
+        this.client.addListener(listener);
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 9fbbb88c48c57..adad9f5402368 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -28,6 +28,7 @@
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.Time;
@@ -66,6 +67,7 @@ public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher
     private final Queue<ClientRequest> requests = new ArrayDeque<>();
     private final Queue<ClientResponse> responses = new ArrayDeque<>();
     private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
+    private final List<ClientListener> listeners = new ArrayList<>();
 
     public MockClient(Time time) {
         this.time = time;
@@ -285,4 +287,12 @@ public interface RequestMatcher {
         boolean matches(ClientRequest request);
     }
 
+    private void fireListeners(Map<String, Errors> errors) {
+        for (ClientListener listener : listeners)
+            listener.onMetadataErrors(errors);
+    }
+
+    public void addListener(ClientListener listener) {
+        this.listeners.add(listener);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 8fad30f986b7a..22ed2b4a0173f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ClientListener;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
@@ -33,6 +34,7 @@
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -440,6 +442,35 @@ public void testUpdateFetchPositionDisconnect() {
         assertEquals(5, (long) subscriptions.position(tp));
     }
 
+
+    @Test(expected = UnknownTopicOrPartitionException.class)
+    public void testUpdateFetchPositionUnknownTopic() {
+
+        ClientListener listener = new ClientListener() {
+            @Override
+            public void onMetadataErrors(Map<String, Errors> topicErrors) {
+                if (topicErrors.containsValue(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+                    throw new UnknownTopicOrPartitionException();
+            }
+
+            @Override
+            public void onClientException(Exception e) {
+                if (e instanceof UnknownTopicOrPartitionException)
+                    throw (UnknownTopicOrPartitionException) e;
+            }
+        };
+
+        fetcher.addListener(listener);
+
+        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+                listOffsetResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, Collections.emptyList()), false);
+
+        fetcher.updateFetchPositions(Collections.singleton(tp));
+    }
+
     @Test
     public void testGetAllTopics() {
         // sending response before request, as getTopicMetadata is a blocking call