flink-python/pyflink/table/expressions.py (14 additions, 0 deletions)
@@ -477,6 +477,20 @@ def log(v, base=None) -> Expression[float]:
    return _binary_op("log", base, v)


def source_watermark() -> Expression:
    """
    Source watermark declaration for schema.

    This is a marker function that doesn't have a concrete runtime implementation. It can only
    be used as a single expression for watermark strategies in schema declarations. The
    declaration will be pushed down into a table source that implements the
    `SupportsSourceWatermark` interface. The source will then emit system-defined watermarks.

    Please check the connector documentation to see whether source watermarks are supported.
    """
    return _leaf_op("sourceWatermark")


def if_then_else(condition: Union[bool, Expression[bool]], if_true, if_false) -> Expression:
    """
    Ternary conditional operator that decides which of two other expressions should be evaluated
Expressions.java (org.apache.flink.table.api)
@@ -20,6 +20,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
@@ -464,6 +465,21 @@ public static ApiExpression log(Object base, Object value) {
return apiCall(BuiltInFunctionDefinitions.LOG, base, value);
}

/**
* Source watermark declaration for {@link Schema}.
*
* <p>This is a marker function that doesn't have a concrete runtime implementation. It can only
* be used as a single expression in {@link Schema.Builder#watermark(String, Expression)}. The
* declaration will be pushed down into a table source that implements the {@link
* SupportsSourceWatermark} interface. The source will emit system-defined watermarks
* afterwards.
*
* <p>Please check the connector documentation to see whether source watermarks are supported.
*/
public static ApiExpression sourceWatermark() {
return apiCall(BuiltInFunctionDefinitions.SOURCE_WATERMARK);
}
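Note: a minimal usage sketch, not part of this diff — the new call is only valid as the complete watermark expression of a schema declaration, mirroring the SchemaResolutionTest change further down. The column name and precision are illustrative:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

import static org.apache.flink.table.api.Expressions.sourceWatermark;

// Delegate watermark generation to the connector; schema resolution pushes
// this declaration down to a source implementing SupportsSourceWatermark.
Schema schema =
        Schema.newBuilder()
                .column("ts_ltz", DataTypes.TIMESTAMP_LTZ(3))
                .watermark("ts_ltz", sourceWatermark())
                .build();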

/**
* Ternary conditional operator that decides which of two other expressions should be evaluated
* based on an evaluated boolean condition.
QueryOperationCatalogView.java (org.apache.flink.table.catalog)
@@ -19,41 +19,46 @@
package org.apache.flink.table.catalog;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.operations.QueryOperation;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* A view created from a {@link QueryOperation} via operations on {@link
* org.apache.flink.table.api.Table}.
*/
/** A view created from a {@link QueryOperation} via operations on {@link Table}. */
@Internal
public class QueryOperationCatalogView extends AbstractCatalogView {
public final class QueryOperationCatalogView implements CatalogView {

private final QueryOperation queryOperation;

public QueryOperationCatalogView(QueryOperation queryOperation) {
this(queryOperation, "");
}

public QueryOperationCatalogView(QueryOperation queryOperation, String comment) {
super(
queryOperation.asSummaryString(),
queryOperation.asSummaryString(),
TableSchema.fromResolvedSchema(queryOperation.getResolvedSchema()),
new HashMap<>(),
comment);
this.queryOperation = queryOperation;
}

public QueryOperation getQueryOperation() {
return queryOperation;
}

@Override
public Schema getUnresolvedSchema() {
return Schema.newBuilder().fromResolvedSchema(queryOperation.getResolvedSchema()).build();
}

@Override
public Map<String, String> getOptions() {
throw new TableException("A view backed by a query operation has no options.");
}

@Override
public String getComment() {
return queryOperation.asSummaryString();
}

@Override
public QueryOperationCatalogView copy() {
return new QueryOperationCatalogView(this.queryOperation, getComment());
return new QueryOperationCatalogView(queryOperation);
}

@Override
@@ -65,4 +70,16 @@ public Optional<String> getDescription() {
public Optional<String> getDetailedDescription() {
return getDescription();
}

@Override
public String getOriginalQuery() {
throw new TableException(
"A view backed by a query operation has no serializable representation.");
}

@Override
public String getExpandedQuery() {
throw new TableException(
"A view backed by a query operation has no serializable representation.");
}
}
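Note: because getOptions(), getOriginalQuery(), and getExpandedQuery() now throw TableException, callers have to treat an inline view differently from a persisted one. A hedged consumer-side sketch (the describe helper is hypothetical):

import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.QueryOperationCatalogView;

// Hypothetical helper: an inline view exposes an unresolved schema and an
// operation tree, but no options and no SQL text, so those getters must be
// guarded by an instanceof check.
static String describe(CatalogView view) {
    if (view instanceof QueryOperationCatalogView) {
        QueryOperationCatalogView inline = (QueryOperationCatalogView) view;
        return inline.getQueryOperation().asSummaryString();
    }
    // persisted views keep a serializable SQL representation
    return view.getExpandedQuery();
}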
ResolveCallByArgumentsRule.java
@@ -81,8 +81,11 @@ final class ResolveCallByArgumentsRule implements ResolverRule {

@Override
public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
// only the top-level expressions may access the output data type
final SurroundingInfo surroundingInfo =
context.getOutputDataType().map(SurroundingInfo::of).orElse(null);
return expression.stream()
.flatMap(expr -> expr.accept(new ResolvingCallVisitor(context, null)).stream())
.flatMap(e -> e.accept(new ResolvingCallVisitor(context, surroundingInfo)).stream())
.collect(Collectors.toList());
}

@@ -120,23 +123,23 @@ public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
// resolve the children with information from the current call
final List<ResolvedExpression> resolvedArgs = new ArrayList<>();
final int argCount = unresolvedCall.getChildren().size();

for (int i = 0; i < argCount; i++) {
final int currentPos = i;
final SurroundingInfo surroundingInfo =
typeInference
.map(
inference ->
SurroundingInfo.of(
name,
definition,
inference,
argCount,
currentPos,
resolutionContext.isGroupedAggregation()))
.orElse(null);
final ResolvingCallVisitor childResolver =
new ResolvingCallVisitor(
resolutionContext,
typeInference
.map(
inference ->
new SurroundingInfo(
name,
definition,
inference,
argCount,
currentPos,
resolutionContext
.isGroupedAggregation()))
.orElse(null));
new ResolvingCallVisitor(resolutionContext, surroundingInfo);
resolvedArgs.addAll(unresolvedCall.getChildren().get(i).accept(childResolver));
}

SchemaResolutionTest.java
@@ -21,8 +21,11 @@
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -39,6 +42,7 @@
import java.util.Collections;

import static org.apache.flink.table.api.Expressions.callSql;
import static org.apache.flink.table.api.Expressions.sourceWatermark;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTimeAttribute;
@@ -90,23 +94,21 @@ public class SchemaResolutionTest {

// the type of ts_ltz is TIMESTAMP_LTZ
private static final String COMPUTED_SQL_WITH_TS_LTZ = "ts_ltz - INTERVAL '60' MINUTE";

private static final ResolvedExpression COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ =
new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> COMPUTED_SQL_WITH_TS_LTZ);

private static final String WATERMARK_SQL_WITH_TS_LTZ = "ts1 - INTERVAL '5' SECOND";

private static final ResolvedExpression WATERMARK_RESOLVED_WITH_TS_LTZ =
new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> WATERMARK_SQL_WITH_TS_LTZ);

private static final Schema SCHEMA_WITH_TS_LTZ =
Schema.newBuilder()
.primaryKeyNamed("primary_constraint", "id") // out of order
.column("id", DataTypes.INT().notNull())
.column("counter", DataTypes.INT().notNull())
.column("payload", "ROW<name STRING, age INT, flag BOOLEAN>")
.columnByMetadata("topic", DataTypes.STRING(), true)
.columnByExpression(
"ts1", callSql(COMPUTED_SQL_WITH_TS_LTZ)) // out of order API expression
.columnByExpression("ts1", callSql(COMPUTED_SQL_WITH_TS_LTZ))
.columnByMetadata("ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp")
.watermark("ts1", WATERMARK_SQL_WITH_TS_LTZ)
.columnByExpression("proctime", PROCTIME_SQL)
.build();

@Test
@@ -152,38 +154,53 @@ public void testSchemaResolutionWithTimestampLtzRowtime() {
new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.INT().notNull()),
Column.physical("counter", DataTypes.INT().notNull()),
Column.physical(
"payload",
DataTypes.ROW(
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("flag", DataTypes.BOOLEAN()))),
Column.metadata("topic", DataTypes.STRING(), null, true),
Column.computed("ts1", COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ),
Column.metadata(
"ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp", false),
Column.computed("proctime", PROCTIME_RESOLVED)),
"ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp", false)),
Collections.singletonList(
WatermarkSpec.of("ts1", WATERMARK_RESOLVED_WITH_TS_LTZ)),
UniqueConstraint.primaryKey(
"primary_constraint", Collections.singletonList("id")));
null);

final ResolvedSchema actualStreamSchema = resolveSchema(SCHEMA_WITH_TS_LTZ, true);
{
assertThat(actualStreamSchema, equalTo(expectedSchema));
assertTrue(isRowtimeAttribute(getType(actualStreamSchema, "ts1")));
assertTrue(isProctimeAttribute(getType(actualStreamSchema, "proctime")));
}

final ResolvedSchema actualBatchSchema = resolveSchema(SCHEMA_WITH_TS_LTZ, false);
{
assertThat(actualBatchSchema, equalTo(expectedSchema));
assertFalse(isRowtimeAttribute(getType(actualBatchSchema, "ts1")));
assertTrue(isProctimeAttribute(getType(actualBatchSchema, "proctime")));
}
}

@Test
public void testSchemaResolutionWithSourceWatermark() {
final ResolvedSchema expectedSchema =
new ResolvedSchema(
Collections.singletonList(
Column.physical("ts_ltz", DataTypes.TIMESTAMP_LTZ(1))),
Collections.singletonList(
WatermarkSpec.of(
"ts_ltz",
new CallExpression(
FunctionIdentifier.of(
BuiltInFunctionDefinitions.SOURCE_WATERMARK
.getName()),
BuiltInFunctionDefinitions.SOURCE_WATERMARK,
Collections.emptyList(),
DataTypes.TIMESTAMP_LTZ(1)))),
null);
final ResolvedSchema resolvedSchema =
resolveSchema(
Schema.newBuilder()
.column("ts_ltz", DataTypes.TIMESTAMP_LTZ(1))
.watermark("ts_ltz", sourceWatermark())
.build());

assertThat(resolvedSchema, equalTo(expectedSchema));
}

@Test
public void testSchemaResolutionErrors() {

@@ -282,20 +299,6 @@ public void testUnresolvedSchemaString() {
+ " WATERMARK FOR `ts` AS [ts - INTERVAL '5' SECOND],\n"
+ " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+ ")"));
assertThat(
SCHEMA_WITH_TS_LTZ.toString(),
equalTo(
"(\n"
+ " `id` INT NOT NULL,\n"
+ " `counter` INT NOT NULL,\n"
+ " `payload` [ROW<name STRING, age INT, flag BOOLEAN>],\n"
+ " `topic` METADATA VIRTUAL,\n"
+ " `ts1` AS [ts_ltz - INTERVAL '60' MINUTE],\n"
+ " `ts_ltz` METADATA FROM 'timestamp',\n"
+ " `proctime` AS [PROCTIME()],\n"
+ " WATERMARK FOR `ts1` AS [ts1 - INTERVAL '5' SECOND],\n"
+ " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+ ")"));
}

@Test
Expand All @@ -315,22 +318,6 @@ public void testResolvedSchemaString() {
+ " WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '5' SECOND,\n"
+ " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+ ")"));

final ResolvedSchema resolvedSchemaWithTsLtz = resolveSchema(SCHEMA_WITH_TS_LTZ);
assertThat(
resolvedSchemaWithTsLtz.toString(),
equalTo(
"(\n"
+ " `id` INT NOT NULL,\n"
+ " `counter` INT NOT NULL,\n"
+ " `payload` ROW<`name` STRING, `age` INT, `flag` BOOLEAN>,\n"
+ " `topic` STRING METADATA VIRTUAL,\n"
+ " `ts1` TIMESTAMP_LTZ(3) *ROWTIME* AS ts_ltz - INTERVAL '60' MINUTE,\n"
+ " `ts_ltz` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',\n"
+ " `proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME(),\n"
+ " WATERMARK FOR `ts1`: TIMESTAMP_LTZ(3) AS ts1 - INTERVAL '5' SECOND,\n"
+ " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n"
+ ")"));
}

@Test
ImplicitExpressionConversions.scala (org.apache.flink.table.api)
@@ -20,10 +20,11 @@ package org.apache.flink.table.api

import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark
import org.apache.flink.table.expressions.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
import org.apache.flink.table.expressions.{ApiExpressionUtils, Expression, TableSymbol, TimePointUnit}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{DISTINCT, RANGE_TO}
import org.apache.flink.table.functions.{ScalarFunction, TableFunction, ImperativeAggregateFunction, UserDefinedFunctionHelper, _}
import org.apache.flink.table.functions.{ImperativeAggregateFunction, ScalarFunction, TableFunction, UserDefinedFunctionHelper, _}
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row

@@ -464,12 +465,9 @@ trait ImplicitExpressionConversions {
/**
* Converts a numeric type epoch time to [[DataTypes#TIMESTAMP_LTZ]].
*
* <p>The supported precision is 0 or 3:
*
* <ul>
* <li>0 means the numericEpochTime is in second.
* <li>3 means the numericEpochTime is in millisecond.
* </ul>
* The supported precision is 0 or 3:
* - 0 means the numericEpochTime is in seconds.
* - 3 means the numericEpochTime is in milliseconds.
*/
def toTimestampLtz(numericEpochTime: Expression, precision: Expression): Expression = {
Expressions.toTimestampLtz(numericEpochTime, precision)
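Note: a hedged illustration of the precision semantics, written against the Java Expressions API that this Scala method delegates to (epoch values are made up):

import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.api.Expressions.toTimestampLtz;

// precision 0: the epoch value is interpreted as seconds
toTimestampLtz(lit(1_618_989_564L), lit(0));
// precision 3: the epoch value is interpreted as milliseconds
toTimestampLtz(lit(1_618_989_564_123L), lit(3));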
@@ -681,6 +679,21 @@ trait ImplicitExpressionConversions {
Expressions.log(base, value)
}

/**
* Source watermark declaration for [[Schema]].
*
* This is a marker function that doesn't have a concrete runtime implementation.
* It can only be used as a single expression in [[Schema.Builder#watermark(String, Expression)]].
* The declaration will be pushed down into a table source that implements the
* [[SupportsSourceWatermark]] interface. The source will emit system-defined watermarks
* afterwards.
*
* Please check the connector documentation to see whether source watermarks are supported.
*/
def sourceWatermark(): Expression = {
Expressions.sourceWatermark()
}

/**
* Ternary conditional operator that decides which of two other expressions should be evaluated
* based on an evaluated boolean condition.