KAFKA-8326: Introduce List Serde #6592
Merged
Commits
49 commits
131b12a
Add List serializer, deserializer, serde
yeralin 146c06a
Fix formatting and unused imports
yeralin 6f18568
Delete ListSerde, and use WrapperSerde instead
0fa8495
Prevent possible NPE when data is null or empty
dae39d7
Use try-with-resources to utilize auto stream closing
36d206d
Remove comparator parameter from ListDeserializer constructor
c93d91e
Implement a test case for ListSerde
aaf22ce
Close de/serializers and propagate parameters in configure methods
b097996
Initialize ArrayList with size for better performance
89b22b5
Add another test case for ListSerde
9dee55c
Add support for fixed/variable size entries encoding
bd0ec99
Force user to pass list class to listSerde
9152216
Introduce default zero-arg constructors
204d1e7
Introduce 4 new configuration parameters for ListSerde
2a0149a
Update configuration strategy for ListSerde
c8b9f1a
Update the code due to review changes
yeralin 90cc373
Set all new config definitions of Type.CLASS
yeralin 6e61cd1
Set default values for newly introduced properties to null
yeralin a884d8b
Suppress unchecked warnings
yeralin fe95b01
Allow import of "org.apache.kafka.clients" package in serialization p…
yeralin 7353b65
Fix spotbug warning
yeralin 7530669
Generify List class
yeralin 17cb8e7
Remove deprecated import
yeralin 85a791b
Use mkMap and mkEntry to populate fixedLengthDeserializers map
yeralin 11d09d2
Move SuppressWarnings statement on a method level
yeralin 71bdf49
Update UUID fixed size to 36 bytes
yeralin 097e780
Add more test cases to cover other types of fixed sizes
yeralin a4d4d89
Rearrange defines
yeralin bd9740f
Modify interface definitions
yeralin dafda14
Make more descriptive docs
yeralin bac39aa
Update the code due to review comments
yeralin 8edaacc
Refactor configure methods for list (de)serializers
yeralin f6326f1
Introduce list (de)serializers configuration tests
yeralin dea18d0
Refactor getters
yeralin eb0db3a
Add null-index-list and negative-size serialization strategies functi…
yeralin 44494eb
Add test coverage for serialization strategies functionality
yeralin 569969e
Update existing test cases
yeralin da88bb7
Migrate to Junit 5 Jupiter
614aa89
Fix merge conflict
b982f36
Remove extraneous configurations
9e02a90
Rename constants
15eb0d8
Use primitives' BYTES constants
cdbbbd5
Validate passed constructor parameters
e8a5e59
Throw when trying to configure initialized list
819e186
Refactor list (de)serializer tests
f58bfce
Review refactoring
b4989fe
Review changes
e7c7789
Add documentation changes
37569c9
Review changes
191 changes: 191 additions & 0 deletions
clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,191 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.kafka.common.serialization; | ||
|
|
||
| import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; | ||
| import static org.apache.kafka.common.utils.Utils.mkEntry; | ||
| import static org.apache.kafka.common.utils.Utils.mkMap; | ||
|
|
||
| import java.io.ByteArrayInputStream; | ||
| import java.io.DataInputStream; | ||
| import java.io.IOException; | ||
| import java.lang.reflect.Constructor; | ||
| import java.lang.reflect.InvocationTargetException; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import org.apache.kafka.clients.CommonClientConfigs; | ||
| import org.apache.kafka.common.KafkaException; | ||
| import org.apache.kafka.common.config.ConfigException; | ||
| import org.apache.kafka.common.errors.SerializationException; | ||
| import org.apache.kafka.common.serialization.Serdes.ListSerde; | ||
| import org.apache.kafka.common.utils.Utils; | ||
|
|
||
| public class ListDeserializer<Inner> implements Deserializer<List<Inner>> { | ||
|
|
||
| private static final Map<Class<? extends Deserializer<?>>, Integer> FIXED_LENGTH_DESERIALIZERS = mkMap( | ||
| mkEntry(ShortDeserializer.class, Short.BYTES), | ||
| mkEntry(IntegerDeserializer.class, Integer.BYTES), | ||
| mkEntry(FloatDeserializer.class, Float.BYTES), | ||
| mkEntry(LongDeserializer.class, Long.BYTES), | ||
| mkEntry(DoubleDeserializer.class, Double.BYTES), | ||
| mkEntry(UUIDDeserializer.class, 36) | ||
| ); | ||
|
|
||
| private Deserializer<Inner> inner; | ||
| private Class<?> listClass; | ||
| private Integer primitiveSize; | ||
|
|
||
| public ListDeserializer() {} | ||
|
|
||
| public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> inner) { | ||
| if (listClass == null || inner == null) { | ||
| throw new IllegalArgumentException("ListDeserializer requires both \"listClass\" and \"innerDeserializer\" parameters to be provided during initialization"); | ||
| } | ||
| this.listClass = listClass; | ||
| this.inner = inner; | ||
| this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass()); | ||
| } | ||
|
|
||
| public Deserializer<Inner> innerDeserializer() { | ||
| return inner; | ||
| } | ||
|
|
||
| @Override | ||
| public void configure(Map<String, ?> configs, boolean isKey) { | ||
| if (listClass != null || inner != null) { | ||
| throw new ConfigException("List deserializer was already initialized using a non-default constructor"); | ||
| } | ||
| configureListClass(configs, isKey); | ||
| configureInnerSerde(configs, isKey); | ||
| } | ||
|
|
||
| private void configureListClass(Map<String, ?> configs, boolean isKey) { | ||
| String listTypePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS; | ||
| final Object listClassOrName = configs.get(listTypePropertyName); | ||
| if (listClassOrName == null) { | ||
| throw new ConfigException("Not able to determine the list class because it was neither passed via the constructor nor set in the config."); | ||
| } | ||
| try { | ||
| if (listClassOrName instanceof String) { | ||
| listClass = Utils.loadClass((String) listClassOrName, Object.class); | ||
| } else if (listClassOrName instanceof Class) { | ||
| listClass = (Class<?>) listClassOrName; | ||
| } else { | ||
| throw new KafkaException("Could not determine the list class instance using \"" + listTypePropertyName + "\" property."); | ||
| } | ||
| } catch (final ClassNotFoundException e) { | ||
| throw new ConfigException(listTypePropertyName, listClassOrName, "Deserializer's list class \"" + listClassOrName + "\" could not be found."); | ||
| } | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private void configureInnerSerde(Map<String, ?> configs, boolean isKey) { | ||
| String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS; | ||
| final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName); | ||
| if (innerSerdeClassOrName == null) { | ||
| throw new ConfigException("Not able to determine the inner serde class because it was neither passed via the constructor nor set in the config."); | ||
| } | ||
| try { | ||
| if (innerSerdeClassOrName instanceof String) { | ||
| inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).deserializer(); | ||
| } else if (innerSerdeClassOrName instanceof Class) { | ||
| inner = (Deserializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).deserializer(); | ||
| } else { | ||
| throw new KafkaException("Could not determine the inner serde class instance using \"" + innerSerdePropertyName + "\" property."); | ||
| } | ||
| inner.configure(configs, isKey); | ||
| primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass()); | ||
| } catch (final ClassNotFoundException e) { | ||
| throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Deserializer's inner serde class \"" + innerSerdeClassOrName + "\" could not be found."); | ||
| } | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private List<Inner> createListInstance(int listSize) { | ||
| try { | ||
| Constructor<List<Inner>> listConstructor; | ||
| try { | ||
| listConstructor = (Constructor<List<Inner>>) listClass.getConstructor(Integer.TYPE); | ||
| return listConstructor.newInstance(listSize); | ||
| } catch (NoSuchMethodException e) { | ||
| listConstructor = (Constructor<List<Inner>>) listClass.getConstructor(); | ||
| return listConstructor.newInstance(); | ||
| } | ||
| } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | | ||
| IllegalArgumentException | InvocationTargetException e) { | ||
| throw new KafkaException("Could not construct a list instance of \"" + listClass.getCanonicalName() + "\"", e); | ||
| } | ||
| } | ||
|
|
||
| private SerializationStrategy parseSerializationStrategyFlag(final int serializationStrategyFlag) throws IOException { | ||
| if (serializationStrategyFlag < 0 || serializationStrategyFlag >= SerializationStrategy.VALUES.length) { | ||
| throw new SerializationException("Invalid serialization strategy flag value"); | ||
| } | ||
| return SerializationStrategy.VALUES[serializationStrategyFlag]; | ||
| } | ||
|
|
||
| private List<Integer> deserializeNullIndexList(final DataInputStream dis) throws IOException { | ||
| int nullIndexListSize = dis.readInt(); | ||
| List<Integer> nullIndexList = new ArrayList<>(nullIndexListSize); | ||
| while (nullIndexListSize != 0) { | ||
| nullIndexList.add(dis.readInt()); | ||
| nullIndexListSize--; | ||
| } | ||
| return nullIndexList; | ||
| } | ||
|
|
||
| @Override | ||
| public List<Inner> deserialize(String topic, byte[] data) { | ||
| if (data == null) { | ||
| return null; | ||
| } | ||
| try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) { | ||
| SerializationStrategy serStrategy = parseSerializationStrategyFlag(dis.readByte()); | ||
| List<Integer> nullIndexList = null; | ||
| if (serStrategy == SerializationStrategy.CONSTANT_SIZE) { | ||
| // In CONSTANT_SIZE strategy, indexes of null entries are decoded from a null index list | ||
| nullIndexList = deserializeNullIndexList(dis); | ||
| } | ||
| final int size = dis.readInt(); | ||
| List<Inner> deserializedList = createListInstance(size); | ||
| for (int i = 0; i < size; i++) { | ||
| int entrySize = serStrategy == SerializationStrategy.CONSTANT_SIZE ? primitiveSize : dis.readInt(); | ||
| if (entrySize == ListSerde.NULL_ENTRY_VALUE || (nullIndexList != null && nullIndexList.contains(i))) { | ||
| deserializedList.add(null); | ||
| continue; | ||
| } | ||
| byte[] payload = new byte[entrySize]; | ||
| if (dis.read(payload) == -1) { | ||
| throw new SerializationException("End of the stream was reached prematurely"); | ||
| } | ||
| deserializedList.add(inner.deserialize(topic, payload)); | ||
| } | ||
| return deserializedList; | ||
| } catch (IOException e) { | ||
| throw new KafkaException("Unable to deserialize into a List", e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| if (inner != null) { | ||
| inner.close(); | ||
| } | ||
| } | ||
|
|
||
| } | ||
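As a reading aid for `deserialize` above, here is a minimal, Kafka-free sketch of decoding the VARIABLE_SIZE layout: one strategy flag byte, an `int` list size, then for each entry an `int` length followed by that many payload bytes, with a marker length standing in for a null entry. The class name `VariableSizeLayoutSketch`, the flag value `1`, and the null marker `-1` are assumptions made for illustration; the `SerializationStrategy` enum and `ListSerde.NULL_ENTRY_VALUE` are defined outside this diff. The sketch also uses `readFully`, which differs from the `dis.read(payload) == -1` check in the diff: `readFully` fails with an `EOFException` when fewer bytes remain than the entry length asks for.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class VariableSizeLayoutSketch {
    // Assumptions: neither value is visible in this diff.
    static final int VARIABLE_SIZE_FLAG = 1;
    static final int NULL_ENTRY = -1;

    // Hand-builds the bytes for the list ["ab", null, ""].
    static byte[] sampleBytes() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeByte(VARIABLE_SIZE_FLAG);                 // strategy flag
            out.writeInt(3);                                   // list size
            out.writeInt(2);                                   // length of "ab"
            out.write("ab".getBytes(StandardCharsets.UTF_8));  // payload of "ab"
            out.writeInt(NULL_ENTRY);                          // null entry, no payload
            out.writeInt(0);                                   // empty string, zero-length payload
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Decodes a VARIABLE_SIZE-style byte array into a list of UTF-8 strings.
    static List<String> decodeStrings(byte[] data) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            int flag = dis.readByte();
            if (flag != VARIABLE_SIZE_FLAG) {
                throw new IllegalArgumentException("Unexpected strategy flag: " + flag);
            }
            int size = dis.readInt();
            List<String> result = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                int entrySize = dis.readInt();
                if (entrySize == NULL_ENTRY) {
                    result.add(null);
                    continue;
                }
                byte[] payload = new byte[entrySize];
                dis.readFully(payload); // throws EOFException on truncated input
                result.add(new String(payload, StandardCharsets.UTF_8));
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decodeStrings(sampleBytes())); // prints [ab, null, ]
    }
}
```

Note that a zero-length entry and a null entry stay distinguishable in this layout, because the null marker is a negative length rather than `0`.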
142 changes: 142 additions & 0 deletions
clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.kafka.common.serialization; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Iterator; | ||
| import org.apache.kafka.clients.CommonClientConfigs; | ||
| import org.apache.kafka.common.KafkaException; | ||
| import org.apache.kafka.common.config.ConfigException; | ||
| import org.apache.kafka.common.utils.Utils; | ||
|
|
||
| import java.io.ByteArrayOutputStream; | ||
| import java.io.DataOutputStream; | ||
| import java.io.IOException; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; | ||
|
|
||
| public class ListSerializer<Inner> implements Serializer<List<Inner>> { | ||
|
|
||
| private static final List<Class<? extends Serializer<?>>> FIXED_LENGTH_SERIALIZERS = Arrays.asList( | ||
| ShortSerializer.class, | ||
| IntegerSerializer.class, | ||
| FloatSerializer.class, | ||
| LongSerializer.class, | ||
| DoubleSerializer.class, | ||
| UUIDSerializer.class); | ||
|
|
||
| private Serializer<Inner> inner; | ||
| private SerializationStrategy serStrategy; | ||
|
|
||
| public ListSerializer() {} | ||
|
|
||
| public ListSerializer(Serializer<Inner> inner) { | ||
| if (inner == null) { | ||
| throw new IllegalArgumentException("ListSerializer requires \"serializer\" parameter to be provided during initialization"); | ||
| } | ||
| this.inner = inner; | ||
| this.serStrategy = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()) ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; | ||
| } | ||
|
|
||
| public Serializer<Inner> getInnerSerializer() { | ||
| return inner; | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| @Override | ||
| public void configure(Map<String, ?> configs, boolean isKey) { | ||
| if (inner != null) { | ||
| throw new ConfigException("List serializer was already initialized using a non-default constructor"); | ||
| } | ||
| final String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS; | ||
| final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName); | ||
| if (innerSerdeClassOrName == null) { | ||
| throw new ConfigException("Not able to determine the serializer class because it was neither passed via the constructor nor set in the config."); | ||
| } | ||
| try { | ||
| if (innerSerdeClassOrName instanceof String) { | ||
| inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).serializer(); | ||
| } else if (innerSerdeClassOrName instanceof Class) { | ||
| inner = (Serializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).serializer(); | ||
| } else { | ||
| throw new KafkaException("Could not create a serializer class instance using \"" + innerSerdePropertyName + "\" property."); | ||
| } | ||
| inner.configure(configs, isKey); | ||
| serStrategy = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()) ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; | ||
| } catch (final ClassNotFoundException e) { | ||
| throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Serializer class " + innerSerdeClassOrName + " could not be found."); | ||
| } | ||
| } | ||
|
|
||
| private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException { | ||
| int i = 0; | ||
| List<Integer> nullIndexList = new ArrayList<>(); | ||
| for (Iterator<Inner> it = data.listIterator(); it.hasNext(); i++) { | ||
| if (it.next() == null) { | ||
| nullIndexList.add(i); | ||
| } | ||
| } | ||
| out.writeInt(nullIndexList.size()); | ||
| for (int nullIndex : nullIndexList) { | ||
| out.writeInt(nullIndex); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public byte[] serialize(String topic, List<Inner> data) { | ||
| if (data == null) { | ||
| return null; | ||
| } | ||
| try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
| final DataOutputStream out = new DataOutputStream(baos)) { | ||
| out.writeByte(serStrategy.ordinal()); // write serialization strategy flag | ||
| if (serStrategy == SerializationStrategy.CONSTANT_SIZE) { | ||
| // In CONSTANT_SIZE strategy, indexes of null entries are encoded in a null index list | ||
| serializeNullIndexList(out, data); | ||
| } | ||
| final int size = data.size(); | ||
| out.writeInt(size); | ||
| for (Inner entry : data) { | ||
| if (entry == null) { | ||
| if (serStrategy == SerializationStrategy.VARIABLE_SIZE) { | ||
| out.writeInt(Serdes.ListSerde.NULL_ENTRY_VALUE); | ||
| } | ||
| } else { | ||
| final byte[] bytes = inner.serialize(topic, entry); | ||
| if (serStrategy == SerializationStrategy.VARIABLE_SIZE) { | ||
| out.writeInt(bytes.length); | ||
| } | ||
| out.write(bytes); | ||
| } | ||
| } | ||
| return baos.toByteArray(); | ||
| } catch (IOException e) { | ||
| throw new KafkaException("Failed to serialize List", e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| if (inner != null) { | ||
| inner.close(); | ||
| } | ||
| } | ||
|
|
||
| } | ||
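To make the CONSTANT_SIZE byte layout written by `serialize` above easier to follow, here is a minimal, Kafka-free sketch that encodes a `List<Integer>` in the same order: strategy flag byte, null index list (count, then indexes), list size, then the fixed-width payloads of the non-null entries only. The class name `ConstantSizeLayoutSketch` and the flag value `0` are assumptions for illustration, since the `SerializationStrategy` enum is defined outside this diff. `DataOutputStream.writeInt` stands in for `IntegerSerializer` here; both produce 4 big-endian bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ConstantSizeLayoutSketch {
    // Assumption: the ordinal of CONSTANT_SIZE is not visible in this diff.
    static final int CONSTANT_SIZE_FLAG = 0;

    // Encodes a list of Integers using a CONSTANT_SIZE-style layout.
    static byte[] encodeInts(List<Integer> data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeByte(CONSTANT_SIZE_FLAG);          // strategy flag

            // Null entries carry no payload, so their positions are recorded up front.
            List<Integer> nullIndexes = new ArrayList<>();
            int i = 0;
            for (Integer entry : data) {
                if (entry == null) {
                    nullIndexes.add(i);
                }
                i++;
            }
            out.writeInt(nullIndexes.size());
            for (int nullIndex : nullIndexes) {
                out.writeInt(nullIndex);
            }

            out.writeInt(data.size());                  // total entries, nulls included
            for (Integer entry : data) {
                if (entry != null) {
                    out.writeInt(entry);                // 4 bytes, no per-entry length prefix
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = encodeInts(Arrays.asList(7, null, 9));
        // 1 flag + 4 null count + 4 null index + 4 size + 2 * 4 payload bytes
        System.out.println(bytes.length); // prints 21
    }
}
```

The trade-off this illustrates: the fixed-width layout drops the 4-byte length prefix per entry, at the cost of 4 bytes per null entry plus a 4-byte null count.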