org.apache.flink
flink-cdc-common
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java
index d1052cf398b..80141a13e36 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java
@@ -18,7 +18,6 @@
package org.apache.flink.cdc.runtime.functions;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
@@ -31,28 +30,17 @@
import javax.annotation.Nullable;
-import java.util.Optional;
import java.util.function.Function;
-import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION;
-import static org.apache.flink.table.functions.BuiltInFunctionDefinition.qualifyFunctionName;
-import static org.apache.flink.table.functions.BuiltInFunctionDefinition.validateFunction;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
/**
* This is the case when the operator has a special parsing syntax or uses other Calcite-specific
- * features that are not exposed via {@link BuiltInFunctionDefinition} yet.
- *
- * Note: Try to keep usages of this class to a minimum and use Flink's {@link
- * BuiltInFunctionDefinition} stack instead.
- *
- *
For simple functions, use the provided builder. Otherwise, this class can also be extended.
+ * features that are not exposed via {@link SqlFunction} yet.
*/
@Internal
public class BuiltInScalarFunction extends SqlFunction {
- private final @Nullable Integer version;
-
private final boolean isDeterministic;
private final boolean isInternal;
@@ -61,7 +49,6 @@ public class BuiltInScalarFunction extends SqlFunction {
protected BuiltInScalarFunction(
String name,
- int version,
SqlKind kind,
@Nullable SqlReturnTypeInference returnTypeInference,
@Nullable SqlOperandTypeInference operandTypeInference,
@@ -77,11 +64,9 @@ protected BuiltInScalarFunction(
operandTypeInference,
operandTypeChecker,
checkNotNull(category));
- this.version = isInternal ? null : version;
this.isDeterministic = isDeterministic;
this.isInternal = isInternal;
this.monotonicity = monotonicity;
- validateFunction(name, version, isInternal);
}
protected BuiltInScalarFunction(
@@ -93,7 +78,6 @@ protected BuiltInScalarFunction(
SqlFunctionCategory category) {
this(
name,
- DEFAULT_VERSION,
kind,
returnTypeInference,
operandTypeInference,
@@ -109,18 +93,6 @@ public static Builder newBuilder() {
return new Builder();
}
- public final Optional getVersion() {
- return Optional.ofNullable(version);
- }
-
- public String getQualifiedName() {
- if (isInternal) {
- return getName();
- }
- assert version != null;
- return qualifyFunctionName(getName(), version);
- }
-
@Override
public boolean isDeterministic() {
return isDeterministic;
@@ -144,8 +116,6 @@ public static class Builder {
private String name;
- private int version = DEFAULT_VERSION;
-
private SqlKind kind = SqlKind.OTHER_FUNCTION;
private SqlReturnTypeInference returnTypeInference;
@@ -163,18 +133,11 @@ public static class Builder {
private Function monotonicity =
call -> SqlMonotonicity.NOT_MONOTONIC;
- /** @see BuiltInFunctionDefinition.Builder#name(String) */
public Builder name(String name) {
this.name = name;
return this;
}
- /** @see BuiltInFunctionDefinition.Builder#version(int) */
- public Builder version(int version) {
- this.version = version;
- return this;
- }
-
public Builder kind(SqlKind kind) {
this.kind = kind;
return this;
@@ -205,7 +168,6 @@ public Builder notDeterministic() {
return this;
}
- /** @see BuiltInFunctionDefinition.Builder#internal() */
public Builder internal() {
this.isInternal = true;
return this;
@@ -224,7 +186,6 @@ public Builder monotonicity(Function monoto
public BuiltInScalarFunction build() {
return new BuiltInScalarFunction(
name,
- version,
kind,
returnTypeInference,
operandTypeInference,
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index ae598b7da88..ea7bf4c5c13 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -51,8 +51,8 @@
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;
@@ -60,7 +60,6 @@
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.calcite.sql.util.ListSqlOperatorTable;
import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.sql.validate.SqlValidator;
@@ -156,13 +155,10 @@ private static RelNode sqlToRel(
factory,
new CalciteConnectionConfigImpl(new Properties()));
TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance();
- SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance();
- ListSqlOperatorTable udfOperatorTable = new ListSqlOperatorTable();
- udfFunctions.forEach(udfOperatorTable::add);
+ SqlOperatorTable udfOperatorTable = SqlOperatorTables.of(udfFunctions);
SqlValidator validator =
SqlValidatorUtil.newValidator(
- SqlOperatorTables.chain(
- sqlStdOperatorTable, transformSqlOperatorTable, udfOperatorTable),
+ SqlOperatorTables.chain(transformSqlOperatorTable, udfOperatorTable),
calciteCatalogReader,
factory,
SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 35253f27918..658550da0c9 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -20,13 +20,18 @@
import org.apache.flink.cdc.runtime.functions.BuiltInScalarFunction;
import org.apache.flink.cdc.runtime.functions.BuiltInTimestampFunction;
+import org.apache.calcite.sql.SqlBinaryOperator;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlPostfixOperator;
+import org.apache.calcite.sql.SqlPrefixOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlSyntax;
-import org.apache.calcite.sql.fun.SqlCurrentDateFunction;
+import org.apache.calcite.sql.fun.SqlBetweenOperator;
+import org.apache.calcite.sql.fun.SqlCaseOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
@@ -75,6 +80,78 @@ public void lookupOperatorOverloads(
SqlNameMatchers.withCaseSensitive(false));
}
+ // The following binary functions are sorted in documentation definitions. See
+ // https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/core-concept/transform/ for a
+ // full list of CDC supported built-in functions.
+
+ // --------------------
+ // Comparison Functions
+ // --------------------
+ public static final SqlBinaryOperator EQUALS = SqlStdOperatorTable.EQUALS;
+ public static final SqlBinaryOperator NOT_EQUALS = SqlStdOperatorTable.NOT_EQUALS;
+ public static final SqlBinaryOperator GREATER_THAN = SqlStdOperatorTable.GREATER_THAN;
+ public static final SqlBinaryOperator GREATER_THAN_OR_EQUAL =
+ SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
+ public static final SqlBinaryOperator LESS_THAN = SqlStdOperatorTable.LESS_THAN;
+ public static final SqlBinaryOperator LESS_THAN_OR_EQUAL =
+ SqlStdOperatorTable.LESS_THAN_OR_EQUAL;
+
+ public static final SqlPostfixOperator IS_NULL = SqlStdOperatorTable.IS_NULL;
+ public static final SqlPostfixOperator IS_NOT_NULL = SqlStdOperatorTable.IS_NOT_NULL;
+
+ public static final SqlBetweenOperator BETWEEN = SqlStdOperatorTable.BETWEEN;
+ public static final SqlBetweenOperator NOT_BETWEEN = SqlStdOperatorTable.NOT_BETWEEN;
+
+ public static final SqlSpecialOperator LIKE = SqlStdOperatorTable.LIKE;
+ public static final SqlSpecialOperator NOT_LIKE = SqlStdOperatorTable.NOT_LIKE;
+
+ public static final SqlBinaryOperator IN = SqlStdOperatorTable.IN;
+ public static final SqlBinaryOperator NOT_IN = SqlStdOperatorTable.NOT_IN;
+
+ // -----------------
+ // Logical Functions
+ // -----------------
+ public static final SqlBinaryOperator OR = SqlStdOperatorTable.OR;
+ public static final SqlBinaryOperator AND = SqlStdOperatorTable.AND;
+ public static final SqlPrefixOperator NOT = SqlStdOperatorTable.NOT;
+
+ public static final SqlPostfixOperator IS_FALSE = SqlStdOperatorTable.IS_FALSE;
+ public static final SqlPostfixOperator IS_NOT_FALSE = SqlStdOperatorTable.IS_NOT_FALSE;
+ public static final SqlPostfixOperator IS_TRUE = SqlStdOperatorTable.IS_TRUE;
+ public static final SqlPostfixOperator IS_NOT_TRUE = SqlStdOperatorTable.IS_NOT_TRUE;
+
+ // --------------------
+ // Arithmetic Functions
+ // --------------------
+ public static final SqlBinaryOperator PLUS = SqlStdOperatorTable.PLUS;
+ public static final SqlBinaryOperator MINUS = SqlStdOperatorTable.MINUS;
+ public static final SqlBinaryOperator MULTIPLY = SqlStdOperatorTable.MULTIPLY;
+ public static final SqlBinaryOperator DIVIDE = SqlStdOperatorTable.DIVIDE;
+ public static final SqlBinaryOperator PERCENT_REMAINDER = SqlStdOperatorTable.PERCENT_REMAINDER;
+
+ public static final SqlFunction ABS = SqlStdOperatorTable.ABS;
+ public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
+ public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
+ public static final SqlFunction ROUND =
+ new SqlFunction(
+ "ROUND",
+ SqlKind.OTHER_FUNCTION,
+ TransformSqlReturnTypes.ROUND_FUNCTION_NULLABLE,
+ null,
+ OperandTypes.or(OperandTypes.NUMERIC_INTEGER, OperandTypes.NUMERIC),
+ SqlFunctionCategory.NUMERIC);
+ public static final SqlFunction UUID =
+ BuiltInScalarFunction.newBuilder()
+ .name("UUID")
+ .returnType(ReturnTypes.explicit(SqlTypeName.CHAR, 36))
+ .operandTypeChecker(OperandTypes.NILADIC)
+ .notDeterministic()
+ .build();
+
+ // ----------------
+ // String Functions
+ // ----------------
+ public static final SqlBinaryOperator CONCAT = SqlStdOperatorTable.CONCAT;
public static final SqlFunction CONCAT_FUNCTION =
BuiltInScalarFunction.newBuilder()
.name("CONCAT")
@@ -85,14 +162,48 @@ public void lookupOperatorOverloads(
.operandTypeChecker(
OperandTypes.repeat(SqlOperandCountRanges.from(1), OperandTypes.STRING))
.build();
+
+ public static final SqlFunction CHAR_LENGTH = SqlStdOperatorTable.CHAR_LENGTH;
+ public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER;
+ public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER;
+ public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM;
+ public static final SqlFunction REGEXP_REPLACE =
+ new SqlFunction(
+ "REGEXP_REPLACE",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.cascade(
+ ReturnTypes.explicit(SqlTypeName.VARCHAR),
+ SqlTypeTransforms.TO_NULLABLE),
+ null,
+ OperandTypes.family(
+ SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
+ SqlFunctionCategory.STRING);
+ public static final SqlFunction SUBSTR =
+ new SqlFunction(
+ "SUBSTR",
+ SqlKind.OTHER_FUNCTION,
+ TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE,
+ null,
+ OperandTypes.or(
+ OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER),
+ OperandTypes.family(
+ SqlTypeFamily.CHARACTER,
+ SqlTypeFamily.INTEGER,
+ SqlTypeFamily.INTEGER)),
+ SqlFunctionCategory.STRING);
+
+ // ------------------
+ // Temporal Functions
+ // ------------------
+ public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME;
public static final SqlFunction LOCALTIMESTAMP =
new BuiltInTimestampFunction("LOCALTIMESTAMP", SqlTypeName.TIMESTAMP, 3);
public static final SqlFunction CURRENT_TIME =
new BuiltInTimestampFunction("CURRENT_TIME", SqlTypeName.TIME, 0);
+ public static final SqlFunction CURRENT_DATE = SqlStdOperatorTable.CURRENT_DATE;
public static final SqlFunction CURRENT_TIMESTAMP =
new BuiltInTimestampFunction(
"CURRENT_TIMESTAMP", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3);
- public static final SqlFunction CURRENT_DATE = new SqlCurrentDateFunction();
public static final SqlFunction NOW =
new BuiltInTimestampFunction("NOW", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) {
@Override
@@ -100,30 +211,16 @@ public SqlSyntax getSyntax() {
return SqlSyntax.FUNCTION;
}
};
- public static final SqlFunction TO_DATE =
+ public static final SqlFunction DATE_FORMAT =
new SqlFunction(
- "TO_DATE",
+ "DATE_FORMAT",
SqlKind.OTHER_FUNCTION,
- ReturnTypes.cascade(
- ReturnTypes.explicit(SqlTypeName.DATE),
- SqlTypeTransforms.FORCE_NULLABLE),
- null,
+ TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
+ InferTypes.RETURN_TYPE,
OperandTypes.or(
- OperandTypes.family(SqlTypeFamily.STRING),
+ OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING),
OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
SqlFunctionCategory.TIMEDATE);
- public static final SqlFunction TO_TIMESTAMP =
- new SqlFunction(
- "TO_TIMESTAMP",
- SqlKind.OTHER_FUNCTION,
- ReturnTypes.cascade(
- ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3),
- SqlTypeTransforms.FORCE_NULLABLE),
- null,
- OperandTypes.or(
- OperandTypes.family(SqlTypeFamily.CHARACTER),
- OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
- SqlFunctionCategory.TIMEDATE);
public static final SqlFunction TIMESTAMP_DIFF =
new SqlFunction(
"TIMESTAMP_DIFF",
@@ -135,64 +232,36 @@ public SqlSyntax getSyntax() {
OperandTypes.family(
SqlTypeFamily.ANY, SqlTypeFamily.TIMESTAMP, SqlTypeFamily.TIMESTAMP),
SqlFunctionCategory.TIMEDATE);
- public static final SqlFunction REGEXP_REPLACE =
+ public static final SqlFunction TO_DATE =
new SqlFunction(
- "REGEXP_REPLACE",
+ "TO_DATE",
SqlKind.OTHER_FUNCTION,
ReturnTypes.cascade(
- ReturnTypes.explicit(SqlTypeName.VARCHAR),
- SqlTypeTransforms.TO_NULLABLE),
- null,
- OperandTypes.family(
- SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
- SqlFunctionCategory.STRING);
- public static final SqlFunction SUBSTR =
- new SqlFunction(
- "SUBSTR",
- SqlKind.OTHER_FUNCTION,
- TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE,
+ ReturnTypes.explicit(SqlTypeName.DATE),
+ SqlTypeTransforms.FORCE_NULLABLE),
null,
OperandTypes.or(
- OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER),
- OperandTypes.family(
- SqlTypeFamily.CHARACTER,
- SqlTypeFamily.INTEGER,
- SqlTypeFamily.INTEGER)),
- SqlFunctionCategory.STRING);
- public static final SqlFunction ROUND =
+ OperandTypes.family(SqlTypeFamily.STRING),
+ OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
+ SqlFunctionCategory.TIMEDATE);
+ public static final SqlFunction TO_TIMESTAMP =
new SqlFunction(
- "ROUND",
+ "TO_TIMESTAMP",
SqlKind.OTHER_FUNCTION,
- TransformSqlReturnTypes.ROUND_FUNCTION_NULLABLE,
+ ReturnTypes.cascade(
+ ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3),
+ SqlTypeTransforms.FORCE_NULLABLE),
null,
- OperandTypes.or(OperandTypes.NUMERIC_INTEGER, OperandTypes.NUMERIC),
- SqlFunctionCategory.NUMERIC);
- public static final SqlFunction UUID =
- BuiltInScalarFunction.newBuilder()
- .name("UUID")
- .returnType(ReturnTypes.explicit(SqlTypeName.CHAR, 36))
- .operandTypeChecker(OperandTypes.NILADIC)
- .notDeterministic()
- .build();
- public static final SqlFunction MOD = SqlStdOperatorTable.MOD;
- public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME;
- public static final SqlFunction YEAR = SqlStdOperatorTable.YEAR;
- public static final SqlFunction QUARTER = SqlStdOperatorTable.QUARTER;
- public static final SqlFunction MONTH = SqlStdOperatorTable.MONTH;
- public static final SqlFunction WEEK = SqlStdOperatorTable.WEEK;
- public static final SqlFunction TIMESTAMP_ADD = SqlStdOperatorTable.TIMESTAMP_ADD;
- public static final SqlOperator BETWEEN = SqlStdOperatorTable.BETWEEN;
- public static final SqlOperator SYMMETRIC_BETWEEN = SqlStdOperatorTable.SYMMETRIC_BETWEEN;
- public static final SqlOperator NOT_BETWEEN = SqlStdOperatorTable.NOT_BETWEEN;
- public static final SqlOperator IN = SqlStdOperatorTable.IN;
- public static final SqlOperator NOT_IN = SqlStdOperatorTable.NOT_IN;
- public static final SqlFunction CHAR_LENGTH = SqlStdOperatorTable.CHAR_LENGTH;
- public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM;
- public static final SqlOperator NOT_LIKE = SqlStdOperatorTable.NOT_LIKE;
- public static final SqlOperator LIKE = SqlStdOperatorTable.LIKE;
- public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER;
- public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER;
- public static final SqlFunction ABS = SqlStdOperatorTable.ABS;
+ OperandTypes.or(
+ OperandTypes.family(SqlTypeFamily.CHARACTER),
+ OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
+ SqlFunctionCategory.TIMEDATE);
+
+ // ---------------------
+ // Conditional Functions
+ // ---------------------
+ public static final SqlCaseOperator CASE = SqlStdOperatorTable.CASE;
+ public static final SqlFunction COALESCE = SqlStdOperatorTable.COALESCE;
public static final SqlFunction IF =
new SqlFunction(
"IF",
@@ -235,17 +304,9 @@ public SqlSyntax getSyntax() {
OperandTypes.family(
SqlTypeFamily.BOOLEAN, SqlTypeFamily.TIME, SqlTypeFamily.TIME)),
SqlFunctionCategory.NUMERIC);
- public static final SqlFunction NULLIF = SqlStdOperatorTable.NULLIF;
- public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
- public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
- public static final SqlFunction DATE_FORMAT =
- new SqlFunction(
- "DATE_FORMAT",
- SqlKind.OTHER_FUNCTION,
- TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
- InferTypes.RETURN_TYPE,
- OperandTypes.or(
- OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING),
- OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
- SqlFunctionCategory.TIMEDATE);
+
+ // --------------
+ // Cast Functions
+ // --------------
+ public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
index 26a6276c058..bf61ad2600a 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
@@ -17,7 +17,6 @@
package org.apache.flink.cdc.runtime.operators.schema;
-import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -41,6 +40,7 @@
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.commons.collections.ListUtils;
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 5eee2663e99..86ebefedce9 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -38,9 +38,7 @@
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.sql2rel.RelDecorrelator;
@@ -101,10 +99,9 @@ public void testTransformCalciteValidate() {
factory,
new CalciteConnectionConfigImpl(new Properties()));
TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance();
- SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance();
SqlValidator validator =
SqlValidatorUtil.newValidator(
- SqlOperatorTables.chain(sqlStdOperatorTable, transformSqlOperatorTable),
+ transformSqlOperatorTable,
calciteCatalogReader,
factory,
SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));
@@ -144,10 +141,9 @@ public void testCalciteRelNode() {
factory,
new CalciteConnectionConfigImpl(new Properties()));
TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance();
- SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance();
SqlValidator validator =
SqlValidatorUtil.newValidator(
- SqlOperatorTables.chain(sqlStdOperatorTable, transformSqlOperatorTable),
+ transformSqlOperatorTable,
calciteCatalogReader,
factory,
SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));