Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37101] Remove legacy internal methods for changing serializer behavior in SerializerConfig #25960

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.connector.file.table.stream;

import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.TypeInformation;

Expand All @@ -30,7 +29,7 @@
public class PartitionCommitInfoTest {
@Test
public void testPartitionCommitSerializer() {
SerializerConfig serializerConfig = new SerializerConfigImpl();
SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
serializerConfig.setGenericTypes(false);
assertNotNull(
TypeInformation.of(PartitionCommitInfo.class).createSerializer(serializerConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.api.common.serialization;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.configuration.PipelineOptions;
Expand All @@ -39,94 +38,6 @@
*/
@PublicEvolving
public interface SerializerConfig extends Serializable {
/**
* Adds a new Kryo default serializer to the Runtime.
*
* <p>Note that the serializer instance must be serializable (as defined by
* java.io.Serializable), because it may be distributed to the worker nodes by java
* serialization.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
@Internal
<T extends Serializer<?> & Serializable> void addDefaultKryoSerializer(
Class<?> type, T serializer);

/**
* Adds a new Kryo default serializer to the Runtime.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
@Internal
void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);

/**
* Registers the given type with a Kryo Serializer.
*
* <p>Note that the serializer instance must be serializable (as defined by
* java.io.Serializable), because it may be distributed to the worker nodes by java
* serialization.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
@Internal
<T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(
Class<?> type, T serializer);

/**
* Registers the given Serializer via its class as a serializer for the given type at the
* KryoSerializer.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
@Internal
@SuppressWarnings("rawtypes")
void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass);

/**
* Registers the given type with the serialization stack. If the type is eventually serialized
* as a POJO, then the type is registered with the POJO serializer. If the type ends up being
* serialized with Kryo, then it will be registered at Kryo to make sure that only tags are
* written.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the type to register.
*/
@Internal
void registerPojoType(Class<?> type);

/**
* Registers the given type with the serialization stack. If the type is eventually serialized
* as a POJO, then the type is registered with the POJO serializer. If the type ends up being
* serialized with Kryo, then it will be registered at Kryo to make sure that only tags are
* written.
*
* <p>The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*
* @param type The class of the type to register.
*/
@Internal
void registerKryoType(Class<?> type);

/** Returns the registered types with their Kryo Serializer classes. */
LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
getRegisteredTypesWithKryoSerializerClasses();
Expand All @@ -151,40 +62,12 @@ <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(
*/
boolean hasGenericTypesDisabled();

/**
* The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*/
@Internal
void setGenericTypes(boolean genericTypes);

/** Returns whether Kryo is the serializer for POJOs. */
boolean isForceKryoEnabled();

/**
* The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*/
@Internal
void setForceKryo(boolean forceKryo);

/** Returns whether the Apache Avro is the serializer for POJOs. */
boolean isForceAvroEnabled();

/**
* The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*/
@Internal
void setForceAvro(boolean forceAvro);

/**
* The method will be converted to private in the next Flink major version after removing its
* deprecated caller methods.
*/
@Internal
public void setForceKryoAvro(boolean forceKryoAvro);

/** Returns whether forces Flink to register Apache Avro classes in Kryo serializer. */
TernaryBoolean isForceKryoAvroEnabled();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,10 @@ public void setForceAvro(boolean forceAvro) {
configuration.set(PipelineOptions.FORCE_AVRO, forceAvro);
}

@Override
public void setForceKryoAvro(boolean forceKryoAvro) {
configuration.set(PipelineOptions.FORCE_KRYO_AVRO, forceKryoAvro);
}

@Override
public TernaryBoolean isForceKryoAvroEnabled() {
return configuration
.getOptional(PipelineOptions.FORCE_KRYO_AVRO)
Expand Down Expand Up @@ -356,59 +354,9 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
configuration
.getOptional(PipelineOptions.FORCE_KRYO_AVRO)
.ifPresent(this::setForceKryoAvro);

try {
configuration
.getOptional(PipelineOptions.SERIALIZATION_CONFIG)
.ifPresent(c -> parseSerializationConfigWithExceptionHandling(classLoader, c));
} catch (Exception e) {
throw e;
}
}

private LinkedHashSet<Class<?>> loadClasses(
List<String> classNames, ClassLoader classLoader, String errorMessage) {
return classNames.stream()
.map(name -> this.<Class<?>>loadClass(name, classLoader, errorMessage))
.collect(Collectors.toCollection(LinkedHashSet::new));
}

private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
parseKryoSerializersWithExceptionHandling(
ClassLoader classLoader, List<String> kryoSerializers) {
try {
return parseKryoSerializers(classLoader, kryoSerializers);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(
"Could not configure kryo serializers from %s. The expected format is:"
+ "'class:<fully qualified class name>,serializer:<fully qualified serializer name>;...",
kryoSerializers),
e);
}
}

private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> parseKryoSerializers(
ClassLoader classLoader, List<String> kryoSerializers) {
return kryoSerializers.stream()
.map(ConfigurationUtils::parseStringToMap)
.collect(
Collectors.toMap(
m ->
loadClass(
m.get("class"),
classLoader,
"Could not load class for kryo serialization"),
m ->
loadClass(
m.get("serializer"),
classLoader,
"Could not load serializer's class"),
(m1, m2) -> {
throw new IllegalArgumentException(
"Duplicated serializer for class: " + m1);
},
LinkedHashMap::new));
configuration
.getOptional(PipelineOptions.SERIALIZATION_CONFIG)
.ifPresent(c -> parseSerializationConfigWithExceptionHandling(classLoader, c));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.common;

import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
Expand Down Expand Up @@ -62,24 +63,36 @@ private static Collection<TestSpec> specs() {
.whenSetFromFile("pipeline.force-avro", "true")
.viaSetter(
booleanSetter(
(ec) -> ec.getSerializerConfig().setForceAvro(true),
(ec) -> ec.getSerializerConfig().setForceAvro(false)))
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setForceAvro(true),
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setForceAvro(false)))
.getterVia((ec) -> ec.getSerializerConfig().isForceAvroEnabled())
.nonDefaultValue(true),
TestSpec.testValue(false)
.whenSetFromFile("pipeline.force-kryo", "false")
.viaSetter(
booleanSetter(
(ec) -> ec.getSerializerConfig().setForceKryo(true),
(ec) -> ec.getSerializerConfig().setForceKryo(false)))
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setForceKryo(true),
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setForceKryo(false)))
.getterVia((ec) -> ec.getSerializerConfig().isForceKryoEnabled())
.nonDefaultValue(false),
TestSpec.testValue(false)
.whenSetFromFile("pipeline.generic-types", "false")
.viaSetter(
booleanSetter(
(ec) -> ec.getSerializerConfig().setGenericTypes(true),
(ec) -> ec.getSerializerConfig().setGenericTypes(false)))
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setGenericTypes(true),
(ec) ->
((SerializerConfigImpl) ec.getSerializerConfig())
.setGenericTypes(false)))
.getterVia(
execConfig ->
!execConfig.getSerializerConfig().hasGenericTypesDisabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ void testExecutionConfigSerialization() throws Exception {
} else {
config.disableClosureCleaner();
}
config.getSerializerConfig().setForceAvro(forceAvroEnabled);
config.getSerializerConfig().setForceKryo(forceKryoEnabled);
config.getSerializerConfig().setGenericTypes(!disableGenericTypes);

final SerializerConfigImpl serializerConfig =
(SerializerConfigImpl) config.getSerializerConfig();
serializerConfig.setForceAvro(forceAvroEnabled);
serializerConfig.setForceKryo(forceKryoEnabled);
serializerConfig.setGenericTypes(!disableGenericTypes);
if (objectReuseEnabled) {
config.enableObjectReuse();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
@Test
void testEnableForceKryoAvroRegister() {
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.getSerializerConfig().setForceKryoAvro(true);
((SerializerConfigImpl) executionConfig.getSerializerConfig()).setForceKryoAvro(true);
final Kryo kryo =
new KryoSerializer<>(Integer.class, executionConfig.getSerializerConfig())
.getKryo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.formats.avro.typeutils;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -129,7 +130,7 @@ void testSimpleAvroRead(boolean useMiniCluster, @InjectMiniCluster MiniCluster m
void testSerializeWithAvro(boolean useMiniCluster, @InjectMiniCluster MiniCluster miniCluster)
throws Exception {
final StreamExecutionEnvironment env = getExecutionEnvironment(useMiniCluster, miniCluster);
env.getConfig().getSerializerConfig().setForceAvro(true);
((SerializerConfigImpl) env.getConfig().getSerializerConfig()).setForceKryoAvro(true);
Path in = new Path(inFile.getAbsoluteFile().toURI());

AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
Expand Down Expand Up @@ -209,7 +210,7 @@ void testKeySelection(boolean useMiniCluster, @InjectMiniCluster MiniCluster min
void testWithAvroGenericSer(boolean useMiniCluster, @InjectMiniCluster MiniCluster miniCluster)
throws Exception {
final StreamExecutionEnvironment env = getExecutionEnvironment(useMiniCluster, miniCluster);
env.getConfig().getSerializerConfig().setForceAvro(true);
((SerializerConfigImpl) env.getConfig().getSerializerConfig()).setForceKryoAvro(true);
Path in = new Path(inFile.getAbsoluteFile().toURI());

AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
Expand Down Expand Up @@ -237,7 +238,7 @@ void testWithAvroGenericSer(boolean useMiniCluster, @InjectMiniCluster MiniClust
void testWithKryoGenericSer(boolean useMiniCluster, @InjectMiniCluster MiniCluster miniCluster)
throws Exception {
final StreamExecutionEnvironment env = getExecutionEnvironment(useMiniCluster, miniCluster);
env.getConfig().getSerializerConfig().setForceKryo(true);
((SerializerConfigImpl) env.getConfig().getSerializerConfig()).setForceKryoAvro(true);
Path in = new Path(inFile.getAbsoluteFile().toURI());

AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.types;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
Expand Down Expand Up @@ -69,7 +68,7 @@ public static <T> void assertSerializedAsPojo(Class<T> clazz) throws AssertionEr
* Kryo for one or more fields
*/
public static <T> void assertSerializedAsPojoWithoutKryo(Class<T> clazz) throws AssertionError {
final SerializerConfig serializerConfig = new SerializerConfigImpl();
final SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
serializerConfig.setGenericTypes(false);

final TypeInformation<T> typeInformation = TypeInformation.of(clazz);
Expand Down