[AMORO-1457] Support computed columns and watermark in Flink DDL #2239

Merged
merged 27 commits into master from AMORO_1457_support_compute on Nov 27, 2023
Changes from 22 commits

27 commits
2d3f12b
supoort_colume and watermark for flink1.15
Oct 26, 2023
579ddac
separate getPhysicalSchemaForDimTable when dim_table.enabled=true
Nov 3, 2023
89541b7
Merge branch 'master' into AMORO_1457_support_compute
xieyi888 Nov 3, 2023
eff5a37
Merge branch 'master' into AMORO_1457_support_compute
Nov 3, 2023
d4f36e7
Merge remote-tracking branch 'origin/AMORO_1457_support_compute' into…
Nov 6, 2023
c634733
Merge branch 'master' into AMORO_1457_support_compute
Nov 6, 2023
699b947
rm ut for flink1.15
Nov 6, 2023
c923850
change for review comments
Nov 8, 2023
c36e69d
Merge branch 'master' into AMORO_1457_support_compute
Nov 8, 2023
2d8bd55
change for master conflict
Nov 8, 2023
2b8f5f2
fix checkstyle
Nov 8, 2023
00a9cac
change compute index start from1
Nov 10, 2023
65bbe2c
merge master into current
Nov 12, 2023
0884406
change method name
Nov 12, 2023
f5b602b
Merge branch 'master' into AMORO_1457_support_compute
xieyi888 Nov 12, 2023
ba98f8d
Merge branch 'master' into AMORO_1457_support_compute
xieyi888 Nov 13, 2023
6b8afbb
change doc for computr column
Nov 13, 2023
8d63970
Merge branch 'AMORO_1457_support_compute' of https://github.com/xieyi…
Nov 13, 2023
3b18abe
fix check style
Nov 13, 2023
bc98003
Merge branch 'master' into AMORO_1457_support_compute
xieyi888 Nov 14, 2023
172ff46
Merge branch 'master' into AMORO_1457_support_compute
zhoujinsong Nov 16, 2023
9968557
Merge branch 'master' into AMORO_1457_support_compute
zhoujinsong Nov 20, 2023
a316ad0
fix for comments
Nov 21, 2023
6823770
Merge branch 'AMORO_1457_support_compute' of https://github.com/xieyi…
Nov 21, 2023
eb06640
Merge branch 'master' into AMORO_1457_support_compute
Nov 21, 2023
66d9e1c
Merge branch 'master' into AMORO_1457_support_compute
xieyi888 Nov 22, 2023
a539a9b
Merge branch 'master' into AMORO_1457_support_compute
zhoujinsong Nov 27, 2023
8 changes: 6 additions & 2 deletions docs/engines/flink/flink-ddl.md
@@ -72,6 +72,9 @@ CREATE TABLE `arctic_catalog`.`arctic_db`.`test_table` (
id BIGINT,
name STRING,
op_time TIMESTAMP,
ts3 AS CAST(op_time as TIMESTAMP(3)),
watermark FOR ts3 AS ts3 - INTERVAL '5' SECOND,
proc AS PROCTIME(),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'key' = 'value'
@@ -83,8 +86,8 @@ Currently, most of the syntax supported by [Flink SQL create table](https://nigh
- PARTITIONED BY (column1, column2, …): configure Flink partition fields; note that Flink does not yet support hidden partitions.
- PRIMARY KEY (column1, column2, …): configure primary keys.
- WITH ('key'='value', …): configure Amoro Table properties.

Currently, configuration of computed columns and watermark fields is not supported.
- computed_column_definition: column_name AS computed_column_expression. Currently, computed columns must be listed after all physical columns.
- watermark_definition: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression; rowtime_column_name must be of type TIMESTAMP(3).

#### PARTITIONED BY
Create a partitioned table using PARTITIONED BY.
@@ -105,6 +108,7 @@ CREATE TABLE `test_table` (
id BIGINT,
name STRING,
op_time TIMESTAMP,
proc AS PROCTIME(),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'arctic',
com/netease/arctic/flink/FlinkSchemaUtil.java
@@ -18,49 +18,97 @@

package com.netease.arctic.flink;

import static org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;

import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.PrimaryKeySpec;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableColumn.ComputedColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** A utility class that converts Flink table schemas. */
public class FlinkSchemaUtil {

private static final Logger LOG = LoggerFactory.getLogger(FlinkSchemaUtil.class);
public static final String FLINK_PREFIX = "flink";

public static final String COMPUTED_COLUMNS = "computed-column";

public static final String WATERMARK = "watermark";
public static final String PROCTIME_FUNCTION = SCHEMA_PROCTIME + "()";
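// matches serialized computed-column name keys, e.g. "flink.computed-column.2.name",
// capturing the column index ("2")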
public static final Pattern COMPUTE_PATTERN =
Pattern.compile("flink\\.computed-column\\.(\\d+)\\.name");

/**
* Convert a {@link RowType} to a {@link TableSchema}.
* Convert an Iceberg Schema to a Flink TableSchema.
*
* @param rowType a RowType
* @param primaryKeys primary key field names
* @param icebergSchema the Iceberg schema to convert
* @param tableProperties Amoro table properties carrying the serialized computed columns and watermark
* @return Flink TableSchema
*/
public static TableSchema toSchema(RowType rowType, List<String> primaryKeys) {
public static TableSchema toSchema(
Schema icebergSchema, List<String> primaryKeys, Map<String, String> tableProperties) {
TableSchema.Builder builder = TableSchema.builder();
RowType rowType = org.apache.iceberg.flink.FlinkSchemaUtil.convert(icebergSchema);

// add physical columns.
for (RowType.RowField field : rowType.getFields()) {
builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
}

// add primary key
if (CollectionUtils.isNotEmpty(primaryKeys)) {
builder.primaryKey(primaryKeys.toArray(new String[0]));
}

HashSet<Integer> computeIndex = getComputeIndex(tableProperties);
List<String> fieldNames = rowType.getFieldNames();
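// computed column names are appended to fieldNames below, so later computed columns
// and the watermark can reference earlier computed columns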

// add computed columns
for (int index : computeIndex) {
builder.add(deserializeComputeColumn(tableProperties, index, fieldNames));
fieldNames.add(tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME)));
}

// add watermark
if (isWatermarkValid(tableProperties)) {
builder.watermark(deserializeWatermarkSpec(tableProperties, fieldNames));
}
return builder.build();
}

@@ -69,10 +117,7 @@ public static TableSchema toSchema(RowType rowType, List<String> primaryKeys) {
* com.netease.arctic.flink.table.ArcticDynamicSource} distinguish the watermark field. For now,
* it is only used in the case of Arctic as a dim-table.
*/
public static TableSchema getPhysicalSchema(TableSchema tableSchema, boolean addWatermark) {
if (!addWatermark) {
return tableSchema;
}
public static TableSchema getPhysicalSchemaForDimTable(TableSchema tableSchema) {
TableSchema.Builder builder = filter(tableSchema, TableColumn::isPhysical);
tableSchema.getWatermarkSpecs().forEach(builder::watermark);
return builder.build();
@@ -197,4 +242,191 @@ public static void addPrimaryKey(
"Arctic primary key should be declared in table")));
});
}
/**
* Generate table properties for the watermark and computed columns from a Flink TableSchema.
*
* @param schema Flink TableSchema.
* @return tableProperties.
*/
public static Map<String, String> generateExtraOptionsFrom(TableSchema schema) {
Map<String, String> properties = Maps.newHashMap();

// add properties for computeColumns
Map<String, String> computeColumnProperties = serializeComputeColumn(schema);
properties.putAll(computeColumnProperties);

// add properties for the watermark; only one watermark is supported for now
List<WatermarkSpec> watermarkSpecs = schema.getWatermarkSpecs();
if (!watermarkSpecs.isEmpty()) {
if (watermarkSpecs.size() > 1) {
throw new IllegalStateException("Multiple watermark definition is not supported yet.");
}
properties.putAll(serializeWatermarkSpec(watermarkSpecs.get(0)));
}

return properties;
}

/** Serialize computed columns into table properties. */
private static Map<String, String> serializeComputeColumn(TableSchema schema) {
Map<String, String> serialized = new HashMap<>();
List<TableColumn> tableColumns = schema.getTableColumns();
// index of the computed column, starting from 1
int computeIndex = 1;
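// e.g. a column "ts3 AS CAST(op_time AS TIMESTAMP(3))" at index 1 is stored roughly as:
//   flink.computed-column.1.name = ts3
//   flink.computed-column.1.data-type = TIMESTAMP(3)
//   flink.computed-column.1.expr = CAST(`op_time` AS TIMESTAMP(3))
// (the exact expression string depends on Flink's catalog serialization)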
for (TableColumn column : tableColumns) {
if (column instanceof ComputedColumn) {
serialized.put(
compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, computeIndex, NAME), column.getName());
serialized.put(
compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, computeIndex, DATA_TYPE),
column.getType().getLogicalType().asSerializableString());
serialized.put(
compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, computeIndex, EXPR),
((TableColumn.ComputedColumn) column).getExpression());
computeIndex++;
}
}
return serialized;
}

/** deserialize computeColumns from properties */
xieyi888 marked this conversation as resolved.
Show resolved Hide resolved
private static TableColumn deserializeComputeColumn(
Map<String, String> tableProperties, int index, List<String> fieldNames) {
String expr = tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, EXPR));
if (!isExprContainField(expr, fieldNames)) {
throw new IllegalStateException(
"Expression " + expr + " does not match any columns in Amoro.");
}
DataType dataType =
TypeConversions.fromLogicalToDataType(
LogicalTypeParser.parse(
tableProperties.get(
compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, DATA_TYPE))));
return TableColumn.computed(
tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME)),
dataType,
expr);
}

private static boolean isExprContainField(String expr, List<String> fieldNames) {
if (expr.equalsIgnoreCase(PROCTIME_FUNCTION)) {
return true;
}
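// serialized expressions are assumed to quote field references with backticks
// (e.g. `op_time`), so a substring check on the quoted name suffices here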
for (String fieldName : fieldNames) {
if (expr.contains("`" + fieldName + "`")) {
return true;
}
}
return false;
}

private static boolean isComputeValid(Map<String, String> tableProperties, int index) {
// check whether the properties for the computed column are valid and complete
if (StringUtils.isNotBlank(
tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME)))
&& StringUtils.isNotBlank(
tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, DATA_TYPE)))
&& StringUtils.isNotBlank(
tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, EXPR)))) {
return true;
}
LOG.warn(
"Properties for computed column {} are incomplete. They should contain {}, {}, {}; skipping conversion to a computed column.",
index,
NAME,
DATA_TYPE,
EXPR);
return false;
}

private static HashSet<Integer> getComputeIndex(Map<String, String> tableProperties) {
HashSet<Integer> computedIndex = new HashSet<>();
tableProperties
.keySet()
.forEach(
k -> {
Matcher matcher = COMPUTE_PATTERN.matcher(k);
if (matcher.find()) {
int indexId = NumberUtils.toInt(matcher.group(1));
if (indexId > 0 && isComputeValid(tableProperties, indexId)) {
computedIndex.add(indexId);
}
}
});
return computedIndex;
}

/** Serialize a watermark spec into table properties. */
private static Map<String, String> serializeWatermarkSpec(WatermarkSpec watermarkSpec) {
Map<String, String> serializedWatermarkSpec = new HashMap<>();
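// e.g. "WATERMARK FOR ts3 AS ts3 - INTERVAL '5' SECOND" is stored roughly as:
//   flink.watermark.rowtime = ts3
//   flink.watermark.strategy.expr = `ts3` - INTERVAL '5' SECOND
//   flink.watermark.strategy.data-type = TIMESTAMP(3)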
serializedWatermarkSpec.put(
compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_ROWTIME),
watermarkSpec.getRowtimeAttribute());
serializedWatermarkSpec.put(
compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_EXPR),
watermarkSpec.getWatermarkExpr());
serializedWatermarkSpec.put(
compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_DATA_TYPE),
watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString());

return serializedWatermarkSpec;
}

/** Deserialize a watermark spec from table properties. */
private static WatermarkSpec deserializeWatermarkSpec(
Map<String, String> tableProperties, List<String> fieldNames) {
String rowtimeAttribute =
tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_ROWTIME));
if (!fieldNames.contains(rowtimeAttribute)) {
throw new IllegalStateException(
"Watermark rowtime attribute '"
+ rowtimeAttribute
+ " does not match any columns in amoro.");
}
DataType watermarkExprOutputType =
TypeConversions.fromLogicalToDataType(
LogicalTypeParser.parse(
tableProperties.get(
compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_DATA_TYPE))));
return new WatermarkSpec(
rowtimeAttribute,
tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_EXPR)),
watermarkExprOutputType);
}

private static boolean isWatermarkValid(Map<String, String> tableProperties) {
// check whether the properties for the watermark are valid and complete
if (StringUtils.isNotBlank(
tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_ROWTIME)))
&& StringUtils.isNotBlank(
tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_EXPR)))
&& StringUtils.isNotBlank(
tableProperties.get(
compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_DATA_TYPE)))) {
return true;
}
LOG.warn(
"Properties for the watermark are incomplete. They should contain {}, {}, {}; skipping conversion to a watermark strategy.",
WATERMARK_ROWTIME,
WATERMARK_STRATEGY_EXPR,
WATERMARK_STRATEGY_DATA_TYPE);
return false;
}

private static String compoundKey(Object... components) {
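// joins key components with dots, e.g. compoundKey("flink", "watermark", "rowtime")
// -> "flink.watermark.rowtime"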
return Stream.of(components).map(Object::toString).collect(Collectors.joining("."));
}

/**
* Get the physical TableSchema, i.e. the schema without computed columns and watermark specs.
*
* @param tableSchema Flink TableSchema
* @return the physical Flink TableSchema
*/
public static TableSchema getPhysicalSchema(TableSchema tableSchema) {
TableSchema.Builder builder = filter(tableSchema, TableColumn::isPhysical);
return builder.build();
}
}