[AMORO-1457] Support computed columns and watermark in Flink DDL (#2239)
* support computed column and watermark for flink1.15

* separate getPhysicalSchemaForDimTable when dim_table.enabled=true

* rm ut for flink1.15

* change for review comments

* change for master conflict

* fix checkstyle

* change compute index to start from 1

* change method name

* change doc for computed column

* fix check style

* fix for comments

---------

Co-authored-by: Yi Xie <xieyi01@rd.netease.com>
Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
3 people authored Nov 27, 2023
1 parent a0b98b7 commit f11336f
Showing 7 changed files with 658 additions and 77 deletions.
8 changes: 6 additions & 2 deletions docs/engines/flink/flink-ddl.md
@@ -72,6 +72,9 @@ CREATE TABLE `arctic_catalog`.`arctic_db`.`test_table` (
id BIGINT,
name STRING,
op_time TIMESTAMP,
ts3 AS CAST(op_time as TIMESTAMP(3)),
watermark FOR ts3 AS ts3 - INTERVAL '5' SECOND,
proc AS PROCTIME(),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'key' = 'value'
@@ -83,8 +86,8 @@ Currently, most of the syntax supported by [Flink SQL create table](https://nigh
- PARTITION BY (column1, column2, …): configure Flink partition fields, but Flink does not yet support hidden partitions.
- PRIMARY KEY (column1, column2, …): configure primary keys.
- WITH ('key'='value', …): configure Amoro Table properties.

- computed_column_definition: column_name AS computed_column_expression. Currently, computed columns must be declared after all physical columns.
- watermark_definition: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression. The rowtime_column_name must be of type TIMESTAMP(3).
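For example, in the `test_table` DDL above, `ts3 AS CAST(op_time AS TIMESTAMP(3))` declares a computed column, `WATERMARK FOR ts3 AS ts3 - INTERVAL '5' SECOND` defines a watermark on it, and `proc AS PROCTIME()` adds a processing-time attribute.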

#### PARTITIONED BY
Create a partitioned table using PARTITIONED BY.
@@ -105,6 +108,7 @@ CREATE TABLE `test_table` (
id BIGINT,
name STRING,
op_time TIMESTAMP,
proc AS PROCTIME(),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'arctic',
@@ -18,49 +18,98 @@

package com.netease.arctic.flink;

import static org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;

import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.PrimaryKeySpec;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableColumn.ComputedColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** A utility class for converting Flink table schemas. */
public class FlinkSchemaUtil {

private static final Logger LOG = LoggerFactory.getLogger(FlinkSchemaUtil.class);
public static final String FLINK_PREFIX = "flink";

public static final String COMPUTED_COLUMNS = "computed-column";

public static final String WATERMARK = "watermark";
public static final String PROCTIME_FUNCTION = SCHEMA_PROCTIME + "()";
public static final Pattern COMPUTE_PATTERN =
Pattern.compile("flink\\.computed-column\\.(\\d+)\\.name");
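
For orientation, a sketch (not part of this commit) of the table-property layout these constants describe, using the `ts3` computed column and watermark from the DDL example above. The key suffixes (`name`, `data-type`, `expr`, `rowtime`, `strategy.expr`, `strategy.data-type`) come from Flink's `DescriptorProperties` constants, so treat the exact spellings as assumptions; the class name `PropertyLayoutSketch` is hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PropertyLayoutSketch {
  public static void main(String[] args) {
    // One computed column (index starts from 1) plus one watermark definition.
    Map<String, String> props = new LinkedHashMap<>();
    props.put("flink.computed-column.1.name", "ts3");
    props.put("flink.computed-column.1.data-type", "TIMESTAMP(3)");
    props.put("flink.computed-column.1.expr", "CAST(`op_time` AS TIMESTAMP(3))");
    props.put("flink.watermark.rowtime", "ts3");
    props.put("flink.watermark.strategy.expr", "`ts3` - INTERVAL '5' SECOND");
    props.put("flink.watermark.strategy.data-type", "TIMESTAMP(3)");
    props.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```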

/**
* Convert an Iceberg {@link Schema} to a Flink {@link TableSchema}.
*
* @param icebergSchema the Iceberg schema
* @param primaryKeys names of the primary key fields
* @param tableProperties table properties carrying the serialized computed columns and watermark
* @return Flink TableSchema
*/
public static TableSchema toSchema(
Schema icebergSchema, List<String> primaryKeys, Map<String, String> tableProperties) {
TableSchema.Builder builder = TableSchema.builder();
RowType rowType = org.apache.iceberg.flink.FlinkSchemaUtil.convert(icebergSchema);

// add physical columns.
for (RowType.RowField field : rowType.getFields()) {
builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
}

// add primary key
if (CollectionUtils.isNotEmpty(primaryKeys)) {
builder.primaryKey(primaryKeys.toArray(new String[0]));
}

Set<Integer> computeIndex = getComputeIndex(tableProperties);
List<String> fieldNames = rowType.getFieldNames();

// add computed columns
for (int index : computeIndex) {
builder.add(deserializeComputeColumn(tableProperties, index, fieldNames));
fieldNames.add(tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME)));
}

// add watermark
if (isWatermarkValid(tableProperties)) {
builder.watermark(deserializeWatermarkSpec(tableProperties, fieldNames));
}
return builder.build();
}
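
A minimal usage sketch of `toSchema` (illustration only; `ToSchemaSketch` and the property values are hypothetical and assume the key layout shown earlier). Physical columns come from the Iceberg schema, and the computed column is appended from properties:

```java
import com.netease.arctic.flink.FlinkSchemaUtil;

import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ToSchemaSketch {
  public static void main(String[] args) {
    Schema icebergSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "op_time", Types.TimestampType.withoutZone()));
    Map<String, String> props = new HashMap<>();
    props.put("flink.computed-column.1.name", "ts3");
    props.put("flink.computed-column.1.data-type", "TIMESTAMP(3)");
    props.put("flink.computed-column.1.expr", "CAST(`op_time` AS TIMESTAMP(3))");
    // Rebuilds id, op_time, and the computed column ts3, with id as primary key.
    TableSchema schema =
        FlinkSchemaUtil.toSchema(icebergSchema, Collections.singletonList("id"), props);
    System.out.println(schema);
  }
}
```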

@@ -69,10 +118,7 @@ public static TableSchema toSchema(RowType rowType, List<String> primaryKeys) {
* com.netease.arctic.flink.table.ArcticDynamicSource} distinguish the watermark field. For now,
* it is only used in the case of Arctic as a dim-table.
*/
public static TableSchema getPhysicalSchemaForDimTable(TableSchema tableSchema) {
TableSchema.Builder builder = filter(tableSchema, TableColumn::isPhysical);
tableSchema.getWatermarkSpecs().forEach(builder::watermark);
return builder.build();
@@ -197,4 +243,191 @@ public static void addPrimaryKey(
"Arctic primary key should be declared in table")));
});
}
/**
* Generate table properties for watermark and computed columns from flink TableSchema.
*
* @param schema Flink TableSchema.
* @return tableProperties.
*/
public static Map<String, String> generateExtraOptionsFrom(TableSchema schema) {
Map<String, String> properties = Maps.newHashMap();

// add properties for computeColumns
Map<String, String> computeColumnProperties = serializeComputeColumn(schema);
properties.putAll(computeColumnProperties);

// add properties for watermark; only one watermark is supported for now
List<WatermarkSpec> watermarkSpecs = schema.getWatermarkSpecs();
if (!watermarkSpecs.isEmpty()) {
if (watermarkSpecs.size() > 1) {
throw new IllegalStateException("Multiple watermark definition is not supported yet.");
}
properties.putAll(serializeWatermarkSpec(watermarkSpecs.get(0)));
}

return properties;
}
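
And the reverse direction, a round-trip sketch for `generateExtraOptionsFrom` (illustration only; `ExtraOptionsSketch` is hypothetical). It serializes one computed column and one watermark into the property keys that `toSchema` reads back:

```java
import com.netease.arctic.flink.FlinkSchemaUtil;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

import java.util.Map;

public class ExtraOptionsSketch {
  public static void main(String[] args) {
    TableSchema schema =
        TableSchema.builder()
            .field("id", DataTypes.BIGINT().notNull())
            .field("op_time", DataTypes.TIMESTAMP(6))
            // Computed column: name, result type, and SQL expression.
            .field("ts3", DataTypes.TIMESTAMP(3), "CAST(`op_time` AS TIMESTAMP(3))")
            .watermark("ts3", "`ts3` - INTERVAL '5' SECOND", DataTypes.TIMESTAMP(3))
            .build();
    Map<String, String> options = FlinkSchemaUtil.generateExtraOptionsFrom(schema);
    // Expect flink.computed-column.1.* and flink.watermark.* entries.
    options.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```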

/** Serialize compute columns into properties. */
private static Map<String, String> serializeComputeColumn(TableSchema schema) {
Map<String, String> serialized = new HashMap<>();
List<TableColumn> tableColumns = schema.getTableColumns();
// index of the computed column, starting from 1
int computeIndex = 1;
for (TableColumn column : tableColumns) {
if (column instanceof ComputedColumn) {
serialized.put(
compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, computeIndex, NAME), column.getName());
serialized.put(
compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, computeIndex, DATA_TYPE),
column.getType().getLogicalType().asSerializableString());
serialized.put(
compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, computeIndex, EXPR),
((TableColumn.ComputedColumn) column).getExpression());
computeIndex++;
}
}
return serialized;
}

/** Deserialize compute columns from properties. */
private static TableColumn deserializeComputeColumn(
Map<String, String> tableProperties, int index, List<String> fieldNames) {
String expr = tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, EXPR));
if (!isExprContainField(expr, fieldNames)) {
throw new IllegalStateException(
"Expression " + expr + " does not match any columns in the table.");
}
DataType dataType =
TypeConversions.fromLogicalToDataType(
LogicalTypeParser.parse(
tableProperties.get(
compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, DATA_TYPE))));
return TableColumn.computed(
tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME)),
dataType,
expr);
}

private static boolean isExprContainField(String expr, List<String> fieldNames) {
if (expr.equalsIgnoreCase(PROCTIME_FUNCTION)) {
return true;
}
for (String fieldName : fieldNames) {
if (expr.contains("`" + fieldName + "`")) {
return true;
}
}
return false;
}
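
Note that this check is purely textual: a field counts as referenced only if it appears backtick-quoted inside the expression string (or the expression is exactly `PROCTIME()`). A small demonstration under that assumption (`ExprFieldCheckSketch` is hypothetical):

```java
public class ExprFieldCheckSketch {
  public static void main(String[] args) {
    String expr = "CAST(`op_time` AS TIMESTAMP(3))";
    // Mirrors isExprContainField: match on the backtick-quoted field name.
    System.out.println(expr.contains("`" + "op_time" + "`")); // true
    System.out.println(expr.contains("`" + "id" + "`")); // false
  }
}
```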

private static boolean isComputeValid(Map<String, String> tableProperties, int index) {
// check if properties for computeColumn is valid and complete
if (StringUtils.isNotBlank(
tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME)))
&& StringUtils.isNotBlank(
tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, DATA_TYPE)))
&& StringUtils.isNotBlank(
tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, EXPR)))) {
return true;
}
LOG.warn(
"Properties for computed column {} are incomplete. They should contain {}, {}, {}; skipping conversion to a computed column.",
index,
NAME,
DATA_TYPE,
EXPR);
return false;
}

private static Set<Integer> getComputeIndex(Map<String, String> tableProperties) {
Set<Integer> computedIndex = new TreeSet<>();
tableProperties
.keySet()
.forEach(
k -> {
Matcher matcher = COMPUTE_PATTERN.matcher(k);
if (matcher.find()) {
int indexId = NumberUtils.toInt(matcher.group(1));
if (indexId > 0 && isComputeValid(tableProperties, indexId)) {
computedIndex.add(indexId);
}
}
});
return computedIndex;
}
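
A quick demonstration (illustration only; `ComputeIndexSketch` is hypothetical) of how `COMPUTE_PATTERN` extracts indices and how the `TreeSet` restores ascending declaration order even when property keys arrive unordered:

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ComputeIndexSketch {
  public static void main(String[] args) {
    Pattern pattern = Pattern.compile("flink\\.computed-column\\.(\\d+)\\.name");
    Set<Integer> indices = new TreeSet<>();
    for (String key :
        new String[] {"flink.computed-column.2.name", "flink.computed-column.1.name"}) {
      Matcher matcher = pattern.matcher(key);
      if (matcher.find()) {
        indices.add(Integer.parseInt(matcher.group(1)));
      }
    }
    System.out.println(indices); // prints [1, 2]
  }
}
```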

/** Serialize watermarkSpec into properties. */
private static Map<String, String> serializeWatermarkSpec(WatermarkSpec watermarkSpec) {
Map<String, String> serializedWatermarkSpec = new HashMap<>();
serializedWatermarkSpec.put(
compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_ROWTIME),
watermarkSpec.getRowtimeAttribute());
serializedWatermarkSpec.put(
compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_EXPR),
watermarkSpec.getWatermarkExpr());
serializedWatermarkSpec.put(
compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_DATA_TYPE),
watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString());

return serializedWatermarkSpec;
}

/** Deserialize watermarkSpec from properties. */
private static WatermarkSpec deserializeWatermarkSpec(
Map<String, String> tableProperties, List<String> fieldNames) {
String rowtimeAttribute =
tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_ROWTIME));
if (!fieldNames.contains(rowtimeAttribute)) {
throw new IllegalStateException(
"Watermark rowtime attribute '"
+ rowtimeAttribute
+ "' does not match any columns in the table.");
}
DataType watermarkExprOutputType =
TypeConversions.fromLogicalToDataType(
LogicalTypeParser.parse(
tableProperties.get(
compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_DATA_TYPE))));
return new WatermarkSpec(
rowtimeAttribute,
tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_EXPR)),
watermarkExprOutputType);
}

private static boolean isWatermarkValid(Map<String, String> tableProperties) {
// check if properties for watermark is valid and complete
if (StringUtils.isNotBlank(
tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_ROWTIME)))
&& StringUtils.isNotBlank(
tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_EXPR)))
&& StringUtils.isNotBlank(
tableProperties.get(
compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_DATA_TYPE)))) {
return true;
}
LOG.warn(
"Properties for watermark are incomplete. They should contain {}, {}, {}; skipping conversion to a watermark strategy.",
WATERMARK_ROWTIME,
WATERMARK_STRATEGY_EXPR,
WATERMARK_STRATEGY_DATA_TYPE);
return false;
}

/**
* Joins key components with dots, e.g. {@code compoundKey("flink", "computed-column", 1, "name")}
* returns {@code "flink.computed-column.1.name"}.
*/
private static String compoundKey(Object... components) {
return Stream.of(components).map(Object::toString).collect(Collectors.joining("."));
}

/**
* Get the physical TableSchema, i.e. the schema with computed columns and watermark removed.
*
* @param tableSchema Flink TableSchema
* @return Flink TableSchema containing only physical columns
*/
public static TableSchema getPhysicalSchema(TableSchema tableSchema) {
TableSchema.Builder builder = filter(tableSchema, TableColumn::isPhysical);
return builder.build();
}
}
