clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@@ -1465,13 +1465,21 @@ public boolean isDone() {
     private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final ProducerRecord<K, V> record;
-        protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private final String topic;
+        private final Integer recordPartition;
+        private final String recordLogString;
+        private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private volatile TopicPartition topicPartition;

         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
-            this.record = record;
+            // Extract record info as we don't want to keep a reference to the record during
+            // whole lifetime of the batch.

Contributor: Could we move these two lines to the line immediately before where we set recordPartition?

Contributor Author: I think it applies to all three fields: topic, recordPartition and recordLogString. We extract all of this info from the record, so the comment goes before we do that (in the PR it's hard to see because of the inline discussion). Let me know if you think otherwise.

Contributor: Thanks for the explanation. Sounds good.

+            // We don't want to have an NPE here, because the interceptors would not be notified (see .doSend).
+            topic = record != null ? record.topic() : null;

Member: Can you elaborate on this? What kind of application bug would surface itself in a silent way like this?

Contributor Author: https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L1041 has a test which effectively codifies the contract. I would agree that it's weird to have a contract about null handling, but at this point I'd rather preserve whatever behavior is codified.

Member: I don't think this answered my question. What application bug would result in this?

Contributor Author: Would passing a null record not be a bug? I've changed the comment to not mention that it would be a bug.

Contributor: If a user passes a null record to send(), we will be throwing a NullPointerException somewhere. So we could probably just throw an exception early in that case, without going through the callback, and fix the test accordingly. We could probably do that in a separate PR in trunk.

Member (@ijuma, Jul 19, 2022): I looked at the test and it seems to check that an exception is thrown? As @junrao said, this can be done by validating what send receives instead of polluting the whole codebase. I'm OK if we file a JIRA for that and do it as a separate PR. But we should remove this code when we do that.

Contributor Author: It checks that the exception is thrown, and then it checks that the interceptors are called. Probably the test is just sloppy and could use a different error condition. Filed KAFKA-14086.
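
As an aside, here is a minimal sketch of the early validation proposed above, assuming it would sit at the top of KafkaProducer.send(); the exact placement and exception message are illustrative, not part of this PR:

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // Hypothetical fail-fast check: reject a null record up front instead of
        // letting an NPE surface deep in the append path or routing a synthetic
        // failure through the interceptors.
        Objects.requireNonNull(record, "record cannot be null");
        return doSend(interceptors.onSend(record), callback);
    }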

+            recordPartition = record != null ? record.partition() : null;
+            recordLogString = log.isTraceEnabled() && record != null ? record.toString() : "";
         }

         @Override
@@ -1491,7 +1499,7 @@ public void setPartition(int partition) {

             if (log.isTraceEnabled()) {

Member (@ijuma, Jul 13, 2022): A couple of lines above we use a language-level assert. In Kafka, we typically use assert-like methods, such as those on the Objects class, since language-level asserts are disabled by default.

Contributor Author: The intention of an assert is to run in tests but be disabled in prod, so if my understanding is correct, this is the proper usage.

Member (@ijuma, Jul 17, 2022): That's not considered best practice in Java and we don't typically do it in Kafka. What would be the reason to run it in tests only?

Contributor Author: For all my previous life I used asserts extensively in C/C++; they provide both validation and contract documentation. They do redundant validation in builds that are used in system tests without adding perf cost in prod. I can remove it if it's not compatible with the style, though I don't think this is just style: using asserts makes a material difference in early bug detection and in code comprehension.

Contributor: It seems we have mostly been using Objects.requireNonNull for null assertions in our code. It doesn't seem to add too much overhead and helps identify issues in production early on. For consistency, perhaps we could use Objects.requireNonNull instead of assert.

@ijuma: What do you recommend that we use for assertions like assert partitionInfo == stickyPartitionInfo.get()?

Member: The point is that you can run these checks in prod without measurable cost. Then why limit them to tests?

Member: Correct @junrao, Objects.requireNonNull would be the recommended way to assert non-null. The reference equality check is less common; we could add our own utility method in Utils for that, or inline it.

The main thing is to get the appropriate signal if this happens in prod when the cost is low (both examples would be in that category).

Contributor Author: OK, so what's the suggestion in this change? Should I leave the code as is, or remove the assert? Creating a new utility seems out of scope for this change. We could have an offline discussion about asserts; I would be happy to see them used more often in Kafka.

Member: For the line in this method, you could do something like:

    if (partition < 0)
        throw new IllegalArgumentException("partition should be positive, but it was " + partition);

which is more informative and idiomatic, and checks the more general case that we expect partitions to be positive. But I see that we have sprinkled the same check across other methods, so having an assertPartitionIsPositive would probably be a better approach. In any case, since this code was introduced in a different change, we can file a JIRA and do it as a separate PR.

I am happy to discuss more, but we should be clear about terminology. Language-level asserts in Java aren't used much. Checking preconditions through API boundaries is useful; within a given boundary, it's best to use the type system to avoid having noise all over the code.
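
A hypothetical sketch of that helper follows; the name assertPartitionIsPositive is taken from the comment above, while its placement and message are assumptions:

    // Sketch only; not part of this PR. Could live in Utils or next to the call sites.
    static void assertPartitionIsPositive(int partition) {
        if (partition < 0)
            throw new IllegalArgumentException("partition should be positive, but it was " + partition);
    }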

                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", recordLogString, userCallback, topic, partition);
             }
         }

@@ -1500,11 +1508,15 @@ public int getPartition() {
         }

         public TopicPartition topicPartition() {
-            if (record == null)
-                return null;
-            return partition == RecordMetadata.UNKNOWN_PARTITION
-                ? ProducerInterceptors.extractTopicPartition(record)
-                : new TopicPartition(record.topic(), partition);
+            if (topicPartition == null && topic != null) {
+                if (partition != RecordMetadata.UNKNOWN_PARTITION)
+                    topicPartition = new TopicPartition(topic, partition);
+                else if (recordPartition != null)
+                    topicPartition = new TopicPartition(topic, recordPartition);
+                else
+                    topicPartition = new TopicPartition(topic, RecordMetadata.UNKNOWN_PARTITION);
+            }
+            return topicPartition;
         }
     }
 }
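
The null-tolerant topicPartition() above exists so that a failed send can still notify the interceptors. A hedged sketch of the consuming side, based on the "(see .doSend)" comment in the diff; the surrounding error handling in doSend is simplified and assumed:

    // Inside KafkaProducer.doSend(...), heavily abridged:
    AppendCallbacks<K, V> appendCallbacks = new AppendCallbacks<K, V>(callback, this.interceptors, record);
    try {
        // ... serialization, partition resolution, accumulator.append(..., appendCallbacks, ...) ...
    } catch (ApiException e) {
        // topicPartition() is null-safe, so even a null record still yields an
        // onSendError notification for the interceptors rather than an NPE here.
        this.interceptors.onSendError(record, appendCallbacks.topicPartition(), e);
        return new FutureFailure(e);
    }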

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -297,7 +297,12 @@ public RecordAppendResult append(String topic,
             byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
             int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
             log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
+            // This call may block if we exhausted buffer space.
             buffer = free.allocate(size, maxTimeToBlock);
+            // Update the current time in case the buffer allocation blocked above.
+            // NOTE: getting time may be expensive, so calling it under a lock
+            // should be avoided.
+            nowMs = time.milliseconds();

Member: This change has some other side effects on the code logic:

1. In the case of retries (multiple iterations of the while loop) when buffer allocation blocks, prior to this change tryAppend on line 283 was being called with an older nowMs. After this change it's a more recent time. This is a positive change.

2. In the case of retries when buffer allocation does not occur, prior to this change the time was computed inside appendNewBatch and hence was guaranteed to be the latest. After this change, threads might be blocked on synchronized, or the time consumed by the previous retry isn't factored into the nowMs being passed to appendNewBatch. Hence, the nowMs passed to appendNewBatch might be stale by some amount (depending on how long threads were waiting to acquire the lock). Is that acceptable?

Contributor: @divijvaidya: For 2, before KAFKA-10888, nowMs was also computed before synchronized, so it has the same behavior as this PR.

Looking at the code, I am not sure nowMs is strictly needed. nowMs is used to populate ProducerBatch.lastAppendTime. However, since KAFKA-5886, expiration is based on createTime and not on lastAppendTime. lastAppendTime is only used to upper-bound lastAttemptMs, which may not be needed. @hachikuji: Could we just get rid of ProducerBatch.lastAppendTime?

Member: Did we reach a conclusion regarding this?

Contributor Author: I synced up with Jun offline; in this change it makes sense to preserve the current behavior (too close to release).

Member: Do we want to file a JIRA for changing this for the next release?

Contributor Author: Filed KAFKA-14083.
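
To make the staleness trade-off concrete, a short illustration (names follow this PR; the structure is simplified):

    long nowMs = time.milliseconds();  // refreshed once, right after allocate() may have blocked
    synchronized (dq) {
        // By the time this lock is acquired, nowMs may lag real time by however
        // long the thread waited here. Per the discussion above, that staleness
        // only affects ProducerBatch.lastAppendTime and was deemed acceptable.
        RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq,
                timestamp, key, value, headers, callbacks, buffer, nowMs);
    }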

         }

         synchronized (dq) {
@@ -307,7 +312,7 @@ public RecordAppendResult append(String topic,
                         partitionInfo.partition(), topic);
                 continue;
             }
-            RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer);
+            RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
             // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
             if (appendResult.newBatchCreated)
                 buffer = null;
@@ -333,6 +338,7 @@ public RecordAppendResult append(String topic,
      * @param headers the Headers for the record
      * @param callbacks The callbacks to execute
      * @param buffer The buffer for the new batch
+     * @param nowMs The current time, in milliseconds
      */
     private RecordAppendResult appendNewBatch(String topic,
                                               int partition,
@@ -342,11 +348,10 @@ private RecordAppendResult appendNewBatch(String topic,
                                               byte[] value,
                                               Header[] headers,
                                               AppendCallbacks callbacks,
-                                              ByteBuffer buffer) {
+                                              ByteBuffer buffer,
+                                              long nowMs) {
         assert partition != RecordMetadata.UNKNOWN_PARTITION;

-        // Update the current time in case the buffer allocation blocked above.
-        long nowMs = time.milliseconds();
         RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
         if (appendResult != null) {
             // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...