Skip to content

Commit

Permalink
[FLINK-37101] Remove legacy internal methods for changing serializer …
Browse files Browse the repository at this point in the history
…behavior in SerializerConfig
  • Loading branch information
X-czh committed Jan 13, 2025
1 parent 7f1c201 commit 2fcc6fc
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.connector.file.table.stream;

import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.TypeInformation;

Expand All @@ -30,7 +29,7 @@
public class PartitionCommitInfoTest {
@Test
public void testPartitionCommitSerializer() {
SerializerConfig serializerConfig = new SerializerConfigImpl();
SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
serializerConfig.setGenericTypes(false);
assertNotNull(
TypeInformation.of(PartitionCommitInfo.class).createSerializer(serializerConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.api.common.serialization;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.configuration.PipelineOptions;
Expand All @@ -39,94 +38,6 @@
*/
@PublicEvolving
public interface SerializerConfig extends Serializable {
/**
* Adds a new Kryo default serializer to the Runtime.
*
* <p>Note that the serializer instance must be serializable (as defined by
* java.io.Serializable), because it may be distributed to the worker nodes by java
* serialization.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
@Internal
<T extends Serializer<?> & Serializable> void addDefaultKryoSerializer(
Class<?> type, T serializer);

/**
* Adds a new Kryo default serializer to the Runtime.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
@Internal
void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);

/**
* Registers the given type with a Kryo Serializer.
*
* <p>Note that the serializer instance must be serializable (as defined by
* java.io.Serializable), because it may be distributed to the worker nodes by java
* serialization.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
@Internal
<T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(
Class<?> type, T serializer);

/**
* Registers the given Serializer via its class as a serializer for the given type at the
* KryoSerializer.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
@Internal
@SuppressWarnings("rawtypes")
void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass);

/**
* Registers the given type with the serialization stack. If the type is eventually serialized
* as a POJO, then the type is registered with the POJO serializer. If the type ends up being
* serialized with Kryo, then it will be registered at Kryo to make sure that only tags are
* written.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the type to register.
*/
@Internal
void registerPojoType(Class<?> type);

/**
* Registers the given type with the serialization stack. If the type is eventually serialized
* as a POJO, then the type is registered with the POJO serializer. If the type ends up being
* serialized with Kryo, then it will be registered at Kryo to make sure that only tags are
* written.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the type to register.
*/
@Internal
void registerKryoType(Class<?> type);

/** Returns the registered types with their Kryo Serializer classes. */
LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
getRegisteredTypesWithKryoSerializerClasses();
Expand All @@ -151,40 +62,12 @@ <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(
*/
boolean hasGenericTypesDisabled();

/**
* The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*/
@Internal
void setGenericTypes(boolean genericTypes);

/** Returns whether Kryo is the serializer for POJOs. */
boolean isForceKryoEnabled();

/**
* The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*/
@Internal
void setForceKryo(boolean forceKryo);

/** Returns whether the Apache Avro is the serializer for POJOs. */
boolean isForceAvroEnabled();

/**
* The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*/
@Internal
void setForceAvro(boolean forceAvro);

/**
* The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*/
@Internal
public void setForceKryoAvro(boolean forceKryoAvro);

/** Returns whether forces Flink to register Apache Avro classes in Kryo serializer. */
TernaryBoolean isForceKryoAvroEnabled();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,10 @@ public void setForceAvro(boolean forceAvro) {
configuration.set(PipelineOptions.FORCE_AVRO, forceAvro);
}

@Override
public void setForceKryoAvro(boolean forceKryoAvro) {
configuration.set(PipelineOptions.FORCE_KRYO_AVRO, forceKryoAvro);
}

@Override
public TernaryBoolean isForceKryoAvroEnabled() {
return configuration
.getOptional(PipelineOptions.FORCE_KRYO_AVRO)
Expand Down Expand Up @@ -356,59 +354,9 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
configuration
.getOptional(PipelineOptions.FORCE_KRYO_AVRO)
.ifPresent(this::setForceKryoAvro);

try {
configuration
.getOptional(PipelineOptions.SERIALIZATION_CONFIG)
.ifPresent(c -> parseSerializationConfigWithExceptionHandling(classLoader, c));
} catch (Exception e) {
throw e;
}
}

private LinkedHashSet<Class<?>> loadClasses(
List<String> classNames, ClassLoader classLoader, String errorMessage) {
return classNames.stream()
.map(name -> this.<Class<?>>loadClass(name, classLoader, errorMessage))
.collect(Collectors.toCollection(LinkedHashSet::new));
}

private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
parseKryoSerializersWithExceptionHandling(
ClassLoader classLoader, List<String> kryoSerializers) {
try {
return parseKryoSerializers(classLoader, kryoSerializers);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(
"Could not configure kryo serializers from %s. The expected format is:"
+ "'class:<fully qualified class name>,serializer:<fully qualified serializer name>;...",
kryoSerializers),
e);
}
}

private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> parseKryoSerializers(
ClassLoader classLoader, List<String> kryoSerializers) {
return kryoSerializers.stream()
.map(ConfigurationUtils::parseStringToMap)
.collect(
Collectors.toMap(
m ->
loadClass(
m.get("class"),
classLoader,
"Could not load class for kryo serialization"),
m ->
loadClass(
m.get("serializer"),
classLoader,
"Could not load serializer's class"),
(m1, m2) -> {
throw new IllegalArgumentException(
"Duplicated serializer for class: " + m1);
},
LinkedHashMap::new));
configuration
.getOptional(PipelineOptions.SERIALIZATION_CONFIG)
.ifPresent(c -> parseSerializationConfigWithExceptionHandling(classLoader, c));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.common;

import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
Expand Down Expand Up @@ -62,24 +63,36 @@ private static Collection<TestSpec> specs() {
.whenSetFromFile("pipeline.force-avro", "true")
.viaSetter(
booleanSetter(
(ec) -> ec.getSerializerConfig().setForceAvro(true),
(ec) -> ec.getSerializerConfig().setForceAvro(false)))
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setForceAvro(true),
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setForceAvro(false)))
.getterVia((ec) -> ec.getSerializerConfig().isForceAvroEnabled())
.nonDefaultValue(true),
TestSpec.testValue(false)
.whenSetFromFile("pipeline.force-kryo", "false")
.viaSetter(
booleanSetter(
(ec) -> ec.getSerializerConfig().setForceKryo(true),
(ec) -> ec.getSerializerConfig().setForceKryo(false)))
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setForceKryo(true),
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setForceKryo(false)))
.getterVia((ec) -> ec.getSerializerConfig().isForceKryoEnabled())
.nonDefaultValue(false),
TestSpec.testValue(false)
.whenSetFromFile("pipeline.generic-types", "false")
.viaSetter(
booleanSetter(
(ec) -> ec.getSerializerConfig().setGenericTypes(true),
(ec) -> ec.getSerializerConfig().setGenericTypes(false)))
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setGenericTypes(true),
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setGenericTypes(false)))
.getterVia(
execConfig ->
!execConfig.getSerializerConfig().hasGenericTypesDisabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ void testExecutionConfigSerialization() throws Exception {
} else {
config.disableClosureCleaner();
}
config.getSerializerConfig().setForceAvro(forceAvroEnabled);
config.getSerializerConfig().setForceKryo(forceKryoEnabled);
config.getSerializerConfig().setGenericTypes(!disableGenericTypes);

final SerializerConfigImpl serializerConfig =
(SerializerConfigImpl) config.getSerializerConfig();
serializerConfig.setForceAvro(forceAvroEnabled);
serializerConfig.setForceKryo(forceKryoEnabled);
serializerConfig.setGenericTypes(!disableGenericTypes);
if (objectReuseEnabled) {
config.enableObjectReuse();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
@Test
void testEnableForceKryoAvroRegister() {
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.getSerializerConfig().setForceKryoAvro(true);
((SerializerConfigImpl) executionConfig.getSerializerConfig()).setForceKryoAvro(true);
final Kryo kryo =
new KryoSerializer<>(Integer.class, executionConfig.getSerializerConfig())
.getKryo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.formats.avro.typeutils;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -129,7 +130,7 @@ void testSimpleAvroRead(boolean useMiniCluster, @InjectMiniCluster MiniCluster m
void testSerializeWithAvro(boolean useMiniCluster, @InjectMiniCluster MiniCluster miniCluster)
throws Exception {
final StreamExecutionEnvironment env = getExecutionEnvironment(useMiniCluster, miniCluster);
env.getConfig().getSerializerConfig().setForceAvro(true);
((SerializerConfigImpl) env.getConfig().getSerializerConfig()).setForceKryoAvro(true);
Path in = new Path(inFile.getAbsoluteFile().toURI());

AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
Expand Down Expand Up @@ -209,7 +210,7 @@ void testKeySelection(boolean useMiniCluster, @InjectMiniCluster MiniCluster min
void testWithAvroGenericSer(boolean useMiniCluster, @InjectMiniCluster MiniCluster miniCluster)
throws Exception {
final StreamExecutionEnvironment env = getExecutionEnvironment(useMiniCluster, miniCluster);
env.getConfig().getSerializerConfig().setForceAvro(true);
((SerializerConfigImpl) env.getConfig().getSerializerConfig()).setForceKryoAvro(true);
Path in = new Path(inFile.getAbsoluteFile().toURI());

AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
Expand Down Expand Up @@ -237,7 +238,7 @@ void testWithAvroGenericSer(boolean useMiniCluster, @InjectMiniCluster MiniClust
void testWithKryoGenericSer(boolean useMiniCluster, @InjectMiniCluster MiniCluster miniCluster)
throws Exception {
final StreamExecutionEnvironment env = getExecutionEnvironment(useMiniCluster, miniCluster);
env.getConfig().getSerializerConfig().setForceKryo(true);
((SerializerConfigImpl) env.getConfig().getSerializerConfig()).setForceKryoAvro(true);
Path in = new Path(inFile.getAbsoluteFile().toURI());

AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.types;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
Expand Down Expand Up @@ -69,7 +68,7 @@ public static <T> void assertSerializedAsPojo(Class<T> clazz) throws AssertionEr
* Kryo for one or more fields
*/
public static <T> void assertSerializedAsPojoWithoutKryo(Class<T> clazz) throws AssertionError {
final SerializerConfig serializerConfig = new SerializerConfigImpl();
final SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
serializerConfig.setGenericTypes(false);

final TypeInformation<T> typeInformation = TypeInformation.of(clazz);
Expand Down

0 comments on commit 2fcc6fc

Please sign in to comment.