diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 3934627837e74..c0fd4cde09dbf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -285,11 +285,15 @@ public interface Listener { private Cluster getClusterForCurrentTopics(Cluster cluster) { Set unauthorizedTopics = new HashSet<>(); + Set unknownTopics = new HashSet<>(); Collection partitionInfos = new ArrayList<>(); List nodes = Collections.emptyList(); if (cluster != null) { unauthorizedTopics.addAll(cluster.unauthorizedTopics()); unauthorizedTopics.retainAll(this.topics.keySet()); + + unknownTopics.addAll(cluster.unknownTopics()); + unknownTopics.retainAll(this.topics.keySet()); for (String topic : this.topics.keySet()) { List partitionInfoList = cluster.partitionsForTopic(topic); @@ -299,6 +303,6 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) { } nodes = cluster.nodes(); } - return new Cluster(nodes, partitionInfos, unauthorizedTopics); + return new Cluster(nodes, partitionInfos, unauthorizedTopics, unknownTopics); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 04b41ba1d13ee..0a9e6590a2702 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1375,6 +1375,10 @@ public void wakeup() { this.client.wakeup(); } + public void addListener(KafkaConsumerListener listener) { + fetcher.addListener(listener); + } + private void close(boolean swallowException) { log.trace("Closing the Kafka consumer."); AtomicReference firstException = new AtomicReference<>(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumerListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumerListener.java new file mode 100644 index 0000000000000..144407ed2b6ce --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumerListener.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +public interface KafkaConsumerListener { + + void onException(Exception e); + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index ddfb5841e3f92..e222b6ea2fecf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -16,6 +16,7 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumerListener; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -30,6 +31,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -90,7 +92,11 @@ public class Fetcher { private final Map offsetOutOfRangePartitions; private final Set unauthorizedTopics; private final Map recordTooLargePartitions; + private final Set unknownTopicOrPartitions; + private final List listeners = new ArrayList<>(); + + public Fetcher(ConsumerNetworkClient client, int minBytes, int maxWaitMs, @@ -123,6 +129,7 @@ public Fetcher(ConsumerNetworkClient client, this.offsetOutOfRangePartitions = new HashMap<>(); this.unauthorizedTopics = new HashSet<>(); this.recordTooLargePartitions = new HashMap<>(); + this.unknownTopicOrPartitions = new HashSet<>(); this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix); this.retryBackoffMs = retryBackoffMs; @@ -203,6 +210,7 @@ public Map> getTopicMetadata(MetadataRequest request do { RequestFuture future = sendMetadataRequest(request); client.poll(future, remaining); + fireListeners(future); if (future.failed() && !future.isRetriable()) throw future.exception(); @@ -308,8 +316,11 @@ else if (strategy == OffsetResetStrategy.LATEST) private long listOffset(TopicPartition partition, long timestamp) { while (true) { RequestFuture future = sendListOffsetRequest(partition, timestamp); + client.poll(future); + fireListeners(future); + if (future.succeeded()) return future.value(); @@ -379,6 +390,15 @@ private void throwIfRecordTooLarge() throws RecordTooLargeException { copiedRecordTooLargePartitions); } + private void fireIfUnknownTopicOrPartition() throws UnknownTopicOrPartitionException { + Set copiedUnknownTopicOrPartitions = new HashSet<>(this.unknownTopicOrPartitions); + this.unknownTopicOrPartitions.clear(); + + if (!copiedUnknownTopicOrPartitions.isEmpty()) { + fireListeners(new UnknownTopicOrPartitionException(copiedUnknownTopicOrPartitions)); + } + } + /** * Return the fetched records, empty the record buffer and update the consumed position. * @@ -396,6 +416,7 @@ public Map>> fetchedRecords() { throwIfOffsetOutOfRange(); throwIfUnauthorizedTopics(); throwIfRecordTooLarge(); + fireIfUnknownTopicOrPartition(); int maxRecords = maxPollRecords; Iterator> iterator = records.iterator(); @@ -462,9 +483,23 @@ private RequestFuture sendListOffsetRequest(final TopicPartition topicPart partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1)); PartitionInfo info = metadata.fetch().partition(topicPartition); if (info == null) { - metadata.add(topicPartition.topic()); - log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition); - return RequestFuture.staleMetadata(); + boolean partitionExists = false; + List availablePartitionsForTopic = metadata.fetch().availablePartitionsForTopic(topicPartition.topic()); + if (availablePartitionsForTopic != null) { + for (PartitionInfo partitionInfo : availablePartitionsForTopic) { + if (partitionInfo.partition() == topicPartition.partition()) { + partitionExists = true; + break; + } + } + } + if (metadata.fetch().unknownTopics().contains(topicPartition.topic()) || !partitionExists) { + return RequestFuture.failure(new UnknownTopicOrPartitionException(topicPartition)); + } else { + metadata.add(topicPartition.topic()); + log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition); + return RequestFuture.staleMetadata(); + } } else if (info.leader() == null) { log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition); return RequestFuture.leaderNotAvailable(); @@ -486,7 +521,7 @@ public void onSuccess(ClientResponse response, RequestFuture future) { * @param topicPartition The partition that was fetched * @param clientResponse The response from the server. */ - private void handleListOffsetResponse(TopicPartition topicPartition, + private void handleListOffsetResponse(final TopicPartition topicPartition, ClientResponse clientResponse, RequestFuture future) { ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody()); @@ -499,11 +534,12 @@ private void handleListOffsetResponse(TopicPartition topicPartition, log.debug("Fetched offset {} for partition {}", offset, topicPartition); future.complete(offset); - } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() - || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { + } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()) { log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", topicPartition); future.raise(Errors.forCode(errorCode)); + } else if (errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { + future.raise(new UnknownTopicOrPartitionException(topicPartition)); } else { log.warn("Attempt to fetch offsets for partition {} failed due to: {}", topicPartition, Errors.forCode(errorCode).message()); @@ -612,9 +648,11 @@ private void handleFetchResponse(ClientResponse resp, FetchRequest request) { this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size()); totalBytes += bytes; totalCount += parsed.size(); - } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() - || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { + } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()) { + this.metadata.requestUpdate(); + } else if (partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { this.metadata.requestUpdate(); + unknownTopicOrPartitions.add(tp); } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) { long fetchOffset = request.fetchData().get(tp).offset; if (subscriptions.hasDefaultOffsetResetPolicy()) @@ -813,4 +851,21 @@ public void recordTopicFetchMetrics(String topic, int bytes, int records) { recordsFetched.record(records); } } + + private void fireListeners(RequestFuture future) { + if (future.exception() != null) + fireListeners(future.exception()); + } + + private void fireListeners(Exception e) { + for (KafkaConsumerListener listener : listeners) + listener.onException(e); + } + + public void addListener(KafkaConsumerListener listener) { + this.listeners.add(listener); + //potentially we could go lower-level eg connection refused .. + //this.client.add(listener); + } + } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a61ee93daceec..13362416f05f7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.MetricName; @@ -533,6 +534,10 @@ private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedExce throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); if (metadata.fetch().unauthorizedTopics().contains(topic)) throw new TopicAuthorizationException(topic); + if (metadata.fetch().unknownTopics().contains(topic)) { + //topic does not exist, we can ignore the partition and just use the topic name + throw new UnknownTopicOrPartitionException(topic); + } remainingWaitMs = maxWaitMs - elapsed; } return time.milliseconds() - begin; diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index e1bf581b3e5d1..0fc895a13e07a 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -32,6 +32,7 @@ public final class Cluster { private final boolean isBootstrapConfigured; private final List nodes; private final Set unauthorizedTopics; + private final Set unknownTopics; private final Map partitionsByTopicPartition; private final Map> partitionsByTopic; private final Map> availablePartitionsByTopic; @@ -43,16 +44,25 @@ public final class Cluster { * @param nodes The nodes in the cluster * @param partitions Information about a subset of the topic-partitions this cluster hosts */ + @Deprecated public Cluster(Collection nodes, Collection partitions, Set unauthorizedTopics) { - this(false, nodes, partitions, unauthorizedTopics); + this(nodes, partitions, unauthorizedTopics, Collections.emptySet()); + } + + public Cluster(Collection nodes, + Collection partitions, + Set unauthorizedTopics, + Set unknownTopics) { + this(false, nodes, partitions, unauthorizedTopics, unknownTopics); } private Cluster(boolean isBootstrapConfigured, Collection nodes, Collection partitions, - Set unauthorizedTopics) { + Set unauthorizedTopics, + Set unknownTopics) { this.isBootstrapConfigured = isBootstrapConfigured; // make a randomized, unmodifiable copy of the nodes @@ -105,13 +115,14 @@ private Cluster(boolean isBootstrapConfigured, this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics); + this.unknownTopics = Collections.unmodifiableSet(unknownTopics); } /** * Create an empty cluster instance with no nodes and no topic-partitions. */ public static Cluster empty() { - return new Cluster(new ArrayList(0), new ArrayList(0), Collections.emptySet()); + return new Cluster(new ArrayList(0), new ArrayList(0), Collections.emptySet(), Collections.emptySet()); } /** @@ -124,7 +135,7 @@ public static Cluster bootstrap(List addresses) { int nodeId = -1; for (InetSocketAddress address : addresses) nodes.add(new Node(nodeId--, address.getHostString(), address.getPort())); - return new Cluster(true, nodes, new ArrayList(0), Collections.emptySet()); + return new Cluster(true, nodes, new ArrayList(0), Collections.emptySet(), Collections.emptySet()); } /** @@ -133,7 +144,7 @@ public static Cluster bootstrap(List addresses) { public Cluster withPartitions(Map partitions) { Map combinedPartitions = new HashMap<>(this.partitionsByTopicPartition); combinedPartitions.putAll(partitions); - return new Cluster(this.nodes, combinedPartitions.values(), new HashSet<>(this.unauthorizedTopics)); + return new Cluster(this.nodes, combinedPartitions.values(), new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.unknownTopics)); } /** @@ -227,6 +238,10 @@ public boolean isBootstrapConfigured() { return isBootstrapConfigured; } + public Set unknownTopics() { + return unknownTopics; + } + @Override public String toString() { return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")"; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java index 70fd55c8788be..4b85109e3c27c 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java @@ -12,6 +12,12 @@ */ package org.apache.kafka.common.errors; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.kafka.common.TopicPartition; + /** * This topic/partition doesn't exist */ @@ -19,19 +25,20 @@ public class UnknownTopicOrPartitionException extends InvalidMetadataException { private static final long serialVersionUID = 1L; - public UnknownTopicOrPartitionException() { + public UnknownTopicOrPartitionException(final String message) { + super(message); } - public UnknownTopicOrPartitionException(String message) { - super(message); + public UnknownTopicOrPartitionException(TopicPartition unknownTopicOrPartition) { + this(new HashSet(Arrays.asList(unknownTopicOrPartition))); } - public UnknownTopicOrPartitionException(Throwable throwable) { - super(throwable); + public UnknownTopicOrPartitionException(Set unknownTopicOrPartitions) { + this(unknownTopicOrPartitions, null); } - public UnknownTopicOrPartitionException(String message, Throwable throwable) { - super(message, throwable); + public UnknownTopicOrPartitionException(Set unknownTopicOrPartitions, Throwable throwable) { + super(org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION.message() + unknownTopicOrPartitions.toString(), throwable); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 78b35f8de08e7..e11c44efbe01f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -265,7 +265,7 @@ public Cluster cluster() { } } - return new Cluster(this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED)); + return new Cluster(this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED), topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION)); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 5defb13a51c50..35ab9afa5bf43 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -134,7 +134,7 @@ public void testUpdateWithNeedMetadataForAllTopics() { Arrays.asList( new PartitionInfo("topic", 0, null, null, null), new PartitionInfo("topic1", 0, null, null, null)), - Collections.emptySet()), + Collections.emptySet(), Collections.emptySet()), 100); assertArrayEquals("Metadata got updated with wrong set of topics.", @@ -161,7 +161,7 @@ public void onMetadataUpdate(Cluster cluster) { Arrays.asList( new PartitionInfo("topic", 0, null, null, null), new PartitionInfo("topic1", 0, null, null, null)), - Collections.emptySet()), + Collections.emptySet(), Collections.emptySet()), 100); assertEquals("Listener did not update topics list correctly", @@ -187,7 +187,7 @@ public void onMetadataUpdate(Cluster cluster) { Arrays.asList( new PartitionInfo("topic", 0, null, null, null), new PartitionInfo("topic1", 0, null, null, null)), - Collections.emptySet()), + Collections.emptySet(), Collections.emptySet()), 100); metadata.removeListener(listener); @@ -197,7 +197,7 @@ public void onMetadataUpdate(Cluster cluster) { Arrays.asList( new PartitionInfo("topic2", 0, null, null, null), new PartitionInfo("topic3", 0, null, null, null)), - Collections.emptySet()), + Collections.emptySet(), Collections.emptySet()), 100); assertEquals("Listener did not update topics list correctly", diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 8fad30f986b7a..b4d609bf0421e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumerListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -33,6 +34,8 @@ import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; @@ -440,6 +443,27 @@ public void testUpdateFetchPositionDisconnect() { assertEquals(5, (long) subscriptions.position(tp)); } + @Test(expected = WakeupException.class) + public void testUpdateFetchPositionUnknownTopic() { + + KafkaConsumerListener kcListener = new KafkaConsumerListener() { + @Override + public void onException(Exception e) { + if (e instanceof UnknownTopicOrPartitionException) + throw new WakeupException(); + } + }; + fetcher.addListener(kcListener); + + subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); + + client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), + listOffsetResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, Collections.emptyList()), false); + + fetcher.updateFetchPositions(Collections.singleton(tp)); + } + @Test public void testGetAllTopics() { // sending response before request, as getTopicMetadata is a blocking call diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index 0a0bdd8e42fb3..901786995c472 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -59,7 +59,7 @@ public void testAutoCompleteMock() throws Exception { public void testPartitioner() throws Exception { PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null); PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null); - Cluster cluster = new Cluster(new ArrayList(0), asList(partitionInfo0, partitionInfo1), Collections.emptySet()); + Cluster cluster = new Cluster(new ArrayList(0), asList(partitionInfo0, partitionInfo1), Collections.emptySet(), Collections.emptySet()); MockProducer producer = new MockProducer(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer()); ProducerRecord record = new ProducerRecord(topic, "key", "value"); Future metadata = producer.send(record); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java index fd8a5bc3b0d4c..bc4796679b272 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java @@ -37,7 +37,7 @@ public class DefaultPartitionerTest { private List partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes), new PartitionInfo(topic, 2, node1, nodes, nodes), new PartitionInfo(topic, 0, node0, nodes, nodes)); - private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions, Collections.emptySet()); + private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions, Collections.emptySet(), Collections.emptySet()); @Test public void testKeyPartitionIsStable() { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 43ac15a09a4a0..37f87cca27331 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -64,7 +64,7 @@ public class RecordAccumulatorTest { private byte[] key = "key".getBytes(); private byte[] value = "value".getBytes(); private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); - private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.emptySet()); + private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.emptySet(), Collections.emptySet()); private Metrics metrics = new Metrics(time); private final long maxBlockTimeMs = 1000; diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index a818d5336d6ab..0fd04df21b0c9 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -74,7 +74,7 @@ public static Cluster clusterWith(int nodes, Map topicPartition for (int i = 0; i < partitions; i++) parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); } - return new Cluster(asList(ns), parts, Collections.emptySet()); + return new Cluster(asList(ns), parts, Collections.emptySet(), Collections.emptySet()); } public static Cluster clusterWith(int nodes, String topic, int partitions) {