fixes #327: Improve error message when Kafka Streams magic number found #482

Merged · 13 commits · Dec 29, 2022
@@ -114,6 +114,8 @@ public HighestOffsetAndIncompletes getDecodedIncompletes(long baseOffset) {
case BitSetV2Compressed -> deserialiseBitSetWrapToIncompletes(BitSetV2, baseOffset, decompressZstd(data));
case RunLengthV2 -> runLengthDecodeToIncompletes(encoding, baseOffset, data);
case RunLengthV2Compressed -> runLengthDecodeToIncompletes(RunLengthV2, baseOffset, decompressZstd(data));
+case KafkaStreams, KafkaStreamsV2 ->
+throw new KafkaStreamsEncodingNotSupported();
default ->
throw new UnsupportedOperationException("Encoding (" + encoding.description() + ") not supported");
};
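For context, this switch only runs after the first byte of the serialised payload has been mapped to an OffsetEncoding (the tests below obtain one via EncodedOffsetPair.unwrap). A minimal sketch of that dispatch, assuming a lookup such as OffsetEncoding.decode(byte) (an assumed method name, for illustration only):

import java.nio.ByteBuffer;

static OffsetEncoding sniffEncoding(byte[] serialisedPayload) {
    ByteBuffer buffer = ByteBuffer.wrap(serialisedPayload);
    byte magic = buffer.get(); // the first byte identifies the encoding
    // a Kafka Streams payload starts with the raw byte 1 or 2, so it now maps to the
    // new KafkaStreams / KafkaStreamsV2 entries and reaches the friendly error above
    return OffsetEncoding.decode(magic); // assumed lookup by magic byte
}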
@@ -0,0 +1,21 @@
+package io.confluent.parallelconsumer.offsets;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+import lombok.experimental.StandardException;
+
+/**
+ * Thrown when a magic number for Kafka Streams offset metadata is found.
+ * @see <a href="https://github.com/apache/kafka/blob/cc77a38d280657a0e3969b255f103af4d11c7914/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopicPartitionMetadata.java#L33">Kafka Streams magic number</a>
+ * @author Nacho Munoz
+ */
+@StandardException
+public class KafkaStreamsEncodingNotSupported extends EncodingNotSupportedException {
+private static final String ERROR_MESSAGE = "It looks like you might be reusing a Kafka Streams consumer group id, as Kafka Streams magic numbers were found in the serialised payload instead of our own. Using PC on top of Kafka Streams commit data isn't supported. Please use a fresh consumer group, unique to PC.";
+
+public KafkaStreamsEncodingNotSupported() {
+super(ERROR_MESSAGE);
+}
+}
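A hedged sketch of what a caller sees when this is thrown; the catch site and logger are illustrative, not part of this PR. Lombok's @StandardException generates the remaining standard constructors (message, cause, and message plus cause), while the explicit no-arg constructor injects the fixed message.

try {
    encodedOffsetPair.getDecodedIncompletes(baseOffset);
} catch (KafkaStreamsEncodingNotSupported e) {
    // the message is actionable: switch to a consumer group id that has
    // never been used by a Kafka Streams application
    log.error("Offset metadata was written by Kafka Streams: {}", e.getMessage());
    throw e;
}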
@@ -39,7 +39,15 @@ public enum OffsetEncoding {
* switch from encoding run lengths as Shorts to Integers
*/
RunLengthV2(v2, (byte) 'e'),
-RunLengthV2Compressed(v2, (byte) 'p');
+RunLengthV2Compressed(v2, (byte) 'p'),
+
+/**
+ * Checks for pre-existing Kafka Streams metadata. Although the Kafka Streams magic numbers are annoyingly simple, ours are not, so it should be safe to guess that these values are indeed from Kafka Streams.
+ * <a href="https://github.com/apache/kafka/blob/cc77a38d280657a0e3969b255f103af4d11c7914/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopicPartitionMetadata.java#L33">source from Kafka Streams code</a>
+ */
+KafkaStreams(v1, (byte) 1),
+KafkaStreamsV2(v2, (byte) 2);
+

public enum Version {
v1, v2
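The guess described in the javadoc above is safe because the byte ranges cannot collide: this project's magic numbers are printable ASCII characters such as 'e' (0x65) and 'p' (0x70), while Kafka Streams writes the raw values 1 and 2. A small illustrative check (magicByte is an assumed field name, not necessarily the project's real accessor):

// no overlap between PC's printable-character magic numbers and the raw
// bytes 1 and 2 that Kafka Streams metadata starts with
assert OffsetEncoding.RunLengthV2.magicByte == (byte) 'e'; // 0x65
assert OffsetEncoding.KafkaStreams.magicByte == (byte) 1;  // 0x01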
@@ -90,7 +90,7 @@ void setup() {
@EnumSource()
@ResourceLock(value = OffsetMapCodecManager.METADATA_DATA_SIZE_RESOURCE_LOCK, mode = READ)
void offsetsOpenClose(OffsetEncoding encoding) {
-var skip = UniLists.of(OffsetEncoding.ByteArray, OffsetEncoding.ByteArrayCompressed);
+var skip = UniLists.of(OffsetEncoding.ByteArray, OffsetEncoding.ByteArrayCompressed, OffsetEncoding.KafkaStreams, OffsetEncoding.KafkaStreamsV2);
assumeFalse(skip.contains(encoding));

// todo remove - not even relevant to this test? smelly
@@ -134,7 +134,7 @@ void largeIncompleteOffsetValues(long nextExpectedOffset) {
@ResourceLock(value = OffsetSimultaneousEncoder.COMPRESSION_FORCED_RESOURCE_LOCK, mode = READ_WRITE)
void ensureEncodingGracefullyWorksWhenOffsetsAreVeryLargeAndNotSequential(OffsetEncoding encoding) {
assumeThat("Codec skipped, not applicable", encoding,
-not(in(of(ByteArray, ByteArrayCompressed)))); // byte array not currently used
+not(in(of(ByteArray, ByteArrayCompressed, KafkaStreams, KafkaStreamsV2)))); // byte array not currently used
var encodingsThatFail = UniLists.of(BitSet, BitSetCompressed, BitSetV2, RunLength, RunLengthCompressed);

// todo don't use static public accessors to change things - makes parallel testing harder and is smelly
@@ -43,6 +43,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Optional.of;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;

// todo refactor - remove tests which use hard coded state vs dynamic state - #compressionCycle, #selialiseCycle, #runLengthEncoding, #loadCompressedRunLengthRncoding
@Slf4j
@@ -359,6 +360,49 @@ void deserialiseBitSet() {
assertThat(deserialisedBitSet).isEqualTo(input);
}

+/**
+ * Tests for a friendly error when what looks like a Kafka Streams magic number is found in the offset metadata.
+ */
+@SneakyThrows
+@Test
+void deserialiseKafkaStreamsV1() {
+final var input = ByteBuffer.allocate(32);
+// magic number
+input.put((byte) 1);
+// timestamp
+input.putLong(System.currentTimeMillis());
+
+EncodedOffsetPair encodedOffsetPair = EncodedOffsetPair.unwrap(input.array());
+assertThatThrownBy(() -> encodedOffsetPair.getDecodedIncompletes(0L))
+.isInstanceOf(KafkaStreamsEncodingNotSupported.class);
+}
+
+/**
+ * Tests for a friendly error when what looks like a Kafka Streams V2 magic number is found in the offset metadata.
+ */
+@SneakyThrows
+@Test
+void deserialiseKafkaStreamsV2() {
+final var input = ByteBuffer.allocate(32);
+// magic number
+input.put((byte) 2);
+// timestamp
+input.putLong(System.currentTimeMillis());
+// metadata
+// number of entries
+input.putInt(1);
+// key size
+input.putInt(1);
+// key
+input.put((byte) 'a');
+// value
+input.putLong(1L);
+
+EncodedOffsetPair encodedOffsetPair = EncodedOffsetPair.unwrap(input.array());
+assertThatThrownBy(() -> encodedOffsetPair.getDecodedIncompletes(0L))
+.isInstanceOf(KafkaStreamsEncodingNotSupported.class);
+}

@SneakyThrows
@Test
void compressionCycle() {
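Since the point of this PR is the friendlier message, a natural extension of the two tests above (a sketch only, not part of this diff) would be to pin part of the message text as well as the exception type:

assertThatThrownBy(() -> encodedOffsetPair.getDecodedIncompletes(0L))
        .isInstanceOf(KafkaStreamsEncodingNotSupported.class)
        .hasMessageContaining("Kafka Streams"); // guards the wording, not just the type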