Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions flink-cdc-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,34 +44,6 @@ limitations under the License.
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>value</artifactId>
<groupId>org.immutables</groupId>
</exclusion>
<exclusion>
<artifactId>value-annotations</artifactId>
<groupId>org.immutables</groupId>
</exclusion>
<exclusion>
<artifactId>commons-compiler</artifactId>
<groupId>org.codehaus.janino</groupId>
</exclusion>
<exclusion>
<artifactId>janino</artifactId>
<groupId>org.codehaus.janino</groupId>
</exclusion>
<exclusion>
<artifactId>flink-scala_2.12</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.cdc.runtime.functions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;

import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
Expand All @@ -31,28 +30,17 @@

import javax.annotation.Nullable;

import java.util.Optional;
import java.util.function.Function;

import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION;
import static org.apache.flink.table.functions.BuiltInFunctionDefinition.qualifyFunctionName;
import static org.apache.flink.table.functions.BuiltInFunctionDefinition.validateFunction;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;

/**
* This is the case when the operator has a special parsing syntax or uses other Calcite-specific
* features that are not exposed via {@link BuiltInFunctionDefinition} yet.
*
* <p>Note: Try to keep usages of this class to a minimum and use Flink's {@link
* BuiltInFunctionDefinition} stack instead.
*
* <p>For simple functions, use the provided builder. Otherwise, this class can also be extended.
* features that are not exposed via {@link SqlFunction} yet.
*/
@Internal
public class BuiltInScalarFunction extends SqlFunction {

private final @Nullable Integer version;

private final boolean isDeterministic;

private final boolean isInternal;
Expand All @@ -61,7 +49,6 @@ public class BuiltInScalarFunction extends SqlFunction {

protected BuiltInScalarFunction(
String name,
int version,
SqlKind kind,
@Nullable SqlReturnTypeInference returnTypeInference,
@Nullable SqlOperandTypeInference operandTypeInference,
Expand All @@ -77,11 +64,9 @@ protected BuiltInScalarFunction(
operandTypeInference,
operandTypeChecker,
checkNotNull(category));
this.version = isInternal ? null : version;
this.isDeterministic = isDeterministic;
this.isInternal = isInternal;
this.monotonicity = monotonicity;
validateFunction(name, version, isInternal);
}

protected BuiltInScalarFunction(
Expand All @@ -93,7 +78,6 @@ protected BuiltInScalarFunction(
SqlFunctionCategory category) {
this(
name,
DEFAULT_VERSION,
kind,
returnTypeInference,
operandTypeInference,
Expand All @@ -109,18 +93,6 @@ public static Builder newBuilder() {
return new Builder();
}

public final Optional<Integer> getVersion() {
return Optional.ofNullable(version);
}

public String getQualifiedName() {
if (isInternal) {
return getName();
}
assert version != null;
return qualifyFunctionName(getName(), version);
}

@Override
public boolean isDeterministic() {
return isDeterministic;
Expand All @@ -144,8 +116,6 @@ public static class Builder {

private String name;

private int version = DEFAULT_VERSION;

private SqlKind kind = SqlKind.OTHER_FUNCTION;

private SqlReturnTypeInference returnTypeInference;
Expand All @@ -163,18 +133,11 @@ public static class Builder {
private Function<SqlOperatorBinding, SqlMonotonicity> monotonicity =
call -> SqlMonotonicity.NOT_MONOTONIC;

/** @see BuiltInFunctionDefinition.Builder#name(String) */
public Builder name(String name) {
this.name = name;
return this;
}

/** @see BuiltInFunctionDefinition.Builder#version(int) */
public Builder version(int version) {
this.version = version;
return this;
}

public Builder kind(SqlKind kind) {
this.kind = kind;
return this;
Expand Down Expand Up @@ -205,7 +168,6 @@ public Builder notDeterministic() {
return this;
}

/** @see BuiltInFunctionDefinition.Builder#internal() */
public Builder internal() {
this.isInternal = true;
return this;
Expand All @@ -224,7 +186,6 @@ public Builder monotonicity(Function<SqlOperatorBinding, SqlMonotonicity> monoto
public BuiltInScalarFunction build() {
return new BuiltInScalarFunction(
name,
version,
kind,
returnTypeInference,
operandTypeInference,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,15 @@
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.util.ListSqlOperatorTable;
import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.sql.validate.SqlValidator;
Expand Down Expand Up @@ -156,13 +155,10 @@ private static RelNode sqlToRel(
factory,
new CalciteConnectionConfigImpl(new Properties()));
TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance();
SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance();
ListSqlOperatorTable udfOperatorTable = new ListSqlOperatorTable();
udfFunctions.forEach(udfOperatorTable::add);
SqlOperatorTable udfOperatorTable = SqlOperatorTables.of(udfFunctions);
SqlValidator validator =
SqlValidatorUtil.newValidator(
SqlOperatorTables.chain(
sqlStdOperatorTable, transformSqlOperatorTable, udfOperatorTable),
SqlOperatorTables.chain(transformSqlOperatorTable, udfOperatorTable),
calciteCatalogReader,
factory,
SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));
Expand Down
Loading