From f3cab24fa87f9c5a64cdeaef8619ad42016eceba Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 7 Aug 2024 12:08:46 +0800 Subject: [PATCH 1/2] [FLINK-35991][cdc-runtime] Resolve operator conflicts in transform SQL operator tables --- .../cdc/runtime/parser/TransformParser.java | 10 +- .../metadata/TransformSqlOperatorTable.java | 229 +++++++++++------- .../runtime/parser/TransformParserTest.java | 8 +- 3 files changed, 150 insertions(+), 97 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index ae598b7da88..ea7bf4c5c13 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -51,8 +51,8 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.SqlSelect; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.sql.parser.SqlParserPos; @@ -60,7 +60,6 @@ import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.SqlReturnTypeInference; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; -import org.apache.calcite.sql.util.ListSqlOperatorTable; import org.apache.calcite.sql.util.SqlOperatorTables; import org.apache.calcite.sql.validate.SqlConformanceEnum; import org.apache.calcite.sql.validate.SqlValidator; @@ -156,13 +155,10 @@ private static RelNode sqlToRel( factory, new CalciteConnectionConfigImpl(new Properties())); TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance(); - SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance(); - ListSqlOperatorTable udfOperatorTable = new ListSqlOperatorTable(); - udfFunctions.forEach(udfOperatorTable::add); + SqlOperatorTable udfOperatorTable = SqlOperatorTables.of(udfFunctions); SqlValidator validator = SqlValidatorUtil.newValidator( - SqlOperatorTables.chain( - sqlStdOperatorTable, transformSqlOperatorTable, udfOperatorTable), + SqlOperatorTables.chain(transformSqlOperatorTable, udfOperatorTable), calciteCatalogReader, factory, SqlValidator.Config.DEFAULT.withIdentifierExpansion(true)); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index 35253f27918..658550da0c9 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -20,13 +20,18 @@ import org.apache.flink.cdc.runtime.functions.BuiltInScalarFunction; import org.apache.flink.cdc.runtime.functions.BuiltInTimestampFunction; +import org.apache.calcite.sql.SqlBinaryOperator; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlPostfixOperator; +import org.apache.calcite.sql.SqlPrefixOperator; +import org.apache.calcite.sql.SqlSpecialOperator; import org.apache.calcite.sql.SqlSyntax; -import org.apache.calcite.sql.fun.SqlCurrentDateFunction; +import org.apache.calcite.sql.fun.SqlBetweenOperator; +import org.apache.calcite.sql.fun.SqlCaseOperator; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.InferTypes; import org.apache.calcite.sql.type.OperandTypes; @@ -75,6 +80,78 @@ public void lookupOperatorOverloads( SqlNameMatchers.withCaseSensitive(false)); } + // The following binary functions are sorted in documentation definitions. See + // https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/core-concept/transform/ for a + // full list of CDC supported built-in functions. + + // -------------------- + // Comparison Functions + // -------------------- + public static final SqlBinaryOperator EQUALS = SqlStdOperatorTable.EQUALS; + public static final SqlBinaryOperator NOT_EQUALS = SqlStdOperatorTable.NOT_EQUALS; + public static final SqlBinaryOperator GREATER_THAN = SqlStdOperatorTable.GREATER_THAN; + public static final SqlBinaryOperator GREATER_THAN_OR_EQUAL = + SqlStdOperatorTable.GREATER_THAN_OR_EQUAL; + public static final SqlBinaryOperator LESS_THAN = SqlStdOperatorTable.LESS_THAN; + public static final SqlBinaryOperator LESS_THAN_OR_EQUAL = + SqlStdOperatorTable.LESS_THAN_OR_EQUAL; + + public static final SqlPostfixOperator IS_NULL = SqlStdOperatorTable.IS_NULL; + public static final SqlPostfixOperator IS_NOT_NULL = SqlStdOperatorTable.IS_NOT_NULL; + + public static final SqlBetweenOperator BETWEEN = SqlStdOperatorTable.BETWEEN; + public static final SqlBetweenOperator NOT_BETWEEN = SqlStdOperatorTable.NOT_BETWEEN; + + public static final SqlSpecialOperator LIKE = SqlStdOperatorTable.LIKE; + public static final SqlSpecialOperator NOT_LIKE = SqlStdOperatorTable.NOT_LIKE; + + public static final SqlBinaryOperator IN = SqlStdOperatorTable.IN; + public static final SqlBinaryOperator NOT_IN = SqlStdOperatorTable.NOT_IN; + + // ----------------- + // Logical Functions + // ----------------- + public static final SqlBinaryOperator OR = SqlStdOperatorTable.OR; + public static final SqlBinaryOperator AND = SqlStdOperatorTable.AND; + public static final SqlPrefixOperator NOT = SqlStdOperatorTable.NOT; + + public static final SqlPostfixOperator IS_FALSE = SqlStdOperatorTable.IS_FALSE; + public static final SqlPostfixOperator IS_NOT_FALSE = SqlStdOperatorTable.IS_NOT_FALSE; + public static final SqlPostfixOperator IS_TRUE = SqlStdOperatorTable.IS_TRUE; + public static final SqlPostfixOperator IS_NOT_TRUE = SqlStdOperatorTable.IS_NOT_TRUE; + + // -------------------- + // Arithmetic Functions + // -------------------- + public static final SqlBinaryOperator PLUS = SqlStdOperatorTable.PLUS; + public static final SqlBinaryOperator MINUS = SqlStdOperatorTable.MINUS; + public static final SqlBinaryOperator MULTIPLY = SqlStdOperatorTable.MULTIPLY; + public static final SqlBinaryOperator DIVIDE = SqlStdOperatorTable.DIVIDE; + public static final SqlBinaryOperator PERCENT_REMAINDER = SqlStdOperatorTable.PERCENT_REMAINDER; + + public static final SqlFunction ABS = SqlStdOperatorTable.ABS; + public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL; + public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR; + public static final SqlFunction ROUND = + new SqlFunction( + "ROUND", + SqlKind.OTHER_FUNCTION, + TransformSqlReturnTypes.ROUND_FUNCTION_NULLABLE, + null, + OperandTypes.or(OperandTypes.NUMERIC_INTEGER, OperandTypes.NUMERIC), + SqlFunctionCategory.NUMERIC); + public static final SqlFunction UUID = + BuiltInScalarFunction.newBuilder() + .name("UUID") + .returnType(ReturnTypes.explicit(SqlTypeName.CHAR, 36)) + .operandTypeChecker(OperandTypes.NILADIC) + .notDeterministic() + .build(); + + // ---------------- + // String Functions + // ---------------- + public static final SqlBinaryOperator CONCAT = SqlStdOperatorTable.CONCAT; public static final SqlFunction CONCAT_FUNCTION = BuiltInScalarFunction.newBuilder() .name("CONCAT") @@ -85,14 +162,48 @@ public void lookupOperatorOverloads( .operandTypeChecker( OperandTypes.repeat(SqlOperandCountRanges.from(1), OperandTypes.STRING)) .build(); + + public static final SqlFunction CHAR_LENGTH = SqlStdOperatorTable.CHAR_LENGTH; + public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER; + public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER; + public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM; + public static final SqlFunction REGEXP_REPLACE = + new SqlFunction( + "REGEXP_REPLACE", + SqlKind.OTHER_FUNCTION, + ReturnTypes.cascade( + ReturnTypes.explicit(SqlTypeName.VARCHAR), + SqlTypeTransforms.TO_NULLABLE), + null, + OperandTypes.family( + SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING), + SqlFunctionCategory.STRING); + public static final SqlFunction SUBSTR = + new SqlFunction( + "SUBSTR", + SqlKind.OTHER_FUNCTION, + TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE, + null, + OperandTypes.or( + OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER), + OperandTypes.family( + SqlTypeFamily.CHARACTER, + SqlTypeFamily.INTEGER, + SqlTypeFamily.INTEGER)), + SqlFunctionCategory.STRING); + + // ------------------ + // Temporal Functions + // ------------------ + public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME; public static final SqlFunction LOCALTIMESTAMP = new BuiltInTimestampFunction("LOCALTIMESTAMP", SqlTypeName.TIMESTAMP, 3); public static final SqlFunction CURRENT_TIME = new BuiltInTimestampFunction("CURRENT_TIME", SqlTypeName.TIME, 0); + public static final SqlFunction CURRENT_DATE = SqlStdOperatorTable.CURRENT_DATE; public static final SqlFunction CURRENT_TIMESTAMP = new BuiltInTimestampFunction( "CURRENT_TIMESTAMP", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3); - public static final SqlFunction CURRENT_DATE = new SqlCurrentDateFunction(); public static final SqlFunction NOW = new BuiltInTimestampFunction("NOW", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) { @Override @@ -100,30 +211,16 @@ public SqlSyntax getSyntax() { return SqlSyntax.FUNCTION; } }; - public static final SqlFunction TO_DATE = + public static final SqlFunction DATE_FORMAT = new SqlFunction( - "TO_DATE", + "DATE_FORMAT", SqlKind.OTHER_FUNCTION, - ReturnTypes.cascade( - ReturnTypes.explicit(SqlTypeName.DATE), - SqlTypeTransforms.FORCE_NULLABLE), - null, + TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE, + InferTypes.RETURN_TYPE, OperandTypes.or( - OperandTypes.family(SqlTypeFamily.STRING), + OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING), OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)), SqlFunctionCategory.TIMEDATE); - public static final SqlFunction TO_TIMESTAMP = - new SqlFunction( - "TO_TIMESTAMP", - SqlKind.OTHER_FUNCTION, - ReturnTypes.cascade( - ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3), - SqlTypeTransforms.FORCE_NULLABLE), - null, - OperandTypes.or( - OperandTypes.family(SqlTypeFamily.CHARACTER), - OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)), - SqlFunctionCategory.TIMEDATE); public static final SqlFunction TIMESTAMP_DIFF = new SqlFunction( "TIMESTAMP_DIFF", @@ -135,64 +232,36 @@ public SqlSyntax getSyntax() { OperandTypes.family( SqlTypeFamily.ANY, SqlTypeFamily.TIMESTAMP, SqlTypeFamily.TIMESTAMP), SqlFunctionCategory.TIMEDATE); - public static final SqlFunction REGEXP_REPLACE = + public static final SqlFunction TO_DATE = new SqlFunction( - "REGEXP_REPLACE", + "TO_DATE", SqlKind.OTHER_FUNCTION, ReturnTypes.cascade( - ReturnTypes.explicit(SqlTypeName.VARCHAR), - SqlTypeTransforms.TO_NULLABLE), - null, - OperandTypes.family( - SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING), - SqlFunctionCategory.STRING); - public static final SqlFunction SUBSTR = - new SqlFunction( - "SUBSTR", - SqlKind.OTHER_FUNCTION, - TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE, + ReturnTypes.explicit(SqlTypeName.DATE), + SqlTypeTransforms.FORCE_NULLABLE), null, OperandTypes.or( - OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER), - OperandTypes.family( - SqlTypeFamily.CHARACTER, - SqlTypeFamily.INTEGER, - SqlTypeFamily.INTEGER)), - SqlFunctionCategory.STRING); - public static final SqlFunction ROUND = + OperandTypes.family(SqlTypeFamily.STRING), + OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)), + SqlFunctionCategory.TIMEDATE); + public static final SqlFunction TO_TIMESTAMP = new SqlFunction( - "ROUND", + "TO_TIMESTAMP", SqlKind.OTHER_FUNCTION, - TransformSqlReturnTypes.ROUND_FUNCTION_NULLABLE, + ReturnTypes.cascade( + ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3), + SqlTypeTransforms.FORCE_NULLABLE), null, - OperandTypes.or(OperandTypes.NUMERIC_INTEGER, OperandTypes.NUMERIC), - SqlFunctionCategory.NUMERIC); - public static final SqlFunction UUID = - BuiltInScalarFunction.newBuilder() - .name("UUID") - .returnType(ReturnTypes.explicit(SqlTypeName.CHAR, 36)) - .operandTypeChecker(OperandTypes.NILADIC) - .notDeterministic() - .build(); - public static final SqlFunction MOD = SqlStdOperatorTable.MOD; - public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME; - public static final SqlFunction YEAR = SqlStdOperatorTable.YEAR; - public static final SqlFunction QUARTER = SqlStdOperatorTable.QUARTER; - public static final SqlFunction MONTH = SqlStdOperatorTable.MONTH; - public static final SqlFunction WEEK = SqlStdOperatorTable.WEEK; - public static final SqlFunction TIMESTAMP_ADD = SqlStdOperatorTable.TIMESTAMP_ADD; - public static final SqlOperator BETWEEN = SqlStdOperatorTable.BETWEEN; - public static final SqlOperator SYMMETRIC_BETWEEN = SqlStdOperatorTable.SYMMETRIC_BETWEEN; - public static final SqlOperator NOT_BETWEEN = SqlStdOperatorTable.NOT_BETWEEN; - public static final SqlOperator IN = SqlStdOperatorTable.IN; - public static final SqlOperator NOT_IN = SqlStdOperatorTable.NOT_IN; - public static final SqlFunction CHAR_LENGTH = SqlStdOperatorTable.CHAR_LENGTH; - public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM; - public static final SqlOperator NOT_LIKE = SqlStdOperatorTable.NOT_LIKE; - public static final SqlOperator LIKE = SqlStdOperatorTable.LIKE; - public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER; - public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER; - public static final SqlFunction ABS = SqlStdOperatorTable.ABS; + OperandTypes.or( + OperandTypes.family(SqlTypeFamily.CHARACTER), + OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)), + SqlFunctionCategory.TIMEDATE); + + // --------------------- + // Conditional Functions + // --------------------- + public static final SqlCaseOperator CASE = SqlStdOperatorTable.CASE; + public static final SqlFunction COALESCE = SqlStdOperatorTable.COALESCE; public static final SqlFunction IF = new SqlFunction( "IF", @@ -235,17 +304,9 @@ public SqlSyntax getSyntax() { OperandTypes.family( SqlTypeFamily.BOOLEAN, SqlTypeFamily.TIME, SqlTypeFamily.TIME)), SqlFunctionCategory.NUMERIC); - public static final SqlFunction NULLIF = SqlStdOperatorTable.NULLIF; - public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR; - public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL; - public static final SqlFunction DATE_FORMAT = - new SqlFunction( - "DATE_FORMAT", - SqlKind.OTHER_FUNCTION, - TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE, - InferTypes.RETURN_TYPE, - OperandTypes.or( - OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING), - OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)), - SqlFunctionCategory.TIMEDATE); + + // -------------- + // Cast Functions + // -------------- + public static final SqlFunction CAST = SqlStdOperatorTable.CAST; } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index 5eee2663e99..86ebefedce9 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -38,9 +38,7 @@ import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlSelect; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; -import org.apache.calcite.sql.util.SqlOperatorTables; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.sql2rel.RelDecorrelator; @@ -101,10 +99,9 @@ public void testTransformCalciteValidate() { factory, new CalciteConnectionConfigImpl(new Properties())); TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance(); - SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance(); SqlValidator validator = SqlValidatorUtil.newValidator( - SqlOperatorTables.chain(sqlStdOperatorTable, transformSqlOperatorTable), + transformSqlOperatorTable, calciteCatalogReader, factory, SqlValidator.Config.DEFAULT.withIdentifierExpansion(true)); @@ -144,10 +141,9 @@ public void testCalciteRelNode() { factory, new CalciteConnectionConfigImpl(new Properties())); TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance(); - SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance(); SqlValidator validator = SqlValidatorUtil.newValidator( - SqlOperatorTables.chain(sqlStdOperatorTable, transformSqlOperatorTable), + transformSqlOperatorTable, calciteCatalogReader, factory, SqlValidator.Config.DEFAULT.withIdentifierExpansion(true)); From b1524d19ee9ec6b3f1afffc85457192637a78b32 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Mon, 12 Aug 2024 14:10:35 +0800 Subject: [PATCH 2/2] [FLINK-36034][cdc-runtime] Get rid of Flink table planner dependency in cdc runtime module This closes #3513. --- flink-cdc-runtime/pom.xml | 28 ------------ .../functions/BuiltInScalarFunction.java | 43 +------------------ .../operators/schema/SchemaEvolveTest.java | 2 +- 3 files changed, 3 insertions(+), 70 deletions(-) diff --git a/flink-cdc-runtime/pom.xml b/flink-cdc-runtime/pom.xml index e278fd669ad..deb29750908 100644 --- a/flink-cdc-runtime/pom.xml +++ b/flink-cdc-runtime/pom.xml @@ -44,34 +44,6 @@ limitations under the License. org.apache.calcite calcite-core - - org.apache.flink - flink-table-planner_2.12 - ${flink.version} - provided - - - value - org.immutables - - - value-annotations - org.immutables - - - commons-compiler - org.codehaus.janino - - - janino - org.codehaus.janino - - - flink-scala_2.12 - org.apache.flink - - - org.apache.flink flink-cdc-common diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java index d1052cf398b..80141a13e36 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/BuiltInScalarFunction.java @@ -18,7 +18,6 @@ package org.apache.flink.cdc.runtime.functions; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.functions.BuiltInFunctionDefinition; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlFunctionCategory; @@ -31,28 +30,17 @@ import javax.annotation.Nullable; -import java.util.Optional; import java.util.function.Function; -import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION; -import static org.apache.flink.table.functions.BuiltInFunctionDefinition.qualifyFunctionName; -import static org.apache.flink.table.functions.BuiltInFunctionDefinition.validateFunction; -import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; /** * This is the case when the operator has a special parsing syntax or uses other Calcite-specific - * features that are not exposed via {@link BuiltInFunctionDefinition} yet. - * - *

Note: Try to keep usages of this class to a minimum and use Flink's {@link - * BuiltInFunctionDefinition} stack instead. - * - *

For simple functions, use the provided builder. Otherwise, this class can also be extended. + * features that are not exposed via {@link SqlFunction} yet. */ @Internal public class BuiltInScalarFunction extends SqlFunction { - private final @Nullable Integer version; - private final boolean isDeterministic; private final boolean isInternal; @@ -61,7 +49,6 @@ public class BuiltInScalarFunction extends SqlFunction { protected BuiltInScalarFunction( String name, - int version, SqlKind kind, @Nullable SqlReturnTypeInference returnTypeInference, @Nullable SqlOperandTypeInference operandTypeInference, @@ -77,11 +64,9 @@ protected BuiltInScalarFunction( operandTypeInference, operandTypeChecker, checkNotNull(category)); - this.version = isInternal ? null : version; this.isDeterministic = isDeterministic; this.isInternal = isInternal; this.monotonicity = monotonicity; - validateFunction(name, version, isInternal); } protected BuiltInScalarFunction( @@ -93,7 +78,6 @@ protected BuiltInScalarFunction( SqlFunctionCategory category) { this( name, - DEFAULT_VERSION, kind, returnTypeInference, operandTypeInference, @@ -109,18 +93,6 @@ public static Builder newBuilder() { return new Builder(); } - public final Optional getVersion() { - return Optional.ofNullable(version); - } - - public String getQualifiedName() { - if (isInternal) { - return getName(); - } - assert version != null; - return qualifyFunctionName(getName(), version); - } - @Override public boolean isDeterministic() { return isDeterministic; @@ -144,8 +116,6 @@ public static class Builder { private String name; - private int version = DEFAULT_VERSION; - private SqlKind kind = SqlKind.OTHER_FUNCTION; private SqlReturnTypeInference returnTypeInference; @@ -163,18 +133,11 @@ public static class Builder { private Function monotonicity = call -> SqlMonotonicity.NOT_MONOTONIC; - /** @see BuiltInFunctionDefinition.Builder#name(String) */ public Builder name(String name) { this.name = name; return this; } - /** @see BuiltInFunctionDefinition.Builder#version(int) */ - public Builder version(int version) { - this.version = version; - return this; - } - public Builder kind(SqlKind kind) { this.kind = kind; return this; @@ -205,7 +168,6 @@ public Builder notDeterministic() { return this; } - /** @see BuiltInFunctionDefinition.Builder#internal() */ public Builder internal() { this.isInternal = true; return this; @@ -224,7 +186,6 @@ public Builder monotonicity(Function monoto public BuiltInScalarFunction build() { return new BuiltInScalarFunction( name, - version, kind, returnTypeInference, operandTypeInference, diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java index 26a6276c058..bf61ad2600a 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.runtime.operators.schema; -import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; @@ -41,6 +40,7 @@ import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; import org.apache.commons.collections.ListUtils;