Skip to content

Commit

Permalink
Add reset logic to handle long large offsets
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
  • Loading branch information
pierDipi committed Nov 8, 2021
1 parent ea89ed0 commit d2776e4
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -72,7 +73,7 @@ public void recordReceived(final KafkaConsumerRecord<?, ?> record) {
final var tp = new TopicPartition(record.topic(), record.partition());
if (!offsetTrackers.containsKey(tp)) {
// Initialize offset tracker for the given record's topic/partition.
offsetTrackers.put(tp, new OffsetTracker(record.offset() - 1));
offsetTrackers.put(tp, new OffsetTracker(record.offset()));
}
}

Expand Down Expand Up @@ -139,31 +140,72 @@ private synchronized void commit(final TopicPartition topicPartition, final Offs
*/
private static class OffsetTracker {

private final BitSet committedOffsets;
// In order to not use a huge amount of memory we cap the BitSet to a _dynamic_ max size governed by this
// threshold.
private static final int RESET_TRACKER_THRESHOLD = 1_000_000;

private final long initialOffset;
// We store `long` offsets in a `BitSet` that is capable of handling `int` elements.
// The BitSet sets a committed bit for an offset `offset` in this way:
// bitSetOffset = offset - initialOffset
private BitSet committedOffsets;

// InitialOffset represents the offset committed from where committedOffsets BitSet starts, which means that
// the state of committedOffsets[0] is equal to the state of partition[initialOffset].
private long initialOffset;

// `committed` is the actual offset committed to stable storage.
private long committed;

public OffsetTracker(long initialOffset) {
OffsetTracker(final long initialOffset) {
committedOffsets = new BitSet();
committed = Math.max(initialOffset + 1, 0);
committed = Math.max(initialOffset, 0);
this.initialOffset = committed;
}

public void recordNewOffset(long offset) {
committedOffsets.set((int) (offset - initialOffset));
synchronized void recordNewOffset(final long offset) {
final var bitSetOffset = (int) (offset - initialOffset);
committedOffsets.set(bitSetOffset);
maybeReset(bitSetOffset);
}

public long offsetToCommit() {
synchronized long offsetToCommit() {
return initialOffset + committedOffsets.nextClearBit((int) (committed - initialOffset));
}

public void setCommitted(final long committed) {
synchronized void setCommitted(final long committed) {
this.committed = committed;
}

public long getCommitted() {
synchronized long getCommitted() {
return committed;
}

/**
 * Triggers a {@link #reset()} once the BitSet-relative index grows past
 * {@code RESET_TRACKER_THRESHOLD}, so the backing BitSet does not grow without bound.
 *
 * @param offset index relative to the start of the BitSet (not the absolute partition offset).
 */
private void maybeReset(final int offset) {
  final boolean withinThreshold = offset <= RESET_TRACKER_THRESHOLD;
  if (withinThreshold) {
    // Still small enough: keep the current BitSet as is.
    return;
  }
  reset();
}

/**
 * Rebases the tracker on the last committed offset so that the BitSet does not grow indefinitely.
 *
 * <p>Since delivery might be unordered, the state of every offset from {@code committed} to the end of
 * the current BitSet is carried over. After this call bit {@code 0} of the BitSet describes the offset
 * {@code committed}, and {@code initialOffset == committed}.
 *
 * <p>The slice is taken at the exact bit of {@code committed}. Copying whole 64-bit words instead would
 * leave the surviving bits misaligned by {@code (committed - initialOffset) % 64} positions, which can
 * make not-yet-acknowledged offsets look acknowledged.
 */
private void reset() {
  final long relativeCommitted = committed - initialOffset;
  if (relativeCommitted <= 0) {
    // The BitSet already starts at (or after) the committed offset: there is nothing to drop.
    return;
  }

  // length() is the index of the highest set bit + 1, i.e. the end of the tracked range.
  final int trackedBits = committedOffsets.length();
  // Clamp so that a committed offset beyond the tracked range simply yields an empty BitSet.
  final int firstKeptBit = (int) Math.min(relativeCommitted, trackedBits);

  // BitSet#get(from, to) returns a new BitSet whose bit 0 is the old bit `from`, so the kept bits line up
  // with the new initialOffset.
  this.committedOffsets = committedOffsets.get(firstKeptBit, trackedBits);
  this.initialOffset = committed;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ public void shouldCommitAfterSendingEventsOrderedOnTheSamePartitionLongValues()
.containsEntry(new TopicPartition("aaa", 0), Integer.MAX_VALUE + 50L);
}

// Sends 2_000_051 in-order records (offsets 0..2_000_050) on a single partition, acknowledging each one,
// and expects the committed offset to be 2_000_051.
// NOTE(review): the record count is presumably chosen to exceed the tracker's reset threshold - confirm.
@Test
public void shouldCommitAfterSendingEventsOrderedOnTheSamePartitionLongPeriod() {
  final var assertion = assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), strategy -> {
    long offset = 0;
    while (offset < 2_000_051) {
      final var received = record("aaa", 0, offset);
      strategy.recordReceived(received);
      strategy.successfullySentToSubscriber(received);
      offset++;
    }
  });

  assertion.containsEntry(new TopicPartition("aaa", 0), 2_000_051L);
}

@Test
public void shouldNotCommitAfterSendingEventsOrderedOnTheSamePartitionBrokenSequence() {
assertThatOffsetCommitted(List.of(new TopicPartition("aaa", 0)), offsetStrategy -> {
Expand Down

0 comments on commit d2776e4

Please sign in to comment.