fix: Gracefully drop Bitset encoding if offset difference too large (short range)

RunLength encoding will still be used.

See confluentinc#37: Support BitSet encoding lengths longer than Short.MAX_VALUE

A test did try to cover this, but the offset difference wasn't large enough.
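In outline, the change makes the BitSet encoder's constructor throw a checked exception when the offset range is too wide, and the caller skips that encoder. The snippet below paraphrases the new OffsetSimultaneousEncoder#initEncoders() shown further down in this diff; length, encoders and log are fields/locals of that class:

try {
    // throws if the range is wider than Short.MAX_VALUE, instead of a RuntimeException
    encoders.add(new BitsetEncoder(length, this));
} catch (BitSetEncodingNotSupportedException e) {
    // BitSet encoding is simply dropped; RunLength encoding still runs below
    log.warn("Cannot use {} encoder", BitsetEncoder.class.getSimpleName(), e);
}
encoders.add(new RunLengthEncoder(this));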
astubbs committed Nov 30, 2020
1 parent d6f5b8e commit 5e0436b
Showing 15 changed files with 390 additions and 121 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.adoc
@@ -1,11 +1,16 @@
= Change Log

== v0.2.0.3

* Fixes
** https://github.com/confluentinc/parallel-consumer/issues/35[Bitset overflow check (#35)] - gracefully drop BitSet encoding as an option if offset difference too large

== v0.2.0.2

* Fixes
** Turns back on the https://github.com/confluentinc/parallel-consumer/issues/35[Bitset overflow check (#35)]

== v0.2.0.1 DO NOT USE
== v0.2.0.1 DO NOT USE - has critical bug

* Fixes
** Incorrectly turns off an over-flow check in https://github.com/confluentinc/parallel-consumer/issues/35[offset serialisation system (#35)]
BitSetEncodingNotSupportedException.java
@@ -0,0 +1,9 @@
package io.confluent.parallelconsumer;

public class BitSetEncodingNotSupportedException extends Exception {

public BitSetEncodingNotSupportedException(String msg) {
super(msg);
}

}
BitsetEncoder.java
@@ -1,26 +1,50 @@
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.StringUtils;

import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Optional;

import static io.confluent.parallelconsumer.OffsetEncoding.BitSetCompressed;

/**
* Encodes a range of offsets, from an incompletes collection into a BitSet.
* <p>
* Highly efficient when the completion status is random.
* <p>
* Highly inefficient when the completion status is in large blocks ({@link RunLengthEncoder} is much better)
* <p>
* Because our system works on manipulating INCOMPLETE offsets, it doesn't matter if the offset range we're encoding is
* Sequential or not. Because as records are always in commit order, if we've seen a range of offsets, we know we've
* seen all that exist (within said range). So if offset 8 is missing from the partition, we will encode it as having
* been completed (when in fact it doesn't exist), because we only compare against known incompletes, and assume all
* others are complete.
* <p>
* So, when we deserialize, the INCOMPLETES collection is then restored, and that's what's used to compare to see if a
* record should be skipped or not. So if record 8 is recorded as completed, it will be absent from the restored
* INCOMPLETES list, and we are assured we will never see record 8.
*
* @see RunLengthEncoder
* @see OffsetBitSet
*/
class BitsetEncoder extends OffsetEncoder {

public static final Short MAX_LENGTH_ENCODABLE = Short.MAX_VALUE;

private final ByteBuffer wrappedBitsetBytesBuffer;
private final BitSet bitSet;

private Optional<byte[]> encodedBytes = Optional.empty();

public BitsetEncoder(final int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
public BitsetEncoder(int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) throws BitSetEncodingNotSupportedException {
super(offsetSimultaneousEncoder);
// prep bit set buffer
this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(Short.BYTES + ((length / 8) + 1));
if (length > Short.MAX_VALUE) {
if (length > MAX_LENGTH_ENCODABLE) {
// need to upgrade to using Integer for the bitset length, but can't change serialisation format in-place
throw new RuntimeException("Bitset too long to encode, bitset length overflows Short.MAX_VALUE: " + length + ". (max: " + Short.MAX_VALUE + ")");
throw new BitSetEncodingNotSupportedException(StringUtils.msg("Bitset too long to encode, as length overflows Short.MAX_VALUE. Length: {}. (max: {})", length, Short.MAX_VALUE));
}
// prep bit set buffer
this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(Short.BYTES + ((length / 8) + 1));
// bitset doesn't serialise its set capacity, so we have to, as the unused capacity actually means something
this.wrappedBitsetBytesBuffer.putShort((short) length);
bitSet = new BitSet(length);
@@ -37,13 +61,13 @@ protected OffsetEncoding getEncodingTypeCompressed() {
}

@Override
public void containsIndex(final int rangeIndex) {
//noop
public void encodeIncompleteOffset(final int index) {
// noop - bitset defaults to 0's (`unset`)
}

@Override
public void doesNotContainIndex(final int rangeIndex) {
bitSet.set(rangeIndex);
public void encodeCompletedOffset(final int index) {
bitSet.set(index);
}

@Override
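The BitsetEncoder Javadoc above describes the encode/decode round trip only in prose. Here is a minimal, self-contained sketch using plain JDK types (not the library's own encoders) of the same idea: completed offsets become set bits relative to a base offset, and the incompletes collection is rebuilt from the unset bits on the way back. The offsets and names are illustrative only.

import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

public class BitsetRoundTripSketch {
    public static void main(String[] args) {
        long baseOffset = 100L;
        int length = 10; // covers offsets 100..109
        Set<Long> incompletes = Set.of(103L, 107L);

        // encode: set a bit for every offset that is NOT incomplete
        BitSet bitSet = new BitSet(length);
        for (int i = 0; i < length; i++) {
            if (!incompletes.contains(baseOffset + i)) {
                bitSet.set(i); // completed (or non-existent) offsets are recorded as done
            }
        }

        // decode: unset bits come back as the restored incompletes collection
        Set<Long> restored = new HashSet<>();
        for (int i = 0; i < length; i++) {
            if (!bitSet.get(i)) {
                restored.add(baseOffset + i);
            }
        }
        System.out.println(restored); // e.g. [103, 107]
    }
}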
BrokerPollSystem.java
@@ -162,6 +162,7 @@ private boolean isResponsibleForCommits() {

private ConsumerRecords<K, V> pollBrokerForRecords() {
managePauseOfSubscription();
log.debug("Subscriptions are paused: {}", paused);

Duration thisLongPollTimeout = (state == ParallelEoSStreamProcessor.State.running) ? BrokerPollSystem.longPollTimeout : Duration.ofMillis(1); // Can't use Duration.ZERO - this causes Object#wait to wait forever

ByteBufferEncoder.java
@@ -25,12 +25,12 @@ protected OffsetEncoding getEncodingTypeCompressed() {
}

@Override
public void containsIndex(final int rangeIndex) {
public void encodeIncompleteOffset(final int rangeIndex) {
this.bytesBuffer.put((byte) 0);
}

@Override
public void doesNotContainIndex(final int rangeIndex) {
public void encodeCompletedOffset(final int rangeIndex) {
this.bytesBuffer.put((byte) 1);
}

OffsetBitSet.java
@@ -4,6 +4,7 @@
* Copyright (C) 2020 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumer.Tuple;
import lombok.extern.slf4j.Slf4j;

import java.nio.ByteBuffer;
@@ -13,6 +14,9 @@

import static io.confluent.csid.utils.Range.range;

/**
* @see BitsetEncoder
*/
@Slf4j
public class OffsetBitSet {

@@ -38,18 +42,18 @@ static String deserialiseBitSet(int originalBitsetSize, ByteBuffer s) {
return result.toString();
}

static ParallelConsumer.Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(long baseOffset, ByteBuffer wrap) {
static Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(long baseOffset, ByteBuffer wrap) {
wrap.rewind();
short originalBitsetSize = wrap.getShort();
ByteBuffer slice = wrap.slice();
Set<Long> incompletes = deserialiseBitSetToIncompletes(baseOffset, originalBitsetSize, slice);
long highwaterMark = baseOffset + originalBitsetSize;
return ParallelConsumer.Tuple.pairOf(highwaterMark, incompletes);
return Tuple.pairOf(highwaterMark, incompletes);
}

static Set<Long> deserialiseBitSetToIncompletes(long baseOffset, int originalBitsetSize, ByteBuffer inputBuffer) {
BitSet bitSet = BitSet.valueOf(inputBuffer);
var incompletes = new HashSet<Long>(1); // can't know how big this needs to be yet
var incompletes = new HashSet<Long>(); // can't know how big this needs to be yet
for (var relativeOffset : range(originalBitsetSize)) {
long offset = baseOffset + relativeOffset;
if (bitSet.get(relativeOffset)) {
OffsetEncoder.java
@@ -17,9 +17,9 @@ public OffsetEncoder(OffsetSimultaneousEncoder offsetSimultaneousEncoder) {

protected abstract OffsetEncoding getEncodingTypeCompressed();

abstract void containsIndex(final int rangeIndex);
abstract void encodeIncompleteOffset(final int rangeIndex);

abstract void doesNotContainIndex(final int rangeIndex);
abstract void encodeCompletedOffset(final int rangeIndex);

abstract byte[] serialise();

OffsetMapCodecManager.java
@@ -10,6 +10,7 @@
import org.apache.kafka.common.TopicPartition;
import pl.tlinkowski.unij.api.UniSets;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.*;

@@ -47,6 +48,11 @@ public class OffsetMapCodecManager<K, V> {

org.apache.kafka.clients.consumer.Consumer<K, V> consumer;

/**
* Forces the use of a specific codec, instead of choosing the most efficient one. Useful for testing.
*/
static Optional<OffsetEncoding> forcedCodec = Optional.empty();

public OffsetMapCodecManager(final WorkManager<K, V> wm, final org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
this.wm = wm;
this.consumer = consumer;
@@ -111,9 +117,15 @@ String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, TopicP
byte[] encodeOffsetsCompressed(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) {
Long nextExpectedOffset = wm.partitionOffsetHighWaterMarks.get(tp) + 1;
OffsetSimultaneousEncoder simultaneousEncoder = new OffsetSimultaneousEncoder(finalOffsetForPartition, nextExpectedOffset, incompleteOffsets).invoke();
byte[] result = simultaneousEncoder.packSmallest();

return result;
if (forcedCodec.isPresent()) {
OffsetEncoding forcedOffsetEncoding = forcedCodec.get();
log.warn("Forcing use of {}", forcedOffsetEncoding);
Map<OffsetEncoding, byte[]> encodingMap = simultaneousEncoder.getEncodingMap();
byte[] bytes = encodingMap.get(forcedOffsetEncoding);
return simultaneousEncoder.packEncoding(new EncodedOffsetPair(forcedOffsetEncoding, ByteBuffer.wrap(bytes)));
} else {
return simultaneousEncoder.packSmallest();
}
}

/**
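The forcedCodec field added above is documented as a test hook. A test in the same package might use it roughly as below; this is a sketch only, assuming package-private access from test code, and uses OffsetEncoding.BitSetCompressed, one of the encoding values referenced elsewhere in this commit:

import java.util.Optional;

class ForcedCodecUsageSketch {
    void runWithForcedBitSetCompressedEncoding() {
        // force one specific codec instead of letting the encoder pick the smallest
        OffsetMapCodecManager.forcedCodec = Optional.of(OffsetEncoding.BitSetCompressed);
        try {
            // ... exercise offset commit / serialisation under test here ...
        } finally {
            // reset, so other tests go back to "smallest encoding wins" behaviour
            OffsetMapCodecManager.forcedCodec = Optional.empty();
        }
    }
}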
OffsetSimultaneousEncoder.java
@@ -5,7 +5,6 @@
*/

import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.nio.ByteBuffer;
@@ -17,6 +16,8 @@
* Encode with multiple strategies at the same time.
* <p>
* Have results in an accessible structure, easily selecting the highest compression.
*
* @see #invoke()
*/
@Slf4j
class OffsetSimultaneousEncoder {
@@ -42,6 +43,11 @@ class OffsetSimultaneousEncoder {
*/
private final long nextExpectedOffset;

/**
* The difference between the base offset (the offset to be committed) and the highest seen offset
*/
private final int length;

/**
* Map of different encoding types for the same offset data, used for retrieving the data for the encoding type
*/
@@ -56,10 +62,50 @@ class OffsetSimultaneousEncoder {
@Getter
TreeSet<EncodedOffsetPair> sortedEncodings = new TreeSet<>();

/**
* Force the encoder to also add the compressed versions. Useful for testing.
* <p>
* Visible for testing.
*/
boolean compressionForced = false;

/**
* The encoders to run
*/
private final Set<OffsetEncoder> encoders = new HashSet<>();

public OffsetSimultaneousEncoder(long lowWaterMark, Long nextExpectedOffset, Set<Long> incompleteOffsets) {
this.lowWaterMark = lowWaterMark;
this.nextExpectedOffset = nextExpectedOffset;
this.incompleteOffsets = incompleteOffsets;

length = (int) (this.nextExpectedOffset - this.lowWaterMark);

initEncoders();
}

private void initEncoders() {
if (length > LARGE_INPUT_MAP_SIZE_THRESHOLD) {
log.debug("~Large input map size: {} (start: {} end: {})", length, lowWaterMark, nextExpectedOffset);
}

try {
BitsetEncoder bitsetEncoder = new BitsetEncoder(length, this);
encoders.add(bitsetEncoder);
} catch (BitSetEncodingNotSupportedException a) {
log.warn("Cannot use {} encoder", BitsetEncoder.class.getSimpleName(), a);
}

encoders.add(new RunLengthEncoder(this));
}

/**
* Not enabled as byte buffer seems to always be beaten by BitSet, which makes sense
* <p>
* Visible for testing
*/
void addByteBufferEncoder() {
encoders.add(new ByteBufferEncoder(length, this));
}

/**
@@ -84,38 +130,39 @@ public OffsetSimultaneousEncoder(long lowWaterMark, Long nextExpectedOffset, Set
* TODO: optimisation - could double the run-length range from Short.MAX_VALUE (~33,000) to Short.MAX_VALUE * 2
* (~66,000) by using unsigned shorts instead (highest representable relative offset is Short.MAX_VALUE because each
* runlength entry is a Short)
* <p>
* TODO VERY large offset ranges are slow (Integer.MAX_VALUE) - encoding scans could be avoided if passing in map of incompletes which should already be known
*/
@SneakyThrows
public OffsetSimultaneousEncoder invoke() {
log.trace("Starting encode of incompletes of {}, base offset is: {}", this.incompleteOffsets, lowWaterMark);

final int length = (int) (this.nextExpectedOffset - this.lowWaterMark);

if (length > LARGE_INPUT_MAP_SIZE_THRESHOLD) {
log.debug("~Large input map size: {}", length);
}

final Set<OffsetEncoder> encoders = new HashSet<>();
encoders.add(new BitsetEncoder(length, this));
encoders.add(new RunLengthEncoder(this));
// TODO: Remove? byte buffer seems to always be beaten by BitSet, which makes sense
// encoders.add(new ByteBufferEncoder(length));
log.debug("Starting encode of incompletes, base offset is: {}, end offset is: {}", lowWaterMark, nextExpectedOffset);
log.trace("Incompletes are: {}", this.incompleteOffsets);

//
log.debug("Encode loop offset start,end: [{},{}] length: {}", this.lowWaterMark, this.nextExpectedOffset, length);
/*
* todo refactor this loop into the encoders (or sequential vs non sequential encoders) as RunLength doesn't need
* to look at every offset in the range, only the ones that change from 0 to 1. BitSet however needs to iterate
* the entire range. So when BitSet can't be used, the encoding would be potentially a lot faster as RunLength
* didn't need the whole loop.
*/
range(length).forEach(rangeIndex -> {
final long offset = this.lowWaterMark + rangeIndex;
List<OffsetEncoder> removeToBeRemoved = new ArrayList<>();
if (this.incompleteOffsets.contains(offset)) {
log.trace("Found an incomplete offset {}", offset);
encoders.forEach(x -> x.containsIndex(rangeIndex));
encoders.forEach(x -> {
x.encodeIncompleteOffset(rangeIndex);
});
} else {
encoders.forEach(x -> x.doesNotContainIndex(rangeIndex));
encoders.forEach(x -> {
x.encodeCompletedOffset(rangeIndex);
});
}
encoders.removeAll(removeToBeRemoved);
});

registerEncodings(encoders);

// log.trace("Input: {}", inputString);
log.debug("In order: {}", this.sortedEncodings);

return this;
@@ -126,8 +173,8 @@ private void registerEncodings(final Set<? extends OffsetEncoder> encoders) {

// compressed versions
// sizes over LARGE_INPUT_MAP_SIZE_THRESHOLD bytes seem to benefit from compression
final boolean noEncodingsAreSmallEnough = encoders.stream().noneMatch(OffsetEncoder::quiteSmall);
if (noEncodingsAreSmallEnough) {
boolean noEncodingsAreSmallEnough = encoders.stream().noneMatch(OffsetEncoder::quiteSmall);
if (noEncodingsAreSmallEnough || compressionForced) {
encoders.forEach(OffsetEncoder::registerCompressed);
}
}
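The TODO in the invoke() Javadoc above suggests doubling the run-length range by reading the stored short back as unsigned. A small standalone illustration of that idea (not part of this commit; values are illustrative):

import java.nio.ByteBuffer;

public class UnsignedShortSketch {
    public static void main(String[] args) {
        int runLength = 50_000; // would overflow a signed short (max 32_767)
        ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES);
        buffer.putShort((short) runLength); // only the low 16 bits are written
        buffer.flip();
        int restored = Short.toUnsignedInt(buffer.getShort()); // reinterpret as unsigned
        System.out.println(restored); // 50000
    }
}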
ParallelEoSStreamProcessor.java
@@ -435,7 +435,7 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
log.debug("Shutting down execution pool...");
List<Runnable> unfinished = workerPool.shutdownNow();
if (!unfinished.isEmpty()) {
log.warn("Threads not done: {}", unfinished);
log.warn("Threads not done count: {}", unfinished.size());
}

log.trace("Awaiting worker pool termination...");
@@ -446,7 +446,7 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
boolean terminationFinishedWithoutTimeout = workerPool.awaitTermination(toSeconds(DrainingCloseable.DEFAULT_TIMEOUT), SECONDS);
interrupted = false;
if (!terminationFinishedWithoutTimeout) {
log.warn("workerPool await timeout!");
log.warn("Thread execution pool termination await timeout! Were any processing jobs dead locked or otherwise stuck?");
boolean shutdown = workerPool.isShutdown();
boolean terminated = workerPool.isTerminated();
}
RunLengthEncoder.java
@@ -37,12 +37,12 @@ protected OffsetEncoding getEncodingTypeCompressed() {
}

@Override
public void containsIndex(final int rangeIndex) {
public void encodeIncompleteOffset(final int rangeIndex) {
encodeRunLength(false);
}

@Override
public void doesNotContainIndex(final int rangeIndex) {
public void encodeCompletedOffset(final int rangeIndex) {
encodeRunLength(true);
}

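For contrast with the BitSet approach, the BitsetEncoder Javadoc earlier notes that run-length encoding wins when completion status comes in large blocks, and each run is stored as a Short (which is where the Short.MAX_VALUE range discussed in the TODO above comes from). Below is a tiny standalone sketch of run-length encoding a sequence of complete/incomplete flags; it is illustrative only and not the library's wire format:

import java.util.ArrayList;
import java.util.List;

public class RunLengthSketch {
    public static void main(String[] args) {
        // true = completed, false = incomplete; long blocks of the same value compress well
        boolean[] offsets = {true, true, true, true, false, false, true, true, true};

        List<Integer> runLengths = new ArrayList<>();
        boolean currentValue = offsets[0]; // a real codec would also record which value the first run represents
        int currentRun = 0;
        for (boolean offset : offsets) {
            if (offset == currentValue) {
                currentRun++;
            } else {
                runLengths.add(currentRun); // close the run and start a new one
                currentValue = offset;
                currentRun = 1;
            }
        }
        runLengths.add(currentRun);

        System.out.println(runLengths); // [4, 2, 3] -> 4 complete, 2 incomplete, 3 complete
    }
}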