diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java index 35036259289ce..7158bfe89d965 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java @@ -34,6 +34,9 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.table.descriptors.StreamTableDescriptor; import org.apache.flink.table.expressions.Expression; @@ -228,6 +231,7 @@ void registerFunction( * @param dataStream The {@link DataStream} to be converted. * @param The external type of the {@link DataStream}. * @return The converted {@link Table}. + * @see #fromChangelogStream(DataStream) */ Table fromDataStream(DataStream dataStream); @@ -300,7 +304,7 @@ void registerFunction( * // physical columns will be derived automatically * * Schema.newBuilder() - * .columnByMetadata("rowtime", "TIMESTAMP(3)") // extract timestamp into a table column + * .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") // extract timestamp into a column * .watermark("rowtime", "SOURCE_WATERMARK()") // declare watermarks propagation * .build() * @@ -314,12 +318,125 @@ void registerFunction( * * * @param dataStream The {@link DataStream} to be converted. - * @param schema customized schema for the final table. + * @param schema The customized schema for the final table. * @param The external type of the {@link DataStream}. * @return The converted {@link Table}. + * @see #fromChangelogStream(DataStream, Schema) */ Table fromDataStream(DataStream dataStream, Schema schema); + /** + * Converts the given {@link DataStream} of changelog entries into a {@link Table}. + * + *

Compared to {@link #fromDataStream(DataStream)}, this method consumes instances of {@link + * Row} and evaluates the {@link RowKind} flag that is contained in every record during runtime. + * The runtime behavior is similar to that of a {@link DynamicTableSource}. + * + *

This method expects a changelog containing all kinds of changes (enumerated in {@link + * RowKind}) as the default {@link ChangelogMode}. Use {@link #fromChangelogStream(DataStream, + * Schema, ChangelogMode)} to limit the kinds of changes (e.g. for upsert mode). + * + *

Column names and types of the {@link Table} are automatically derived from the {@link + * TypeInformation} of the {@link DataStream}. If the outermost record's {@link TypeInformation} + * is a {@link CompositeType}, it will be flattened in the first level. {@link TypeInformation} + * that cannot be represented as one of the listed {@link DataTypes} will be treated as a + * black-box {@link DataTypes#RAW(Class, TypeSerializer)} type. Thus, composite nested fields + * will not be accessible. + * + *
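For illustration, a minimal usage sketch of this variant (a sketch only; env, tableEnv, and the sample records are illustrative and not part of this patch, assuming an existing StreamExecutionEnvironment and StreamTableEnvironment):

    // a changelog of rows; every record carries its change flag as a RowKind
    DataStream<Row> changelog =
        env.fromElements(
                Row.ofKind(RowKind.INSERT, "alice", 12),
                Row.ofKind(RowKind.INSERT, "bob", 5),
                Row.ofKind(RowKind.UPDATE_BEFORE, "alice", 12),
                Row.ofKind(RowKind.UPDATE_AFTER, "alice", 100))
            .returns(Types.ROW_NAMED(new String[] {"name", "score"}, Types.STRING, Types.INT));

    // interpret the stream as a table; all kinds of changes are accepted by default
    Table table = tableEnv.fromChangelogStream(changelog);
    tableEnv.createTemporaryView("InputTable", table);
    tableEnv.executeSql("SELECT name, SUM(score) FROM InputTable GROUP BY name").print();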

By default, the stream record's timestamp and watermarks are not propagated unless + * explicitly declared via {@link #fromChangelogStream(DataStream, Schema)}. + * + * @param dataStream The changelog stream of {@link Row}. + * @return The converted {@link Table}. + */ + Table fromChangelogStream(DataStream dataStream); + + /** + * Converts the given {@link DataStream} of changelog entries into a {@link Table}. + * + *

Compared to {@link #fromDataStream(DataStream)}, this method consumes instances of {@link + * Row} and evaluates the {@link RowKind} flag that is contained in every record during runtime. + * The runtime behavior is similar to that of a {@link DynamicTableSource}. + * + *

This method expects a changelog containing all kinds of changes (enumerated in {@link + * RowKind}) as the default {@link ChangelogMode}. Use {@link #fromChangelogStream(DataStream, + * Schema, ChangelogMode)} to limit the kinds of changes (e.g. for upsert mode). + * + *

Column names and types of the {@link Table} are automatically derived from the {@link + * TypeInformation} of the {@link DataStream}. If the outermost record's {@link TypeInformation} + * is a {@link CompositeType}, it will be flattened in the first level. {@link TypeInformation} + * that cannot be represented as one of the listed {@link DataTypes} will be treated as a + * black-box {@link DataTypes#RAW(Class, TypeSerializer)} type. Thus, composite nested fields + * will not be accessible. + * + *

By default, the stream record's timestamp and watermarks are not propagated unless + * explicitly declared. + * + *
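For example, the stream record's timestamp and the DataStream watermarks could be exposed as follows; a sketch only, assuming a DataStream<Row> changelog whose rows expose a named field "name" (e.g. declared via Types.ROW_NAMED) and an existing tableEnv:

    // enrich the automatically derived physical columns with a metadata rowtime,
    // reuse the DataStream watermarks, and declare a primary key
    Table table =
        tableEnv.fromChangelogStream(
            changelog,
            Schema.newBuilder()
                .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") // stream record timestamp
                .watermark("rowtime", "SOURCE_WATERMARK()")      // propagate watermarks
                .primaryKey("name")                              // refers to a derived column
                .build());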

This method allows declaring a {@link Schema} for the resulting table. The declaration is + * similar to a {@code CREATE TABLE} DDL in SQL and makes it possible to: + * + *

    + *
  • enrich or overwrite automatically derived columns with a custom {@link DataType} + *
  • reorder columns + *
  • add computed or metadata columns next to the physical columns + *
  • access a stream record's timestamp + *
  • declare a watermark strategy or propagate the {@link DataStream} watermarks + *
  • declare a primary key + *
+ * + *

See {@link #fromDataStream(DataStream, Schema)} for more information and examples on how + * to declare a {@link Schema}. + * + * @param dataStream The changelog stream of {@link Row}. + * @param schema The customized schema for the final table. + * @return The converted {@link Table}. + */ + Table fromChangelogStream(DataStream dataStream, Schema schema); + + /** + * Converts the given {@link DataStream} of changelog entries into a {@link Table}. + * + *

Compared to {@link #fromDataStream(DataStream)}, this method consumes instances of {@link + * Row} and evaluates the {@link RowKind} flag that is contained in every record during runtime. + * The runtime behavior is similar to that of a {@link DynamicTableSource}. + * + *

This method requires an explicitly declared {@link ChangelogMode}. For example, use {@link + * ChangelogMode#upsert()} if the stream will not contain {@link RowKind#UPDATE_BEFORE}, or + * {@link ChangelogMode#insertOnly()} for non-updating streams. + * + *
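For instance, an upsert stream could be interpreted as follows; this is a sketch under the assumption of a DataStream<Row> upsertStream whose rows expose named fields "name" and "score" (the names and tableEnv are illustrative):

    // upsert mode: only INSERT, UPDATE_AFTER, and DELETE records are expected,
    // and a primary key must be definable on the derived columns
    Table table =
        tableEnv.fromChangelogStream(
            upsertStream,
            Schema.newBuilder()
                .primaryKey("name")
                .build(),
            ChangelogMode.upsert());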

Column names and types of the {@link Table} are automatically derived from the {@link + * TypeInformation} of the {@link DataStream}. If the outermost record's {@link TypeInformation} + * is a {@link CompositeType}, it will be flattened in the first level. {@link TypeInformation} + * that cannot be represented as one of the listed {@link DataTypes} will be treated as a + * black-box {@link DataTypes#RAW(Class, TypeSerializer)} type. Thus, composite nested fields + * will not be accessible. + * + *

By default, the stream record's timestamp and watermarks are not propagated unless + * explicitly declared. + * + *

This method allows declaring a {@link Schema} for the resulting table. The declaration is + * similar to a {@code CREATE TABLE} DDL in SQL and makes it possible to: + * + *

    + *
  • enrich or overwrite automatically derived columns with a custom {@link DataType} + *
  • reorder columns + *
  • add computed or metadata columns next to the physical columns + *
  • access a stream record's timestamp + *
  • declare a watermark strategy or propagate the {@link DataStream} watermarks + *
  • declare a primary key + *
+ * + *

See {@link #fromDataStream(DataStream, Schema)} for more information and examples of how + * to declare a {@link Schema}. + * + * @param dataStream The changelog stream of {@link Row}. + * @param schema The customized schema for the final table. + * @param changelogMode The expected kinds of changes in the incoming changelog. + * @return The converted {@link Table}. + */ + Table fromChangelogStream( + DataStream dataStream, Schema schema, ChangelogMode changelogMode); + /** * Creates a view from the given {@link DataStream} in a given path. Registered views can be * referenced in SQL queries. @@ -351,7 +468,7 @@ void registerFunction( * * @param path The path under which the {@link DataStream} is created. See also the {@link * TableEnvironment} class description for the format of the path. - * @param schema customized schema for the final table. + * @param schema The customized schema for the final table. * @param dataStream The {@link DataStream} out of which to create the view. * @param The type of the {@link DataStream}. */ @@ -376,9 +493,10 @@ void registerFunction( *

If the input table contains a single rowtime column, it will be propagated into a stream * record's timestamp. Watermarks will be propagated as well. * - * @param table The {@link Table} to convert. + * @param table The {@link Table} to convert. It must be insert-only. * @return The converted {@link DataStream}. * @see #toDataStream(Table, AbstractDataType) + * @see #toChangelogStream(Table) */ DataStream toDataStream(Table table); @@ -391,17 +509,18 @@ void registerFunction( *

This method is a shortcut for: * *

-     *     tableEnv.toDataStream(table, DataTypes.of(class))
+     *     tableEnv.toDataStream(table, DataTypes.of(targetClass))
      * 
* *

Calling this method with a class of {@link Row} will redirect to {@link * #toDataStream(Table)}. * - * @param table The {@link Table} to convert. + * @param table The {@link Table} to convert. It must be insert-only. * @param targetClass The {@link Class} that decides about the final external representation in * {@link DataStream} records. * @param External record. * @return The converted {@link DataStream}. + * @see #toChangelogStream(Table, Schema) */ DataStream toDataStream(Table table, Class targetClass); @@ -444,15 +563,164 @@ void registerFunction( *

If the input table contains a single rowtime column, it will be propagated into a stream * record's timestamp. Watermarks will be propagated as well. * - * @param table The {@link Table} to convert. + * @param table The {@link Table} to convert. It must be insert-only. * @param targetDataType The {@link DataType} that decides about the final external * representation in {@link DataStream} records. * @param External record. * @return The converted {@link DataStream}. * @see #toDataStream(Table) + * @see #toChangelogStream(Table, Schema) */ DataStream toDataStream(Table table, AbstractDataType targetDataType); + /** + * Converts the given {@link Table} into a {@link DataStream} of changelog entries. + * + *

Compared to {@link #toDataStream(Table)}, this method produces instances of {@link Row} + * and sets the {@link RowKind} flag that is contained in every record during runtime. The + * runtime behavior is similar to that of a {@link DynamicTableSink}. + * + *

This method can emit a changelog containing all kinds of changes (enumerated in {@link + * RowKind}) that the given updating table requires as the default {@link ChangelogMode}. Use + * {@link #toChangelogStream(Table, Schema, ChangelogMode)} to limit the kinds of changes (e.g. + * for upsert mode). + * + *
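As a usage sketch (assuming tableEnv and a registered InputTable; the names are illustrative, not part of this patch), an updating aggregation result can be handed back to the DataStream API like this:

    // an updating table, e.g. the result of a group-by aggregation
    Table result =
        tableEnv.sqlQuery("SELECT name, SUM(score) AS score FROM InputTable GROUP BY name");

    // every emitted Row carries its change flag (+I, -U, +U, -D), accessible via Row#getKind()
    DataStream<Row> changelog = tableEnv.toChangelogStream(result);
    changelog.print();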

Note that the type system of the table ecosystem is richer than the one of the DataStream + * API. The table runtime will make sure to properly serialize the output records to the first + * operator of the DataStream API. Afterwards, the {@link Types} semantics of the DataStream API + * need to be considered. + * + *

If the input table contains a single rowtime column, it will be propagated into a stream + * record's timestamp. Watermarks will be propagated as well. + * + * @param table The {@link Table} to convert. It can be updating or insert-only. + * @return The converted changelog stream of {@link Row}. + */ + DataStream toChangelogStream(Table table); + + /** + * Converts the given {@link Table} into a {@link DataStream} of changelog entries. + * + *

Compared to {@link #toDataStream(Table)}, this method produces instances of {@link Row} + * and sets the {@link RowKind} flag that is contained in every record during runtime. The + * runtime behavior is similar to that of a {@link DynamicTableSink}. + * + *

This method can emit a changelog containing all kinds of changes (enumerated in {@link + * RowKind}) that the given updating table requires as the default {@link ChangelogMode}. Use + * {@link #toChangelogStream(Table, Schema, ChangelogMode)} to limit the kinds of changes (e.g. + * for upsert mode). + * + *

The given {@link Schema} is used to configure the table runtime to convert columns and + * internal data structures to the desired representation. The following example shows how to + * convert a table column into a POJO type. + * + *

+     *     // given a Table of (id BIGINT, payload ROW < name STRING, age INT >)
+     *
+     *     public static class MyPojo {
+     *         public String name;
+     *         public Integer age;
+     *
+     *         // default constructor for DataStream API
+     *         public MyPojo() {}
+     *
+     *         // fully assigning constructor for field order in Table API
+     *         public MyPojo(String name, Integer age) {
+     *             this.name = name;
+     *             this.age = age;
+     *         }
+     *     }
+     *
+     *     tableEnv.toChangelogStream(
+     *         table,
+     *         Schema.newBuilder()
+     *             .column("id", DataTypes.BIGINT())
+     *             .column("payload", DataTypes.of(MyPojo.class)) // force an implicit conversion
+     *             .build());
+     * 
+ * + *

Note that the type system of the table ecosystem is richer than the one of the DataStream + * API. The table runtime will make sure to properly serialize the output records to the first + * operator of the DataStream API. Afterwards, the {@link Types} semantics of the DataStream API + * need to be considered. + * + *
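A possible continuation on the DataStream side (a sketch only; it reuses the hypothetical MyPojo and table from the example above): after the conversion, field 1 of every record holds a MyPojo instance instead of a nested Row:

    DataStream<Row> changelog =
        tableEnv.toChangelogStream(
            table,
            Schema.newBuilder()
                .column("id", DataTypes.BIGINT())
                .column("payload", DataTypes.of(MyPojo.class))
                .build());

    // extract the externalized POJO from position 1 ("payload")
    changelog
        .map(row -> (MyPojo) row.getField(1))
        .returns(MyPojo.class)
        .print();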

If the input table contains a single rowtime column, it will be propagated into a stream + * record's timestamp. Watermarks will be propagated as well. + * + *

If the rowtime should not be part of the final {@link Row} as a concrete field, or if the + * schema should be symmetric for both {@link #fromChangelogStream} and {@link + * #toChangelogStream}, the rowtime can instead be declared as a metadata column that is + * propagated into a stream record's timestamp. It is also possible to declare a schema without + * physical/regular columns; in that case, the physical columns are derived automatically and + * implicitly placed at the beginning of the schema declaration. + * + *

The following examples illustrate common schema declarations and their semantics: + * + *

+     *     // given a Table of (id INT, name STRING, my_rowtime TIMESTAMP_LTZ(3))
+     *
+     *     // === EXAMPLE 1 ===
+     *
+     *     // no physical columns defined, they will be derived automatically,
+     *     // the last derived physical column will be skipped in favor of the metadata column
+     *
+     *     Schema.newBuilder()
+     *         .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
+     *         .build()
+     *
+     *     // equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA)
+     *
+     *     // === EXAMPLE 2 ===
+     *
+     *     // physical columns defined, all columns must be defined
+     *
+     *     Schema.newBuilder()
+     *         .column("id", "INT")
+     *         .column("name", "STRING")
+     *         .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
+     *         .build()
+     *
+     *     // equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA)
+     * 
+ * + * @param table The {@link Table} to convert. It can be updating or insert-only. + * @param targetSchema The {@link Schema} that decides about the final external representation + * in {@link DataStream} records. + * @return The converted changelog stream of {@link Row}. + */ + DataStream toChangelogStream(Table table, Schema targetSchema); + + /** + * Converts the given {@link Table} into a {@link DataStream} of changelog entries. + * + *

Compared to {@link #toDataStream(Table)}, this method produces instances of {@link Row} + * and sets the {@link RowKind} flag that is contained in every record during runtime. The + * runtime behavior is similar to that of a {@link DynamicTableSink}. + * + *

This method requires an explicitly declared {@link ChangelogMode}. For example, use {@link + * ChangelogMode#upsert()} if the stream will not contain {@link RowKind#UPDATE_BEFORE}, or + * {@link ChangelogMode#insertOnly()} for non-updating streams. + * + *
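For example, an updating table with a unique key could be written out without UPDATE_BEFORE records as follows (a sketch; result is assumed to be an updating table such as a GROUP BY name aggregation, and the column types are illustrative):

    // emit only INSERT, UPDATE_AFTER, and DELETE changes
    DataStream<Row> upserts =
        tableEnv.toChangelogStream(
            result,
            Schema.newBuilder()
                .column("name", DataTypes.STRING())
                .column("score", DataTypes.INT())
                .build(),
            ChangelogMode.upsert());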

Note that the type system of the table ecosystem is richer than the one of the DataStream + * API. The table runtime will make sure to properly serialize the output records to the first + * operator of the DataStream API. Afterwards, the {@link Types} semantics of the DataStream API + * need to be considered. + * + *

If the input table contains a single rowtime column, it will be propagated into a stream + * record's timestamp. Watermarks will be propagated as well. However, it is also possible to + * write out the rowtime as a metadata column. See {@link #toChangelogStream(Table, Schema)} for + * more information and examples on how to declare a {@link Schema}. + * + * @param table The {@link Table} to convert. It can be updating or insert-only. + * @param targetSchema The {@link Schema} that decides about the final external representation + * in {@link DataStream} records. + * @param changelogMode The required kinds of changes in the result changelog. An exception will + * be thrown if the given updating table cannot be represented in this changelog mode. + * @return The converted changelog stream of {@link Row}. + */ + DataStream toChangelogStream( + Table table, Schema targetSchema, ChangelogMode changelogMode); + /** * Converts the given {@link DataStream} into a {@link Table} with specified field names. * diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index ff2ee2b48e147..4c266523ec930 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -232,28 +232,52 @@ public void registerFunction( @Override public Table fromDataStream(DataStream dataStream) { - return fromDataStreamInternal(dataStream, null, null); + return fromStreamInternal(dataStream, null, null, ChangelogMode.insertOnly()); } @Override public Table fromDataStream(DataStream dataStream, Schema schema) { Preconditions.checkNotNull(schema, "Schema must not be null."); - return fromDataStreamInternal(dataStream, schema, null); + return fromStreamInternal(dataStream, schema, null, ChangelogMode.insertOnly()); + } + + @Override + public Table fromChangelogStream(DataStream dataStream) { + return fromStreamInternal(dataStream, null, null, ChangelogMode.all()); + } + + @Override + public Table fromChangelogStream(DataStream dataStream, Schema schema) { + Preconditions.checkNotNull(schema, "Schema must not be null."); + return fromStreamInternal(dataStream, schema, null, ChangelogMode.all()); + } + + @Override + public Table fromChangelogStream( + DataStream dataStream, Schema schema, ChangelogMode changelogMode) { + Preconditions.checkNotNull(schema, "Schema must not be null."); + return fromStreamInternal(dataStream, schema, null, changelogMode); } @Override public void createTemporaryView(String path, DataStream dataStream) { - createTemporaryView(path, fromDataStreamInternal(dataStream, null, path)); + createTemporaryView( + path, fromStreamInternal(dataStream, null, path, ChangelogMode.insertOnly())); } @Override public void createTemporaryView(String path, DataStream dataStream, Schema schema) { - createTemporaryView(path, fromDataStreamInternal(dataStream, schema, path)); + createTemporaryView( + path, fromStreamInternal(dataStream, schema, path, ChangelogMode.insertOnly())); } - private Table fromDataStreamInternal( - DataStream dataStream, @Nullable Schema schema, @Nullable String viewPath) { + private Table fromStreamInternal( + DataStream dataStream, + @Nullable Schema 
schema, + @Nullable String viewPath, + ChangelogMode changelogMode) { Preconditions.checkNotNull(dataStream, "Data stream must not be null."); + Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null."); final CatalogManager catalogManager = getCatalogManager(); final SchemaResolver schemaResolver = catalogManager.getSchemaResolver(); final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder(); @@ -281,7 +305,7 @@ private Table fromDataStreamInternal( dataStream, schemaTranslationResult.getPhysicalDataType(), schemaTranslationResult.isTopLevelRecord(), - ChangelogMode.insertOnly(), + changelogMode, resolvedSchema); final List projections = schemaTranslationResult.getProjections(); @@ -324,28 +348,69 @@ public DataStream toDataStream(Table table, Class targetClass) { public DataStream toDataStream(Table table, AbstractDataType targetDataType) { Preconditions.checkNotNull(table, "Table must not be null."); Preconditions.checkNotNull(targetDataType, "Target data type must not be null."); - final CatalogManager catalogManager = getCatalogManager(); - final SchemaResolver schemaResolver = catalogManager.getSchemaResolver(); - final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder(); final ExternalSchemaTranslator.OutputResult schemaTranslationResult = ExternalSchemaTranslator.fromInternal( - catalogManager.getDataTypeFactory(), + getCatalogManager().getDataTypeFactory(), table.getResolvedSchema(), targetDataType); - final List projections = schemaTranslationResult.getProjections(); - final QueryOperation projectOperation; - if (projections == null) { - projectOperation = table.getQueryOperation(); - } else { - projectOperation = - operationTreeBuilder.project( - projections.stream() - .map(ApiExpressionUtils::unresolvedRef) - .collect(Collectors.toList()), - table.getQueryOperation()); - } + return toStreamInternal(table, schemaTranslationResult, ChangelogMode.insertOnly()); + } + + @Override + public DataStream toChangelogStream(Table table) { + Preconditions.checkNotNull(table, "Table must not be null."); + + final ExternalSchemaTranslator.OutputResult schemaTranslationResult = + ExternalSchemaTranslator.fromInternal(table.getResolvedSchema(), null); + + return toStreamInternal(table, schemaTranslationResult, null); + } + + @Override + public DataStream toChangelogStream(Table table, Schema targetSchema) { + Preconditions.checkNotNull(table, "Table must not be null."); + Preconditions.checkNotNull(targetSchema, "Target schema must not be null."); + + final ExternalSchemaTranslator.OutputResult schemaTranslationResult = + ExternalSchemaTranslator.fromInternal(table.getResolvedSchema(), targetSchema); + + return toStreamInternal(table, schemaTranslationResult, null); + } + + @Override + public DataStream toChangelogStream( + Table table, Schema targetSchema, ChangelogMode changelogMode) { + Preconditions.checkNotNull(table, "Table must not be null."); + Preconditions.checkNotNull(targetSchema, "Target schema must not be null."); + Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null."); + + final ExternalSchemaTranslator.OutputResult schemaTranslationResult = + ExternalSchemaTranslator.fromInternal(table.getResolvedSchema(), targetSchema); + + return toStreamInternal(table, schemaTranslationResult, changelogMode); + } + + private DataStream toStreamInternal( + Table table, + ExternalSchemaTranslator.OutputResult schemaTranslationResult, + @Nullable ChangelogMode changelogMode) { + final CatalogManager catalogManager = 
getCatalogManager(); + final SchemaResolver schemaResolver = catalogManager.getSchemaResolver(); + final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder(); + + final QueryOperation projectOperation = + schemaTranslationResult + .getProjections() + .map( + projections -> + operationTreeBuilder.project( + projections.stream() + .map(ApiExpressionUtils::unresolvedRef) + .collect(Collectors.toList()), + table.getQueryOperation())) + .orElseGet(table::getQueryOperation); final ResolvedSchema resolvedSchema = schemaResolver.resolve(schemaTranslationResult.getSchema()); @@ -361,13 +426,15 @@ public DataStream toDataStream(Table table, AbstractDataType targetDat objectIdentifier, projectOperation, resolvedSchema, - ChangelogMode.insertOnly(), - schemaTranslationResult.getPhysicalDataType()); + changelogMode, + schemaTranslationResult + .getPhysicalDataType() + .orElseGet(resolvedSchema::toPhysicalRowDataType)); - return toDataStreamInternal(table, modifyOperation); + return toStreamInternal(table, modifyOperation); } - private DataStream toDataStreamInternal(Table table, ModifyOperation modifyOperation) { + private DataStream toStreamInternal(Table table, ModifyOperation modifyOperation) { final List> transformations = planner.translate(Collections.singletonList(modifyOperation)); @@ -441,7 +508,7 @@ public DataStream toAppendStream(Table table, TypeInformation typeInfo table.getQueryOperation(), TypeConversions.fromLegacyInfoToDataType(typeInfo), OutputConversionModifyOperation.UpdateMode.APPEND); - return toDataStreamInternal(table, modifyOperation); + return toStreamInternal(table, modifyOperation); } @Override @@ -458,7 +525,7 @@ public DataStream> toRetractStream( table.getQueryOperation(), wrapWithChangeFlag(typeInfo), OutputConversionModifyOperation.UpdateMode.RETRACT); - return toDataStreamInternal(table, modifyOperation); + return toStreamInternal(table, modifyOperation); } @Override diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java index 92ff0ef21f02b..5c47c880b8468 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java @@ -23,25 +23,31 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Schema.UnresolvedColumn; +import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn; import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn; +import org.apache.flink.table.api.Schema.UnresolvedPrimaryKey; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.StructuredType; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; +import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.table.types.utils.TypeInfoDataTypeConverter; import javax.annotation.Nullable; 
import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot; import static org.apache.flink.table.types.utils.DataTypeUtils.flattenToDataTypes; import static org.apache.flink.table.types.utils.DataTypeUtils.flattenToNames; @@ -56,7 +62,48 @@ public final class ExternalSchemaTranslator { /** * Converts the given {@link DataType} into the final {@link OutputResult}. * - *

Currently, this method serves only the following use case: + *

This method serves the following three use cases: + * + *

    + *
  • 1. Derive physical columns from the input schema. + *
  • 2. Derive physical columns from the input schema but enrich them with a metadata column. + *
  • 3. Use the declared schema entirely. + *
+ */ + public static OutputResult fromInternal( + ResolvedSchema inputSchema, @Nullable Schema declaredSchema) { + + // no schema has been declared by the user, + // the schema will be entirely derived from the input + if (declaredSchema == null) { + // go through data type to erase time attributes + final DataType physicalDataType = inputSchema.toSourceRowDataType(); + final Schema schema = Schema.newBuilder().fromRowDataType(physicalDataType).build(); + return new OutputResult(null, schema, null); + } + + final List declaredColumns = declaredSchema.getColumns(); + + // the declared schema does not contain physical information, + // thus, it only replaces physical columns with metadata rowtime + if (declaredColumns.stream().noneMatch(ExternalSchemaTranslator::isPhysical)) { + // go through data type to erase time attributes + final DataType sourceDataType = inputSchema.toSourceRowDataType(); + final DataType physicalDataType = + patchDataTypeWithoutMetadataRowtime(sourceDataType, declaredColumns); + final Schema.Builder builder = Schema.newBuilder(); + builder.fromRowDataType(physicalDataType); + builder.fromColumns(declaredColumns); + return new OutputResult(null, builder.build(), null); + } + + return new OutputResult(null, declaredSchema, null); + } + + /** + * Converts the given {@link DataType} into the final {@link OutputResult}. + * + *

This method serves a single use case: + * + *

    *
  • 1. Derive physical columns from the input data type. @@ -117,17 +164,18 @@ public static InputResult fromExternal( // the schema will be entirely derived from the input if (declaredSchema == null) { final Schema.Builder builder = Schema.newBuilder(); - addPhysicalDataTypeFields(builder, inputDataType); + addPhysicalSourceDataTypeFields(builder, inputDataType, null); return new InputResult(inputDataType, isTopLevelRecord, builder.build(), null); } final List declaredColumns = declaredSchema.getColumns(); + final UnresolvedPrimaryKey declaredPrimaryKey = declaredSchema.getPrimaryKey().orElse(null); // the declared schema does not contain physical information, // thus, it only enriches the non-physical column parts if (declaredColumns.stream().noneMatch(ExternalSchemaTranslator::isPhysical)) { final Schema.Builder builder = Schema.newBuilder(); - addPhysicalDataTypeFields(builder, inputDataType); + addPhysicalSourceDataTypeFields(builder, inputDataType, declaredPrimaryKey); builder.fromSchema(declaredSchema); return new InputResult(inputDataType, isTopLevelRecord, builder.build(), null); } @@ -142,6 +190,38 @@ public static InputResult fromExternal( return new InputResult(patchedDataType, isTopLevelRecord, patchedSchema, projections); } + // -------------------------------------------------------------------------------------------- + // Helper methods + // -------------------------------------------------------------------------------------------- + + private static DataType patchDataTypeWithoutMetadataRowtime( + DataType dataType, List declaredColumns) { + // best effort logic to remove a rowtime column at the end of a derived schema, + // this enables 100% symmetrical API calls for from/toChangelogStream, + // otherwise the full schema must be declared + final List columnDataTypes = dataType.getChildren(); + final int columnCount = columnDataTypes.size(); + final long persistedMetadataCount = + declaredColumns.stream() + .filter(c -> c instanceof UnresolvedMetadataColumn) + .map(UnresolvedMetadataColumn.class::cast) + .filter(c -> !c.isVirtual()) + .count(); + // we only truncate if exactly one persisted metadata column exists + if (persistedMetadataCount != 1L || columnCount < 1) { + return dataType; + } + // we only truncate timestamps + if (!hasFamily( + columnDataTypes.get(columnCount - 1).getLogicalType(), + LogicalTypeFamily.TIMESTAMP)) { + return dataType; + } + // truncate last field + final int[] indices = IntStream.range(0, columnCount - 1).toArray(); + return DataTypeUtils.projectRow(dataType, indices); + } + private static @Nullable List extractProjections( Schema patchedSchema, Schema declaredSchema) { final List patchedColumns = @@ -164,7 +244,7 @@ private static Schema createPatchedSchema( // physical columns if (isTopLevelRecord) { - addPhysicalDataTypeFields(builder, patchedDataType); + addPhysicalSourceDataTypeFields(builder, patchedDataType, null); } else { builder.column( LogicalTypeUtils.getAtomicName(Collections.emptyList()), patchedDataType); @@ -304,10 +384,30 @@ private static DataTypes.Field[] patchFields( .toArray(DataTypes.Field[]::new); } - private static void addPhysicalDataTypeFields(Schema.Builder builder, DataType dataType) { - final List fieldDataTypes = flattenToDataTypes(dataType); + private static void addPhysicalSourceDataTypeFields( + Schema.Builder builder, DataType dataType, @Nullable UnresolvedPrimaryKey primaryKey) { final List fieldNames = flattenToNames(dataType); - builder.fromFields(fieldNames, fieldDataTypes); + final List fieldDataTypes 
= flattenToDataTypes(dataType); + + // fields of a Row class are always nullable, which causes problems in the primary key + // validation, it would force users to always fully declare the schema for row types and + // would make the automatic schema derivation less useful, we therefore patch the + // nullability for primary key columns + final List fieldDataTypesWithPatchedNullability = + IntStream.range(0, fieldNames.size()) + .mapToObj( + pos -> { + final String fieldName = fieldNames.get(pos); + final DataType fieldDataType = fieldDataTypes.get(pos); + if (primaryKey != null + && primaryKey.getColumnNames().contains(fieldName)) { + return fieldDataTypes.get(pos).notNull(); + } + return fieldDataType; + }) + .collect(Collectors.toList()); + + builder.fromFields(fieldNames, fieldDataTypesWithPatchedNullability); } private static boolean isPhysical(UnresolvedColumn column) { @@ -397,26 +497,30 @@ public static final class OutputResult { /** * Data type expected from the first external operator after output conversion. The data * type might not be a row type and can possibly be nullable. + * + *

    Null if equal to {@link ResolvedSchema#toPhysicalRowDataType()}. */ - private final DataType physicalDataType; + private final @Nullable DataType physicalDataType; private OutputResult( - @Nullable List projections, Schema schema, DataType physicalDataType) { + @Nullable List projections, + Schema schema, + @Nullable DataType physicalDataType) { this.projections = projections; this.schema = schema; this.physicalDataType = physicalDataType; } - public @Nullable List getProjections() { - return projections; + public Optional> getProjections() { + return Optional.ofNullable(projections); } public Schema getSchema() { return schema; } - public DataType getPhysicalDataType() { - return physicalDataType; + public Optional getPhysicalDataType() { + return Optional.ofNullable(physicalDataType); } } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExternalModifyOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExternalModifyOperation.java index 85ab379d19ddb..26a45ea8e1373 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExternalModifyOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExternalModifyOperation.java @@ -25,9 +25,12 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.types.DataType; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; /** Internal operation used to convert a {@link Table} into a DataStream. */ @@ -42,7 +45,8 @@ public final class ExternalModifyOperation implements ModifyOperation { private final ResolvedSchema resolvedSchema; - private final ChangelogMode changelogMode; + /** Null if changelog mode is derived from input. 
*/ + private final @Nullable ChangelogMode changelogMode; private final DataType physicalDataType; @@ -76,8 +80,8 @@ public DataType getPhysicalDataType() { return physicalDataType; } - public ChangelogMode getChangelogMode() { - return changelogMode; + public Optional getChangelogMode() { + return Optional.ofNullable(changelogMode); } public ResolvedSchema getResolvedSchema() { diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java index 5fc0ec7d03133..7c565d0a6dcf9 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java @@ -23,6 +23,9 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ExternalSchemaTranslator.InputResult; +import org.apache.flink.table.catalog.ExternalSchemaTranslator.OutputResult; +import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeFactoryMock; import org.apache.flink.types.Row; @@ -49,7 +52,7 @@ public void testInputFromRow() { final TypeInformation inputTypeInfo = Types.ROW(Types.ROW(Types.INT, Types.BOOLEAN), Types.ENUM(DayOfWeek.class)); - final ExternalSchemaTranslator.InputResult result = + final InputResult result = ExternalSchemaTranslator.fromExternal( dataTypeFactoryWithRawType(DayOfWeek.class), inputTypeInfo, null); @@ -82,7 +85,7 @@ public void testInputFromRow() { } @Test - public void testOutputToRow() { + public void testOutputToRowDataType() { final ResolvedSchema inputSchema = ResolvedSchema.of( Column.physical("c", DataTypes.INT()), @@ -95,11 +98,11 @@ public void testOutputToRow() { DataTypes.FIELD("b", DataTypes.DOUBLE()), DataTypes.FIELD("c", DataTypes.INT())); - final ExternalSchemaTranslator.OutputResult result = + final OutputResult result = ExternalSchemaTranslator.fromInternal( dataTypeFactory(), inputSchema, physicalDataType); - assertEquals(Arrays.asList("a", "b", "c"), result.getProjections()); + assertEquals(Optional.of(Arrays.asList("a", "b", "c")), result.getProjections()); assertEquals( Schema.newBuilder() @@ -109,14 +112,14 @@ public void testOutputToRow() { .build(), result.getSchema()); - assertEquals(physicalDataType, result.getPhysicalDataType()); + assertEquals(Optional.of(physicalDataType), result.getPhysicalDataType()); } @Test public void testInputFromAtomic() { final TypeInformation inputTypeInfo = Types.GENERIC(Row.class); - final ExternalSchemaTranslator.InputResult result = + final InputResult result = ExternalSchemaTranslator.fromExternal( dataTypeFactoryWithRawType(Row.class), inputTypeInfo, null); @@ -132,25 +135,25 @@ public void testInputFromAtomic() { } @Test - public void testOutputToAtomic() { + public void testOutputToAtomicDataType() { final ResolvedSchema inputSchema = ResolvedSchema.of(Column.physical("a", DataTypes.INT())); - final ExternalSchemaTranslator.OutputResult result = + final OutputResult result = ExternalSchemaTranslator.fromInternal( dataTypeFactory(), inputSchema, DataTypes.INT()); - assertNull(result.getProjections()); + assertEquals(Optional.empty(), result.getProjections()); 
assertEquals(Schema.newBuilder().column("f0", DataTypes.INT()).build(), result.getSchema()); - assertEquals(DataTypes.INT(), result.getPhysicalDataType()); + assertEquals(Optional.of(DataTypes.INT()), result.getPhysicalDataType()); } @Test public void testInputFromRowWithNonPhysicalDeclaredSchema() { final TypeInformation inputTypeInfo = Types.ROW(Types.INT, Types.LONG); - final ExternalSchemaTranslator.InputResult result = + final InputResult result = ExternalSchemaTranslator.fromExternal( dataTypeFactory(), inputTypeInfo, @@ -171,7 +174,7 @@ public void testInputFromRowWithNonPhysicalDeclaredSchema() { assertEquals( Schema.newBuilder() - .column("f0", DataTypes.INT()) + .column("f0", DataTypes.INT().notNull()) // not null due to primary key .column("f1", DataTypes.BIGINT()) .columnByExpression("computed", "f1 + 42") .columnByExpression("computed2", "f1 - 1") @@ -187,7 +190,7 @@ public void testInputFromRowWithPhysicalDeclaredSchema() { final TypeInformation inputTypeInfo = Types.ROW(Types.INT, Types.LONG, Types.GENERIC(BigDecimal.class), Types.BOOLEAN); - final ExternalSchemaTranslator.InputResult result = + final InputResult result = ExternalSchemaTranslator.fromExternal( dataTypeFactoryWithRawType(BigDecimal.class), inputTypeInfo, @@ -231,7 +234,7 @@ public void testInputFromRowWithPhysicalDeclaredSchema() { public void testInputFromAtomicWithPhysicalDeclaredSchema() { final TypeInformation inputTypeInfo = Types.GENERIC(Row.class); - final ExternalSchemaTranslator.InputResult result = + final InputResult result = ExternalSchemaTranslator.fromExternal( dataTypeFactoryWithRawType(Row.class), inputTypeInfo, @@ -285,6 +288,81 @@ public void testInvalidDeclaredSchemaColumn() { } } + @Test + public void testOutputToNoSchema() { + final ResolvedSchema tableSchema = + ResolvedSchema.of( + Column.physical("id", DataTypes.BIGINT()), + Column.metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3), null, false), + Column.physical("name", DataTypes.STRING())); + + final OutputResult result = ExternalSchemaTranslator.fromInternal(tableSchema, null); + + assertEquals(Optional.empty(), result.getProjections()); + + assertEquals( + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("rowtime", DataTypes.TIMESTAMP_LTZ(3)) // becomes physical + .column("name", DataTypes.STRING()) + .build(), + result.getSchema()); + + assertEquals(Optional.empty(), result.getPhysicalDataType()); + } + + @Test + public void testOutputToMetadataSchema() { + final ResolvedSchema tableSchema = + ResolvedSchema.of( + Column.physical("id", DataTypes.BIGINT()), + Column.physical("name", DataTypes.STRING()), + Column.metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3), null, false)); + + final OutputResult result = + ExternalSchemaTranslator.fromInternal( + tableSchema, + Schema.newBuilder() + .columnByExpression("computed", "f1 + 42") + .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) + .build()); + + assertEquals( + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("name", DataTypes.STRING()) + .columnByExpression("computed", "f1 + 42") + .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) // becomes metadata + .build(), + result.getSchema()); + } + + @Test + public void testOutputToDeclaredSchema() { + final ResolvedSchema tableSchema = + ResolvedSchema.of( + Column.physical("id", DataTypes.BIGINT()), + Column.physical("rowtime", DataTypes.TIMESTAMP_LTZ(3)), + Column.physical("name", DataTypes.STRING())); + + final OutputResult result = + ExternalSchemaTranslator.fromInternal( + tableSchema, + 
Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) + .column("name", DataTypes.STRING().bridgedTo(StringData.class)) + .build()); + + assertEquals( + Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) + .column("name", DataTypes.STRING().bridgedTo(StringData.class)) + .build(), + result.getSchema()); + } + private static DataTypeFactory dataTypeFactoryWithRawType(Class rawType) { final DataTypeFactoryMock dataTypeFactory = new DataTypeFactoryMock(); dataTypeFactory.dataType = Optional.of(DataTypeFactoryMock.dummyRaw(rawType)); diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala index d813f75918864..ff726d35a34ac 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala @@ -24,6 +24,9 @@ import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl import org.apache.flink.table.api.{TableEnvironment, _} +import org.apache.flink.table.connector.ChangelogMode +import org.apache.flink.table.connector.sink.DynamicTableSink +import org.apache.flink.table.connector.source.DynamicTableSource import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor} import org.apache.flink.table.expressions.Expression import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction} @@ -196,7 +199,7 @@ trait StreamTableEnvironment extends TableEnvironment { * // physical columns will be derived automatically * * Schema.newBuilder() - * .columnByMetadata("rowtime", "TIMESTAMP(3)") // extract timestamp into a table column + * .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") // extract timestamp into a column * .watermark("rowtime", "SOURCE_WATERMARK()") // declare watermarks propagation * .build() * @@ -210,12 +213,120 @@ trait StreamTableEnvironment extends TableEnvironment { * }}} * * @param dataStream The [[DataStream]] to be converted. - * @param schema customized schema for the final table. + * @param schema The customized schema for the final table. * @tparam T The external type of the [[DataStream]]. * @return The converted [[Table]]. */ def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table + /** + * Converts the given [[DataStream]] of changelog entries into a [[Table]]. + * + * Compared to [[fromDataStream(DataStream)]], this method consumes instances of [[Row]] and + * evaluates the [[RowKind]] flag that is contained in every record during runtime. + * The runtime behavior is similar to that of a [[DynamicTableSource]]. + * + * This method expects a changelog containing all kinds of changes (enumerated in [[RowKind]]) + * as the default [[ChangelogMode]]. Use [[fromChangelogStream(DataStream, Schema, + * ChangelogMode)]] to limit the kinds of changes (e.g. for upsert mode). + * + * Column names and types of the [[Table]] are automatically derived from the [[TypeInformation]] + * of the [[DataStream]]. 
If the outermost record's [[TypeInformation]] is a [[CompositeType]], + * it will be flattened in the first level. [[TypeInformation]] that cannot be represented as one + * of the listed [[DataTypes]] will be treated as a black-box + * [[DataTypes.RAW(Class, TypeSerializer)]] type. Thus, composite nested fields will not be + * accessible. + * + * By default, the stream record's timestamp and watermarks are not propagated unless + * explicitly declared via [[fromChangelogStream(DataStream, Schema)]]. + * + * @param dataStream The changelog stream of [[Row]]. + * @return The converted [[Table]]. + */ + def fromChangelogStream(dataStream: DataStream[Row]): Table + + /** + * Converts the given [[DataStream]] of changelog entries into a [[Table]]. + * + * Compared to [[fromDataStream(DataStream)]], this method consumes instances of [[Row]] and + * evaluates the [[RowKind]] flag that is contained in every record during runtime. + * The runtime behavior is similar to that of a [[DynamicTableSource]]. + * + * This method expects a changelog containing all kinds of changes (enumerated in [[RowKind]]) as + * the default [[ChangelogMode]]. Use [[fromChangelogStream(DataStream, Schema, ChangelogMode)]] + * to limit the kinds of changes (e.g. for upsert mode). + * + * Column names and types of the [[Table]] are automatically derived from the [[TypeInformation]] + * of the [[DataStream]]. If the outermost record's [[TypeInformation]] is a [[CompositeType]], + * it will be flattened in the first level. [[TypeInformation]] that cannot be represented as one + * of the listed [[DataTypes]] will be treated as a black-box + * [[DataTypes.RAW(Class, TypeSerializer)]] type. Thus, composite nested fields will not be + * accessible. + * + * By default, the stream record's timestamp and watermarks are not propagated unless explicitly + * declared. + * + * This method allows to declare a [[Schema]] for the resulting table. The declaration is + * similar to a `CREATE TABLE` DDL in SQL and allows to: + * + * - enrich or overwrite automatically derived columns with a custom [[DataType]] + * - reorder columns + * - add computed or metadata columns next to the physical columns + * - access a stream record's timestamp + * - declare a watermark strategy or propagate the [[DataStream]] watermarks + * - declare a primary key + * + * See [[fromDataStream(DataStream, Schema)]] for more information and examples on how + * to declare a [[Schema]]. + * + * @param dataStream The changelog stream of [[Row]]. + * @param schema The customized schema for the final table. + * @return The converted [[Table]]. + */ + def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table + + /** + * Converts the given [[DataStream]] of changelog entries into a [[Table]]. + * + * Compared to [[fromDataStream(DataStream)]], this method consumes instances of [[Row]] and + * evaluates the [[RowKind]] flag that is contained in every record during runtime. + * The runtime behavior is similar to that of a [[DynamicTableSource]]. + * + * This method requires an explicitly declared [[ChangelogMode]]. For example, use + * [[ChangelogMode.upsert()]] if the stream will not contain [[RowKind.UPDATE_BEFORE]], or + * [[ChangelogMode.insertOnly()]] for non-updating streams. + * + * Column names and types of the [[Table]] are automatically derived from the [[TypeInformation]] + * of the [[DataStream]]. If the outermost record's [[TypeInformation]] is a [[CompositeType]], + * it will be flattened in the first level. 
[[TypeInformation]] that cannot be represented as one + * of the listed [[DataTypes]] will be treated as a black-box + * [[DataTypes.RAW(Class, TypeSerializer)]] type. Thus, composite nested fields will not be + * accessible. + * + * By default, the stream record's timestamp and watermarks are not propagated unless + * explicitly declared. + * + * This method allows to declare a [[Schema]] for the resulting table. The declaration is + * similar to a `CREATE TABLE` DDL in SQL and allows to: + * + * - enrich or overwrite automatically derived columns with a custom [[DataType]] + * - reorder columns + * - add computed or metadata columns next to the physical columns + * - access a stream record's timestamp + * - declare a watermark strategy or propagate the [[DataStream]] watermarks + * - declare a primary key + * + * See [[fromDataStream(DataStream, Schema)]] for more information and examples on how + * to declare a [[Schema]]. + * + * @param dataStream The changelog stream of [[Row]]. + * @param schema The customized schema for the final table. + * @param changelogMode The expected kinds of changes in the incoming changelog. + * @return The converted [[Table]]. + */ + def fromChangelogStream( + dataStream: DataStream[Row], schema: Schema, changelogMode: ChangelogMode): Table + /** * Creates a view from the given [[DataStream]] in a given path. * Registered tables can be referenced in SQL queries. @@ -247,7 +358,7 @@ trait StreamTableEnvironment extends TableEnvironment { * * @param path The path under which the [[DataStream]] is created. * See also the [[TableEnvironment]] class description for the format of the path. - * @param schema customized schema for the final table. + * @param schema The customized schema for the final table. * @param dataStream The [[DataStream]] out of which to create the view. * @tparam T The type of the [[DataStream]]. */ @@ -272,7 +383,7 @@ trait StreamTableEnvironment extends TableEnvironment { * If the input table contains a single rowtime column, it will be propagated into a stream * record's timestamp. Watermarks will be propagated as well. * - * @param table The [[Table]] to convert. + * @param table The [[Table]] to convert. It must be insert-only. * @return The converted [[DataStream]]. * @see [[toDataStream(Table, AbstractDataType)]] */ @@ -287,12 +398,12 @@ trait StreamTableEnvironment extends TableEnvironment { * This method is a shortcut for: * * {{{ - * tableEnv.toDataStream(table, DataTypes.of(class)) + * tableEnv.toDataStream(table, DataTypes.of(targetClass)) * }}} * * Calling this method with a class of [[Row]] will redirect to [[toDataStream(Table)]]. * - * @param table The [[Table]] to convert. + * @param table The [[Table]] to convert. It must be insert-only. * @param targetClass The [[Class]] that decides about the final external representation in * [[DataStream]] records. * @tparam T External record. @@ -327,7 +438,7 @@ trait StreamTableEnvironment extends TableEnvironment { * If the input table contains a single rowtime column, it will be propagated into a stream * record's timestamp. Watermarks will be propagated as well. * - * @param table The [[Table]] to convert. + * @param table The [[Table]] to convert. It must be insert-only. * @param targetDataType The [[DataType]] that decides about the final external * representation in [[DataStream]] records. * @tparam T External record. 
@@ -336,6 +447,143 @@ trait StreamTableEnvironment extends TableEnvironment { */ def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T] + /** + * Converts the given [[Table]] into a [[DataStream]] of changelog entries. + * + * Compared to [[toDataStream(Table)]], this method produces instances of [[Row]] + * and sets the [[RowKind]] flag that is contained in every record during runtime. The + * runtime behavior is similar to that of a [[DynamicTableSink]]. + * + * This method can emit a changelog containing all kinds of changes (enumerated in [[RowKind]]) + * that the given updating table requires as the default [[ChangelogMode]]. Use + * [[toChangelogStream(Table, Schema, ChangelogMode)]] to limit the kinds of changes (e.g. + * for upsert mode). + * + * Note that the type system of the table ecosystem is richer than the one of the DataStream + * API. The table runtime will make sure to properly serialize the output records to the first + * operator of the DataStream API. Afterwards, the [[org.apache.flink.api.common.typeinfo.Types]] + * semantics of the DataStream API need to be considered. + * + * If the input table contains a single rowtime column, it will be propagated into a stream + * record's timestamp. Watermarks will be propagated as well. + * + * @param table The [[Table]] to convert. It can be updating or insert-only. + * @return The converted changelog stream of [[Row]]. + */ + def toChangelogStream(table: Table): DataStream[Row] + + /** + * Converts the given [[Table]] into a [[DataStream]] of changelog entries. + * + * Compared to [[toDataStream(Table)]], this method produces instances of [[Row]] + * and sets the [[RowKind]] flag that is contained in every record during runtime. The + * runtime behavior is similar to that of a [[DynamicTableSink]]. + * + * This method can emit a changelog containing all kinds of changes (enumerated in [[RowKind]]) + * that the given updating table requires as the default [[ChangelogMode]]. Use + * [[toChangelogStream(Table, Schema, ChangelogMode)]] to limit the kinds of changes (e.g. + * for upsert mode). + * + * The given [[Schema]] is used to configure the table runtime to convert columns and + * internal data structures to the desired representation. The following example shows how to + * convert a table column into a POJO type. + * + * {{{ + * // given a Table of (id BIGINT, payload ROW < name STRING , age INT >) + * + * case class MyPojo(var name: String, var age: java.lang.Integer) + * + * tableEnv.toChangelogStream( + * table, + * Schema.newBuilder() + * .column("id", DataTypes.BIGINT()) + * .column("payload", DataTypes.of(classOf[MyPojo])) // force an implicit conversion + * .build()) + * }}} + * + * Note that the type system of the table ecosystem is richer than the one of the DataStream + * API. The table runtime will make sure to properly serialize the output records to the first + * operator of the DataStream API. Afterwards, the [[org.apache.flink.api.common.typeinfo.Types]] + * semantics of the DataStream API need to be considered. + * + * If the input table contains a single rowtime column, it will be propagated into a stream + * record's timestamp. Watermarks will be propagated as well. + * + * If the rowtime should not be a concrete field in the final [[Row]] anymore, or the schema + * should be symmetrical for both [[fromChangelogStream]] and [[toChangelogStream]], the rowtime + * can also be declared as a metadata column that will be propagated into a stream record's + * timestamp. 
It is possible to declare a schema without physical/regular columns. In this case, + * those columns will be automatically derived and implicitly put at the beginning of the schema + * declaration. + * + * The following examples illustrate common schema declarations and their semantics: + * + * {{{ + * // given a Table of (id INT, name STRING, my_rowtime TIMESTAMP_LTZ(3)) + * + * // === EXAMPLE 1 === + * + * // no physical columns defined, they will be derived automatically, + * // the last derived physical column will be skipped in favor of the metadata column + * + * Schema.newBuilder() + * .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") + * .build() + * + * // equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA) + * + * // === EXAMPLE 2 === + * + * // physical columns defined, all columns must be defined + * + * Schema.newBuilder() + * .column("id", "INT") + * .column("name", "STRING") + * .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") + * .build() + * + * // equal to: CREATE TABLE (id INT, name STRING, rowtime TIMESTAMP_LTZ(3) METADATA) + * }}} + * + * @param table The [[Table]] to convert. It can be updating or insert-only. + * @param targetSchema The [[Schema]] that decides about the final external representation + * in [[DataStream]] records. + * @return The converted changelog stream of [[Row]]. + */ + def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] + + /** + * Converts the given [[Table]] into a [[DataStream]] of changelog entries. + * + * Compared to [[toDataStream(Table)]], this method produces instances of [[Row]] + * and sets the [[RowKind]] flag that is contained in every record during runtime. The + * runtime behavior is similar to that of a [[DynamicTableSink]]. + * + * This method requires an explicitly declared [[ChangelogMode]]. For example, use + * [[ChangelogMode.upsert()]] if the stream will not contain [[RowKind.UPDATE_BEFORE]], or + * [[ChangelogMode.insertOnly()]] for non-updating streams. + * + * Note that the type system of the table ecosystem is richer than the one of the DataStream + * API. The table runtime will make sure to properly serialize the output records to the first + * operator of the DataStream API. Afterwards, the [[org.apache.flink.api.common.typeinfo.Types]] + * semantics of the DataStream API need to be considered. + * + * If the input table contains a single rowtime column, it will be propagated into a stream + * record's timestamp. Watermarks will be propagated as well. However, it is also possible to + * write out the rowtime as a metadata column. See [[toChangelogStream(Table, Schema)]] for + * more information and examples on how to declare a [[Schema]]. + * + * @param table The [[Table]] to convert. It can be updating or insert-only. + * @param targetSchema The [[Schema]] that decides about the final external representation + * in [[DataStream]] records. + * @param changelogMode The required kinds of changes in the result changelog. An exception will + * be thrown if the given updating table cannot be represented in this + * changelog mode. + * @return The converted changelog stream of [[Row]]. + */ + def toChangelogStream( + table: Table, targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row] + /** * Converts the given [[DataStream]] into a [[Table]] with specified field names. 
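To make the EXAMPLE 1 / EXAMPLE 2 schema declarations above concrete, the following is a hedged end-to-end sketch (Java bridge; the class name, sample rows, and timestamps are invented, modeled on the `testFromAndToChangelogStreamEventTime` IT case added later in this patch). It reads the rowtime from the stream record's timestamp on the way in and writes it back out as a metadata column, keeping `fromChangelogStream` and `toChangelogStream` symmetric:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RowtimeMetadataRoundTripSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // insert-only rows of (f0 INT, f1 STRING) with event-time timestamps and watermarks
        DataStream<Row> input =
                env.fromElements(Row.of(1, "a"), Row.of(2, "b"), Row.of(3, "c"))
                        .returns(Types.ROW(Types.INT, Types.STRING))
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Row>forMonotonousTimestamps()
                                        .withTimestampAssigner(
                                                (row, ts) -> 1_000L * row.<Integer>getFieldAs(0)));

        // EXAMPLE 1 style: physical columns derived, rowtime read from the record timestamp
        Table table =
                tableEnv.fromChangelogStream(
                        input,
                        Schema.newBuilder()
                                .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
                                .watermark("rowtime", "SOURCE_WATERMARK()")
                                .build());

        // EXAMPLE 2 style: all columns declared, rowtime written back into the record timestamp
        DataStream<Row> output =
                tableEnv.toChangelogStream(
                        table,
                        Schema.newBuilder()
                                .column("f0", DataTypes.INT())
                                .column("f1", DataTypes.STRING())
                                .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
                                .build());

        output.print();
        env.execute();
    }
}
```

Because `rowtime` is declared via `columnByMetadata` on both sides, it never appears as a physical field of the emitted `Row`; it travels in the `StreamRecord` timestamp, which is what the `SupportsWritingMetadata` handling added to `ExternalDynamicSink` later in this diff enables.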
* diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala index 6219a29aa59d3..091b47cb95a96 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala @@ -77,31 +77,63 @@ class StreamTableEnvironmentImpl ( with org.apache.flink.table.api.bridge.scala.StreamTableEnvironment { override def fromDataStream[T](dataStream: DataStream[T]): Table = { - fromDataStreamInternal(dataStream.javaStream, null, null) + Preconditions.checkNotNull(dataStream, "Data stream must not be null.") + fromStreamInternal(dataStream.javaStream, null, null, ChangelogMode.insertOnly()) } override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table = { - fromDataStreamInternal(dataStream.javaStream, schema, null) + Preconditions.checkNotNull(dataStream, "Data stream must not be null.") + Preconditions.checkNotNull(schema, "Schema must not be null.") + fromStreamInternal(dataStream.javaStream, schema, null, ChangelogMode.insertOnly()) + } + + override def fromChangelogStream(dataStream: DataStream[Row]): Table = { + Preconditions.checkNotNull(dataStream, "Data stream must not be null.") + fromStreamInternal(dataStream.javaStream, null, null, ChangelogMode.all()) + } + + override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table = { + Preconditions.checkNotNull(dataStream, "Data stream must not be null.") + Preconditions.checkNotNull(schema, "Schema must not be null.") + fromStreamInternal(dataStream.javaStream, schema, null, ChangelogMode.all()) + } + + override def fromChangelogStream( + dataStream: DataStream[Row], + schema: Schema, + changelogMode: ChangelogMode) + : Table = { + Preconditions.checkNotNull(dataStream, "Data stream must not be null.") + Preconditions.checkNotNull(schema, "Schema must not be null.") + fromStreamInternal(dataStream.javaStream, schema, null, changelogMode) } override def createTemporaryView[T]( path: String, dataStream: DataStream[T]): Unit = { - createTemporaryView(path, fromDataStreamInternal(dataStream.javaStream, null, path)) + Preconditions.checkNotNull(dataStream, "Data stream must not be null.") + createTemporaryView( + path, + fromStreamInternal(dataStream.javaStream, null, path, ChangelogMode.insertOnly())) } override def createTemporaryView[T]( path: String, dataStream: DataStream[T], schema: Schema): Unit = { - createTemporaryView(path, fromDataStreamInternal(dataStream.javaStream, schema, path)) + Preconditions.checkNotNull(dataStream, "Data stream must not be null.") + Preconditions.checkNotNull(schema, "Schema must not be null.") + createTemporaryView( + path, + fromStreamInternal(dataStream.javaStream, schema, path, ChangelogMode.insertOnly())) } - private def fromDataStreamInternal[T]( + private def fromStreamInternal[T]( dataStream: JDataStream[T], @Nullable schema: Schema, - @Nullable viewPath: String): Table = { - Preconditions.checkNotNull(dataStream, "Data stream must not be null.") + @Nullable viewPath: String, + changelogMode: ChangelogMode): Table = { + Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.") val catalogManager = 
getCatalogManager val schemaResolver = catalogManager.getSchemaResolver val operationTreeBuilder = getOperationTreeBuilder @@ -109,7 +141,7 @@ class StreamTableEnvironmentImpl ( val unresolvedIdentifier = if (viewPath != null) { getParser.parseIdentifier(viewPath) } else { - UnresolvedIdentifier.of("Unregistered_DataStream_" + dataStream.getId) + UnresolvedIdentifier.of("Unregistered_DataStream_Source_" + dataStream.getId) } val objectIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier) @@ -125,7 +157,7 @@ class StreamTableEnvironmentImpl ( dataStream, schemaTranslationResult.getPhysicalDataType, schemaTranslationResult.isTopLevelRecord, - ChangelogMode.insertOnly(), + changelogMode, resolvedSchema) val projections = schemaTranslationResult.getProjections @@ -147,7 +179,8 @@ class StreamTableEnvironmentImpl ( override def toDataStream(table: Table): DataStream[Row] = { Preconditions.checkNotNull(table, "Table must not be null.") - val sourceType = table.getResolvedSchema.toSinkRowDataType + // include all columns of the query (incl. metadata and computed columns) + val sourceType = table.getResolvedSchema.toSourceRowDataType toDataStream(table, sourceType) } @@ -165,26 +198,72 @@ class StreamTableEnvironmentImpl ( override def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T] = { Preconditions.checkNotNull(table, "Table must not be null.") Preconditions.checkNotNull(targetDataType, "Target data type must not be null.") - val catalogManager = getCatalogManager - val schemaResolver = catalogManager.getSchemaResolver - val operationTreeBuilder = getOperationTreeBuilder val schemaTranslationResult = ExternalSchemaTranslator.fromInternal( catalogManager.getDataTypeFactory, table.getResolvedSchema, targetDataType) - val projections = schemaTranslationResult.getProjections - val projectOperation = if (projections == null) { - table.getQueryOperation - } else { + toStreamInternal(table, schemaTranslationResult, ChangelogMode.insertOnly()) + } + + override def toChangelogStream(table: Table): DataStream[Row] = { + Preconditions.checkNotNull(table, "Table must not be null.") + + val schemaTranslationResult = ExternalSchemaTranslator.fromInternal( + table.getResolvedSchema, + null) + + toStreamInternal(table, schemaTranslationResult, null) + } + + override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] = { + Preconditions.checkNotNull(table, "Table must not be null.") + Preconditions.checkNotNull(targetSchema, "Target schema must not be null.") + + val schemaTranslationResult = ExternalSchemaTranslator.fromInternal( + table.getResolvedSchema, + targetSchema) + + toStreamInternal(table, schemaTranslationResult, null) + } + + override def toChangelogStream( + table: Table, + targetSchema: Schema, + changelogMode: ChangelogMode) + : DataStream[Row] = { + Preconditions.checkNotNull(table, "Table must not be null.") + Preconditions.checkNotNull(targetSchema, "Target schema must not be null.") + Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.") + + val schemaTranslationResult = ExternalSchemaTranslator.fromInternal( + table.getResolvedSchema, + targetSchema) + + toStreamInternal(table, schemaTranslationResult, changelogMode) + } + + private def toStreamInternal[T]( + table: Table, + schemaTranslationResult: ExternalSchemaTranslator.OutputResult, + @Nullable changelogMode: ChangelogMode) + : DataStream[T] = { + val catalogManager = getCatalogManager + val schemaResolver = catalogManager.getSchemaResolver + 
val operationTreeBuilder = getOperationTreeBuilder + + val optionalProjections = schemaTranslationResult.getProjections + val projectOperation = if (optionalProjections.isPresent) { + val projections = optionalProjections.get operationTreeBuilder.project( - util.Arrays.asList( - projections - .asScala - .map(ApiExpressionUtils.unresolvedRef) - .toArray), + projections.asScala + .map(ApiExpressionUtils.unresolvedRef) + .map(_.asInstanceOf[Expression]) + .asJava, table.getQueryOperation) + } else { + table.getQueryOperation } val resolvedSchema = schemaResolver.resolve(schemaTranslationResult.getSchema) @@ -195,15 +274,16 @@ class StreamTableEnvironmentImpl ( val modifyOperation = new ExternalModifyOperation( objectIdentifier, - table.getQueryOperation, + projectOperation, resolvedSchema, - ChangelogMode.insertOnly(), - schemaTranslationResult.getPhysicalDataType) + changelogMode, + schemaTranslationResult.getPhysicalDataType + .orElse(resolvedSchema.toPhysicalRowDataType)) - toDataStreamInternal(table, modifyOperation) + toStreamInternal(table, modifyOperation) } - private def toDataStreamInternal[T]( + private def toStreamInternal[T]( table: Table, modifyOperation: ModifyOperation) : DataStream[T] = { @@ -242,7 +322,7 @@ class StreamTableEnvironmentImpl ( table.getQueryOperation, TypeConversions.fromLegacyInfoToDataType(returnType), OutputConversionModifyOperation.UpdateMode.APPEND) - toDataStreamInternal[T](table, modifyOperation) + toStreamInternal[T](table, modifyOperation) } override def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = { @@ -252,7 +332,7 @@ class StreamTableEnvironmentImpl ( table.getQueryOperation, TypeConversions.fromLegacyInfoToDataType(returnType), OutputConversionModifyOperation.UpdateMode.RETRACT) - toDataStreamInternal(table, modifyOperation) + toStreamInternal(table, modifyOperation) } override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java index c8d59a735f88f..c4e3d1cc208e5 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java @@ -38,6 +38,21 @@ public final class ChangelogMode { private static final ChangelogMode INSERT_ONLY = ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).build(); + private static final ChangelogMode UPSERT = + ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + + private static final ChangelogMode ALL = + ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + private final Set kinds; private ChangelogMode(Set kinds) { @@ -51,6 +66,19 @@ public static ChangelogMode insertOnly() { return INSERT_ONLY; } + /** + * Shortcut for an upsert changelog that describes idempotent updates on a key and thus does not + * contain {@link RowKind#UPDATE_BEFORE} rows. + */ + public static ChangelogMode upsert() { + return UPSERT; + } + + /** Shortcut for a changelog that can contain all {@link RowKind}s. 
*/ + public static ChangelogMode all() { + return ALL; + } + /** Builder for configuring and creating instances of {@link ChangelogMode}. */ public static Builder newBuilder() { return new Builder(); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index 6dbd42393bc57..b732265ba61b7 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -138,7 +138,7 @@ public static RelNode convertExternalToRel( final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema); final DynamicTableSink tableSink = new ExternalDynamicSink( - externalModifyOperation.getChangelogMode(), + externalModifyOperation.getChangelogMode().orElse(null), externalModifyOperation.getPhysicalDataType()); return convertSinkToRel( relBuilder, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java index 6e64b811b9010..e916744d843a8 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSink.java @@ -22,8 +22,10 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.sink.OutputConversionOperator; import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo; @@ -31,21 +33,38 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + /** Table sink for connecting to the external {@link DataStream} API. 
*/ @Internal -final class ExternalDynamicSink implements DynamicTableSink { +final class ExternalDynamicSink implements DynamicTableSink, SupportsWritingMetadata { + + private static final String ROWTIME_METADATA_KEY = "rowtime"; - private final ChangelogMode changelogMode; + private static final DataType ROWTIME_METADATA_DATA_TYPE = DataTypes.TIMESTAMP_LTZ(3).notNull(); + + private final @Nullable ChangelogMode changelogMode; private final DataType physicalDataType; - ExternalDynamicSink(ChangelogMode changelogMode, DataType physicalDataType) { + // mutable attributes + + private boolean consumeRowtimeMetadata; + + ExternalDynamicSink(@Nullable ChangelogMode changelogMode, DataType physicalDataType) { this.changelogMode = changelogMode; this.physicalDataType = physicalDataType; } @Override public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + if (changelogMode == null) { + return requestedMode; + } return changelogMode; } @@ -73,14 +92,17 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { new OutputConversionOperator( atomicFieldGetter, physicalConverter, - transformationContext.getRowtimeIndex()), + transformationContext.getRowtimeIndex(), + consumeRowtimeMetadata), ExternalTypeInfo.of(physicalDataType), input.getParallelism()); }; } private String generateOperatorName() { - return String.format("TableToDataSteam(type=%s)", physicalDataType.toString()); + return String.format( + "TableToDataSteam(type=%s, rowtime=%s)", + physicalDataType.toString(), consumeRowtimeMetadata); } @Override @@ -92,4 +114,14 @@ public DynamicTableSink copy() { public String asSummaryString() { return generateOperatorName(); } + + @Override + public Map listWritableMetadata() { + return Collections.singletonMap(ROWTIME_METADATA_KEY, ROWTIME_METADATA_DATA_TYPE); + } + + @Override + public void applyWritableMetadata(List metadataKeys, DataType consumedDataType) { + consumeRowtimeMetadata = metadataKeys.contains(ROWTIME_METADATA_KEY); + } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSource.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSource.java index 3a81f9baa6f9e..c15535727bbb4 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSource.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSource.java @@ -59,7 +59,7 @@ final class ExternalDynamicSource // mutable attributes - private boolean attachRowtime; + private boolean produceRowtimeMetadata; private boolean propagateWatermark; @@ -81,7 +81,7 @@ public DynamicTableSource copy() { final ExternalDynamicSource copy = new ExternalDynamicSource<>( identifier, dataStream, physicalDataType, isTopLevelRecord, changelogMode); - copy.attachRowtime = attachRowtime; + copy.produceRowtimeMetadata = produceRowtimeMetadata; copy.propagateWatermark = propagateWatermark; return copy; } @@ -110,7 +110,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon new InputConversionOperator<>( physicalConverter, !isTopLevelRecord, - attachRowtime, + produceRowtimeMetadata, propagateWatermark, changelogMode.containsOnly(RowKind.INSERT)), null, // will be filled by the framework @@ -124,7 +124,7 @@ private String generateOperatorName() { "DataSteamToTable(stream=%s, type=%s, rowtime=%s, watermark=%s)", identifier.asSummaryString(), 
physicalDataType.toString(), - attachRowtime, + produceRowtimeMetadata, propagateWatermark); } @@ -135,7 +135,7 @@ public Map listReadableMetadata() { @Override public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { - attachRowtime = metadataKeys.contains(ROWTIME_METADATA_KEY); + produceRowtimeMetadata = metadataKeys.contains(ROWTIME_METADATA_KEY); } @Override diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java index 5f3cbe668803e..9ead35740ce5d 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java @@ -38,11 +38,14 @@ import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.WatermarkSpec; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RawType; import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Either; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.CollectionUtil; @@ -288,6 +291,150 @@ public void testFromAndToDataStreamEventTime() throws Exception { Row.of("c", 1000)); } + @Test + public void testFromAndToChangelogStreamEventTime() throws Exception { + final DataStream> dataStream = getWatermarkedDataStream(); + + final DataStream changelogStream = + dataStream + .map(t -> Row.ofKind(RowKind.INSERT, t.f1, t.f2)) + .returns(Types.ROW(Types.INT, Types.STRING)); + + // derive physical columns and add a rowtime + final Table table = + tableEnv.fromChangelogStream( + changelogStream, + Schema.newBuilder() + .columnByMetadata("rowtime", DataTypes.TIMESTAMP(3)) + .watermark("rowtime", "SOURCE_WATERMARK()") + .build()); + tableEnv.createTemporaryView("t", table); + + // access and reorder columns + final Table reordered = tableEnv.sqlQuery("SELECT f1, rowtime, f0 FROM t"); + + // write out the rowtime column with fully declared schema + final DataStream result = + tableEnv.toChangelogStream( + reordered, + Schema.newBuilder() + .column("f1", DataTypes.STRING()) + .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) + .column("f0", DataTypes.INT()) + .build()); + + // test event time window and field access + testResult( + result.keyBy(k -> k.getField("f1")) + .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) + .apply( + (key, window, input, out) -> { + int sum = 0; + for (Row row : input) { + sum += row.getFieldAs("f0"); + } + out.collect(Row.of(key, sum)); + }) + .returns(Types.ROW(Types.STRING, Types.INT)), + Row.of("a", 47), + Row.of("c", 1000), + Row.of("c", 1000)); + } + + @Test + public void testFromAndToChangelogStreamRetract() throws Exception { + final List> inputOrOutput = + Arrays.asList( + input(RowKind.INSERT, "bob", 0), + output(RowKind.INSERT, "bob", 0), + // -- + input(RowKind.UPDATE_BEFORE, "bob", 0), + output(RowKind.DELETE, "bob", 0), + // -- + input(RowKind.UPDATE_AFTER, "bob", 1), + output(RowKind.INSERT, "bob", 1), + // -- 
+ input(RowKind.INSERT, "alice", 1), + output(RowKind.INSERT, "alice", 1), + // -- + input(RowKind.INSERT, "alice", 1), + output(RowKind.UPDATE_BEFORE, "alice", 1), + output(RowKind.UPDATE_AFTER, "alice", 2), + // -- + input(RowKind.UPDATE_BEFORE, "alice", 1), + output(RowKind.UPDATE_BEFORE, "alice", 2), + output(RowKind.UPDATE_AFTER, "alice", 1), + // -- + input(RowKind.UPDATE_AFTER, "alice", 2), + output(RowKind.UPDATE_BEFORE, "alice", 1), + output(RowKind.UPDATE_AFTER, "alice", 3), + // -- + input(RowKind.UPDATE_BEFORE, "alice", 2), + output(RowKind.UPDATE_BEFORE, "alice", 3), + output(RowKind.UPDATE_AFTER, "alice", 1), + // -- + input(RowKind.UPDATE_AFTER, "alice", 100), + output(RowKind.UPDATE_BEFORE, "alice", 1), + output(RowKind.UPDATE_AFTER, "alice", 101)); + + final DataStream changelogStream = env.fromElements(getInput(inputOrOutput)); + tableEnv.createTemporaryView("t", tableEnv.fromChangelogStream(changelogStream)); + + final Table result = tableEnv.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0"); + + testResult(result.execute(), getOutput(inputOrOutput)); + + testResult(tableEnv.toChangelogStream(result), getOutput(inputOrOutput)); + } + + @Test + public void testFromAndToChangelogStreamUpsert() throws Exception { + final List> inputOrOutput = + Arrays.asList( + input(RowKind.INSERT, "bob", 0), + output(RowKind.INSERT, "bob", 0), + // -- + input(RowKind.UPDATE_AFTER, "bob", 1), + output(RowKind.DELETE, "bob", 0), + output(RowKind.INSERT, "bob", 1), + // -- + input(RowKind.INSERT, "alice", 1), + output(RowKind.INSERT, "alice", 1), + // -- + input(RowKind.INSERT, "alice", 1), // no impact + // -- + input(RowKind.UPDATE_AFTER, "alice", 2), + output(RowKind.DELETE, "alice", 1), + output(RowKind.INSERT, "alice", 2), + // -- + input(RowKind.UPDATE_AFTER, "alice", 100), + output(RowKind.DELETE, "alice", 2), + output(RowKind.INSERT, "alice", 100)); + + final DataStream changelogStream = env.fromElements(getInput(inputOrOutput)); + tableEnv.createTemporaryView( + "t", + tableEnv.fromChangelogStream( + changelogStream, + Schema.newBuilder().primaryKey("f0").build(), + ChangelogMode.upsert())); + + final Table result = tableEnv.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0"); + + testResult(result.execute(), getOutput(inputOrOutput)); + + testResult( + tableEnv.toChangelogStream( + result, + Schema.newBuilder().primaryKey("f0").build(), + ChangelogMode.upsert()), + getOutput(inputOrOutput)); + } + + // -------------------------------------------------------------------------------------------- + // Helper methods + // -------------------------------------------------------------------------------------------- + private DataStream> getWatermarkedDataStream() { final DataStream> dataStream = env.fromCollection( @@ -303,6 +450,25 @@ private DataStream> getWatermarkedDataStream() { .withTimestampAssigner((ctx) -> (element, recordTimestamp) -> element.f0)); } + private static Either input(RowKind kind, Object... fields) { + return Either.Left(Row.ofKind(kind, fields)); + } + + private static Row[] getInput(List> inputOrOutput) { + return inputOrOutput.stream().filter(Either::isLeft).map(Either::left).toArray(Row[]::new); + } + + private static Either output(RowKind kind, Object... fields) { + return Either.Right(Row.ofKind(kind, fields)); + } + + private static Row[] getOutput(List> inputOrOutput) { + return inputOrOutput.stream() + .filter(Either::isRight) + .map(Either::right) + .toArray(Row[]::new); + } + private static void testSchema(Table table, Column... 
expectedColumns) { assertEquals(ResolvedSchema.of(expectedColumns), table.getResolvedSchema()); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java index 571f06bda073d..c24775f7b6513 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/OutputConversionOperator.java @@ -40,15 +40,19 @@ public class OutputConversionOperator extends TableStreamOperator private final int rowtimeIndex; + private final boolean consumeRowtimeMetadata; + private transient StreamRecord outRecord; public OutputConversionOperator( @Nullable RowData.FieldGetter atomicFieldGetter, DataStructureConverter converter, - int rowtimeIndex) { + int rowtimeIndex, + boolean consumeRowtimeMetadata) { this.atomicFieldGetter = atomicFieldGetter; this.converter = converter; this.rowtimeIndex = rowtimeIndex; + this.consumeRowtimeMetadata = consumeRowtimeMetadata; } @Override @@ -65,7 +69,10 @@ public void open() throws Exception { public void processElement(StreamRecord element) throws Exception { final RowData rowData = element.getValue(); - if (rowtimeIndex != -1) { + if (consumeRowtimeMetadata) { + final long rowtime = rowData.getTimestamp(rowData.getArity() - 1, 3).getMillisecond(); + outRecord.setTimestamp(rowtime); + } else if (rowtimeIndex != -1) { final long rowtime = rowData.getTimestamp(rowtimeIndex, 3).getMillisecond(); outRecord.setTimestamp(rowtime); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java index 1f78a2b6d9b5e..41bca4f13c685 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java @@ -45,7 +45,7 @@ public final class InputConversionOperator extends TableStreamOperator extends TableStreamOperator element) throws Exception { kind)); } - if (!attachRowtime) { + if (!produceRowtimeMetadata) { output.collect(outRecord.replace(payloadRowData)); return; }
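Tying the runtime pieces together, the closing sketch below (Java bridge; class name and data are illustrative, modeled on the `testFromAndToChangelogStreamRetract` IT case in this patch) shows the default retract round trip: a changelog is interpreted with `ChangelogMode.all()`, aggregated in SQL, and emitted through `toChangelogStream(Table)` with the `RowKind` flag set on every output record by the output conversion:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class RetractChangelogSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // a retracting changelog: UPDATE_BEFORE / UPDATE_AFTER pairs are part of the input
        DataStream<Row> changelog =
                env.fromElements(
                                Row.ofKind(RowKind.INSERT, "bob", 0),
                                Row.ofKind(RowKind.UPDATE_BEFORE, "bob", 0),
                                Row.ofKind(RowKind.UPDATE_AFTER, "bob", 1),
                                Row.ofKind(RowKind.INSERT, "alice", 1))
                        .returns(Types.ROW(Types.STRING, Types.INT));

        // default ChangelogMode.all(): every RowKind flag in the stream is interpreted
        tableEnv.createTemporaryView("t", tableEnv.fromChangelogStream(changelog));

        // an updating aggregation over the changelog
        Table result = tableEnv.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0");

        // the updating result is emitted as a retract stream of Rows carrying RowKind flags
        DataStream<Row> out = tableEnv.toChangelogStream(result);
        out.print();
        env.execute();
    }
}
```

Each update to a group key surfaces as an `UPDATE_BEFORE`/`UPDATE_AFTER` pair (or `DELETE`/`INSERT` for the first change), which is the default changelog that `toChangelogStream(Table)` derives when no explicit `ChangelogMode` is given.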