diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 85a3e239e91dd..9c6a728899727 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -254,6 +254,7 @@ public class KafkaProducer implements Producer { private final Serializer valueSerializer; private final ProducerConfig producerConfig; private final long maxBlockTimeMs; + private final boolean partitionerIgnoreKeys; private final ProducerInterceptors interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; @@ -316,6 +317,23 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali this(Utils.propsToMap(properties), keySerializer, valueSerializer); } + /** + * Check if partitioner is deprecated and log a warning if it is. + */ + @SuppressWarnings("deprecation") + private void warnIfPartitionerDeprecated() { + // Using DefaultPartitioner and UniformStickyPartitioner is deprecated, see KIP-794. + if (partitioner instanceof org.apache.kafka.clients.producer.internals.DefaultPartitioner) { + log.warn("DefaultPartitioner is deprecated. Please clear " + ProducerConfig.PARTITIONER_CLASS_CONFIG + + " configuration setting to get the default partitioning behavior"); + } + if (partitioner instanceof org.apache.kafka.clients.producer.UniformStickyPartitioner) { + log.warn("UniformStickyPartitioner is deprecated. Please clear " + ProducerConfig.PARTITIONER_CLASS_CONFIG + + " configuration setting and set " + ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG + + " to 'true' to get the uniform sticky partitioning behavior"); + } + } + // visible for testing @SuppressWarnings("unchecked") KafkaProducer(ProducerConfig config, @@ -360,6 +378,8 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class, Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)); + warnIfPartitionerDeprecated(); + this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); if (keySerializer == null) { this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -397,12 +417,20 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali this.apiVersions = new ApiVersions(); this.transactionManager = configureTransactionState(config, logContext); + // There is no need to do work required for adaptive partitioning, if we use a custom partitioner. 
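The constructor hunk continues below with the adaptive-partitioning wiring. As background, here is a minimal configuration sketch (not part of this patch) showing how the settings referenced here are expected to be used once partitioner.class defaults to null; the bootstrap address and topic name are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Kip794ConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Do NOT set partitioner.class: leaving it unset selects the new built-in partitioning logic.
        // Uniform sticky behavior (keys not used for partitioning), as the deprecation warning above suggests:
        props.put(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG, "true");
        // Adaptive partitioning knobs added by this patch (values shown are the defaults).
        props.put(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, "true");
        props.put(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, "0");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "value-without-key"));
        }
    }
}
```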
+ boolean enableAdaptivePartitioning = partitioner == null && + config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG); + RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig( + enableAdaptivePartitioning, + config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG) + ); this.accumulator = new RecordAccumulator(logContext, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.compressionType, lingerMs(config), retryBackoffMs, deliveryTimeoutMs, + partitionerConfig, metrics, PRODUCER_METRIC_GROUP_NAME, time, @@ -468,6 +496,7 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); + this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG); this.apiVersions = new ApiVersions(); this.transactionManager = transactionManager; this.accumulator = accumulator; @@ -922,11 +951,24 @@ private void throwIfProducerClosed() { throw new IllegalStateException("Cannot perform operation after producer has been closed"); } + /** + * Call deprecated {@link Partitioner#onNewBatch} + */ + @SuppressWarnings("deprecation") + private void onNewBatch(String topic, Cluster cluster, int prevPartition) { + assert partitioner != null; + partitioner.onNewBatch(topic, cluster, prevPartition); + } + /** * Implementation of asynchronously send a record to a topic. */ private Future doSend(ProducerRecord record, Callback callback) { - TopicPartition tp = null; + // Append callback takes care of the following: + // - call interceptors and user callback on completion + // - remember partition that is calculated in RecordAccumulator.append + AppendCallbacks appendCallbacks = new AppendCallbacks(callback, this.interceptors, record); + try { throwIfProducerClosed(); // first make sure the metadata for the topic is available @@ -958,8 +1000,11 @@ private Future doSend(ProducerRecord record, Callback call " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); } + + // Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION, + // which means that the RecordAccumulator would pick a partition using built-in logic (which may + // take into account broker load, the amount of data produced to each partition, etc.). int partition = partition(record, serializedKey, serializedValue, cluster); - tp = new TopicPartition(record.topic(), partition); setReadOnly(record.headers()); Header[] headers = record.headers().toArray(); @@ -968,41 +1013,38 @@ private Future doSend(ProducerRecord record, Callback call compressionType, serializedKey, serializedValue, headers); ensureValidRecordSize(serializedSize); long timestamp = record.timestamp() == null ? 
nowMs : record.timestamp(); - if (log.isTraceEnabled()) { - log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - } - // producer callback will make sure to call both 'callback' and interceptor callback - Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); - RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, - serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs); + // A custom partitioner may take advantage on the onNewBatch callback. + boolean abortOnNewBatch = partitioner != null; + + // Append the record to the accumulator. Note, that the actual partition may be + // calculated there and can be accessed via appendCallbacks.topicPartition. + RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey, + serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster); + assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION; if (result.abortForNewBatch) { int prevPartition = partition; - partitioner.onNewBatch(record.topic(), cluster, prevPartition); + onNewBatch(record.topic(), cluster, prevPartition); partition = partition(record, serializedKey, serializedValue, cluster); - tp = new TopicPartition(record.topic(), partition); if (log.isTraceEnabled()) { log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition); } - // producer callback will make sure to call both 'callback' and interceptor callback - interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); - - result = accumulator.append(tp, timestamp, serializedKey, - serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs); + result = accumulator.append(record.topic(), partition, timestamp, serializedKey, + serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster); } // Add the partition to the transaction (if in progress) after it has been successfully - // appended to the accumulator. We cannot do it before because the initially selected - // partition may be changed when the batch is closed (as indicated by `abortForNewBatch`). - // Note that the `Sender` will refuse to dequeue batches from the accumulator until they - // have been added to the transaction. + // appended to the accumulator. We cannot do it before because the partition may be + // unknown or the initially selected partition may be changed when the batch is closed + // (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue + // batches from the accumulator until they have been added to the transaction. 
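The transactional bookkeeping below reads the partition from appendCallbacks.topicPartition() for the same reason: the partition may only become known once the accumulator appends the record. For application code this means the final partition should be read from the RecordMetadata delivered to the send callback (or the returned future) rather than assumed at send() time. A small, hypothetical usage sketch (the "orders" topic and the producer instance are placeholders):

```java
producer.send(new ProducerRecord<>("orders", null, "payload"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        // With the built-in partitioner the partition is chosen when the record is
        // appended to the accumulator; the callback metadata reports the final choice.
        System.out.printf("record written to %s-%d at offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});
```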
if (transactionManager != null) { - transactionManager.maybeAddPartition(tp); + transactionManager.maybeAddPartition(appendCallbacks.topicPartition()); } if (result.batchIsFull || result.newBatchCreated) { - log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); + log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition()); this.sender.wakeup(); } return result.future; @@ -1011,33 +1053,28 @@ private Future doSend(ProducerRecord record, Callback call // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message send:", e); - // producer callback will make sure to call both 'callback' and interceptor callback - if (tp == null) { - // set topicPartition to -1 when null - tp = ProducerInterceptors.extractTopicPartition(record); - } - if (callback != null) { + TopicPartition tp = appendCallbacks.topicPartition(); RecordMetadata nullMetadata = new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); callback.onCompletion(nullMetadata, e); } this.errors.record(); - this.interceptors.onSendError(record, tp, e); + this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e); if (transactionManager != null) { transactionManager.maybeTransitionToErrorState(e); } return new FutureFailure(e); } catch (InterruptedException e) { this.errors.record(); - this.interceptors.onSendError(record, tp, e); + this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e); throw new InterruptException(e); } catch (KafkaException e) { this.errors.record(); - this.interceptors.onSendError(record, tp, e); + this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e); throw e; } catch (Exception e) { // we notify interceptor about all exceptions, since onSend is called before anything else in this method - this.interceptors.onSendError(record, tp, e); + this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e); throw e; } } @@ -1321,21 +1358,33 @@ private ClusterResourceListeners configureClusterResourceListeners(Serializer /** * computes partition for given record. * if the record has partition returns the value otherwise - * calls configured partitioner class to compute the partition. + * if custom partitioner is specified, call it to compute partition + * otherwise try to calculate partition based on key. + * If there is no key or key should be ignored return + * RecordMetadata.UNKNOWN_PARTITION to indicate any partition + * can be used (the partition is then calculated by built-in + * partitioning logic). */ private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { - Integer partition = record.partition(); - if (partition != null) { - return partition; - } + if (record.partition() != null) + return record.partition(); - int customPartition = partitioner.partition( + if (partitioner != null) { + int customPartition = partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); - if (customPartition < 0) { - throw new IllegalArgumentException(String.format( + if (customPartition < 0) { + throw new IllegalArgumentException(String.format( "The partitioner generated an invalid partition number: %d. 
Partition number should always be non-negative.", customPartition)); + } + return customPartition; + } + + if (serializedKey != null && !partitionerIgnoreKeys) { + // hash the keyBytes to choose a partition + return Utils.toPositive(Utils.murmur2(serializedKey)) % cluster.partitionsForTopic(record.topic()).size(); + } else { + return RecordMetadata.UNKNOWN_PARTITION; } - return customPartition; } private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) { @@ -1403,25 +1452,54 @@ public boolean isDone() { } /** - * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and - * notifies producer interceptors about the request completion. + * Callbacks that are called by the RecordAccumulator append functions: + * - user callback + * - interceptor callbacks + * - partition callback */ - private static class InterceptorCallback implements Callback { + private class AppendCallbacks implements RecordAccumulator.AppendCallbacks { private final Callback userCallback; private final ProducerInterceptors interceptors; - private final TopicPartition tp; + private final ProducerRecord record; + protected int partition = RecordMetadata.UNKNOWN_PARTITION; - private InterceptorCallback(Callback userCallback, ProducerInterceptors interceptors, TopicPartition tp) { + private AppendCallbacks(Callback userCallback, ProducerInterceptors interceptors, ProducerRecord record) { this.userCallback = userCallback; this.interceptors = interceptors; - this.tp = tp; + this.record = record; } + @Override public void onCompletion(RecordMetadata metadata, Exception exception) { - metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); + if (metadata == null) { + metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); + } this.interceptors.onAcknowledgement(metadata, exception); if (this.userCallback != null) this.userCallback.onCompletion(metadata, exception); } + + @Override + public void setPartition(int partition) { + assert partition != RecordMetadata.UNKNOWN_PARTITION; + this.partition = partition; + + if (log.isTraceEnabled()) { + // Log the message here, because we don't know the partition before that. + log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition); + } + } + + public int getPartition() { + return partition; + } + + public TopicPartition topicPartition() { + if (record == null) + return null; + return partition == RecordMetadata.UNKNOWN_PARTITION + ? 
ProducerInterceptors.extractTopicPartition(record) + : new TopicPartition(record.topic(), partition); + } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 4fd540dceaa8a..3df73b20a4d6e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; import org.apache.kafka.clients.producer.internals.ProduceRequestResult; import org.apache.kafka.common.Cluster; @@ -117,10 +116,24 @@ public MockProducer(final Cluster cluster, * * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)} */ + @SuppressWarnings("deprecation") public MockProducer(final boolean autoComplete, final Serializer keySerializer, final Serializer valueSerializer) { - this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer); + this(Cluster.empty(), autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer); + } + + /** + * Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers. + * + * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(cluster, autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)} + */ + @SuppressWarnings("deprecation") + public MockProducer(final Cluster cluster, + final boolean autoComplete, + final Serializer keySerializer, + final Serializer valueSerializer) { + this(cluster, autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java index 13eaa5aaea9af..eeafc73d662c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java @@ -44,12 +44,16 @@ public interface Partitioner extends Configurable, Closeable { void close(); /** + * Note this method is only implemented in DefatultPartitioner and UniformStickyPartitioner which + * are now deprecated. See KIP-794 for more info. + * * Notifies the partitioner a new batch is about to be created. When using the sticky partitioner, - * this method can change the chosen sticky partition for the new batch. + * this method can change the chosen sticky partition for the new batch. 
* @param topic The topic name * @param cluster The current cluster metadata * @param prevPartition The partition previously selected for the record that triggered a new batch */ + @Deprecated default void onNewBatch(String topic, Cluster cluster, int prevPartition) { } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 6c258933af60b..1ba6c27e79d21 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -93,6 +92,26 @@ public class ProducerConfig extends AbstractConfig { + "This linger.ms setting defaults to 0, which means we'll immediately send out a record even the accumulated " + "batch size is under this batch.size setting."; + /** partitioner.adaptive.partitioning.enable */ + public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable"; + private static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC = + "When set to 'true', the producer will try to adapt to broker performance and produce more messages to partitions hosted on faster brokers. " + + "If 'false', producer will try to distribute messages uniformly. Note: this setting has no effect if a custom partitioner is used"; + + /** partitioner.availability.timeout.ms */ + public static final String PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG = "partitioner.availability.timeout.ms"; + private static final String PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC = + "If a broker cannot process produce requests from a partition for " + PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG + " time, " + + "the partitioner treats that partition as not available. If the value is 0, this logic is disabled. " + + "Note: this setting has no effect if a custom partitioner is used or " + PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG + + " is set to 'false'"; + + /** partitioner.ignore.keys */ + public static final String PARTITIONER_IGNORE_KEYS_CONFIG = "partitioner.ignore.keys"; + private static final String PARTITIONER_IGNORE_KEYS_DOC = "When set to 'true' the producer won't use record keys to choose a partition. " + + "If 'false', producer would choose a partition based on a hash of the key when a key is present. " + + "Note: this setting has no effect if a custom partitioner is used."; + /** acks */ public static final String ACKS_CONFIG = "acks"; private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " @@ -258,11 +277,11 @@ public class ProducerConfig extends AbstractConfig { public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are:" + "
    " + - "
  • org.apache.kafka.clients.producer.internals.DefaultPartitioner: The default partitioner. " + - "This strategy will try sticking to a partition until the batch is full, or linger.ms is up. It works with the strategy:" + + "
  • If not set, the default partitioning logic is used. " + + "This strategy will try sticking to a partition until " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" + "
      " + "
    • If no partition is specified but a key is present, choose a partition based on a hash of the key
    • " + - "
    • If no partition or key is present, choose the sticky partition that changes when the batch is full, or linger.ms is up.
    • " + + "
    • If no partition or key is present, choose the sticky partition that changes when " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.
    • " + "
    " + "
  • " + "
  • org.apache.kafka.clients.producer.RoundRobinPartitioner: This partitioning strategy is that " + @@ -270,9 +289,6 @@ public class ProducerConfig extends AbstractConfig { "until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " + "Please check KAFKA-9965 for more detail." + "
  • " + - "
  • org.apache.kafka.clients.producer.UniformStickyPartitioner: This partitioning strategy will " + - "try sticking to a partition(no matter if the 'key' is provided or not) until the batch is full, or linger.ms is up." + - "
  • " + "
" + "

Implementing the org.apache.kafka.clients.producer.Partitioner interface allows you to plug in a custom partitioner."; @@ -333,6 +349,9 @@ public class ProducerConfig extends AbstractConfig { ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) + .define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC) + .define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC) + .define(PARTITIONER_IGNORE_KEYS_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, PARTITIONER_IGNORE_KEYS_DOC) .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC) .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) @@ -417,7 +436,7 @@ public class ProducerConfig extends AbstractConfig { CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, - DefaultPartitioner.class, + null, Importance.MEDIUM, PARTITIONER_CLASS_DOC) .define(INTERCEPTOR_CLASSES_CONFIG, Type.LIST, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/UniformStickyPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/UniformStickyPartitioner.java index be11d0b662445..6e4fe420df259 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/UniformStickyPartitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/UniformStickyPartitioner.java @@ -23,6 +23,10 @@ /** + * NOTE this partitioner is deprecated and shouldn't be used. To use default partitioning logic + * remove partitioner.class configuration setting and set partitioner.ignore.keys=true. + * See KIP-794 for more info. + * * The partitioning strategy: *

    *
  • If a partition is specified in the record, use it @@ -33,6 +37,7 @@ * * See KIP-480 for details about sticky partitioning. */ +@Deprecated public class UniformStickyPartitioner implements Partitioner { private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache(); @@ -59,6 +64,7 @@ public void close() {} * If a batch completed for the current sticky partition, change the sticky partition. * Alternately, if no sticky partition has been determined, set one. */ + @SuppressWarnings("deprecation") public void onNewBatch(String topic, Cluster cluster, int prevPartition) { stickyPartitionCache.nextPartition(topic, cluster, prevPartition); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index 210911ada38cf..67cf485f81a55 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -279,7 +279,8 @@ public void deallocate(ByteBuffer buffer, int size) { } public void deallocate(ByteBuffer buffer) { - deallocate(buffer, buffer.capacity()); + if (buffer != null) + deallocate(buffer, buffer.capacity()); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java new file mode 100644 index 0000000000000..1c2d10f3f67d3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +/** + * Built-in default partitioner. Note, that this is just a utility class that is used directly from + * RecordAccumulator, it does not implement the Partitioner interface. + * + * The class keeps track of various bookkeeping information required for adaptive sticky partitioning + * (described in detail in KIP-794). There is one partitioner object per topic. 
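The class added below chooses partitions with probability inversely related to their queue sizes, using a cumulative frequency table (CFT) and a binary search. As a standalone illustration of that idea (simplified; no sticky-partition bookkeeping or Kafka types, and the class name is invented for this sketch):

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

public class WeightedPartitionChoiceSketch {
    /**
     * Pick a partition index with probability inversely proportional to its queue size,
     * mirroring the CFT construction described in BuiltInPartitioner.
     */
    static int pickPartition(int[] queueSizes) {
        int maxPlusOne = Arrays.stream(queueSizes).max().getAsInt() + 1;
        int[] cft = new int[queueSizes.length];
        for (int i = 0; i < queueSizes.length; i++) {
            int weight = maxPlusOne - queueSizes[i];        // invert: smaller queue => larger weight
            cft[i] = weight + (i > 0 ? cft[i - 1] : 0);     // running sum
        }
        int r = ThreadLocalRandom.current().nextInt(cft[cft.length - 1]);
        int searchResult = Arrays.binarySearch(cft, r);
        // binarySearch returns the match index, or -(insertionPoint) - 1; in both cases
        // Math.abs(searchResult + 1) is the index of the first entry strictly greater than r.
        return Math.abs(searchResult + 1);
    }

    public static void main(String[] args) {
        // Queue sizes 0, 3, 1 => weights 4, 1, 3 => CFT 4, 5, 8 (the example used in the patch comments).
        int[] counts = new int[3];
        for (int i = 0; i < 80_000; i++)
            counts[pickPartition(new int[]{0, 3, 1})]++;
        System.out.println(Arrays.toString(counts)); // counts roughly proportional to 4 : 1 : 3
    }
}
```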
+ */ +public class BuiltInPartitioner { + private final Logger log; + private final String topic; + private final int stickyBatchSize; + + private volatile PartitionLoadStats partitionLoadStats = null; + private final AtomicReference stickyPartitionInfo = new AtomicReference<>(); + + // Visible and used for testing only. + static volatile public Supplier mockRandom = null; + + /** + * BuiltInPartitioner constructor. + * + * @param topic The topic + * @param stickyBatchSize How much to produce to partition before switch + */ + public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) { + this.log = logContext.logger(BuiltInPartitioner.class); + this.topic = topic; + this.stickyBatchSize = stickyBatchSize; + } + + /** + * Calculate the next partition for the topic based on the partition load stats. + */ + private int nextPartition(Cluster cluster) { + int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt()); + + // Cache volatile variable in local variable. + PartitionLoadStats partitionLoadStats = this.partitionLoadStats; + int partition; + + if (partitionLoadStats == null) { + // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next + // partition based on uniform distribution. + List availablePartitions = cluster.availablePartitionsForTopic(topic); + if (availablePartitions.size() > 0) { + partition = availablePartitions.get(random % availablePartitions.size()).partition(); + } else { + // We don't have available partitions, just pick one among all partitions. + List partitions = cluster.partitionsForTopic(topic); + partition = random % partitions.size(); + } + } else { + // Calculate next partition based on load distribution. + // Note that partitions without leader are excluded from the partitionLoadStats. + assert partitionLoadStats.length > 0; + + int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable; + int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1]; + + // By construction, the cumulative frequency table is sorted, so we can use binary + // search to find the desired index. + int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom); + + // binarySearch results the index of the found element, or -(insertion_point) - 1 + // (where insertion_point is the index of the first element greater than the key). + // We need to get the index of the first value that is strictly greater, which + // would be the insertion point, except if we found the element that's equal to + // the searched value (in this case we need to get next). For example, if we have + // 4 5 8 + // and we're looking for 3, then we'd get the insertion_point = 0, and the function + // would return -0 - 1 = -1, by adding 1 we'd get 0. If we're looking for 4, we'd + // get 0, and we need the next one, so adding 1 works here as well. + int partitionIndex = Math.abs(searchResult + 1); + assert partitionIndex < partitionLoadStats.length; + partition = partitionLoadStats.partitionIds[partitionIndex]; + } + + log.trace("Switching to partition {} in topic {}", partition, topic); + return partition; + } + + /** + * Test-only function. When partition load stats are defined, return the end of range for the + * random number. 
+ */ + public int loadStatsRangeEnd() { + assert partitionLoadStats != null; + assert partitionLoadStats.length > 0; + return partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1]; + } + + /** + * Peek currently chosen sticky partition. This method works in conjunction with {@link #isPartitionChanged} + * and {@link #updatePartitionInfo}. The workflow is the following: + * + * 1. peekCurrentPartitionInfo is called to know which partition to lock. + * 2. Lock partition's batch queue. + * 3. isPartitionChanged under lock to make sure that nobody raced us. + * 4. Append data to buffer. + * 5. updatePartitionInfo to update produced bytes and maybe switch partition. + * + * It's important that steps 3-5 are under partition's batch queue lock. + * + * @param cluster The cluster information (needed if there is no current partition) + * @return sticky partition info object + */ + StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) { + StickyPartitionInfo partitionInfo = stickyPartitionInfo.get(); + if (partitionInfo != null) + return partitionInfo; + + // We're the first to create it. + partitionInfo = new StickyPartitionInfo(nextPartition(cluster)); + if (stickyPartitionInfo.compareAndSet(null, partitionInfo)) + return partitionInfo; + + // Someone has raced us. + return stickyPartitionInfo.get(); + } + + /** + * Check if partition is changed by a concurrent thread. NOTE this function needs to be called under + * the partition's batch queue lock. + * + * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo + * @return true if sticky partition object is changed (race condition) + */ + boolean isPartitionChanged(StickyPartitionInfo partitionInfo) { + // partitionInfo may be null if the caller didn't use built-in partitioner. + return partitionInfo != null && stickyPartitionInfo.get() != partitionInfo; + } + + /** + * Update partition info with the number of bytes appended and maybe switch partition. + * NOTE this function needs to be called under the partition's batch queue lock. + * + * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo + * @param appendedBytes The number of bytes appended to this partition + * @param cluster The cluster information + */ + void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster) { + // partitionInfo may be null if the caller didn't use built-in partitioner. + if (partitionInfo == null) + return; + + assert partitionInfo == stickyPartitionInfo.get(); + int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes); + if (producedBytes >= stickyBatchSize) { + // We've produced enough to this partition, switch to next. 
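To make the five-step peek/lock/verify/append/update workflow described in the Javadoc above concrete, here is a hypothetical caller-side sketch; helper names such as appendToQueue are illustrative, the code assumes it lives in org.apache.kafka.clients.producer.internals (the methods involved are package-private), and the real version of this pattern is RecordAccumulator.append later in this patch:

```java
void appendWithStickyPartition(BuiltInPartitioner partitioner,
                               Cluster cluster,
                               Map<Integer, Deque<ProducerBatch>> batches) {
    while (true) {
        // 1. Peek the current sticky partition (lazily picks one on first use).
        BuiltInPartitioner.StickyPartitionInfo info = partitioner.peekCurrentPartitionInfo(cluster);
        Deque<ProducerBatch> dq = batches.computeIfAbsent(info.partition(), k -> new ArrayDeque<>());
        // 2. Lock that partition's batch queue.
        synchronized (dq) {
            // 3. Re-check under the lock that no concurrent append switched the partition.
            if (partitioner.isPartitionChanged(info))
                continue;                        // raced with another thread; retry from the peek
            // 4. Append data to the current batch (hypothetical helper returning bytes written).
            int appendedBytes = appendToQueue(dq);
            // 5. Account the produced bytes and maybe switch the sticky partition.
            partitioner.updatePartitionInfo(info, appendedBytes, cluster);
            return;
        }
    }
}
```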
+ StickyPartitionInfo newPartitionInfo = new StickyPartitionInfo(nextPartition(cluster)); + stickyPartitionInfo.set(newPartitionInfo); + } + } + + /** + * Update partition load stats from the queue sizes of each partition + * NOTE: queueSizes are modified in place to avoid allocations + * + * @param queueSizes The queue sizes, partitions without leaders are excluded + * @param partitionIds The partition ids for the queues, partitions without leaders are excluded + * @param length The logical length of the arrays (could be less): we may eliminate some partitions + * based on latency, but to avoid reallocation of the arrays, we just decrement + * logical length + * Visible for testing + */ + public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int length) { + if (queueSizes == null) { + log.trace("No load stats for topic {}, not using adaptive", topic); + partitionLoadStats = null; + return; + } + assert queueSizes.length == partitionIds.length; + assert length <= queueSizes.length; + + // The queueSizes.length represents the number of all partitions in the topic and if we have + // less than 2 partitions, there is no need to do adaptive logic. + // If partitioner.availability.timeout.ms != 0, then partitions that experience high latencies + // (greater than partitioner.availability.timeout.ms) may be excluded, the length represents + // partitions that are not excluded. If some partitions were excluded, we'd still want to + // go through adaptive logic, even if we have one partition. + // See also RecordAccumulator#partitionReady where the queueSizes are built. + if (length < 1 || queueSizes.length < 2) { + log.trace("The number of partitions is too small: available={}, all={}, not using adaptive for topic {}", + length, queueSizes.length, topic); + partitionLoadStats = null; + return; + } + + // We build cumulative frequency table from the queue sizes in place. At the beginning + // each entry contains queue size, then we invert it (so it represents the frequency) + // and convert to a running sum. Then a uniformly distributed random variable + // in the range [0..last) would map to a partition with weighted probability. + // Example: suppose we have 3 partitions with the corresponding queue sizes: + // 0 3 1 + // Then we can invert them by subtracting the queue size from the max queue size + 1 = 4: + // 4 1 3 + // Then we can convert it into a running sum (next value adds previous value): + // 4 5 8 + // Now if we get a random number in the range [0..8) and find the first value that + // is strictly greater than the number (e.g. for 4 it would be 5), then the index of + // the value is the index of the partition we're looking for. In this example + // random numbers 0, 1, 2, 3 would map to partition[0], 4 would map to partition[1] + // and 5, 6, 7 would map to partition[2]. + + // Calculate max queue size + 1 and check if all sizes are the same. + int maxSizePlus1 = queueSizes[0]; + boolean allEqual = true; + for (int i = 1; i < length; i++) { + if (queueSizes[i] != maxSizePlus1) + allEqual = false; + if (queueSizes[i] > maxSizePlus1) + maxSizePlus1 = queueSizes[i]; + } + ++maxSizePlus1; + + if (allEqual && length == queueSizes.length) { + // No need to have complex probability logic when all queue sizes are the same, + // and we didn't exclude partitions that experience high latencies (greater than + // partitioner.availability.timeout.ms). 
+ log.trace("All queue lengths are the same, not using adaptive for topic {}", topic); + partitionLoadStats = null; + return; + } + + // Invert and fold the queue size, so that they become separator values in the CFT. + queueSizes[0] = maxSizePlus1 - queueSizes[0]; + for (int i = 1; i < length; i++) { + queueSizes[i] = maxSizePlus1 - queueSizes[i] + queueSizes[i - 1]; + } + log.trace("Partition load stats for topic {}: CFT={}, IDs={}, length={}", + topic, queueSizes, partitionIds, length); + partitionLoadStats = new PartitionLoadStats(queueSizes, partitionIds, length); + } + + /** + * Info for the current sticky partition. + */ + public static class StickyPartitionInfo { + private final int index; + private final AtomicInteger producedBytes = new AtomicInteger(); + + StickyPartitionInfo(int index) { + this.index = index; + } + + public int partition() { + return index; + } + } + + /** + * The partition load stats for each topic that are used for adaptive partition distribution. + */ + private final static class PartitionLoadStats { + public final int[] cumulativeFrequencyTable; + public final int[] partitionIds; + public final int length; + public PartitionLoadStats(int[] cumulativeFrequencyTable, int[] partitionIds, int length) { + assert cumulativeFrequencyTable.length == partitionIds.length; + assert length <= cumulativeFrequencyTable.length; + this.cumulativeFrequencyTable = cumulativeFrequencyTable; + this.partitionIds = partitionIds; + this.length = length; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java index cf765d1eee6aa..2c2e79fb20b04 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java @@ -23,6 +23,9 @@ import java.util.Map; /** + * NOTE this partitioner is deprecated and shouldn't be used. To use default partitioning logic + * remove partitioner.class configuration setting. See KIP-794 for more info. + * * The default partitioning strategy: *
      *
    • If a partition is specified in the record, use it @@ -31,6 +34,7 @@ * * See KIP-480 for details about sticky partitioning. */ +@Deprecated public class DefaultPartitioner implements Partitioner { private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache(); @@ -77,6 +81,7 @@ public void close() {} * If a batch completed for the current sticky partition, change the sticky partition. * Alternately, if no sticky partition has been determined, set one. */ + @SuppressWarnings("deprecation") public void onNewBatch(String topic, Cluster cluster, int prevPartition) { stickyPartitionCache.nextPartition(topic, cluster, prevPartition); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 063d117502f6a..ef1ff7d7d4a76 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -29,8 +29,10 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.utils.ProducerIdAndEpoch; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; @@ -65,6 +67,7 @@ */ public class RecordAccumulator { + private final LogContext logContext; private final Logger log; private volatile boolean closed; private final AtomicInteger flushesInProgress; @@ -74,10 +77,13 @@ public class RecordAccumulator { private final int lingerMs; private final long retryBackoffMs; private final int deliveryTimeoutMs; + private final long partitionAvailabilityTimeoutMs; // latency threshold for marking partition temporary unavailable + private final boolean enableAdaptivePartitioning; private final BufferPool free; private final Time time; private final ApiVersions apiVersions; - private final ConcurrentMap> batches; + private final ConcurrentMap topicInfoMap = new CopyOnWriteMap<>(); + private final ConcurrentMap nodeStats = new CopyOnWriteMap<>(); private final IncompleteBatches incomplete; // The following variables are only accessed by the sender thread, so we don't need to protect them. private final Set muted; @@ -96,11 +102,15 @@ public class RecordAccumulator { * latency for potentially better throughput due to more batching (and hence fewer, larger requests). * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids * exhausting all retries in a short period of time. + * @param deliveryTimeoutMs An upper bound on the time to report success or failure on record delivery + * @param partitionerConfig Partitioner config * @param metrics The metrics + * @param metricGrpName The metric group name * @param time The time instance to use * @param apiVersions Request API versions for current connected brokers * @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence * numbers per partition. 
+ * @param bufferPool The buffer pool */ public RecordAccumulator(LogContext logContext, int batchSize, @@ -108,12 +118,14 @@ public RecordAccumulator(LogContext logContext, int lingerMs, long retryBackoffMs, int deliveryTimeoutMs, + PartitionerConfig partitionerConfig, Metrics metrics, String metricGrpName, Time time, ApiVersions apiVersions, TransactionManager transactionManager, BufferPool bufferPool) { + this.logContext = logContext; this.log = logContext.logger(RecordAccumulator.class); this.closed = false; this.flushesInProgress = new AtomicInteger(0); @@ -123,7 +135,8 @@ public RecordAccumulator(LogContext logContext, this.lingerMs = lingerMs; this.retryBackoffMs = retryBackoffMs; this.deliveryTimeoutMs = deliveryTimeoutMs; - this.batches = new CopyOnWriteMap<>(); + this.enableAdaptivePartitioning = partitionerConfig.enableAdaptivePartitioning; + this.partitionAvailabilityTimeoutMs = partitionerConfig.partitionAvailabilityTimeoutMs; this.free = bufferPool; this.incomplete = new IncompleteBatches(); this.muted = new HashSet<>(); @@ -134,6 +147,53 @@ public RecordAccumulator(LogContext logContext, registerMetrics(metrics, metricGrpName); } + /** + * Create a new record accumulator with default partitioner config + * + * @param logContext The log context used for logging + * @param batchSize The size to use when allocating {@link MemoryRecords} instances + * @param compression The compression codec for the records + * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for + * sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some + * latency for potentially better throughput due to more batching (and hence fewer, larger requests). + * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids + * exhausting all retries in a short period of time. + * @param deliveryTimeoutMs An upper bound on the time to report success or failure on record delivery + * @param metrics The metrics + * @param metricGrpName The metric group name + * @param time The time instance to use + * @param apiVersions Request API versions for current connected brokers + * @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence + * numbers per partition. 
+ * @param bufferPool The buffer pool + */ + public RecordAccumulator(LogContext logContext, + int batchSize, + CompressionType compression, + int lingerMs, + long retryBackoffMs, + int deliveryTimeoutMs, + Metrics metrics, + String metricGrpName, + Time time, + ApiVersions apiVersions, + TransactionManager transactionManager, + BufferPool bufferPool) { + this(logContext, + batchSize, + compression, + lingerMs, + retryBackoffMs, + deliveryTimeoutMs, + new PartitionerConfig(), + metrics, + metricGrpName, + time, + apiVersions, + transactionManager, + bufferPool); + } + private void registerMetrics(Metrics metrics, String metricGrpName) { MetricName metricName = metrics.metricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records"); Measurable waitingThreads = new Measurable() { @@ -160,91 +220,162 @@ public double measure(MetricConfig config, long now) { metrics.addMetric(metricName, availableBytes); } + private void setPartition(AppendCallbacks callbacks, int partition) { + if (callbacks != null) + callbacks.setPartition(partition); + } + /** * Add a record to the accumulator, return the append result *

      * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created *

      * - * @param tp The topic/partition to which this record is being sent + * @param topic The topic to which this record is being sent + * @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION + * if any partition could be used * @param timestamp The timestamp of the record * @param key The key for the record * @param value The value for the record * @param headers the Headers for the record - * @param callback The user-supplied callback to execute when the request is complete + * @param callbacks The callbacks to execute * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and * running the partitioner's onNewBatch method before trying to append again * @param nowMs The current time, in milliseconds + * @param cluster The cluster metadata */ - public RecordAppendResult append(TopicPartition tp, + public RecordAppendResult append(String topic, + int partition, long timestamp, byte[] key, byte[] value, Header[] headers, - Callback callback, + AppendCallbacks callbacks, long maxTimeToBlock, boolean abortOnNewBatch, - long nowMs) throws InterruptedException { + long nowMs, + Cluster cluster) throws InterruptedException { + TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize)); + // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); ByteBuffer buffer = null; if (headers == null) headers = Record.EMPTY_HEADERS; try { - // check if we have an in-progress batch - Deque dq = getOrCreateDeque(tp); - synchronized (dq) { - if (closed) - throw new KafkaException("Producer closed while send in progress"); - RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs); - if (appendResult != null) - return appendResult; - } + // Loop to retry in case we encounter partitioner's race conditions. + while (true) { + // If the message doesn't have any partition affinity, so we pick a partition based on the broker + // availability and performance. Note, that here we peek current partition before we hold the + // deque lock, so we'll need to make sure that it's not changed while we were waiting for the + // deque lock. + final BuiltInPartitioner.StickyPartitionInfo partitionInfo; + final int effectivePartition; + if (partition == RecordMetadata.UNKNOWN_PARTITION) { + partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster); + effectivePartition = partitionInfo.partition(); + } else { + partitionInfo = null; + effectivePartition = partition; + } - // we don't have an in-progress record batch try to allocate a new batch - if (abortOnNewBatch) { - // Return a result that will cause another call to append. - return new RecordAppendResult(null, false, false, true); - } + // Now that we know the effective partition, let the caller know. + setPartition(callbacks, effectivePartition); + + // check if we have an in-progress batch + Deque dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>()); + synchronized (dq) { + // After taking the lock, validate that the partition hasn't changed and retry. 
+ if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) { + log.trace("Partition {} for topic {} switched by a concurrent append, retrying", + partitionInfo.partition(), topic); + continue; + } + RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs); + if (appendResult != null) { + topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster); + return appendResult; + } + } - byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); - int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); - log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock); - buffer = free.allocate(size, maxTimeToBlock); + // we don't have an in-progress record batch try to allocate a new batch + if (abortOnNewBatch) { + // Return a result that will cause another call to append. + return new RecordAppendResult(null, false, false, true, 0); + } - // Update the current time in case the buffer allocation blocked above. - nowMs = time.milliseconds(); - synchronized (dq) { - // Need to check if producer is closed again after grabbing the dequeue lock. - if (closed) - throw new KafkaException("Producer closed while send in progress"); + if (buffer == null) { + byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); + int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); + log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock); + buffer = free.allocate(size, maxTimeToBlock); + } - RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs); - if (appendResult != null) { - // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... + synchronized (dq) { + // After taking the lock, validate that the partition hasn't changed and retry. + if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) { + log.trace("Partition {} for topic {} switched by a concurrent append, retrying", + partitionInfo.partition(), topic); + continue; + } + RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer); + // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch. 
+ if (appendResult.newBatchCreated) + buffer = null; + topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster); return appendResult; } - - MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); - ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs); - FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers, - callback, nowMs)); - - dq.addLast(batch); - incomplete.add(batch); - - // Don't deallocate this buffer in the finally block as it's being used in the record batch - buffer = null; - return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false); } } finally { - if (buffer != null) - free.deallocate(buffer); + free.deallocate(buffer); appendsInProgress.decrementAndGet(); } } + /** + * Append a new batch to the queue + * + * @param topic The topic + * @param partition The partition (cannot be RecordMetadata.UNKNOWN_PARTITION) + * @param dq The queue + * @param timestamp The timestamp of the record + * @param key The key for the record + * @param value The value for the record + * @param headers the Headers for the record + * @param callbacks The callbacks to execute + * @param buffer The buffer for the new batch + */ + private RecordAppendResult appendNewBatch(String topic, + int partition, + Deque dq, + long timestamp, + byte[] key, + byte[] value, + Header[] headers, + AppendCallbacks callbacks, + ByteBuffer buffer) { + assert partition != RecordMetadata.UNKNOWN_PARTITION; + + // Update the current time in case the buffer allocation blocked above. + long nowMs = time.milliseconds(); + RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs); + if (appendResult != null) { + // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... 
+ return appendResult; + } + + MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic()); + ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs); + FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers, + callbacks, nowMs)); + + dq.addLast(batch); + incomplete.add(batch); + + return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes()); + } + private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) { if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) { throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " + @@ -263,13 +394,18 @@ private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMag */ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque deque, long nowMs) { + if (closed) + throw new KafkaException("Producer closed while send in progress"); ProducerBatch last = deque.peekLast(); if (last != null) { + int initialBytes = last.estimatedSizeInBytes(); FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs); - if (future == null) + if (future == null) { last.closeForRecordAppends(); - else - return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false); + } else { + int appendedBytes = last.estimatedSizeInBytes() - initialBytes; + return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false, appendedBytes); + } } return null; } @@ -298,19 +434,20 @@ public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch) { */ public List expiredBatches(long now) { List expiredBatches = new ArrayList<>(); - for (Map.Entry> entry : this.batches.entrySet()) { - // expire the batches in the order of sending - Deque deque = entry.getValue(); - synchronized (deque) { - while (!deque.isEmpty()) { - ProducerBatch batch = deque.getFirst(); - if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) { - deque.poll(); - batch.abortRecordAppends(); - expiredBatches.add(batch); - } else { - maybeUpdateNextBatchExpiryTime(batch); - break; + for (TopicInfo topicInfo : topicInfoMap.values()) { + for (Deque deque : topicInfo.batches.values()) { + // expire the batches in the order of sending + synchronized (deque) { + while (!deque.isEmpty()) { + ProducerBatch batch = deque.getFirst(); + if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) { + deque.poll(); + batch.abortRecordAppends(); + expiredBatches.add(batch); + } else { + maybeUpdateNextBatchExpiryTime(batch); + break; + } } } } @@ -420,38 +557,94 @@ private void insertInSequenceOrder(Deque deque, ProducerBatch bat } /** - * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable - * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated - * partition batches. - *

- * A destination node is ready to send data if:
- * <ol>
- * <li>There is at least one partition that is not backing off its send
- * <li>and those partitions are not muted (to prevent reordering if
- *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
- *   is set to one)</li>
- * <li>and any of the following are true</li>
- * <ul>
- *     <li>The record set is full</li>
- *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
- *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
- *     are immediately considered ready).</li>
- *     <li>The accumulator has been closed</li>
- * </ul>
- * </ol>
      + * Add the leader to the ready nodes if the batch is ready + * + * @param nowMs The current time + * @param exhausted 'true' is the buffer pool is exhausted + * @param part The partition + * @param leader The leader for the partition + * @param waitedTimeMs How long batch waited + * @param backingOff Is backing off + * @param full Is batch full + * @param nextReadyCheckDelayMs The delay for next check + * @param readyNodes The set of ready nodes (to be filled in) + * @return The delay for next check */ - public ReadyCheckResult ready(Cluster cluster, long nowMs) { - Set readyNodes = new HashSet<>(); - long nextReadyCheckDelayMs = Long.MAX_VALUE; - Set unknownLeaderTopics = new HashSet<>(); + private long batchReady(long nowMs, boolean exhausted, TopicPartition part, Node leader, + long waitedTimeMs, boolean backingOff, boolean full, + long nextReadyCheckDelayMs, Set readyNodes) { + if (!readyNodes.contains(leader) && !isMuted(part)) { + long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; + boolean expired = waitedTimeMs >= timeToWaitMs; + boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting(); + boolean sendable = full + || expired + || exhausted + || closed + || flushInProgress() + || transactionCompleting; + if (sendable && !backingOff) { + readyNodes.add(leader); + } else { + long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); + // Note that this results in a conservative estimate since an un-sendable partition may have + // a leader that will later be found to have sendable data. However, this is good enough + // since we'll just wake up and then sleep again for the remaining time. + nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); + } + } + return nextReadyCheckDelayMs; + } + + /** + * Iterate over partitions to see which one have batches ready and collect leaders of those partitions + * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with + * no leader. This function also calculates stats for adaptive partitioning. + * + * @param cluster The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info + * @param nextReadyCheckDelayMs The delay for next check + * @param readyNodes The set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @return The delay for next check + */ + private long partitionReady(Cluster cluster, long nowMs, String topic, + TopicInfo topicInfo, + long nextReadyCheckDelayMs, Set readyNodes, Set unknownLeaderTopics) { + ConcurrentMap> batches = topicInfo.batches; + // Collect the queue sizes for available partitions to be used in adaptive partitioning. + int[] queueSizes = null; + int[] partitionIds = null; + if (enableAdaptivePartitioning && batches.size() >= cluster.partitionsForTopic(topic).size()) { + // We don't do adaptive partitioning until we scheduled at least a batch for all + // partitions (i.e. we have the corresponding entries in the batches map), we just + // do uniform. The reason is that we build queue sizes from the batches map, + // and if an entry is missing in the batches map, then adaptive partitioning logic + // won't know about it and won't switch to it. 
+ queueSizes = new int[batches.size()]; + partitionIds = new int[queueSizes.length]; + } + int queueSizesIndex = -1; boolean exhausted = this.free.queued() > 0; - for (Map.Entry> entry : this.batches.entrySet()) { + for (Map.Entry> entry : batches.entrySet()) { + TopicPartition part = new TopicPartition(topic, entry.getKey()); + // Advance queueSizesIndex so that we properly index available + // partitions. Do it here so that it's done for all code paths. + Node leader = cluster.leaderFor(part); + if (leader != null && queueSizes != null) { + ++queueSizesIndex; + assert queueSizesIndex < queueSizes.length; + partitionIds[queueSizesIndex] = part.partition(); + } + Deque deque = entry.getValue(); - final ProducerBatch batch; final long waitedTimeMs; final boolean backingOff; + final int dequeSize; final boolean full; // This loop is especially hot with large partition counts. @@ -463,43 +656,81 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) { synchronized (deque) { // Deques are often empty in this path, esp with large partition counts, // so we exit early if we can. - batch = deque.peekFirst(); + ProducerBatch batch = deque.peekFirst(); if (batch == null) { continue; } waitedTimeMs = batch.waitedTimeMs(nowMs); backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs; - full = deque.size() > 1 || batch.isFull(); + dequeSize = deque.size(); + full = dequeSize > 1 || batch.isFull(); } - TopicPartition part = entry.getKey(); - Node leader = cluster.leaderFor(part); if (leader == null) { // This is a partition for which leader is not known, but messages are available to send. // Note that entries are currently not removed from batches when deque is empty. unknownLeaderTopics.add(part.topic()); - } else if (!readyNodes.contains(leader) && !isMuted(part)) { - long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; - boolean expired = waitedTimeMs >= timeToWaitMs; - boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting(); - boolean sendable = full - || expired - || exhausted - || closed - || flushInProgress() - || transactionCompleting; - if (sendable && !backingOff) { - readyNodes.add(leader); - } else { - long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); - // Note that this results in a conservative estimate since an un-sendable partition may have - // a leader that will later be found to have sendable data. However, this is good enough - // since we'll just wake up and then sleep again for the remaining time. - nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); + } else { + if (queueSizes != null) + queueSizes[queueSizesIndex] = dequeSize; + if (partitionAvailabilityTimeoutMs > 0) { + // Check if we want to exclude the partition from the list of available partitions + // if the broker hasn't responded for some time. + NodeLatencyStats nodeLatencyStats = nodeStats.get(leader.id()); + if (nodeLatencyStats != null) { + // NOTE: there is no synchronization between reading metrics, + // so we read ready time first to avoid accidentally marking partition + // unavailable if we read while the metrics are being updated. 
+ long readyTimeMs = nodeLatencyStats.readyTimeMs; + if (readyTimeMs - nodeLatencyStats.drainTimeMs > partitionAvailabilityTimeoutMs) + --queueSizesIndex; + } } + + nextReadyCheckDelayMs = batchReady(nowMs, exhausted, part, leader, waitedTimeMs, backingOff, + full, nextReadyCheckDelayMs, readyNodes); } } + + // We've collected the queue sizes for partitions of this topic, now we can calculate + // load stats. NOTE: the stats are calculated in place, modifying the + // queueSizes array. + topicInfo.builtInPartitioner.updatePartitionLoadStats(queueSizes, partitionIds, queueSizesIndex + 1); + return nextReadyCheckDelayMs; + } + + /** + * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable + * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated + * partition batches. + *

+ * A destination node is ready to send data if:
+ * <ol>
+ * <li>There is at least one partition that is not backing off its send
+ * <li>and those partitions are not muted (to prevent reordering if
+ *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
+ *   is set to one)</li>
+ * <li>and any of the following are true</li>
+ * <ul>
+ *     <li>The record set is full</li>
+ *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
+ *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
+ *     are immediately considered ready).</li>
+ *     <li>The accumulator has been closed</li>
+ * </ul>
+ * </ol>
      + */ + public ReadyCheckResult ready(Cluster cluster, long nowMs) { + Set readyNodes = new HashSet<>(); + long nextReadyCheckDelayMs = Long.MAX_VALUE; + Set unknownLeaderTopics = new HashSet<>(); + // Go topic by topic so that we can get queue sizes for partitions in a topic and calculate + // cumulative frequency table (used in partitioner). + for (Map.Entry topicInfoEntry : this.topicInfoMap.entrySet()) { + final String topic = topicInfoEntry.getKey(); + nextReadyCheckDelayMs = partitionReady(cluster, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics); + } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); } @@ -507,11 +738,12 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) { * Check whether there are any batches which haven't been drained */ public boolean hasUndrained() { - for (Map.Entry> entry : this.batches.entrySet()) { - Deque deque = entry.getValue(); - synchronized (deque) { - if (!deque.isEmpty()) - return true; + for (TopicInfo topicInfo : topicInfoMap.values()) { + for (Deque deque : topicInfo.batches.values()) { + synchronized (deque) { + if (!deque.isEmpty()) + return true; + } } } return false; @@ -669,6 +901,36 @@ public Map> drain(Cluster cluster, Set nodes, return batches; } + public void updateNodeLatencyStats(Integer nodeId, long nowMs, boolean canDrain) { + // Don't bother with updating stats if the feature is turned off. + if (partitionAvailabilityTimeoutMs <= 0) + return; + + // When the sender gets a node (returned by the ready() function) that has data to send + // but the node is not ready (and so we cannot drain the data), we only update the + // ready time, then the difference would reflect for how long a node wasn't ready + // to send the data. Then we can temporarily remove partitions that are handled by the + // node from the list of available partitions so that the partitioner wouldn't pick + // this partition. + // NOTE: there is no synchronization for metric updates, so drainTimeMs is updated + // first to avoid accidentally marking a partition unavailable if the reader gets + // values between updates. + NodeLatencyStats nodeLatencyStats = nodeStats.computeIfAbsent(nodeId, id -> new NodeLatencyStats(nowMs)); + if (canDrain) + nodeLatencyStats.drainTimeMs = nowMs; + nodeLatencyStats.readyTimeMs = nowMs; + } + + /* Visible for testing */ + public NodeLatencyStats getNodeLatencyStats(Integer nodeId) { + return nodeStats.get(nodeId); + } + + /* Visible for testing */ + public BuiltInPartitioner getBuiltInPartitioner(String topic) { + return topicInfoMap.get(topic).builtInPartitioner; + } + /** * The earliest absolute time a batch will expire (in milliseconds) */ @@ -676,23 +938,20 @@ public long nextExpiryTimeMs() { return this.nextBatchExpiryTimeMs; } - private Deque getDeque(TopicPartition tp) { - return batches.get(tp); + /* Visible for testing */ + public Deque getDeque(TopicPartition tp) { + TopicInfo topicInfo = topicInfoMap.get(tp.topic()); + if (topicInfo == null) + return null; + return topicInfo.batches.get(tp.partition()); } /** * Get the deque for the given topic-partition, creating it if necessary. 
*/ private Deque getOrCreateDeque(TopicPartition tp) { - Deque d = this.batches.get(tp); - if (d != null) - return d; - d = new ArrayDeque<>(); - Deque previous = this.batches.putIfAbsent(tp, d); - if (previous == null) - return d; - else - return previous; + TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(), k -> new TopicInfo(logContext, k, batchSize)); + return topicInfo.batches.computeIfAbsent(tp.partition(), k -> new ArrayDeque<>()); } /** @@ -722,11 +981,6 @@ boolean flushInProgress() { return flushesInProgress.get() > 0; } - /* Visible for testing */ - Map> batches() { - return Collections.unmodifiableMap(batches); - } - /** * Initiate the flushing of data from the accumulator...this makes all requests immediately ready */ @@ -780,7 +1034,7 @@ public void abortIncompleteBatches() { // flag set. We need to do the last abort after no thread was appending in case there was a new // batch appended by the last appending thread. abortBatches(); - this.batches.clear(); + this.topicInfoMap.clear(); } /** @@ -842,6 +1096,32 @@ public void close() { this.free.close(); } + /** + * Partitioner config for built-in partitioner + */ + public static final class PartitionerConfig { + private final boolean enableAdaptivePartitioning; + private final long partitionAvailabilityTimeoutMs; + + /** + * Partitioner config + * + * @param enableAdaptivePartitioning If it's true, partition switching adapts to broker load, otherwise partition + * switching is random. + * @param partitionAvailabilityTimeoutMs If a broker cannot process produce requests from a partition + * for the specified time, the partition is treated by the partitioner as not available. + * If the timeout is 0, this logic is disabled. + */ + public PartitionerConfig(boolean enableAdaptivePartitioning, long partitionAvailabilityTimeoutMs) { + this.enableAdaptivePartitioning = enableAdaptivePartitioning; + this.partitionAvailabilityTimeoutMs = partitionAvailabilityTimeoutMs; + } + + public PartitionerConfig() { + this(false, 0); + } + } + /* * Metadata about a record just appended to the record accumulator */ @@ -850,15 +1130,32 @@ public final static class RecordAppendResult { public final boolean batchIsFull; public final boolean newBatchCreated; public final boolean abortForNewBatch; + public final int appendedBytes; - public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated, boolean abortForNewBatch) { + public RecordAppendResult(FutureRecordMetadata future, + boolean batchIsFull, + boolean newBatchCreated, + boolean abortForNewBatch, + int appendedBytes) { this.future = future; this.batchIsFull = batchIsFull; this.newBatchCreated = newBatchCreated; this.abortForNewBatch = abortForNewBatch; + this.appendedBytes = appendedBytes; } } + /* + * The callbacks passed into append + */ + public interface AppendCallbacks extends Callback { + /** + * Called to set partition (when append is called, partition may not be calculated yet). 
+ * @param partition The partition + */ + void setPartition(int partition); + } + /* * The set of nodes that have at least one complete record batch in the accumulator */ @@ -873,4 +1170,30 @@ public ReadyCheckResult(Set readyNodes, long nextReadyCheckDelayMs, Set> batches = new CopyOnWriteMap<>(); + public final BuiltInPartitioner builtInPartitioner; + + public TopicInfo(LogContext logContext, String topic, int stickyBatchSize) { + builtInPartitioner = new BuiltInPartitioner(logContext, topic, stickyBatchSize); + } + } + + /** + * Node latency stats for each node that are used for adaptive partition distribution + * Visible for testing + */ + public final static class NodeLatencyStats { + volatile public long readyTimeMs; // last time the node had batches ready to send + volatile public long drainTimeMs; // last time the node was able to drain batches + + NodeLatencyStats(long nowMs) { + readyTimeMs = nowMs; + drainTimeMs = nowMs; + } + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 2f55e62912d76..55eb6c7be2f54 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -352,8 +352,16 @@ private long sendProducerData(long now) { while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { + // Update just the readyTimeMs of the latency stats, so that it moves forward + // every time the batch is ready (then the difference between readyTimeMs and + // drainTimeMs would represent how long data is waiting for the node). + this.accumulator.updateNodeLatencyStats(node.id(), now, false); iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); + } else { + // Update both readyTimeMs and drainTimeMs, this would "reset" the node + // latency. + this.accumulator.updateNodeLatencyStats(node.id(), now, true); } } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 3e3faeaadf794..af71e3ecd33c5 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1038,7 +1038,7 @@ public static void closeAllQuietly(AtomicReference firstException, St * * Note: changing this method in the future will possibly cause partition selection not to be * compatible with the existing messages already placed on a partition since it is used - * in producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} + * in producer's partition selection logic {@link org.apache.kafka.clients.producer.KafkaProducer} * * @param number a given number * @return a positive number. 
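The Sender and RecordAccumulator changes above keep two timestamps per node: readyTimeMs advances whenever the node has batches ready to send, while drainTimeMs advances only when the node can actually be drained. Once the gap between the two exceeds partitionAvailabilityTimeoutMs, the built-in partitioner temporarily stops picking partitions led by that node. Below is a minimal standalone sketch of that bookkeeping; it is not the patch's RecordAccumulator code, and the names LatencyTracker and nodeConsideredAvailable are illustrative assumptions.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: illustrates the readyTimeMs/drainTimeMs bookkeeping described above.
public class LatencyTracker {
    static final class NodeLatency {
        volatile long readyTimeMs;  // last time the node had batches ready to send
        volatile long drainTimeMs;  // last time the node could actually be drained
        NodeLatency(long nowMs) { readyTimeMs = nowMs; drainTimeMs = nowMs; }
    }

    private final long partitionAvailabilityTimeoutMs;
    private final Map<Integer, NodeLatency> stats = new ConcurrentHashMap<>();

    LatencyTracker(long partitionAvailabilityTimeoutMs) {
        this.partitionAvailabilityTimeoutMs = partitionAvailabilityTimeoutMs;
    }

    // Mirrors the updateNodeLatencyStats idea: drainTimeMs moves only when the node can be
    // drained, and it is written before readyTimeMs since there is no synchronization with readers.
    void update(int nodeId, long nowMs, boolean canDrain) {
        NodeLatency nl = stats.computeIfAbsent(nodeId, id -> new NodeLatency(nowMs));
        if (canDrain)
            nl.drainTimeMs = nowMs;
        nl.readyTimeMs = nowMs;
    }

    // Readers take readyTimeMs first, so an unsynchronized read can only shrink the apparent
    // gap, never spuriously mark the node unavailable.
    boolean nodeConsideredAvailable(int nodeId) {
        NodeLatency nl = stats.get(nodeId);
        if (nl == null)
            return true;
        long readyTimeMs = nl.readyTimeMs;
        return readyTimeMs - nl.drainTimeMs <= partitionAvailabilityTimeoutMs;
    }

    public static void main(String[] args) {
        LatencyTracker tracker = new LatencyTracker(1000);
        tracker.update(0, 0, true);      // node drains fine at t=0
        tracker.update(0, 500, false);   // has data but cannot drain at t=500
        System.out.println(tracker.nodeConsideredAvailable(0)); // true, gap is 500 ms
        tracker.update(0, 2000, false);  // still stuck at t=2000
        System.out.println(tracker.nodeConsideredAvailable(0)); // false, gap is 2000 ms
    }
}

The asymmetric write/read ordering is the same trade-off the patch comments describe: with no locking, a torn read can only make a node look more available than it is, which is the safe direction for the partitioner.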
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index ce01620803ba5..dc7db382a6229 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -1990,6 +1990,7 @@ public void testPartitionAddedToTransaction() throws Exception { } } + @SuppressWarnings("deprecation") @Test public void testPartitionAddedToTransactionAfterFullBatchRetry() throws Exception { StringSerializer serializer = new StringSerializer(); @@ -2053,21 +2054,28 @@ private FutureRecordMetadata expectAppend( )).thenReturn(initialSelectedPartition.partition()); when(ctx.accumulator.append( - eq(initialSelectedPartition), - eq(timestamp), - eq(serializedKey), - eq(serializedValue), - eq(Record.EMPTY_HEADERS), - any(Callback.class), + eq(initialSelectedPartition.topic()), // 0 + eq(initialSelectedPartition.partition()), // 1 + eq(timestamp), // 2 + eq(serializedKey), // 3 + eq(serializedValue), // 4 + eq(Record.EMPTY_HEADERS), // 5 + any(RecordAccumulator.AppendCallbacks.class), // 6 <-- anyLong(), eq(true), - anyLong() - )).thenReturn(new RecordAccumulator.RecordAppendResult( - futureRecordMetadata, - false, - false, - false - )); + anyLong(), + any() + )).thenAnswer(invocation -> { + RecordAccumulator.AppendCallbacks callbacks = + (RecordAccumulator.AppendCallbacks) invocation.getArguments()[6]; + callbacks.setPartition(initialSelectedPartition.partition()); + return new RecordAccumulator.RecordAppendResult( + futureRecordMetadata, + false, + false, + false, + 0); + }); return futureRecordMetadata; } @@ -2104,38 +2112,52 @@ private FutureRecordMetadata expectAppendWithAbortForNewBatch( .thenReturn(retrySelectedPartition.partition()); when(ctx.accumulator.append( - eq(initialSelectedPartition), - eq(timestamp), - eq(serializedKey), - eq(serializedValue), - eq(Record.EMPTY_HEADERS), - any(Callback.class), + eq(initialSelectedPartition.topic()), // 0 + eq(initialSelectedPartition.partition()), // 1 + eq(timestamp), // 2 + eq(serializedKey), // 3 + eq(serializedValue), // 4 + eq(Record.EMPTY_HEADERS), // 5 + any(RecordAccumulator.AppendCallbacks.class), // 6 <-- anyLong(), eq(true), // abortOnNewBatch - anyLong() - )).thenReturn(new RecordAccumulator.RecordAppendResult( - null, - false, - false, - true - )); + anyLong(), + any() + )).thenAnswer(invocation -> { + RecordAccumulator.AppendCallbacks callbacks = + (RecordAccumulator.AppendCallbacks) invocation.getArguments()[6]; + callbacks.setPartition(initialSelectedPartition.partition()); + return new RecordAccumulator.RecordAppendResult( + null, + false, + false, + true, + 0); + }); when(ctx.accumulator.append( - eq(retrySelectedPartition), - eq(timestamp), - eq(serializedKey), - eq(serializedValue), - eq(Record.EMPTY_HEADERS), - any(Callback.class), + eq(retrySelectedPartition.topic()), // 0 + eq(retrySelectedPartition.partition()), // 1 + eq(timestamp), // 2 + eq(serializedKey), // 3 + eq(serializedValue), // 4 + eq(Record.EMPTY_HEADERS), // 5 + any(RecordAccumulator.AppendCallbacks.class), // 6 <-- anyLong(), eq(false), // abortOnNewBatch - anyLong() - )).thenReturn(new RecordAccumulator.RecordAppendResult( - futureRecordMetadata, - false, - true, - false - )); + anyLong(), + any() + )).thenAnswer(invocation -> { + RecordAccumulator.AppendCallbacks callbacks = + (RecordAccumulator.AppendCallbacks) invocation.getArguments()[6]; + 
callbacks.setPartition(retrySelectedPartition.partition()); + return new RecordAccumulator.RecordAppendResult( + futureRecordMetadata, + false, + true, + false, + 0); + }); return futureRecordMetadata; } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index ca14ab0fda3da..8c7884bd77cdc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; @@ -85,7 +84,7 @@ public void testPartitioner() throws Exception { PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null); Cluster cluster = new Cluster(null, new ArrayList<>(0), asList(partitionInfo0, partitionInfo1), Collections.emptySet(), Collections.emptySet()); - MockProducer producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer()); + MockProducer producer = new MockProducer<>(cluster, true, new StringSerializer(), new StringSerializer()); ProducerRecord record = new ProducerRecord<>(topic, "key", "value"); Future metadata = producer.send(record); assertEquals(1, metadata.get().partition(), "Partition should be correct"); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java index 0014bf8daaeef..f5484071717dc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/UniformStickyPartitionerTest.java @@ -39,6 +39,7 @@ public class UniformStickyPartitionerTest { private final static String TOPIC_A = "TOPIC_A"; private final static String TOPIC_B = "TOPIC_B"; + @SuppressWarnings("deprecation") @Test public void testRoundRobinWithUnavailablePartitions() { // Intentionally make the partition list not in partition order to test the edge @@ -77,6 +78,7 @@ public void testRoundRobinWithUnavailablePartitions() { assertEquals(countForPart0, countForPart2, "The distribution between two available partitions should be even"); } + @SuppressWarnings("deprecation") @Test public void testRoundRobinWithKeyBytes() throws InterruptedException { List allPartitions = asList(new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES), @@ -140,7 +142,8 @@ public void testRoundRobinWithKeyBytes() throws InterruptedException { assertEquals(30, partitionCount.get(oldPart).intValue()); assertEquals(60, partitionCount.get(newPart).intValue()); } - + + @SuppressWarnings("deprecation") @Test public void testRoundRobinWithNullKeyBytes() throws InterruptedException { List allPartitions = asList(new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES), diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java new file mode 100644 index 0000000000000..734aedc483ad1 --- /dev/null +++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BuiltInPartitionerTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BuiltInPartitionerTest { + private final static Node[] NODES = new Node[] { + new Node(0, "localhost", 99), + new Node(1, "localhost", 100), + new Node(2, "localhost", 101), + new Node(11, "localhost", 102) + }; + final static String TOPIC_A = "topicA"; + final static String TOPIC_B = "topicB"; + final static String TOPIC_C = "topicC"; + final LogContext logContext = new LogContext(); + + @AfterEach + public void tearDown() { + BuiltInPartitioner.mockRandom = null; + } + + @Test + public void testStickyPartitioning() { + List allPartitions = asList(new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES), + new PartitionInfo(TOPIC_A, 1, NODES[1], NODES, NODES), + new PartitionInfo(TOPIC_A, 2, NODES[2], NODES, NODES), + new PartitionInfo(TOPIC_B, 0, NODES[0], NODES, NODES) + ); + Cluster testCluster = new Cluster("clusterId", asList(NODES), allPartitions, + Collections.emptySet(), Collections.emptySet()); + + // Create partitions with "sticky" batch size to accommodate 3 records. + BuiltInPartitioner builtInPartitionerA = new BuiltInPartitioner(logContext, TOPIC_A, 3); + + // Test the partition is not switched until sticky batch size is reached. + // Mock random number generator with just sequential integer. 
+ AtomicInteger mockRandom = new AtomicInteger(); + BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1); + + BuiltInPartitioner.StickyPartitionInfo partitionInfo = builtInPartitionerA.peekCurrentPartitionInfo(testCluster); + int partA = partitionInfo.partition(); + builtInPartitionerA.updatePartitionInfo(partitionInfo, 1, testCluster); + + partitionInfo = builtInPartitionerA.peekCurrentPartitionInfo(testCluster); + assertEquals(partA, partitionInfo.partition()); + builtInPartitionerA.updatePartitionInfo(partitionInfo, 1, testCluster); + + partitionInfo = builtInPartitionerA.peekCurrentPartitionInfo(testCluster); + assertEquals(partA, partitionInfo.partition()); + builtInPartitionerA.updatePartitionInfo(partitionInfo, 1, testCluster); + + // After producing 3 records, partition must've switched. + assertNotEquals(partA, builtInPartitionerA.peekCurrentPartitionInfo(testCluster).partition()); + + // Check that switching works even when there is one partition. + BuiltInPartitioner builtInPartitionerB = new BuiltInPartitioner(logContext, TOPIC_B, 1); + for (int c = 10; c-- > 0; ) { + partitionInfo = builtInPartitionerB.peekCurrentPartitionInfo(testCluster); + assertEquals(0, partitionInfo.partition()); + builtInPartitionerB.updatePartitionInfo(partitionInfo, 1, testCluster); + } + } + + @Test + public void unavailablePartitionsTest() { + // Partition 1 in topic A, partition 0 in topic B and partition 0 in topic C are unavailable partitions. + List allPartitions = asList(new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES), + new PartitionInfo(TOPIC_A, 1, null, NODES, NODES), + new PartitionInfo(TOPIC_A, 2, NODES[2], NODES, NODES), + new PartitionInfo(TOPIC_B, 0, null, NODES, NODES), + new PartitionInfo(TOPIC_B, 1, NODES[0], NODES, NODES), + new PartitionInfo(TOPIC_C, 0, null, NODES, NODES) + ); + + Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions, + Collections.emptySet(), Collections.emptySet()); + + // Create partitions with "sticky" batch size to accommodate 1 record. + BuiltInPartitioner builtInPartitionerA = new BuiltInPartitioner(logContext, TOPIC_A, 1); + + // Assure we never choose partition 1 because it is unavailable. + BuiltInPartitioner.StickyPartitionInfo partitionInfo = builtInPartitionerA.peekCurrentPartitionInfo(testCluster); + int partA = partitionInfo.partition(); + builtInPartitionerA.updatePartitionInfo(partitionInfo, 1, testCluster); + + boolean foundAnotherPartA = false; + assertNotEquals(1, partA); + for (int aPartitions = 0; aPartitions < 100; aPartitions++) { + partitionInfo = builtInPartitionerA.peekCurrentPartitionInfo(testCluster); + int anotherPartA = partitionInfo.partition(); + builtInPartitionerA.updatePartitionInfo(partitionInfo, 1, testCluster); + + assertNotEquals(1, anotherPartA); + foundAnotherPartA = foundAnotherPartA || anotherPartA != partA; + } + assertTrue(foundAnotherPartA, "Expected to find partition other than " + partA); + + BuiltInPartitioner builtInPartitionerB = new BuiltInPartitioner(logContext, TOPIC_B, 1); + // Assure we always choose partition 1 for topic B. 
+ partitionInfo = builtInPartitionerB.peekCurrentPartitionInfo(testCluster); + int partB = partitionInfo.partition(); + builtInPartitionerB.updatePartitionInfo(partitionInfo, 1, testCluster); + + assertEquals(1, partB); + for (int bPartitions = 0; bPartitions < 100; bPartitions++) { + partitionInfo = builtInPartitionerB.peekCurrentPartitionInfo(testCluster); + assertEquals(1, partitionInfo.partition()); + builtInPartitionerB.updatePartitionInfo(partitionInfo, 1, testCluster); + } + + // Assure that we still choose the partition when there are no partitions available. + BuiltInPartitioner builtInPartitionerC = new BuiltInPartitioner(logContext, TOPIC_C, 1); + partitionInfo = builtInPartitionerC.peekCurrentPartitionInfo(testCluster); + int partC = partitionInfo.partition(); + builtInPartitionerC.updatePartitionInfo(partitionInfo, 1, testCluster); + assertEquals(0, partC); + + partitionInfo = builtInPartitionerC.peekCurrentPartitionInfo(testCluster); + partC = partitionInfo.partition(); + assertEquals(0, partC); + } + + @Test + public void adaptivePartitionsTest() { + // Mock random number generator with just sequential integer. + AtomicInteger mockRandom = new AtomicInteger(); + BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1); + + BuiltInPartitioner builtInPartitioner = new BuiltInPartitioner(logContext, TOPIC_A, 1); + + // Simulate partition queue sizes. + int[] queueSizes = {5, 0, 3, 0, 1}; + int[] partitionIds = new int[queueSizes.length]; + int[] expectedFrequencies = new int[queueSizes.length]; + List allPartitions = new ArrayList<>(); + for (int i = 0; i < partitionIds.length; i++) { + partitionIds[i] = i; + allPartitions.add(new PartitionInfo(TOPIC_A, i, NODES[i % NODES.length], NODES, NODES)); + expectedFrequencies[i] = 6 - queueSizes[i]; // 6 is max(queueSizes) + 1 + } + + builtInPartitioner.updatePartitionLoadStats(queueSizes, partitionIds, queueSizes.length); + + Cluster testCluster = new Cluster("clusterId", asList(NODES), allPartitions, + Collections.emptySet(), Collections.emptySet()); + + // Issue a certain number of partition calls to validate that the partitions would be + // distributed with frequencies that are reciprocal to the queue sizes. The number of + // iterations is defined by the last element of the cumulative frequency table which is + // the sum of all frequencies. We do 2 cycles, just so it's more than 1. + final int numberOfCycles = 2; + int numberOfIterations = builtInPartitioner.loadStatsRangeEnd() * numberOfCycles; + int[] frequencies = new int[queueSizes.length]; + + for (int i = 0; i < numberOfIterations; i++) { + BuiltInPartitioner.StickyPartitionInfo partitionInfo = builtInPartitioner.peekCurrentPartitionInfo(testCluster); + ++frequencies[partitionInfo.partition()]; + builtInPartitioner.updatePartitionInfo(partitionInfo, 1, testCluster); + } + + // Verify that frequencies are reciprocal of queue sizes. 
+ for (int i = 0; i < frequencies.length; i++) { + assertEquals(expectedFrequencies[i] * numberOfCycles, frequencies[i], + "Partition " + i + " was chosen " + frequencies[i] + " times"); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java index a55e5d2220d22..e250748643a43 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java @@ -42,6 +42,7 @@ public class DefaultPartitionerTest { @Test public void testKeyPartitionIsStable() { + @SuppressWarnings("deprecation") final Partitioner partitioner = new DefaultPartitioner(); final Cluster cluster = new Cluster("clusterId", asList(NODES), PARTITIONS, Collections.emptySet(), Collections.emptySet()); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index d5b89ea8645df..cf991de338bdd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -114,18 +114,18 @@ public void testDrainBatches() throws Exception { Collections.emptySet(), Collections.emptySet()); // initial data - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained Map> batches1 = accum.drain(cluster, new HashSet(Arrays.asList(node1, node2)), (int) batchSize, 0); verifyTopicPartitionInBatches(batches1, tp1, tp3); // add record for tp1, tp3 - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4 @@ -137,18 +137,18 @@ public void 
testDrainBatches() throws Exception { verifyTopicPartitionInBatches(batches3, tp1, tp3); // add record for tp2, tp3, tp4 and mute the tp4 - accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.mutePartition(tp4); // drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted) Map> batches4 = accum.drain(cluster, new HashSet(Arrays.asList(node1, node2)), (int) batchSize, 0); verifyTopicPartitionInBatches(batches4, tp2, tp3); // add record for tp1, tp2, tp3, and unmute tp4 - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.unmutePartition(tp4); // set maxSize as a max value, so that the all partitions in 2 nodes should be drained: node1 => [tp1, tp2], node2 => [tp3, tp4] Map> batches5 = accum.drain(cluster, new HashSet(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0); @@ -182,8 +182,8 @@ public void testFull() throws Exception { int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { // append to the first batch - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - Deque partitionBatches = accum.batches().get(tp1); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + Deque partitionBatches = accum.getDeque(tp1); assertEquals(1, partitionBatches.size()); ProducerBatch batch = partitionBatches.peekFirst(); @@ -193,8 +193,8 @@ public void testFull() throws Exception { // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - Deque partitionBatches = accum.batches().get(tp1); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + Deque partitionBatches = accum.getDeque(tp1); assertEquals(2, partitionBatches.size()); Iterator partitionBatchesIterator = partitionBatches.iterator(); assertTrue(partitionBatchesIterator.next().isWritable()); @@ -228,10 +228,10 @@ private void testAppendLarge(CompressionType compressionType) throws Exception { byte[] value = new 
byte[2 * batchSize]; RecordAccumulator accum = createTestRecordAccumulator( batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0); - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); assertEquals(Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); - Deque batches = accum.batches().get(tp1); + Deque batches = accum.getDeque(tp1); assertEquals(1, batches.size()); ProducerBatch producerBatch = batches.peek(); List recordBatches = TestUtils.toList(producerBatch.records().batches()); @@ -266,10 +266,10 @@ private void testAppendLargeOldMessageFormat(CompressionType compressionType) th RecordAccumulator accum = createTestRecordAccumulator( batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0); - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); assertEquals(Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); - Deque batches = accum.batches().get(tp1); + Deque batches = accum.getDeque(tp1); assertEquals(1, batches.size()); ProducerBatch producerBatch = batches.peek(); List recordBatches = TestUtils.toList(producerBatch.records().batches()); @@ -290,7 +290,7 @@ public void testLinger() throws Exception { int lingerMs = 10; RecordAccumulator accum = createTestRecordAccumulator( 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs); - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); assertEquals(0, accum.ready(cluster, time.milliseconds()).readyNodes.size(), "No partitions should be ready"); time.sleep(10); assertEquals(Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); @@ -313,7 +313,7 @@ public void testPartialDrain() throws Exception { List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { for (int i = 0; i < appends; i++) - accum.append(tp, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(tp.topic(), tp.partition(), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); } assertEquals(Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes, "Partition's leader should be ready"); @@ -335,7 +335,7 @@ public void testStressfulSituation() throws Exception { public void run() { for (int i = 0; i < msgs; i++) { try { - accum.append(new TopicPartition(topic, i % numParts), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, i % numParts, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); } catch (Exception e) { e.printStackTrace(); } @@ -379,7 +379,7 @@ public void testNextReadyCheckDelay() throws Exception { // Partition on node1 only for (int i = 0; i < appends; i++) - 
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals(0, result.readyNodes.size(), "No nodes should be ready."); assertEquals(lingerMs, result.nextReadyCheckDelayMs, "Next check time should be the linger time"); @@ -388,14 +388,14 @@ public void testNextReadyCheckDelay() throws Exception { // Add partition on node2 only for (int i = 0; i < appends; i++) - accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); result = accum.ready(cluster, time.milliseconds()); assertEquals(0, result.readyNodes.size(), "No nodes should be ready."); assertEquals(lingerMs / 2, result.nextReadyCheckDelayMs, "Next check time should be defined by node1, half remaining linger time"); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) - accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); result = accum.ready(cluster, time.milliseconds()); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); // Note this can actually be < linger time because it may use delays from partitions that aren't sendable @@ -417,7 +417,7 @@ CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metr new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); long now = time.milliseconds(); - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); Map> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); @@ -429,7 +429,7 @@ CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metr accum.reenqueue(batches.get(0).get(0), now); // Put message for partition 1 into accumulator - accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); result = accum.ready(cluster, now + lingerMs + 1); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); @@ -455,7 +455,7 @@ public void testFlush() throws Exception { 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs); for (int i = 0; i < 100; i++) { - accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, i % 3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); assertTrue(accum.hasIncomplete()); } 
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); @@ -493,7 +493,7 @@ public void run() { public void testAwaitFlushComplete() throws Exception { RecordAccumulator accum = createTestRecordAccumulator( 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Integer.MAX_VALUE); - accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, 0, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.beginFlush(); assertTrue(accum.flushInProgress()); @@ -514,15 +514,19 @@ public void testAbortIncompleteBatches() throws Exception { final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); final RecordAccumulator accum = createTestRecordAccumulator( 128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs); - class TestCallback implements Callback { + class TestCallback implements RecordAccumulator.AppendCallbacks { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { assertTrue(exception.getMessage().equals("Producer is closed forcefully.")); numExceptionReceivedInCallback.incrementAndGet(); } + + @Override + public void setPartition(int partition) { + } } for (int i = 0; i < numRecords; i++) - accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds(), cluster); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertFalse(result.readyNodes.isEmpty()); Map> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); @@ -555,15 +559,19 @@ public void testAbortUnsentBatches() throws Exception { 128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs); final KafkaException cause = new KafkaException(); - class TestCallback implements Callback { + class TestCallback implements RecordAccumulator.AppendCallbacks { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { assertEquals(cause, exception); numExceptionReceivedInCallback.incrementAndGet(); } + + @Override + public void setPartition(int partition) { + } } for (int i = 0; i < numRecords; i++) - accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds(), cluster); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertFalse(result.readyNodes.isEmpty()); Map> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, @@ -602,7 +610,7 @@ private void doExpireBatchSingle(int deliveryTimeoutMs) throws InterruptedExcept for (Boolean mute: muteStates) { if (time.milliseconds() < System.currentTimeMillis()) time.setCurrentTimeMs(System.currentTimeMillis()); - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); assertEquals(0, accum.ready(cluster, time.milliseconds()).readyNodes.size(), "No partition should be ready."); time.sleep(lingerMs); 
@@ -651,11 +659,11 @@ public void testExpiredBatches() throws InterruptedException { // Test batches not in retry for (int i = 0; i < appends; i++) { - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); assertEquals(0, accum.ready(cluster, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); } // Make the batches ready due to batch full - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); // Advance the clock to expire the batch. @@ -685,7 +693,7 @@ public void testExpiredBatches() throws InterruptedException { // Test batches in retry. // Create a retried batch - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); time.sleep(lingerMs); readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); @@ -709,7 +717,7 @@ public void testExpiredBatches() throws InterruptedException { assertEquals(0, expiredBatches.size(), "All batches should have been expired."); // Test that when being throttled muted batches are expired before the throttle time is over. - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); time.sleep(lingerMs); readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); @@ -742,7 +750,7 @@ public void testMutedPartitions() throws InterruptedException { batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, 10); int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); assertEquals(0, accum.ready(cluster, now).readyNodes.size(), "No partitions should be ready."); } time.sleep(2000); @@ -785,7 +793,7 @@ public void testIdempotenceWithOldMagic() { CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); assertThrows(UnsupportedVersionException.class, - () -> accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds())); + () -> accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster)); } @Test @@ -808,10 +816,10 @@ public void testRecordsDrainedWhenTransactionCompleting() throws Exception { // Initially, the transaction is still in progress, so we should respect the linger. 
Mockito.when(transactionManager.isCompleting()).thenReturn(false); - accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, - false, time.milliseconds()); - accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, - false, time.milliseconds()); + accumulator.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, + false, time.milliseconds(), cluster); + accumulator.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, + false, time.milliseconds(), cluster); assertTrue(accumulator.hasUndrained()); RecordAccumulator.ReadyCheckResult firstResult = accumulator.ready(cluster, time.milliseconds()); @@ -930,7 +938,7 @@ public void testSplitFrequency() throws InterruptedException { int dice = random.nextInt(100); byte[] value = (dice < goodCompRatioPercentage) ? bytesWithGoodCompression(random) : bytesWithPoorCompression(random, 100); - accum.append(tp1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds()); + accum.append(topic, partition1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); BatchDrainedResult result = completeOrSplitBatches(accum, batchSize); numSplit += result.numSplit; numBatches += result.numBatches; @@ -953,7 +961,7 @@ public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedExce RecordAccumulator accum = createTestRecordAccumulator( batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs); - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; Map> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertTrue(drained.isEmpty()); @@ -968,7 +976,7 @@ public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedExce //assertTrue(accum.soonToExpireInFlightBatches().isEmpty()); // Queue another batch and advance clock such that batch expiry time is earlier than request timeout. - accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); time.sleep(lingerMs * 4); // Now drain and check that accumulator picked up the drained batch because its expiry is soon. @@ -993,7 +1001,7 @@ public void testExpiredBatchesRetry() throws InterruptedException { // Test batches in retry. 
for (Boolean mute : muteStates) { - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); time.sleep(lingerMs); readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); @@ -1015,6 +1023,7 @@ public void testExpiredBatchesRetry() throws InterruptedException { } } + @SuppressWarnings("deprecation") @Test public void testStickyBatches() throws Exception { long now = time.milliseconds(); @@ -1024,24 +1033,23 @@ public void testStickyBatches() throws Exception { Partitioner partitioner = new DefaultPartitioner(); RecordAccumulator accum = createTestRecordAccumulator(3200, - batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10); + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10); int expectedAppends = expectedNumAppendsNoKey(batchSize); // Create first batch int partition = partitioner.partition(topic, null, null, "value", value, cluster); - TopicPartition tp = new TopicPartition(topic, partition); - accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); int appends = 1; boolean switchPartition = false; while (!switchPartition) { // Append to the first batch partition = partitioner.partition(topic, null, null, "value", value, cluster); - tp = new TopicPartition(topic, partition); - RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds()); - Deque partitionBatches1 = accum.batches().get(tp1); - Deque partitionBatches2 = accum.batches().get(tp2); - Deque partitionBatches3 = accum.batches().get(tp3); + RecordAccumulator.RecordAppendResult result = accum.append(topic, partition, 0L, null, + value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds(), cluster); + Deque partitionBatches1 = accum.getDeque(tp1); + Deque partitionBatches2 = accum.getDeque(tp2); + Deque partitionBatches3 = accum.getDeque(tp3); int numBatches = (partitionBatches1 == null ? 0 : partitionBatches1.size()) + (partitionBatches2 == null ? 0 : partitionBatches2.size()) + (partitionBatches3 == null ? 0 : partitionBatches3.size()); // Only one batch is created because the partition is sticky. 
assertEquals(1, numBatches); @@ -1062,18 +1070,17 @@ public void testStickyBatches() throws Exception { // KafkaProducer would call this method in this case, make second batch partitioner.onNewBatch(topic, cluster, partition); partition = partitioner.partition(topic, null, null, "value", value, cluster); - tp = new TopicPartition(topic, partition); - accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(topic, partition, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); appends++; // These appends all go into the second batch while (!switchPartition) { partition = partitioner.partition(topic, null, null, "value", value, cluster); - tp = new TopicPartition(topic, partition); - RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds()); - Deque partitionBatches1 = accum.batches().get(tp1); - Deque partitionBatches2 = accum.batches().get(tp2); - Deque partitionBatches3 = accum.batches().get(tp3); + RecordAccumulator.RecordAppendResult result = accum.append(topic, partition, 0L, null, value, + Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds(), cluster); + Deque partitionBatches1 = accum.getDeque(tp1); + Deque partitionBatches2 = accum.getDeque(tp2); + Deque partitionBatches3 = accum.getDeque(tp3); int numBatches = (partitionBatches1 == null ? 0 : partitionBatches1.size()) + (partitionBatches2 == null ? 0 : partitionBatches2.size()) + (partitionBatches3 == null ? 0 : partitionBatches3.size()); // Only two batches because the new partition is also sticky. assertEquals(2, numBatches); @@ -1089,6 +1096,158 @@ public void testStickyBatches() throws Exception { assertEquals(appends, 2 * expectedAppends); } + @Test + public void testUniformBuiltInPartitioner() throws Exception { + + try { + // Mock random number generator with just sequential integer. + AtomicInteger mockRandom = new AtomicInteger(); + BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1); + + long totalSize = 1024 * 1024; + int batchSize = 128; // note that this is also a "sticky" limit for the partitioner + RecordAccumulator accum = createTestRecordAccumulator(batchSize, totalSize, CompressionType.NONE, 0); + + // Set up callbacks so that we know what partition is chosen. + final AtomicInteger partition = new AtomicInteger(RecordMetadata.UNKNOWN_PARTITION); + RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() { + @Override + public void setPartition(int p) { + partition.set(p); + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + + } + }; + + // Produce small record, we should switch to first partition. + accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, value, Record.EMPTY_HEADERS, + callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); + assertEquals(partition1, partition.get()); + assertEquals(1, mockRandom.get()); + + // Produce large record, we should exceed "sticky" limit, but produce to this partition + // as we switch after the "sticky" limit is exceeded. The partition is switched after + // we produce. 
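For context on the assertions in testUniformBuiltInPartitioner, the switching rule they rely on can be sketched as follows. This is a minimal illustration only: BuiltInPartitioner itself is not part of these hunks, and the StickySwitchSketch class, its fields, and the IntSupplier random source are hypothetical stand-ins rather than Kafka APIs.

    import java.util.function.IntSupplier;

    // Hypothetical sketch: a record stays on the current "sticky" partition; once the bytes
    // appended to that partition reach the batch size, the *next* record moves to a fresh
    // random partition ("the partition is switched after we produce").
    class StickySwitchSketch {
        private int currentPartition = -1;
        private int bytesInCurrentPartition = 0;

        int partitionFor(int recordSize, int batchSize, int numPartitions, IntSupplier random) {
            if (currentPartition < 0)
                currentPartition = random.getAsInt() % numPartitions; // first record: pick a partition
            int chosen = currentPartition;                            // this record still goes to the sticky partition
            bytesInCurrentPartition += recordSize;
            if (bytesInCurrentPartition >= batchSize) {               // "sticky" limit exceeded: switch for the next record
                currentPartition = random.getAsInt() % numPartitions;
                bytesInCurrentPartition = 0;
            }
            return chosen;
        }
    }

With a sequential random source (0, 1, 2, ...) and three partitions, this reproduces the sequence asserted in the test: the small record and the first large record land on the same partition while the mock random counter keeps advancing, and every subsequent large record moves to the next partition in turn.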
+ byte[] largeValue = new byte[batchSize]; + accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, + callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); + assertEquals(partition1, partition.get()); + assertEquals(2, mockRandom.get()); + + // Produce large record, we should switch to next partition. + accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, + callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); + assertEquals(partition2, partition.get()); + assertEquals(3, mockRandom.get()); + + // Produce large record, we should switch to next partition. + accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, + callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); + assertEquals(partition3, partition.get()); + assertEquals(4, mockRandom.get()); + + // Produce large record, we should switch to first partition again. + accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, + callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); + assertEquals(partition1, partition.get()); + assertEquals(5, mockRandom.get()); + } finally { + BuiltInPartitioner.mockRandom = null; + } + } + + @Test + public void testAdaptiveBuiltInPartitioner() throws Exception { + try { + // Mock random number generator with just sequential integer. + AtomicInteger mockRandom = new AtomicInteger(); + BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1); + + // Create accumulator with partitioner config to enable adaptive partitioning. + RecordAccumulator.PartitionerConfig config = new RecordAccumulator.PartitionerConfig(true, 100); + long totalSize = 1024 * 1024; + int batchSize = 128; + RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L, + 3200, config, metrics, "producer-metrics", time, new ApiVersions(), null, + new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics")); + + byte[] largeValue = new byte[batchSize]; + int[] queueSizes = {1, 7, 2}; + int[] expectedFrequencies = new int[queueSizes.length]; + for (int i = 0; i < queueSizes.length; i++) { + expectedFrequencies[i] = 8 - queueSizes[i]; // 8 is max(queueSizes) + 1 + for (int c = queueSizes[i]; c-- > 0; ) { + // Add large records to each partition, so that each record creates a batch. + accum.append(topic, i, 0L, null, largeValue, Record.EMPTY_HEADERS, + null, maxBlockTimeMs, false, time.milliseconds(), cluster); + } + assertEquals(queueSizes[i], accum.getDeque(new TopicPartition(topic, i)).size()); + } + + // Let the accumulator generate the probability tables. + accum.ready(cluster, time.milliseconds()); + + // Set up callbacks so that we know what partition is chosen. + final AtomicInteger partition = new AtomicInteger(RecordMetadata.UNKNOWN_PARTITION); + RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() { + @Override + public void setPartition(int p) { + partition.set(p); + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + + } + }; + + // Prime built-in partitioner so that it'd switch on every record, as switching only + // happens after the "sticky" limit is exceeded. 
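The expectedFrequencies arithmetic above (8 is max(queueSizes) + 1) amounts to weighting each partition reciprocally to its queue depth and sampling from a cumulative frequency table. A minimal sketch of that idea follows, assuming a hypothetical QueueSizeWeightingSketch helper; the real BuiltInPartitioner and loadStatsRangeEnd code are not part of these hunks.

    import java.util.concurrent.ThreadLocalRandom;

    // Hypothetical sketch: shorter queues get larger weights, and partitions are sampled
    // in proportion to those weights via a cumulative frequency table.
    final class QueueSizeWeightingSketch {
        // Weight of each partition: max(queueSize) + 1 - queueSize.
        static int[] cumulativeFrequencyTable(int[] queueSizes) {
            int max = 0;
            for (int size : queueSizes) max = Math.max(max, size);
            int[] table = new int[queueSizes.length];
            int running = 0;
            for (int i = 0; i < queueSizes.length; i++) {
                running += max + 1 - queueSizes[i];
                table[i] = running;          // last element plays the role of the range end
            }
            return table;
        }

        // Pick a partition with probability proportional to its weight.
        static int pickPartition(int[] cumulativeTable) {
            int total = cumulativeTable[cumulativeTable.length - 1];
            int r = ThreadLocalRandom.current().nextInt(total);
            for (int i = 0; i < cumulativeTable.length; i++) {
                if (r < cumulativeTable[i]) return i;
            }
            return cumulativeTable.length - 1; // unreachable for well-formed input
        }
    }

For queueSizes = {1, 7, 2} this yields weights {7, 1, 6} and a cumulative table {7, 8, 14}, so the range end is 14; that is why the test below runs 2 * 14 iterations and expects picks of {14, 2, 12} (the test swaps the random source for a sequential counter, which makes the distribution exact rather than statistical). The tail of the test then records a 200 ms gap between node 0's ready time and drain time via updateNodeLatencyStats, which exceeds the 100 ms partitionAvailabilityTimeoutMs configured above, so subsequent switches avoid that node's partitions; the exact exclusion rule lives in RecordAccumulator and is only inferred here.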
+ accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, + callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); + + // Issue a certain number of partition calls to validate that the partitions would be + // distributed with frequencies that are reciprocal to the queue sizes. The number of + // iterations is defined by the last element of the cumulative frequency table which is + // the sum of all frequencies. We do 2 cycles, just so it's more than 1. + final int numberOfCycles = 2; + int numberOfIterations = accum.getBuiltInPartitioner(topic).loadStatsRangeEnd() * numberOfCycles; + int[] frequencies = new int[queueSizes.length]; + + for (int i = 0; i < numberOfIterations; i++) { + accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, + callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); + ++frequencies[partition.get()]; + } + + // Verify that frequencies are reciprocal of queue sizes. + for (int i = 0; i < frequencies.length; i++) { + assertEquals(expectedFrequencies[i] * numberOfCycles, frequencies[i], + "Partition " + i + " was chosen " + frequencies[i] + " times"); + } + + // Test that partitions residing on high-latency nodes don't get switched to. + accum.updateNodeLatencyStats(0, time.milliseconds() - 200, true); + accum.updateNodeLatencyStats(0, time.milliseconds(), false); + accum.ready(cluster, time.milliseconds()); + + // Do one append, because partition gets switched after append. + accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, + callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); + + for (int c = 10; c-- > 0; ) { + accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, + callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); + assertEquals(partition3, partition.get()); + } + } finally { + BuiltInPartitioner.mockRandom = null; + } + } + private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSize, int numRecords) throws InterruptedException { Random random = new Random(); @@ -1098,7 +1257,7 @@ private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSi CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f); // Append 20 records of 100 bytes size with poor compression ratio should make the batch too big. 
for (int i = 0; i < numRecords; i++) { - accum.append(tp1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false, time.milliseconds()); + accum.append(topic, partition1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); } RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 60e9f06186255..3d972b3eb2cfe 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NodeApiVersions; -import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.InvalidRecordException; @@ -467,22 +466,30 @@ public void testAppendInExpiryCallback() throws InterruptedException { final byte[] key = "key".getBytes(); final byte[] value = "value".getBytes(); final long maxBlockTimeMs = 1000; - Callback callback = (metadata, exception) -> { - if (exception instanceof TimeoutException) { - expiryCallbackCount.incrementAndGet(); - try { - accumulator.append(tp1, 0L, key, value, - Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); - } catch (InterruptedException e) { - throw new RuntimeException("Unexpected interruption", e); - } - } else if (exception != null) - unexpectedException.compareAndSet(null, exception); + Cluster cluster = TestUtils.singletonCluster(); + RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() { + @Override + public void setPartition(int partition) { + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception instanceof TimeoutException) { + expiryCallbackCount.incrementAndGet(); + try { + accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, + Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + } catch (InterruptedException e) { + throw new RuntimeException("Unexpected interruption", e); + } + } else if (exception != null) + unexpectedException.compareAndSet(null, exception); + } }; final long nowMs = time.milliseconds(); for (int i = 0; i < messagesPerBatch; i++) - accumulator.append(tp1, 0L, key, value, null, callback, maxBlockTimeMs, false, nowMs); + accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, null, callbacks, maxBlockTimeMs, false, nowMs, cluster); // Advance the clock to expire the first batch. time.sleep(10000); @@ -501,9 +508,9 @@ public void testAppendInExpiryCallback() throws InterruptedException { assertEquals(messagesPerBatch, expiryCallbackCount.get(), "Callbacks not invoked for expiry"); assertNull(unexpectedException.get(), "Unexpected exception"); // Make sure that the reconds were appended back to the batch. 
- assertTrue(accumulator.batches().containsKey(tp1)); - assertEquals(1, accumulator.batches().get(tp1).size()); - assertEquals(messagesPerBatch, accumulator.batches().get(tp1).peekFirst().recordCount); + assertNotNull(accumulator.getDeque(tp1)); + assertEquals(1, accumulator.getDeque(tp1).size()); + assertEquals(messagesPerBatch, accumulator.getDeque(tp1).peekFirst().recordCount); } /** @@ -546,6 +553,76 @@ public void testMetadataTopicExpiry() throws Exception { assertTrue(future.isDone(), "Request should be completed"); } + @Test + public void testNodeLatencyStats() throws Exception { + try (Metrics m = new Metrics()) { + // Create a new record accumulator with non-0 partitionAvailabilityTimeoutMs + // otherwise it wouldn't update the stats. + RecordAccumulator.PartitionerConfig config = new RecordAccumulator.PartitionerConfig(false, 42); + long totalSize = 1024 * 1024; + accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L, + DELIVERY_TIMEOUT_MS, config, m, "producer-metrics", time, apiVersions, null, + new BufferPool(totalSize, batchSize, m, time, "producer-internal-metrics")); + + SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); + Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, 1, + senderMetrics, time, REQUEST_TIMEOUT, 1000L, null, new ApiVersions()); + + // Produce and send batch. + long time1 = time.milliseconds(); + appendToAccumulator(tp0, 0L, "key", "value"); + sender.runOnce(); + assertEquals(1, client.inFlightRequestCount(), "We should have a single produce request in flight."); + + // We were able to send the batch out, so both the ready and drain values should be the same. + RecordAccumulator.NodeLatencyStats stats = accumulator.getNodeLatencyStats(0); + assertEquals(time1, stats.drainTimeMs); + assertEquals(time1, stats.readyTimeMs); + + // Make the node 1 not ready. + client.throttle(metadata.fetch().nodeById(0), 100); + + // Time passes, but we don't have anything to send. + time.sleep(10); + sender.runOnce(); + assertEquals(1, client.inFlightRequestCount(), "We should have a single produce request in flight."); + + // Stats shouldn't change as we didn't have anything ready. + assertEquals(time1, stats.drainTimeMs); + assertEquals(time1, stats.readyTimeMs); + + // Produce a new batch, but we won't be able to send it because node is not ready. + long time2 = time.milliseconds(); + appendToAccumulator(tp0, 0L, "key", "value"); + sender.runOnce(); + assertEquals(1, client.inFlightRequestCount(), "We should have a single produce request in flight."); + + // The ready time should move forward, but drain time shouldn't change. + assertEquals(time1, stats.drainTimeMs); + assertEquals(time2, stats.readyTimeMs); + + // Time passes, we keep trying to send, but the node is not ready. + time.sleep(10); + time2 = time.milliseconds(); + sender.runOnce(); + assertEquals(1, client.inFlightRequestCount(), "We should have a single produce request in flight."); + + // The ready time should move forward, but drain time shouldn't change. + assertEquals(time1, stats.drainTimeMs); + assertEquals(time2, stats.readyTimeMs); + + // Finally, time passes beyond the throttle and the node is ready. 
+ time.sleep(100); + time2 = time.milliseconds(); + sender.runOnce(); + assertEquals(2, client.inFlightRequestCount(), "We should have 2 produce requests in flight."); + + // Both times should move forward + assertEquals(time2, stats.drainTimeMs); + assertEquals(time2, stats.readyTimeMs); + } + } + @Test public void testInitProducerIdRequest() { final long producerId = 343434L; @@ -1200,7 +1277,7 @@ public void testCorrectHandlingOfOutOfOrderResponses() throws Exception { client.respondToRequest(secondClientRequest, produceResponse(tp0, -1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1)); sender.runOnce(); // receive response 1 - Deque queuedBatches = accumulator.batches().get(tp0); + Deque queuedBatches = accumulator.getDeque(tp0); // Make sure that we are queueing the second batch first. assertEquals(1, queuedBatches.size()); @@ -1281,7 +1358,7 @@ public void testCorrectHandlingOfOutOfOrderResponsesWhenSecondSucceeds() throws assertTrue(request2.isDone()); assertEquals(1, request2.get().offset()); assertFalse(request1.isDone()); - Deque queuedBatches = accumulator.batches().get(tp0); + Deque queuedBatches = accumulator.getDeque(tp0); assertEquals(0, queuedBatches.size()); assertEquals(1, client.inFlightRequestCount()); @@ -1389,7 +1466,7 @@ public void testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatch assertEquals(1, request2.get().offset()); assertEquals(0, sender.inFlightBatches(tp0).size()); - Deque batches = accumulator.batches().get(tp0); + Deque batches = accumulator.getDeque(tp0); assertEquals(1, batches.size()); assertFalse(batches.peekFirst().hasSequence()); assertFalse(client.hasInFlightRequests()); @@ -1444,7 +1521,7 @@ public void testExpiryOfFirstBatchShouldCauseEpochBumpIfFutureBatchesFail() thro sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1); sender.runOnce(); // receive second response, the third request shouldn't be sent since we are in an unresolved state. - Deque batches = accumulator.batches().get(tp0); + Deque batches = accumulator.getDeque(tp0); // The epoch should be bumped and the second request should be requeued assertEquals(2, batches.size()); @@ -1524,7 +1601,7 @@ public void testExpiryOfAllSentBatchesShouldCauseUnresolvedSequences() throws Ex assertFutureFailure(request1, TimeoutException.class); assertTrue(transactionManager.hasUnresolvedSequence(tp0)); assertFalse(client.hasInFlightRequests()); - Deque batches = accumulator.batches().get(tp0); + Deque batches = accumulator.getDeque(tp0); assertEquals(0, batches.size()); assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId); @@ -2337,10 +2414,11 @@ private void testSplitBatchAndSend(TransactionManager txnManager, client.prepareMetadataUpdate(metadataUpdate1); // Send the first message. 
long nowMs = time.milliseconds(); + Cluster cluster = TestUtils.singletonCluster(); Future f1 = - accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs).future; + accumulator.append(tp.topic(), tp.partition(), 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs, cluster).future; Future f2 = - accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs).future; + accumulator.append(tp.topic(), tp.partition(), 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs, cluster).future; sender.runOnce(); // connect sender.runOnce(); // send produce request @@ -2395,7 +2473,7 @@ private void testSplitBatchAndSend(TransactionManager txnManager, assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence number should be 2"); assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp), "The last ack'd sequence number should be 1"); assertEquals(1L, f2.get().offset(), "Offset of the first message should be 1"); - assertTrue(accumulator.batches().get(tp).isEmpty(), "There should be no batch in the accumulator"); + assertTrue(accumulator.getDeque(tp).isEmpty(), "There should be no batch in the accumulator"); assertTrue((Double) (m.metrics().get(senderMetrics.batchSplitRate).metricValue()) > 0, "There should be a split"); } } @@ -3063,8 +3141,8 @@ private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws Inter } private FutureRecordMetadata appendToAccumulator(TopicPartition tp, long timestamp, String key, String value) throws InterruptedException { - return accumulator.append(tp, timestamp, key.getBytes(), value.getBytes(), Record.EMPTY_HEADERS, - null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future; + return accumulator.append(tp.topic(), tp.partition(), timestamp, key.getBytes(), value.getBytes(), Record.EMPTY_HEADERS, + null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(), TestUtils.singletonCluster()).future; } @SuppressWarnings("deprecation") diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 64be3aeaf47b2..377db5ec06cf4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -693,8 +693,9 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), thi assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); - Future responseFuture1 = accumulator.append(tp0, time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, - null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future; + Future responseFuture1 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(), + "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(), + TestUtils.singletonCluster()).future; sender.runOnce(); assertEquals(1, transactionManager.sequenceNumber(tp0).intValue()); @@ -723,8 +724,9 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), thi assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch); assertEquals(0, transactionManager.sequenceNumber(tp0).intValue()); - Future responseFuture2 = accumulator.append(tp0, time.milliseconds(), 
"2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, - null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future; + Future responseFuture2 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(), + "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(), + TestUtils.singletonCluster()).future; sender.runOnce(); sender.runOnce(); assertEquals(0, transactionManager.firstInFlightSequence(tp0)); @@ -3178,7 +3180,7 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t // New tp1 batches should not be drained from the accumulator while tp1 has in-flight requests using the old epoch appendToAccumulator(tp1); sender.runOnce(); - assertEquals(1, accumulator.batches().get(tp1).size()); + assertEquals(1, accumulator.getDeque(tp1).size()); // Partition failover occurs and tp1 returns a NOT_LEADER_OR_FOLLOWER error // Despite having the old epoch, the batch should retry @@ -3189,8 +3191,8 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t // The batch with the old epoch should be successfully drained, leaving the new one in the queue sender.runOnce(); - assertEquals(1, accumulator.batches().get(tp1).size()); - assertNotEquals(tp1b2, accumulator.batches().get(tp1).peek()); + assertEquals(1, accumulator.getDeque(tp1).size()); + assertNotEquals(tp1b2, accumulator.getDeque(tp1).peek()); assertEquals(epoch, tp1b2.producerEpoch()); // After successfully retrying, there should be no in-flight batches for tp1 and the sequence should be 0 @@ -3205,7 +3207,7 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t // The last batch should now be drained and sent runUntil(() -> transactionManager.hasInflightBatches(tp1)); - assertTrue(accumulator.batches().get(tp1).isEmpty()); + assertTrue(accumulator.getDeque(tp1).isEmpty()); ProducerBatch tp1b3 = transactionManager.nextBatchBySequence(tp1); assertEquals(epoch + 1, tp1b3.producerEpoch()); @@ -3302,7 +3304,7 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t // New tp1 batches should not be drained from the accumulator while tp1 has in-flight requests using the old epoch appendToAccumulator(tp1); sender.runOnce(); - assertEquals(1, accumulator.batches().get(tp1).size()); + assertEquals(1, accumulator.getDeque(tp1).size()); // Partition failover occurs and tp1 returns a NOT_LEADER_OR_FOLLOWER error // Despite having the old epoch, the batch should retry @@ -3313,8 +3315,8 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t // The batch with the old epoch should be successfully drained, leaving the new one in the queue sender.runOnce(); - assertEquals(1, accumulator.batches().get(tp1).size()); - assertNotEquals(tp1b2, accumulator.batches().get(tp1).peek()); + assertEquals(1, accumulator.getDeque(tp1).size()); + assertNotEquals(tp1b2, accumulator.getDeque(tp1).peek()); assertEquals(epoch, tp1b2.producerEpoch()); // After successfully retrying, there should be no in-flight batches for tp1 and the sequence should be 0 @@ -3329,7 +3331,7 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t // The last batch should now be drained and sent runUntil(() -> transactionManager.hasInflightBatches(tp1)); - assertTrue(accumulator.batches().get(tp1).isEmpty()); + assertTrue(accumulator.getDeque(tp1).isEmpty()); ProducerBatch tp1b3 = transactionManager.nextBatchBySequence(tp1); assertEquals(epoch + 1, tp1b3.producerEpoch()); @@ 
-3344,8 +3346,8 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException { final long nowMs = time.milliseconds(); - return accumulator.append(tp, nowMs, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, - null, MAX_BLOCK_TIMEOUT, false, nowMs).future; + return accumulator.append(tp.topic(), tp.partition(), nowMs, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, + null, MAX_BLOCK_TIMEOUT, false, nowMs, TestUtils.singletonCluster()).future; } private void verifyCommitOrAbortTransactionRetriable(TransactionResult firstTransactionResult, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index a7c1bb7ada75a..33f0892a6f479 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java index b14c846925f40..7848c2db8749f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.internals.WindowedSerializer; import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java index 40f66f036c603..929b14ab29072 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.internals.WindowedSerializer; import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java index a435cafb89bea..90ffa3a4a8362 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java @@ -16,12 +16,11 @@ */ package org.apache.kafka.streams.processor; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.streams.Topology; /** * Determine how records are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's - * {@link DefaultPartitioner} will be used to determine the partition. 
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
 * <p>
      * Kafka topics are divided into one or more partitions. Since each partition must fit on the servers that host it, so * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java index a90a028d729b9..f5c9c158bc0bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java @@ -15,8 +15,6 @@ * limitations under the License. */ package org.apache.kafka.streams.processor.internals; - -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StreamPartitioner; @@ -25,12 +23,15 @@ public class DefaultStreamPartitioner implements StreamPartitioner { private final Cluster cluster; private final Serializer keySerializer; - private final DefaultPartitioner defaultPartitioner; + @SuppressWarnings("deprecation") + private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner; + + @SuppressWarnings("deprecation") public DefaultStreamPartitioner(final Serializer keySerializer, final Cluster cluster) { this.cluster = cluster; this.keySerializer = keySerializer; - this.defaultPartitioner = new DefaultPartitioner(); + this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner(); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index adebf167de4ad..a659525727747 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -55,7 +54,8 @@ public class WindowedStreamPartitionerTest { @Test public void testCopartitioning() { final Random rand = new Random(); - final DefaultPartitioner defaultPartitioner = new DefaultPartitioner(); + @SuppressWarnings("deprecation") + final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner(); final WindowedSerializer timeWindowedSerializer = new TimeWindowedSerializer<>(intSerializer); final WindowedStreamPartitioner streamPartitioner = new WindowedStreamPartitioner<>(timeWindowedSerializer); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 48364f27db583..2da9b397e512e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -21,7 +21,6 @@ 
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -784,7 +783,7 @@ public void shouldNotAbortTxnOnEOSCloseDirtyIfNothingSent() { new MockClientSupplier() { @Override public Producer getProducer(final Map config) { - return new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + return new MockProducer(cluster, true, byteArraySerializer, byteArraySerializer) { @Override public void abortTransaction() { functionCalled.set(true); @@ -816,7 +815,7 @@ public void shouldThrowIfTopicIsUnknownOnSendWithPartitioner() { new MockClientSupplier() { @Override public Producer getProducer(final Map config) { - return new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + return new MockProducer(cluster, true, byteArraySerializer, byteArraySerializer) { @Override public List partitionsFor(final String topic) { return Collections.emptyList(); @@ -889,7 +888,7 @@ private StreamsProducer getExceptionalStreamsProducerOnSend(final Exception exce new MockClientSupplier() { @Override public Producer getProducer(final Map config) { - return new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + return new MockProducer(cluster, true, byteArraySerializer, byteArraySerializer) { @Override public synchronized Future send(final ProducerRecord record, final Callback callback) { callback.onCompletion(null, exception); @@ -912,7 +911,7 @@ private StreamsProducer getExceptionalStreamProducerOnPartitionsFor(final Runtim new MockClientSupplier() { @Override public Producer getProducer(final Map config) { - return new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + return new MockProducer(cluster, true, byteArraySerializer, byteArraySerializer) { @Override public synchronized List partitionsFor(final String topic) { throw exception; diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index 880f2cb3bf5d1..53b80ae38b238 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.streams.KafkaClientSupplier; @@ -69,7 +68,7 @@ public Producer getProducer(final Map config) { } else { assertFalse(config.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); } - final MockProducer producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); + final MockProducer producer = new MockProducer<>(cluster, true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); producers.add(producer); return producer; }
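Across SenderTest, TransactionManagerTest and the accumulator tests above, the migration is mechanical: append now takes the topic and partition separately (RecordMetadata.UNKNOWN_PARTITION when the built-in partitioner should choose), an AppendCallbacks instead of a bare Callback, and the current Cluster, while per-partition queues are read with getDeque instead of batches().get. The condensed sketch below illustrates the new call shape; it is not part of the patch, and the fixture values (accum, cluster, key, value, maxBlockTimeMs) are passed in as parameters purely for illustration.

    import java.util.Deque;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.internals.ProducerBatch;
    import org.apache.kafka.clients.producer.internals.RecordAccumulator;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.Record;

    final class AppendMigrationSketch {
        static void appendExample(RecordAccumulator accum, Cluster cluster, String topic,
                                  byte[] key, byte[] value, long maxBlockTimeMs, long nowMs) throws InterruptedException {
            // Callbacks let the caller observe the partition the accumulator settles on.
            RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() {
                @Override
                public void setPartition(int partition) {
                    // called once a concrete partition has been chosen for the record
                }

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // completion notification, as with the old per-record Callback
                }
            };

            // Old shape: accum.append(tp, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, nowMs)
            // New shape: topic + explicit partition (or RecordMetadata.UNKNOWN_PARTITION) + cluster.
            accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, key, value, Record.EMPTY_HEADERS,
                callbacks, maxBlockTimeMs, false, nowMs, cluster);

            // Old: accum.batches().get(new TopicPartition(topic, 0))
            Deque<ProducerBatch> deque = accum.getDeque(new TopicPartition(topic, 0));
        }
    }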