Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,6 @@

package org.springframework.kafka.support.serializer;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -32,24 +31,22 @@
* Delegates to a serializer based on type.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.7.9
*
*/
public class DelegatingByTypeSerializer implements Serializer<Object> {

private static final String RAWTYPES = "rawtypes";

@SuppressWarnings(RAWTYPES)
private final Map<Class<?>, Serializer> delegates = new LinkedHashMap<>();
private final Map<Class<?>, Serializer<?>> delegates = new LinkedHashMap<>();

private final boolean assignable;

/**
* Construct an instance with the map of delegates; keys matched exactly.
* @param delegates the delegates.
*/
@SuppressWarnings(RAWTYPES)
public DelegatingByTypeSerializer(Map<Class<?>, Serializer> delegates) {
public DelegatingByTypeSerializer(Map<Class<?>, Serializer<?>> delegates) {
this(delegates, false);
}

Expand All @@ -62,8 +59,7 @@ public DelegatingByTypeSerializer(Map<Class<?>, Serializer> delegates) {
* @param assignable whether the target is assignable to the key.
* @since 2.8.3
*/
@SuppressWarnings(RAWTYPES)
public DelegatingByTypeSerializer(Map<Class<?>, Serializer> delegates, boolean assignable) {
public DelegatingByTypeSerializer(Map<Class<?>, Serializer<?>> delegates, boolean assignable) {
Assert.notNull(delegates, "'delegates' cannot be null");
Assert.noNullElements(delegates.values(), "Serializers in delegates map cannot be null");
this.delegates.putAll(delegates);
Expand All @@ -73,67 +69,62 @@ public DelegatingByTypeSerializer(Map<Class<?>, Serializer> delegates, boolean a
/**
* Returns true if {@link #findDelegate(Object, Map)} should consider assignability to
* the key rather than an exact match.
* @return true if assigable.
* @return true if assignable.
* @since 2.8.3
*/
protected boolean isAssignable() {
return this.assignable;
}

@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.delegates.values().forEach(del -> del.configure(configs, isKey));
}

@SuppressWarnings({ RAWTYPES, "unchecked" })
@Override
public byte[] serialize(String topic, Object data) {
Serializer delegate = findDelegate(data, this.delegates);
Serializer<Object> delegate = findDelegate(data, this.delegates);
return delegate.serialize(topic, data);
}

@SuppressWarnings({ "unchecked", RAWTYPES })
@Override
public byte[] serialize(String topic, Headers headers, Object data) {
Serializer delegate = findDelegate(data, this.delegates);
Serializer<Object> delegate = findDelegate(data, this.delegates);
return delegate.serialize(topic, headers, data);
}

/**
* Determine the serializer for the data type.
* @param data the data.
* @param delegates the available delegates.
* @return the delgate.
* @param <T> the data type
* @return the delegate.
* @throws SerializationException when there is no match.
* @since 2.8.3
*/
@SuppressWarnings(RAWTYPES)
protected Serializer findDelegate(Object data, Map<Class<?>, Serializer> delegates) {
@SuppressWarnings("unchecked")
protected <T> Serializer<T> findDelegate(T data, Map<Class<?>, Serializer<?>> delegates) {
if (!this.assignable) {
Serializer delegate = delegates.get(data.getClass());
Serializer<?> delegate = delegates.get(data.getClass());
if (delegate == null) {
throw new SerializationException("No matching delegate for type: " + data.getClass().getName()
+ "; supported types: " + this.delegates.keySet().stream()
.map(clazz -> clazz.getName())
.collect(Collectors.toList()));
.map(Class::getName)
.collect(Collectors.toList()));
}
return delegate;
return (Serializer<T>) delegate;
}
else {
Iterator<Entry<Class<?>, Serializer>> iterator = this.delegates.entrySet().iterator();
while (iterator.hasNext()) {
Entry<Class<?>, Serializer> entry = iterator.next();
for (Entry<Class<?>, Serializer<?>> entry : this.delegates.entrySet()) {
if (entry.getKey().isAssignableFrom(data.getClass())) {
return entry.getValue();
return (Serializer<T>) entry.getValue();
}
}
throw new SerializationException("No matching delegate for type: " + data.getClass().getName()
+ "; supported types: " + this.delegates.keySet().stream()
.map(clazz -> clazz.getName())
.collect(Collectors.toList()));
.map(Class::getName)
.collect(Collectors.toList()));
}
}


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
Expand All @@ -45,6 +46,8 @@

/**
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.3
*
*/
Expand Down Expand Up @@ -121,16 +124,16 @@ void testWithPropertyConfigKeys() {
private void doTest(DelegatingSerializer serializer, DelegatingDeserializer deserializer) {
Headers headers = new RecordHeaders();
headers.add(new RecordHeader(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR, "bytes".getBytes()));
byte[] bytes = new byte[] { 1, 2, 3, 4 };
byte[] bytes = new byte[]{ 1, 2, 3, 4 };
byte[] serialized = serializer.serialize("foo", headers, new Bytes(bytes));
assertThat(serialized).isSameAs(bytes);
headers.add(new RecordHeader(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR, "int".getBytes()));
serialized = serializer.serialize("foo", headers, 42);
assertThat(serialized).isEqualTo(new byte[] { 0, 0, 0, 42 });
assertThat(serialized).isEqualTo(new byte[]{ 0, 0, 0, 42 });
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo(42);
headers.add(new RecordHeader(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR, "string".getBytes()));
serialized = serializer.serialize("foo", headers, "bar");
assertThat(serialized).isEqualTo(new byte[] { 'b', 'a', 'r' });
assertThat(serialized).isEqualTo(new byte[]{ 'b', 'a', 'r' });
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar");

// implicit Serdes
Expand All @@ -151,25 +154,25 @@ private void doTest(DelegatingSerializer serializer, DelegatingDeserializer dese
Collections.singletonMap(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR, "string"));
new DefaultKafkaHeaderMapper().fromHeaders(messageHeaders, headers);
assertThat(headers.lastHeader(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR).value())
.isEqualTo(new byte[] { 's', 't', 'r', 'i', 'n', 'g' });
.isEqualTo(new byte[]{ 's', 't', 'r', 'i', 'n', 'g' });
serialized = serializer.serialize("foo", headers, "bar");
assertThat(serialized).isEqualTo(new byte[] { 'b', 'a', 'r' });
assertThat(serialized).isEqualTo(new byte[]{ 'b', 'a', 'r' });
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar");
}

private void doTestKeys(DelegatingSerializer serializer, DelegatingDeserializer deserializer) {
Headers headers = new RecordHeaders();
headers.add(new RecordHeader(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR, "bytes".getBytes()));
byte[] bytes = new byte[] { 1, 2, 3, 4 };
byte[] bytes = new byte[]{ 1, 2, 3, 4 };
byte[] serialized = serializer.serialize("foo", headers, new Bytes(bytes));
assertThat(serialized).isSameAs(bytes);
headers.add(new RecordHeader(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR, "int".getBytes()));
serialized = serializer.serialize("foo", headers, 42);
assertThat(serialized).isEqualTo(new byte[] { 0, 0, 0, 42 });
assertThat(serialized).isEqualTo(new byte[]{ 0, 0, 0, 42 });
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo(42);
headers.add(new RecordHeader(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR, "string".getBytes()));
serialized = serializer.serialize("foo", headers, "bar");
assertThat(serialized).isEqualTo(new byte[] { 'b', 'a', 'r' });
assertThat(serialized).isEqualTo(new byte[]{ 'b', 'a', 'r' });
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar");

// implicit Serdes
Expand All @@ -190,9 +193,9 @@ private void doTestKeys(DelegatingSerializer serializer, DelegatingDeserializer
Collections.singletonMap(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR, "string"));
new DefaultKafkaHeaderMapper().fromHeaders(messageHeaders, headers);
assertThat(headers.lastHeader(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR).value())
.isEqualTo(new byte[] { 's', 't', 'r', 'i', 'n', 'g' });
.isEqualTo(new byte[]{ 's', 't', 'r', 'i', 'n', 'g' });
serialized = serializer.serialize("foo", headers, "bar");
assertThat(serialized).isEqualTo(new byte[] { 'b', 'a', 'r' });
assertThat(serialized).isEqualTo(new byte[]{ 'b', 'a', 'r' });
assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar");
}

Expand All @@ -218,21 +221,24 @@ void byTypeBadType() {
assertThatExceptionOfType(SerializationException.class).isThrownBy(
() -> serializer.serialize("foo", new Bytes(foo)))
.withMessageMatching("No matching delegate for type: " + Bytes.class.getName()
+ "; supported types: \\[(java.lang.String, \\[B|\\[B, java.lang.String)\\]");
+ "; supported types: \\[(java.lang.String, \\[B|\\[B, java.lang.String)]");
}

@Test
void assignable() {
DelegatingByTypeSerializer serializer = new DelegatingByTypeSerializer(Map.of(Number.class,
new IntegerSerializer(), byte[].class, new ByteArraySerializer()), true);
var delegates = new HashMap<Class<?>, Serializer<?>>();
delegates.put(Number.class, new IntegerSerializer());
delegates.put(byte[].class, new ByteArraySerializer());
DelegatingByTypeSerializer serializer = new DelegatingByTypeSerializer(delegates, true);

Integer i = 42;
assertThat(serializer.serialize("foo", i)).isEqualTo(new byte[] {0, 0, 0, 42});
assertThat(serializer.serialize("foo", i)).isEqualTo(new byte[]{ 0, 0, 0, 42 });
byte[] foo = "foo".getBytes();
assertThat(serializer.serialize("foo", foo)).isSameAs(foo);
assertThatExceptionOfType(SerializationException.class).isThrownBy(
() -> serializer.serialize("foo", new Bytes(foo)))
.withMessageMatching("No matching delegate for type: " + Bytes.class.getName()
+ "; supported types: \\[(java.lang.Number, \\[B|\\[B, java.lang.Number)\\]");
() -> serializer.serialize("foo", new Bytes(foo)))
.withMessageMatching("No matching delegate for type: " + Bytes.class.getName()
+ "; supported types: \\[(java.lang.Number, \\[B|\\[B, java.lang.Number)]");
}

}