Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1901,7 +1901,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
*
* @param topic The topic to get partition metadata for
*
* @return The list of partitions
* @return The list of partitions, which will be empty when the given topic is not found
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
Expand All @@ -1924,7 +1924,7 @@ public List<PartitionInfo> partitionsFor(String topic) {
* @param topic The topic to get partition metadata for
* @param timeout The maximum of time to await topic metadata
*
* @return The list of partitions
* @return The list of partitions, which will be empty when the given topic is not found
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
Expand All @@ -1948,7 +1948,7 @@ public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
Timer timer = time.timer(timeout);
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topic), metadata.allowAutoTopicCreation()), timer);
return topicMetadata.get(topic);
return topicMetadata.getOrDefault(topic, Collections.emptyList());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious.

Is it possible that an existent topic has no partitions? If so, returning empty list may confuse users who querying nonexistent topic. Maybe we should throw UnknownTopicOrPartitionException for nonexistent topic and return empty list for existent topic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, it is worth writing docs for this case (return empty list).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you could have a topic with 0 partition.

} finally {
release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOff
@Override
public synchronized List<PartitionInfo> partitionsFor(String topic) {
ensureNotClosed();
return this.partitions.get(topic);
return this.partitions.getOrDefault(topic, Collections.emptyList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1757,6 +1757,28 @@ private void consumerCloseTest(final long closeTimeoutMs,
}
}

@Test
public void testPartitionsForNonExistingTopic() {
Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);

initMetadata(client, Collections.singletonMap(topic, 1));
Cluster cluster = metadata.fetch();

MetadataResponse updateResponse = RequestTestUtils.metadataResponse(cluster.nodes(),
cluster.clusterResource().clusterId(),
cluster.controller().id(),
Collections.emptyList());
client.prepareResponse(updateResponse);

ConsumerPartitionAssignor assignor = new RoundRobinAssignor();

KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
assertEquals(Collections.emptyList(), consumer.partitionsFor("non-exist-topic"));
}

@Test
public void testPartitionsForAuthenticationFailure() {
final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@ public void start() {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
long started = time.nanoseconds();
long sleepMs = 100;
while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
while (partitionInfos.isEmpty() && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
time.sleep(sleepMs);
sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS);
partitionInfos = consumer.partitionsFor(topic);
}
if (partitionInfos == null)
if (partitionInfos.isEmpty())
throw new ConnectException("Could not look up partition metadata for offset backing store topic in" +
" allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
" this is your first use of the topic it may have taken too long to create.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
Expand All @@ -40,7 +41,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -260,10 +260,11 @@ public void assertNoTopicStatusInStatusTopic() {
Consumer<byte[], byte[]> verifiableConsumer = connect.kafka().createConsumer(
Collections.singletonMap("group.id", "verifiable-consumer-group-0"));

List<TopicPartition> partitions =
Optional.ofNullable(verifiableConsumer.partitionsFor(statusTopic))
.orElseThrow(() -> new AssertionError("Unable to retrieve partitions info for status topic"))
.stream()
List<PartitionInfo> partitionInfos = verifiableConsumer.partitionsFor(statusTopic);
if (partitionInfos.isEmpty()) {
throw new AssertionError("Unable to retrieve partitions info for status topic");
}
List<TopicPartition> partitions = partitionInfos.stream()
.map(info -> new TopicPartition(info.topic(), info.partition()))
.collect(Collectors.toList());
verifiableConsumer.assign(partitions);
Expand Down