Skip to content

Commit

Permalink
feature: #37 Adds support for larger bitset and run-length encodings
Browse files Browse the repository at this point in the history
  • Loading branch information
astubbs committed Dec 1, 2020
1 parent c4dd89b commit 5dab32c
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 127 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
= Change Log

== next

* Fixes
** https://github.com/confluentinc/parallel-consumer/issues/37[Support BitSet encoding lengths longer than Short.MAX_VALUE #37] - adds new serialisation formats that support a wider range of offsets (32,767 → 2,147,483,647) for both BitSet and run-length encoding

== v0.2.0.3

* Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.BitSet;
import java.util.Optional;

import static io.confluent.parallelconsumer.OffsetEncoding.BitSetCompressed;
import static io.confluent.parallelconsumer.OffsetEncoding.*;

/**
* Encodes a range of offsets from an incompletes collection into a BitSet.
Expand All @@ -30,34 +30,77 @@
*/
class BitsetEncoder extends OffsetEncoder {

// Serialisation format version this encoder instance writes (set once in the constructor).
private final Version version;

// Version used when the caller does not specify one.
private static final Version DEFAULT_VERSION = Version.v2;

// V2 stores the bitset length header as an int, so up to Integer.MAX_VALUE entries are addressable.
public static final Integer MAX_LENGTH_ENCODABLE = Integer.MAX_VALUE;

// Not final: allocated by initV1/initV2 depending on the chosen version.
private ByteBuffer wrappedBitsetBytesBuffer;
private final BitSet bitSet;

private Optional<byte[]> encodedBytes = Optional.empty();

/**
 * Creates an encoder using the {@link #DEFAULT_VERSION} serialisation format.
 *
 * @param length                     number of offsets the bitset must be able to track
 * @param offsetSimultaneousEncoder  parent encoder this instance registers its results with
 * @throws BitSetEncodingNotSupportedException if {@code length} exceeds what the chosen format can encode
 */
public BitsetEncoder(int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) throws BitSetEncodingNotSupportedException {
this(length, offsetSimultaneousEncoder, DEFAULT_VERSION);
}

/**
 * Creates an encoder for the requested serialisation format version.
 *
 * @param length                     number of offsets the bitset must be able to track
 * @param offsetSimultaneousEncoder  parent encoder this instance registers its results with
 * @param newVersion                 which wire format to produce ({@code v1} short length header, {@code v2} int)
 * @throws BitSetEncodingNotSupportedException if {@code length} exceeds what the chosen format can encode
 */
public BitsetEncoder(int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder, Version newVersion) throws BitSetEncodingNotSupportedException {
    super(offsetSimultaneousEncoder);
    this.version = newVersion;
    // allocate the version-specific wrapping buffer and write its length header
    switch (newVersion) {
        case v1:
            initV1(length);
            break;
        case v2:
            initV2(length);
            break;
    }
    bitSet = new BitSet(length);
}

/**
 * V2: the bitset length header is serialised as an int instead of a short — the V1 limit of
 * Short.MAX_VALUE (32,767) entries turned out to be too short in practice.
 * <p>
 * Integer.MAX_VALUE should always be enough, as the system restricts how much is processed at once.
 *
 * @param length number of offsets the bitset must be able to track
 * @throws BitSetEncodingNotSupportedException if {@code length} exceeds {@link #MAX_LENGTH_ENCODABLE}
 */
private void initV2(int length) throws BitSetEncodingNotSupportedException {
    // NOTE(review): length is an int, so this guard can never fire while MAX_LENGTH_ENCODABLE is
    // Integer.MAX_VALUE — kept for symmetry with initV1 and in case the limit is ever lowered.
    if (length > MAX_LENGTH_ENCODABLE) {
        throw new BitSetEncodingNotSupportedException(StringUtils.msg("Bitset V2 too long to encode, as length overflows Integer.MAX_VALUE. Length: {}. (max: {})", length, MAX_LENGTH_ENCODABLE));
    }
    // prep bit set buffer: int length header + one byte per 8 tracked offsets (rounded up)
    this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(Integer.BYTES + ((length / 8) + 1));
    // BitSet doesn't serialise its set capacity, so we have to — the unused capacity actually means something
    this.wrappedBitsetBytesBuffer.putInt(length);
}

/**
 * V1: the bitset length header is serialised as a short, capping the encodable length at
 * Short.MAX_VALUE — "a bit 'short' sighted".
 *
 * @param length number of offsets the bitset must be able to track
 * @throws BitSetEncodingNotSupportedException if {@code length} exceeds Short.MAX_VALUE
 */
private void initV1(int length) throws BitSetEncodingNotSupportedException {
    if (length > Short.MAX_VALUE) {
        // can't change the V1 serialisation format in-place; longer lengths need the V2 (int) format
        throw new BitSetEncodingNotSupportedException("Bitset V1 too long to encode, bitset length overflows Short.MAX_VALUE: " + length + ". (max: " + Short.MAX_VALUE + ")");
    }
    // prep bit set buffer: short length header + one byte per 8 tracked offsets (rounded up)
    this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(Short.BYTES + ((length / 8) + 1));
    // BitSet doesn't serialise its set capacity, so we have to — the unused capacity actually means something
    this.wrappedBitsetBytesBuffer.putShort((short) length);
}

/**
 * @return the uncompressed encoding id matching this encoder's serialisation version
 */
@Override
protected OffsetEncoding getEncodingType() {
    return switch (version) {
        case v1 -> BitSet;
        case v2 -> BitSetV2;
    };
}

/**
 * @return the compressed encoding id matching this encoder's serialisation version
 */
@Override
protected OffsetEncoding getEncodingTypeCompressed() {
    return switch (version) {
        case v1 -> BitSetCompressed;
        case v2 -> BitSetV2Compressed;
    };
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

import static io.confluent.parallelconsumer.OffsetBitSet.deserialiseBitSetWrap;
import static io.confluent.parallelconsumer.OffsetBitSet.deserialiseBitSetWrapToIncompletes;
import static io.confluent.parallelconsumer.OffsetEncoding.*;
import static io.confluent.parallelconsumer.OffsetEncoding.Version.v1;
import static io.confluent.parallelconsumer.OffsetEncoding.Version.v2;
import static io.confluent.parallelconsumer.OffsetRunLength.*;
import static io.confluent.parallelconsumer.OffsetSimpleSerialisation.decompressZstd;
import static io.confluent.parallelconsumer.OffsetSimpleSerialisation.deserialiseByteArrayToBitMapString;
Expand Down Expand Up @@ -67,7 +70,7 @@ private static byte[] copyBytesOutOfBufferForDebug(ByteBuffer bbData) {
static EncodedOffsetPair unwrap(byte[] input) {
ByteBuffer wrap = ByteBuffer.wrap(input).asReadOnlyBuffer();
byte magic = wrap.get();
OffsetEncoding decode = OffsetEncoding.decode(magic);
OffsetEncoding decode = decode(magic);
ByteBuffer slice = wrap.slice();

return new EncodedOffsetPair(decode, slice);
Expand All @@ -78,10 +81,15 @@ public String getDecodedString() {
String binaryArrayString = switch (encoding) {
case ByteArray -> deserialiseByteArrayToBitMapString(data);
case ByteArrayCompressed -> deserialiseByteArrayToBitMapString(decompressZstd(data));
case BitSet -> deserialiseBitSetWrap(data);
case BitSetCompressed -> deserialiseBitSetWrap(decompressZstd(data));
case BitSet -> deserialiseBitSetWrap(data, v1);
case BitSetCompressed -> deserialiseBitSetWrap(decompressZstd(data), v1);
case RunLength -> runLengthDecodeToString(runLengthDeserialise(data));
case RunLengthCompressed -> runLengthDecodeToString(runLengthDeserialise(decompressZstd(data)));
case BitSetV2-> deserialiseBitSetWrap(data, v2);
case BitSetV2Compressed-> deserialiseBitSetWrap(data, v2);
case RunLengthV2-> deserialiseBitSetWrap(data, v2);
case RunLengthV2Compressed-> deserialiseBitSetWrap(data, v2);
default -> throw new InternalRuntimeError("Invalid state"); // todo why is this needed? what's not covered?
};
return binaryArrayString;
}
Expand All @@ -91,11 +99,15 @@ public Tuple<Long, Set<Long>> getDecodedIncompletes(long baseOffset) {
Tuple<Long, Set<Long>> binaryArrayString = switch (encoding) {
// case ByteArray -> deserialiseByteArrayToBitMapString(data);
// case ByteArrayCompressed -> deserialiseByteArrayToBitMapString(decompressZstd(data));
case BitSet -> deserialiseBitSetWrapToIncompletes(baseOffset, data);
case BitSetCompressed -> deserialiseBitSetWrapToIncompletes(baseOffset, decompressZstd(data));
case RunLength -> runLengthDecodeToIncompletes(baseOffset, data);
case RunLengthCompressed -> runLengthDecodeToIncompletes(baseOffset, decompressZstd(data));
default -> throw new UnsupportedOperationException("Encoding (" + encoding.name() + ") not supported");
case BitSet -> deserialiseBitSetWrapToIncompletes(encoding, baseOffset, data);
case BitSetCompressed -> deserialiseBitSetWrapToIncompletes(BitSet, baseOffset, decompressZstd(data));
case RunLength -> runLengthDecodeToIncompletes(encoding, baseOffset, data);
case RunLengthCompressed -> runLengthDecodeToIncompletes(RunLength, baseOffset, decompressZstd(data));
case BitSetV2 -> deserialiseBitSetWrapToIncompletes(encoding, baseOffset, data);
case BitSetV2Compressed -> deserialiseBitSetWrapToIncompletes(BitSetV2, baseOffset, decompressZstd(data));
case RunLengthV2 -> runLengthDecodeToIncompletes(encoding, baseOffset, data);
case RunLengthV2Compressed -> runLengthDecodeToIncompletes(RunLengthV2, baseOffset, decompressZstd(data));
default -> throw new UnsupportedOperationException("Encoding (" + encoding.description() + ") not supported");
};
return binaryArrayString;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,23 @@
import static io.confluent.csid.utils.Range.range;

/**
* Deserialisation tools for {@link BitsetEncoder}.
* <p>
* todo unify or refactor with {@link BitsetEncoder}. Why was it ever separate?
*
* @see BitsetEncoder
*/
@Slf4j
public class OffsetBitSet {

static String deserialiseBitSetWrap(ByteBuffer wrap) {
static String deserialiseBitSetWrap(ByteBuffer wrap, OffsetEncoding.Version version) {
wrap.rewind();
short originalBitsetSize = wrap.getShort();

int originalBitsetSize = switch (version) {
case v1 -> (int)wrap.getShort(); // up cast ok
case v2 -> wrap.getInt();
};

ByteBuffer slice = wrap.slice();
return deserialiseBitSet(originalBitsetSize, slice);
}
Expand All @@ -42,9 +51,13 @@ static String deserialiseBitSet(int originalBitsetSize, ByteBuffer s) {
return result.toString();
}

static Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(long baseOffset, ByteBuffer wrap) {
static Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(OffsetEncoding encoding, long baseOffset, ByteBuffer wrap) {
wrap.rewind();
short originalBitsetSize = wrap.getShort();
int originalBitsetSize = switch(encoding) {
case BitSet -> wrap.getShort();
case BitSetV2 -> wrap.getInt();
default -> throw new InternalRuntimeError("Invalid state");
};
ByteBuffer slice = wrap.slice();
Set<Long> incompletes = deserialiseBitSetToIncompletes(baseOffset, originalBitsetSize, slice);
long highwaterMark = baseOffset + originalBitsetSize;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package io.confluent.parallelconsumer;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* Base OffsetEncoder
*/
@Slf4j
abstract class OffsetEncoder {

private final OffsetSimultaneousEncoder offsetSimultaneousEncoder;
Expand Down Expand Up @@ -40,6 +45,7 @@ void register() throws EncodingNotSupportedException {
}

/**
 * Records an encoding result with the parent encoder, both in the sorted set and the lookup map.
 *
 * @param type  encoding id the bytes were produced with
 * @param bytes the serialised payload
 */
private void register(final OffsetEncoding type, final byte[] bytes) {
    log.debug("Registering {}, with size {}", type, bytes.length); // fix typo: "site" -> "size"
    offsetSimultaneousEncoder.sortedEncodings.add(new EncodedOffsetPair(type, ByteBuffer.wrap(bytes)));
    offsetSimultaneousEncoder.encodingMap.put(type, bytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,34 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.confluent.parallelconsumer.OffsetEncoding.Version.v1;
import static io.confluent.parallelconsumer.OffsetEncoding.Version.v2;

@ToString
@RequiredArgsConstructor
enum OffsetEncoding {
ByteArray((byte) 'L'),
ByteArrayCompressed((byte) 'î'),
BitSet((byte) 'l'),
BitSetCompressed((byte) 'a'),
RunLength((byte) 'n'),
RunLengthCompressed((byte) 'J');
ByteArray(v1, (byte) 'L'),
ByteArrayCompressed(v1, (byte) 'î'),
BitSet(v1, (byte) 'l'),
BitSetCompressed(v1, (byte) 'a'),
RunLength(v1, (byte) 'n'),
RunLengthCompressed(v1, (byte) 'J'),
/**
* switch from encoding bitset length as a short to an integer (length of 32,000 was reasonable too short)
*/
BitSetV2(v2, (byte) 'o'),
BitSetV2Compressed(v2, (byte) 's'),
/**
* switch from encoding run lengths as Shorts to Integers
*/
RunLengthV2(v2, (byte) 'e'),
RunLengthV2Compressed(v2, (byte) 'p');

enum Version {
v1, v2
}

public final Version version;

@Getter
public final byte magicByte;
Expand All @@ -36,4 +55,8 @@ public static OffsetEncoding decode(byte magic) {
return encoding;
}
}

/**
 * Human-readable id: the enum constant name plus its format version, e.g. {@code BitSetV2:v2}.
 */
public String description() {
    return String.format("%s:%s", name(), version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ void loadOffsetMapForPartition(final Set<TopicPartition> assignment) {
}

/**
 * Decodes the base64 offset-map metadata string into the base committed offset and the sorted
 * set of incomplete offsets.
 *
 * @param finalBaseComittedOffsetForPartition base offset the serialised map is relative to
 * @param incompleteOffsetMap                 base64-encoded serialised offset map
 * @throws OffsetDecodingError if the input is not valid base64
 */
static ParallelConsumer.Tuple<Long, TreeSet<Long>> deserialiseIncompleteOffsetMapFromBase64(long finalBaseComittedOffsetForPartition, String incompleteOffsetMap) throws OffsetDecodingError {
    byte[] decodedBytes;
    try {
        decodedBytes = OffsetSimpleSerialisation.decodeBase64(incompleteOffsetMap);
    } catch (IllegalArgumentException a) {
        throw new OffsetDecodingError(msg("Error decoding offset metadata, input was: {}", incompleteOffsetMap), a);
    }
    ParallelConsumer.Tuple<Long, Set<Long>> incompleteOffsets = decodeCompressedOffsets(finalBaseComittedOffsetForPartition, decodedBytes);
    TreeSet<Long> longs = new TreeSet<>(incompleteOffsets.getRight());
    return ParallelConsumer.Tuple.pairOf(incompleteOffsets.getLeft(), longs);
}
Expand Down Expand Up @@ -147,13 +147,13 @@ byte[] encodeOffsetsCompressed(long finalOffsetForPartition, TopicPartition tp,
*
* @return Set of offsets which are not complete.
*/
static ParallelConsumer.Tuple<Long, Set<Long>> decodeCompressedOffsets(long finalOffsetForPartition, byte[] s) {
if (s.length == 0) {
static ParallelConsumer.Tuple<Long, Set<Long>> decodeCompressedOffsets(long finalOffsetForPartition, byte[] decodedBytes) {
if (decodedBytes.length == 0) {
// no offset bitmap data
return ParallelConsumer.Tuple.pairOf(finalOffsetForPartition, UniSets.of());
}

EncodedOffsetPair result = EncodedOffsetPair.unwrap(s);
EncodedOffsetPair result = EncodedOffsetPair.unwrap(decodedBytes);

ParallelConsumer.Tuple<Long, Set<Long>> incompletesTuple = result.getDecodedIncompletes(finalOffsetForPartition);

Expand Down
Loading

0 comments on commit 5dab32c

Please sign in to comment.