Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Fix SubscriptionResponseWrapperSerializer #17205

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ default ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], b
* @param record The record that failed to produce
* @param exception The exception that occurred during production
*/
@SuppressWarnings("deprecation")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup.

default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
Expand Down Expand Up @@ -76,7 +75,6 @@ default ProductionExceptionHandlerResponse handleSerializationException(final Pr
* @param exception the exception that occurred during serialization
* @param origin the origin of the serialization exception
*/
@SuppressWarnings("deprecation")
default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
final ProducerRecord record,
final Exception exception,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ public void setIfUnset(final SerdeGetter getter) {
public byte[] serialize(final String topic, final SubscriptionResponseWrapper<V> data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}

//7-bit (0x7F) maximum for data version.
if (Byte.compare((byte) 0x7F, data.version()) < 0) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is always false. Byte has a range from -128...127; thus, this comparison is incorrect.

throw new UnsupportedVersionException("SubscriptionResponseWrapper version is larger than maximum supported 0x7F");
if (data.version() < 0) {
throw new UnsupportedVersionException("SubscriptionResponseWrapper version cannot be negative");
}

final byte[] serializedData = data.foreignValue() == null ? null : serializer.serialize(topic, data.foreignValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,10 @@ private static final class NonNullableSerde<T> implements Serde<T>, Serializer<T
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {

}
public void configure(final Map<String, ?> configs, final boolean isKey) { }

@Override
public void close() {

}
public void close() { }

@Override
public Serializer<T> serializer() {
Expand All @@ -73,68 +69,95 @@ public T deserialize(final String topic, final byte[] data) {
}

@Test
@SuppressWarnings("unchecked")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup... (same other tests below)

public void ShouldSerdeWithNonNullsTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);

assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
}
}

@Test
@SuppressWarnings("unchecked")
public void shouldSerdeWithNullForeignValueTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, null, 1);
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);

assertArrayEquals(hashedValue, result.originalValueHash());
assertNull(result.foreignValue());
assertNull(result.primaryPartition());
assertArrayEquals(hashedValue, result.originalValueHash());
assertNull(result.foreignValue());
assertNull(result.primaryPartition());
}
}

@Test
@SuppressWarnings("unchecked")
public void shouldSerdeWithNullHashTest() {
final long[] hashedValue = null;
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);

assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
}
}

@Test
@SuppressWarnings("unchecked")
public void shouldSerdeWithNullsTest() {
final long[] hashedValue = null;
final String foreignValue = null;
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);

assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
assertArrayEquals(hashedValue, result.originalValueHash());
assertEquals(foreignValue, result.foreignValue());
assertNull(result.primaryPartition());
}
}

@Test
public void shouldThrowExceptionWithBadVersionTest() {
final long[] hashedValue = null;
assertThrows(UnsupportedVersionException.class,
() -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) 0xFF, 1));
assertThrows(
UnsupportedVersionException.class,
() -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) -1, 1)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor change for this test, too, passing in -1 now to make it more logical.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe hashedValue can be replaced by null

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not make much difference. Will just merge as-is to avoid another Jenkins run.

);
}

@Test
public void shouldThrowExceptionOnSerializeWhenDataVersionUnknown() {
final SubscriptionResponseWrapper<String> srw = new InvalidSubscriptionResponseWrapper(null, null, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using anonymous class?

        final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<String>(null, null, 1) {
            @Override
            public byte version() {
                return -1;
            }
        };

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a named class explains the semantics of the test better. Think it better as-is.

try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(null)) {
assertThrows(
UnsupportedVersionException.class,
() -> srwSerde.serializer().serialize(null, srw)
);
}
}

public static class InvalidSubscriptionResponseWrapper extends SubscriptionResponseWrapper<String> {

public InvalidSubscriptionResponseWrapper(final long[] originalValueHash,
final String foreignValue,
final Integer primaryPartition) {
super(originalValueHash, foreignValue, primaryPartition);
}

@Override
public byte version() {
return -1;
}
}
}