Skip to content

Commit ed149c6

Browse files
committed
Fix compatibility with Flink 1.19/1.20 RowDataToJsonConverters
1 parent fcfc130 commit ed149c6

File tree

1 file changed

+44
-0
lines changed

1 file changed

+44
-0
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.formats.common.TimestampFormat;
2323
import org.apache.flink.formats.json.JsonFormatOptions;
2424
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
25+
import org.apache.flink.formats.json.RowDataToJsonConverters;
2526
import org.apache.flink.table.types.logical.RowType;
2627

2728
import java.lang.reflect.Constructor;
@@ -86,6 +87,49 @@ public static JsonRowDataSerializationSchema createSerializationSchema(
8687
"Failed to find appropriate constructor for JsonRowDataSerializationSchema,please check your Flink version is 1.19 or 1.20.");
8788
}
8889

90+
/**
91+
* In flink>=1.20, the constructor of RowDataToJsonConverters has 4 parameters, and in
92+
* flink<1.20, the constructor of RowDataToJsonConverters has 3 parameters.
93+
*/
94+
public static RowDataToJsonConverters createRowDataToJsonConverters(
95+
TimestampFormat timestampFormat,
96+
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
97+
String mapNullKeyLiteral,
98+
boolean ignoreNullFields) {
99+
try {
100+
Class<?>[] fullParams =
101+
new Class[] {
102+
TimestampFormat.class,
103+
JsonFormatOptions.MapNullKeyMode.class,
104+
String.class,
105+
boolean.class
106+
};
107+
108+
Object[] fullParamValues =
109+
new Object[] {
110+
timestampFormat, mapNullKeyMode, mapNullKeyLiteral, ignoreNullFields
111+
};
112+
113+
for (int i = fullParams.length; i >= 3; i--) {
114+
try {
115+
Constructor<?> constructor =
116+
RowDataToJsonConverters.class.getConstructor(
117+
Arrays.copyOfRange(fullParams, 0, i));
118+
119+
return (RowDataToJsonConverters)
120+
constructor.newInstance(Arrays.copyOfRange(fullParamValues, 0, i));
121+
} catch (NoSuchMethodException ignored) {
122+
}
123+
}
124+
} catch (Exception e) {
125+
throw new RuntimeException(
126+
"Failed to create RowDataToJsonConverters,please check your Flink version is 1.19 or 1.20.",
127+
e);
128+
}
129+
throw new RuntimeException(
130+
"Failed to find appropriate constructor for RowDataToJsonConverters,please check your Flink version is 1.19 or 1.20.");
131+
}
132+
89133
/** flink>=1.20 only has the ENCODE_IGNORE_NULL_FIELDS parameter. */
90134
public static boolean enableIgnoreNullFields(ReadableConfig formatOptions) {
91135
try {

0 commit comments

Comments
 (0)