diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 74d408d9a5a49..2d5c8994b4df9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1465,13 +1465,21 @@ public boolean isDone() { private class AppendCallbacks implements RecordAccumulator.AppendCallbacks { private final Callback userCallback; private final ProducerInterceptors interceptors; - private final ProducerRecord record; - protected int partition = RecordMetadata.UNKNOWN_PARTITION; + private final String topic; + private final Integer recordPartition; + private final String recordLogString; + private volatile int partition = RecordMetadata.UNKNOWN_PARTITION; + private volatile TopicPartition topicPartition; private AppendCallbacks(Callback userCallback, ProducerInterceptors interceptors, ProducerRecord record) { this.userCallback = userCallback; this.interceptors = interceptors; - this.record = record; + // Extract record info as we don't want to keep a reference to the record during + // whole lifetime of the batch. + // We don't want to have an NPE here, because the interceptors would not be notified (see .doSend). + topic = record != null ? record.topic() : null; + recordPartition = record != null ? record.partition() : null; + recordLogString = log.isTraceEnabled() && record != null ? record.toString() : ""; } @Override @@ -1491,7 +1499,7 @@ public void setPartition(int partition) { if (log.isTraceEnabled()) { // Log the message here, because we don't know the partition before that. - log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition); + log.trace("Attempting to append record {} with callback {} to topic {} partition {}", recordLogString, userCallback, topic, partition); } } @@ -1500,11 +1508,15 @@ public int getPartition() { } public TopicPartition topicPartition() { - if (record == null) - return null; - return partition == RecordMetadata.UNKNOWN_PARTITION - ? ProducerInterceptors.extractTopicPartition(record) - : new TopicPartition(record.topic(), partition); + if (topicPartition == null && topic != null) { + if (partition != RecordMetadata.UNKNOWN_PARTITION) + topicPartition = new TopicPartition(topic, partition); + else if (recordPartition != null) + topicPartition = new TopicPartition(topic, recordPartition); + else + topicPartition = new TopicPartition(topic, RecordMetadata.UNKNOWN_PARTITION); + } + return topicPartition; } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 4168ea68aa430..a1f684ac95cd4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -297,7 +297,12 @@ public RecordAppendResult append(String topic, byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock); + // This call may block if we exhausted buffer space. buffer = free.allocate(size, maxTimeToBlock); + // Update the current time in case the buffer allocation blocked above. + // NOTE: getting time may be expensive, so calling it under a lock + // should be avoided. + nowMs = time.milliseconds(); } synchronized (dq) { @@ -307,7 +312,7 @@ public RecordAppendResult append(String topic, partitionInfo.partition(), topic); continue; } - RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer); + RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs); // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch. if (appendResult.newBatchCreated) buffer = null; @@ -333,6 +338,7 @@ public RecordAppendResult append(String topic, * @param headers the Headers for the record * @param callbacks The callbacks to execute * @param buffer The buffer for the new batch + * @param nowMs The current time, in milliseconds */ private RecordAppendResult appendNewBatch(String topic, int partition, @@ -342,11 +348,10 @@ private RecordAppendResult appendNewBatch(String topic, byte[] value, Header[] headers, AppendCallbacks callbacks, - ByteBuffer buffer) { + ByteBuffer buffer, + long nowMs) { assert partition != RecordMetadata.UNKNOWN_PARTITION; - // Update the current time in case the buffer allocation blocked above. - long nowMs = time.milliseconds(); RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs); if (appendResult != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...