
Commit: change for review comments
Yi Xie committed Nov 8, 2023
1 parent 699b947 commit c923850
Showing 6 changed files with 223 additions and 120 deletions.
18 changes: 3 additions & 15 deletions docs/engines/flink/flink-ddl.md
@@ -72,6 +72,8 @@ CREATE TABLE `arctic_catalog`.`arctic_db`.`test_table` (
     id BIGINT,
     name STRING,
     op_time TIMESTAMP,
+    ts3 AS CAST(op_time as TIMESTAMP(3)),
+    watermark FOR ts3 AS ts3 - INTERVAL '5' SECOND,
     proc AS PROCTIME(),
     PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
@@ -85,21 +87,7 @@ Currently, most of the syntax supported by [Flink SQL create table](https://nigh
 - PRIMARY KEY (column1, column2, …): configure primary keys.
 - WITH ('key'='value', …): configure Amoro Table properties.
 - computed_column_definition: column_name AS computed_column_expression
-- watermark_definition: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
-
-if you want to create watermark, your ddl sql maybe like below:
-```sql
-CREATE TABLE `arctic_catalog`.`arctic_db`.`test_table` (
-    id BIGINT,
-    name STRING,
-    ts TIMESTAMP(6),
-    ts3 AS CAST(ts as TIMESTAMP(3)),
-    watermark FOR ts3 AS ts3 - INTERVAL '5' SECOND,
-    PRIMARY KEY (id) NOT ENFORCED
-) WITH (
-  'key' = 'value'
-);
-```
+- watermark_definition: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression; rowtime_column_name must be of type TIMESTAMP(3) (see the sketch below).
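A minimal sketch of this rule in use through the Flink Table API, assuming `arctic_catalog` is already registered with the session; the table name and the `'key' = 'value'` property are illustrative:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WatermarkDdlSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
    // op_time is TIMESTAMP(6), so a computed column casts it down to the
    // TIMESTAMP(3) that the WATERMARK clause requires.
    tEnv.executeSql(
        "CREATE TABLE `arctic_catalog`.`arctic_db`.`test_table` ("
            + "  id BIGINT,"
            + "  name STRING,"
            + "  op_time TIMESTAMP(6),"
            + "  ts3 AS CAST(op_time AS TIMESTAMP(3)),"
            + "  WATERMARK FOR ts3 AS ts3 - INTERVAL '5' SECOND,"
            + "  PRIMARY KEY (id) NOT ENFORCED"
            + ") WITH ('key' = 'value')");
  }
}
```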

#### PARTITIONED BY
Create a partitioned table using PARTITIONED BY.
FlinkSchemaUtil.java
@@ -24,10 +24,12 @@
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
 
 import com.netease.arctic.table.ArcticTable;
 import com.netease.arctic.table.PrimaryKeySpec;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.curator.shaded.com.google.common.collect.Maps;
 import org.apache.flink.table.api.TableColumn;
@@ -54,6 +56,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

@@ -66,18 +70,21 @@ public class FlinkSchemaUtil {
   public static final String COMPUTED_COLUMNS = "computed-column";
 
   public static final String WATERMARK = "watermark";
+  public static final String PROCTIME_FUNCTION = SCHEMA_PROCTIME + "()";
+  public static final Pattern COMPUTE_PATTERN =
+      Pattern.compile("flink\\.computed-column\\.(\\d+)\\.name");
 
   /**
    * Convert iceberg Schema to flink TableSchema.
    *
-   * @param arcticSchema
-   * @param arcticProperties
+   * @param icebergSchema the Iceberg schema to convert
+   * @param tableProperties table properties carrying the serialized computed columns and watermark
    * @return Flink TableSchema
    */
   public static TableSchema toSchema(
-      Schema arcticSchema, List<String> primaryKeys, Map<String, String> arcticProperties) {
+      Schema icebergSchema, List<String> primaryKeys, Map<String, String> tableProperties) {
     TableSchema.Builder builder = TableSchema.builder();
-    RowType rowType = org.apache.iceberg.flink.FlinkSchemaUtil.convert(arcticSchema);
+    RowType rowType = org.apache.iceberg.flink.FlinkSchemaUtil.convert(icebergSchema);
 
     // add physical columns.
     for (RowType.RowField field : rowType.getFields()) {
@@ -89,29 +96,25 @@ public static TableSchema toSchema(
       builder.primaryKey(primaryKeys.toArray(new String[0]));
     }
 
-    // add watermark
-    if (arcticProperties.keySet().stream()
-        .anyMatch(key -> key.startsWith(compoundKey(FLINK_PREFIX, WATERMARK)))) {
-      builder.watermark(deserializeWatermarkSpec(arcticProperties));
-    }
+    HashSet<Integer> computeIndex = getComputeIndex(tableProperties);
+    List<String> fieldNames = rowType.getFieldNames();
 
     // add computed columns
-    HashSet<Integer> computedIndex = new HashSet<>();
-    arcticProperties.forEach(
-        (k, v) -> {
-          if (k.startsWith(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS))
-              && arcticProperties.get(k) != null) {
-            int start =
-                k.indexOf(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS))
-                    + compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS).length()
-                    + 1;
-            int end = k.lastIndexOf('.');
-            computedIndex.add(NumberUtils.toInt(k.substring(start, end)));
-          }
-        });
-    computedIndex.stream()
-        .map(index -> deserializeComputeColumn(arcticProperties, index))
+    computeIndex.stream()
+        .map(index -> deserializeComputeColumn(tableProperties, index, fieldNames))
         .forEach(builder::add);
 
+    // record computed-column names in fieldNames so the watermark rowtime can reference them
+    computeIndex.stream()
+        .forEach(
+            index ->
+                fieldNames.add(
+                    tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME))));
+
+    // add watermark
+    if (isWatermarkValid(tableProperties)) {
+      builder.watermark(deserializeWatermarkSpec(tableProperties, fieldNames));
+    }
     return builder.build();
   }
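To make the new flow concrete, here is a sketch of calling `toSchema` with hand-written properties. The literal keys assume `FLINK_PREFIX` is `"flink"` and the Flink descriptor constants `"rowtime"`, `"strategy.expr"`, and `"strategy.data-type"`, which is consistent with `COMPUTE_PATTERN` above; the schema and values are illustrative:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class ToSchemaSketch {
  public static void main(String[] args) {
    Schema icebergSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "op_time", Types.TimestampType.withoutZone()));

    // Hand-written equivalents of what generateExtraOptionsFrom(...) serializes.
    Map<String, String> props = new HashMap<>();
    props.put("flink.computed-column.2.name", "ts3");
    props.put("flink.computed-column.2.data-type", "TIMESTAMP(3)");
    props.put("flink.computed-column.2.expr", "CAST(`op_time` AS TIMESTAMP(3))");
    props.put("flink.watermark.rowtime", "ts3");
    props.put("flink.watermark.strategy.expr", "`ts3` - INTERVAL '5' SECOND");
    props.put("flink.watermark.strategy.data-type", "TIMESTAMP(3)");

    // Physical columns id(0) and op_time(1), computed column ts3 at index 2; the
    // watermark on ts3 validates because fieldNames is extended with ts3 first.
    TableSchema schema = FlinkSchemaUtil.toSchema(icebergSchema, Arrays.asList("id"), props);
    System.out.println(schema);
  }
}
```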

@@ -248,26 +251,17 @@ public static void addPrimaryKey(
   /**
    * Generate table properties for watermark and computed columns from flink TableSchema.
    *
-   * @param schema flink TableSchema.
-   * @return Table properties map.
+   * @param schema Flink TableSchema.
+   * @return table properties for the watermark and computed columns.
    */
-  public static Map<String, String> addSchemaProperties(TableSchema schema) {
+  public static Map<String, String> generateExtraOptionsFrom(TableSchema schema) {
     Map<String, String> properties = Maps.newHashMap();
 
-    // field name -> index
-    final Map<String, Integer> indexMap = new HashMap<>();
-    List<TableColumn> tableColumns = schema.getTableColumns();
-    for (int i = 0; i < tableColumns.size(); i++) {
-      indexMap.put(tableColumns.get(i).getName(), i);
-    }
-
-    List<TableColumn> computedColumns =
-        schema.getTableColumns().stream()
-            .filter(column -> column instanceof ComputedColumn)
-            .collect(Collectors.toList());
-    properties.putAll(serializeComputeColumn(indexMap, computedColumns));
+    // add properties for computeColumns
+    Map<String, String> computeColumnProperties = serializeComputeColumn(schema);
+    properties.putAll(computeColumnProperties);
 
-    // watermark,only support one watermark now
+    // add properties for watermark; only one watermark is supported now
     List<WatermarkSpec> watermarkSpecs = schema.getWatermarkSpecs();
     if (!watermarkSpecs.isEmpty()) {
       if (watermarkSpecs.size() > 1) {
@@ -280,44 +274,94 @@ public static Map<String, String> addSchemaProperties(TableSchema schema) {
   }
 
   /** serialize computeColumns into properties */
-  private static Map<String, String> serializeComputeColumn(
-      Map<String, Integer> indexMap, List<TableColumn> computedColumns) {
+  private static Map<String, String> serializeComputeColumn(TableSchema schema) {
     Map<String, String> serialized = new HashMap<>();
-    computedColumns.stream()
-        .forEach(
-            column -> {
-              int index = indexMap.get(column.getName());
-              serialized.put(
-                  compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME), column.getName());
-              serialized.put(
-                  compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, DATA_TYPE),
-                  column.getType().getLogicalType().asSerializableString());
-              serialized.put(
-                  compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, EXPR),
-                  ((TableColumn.ComputedColumn) column).getExpression());
-            });
-
+    List<TableColumn> tableColumns = schema.getTableColumns();
+    // index in Flink TableSchema
+    for (int index = 0; index < tableColumns.size(); index++) {
+      TableColumn column = tableColumns.get(index);
+      if (column instanceof ComputedColumn) {
+        serialized.put(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME), column.getName());
+        serialized.put(
+            compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, DATA_TYPE),
+            column.getType().getLogicalType().asSerializableString());
+        serialized.put(
+            compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, EXPR),
+            ((TableColumn.ComputedColumn) column).getExpression());
+      }
+    }
     return serialized;
   }
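Read together with `generateExtraOptionsFrom` above, this loop keys every computed column by its position in the Flink schema. A sketch of the serialization half of the round-trip, again assuming `FLINK_PREFIX` is `"flink"` and illustrative column names:

```java
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;

public class SerializeSketch {
  public static void main(String[] args) {
    TableSchema schema =
        TableSchema.builder()
            .add(TableColumn.physical("id", DataTypes.BIGINT()))
            .add(TableColumn.physical("op_time", DataTypes.TIMESTAMP(6)))
            .add(
                TableColumn.computed(
                    "ts3", DataTypes.TIMESTAMP(3), "CAST(`op_time` AS TIMESTAMP(3))"))
            .build();

    Map<String, String> extras = FlinkSchemaUtil.generateExtraOptionsFrom(schema);
    // Expected entries, keyed by ts3's position (index 2) in the schema:
    //   flink.computed-column.2.name      -> ts3
    //   flink.computed-column.2.data-type -> TIMESTAMP(3)
    //   flink.computed-column.2.expr      -> CAST(`op_time` AS TIMESTAMP(3))
    extras.forEach((k, v) -> System.out.println(k + " -> " + v));
  }
}
```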

   /** deserialize computeColumns from properties */
   private static TableColumn deserializeComputeColumn(
-      Map<String, String> arcticProperties, int index) {
-    String name = arcticProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME));
+      Map<String, String> tableProperties, int index, List<String> fieldNames) {
+    String expr = tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, EXPR));
+    if (!isExprContainField(expr, fieldNames)) {
+      throw new IllegalStateException(
+          "Expression " + expr + " does not match any column in Amoro.");
+    }
     DataType dataType =
         TypeConversions.fromLogicalToDataType(
             LogicalTypeParser.parse(
-                arcticProperties.get(
+                tableProperties.get(
                     compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, DATA_TYPE))));
 
     TableColumn column =
         TableColumn.computed(
-            name,
+            tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME)),
             dataType,
-            arcticProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, EXPR)));
+            expr);
     return column;
   }

+  private static boolean isExprContainField(String expr, List<String> fieldNames) {
+    if (expr.equalsIgnoreCase(PROCTIME_FUNCTION)) {
+      return true;
+    }
+    for (String fieldName : fieldNames) {
+      if (expr.contains("`" + fieldName + "`")) {
+        return true;
+      }
+    }
+    return false;
+  }
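The check accepts the proctime marker unconditionally and otherwise looks for a backtick-quoted column reference in the expression. A tiny sketch of the same logic inlined (the helper itself is private):

```java
import java.util.Arrays;
import java.util.List;

public class ExprMatchSketch {
  public static void main(String[] args) {
    List<String> fieldNames = Arrays.asList("id", "op_time");
    String expr = "CAST(`op_time` AS TIMESTAMP(3))";
    // Matches because serialized expressions quote column references in backticks.
    boolean ok = fieldNames.stream().anyMatch(f -> expr.contains("`" + f + "`"));
    System.out.println(ok); // true; "PROCTIME()" would be accepted unconditionally
  }
}
```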

+  private static boolean isComputeValid(Map<String, String> tableProperties, int index) {
+    // check that the computed-column properties are valid and complete
+    if (StringUtils.isNotBlank(
+            tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, NAME)))
+        && StringUtils.isNotBlank(
+            tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, DATA_TYPE)))
+        && StringUtils.isNotBlank(
+            tableProperties.get(compoundKey(FLINK_PREFIX, COMPUTED_COLUMNS, index, EXPR)))) {
+      return true;
+    }
+    LOG.warn(
+        "Properties for computed column {} are incomplete; they should contain {}, {}, {}. Skipping conversion to a computed column.",
+        index,
+        NAME,
+        DATA_TYPE,
+        EXPR);
+    return false;
+  }

+  private static HashSet<Integer> getComputeIndex(Map<String, String> tableProperties) {
+    HashSet<Integer> computedIndex = new HashSet<>();
+    tableProperties
+        .keySet()
+        .forEach(
+            k -> {
+              Matcher matcher = COMPUTE_PATTERN.matcher(k);
+              if (matcher.find()) {
+                int indexId = NumberUtils.toInt(matcher.group(1));
+                if (isComputeValid(tableProperties, indexId)) {
+                  computedIndex.add(indexId);
+                }
+              }
+            });
+    return computedIndex;
+  }
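Index discovery keys off the `.name` property alone; `isComputeValid` then insists on the matching `data-type` and `expr` entries. A small sketch of the extraction:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ComputeIndexSketch {
  public static void main(String[] args) {
    Pattern computePattern = Pattern.compile("flink\\.computed-column\\.(\\d+)\\.name");
    Matcher matcher = computePattern.matcher("flink.computed-column.2.name");
    if (matcher.find()) {
      System.out.println(matcher.group(1)); // prints 2, the column's schema index
    }
  }
}
```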

   /** serialize watermarkSpec into properties */
   private static Map<String, String> serializeWatermarkSpec(WatermarkSpec watermarkSpec) {
     Map<String, String> serializedWatermarkSpec = new HashMap<>();
@@ -335,18 +379,44 @@ private static Map<String, String> serializeWatermarkSpec(WatermarkSpec watermarkSpec) {
   }
 
   /** deserialize watermarkSpec from properties */
-  private static WatermarkSpec deserializeWatermarkSpec(Map<String, String> arcticProperties) {
+  private static WatermarkSpec deserializeWatermarkSpec(
+      Map<String, String> tableProperties, List<String> fieldNames) {
     String rowtimeAttribute =
-        arcticProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_ROWTIME));
-    String watermarkExpressionString =
-        arcticProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_EXPR));
+        tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_ROWTIME));
+    if (!fieldNames.contains(rowtimeAttribute)) {
+      throw new IllegalStateException(
+          "Watermark rowtime attribute '"
+              + rowtimeAttribute
+              + "' does not match any column in Amoro.");
+    }
     DataType watermarkExprOutputType =
         TypeConversions.fromLogicalToDataType(
             LogicalTypeParser.parse(
-                arcticProperties.get(
+                tableProperties.get(
                     compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_DATA_TYPE))));
-
-    return new WatermarkSpec(rowtimeAttribute, watermarkExpressionString, watermarkExprOutputType);
+    return new WatermarkSpec(
+        rowtimeAttribute,
+        tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_EXPR)),
+        watermarkExprOutputType);
   }
 
+  private static boolean isWatermarkValid(Map<String, String> tableProperties) {
+    // check that the watermark properties are valid and complete
+    if (StringUtils.isNotBlank(
+            tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_ROWTIME)))
+        && StringUtils.isNotBlank(
+            tableProperties.get(compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_EXPR)))
+        && StringUtils.isNotBlank(
+            tableProperties.get(
+                compoundKey(FLINK_PREFIX, WATERMARK, WATERMARK_STRATEGY_DATA_TYPE)))) {
+      return true;
+    }
+    LOG.warn(
+        "Properties for watermark are incomplete; they should contain {}, {}, {}. Skipping conversion to a watermark strategy.",
+        WATERMARK_ROWTIME,
+        WATERMARK_STRATEGY_EXPR,
+        WATERMARK_STRATEGY_DATA_TYPE);
+    return false;
+  }
 
   private static String compoundKey(Object... components) {
@@ -356,8 +426,8 @@ private static String compoundKey(Object... components) {
   /**
    * get physical tableSchema
    *
-   * @param tableSchema
-   * @return flink tableSchema
+   * @param tableSchema Flink TableSchema
+   * @return Flink tableSchema
    */
   public static TableSchema getPhysicalSchema(TableSchema tableSchema) {
     TableSchema.Builder builder = filter(tableSchema, TableColumn::isPhysical);
