diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py
index 179fac4e19da1..0b3a60bfe5d7d 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -477,6 +477,20 @@ def log(v, base=None) -> Expression[float]:
return _binary_op("log", base, v)
+def source_watermark() -> Expression:
+ """
+ Source watermark declaration for schema.
+
+ This is a marker function that doesn't have concrete runtime implementation. It can only
+ be used as a single expression for watermark strategies in schema declarations. The declaration
+ will be pushed down into a table source that implements the `SupportsSourceWatermark`
+ interface. The source will emit system-defined watermarks afterwards.
+
+ Please check the documentation whether the connector supports source watermarks.
+ """
+ return _leaf_op("sourceWatermark")
+
+
def if_then_else(condition: Union[bool, Expression[bool]], if_true, if_false) -> Expression:
"""
Ternary conditional operator that decides which of two other expressions should be evaluated
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 0444287f2c77b..b88be0f0e0cf7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -20,6 +20,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
@@ -464,6 +465,21 @@ public static ApiExpression log(Object base, Object value) {
return apiCall(BuiltInFunctionDefinitions.LOG, base, value);
}
+ /**
+ * Source watermark declaration for {@link Schema}.
+ *
+ * <p>This is a marker function that doesn't have concrete runtime implementation. It can only
+ * be used as a single expression in {@link Schema.Builder#watermark(String, Expression)}. The
+ * declaration will be pushed down into a table source that implements the {@link
+ * SupportsSourceWatermark} interface. The source will emit system-defined watermarks
+ * afterwards.
+ *
+ * <p>Please check the documentation whether the connector supports source watermarks.
+ */
+ public static ApiExpression sourceWatermark() {
+ return apiCall(BuiltInFunctionDefinitions.SOURCE_WATERMARK);
+ }
+
/**
* Ternary conditional operator that decides which of two other expressions should be evaluated
* based on an evaluated boolean condition.
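A minimal usage sketch (not part of the patch) of the new `sourceWatermark()` declaration, mirroring the test added further down in this change; the class and method names here are illustrative only:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

import static org.apache.flink.table.api.Expressions.sourceWatermark;

public class SourceWatermarkSchemaSketch {

    // Declares a schema whose watermark strategy is pushed down to a connector that
    // implements SupportsSourceWatermark; the source then emits its own watermarks.
    public static Schema schemaWithSourceWatermark() {
        return Schema.newBuilder()
                .column("ts_ltz", DataTypes.TIMESTAMP_LTZ(3))
                .watermark("ts_ltz", sourceWatermark())
                .build();
    }
}
```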
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java
index 5559813d821db..dc4f0a7c370d1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java
@@ -19,31 +19,21 @@
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.operations.QueryOperation;
-import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
-/**
- * A view created from a {@link QueryOperation} via operations on {@link
- * org.apache.flink.table.api.Table}.
- */
+/** A view created from a {@link QueryOperation} via operations on {@link Table}. */
@Internal
-public class QueryOperationCatalogView extends AbstractCatalogView {
+public final class QueryOperationCatalogView implements CatalogView {
+
private final QueryOperation queryOperation;
public QueryOperationCatalogView(QueryOperation queryOperation) {
- this(queryOperation, "");
- }
-
- public QueryOperationCatalogView(QueryOperation queryOperation, String comment) {
- super(
- queryOperation.asSummaryString(),
- queryOperation.asSummaryString(),
- TableSchema.fromResolvedSchema(queryOperation.getResolvedSchema()),
- new HashMap<>(),
- comment);
this.queryOperation = queryOperation;
}
@@ -51,9 +41,24 @@ public QueryOperation getQueryOperation() {
return queryOperation;
}
+ @Override
+ public Schema getUnresolvedSchema() {
+ return Schema.newBuilder().fromResolvedSchema(queryOperation.getResolvedSchema()).build();
+ }
+
+ @Override
+ public Map<String, String> getOptions() {
+ throw new TableException("A view backed by a query operation has no options.");
+ }
+
+ @Override
+ public String getComment() {
+ return queryOperation.asSummaryString();
+ }
+
@Override
public QueryOperationCatalogView copy() {
- return new QueryOperationCatalogView(this.queryOperation, getComment());
+ return new QueryOperationCatalogView(queryOperation);
}
@Override
@@ -65,4 +70,16 @@ public Optional<String> getDescription() {
public Optional<String> getDetailedDescription() {
return getDescription();
}
+
+ @Override
+ public String getOriginalQuery() {
+ throw new TableException(
+ "A view backed by a query operation has no serializable representation.");
+ }
+
+ @Override
+ public String getExpandedQuery() {
+ throw new TableException(
+ "A view backed by a query operation has no serializable representation.");
+ }
}
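A hedged sketch of how a caller interacts with the refactored view; the `Table` instance is assumed to come from some `TableEnvironment`, everything else follows the methods defined above:

```java
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.QueryOperationCatalogView;

public class QueryOperationViewSketch {

    static void inspect(Table table) {
        final QueryOperationCatalogView view =
                new QueryOperationCatalogView(table.getQueryOperation());

        final Schema schema = view.getUnresolvedSchema(); // built from the operation's resolved schema
        final String comment = view.getComment();         // the operation's summary string
        System.out.println(schema + " // " + comment);

        try {
            view.getOptions(); // query-backed views have no options
        } catch (TableException e) {
            // expected: "A view backed by a query operation has no options."
        }
    }
}
```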
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index d29e31ecfddba..9a7bca655d1b7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -81,8 +81,11 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
@Override
public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
+ // only the top-level expressions may access the output data type
+ final SurroundingInfo surroundingInfo =
+ context.getOutputDataType().map(SurroundingInfo::of).orElse(null);
return expression.stream()
- .flatMap(expr -> expr.accept(new ResolvingCallVisitor(context, null)).stream())
+ .flatMap(e -> e.accept(new ResolvingCallVisitor(context, surroundingInfo)).stream())
.collect(Collectors.toList());
}
@@ -120,23 +123,23 @@ public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
// resolve the children with information from the current call
final List<ResolvedExpression> resolvedArgs = new ArrayList<>();
final int argCount = unresolvedCall.getChildren().size();
+
for (int i = 0; i < argCount; i++) {
final int currentPos = i;
+ final SurroundingInfo surroundingInfo =
+ typeInference
+ .map(
+ inference ->
+ SurroundingInfo.of(
+ name,
+ definition,
+ inference,
+ argCount,
+ currentPos,
+ resolutionContext.isGroupedAggregation()))
+ .orElse(null);
final ResolvingCallVisitor childResolver =
- new ResolvingCallVisitor(
- resolutionContext,
- typeInference
- .map(
- inference ->
- new SurroundingInfo(
- name,
- definition,
- inference,
- argCount,
- currentPos,
- resolutionContext
- .isGroupedAggregation()))
- .orElse(null));
+ new ResolvingCallVisitor(resolutionContext, surroundingInfo);
resolvedArgs.addAll(unresolvedCall.getChildren().get(i).accept(childResolver));
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
index 6e0edd65e3770..316c24a4d0def 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -21,8 +21,11 @@
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -39,6 +42,7 @@
import java.util.Collections;
import static org.apache.flink.table.api.Expressions.callSql;
+import static org.apache.flink.table.api.Expressions.sourceWatermark;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTimeAttribute;
@@ -90,23 +94,21 @@ public class SchemaResolutionTest {
// the type of ts_ltz is TIMESTAMP_LTZ
private static final String COMPUTED_SQL_WITH_TS_LTZ = "ts_ltz - INTERVAL '60' MINUTE";
+
private static final ResolvedExpression COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ =
new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> COMPUTED_SQL_WITH_TS_LTZ);
+
private static final String WATERMARK_SQL_WITH_TS_LTZ = "ts1 - INTERVAL '5' SECOND";
+
private static final ResolvedExpression WATERMARK_RESOLVED_WITH_TS_LTZ =
new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> WATERMARK_SQL_WITH_TS_LTZ);
+
private static final Schema SCHEMA_WITH_TS_LTZ =
Schema.newBuilder()
- .primaryKeyNamed("primary_constraint", "id") // out of order
.column("id", DataTypes.INT().notNull())
- .column("counter", DataTypes.INT().notNull())
- .column("payload", "ROW<name STRING, age INT, flag BOOLEAN>")
- .columnByMetadata("topic", DataTypes.STRING(), true)
- .columnByExpression(
- "ts1", callSql(COMPUTED_SQL_WITH_TS_LTZ)) // out of order API expression
+ .columnByExpression("ts1", callSql(COMPUTED_SQL_WITH_TS_LTZ))
.columnByMetadata("ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp")
.watermark("ts1", WATERMARK_SQL_WITH_TS_LTZ)
- .columnByExpression("proctime", PROCTIME_SQL)
.build();
@Test
@@ -152,38 +154,53 @@ public void testSchemaResolutionWithTimestampLtzRowtime() {
new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.INT().notNull()),
- Column.physical("counter", DataTypes.INT().notNull()),
- Column.physical(
- "payload",
- DataTypes.ROW(
- DataTypes.FIELD("name", DataTypes.STRING()),
- DataTypes.FIELD("age", DataTypes.INT()),
- DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
- Column.metadata("topic", DataTypes.STRING(), null, true),
Column.computed("ts1", COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ),
Column.metadata(
- "ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp", false),
- Column.computed("proctime", PROCTIME_RESOLVED)),
+ "ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp", false)),
Collections.singletonList(
WatermarkSpec.of("ts1", WATERMARK_RESOLVED_WITH_TS_LTZ)),
- UniqueConstraint.primaryKey(
- "primary_constraint", Collections.singletonList("id")));
+ null);
final ResolvedSchema actualStreamSchema = resolveSchema(SCHEMA_WITH_TS_LTZ, true);
{
assertThat(actualStreamSchema, equalTo(expectedSchema));
assertTrue(isRowtimeAttribute(getType(actualStreamSchema, "ts1")));
- assertTrue(isProctimeAttribute(getType(actualStreamSchema, "proctime")));
}
final ResolvedSchema actualBatchSchema = resolveSchema(SCHEMA_WITH_TS_LTZ, false);
{
assertThat(actualBatchSchema, equalTo(expectedSchema));
assertFalse(isRowtimeAttribute(getType(actualBatchSchema, "ts1")));
- assertTrue(isProctimeAttribute(getType(actualBatchSchema, "proctime")));
}
}
+ @Test
+ public void testSchemaResolutionWithSourceWatermark() {
+ final ResolvedSchema expectedSchema =
+ new ResolvedSchema(
+ Collections.singletonList(
+ Column.physical("ts_ltz", DataTypes.TIMESTAMP_LTZ(1))),
+ Collections.singletonList(
+ WatermarkSpec.of(
+ "ts_ltz",
+ new CallExpression(
+ FunctionIdentifier.of(
+ BuiltInFunctionDefinitions.SOURCE_WATERMARK
+ .getName()),
+ BuiltInFunctionDefinitions.SOURCE_WATERMARK,
+ Collections.emptyList(),
+ DataTypes.TIMESTAMP_LTZ(1)))),
+ null);
+ final ResolvedSchema resolvedSchema =
+ resolveSchema(
+ Schema.newBuilder()
+ .column("ts_ltz", DataTypes.TIMESTAMP_LTZ(1))
+ .watermark("ts_ltz", sourceWatermark())
+ .build());
+
+ assertThat(resolvedSchema, equalTo(expectedSchema));
+ }
+
@Test
public void testSchemaResolutionErrors() {
@@ -282,20 +299,6 @@ public void testUnresolvedSchemaString() {
+ " WATERMARK FOR `ts` AS [ts - INTERVAL '5' SECOND],\n"
+ " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+ ")"));
- assertThat(
- SCHEMA_WITH_TS_LTZ.toString(),
- equalTo(
- "(\n"
- + " `id` INT NOT NULL,\n"
- + " `counter` INT NOT NULL,\n"
- + " `payload` [ROW<name STRING, age INT, flag BOOLEAN>],\n"
- + " `topic` METADATA VIRTUAL,\n"
- + " `ts1` AS [ts_ltz - INTERVAL '60' MINUTE],\n"
- + " `ts_ltz` METADATA FROM 'timestamp',\n"
- + " `proctime` AS [PROCTIME()],\n"
- + " WATERMARK FOR `ts1` AS [ts1 - INTERVAL '5' SECOND],\n"
- + " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
- + ")"));
}
@Test
@@ -315,22 +318,6 @@ public void testResolvedSchemaString() {
+ " WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '5' SECOND,\n"
+ " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+ ")"));
-
- final ResolvedSchema resolvedSchemaWithTsLtz = resolveSchema(SCHEMA_WITH_TS_LTZ);
- assertThat(
- resolvedSchemaWithTsLtz.toString(),
- equalTo(
- "(\n"
- + " `id` INT NOT NULL,\n"
- + " `counter` INT NOT NULL,\n"
- + " `payload` ROW<`name` STRING, `age` INT, `flag` BOOLEAN>,\n"
- + " `topic` STRING METADATA VIRTUAL,\n"
- + " `ts1` TIMESTAMP_LTZ(3) *ROWTIME* AS ts_ltz - INTERVAL '60' MINUTE,\n"
- + " `ts_ltz` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',\n"
- + " `proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME(),\n"
- + " WATERMARK FOR `ts1`: TIMESTAMP_LTZ(3) AS ts1 - INTERVAL '5' SECOND,\n"
- + " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
- + ")"));
}
@Test
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index 552ca9de97663..dd655b07b5b9e 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -20,10 +20,11 @@ package org.apache.flink.table.api
import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark
import org.apache.flink.table.expressions.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
import org.apache.flink.table.expressions.{ApiExpressionUtils, Expression, TableSymbol, TimePointUnit}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{DISTINCT, RANGE_TO}
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction, ImperativeAggregateFunction, UserDefinedFunctionHelper, _}
+import org.apache.flink.table.functions.{ImperativeAggregateFunction, ScalarFunction, TableFunction, UserDefinedFunctionHelper, _}
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row
@@ -464,12 +465,9 @@ trait ImplicitExpressionConversions {
/**
* Converts a numeric type epoch time to [[DataTypes#TIMESTAMP_LTZ]].
*
- * <p>The supported precision is 0 or 3:
- *
- * <ul>
- * <li>0 means the numericEpochTime is in second.
- * <li>3 means the numericEpochTime is in millisecond.
- * </ul>
+ * The supported precision is 0 or 3:
+ * - 0 means the numericEpochTime is in second.
+ * - 3 means the numericEpochTime is in millisecond.
*/
def toTimestampLtz(numericEpochTime: Expression, precision: Expression): Expression = {
Expressions.toTimestampLtz(numericEpochTime, precision)
@@ -681,6 +679,21 @@ trait ImplicitExpressionConversions {
Expressions.log(base, value)
}
+ /**
+ * Source watermark declaration for [[Schema]].
+ *
+ * This is a marker function that doesn't have concrete runtime implementation.
+ * It can only be used as a single expression in [[Schema.Builder#watermark(String, Expression)]].
+ * The declaration will be pushed down into a table source that implements the
+ * [[SupportsSourceWatermark]] interface. The source will emit system-defined watermarks
+ * afterwards.
+ *
+ * Please check the documentation whether the connector supports source watermarks.
+ */
+ def sourceWatermark(): Expression = {
+ Expressions.sourceWatermark()
+ }
+
/**
* Ternary conditional operator that decides which of two other expressions should be evaluated
* based on an evaluated boolean condition.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
index 1a045ae6b6066..6e36e31899c3b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
@@ -216,63 +216,50 @@ public static TableException createUnexpectedException(
*
* @see CallContext#getOutputDataType()
*/
- public static final class SurroundingInfo {
+ public interface SurroundingInfo {
- private final String name;
-
- private final FunctionDefinition functionDefinition;
-
- private final TypeInference typeInference;
-
- private final int argumentCount;
-
- private final int innerCallPosition;
-
- private final boolean isGroupedAggregation;
-
- public SurroundingInfo(
+ static SurroundingInfo of(
String name,
FunctionDefinition functionDefinition,
TypeInference typeInference,
int argumentCount,
int innerCallPosition,
boolean isGroupedAggregation) {
- this.name = name;
- this.functionDefinition = functionDefinition;
- this.typeInference = typeInference;
- this.argumentCount = argumentCount;
- this.innerCallPosition = innerCallPosition;
- this.isGroupedAggregation = isGroupedAggregation;
+ return typeFactory -> {
+ final boolean isValidCount =
+ validateArgumentCount(
+ typeInference.getInputTypeStrategy().getArgumentCount(),
+ argumentCount,
+ false);
+ if (!isValidCount) {
+ return Optional.empty();
+ }
+ // for "takes_string(this_function(NULL))" simulate "takes_string(NULL)"
+ // for retrieving the output type of "this_function(NULL)"
+ final CallContext callContext =
+ new UnknownCallContext(
+ typeFactory,
+ name,
+ functionDefinition,
+ argumentCount,
+ isGroupedAggregation);
+
+ // We might not be able to infer the input types at this moment, if the surrounding
+ // function does not provide an explicit input type strategy.
+ final CallContext adaptedContext =
+ adaptArguments(typeInference, callContext, null, false);
+ return typeInference
+ .getInputTypeStrategy()
+ .inferInputTypes(adaptedContext, false)
+ .map(dataTypes -> dataTypes.get(innerCallPosition));
+ };
}
- private Optional<DataType> inferOutputType(DataTypeFactory typeFactory) {
- final boolean isValidCount =
- validateArgumentCount(
- typeInference.getInputTypeStrategy().getArgumentCount(),
- argumentCount,
- false);
- if (!isValidCount) {
- return Optional.empty();
- }
- // for "takes_string(this_function(NULL))" simulate "takes_string(NULL)"
- // for retrieving the output type of "this_function(NULL)"
- final CallContext callContext =
- new UnknownCallContext(
- typeFactory,
- name,
- functionDefinition,
- argumentCount,
- isGroupedAggregation);
-
- // We might not be able to infer the input types at this moment, if the surrounding
- // function does not provide an explicit input type strategy.
- final CallContext adaptedContext =
- adaptArguments(typeInference, callContext, null, false);
- return typeInference
- .getInputTypeStrategy()
- .inferInputTypes(adaptedContext, false)
- .map(dataTypes -> dataTypes.get(innerCallPosition));
+ static SurroundingInfo of(DataType dataType) {
+ return typeFactory -> Optional.of(dataType);
}
+
+ Optional<DataType> inferOutputType(DataTypeFactory typeFactory);
}
/**
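A small sketch of the new functional interface above; the `DataTypeFactory` argument is assumed to be provided by the caller (as it is in `ResolveCallByArgumentsRule`), and the chosen `STRING` type is just an example:

```java
import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeInferenceUtil.SurroundingInfo;

public class SurroundingInfoSketch {

    // For a top-level expression whose expected output type is already known
    // (e.g. a watermark declaration), the type can now be wrapped directly;
    // nested calls still use the of(name, definition, inference, ...) factory,
    // which simulates "takes_string(NULL)" for "takes_string(this_function(NULL))".
    static Optional<DataType> expectedOutputType(DataTypeFactory typeFactory) {
        final SurroundingInfo info = SurroundingInfo.of(DataTypes.STRING());
        return info.inferOutputType(typeFactory); // Optional[STRING]
    }
}
```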
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
index 8e9f66d880775..52786efe7211e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
@@ -117,7 +117,7 @@ private TypeInferenceUtil.Result runTypeInference(List<DataType> actualArgumentT
.outputTypeStrategy(TypeStrategies.MISSING)
.build();
surroundingInfo =
- new TypeInferenceUtil.SurroundingInfo(
+ TypeInferenceUtil.SurroundingInfo.of(
"f_outer",
functionDefinitionMock,
outerTypeInference,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
index 08d99599a9add..c5ea4ab650d5a 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
@@ -20,30 +20,29 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogManager.TableLookupResult;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
import org.apache.flink.table.planner.sources.TableSourceUtil;
+import org.apache.flink.table.runtime.types.PlannerTypeUtils;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.TimestampType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -52,9 +51,7 @@
import java.util.List;
import java.util.Optional;
-
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTimeAttribute;
+import java.util.stream.Collectors;
/**
* Represents a wrapper for {@link CatalogBaseTable} in {@link org.apache.calcite.schema.Schema}.
@@ -124,34 +121,7 @@ public boolean isStreamingMode() {
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
- TableSchema tableSchema = TableSchema.fromResolvedSchema(lookupResult.getResolvedSchema());
- final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
- CatalogBaseTable catalogTable = lookupResult.getTable();
- if (!isStreamingMode
- && catalogTable instanceof ConnectorCatalogTable
- && ((ConnectorCatalogTable<?, ?>) catalogTable).getTableSource().isPresent()) {
- // If the table source is bounded, materialize the time attributes to normal TIMESTAMP
- // type.
- // Now for ConnectorCatalogTable, there is no way to
- // deduce if it is bounded in the table environment, so the data types in TableSchema
- // always patched with TimeAttribute.
- // See ConnectorCatalogTable#calculateSourceSchema
- // for details.
-
- // Remove the patched time attributes type to let the TableSourceTable handle it.
- // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
- // TODO: Fix FLINK-14844.
- for (int i = 0; i < fieldDataTypes.length; i++) {
- LogicalType lt = fieldDataTypes[i].getLogicalType();
- if (lt instanceof TimestampType && isRowtimeAttribute(lt)) {
- int precision = ((TimestampType) lt).getPrecision();
- fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
- } else if (lt instanceof LocalZonedTimestampType && isTimeAttribute(lt)) {
- int precision = ((LocalZonedTimestampType) lt).getPrecision();
- fieldDataTypes[i] = DataTypes.TIMESTAMP_LTZ(precision);
- }
- }
- }
+ final ResolvedSchema schema = lookupResult.getResolvedSchema();
// The following block is a workaround to support tables defined by
// TableEnvironment.connect() and
@@ -159,9 +129,10 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
// It should be removed after we remove DefinedProctimeAttribute/DefinedRowtimeAttributes.
Optional<TableSource<?>> sourceOpt = findAndCreateTableSource();
if (isStreamingMode
- && tableSchema.getTableColumns().stream().allMatch(TableColumn::isPhysical)
- && tableSchema.getWatermarkSpecs().isEmpty()
+ && schema.getColumns().stream().allMatch(Column::isPhysical)
+ && schema.getWatermarkSpecs().isEmpty()
&& sourceOpt.isPresent()) {
+ TableSchema tableSchema = TableSchema.fromResolvedSchema(schema);
TableSource<?> source = sourceOpt.get();
if (TableSourceValidation.hasProctimeAttribute(source)
|| TableSourceValidation.hasRowtimeAttribute(source)) {
@@ -171,10 +142,17 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
// ConnectorCatalogTable#calculateSourceSchema
tableSchema = ConnectorCatalogTable.calculateSourceSchema(source, false);
}
+ return TableSourceUtil.getSourceRowType(
+ flinkTypeFactory, tableSchema, scala.Option.empty(), true);
}
- return TableSourceUtil.getSourceRowType(
- flinkTypeFactory, tableSchema, scala.Option.empty(), isStreamingMode);
+ final List<String> fieldNames = schema.getColumnNames();
+ final List<LogicalType> fieldTypes =
+ schema.getColumnDataTypes().stream()
+ .map(DataType::getLogicalType)
+ .map(PlannerTypeUtils::removeLegacyTypes)
+ .collect(Collectors.toList());
+ return flinkTypeFactory.buildRelNodeRowType(fieldNames, fieldTypes);
}
@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index b732265ba61b7..6222b236be8c0 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -20,13 +20,12 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableColumn.MetadataColumn;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.Column.MetadataColumn;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -184,7 +183,7 @@ private static RelNode convertSinkToRel(
final DataTypeFactory dataTypeFactory =
unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
final FlinkTypeFactory typeFactory = unwrapTypeFactory(relBuilder);
- final TableSchema schema = table.getSchema();
+ final ResolvedSchema schema = table.getResolvedSchema();
List<SinkAbilitySpec> sinkAbilitySpecs = new ArrayList<>();
@@ -230,7 +229,7 @@ private static RelNode convertSinkToRel(
*/
public static RelNode validateSchemaAndApplyImplicitCast(
RelNode query,
- TableSchema sinkSchema,
+ ResolvedSchema sinkSchema,
@Nullable ObjectIdentifier sinkIdentifier,
DataTypeFactory dataTypeFactory,
FlinkTypeFactory typeFactory) {
@@ -239,7 +238,7 @@ public static RelNode validateSchemaAndApplyImplicitCast(
final RowType sinkType =
(RowType)
- fixSinkDataType(dataTypeFactory, sinkSchema.toPersistedRowDataType())
+ fixSinkDataType(dataTypeFactory, sinkSchema.toSinkRowDataType())
.getLogicalType();
final List<RowField> sinkFields = sinkType.getFields();
@@ -284,10 +283,10 @@ public static RelNode validateSchemaAndApplyImplicitCast(
private static void pushMetadataProjection(
FlinkRelBuilder relBuilder,
FlinkTypeFactory typeFactory,
- TableSchema schema,
+ ResolvedSchema schema,
DynamicTableSink sink) {
final RexBuilder rexBuilder = relBuilder.getRexBuilder();
- final List<TableColumn> tableColumns = schema.getTableColumns();
+ final List<Column> columns = schema.getColumns();
final List<Integer> physicalColumns = extractPhysicalColumns(schema);
@@ -297,9 +296,9 @@ private static void pushMetadataProjection(
Collectors.toMap(
pos -> {
final MetadataColumn metadataColumn =
- (MetadataColumn) tableColumns.get(pos);
+ (MetadataColumn) columns.get(pos);
return metadataColumn
- .getMetadataAlias()
+ .getMetadataKey()
.orElse(metadataColumn.getName());
},
Function.identity()));
@@ -311,13 +310,11 @@ private static void pushMetadataProjection(
final List<String> fieldNames =
Stream.concat(
- physicalColumns.stream()
- .map(tableColumns::get)
- .map(TableColumn::getName),
+ physicalColumns.stream().map(columns::get).map(Column::getName),
metadataColumns.stream()
- .map(tableColumns::get)
+ .map(columns::get)
.map(MetadataColumn.class::cast)
- .map(c -> c.getMetadataAlias().orElse(c.getName())))
+ .map(c -> c.getMetadataKey().orElse(c.getName())))
.collect(Collectors.toList());
final Map<String, DataType> metadataMap = extractMetadataMap(sink);
@@ -328,18 +325,17 @@ private static void pushMetadataProjection(
.map(
pos -> {
final int posAdjusted =
- adjustByVirtualColumns(
- tableColumns, pos);
+ adjustByVirtualColumns(columns, pos);
return relBuilder.field(posAdjusted);
}),
metadataColumns.stream()
.map(
pos -> {
final MetadataColumn metadataColumn =
- (MetadataColumn) tableColumns.get(pos);
+ (MetadataColumn) columns.get(pos);
final String metadataKey =
metadataColumn
- .getMetadataAlias()
+ .getMetadataKey()
.orElse(
metadataColumn
.getName());
@@ -354,8 +350,7 @@ private static void pushMetadataProjection(
expectedType);
final int posAdjusted =
- adjustByVirtualColumns(
- tableColumns, pos);
+ adjustByVirtualColumns(columns, pos);
return rexBuilder.makeAbstractCast(
expectedRelDataType,
relBuilder.field(posAdjusted));
@@ -374,13 +369,13 @@ private static void prepareDynamicSink(
Map<String, String> staticPartitions,
boolean isOverwrite,
DynamicTableSink sink,
- CatalogTable table,
+ ResolvedCatalogTable table,
List<SinkAbilitySpec> sinkAbilitySpecs) {
validatePartitioning(sinkIdentifier, staticPartitions, sink, table.getPartitionKeys());
validateAndApplyOverwrite(sinkIdentifier, isOverwrite, sink, sinkAbilitySpecs);
- validateAndApplyMetadata(sinkIdentifier, sink, table.getSchema(), sinkAbilitySpecs);
+ validateAndApplyMetadata(sinkIdentifier, sink, table.getResolvedSchema(), sinkAbilitySpecs);
}
/**
@@ -391,15 +386,15 @@ private static void prepareDynamicSink(
* #prepareDynamicSink}.
*/
private static List<String> createRequiredMetadataKeys(
- TableSchema schema, DynamicTableSink sink) {
- final List<TableColumn> tableColumns = schema.getTableColumns();
+ ResolvedSchema schema, DynamicTableSink sink) {
+ final List<Column> tableColumns = schema.getColumns();
final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
final Set<String> requiredMetadataKeys =
metadataColumns.stream()
.map(tableColumns::get)
.map(MetadataColumn.class::cast)
- .map(c -> c.getMetadataAlias().orElse(c.getName()))
+ .map(c -> c.getMetadataKey().orElse(c.getName()))
.collect(Collectors.toSet());
final Map<String, DataType> metadataMap = extractMetadataMap(sink);
@@ -506,33 +501,29 @@ private static void validateAndApplyOverwrite(
sinkAbilitySpecs.add(new OverwriteSpec(true));
}
- private static List<Integer> extractPhysicalColumns(TableSchema schema) {
- final List<TableColumn> tableColumns = schema.getTableColumns();
- return IntStream.range(0, schema.getFieldCount())
- .filter(pos -> tableColumns.get(pos).isPhysical())
+ private static List<Integer> extractPhysicalColumns(ResolvedSchema schema) {
+ final List<Column> columns = schema.getColumns();
+ return IntStream.range(0, schema.getColumnCount())
+ .filter(pos -> columns.get(pos).isPhysical())
.boxed()
.collect(Collectors.toList());
}
- private static List<Integer> extractPersistedMetadataColumns(TableSchema schema) {
- final List<TableColumn> tableColumns = schema.getTableColumns();
- return IntStream.range(0, schema.getFieldCount())
+ private static List<Integer> extractPersistedMetadataColumns(ResolvedSchema schema) {
+ final List<Column> columns = schema.getColumns();
+ return IntStream.range(0, schema.getColumnCount())
.filter(
pos -> {
- final TableColumn tableColumn = tableColumns.get(pos);
- return tableColumn instanceof MetadataColumn
- && tableColumn.isPersisted();
+ final Column column = columns.get(pos);
+ return column instanceof MetadataColumn && column.isPersisted();
})
.boxed()
.collect(Collectors.toList());
}
- private static int adjustByVirtualColumns(List<TableColumn> tableColumns, int pos) {
+ private static int adjustByVirtualColumns(List<Column> columns, int pos) {
return pos
- - (int)
- IntStream.range(0, pos)
- .filter(i -> !tableColumns.get(i).isPersisted())
- .count();
+ - (int) IntStream.range(0, pos).filter(i -> !columns.get(i).isPersisted()).count();
}
private static Map<String, DataType> extractMetadataMap(DynamicTableSink sink) {
@@ -545,9 +536,9 @@ private static Map<String, DataType> extractMetadataMap(DynamicTableSink sink) {
private static void validateAndApplyMetadata(
ObjectIdentifier sinkIdentifier,
DynamicTableSink sink,
- TableSchema schema,
+ ResolvedSchema schema,
List<SinkAbilitySpec> sinkAbilitySpecs) {
- final List<TableColumn> tableColumns = schema.getTableColumns();
+ final List<Column> columns = schema.getColumns();
final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
if (metadataColumns.isEmpty()) {
@@ -569,10 +560,10 @@ private static void validateAndApplyMetadata(
((SupportsWritingMetadata) sink).listWritableMetadata();
metadataColumns.forEach(
pos -> {
- final MetadataColumn metadataColumn = (MetadataColumn) tableColumns.get(pos);
+ final MetadataColumn metadataColumn = (MetadataColumn) columns.get(pos);
final String metadataKey =
- metadataColumn.getMetadataAlias().orElse(metadataColumn.getName());
- final LogicalType metadataType = metadataColumn.getType().getLogicalType();
+ metadataColumn.getMetadataKey().orElse(metadataColumn.getName());
+ final LogicalType metadataType = metadataColumn.getDataType().getLogicalType();
final DataType expectedMetadataDataType = metadataMap.get(metadataKey);
// check that metadata key is valid
if (expectedMetadataDataType == null) {
@@ -626,13 +617,13 @@ private static void validateAndApplyMetadata(
*
*
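The index arithmetic in `adjustByVirtualColumns` above can be illustrated standalone: a column's position in the persisted (sink) row is its schema position minus the number of virtual columns before it. A self-contained sketch with hypothetical data, not Flink code:

```java
import java.util.List;
import java.util.stream.IntStream;

public class VirtualColumnOffsetSketch {

    // Mirrors the adjustment in DynamicSinkUtils#adjustByVirtualColumns:
    // skip columns that are not persisted when mapping to the sink row.
    static int adjustByVirtualColumns(List<Boolean> isPersisted, int pos) {
        return pos - (int) IntStream.range(0, pos).filter(i -> !isPersisted.get(i)).count();
    }

    public static void main(String[] args) {
        // columns: f0 (persisted), topic (virtual metadata), ts (persisted)
        final List<Boolean> persisted = List.of(true, false, true);
        System.out.println(adjustByVirtualColumns(persisted, 2)); // prints 1
    }
}
```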