Skip to content

Commit

Permalink
Generate correct crc32c value for the messages with different produce…
Browse files Browse the repository at this point in the history
…Id (#1011)
  • Loading branch information
akrambek authored May 8, 2024
1 parent 8157c77 commit 28ef864
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i

private final KafkaProduceClientFlusher flushRecord = this::flushRecord;
private final KafkaProduceClientFlusher flushRecordInit = this::flushRecordInit;
private final KafkaProduceClientFlusher frameProduceRecordContFin = this::flushRecordContFin;
private final KafkaProduceClientFlusher flushRecordContFin = this::flushRecordContFin;
private final KafkaProduceClientFlusher flushRecordIgnoreAll = this::flushRecordIgnoreAll;

private final KafkaProduceClientDecoder decodeSaslHandshakeResponse = this::decodeSaslHandshakeResponse;
Expand Down Expand Up @@ -546,10 +546,9 @@ private int flushRecordInit(
final KafkaAckMode ackMode = kafkaProduceDataEx.ackMode().get();
final KafkaKeyFW key = kafkaProduceDataEx.key();
final Array32FW<KafkaHeaderFW> headers = kafkaProduceDataEx.headers();
client.encodeableRecordBytesDeferred = kafkaProduceDataEx.deferred();
client.valueChecksum = kafkaProduceDataEx.crc32c();
final int deferred = kafkaProduceDataEx.deferred();
final int valueSize = payload != null ? payload.sizeof() : 0;
client.valueCompleteSize = valueSize + client.encodeableRecordBytesDeferred;
final int valueCompleteSize = valueSize + deferred;

final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + produceRecordFramingSize;

Expand All @@ -562,6 +561,10 @@ private int flushRecordInit(
client.doEncodeRequestIfNecessary(traceId, budgetId);
}

client.valueChecksum = kafkaProduceDataEx.crc32c();
client.encodeableRecordBytesDeferred = deferred;
client.valueCompleteSize = valueCompleteSize;

if (client.producerId == RECORD_BATCH_PRODUCER_ID_NONE)
{
client.baseSequence = sequence;
Expand All @@ -574,8 +577,7 @@ private int flushRecordInit(
client.doEncodeRecordInit(traceId, timestamp, ackMode, key, payload, headers);
if (client.encodeSlot != NO_SLOT)
{
client.flusher = frameProduceRecordContFin;
client.flushFlags = FLAGS_INIT;
client.flusher = flushRecordContFin;
}
else
{
Expand Down Expand Up @@ -610,6 +612,7 @@ private int flushRecordContFin(
assert progress == limit;
client.flusher = flushRecord;
client.flushFlags = FLAGS_FIN;
client.encodeableRecordBytesDeferred = 0;
}

return progress;
Expand Down Expand Up @@ -1967,7 +1970,7 @@ private void doEncodeProduceRequest(

final ByteBuffer encodeSlotByteBuffer = encodePool.byteBuffer(encodeSlot);
final int encodeSlotBytePosition = encodeSlotByteBuffer.position();
final int partialValueSize = flushFlags != FLAGS_FIN ? encodeableRecordValueBytes : 0;
final int partialValueSize = encodeableRecordBytesDeferred > 0 ? encodeableRecordValueBytes : 0;
encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit - partialValueSize);
encodeSlotByteBuffer.position(encodeSlotBytePosition + encodeSlotOffset + crcLimit);

Expand All @@ -1976,7 +1979,7 @@ private void doEncodeProduceRequest(
crc.update(encodeSlotByteBuffer);

long checksum = crc.getValue();
if (flushFlags != FLAGS_FIN)
if (encodeableRecordBytesDeferred > 0)
{
checksum = computeChecksum(encodeBuffer, encodeLimit, encodeProgress, encodeSlotBuffer, checksum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
#

property deltaMillis 0L
property newTimestamp ${kafka:timestamp() + deltaMillis}
property timestamp 1715191875046L
property newTimestamp ${timestamp + deltaMillis}

connect "zilla://streams/app0"
option zilla:window 8192
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ property networkConnectWindow 8192
property newRequestId ${kafka:newRequestId()}
property produceWaitMax 500

property deltaMillis 0L
property newTimestamp ${kafka:timestamp() + deltaMillis}

connect "zilla://streams/net0"
option zilla:window ${networkConnectWindow}
option zilla:transmission "duplex"
Expand Down Expand Up @@ -98,7 +101,7 @@ write 140 # size
83 # length
-1
[0x02]
0x4e8723aa
0x460c54e9
0s
0 # last offset delta
${newTimestamp} # first timestamp
Expand Down Expand Up @@ -148,7 +151,7 @@ write 140 # size
83 # length
-1
[0x02]
0x4e8723aa
0x026cd6cf
0s
0 # last offset delta
${newTimestamp} # first timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

property networkAcceptWindow 8192

property deltaMillis 0L
property newTimestamp ${kafka:timestamp() + deltaMillis}

accept "zilla://streams/net0"
option zilla:window ${networkAcceptWindow}
option zilla:transmission "duplex"
Expand Down Expand Up @@ -94,7 +91,7 @@ read 140
83 # length
-1
[0x02]
[0..4]
0x460c54e9
0s
0 # last offset delta
(long:timestamp) # first timestamp
Expand Down Expand Up @@ -144,7 +141,7 @@ read 140
83 # length
-1
[0x02]
[0..4]
0x026cd6cf
0s
0 # last offset delta
(long:timestamp) # first timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ property networkConnectWindow 8192
property newRequestId ${kafka:newRequestId()}
property produceWaitMax 500

property deltaMillis 0L
property newTimestamp ${kafka:timestamp() + deltaMillis}

connect "zilla://streams/net0"
option zilla:window ${networkConnectWindow}
option zilla:transmission "duplex"
Expand Down

0 comments on commit 28ef864

Please sign in to comment.