16 changes: 16 additions & 0 deletions docs/content.zh/docs/core-concept/transform.md
@@ -39,10 +39,26 @@ To describe a transform rule, the following parameters can be used:
| primary-keys | Sink table primary keys, separated by commas | optional |
| partition-keys | Sink table partition keys, separated by commas | optional |
| table-options | used to configure the table creation statement when automatically creating tables | optional |
| converter-after-transform | used to add a converter to change DataChangeEvent after transform | optional |
| description | Transform rule description | optional |

Multiple rules can be declared in a single pipeline YAML file.

## converter-after-transform

`converter-after-transform` is used to add a converter that modifies the DataChangeEvent after the other transform operations are applied. The available values of this option are as follows.

- SOFT_DELETE: Delete events are converted to insert events. This converter should be used together with the metadata column `__data_event_type__` to implement a soft delete.

For example, with the following transform, data will not be deleted from the sink when a delete event occurs. Instead, the column `op_type` is set to `-D` and the event is converted into an insert record.

```yaml
transform:
- source-table: \.*.\.*
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
```

# Metadata Fields
## Fields definition
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
34 changes: 25 additions & 9 deletions docs/content/docs/core-concept/transform.md
@@ -31,18 +31,34 @@ What's more, it also helps users filter some unnecessary data during the synchro
# Parameters
To describe a transform rule, the following parameters can be used:

| Parameter | Meaning | Optional/Required |
|--------------|----------------------------------------------------|-------------------|
| source-table | Source table id, supports regular expressions | required |
| projection | Projection rule, supports syntax similar to the select clause in SQL | optional |
| filter | Filter rule, supports syntax similar to the where clause in SQL | optional |
| primary-keys | Sink table primary keys, separated by commas | optional |
| partition-keys | Sink table partition keys, separated by commas | optional |
| table-options | used to configure the table creation statement when automatically creating tables | optional |
| description | Transform rule description | optional |
| Parameter | Meaning | Optional/Required |
|---------------------------|-----------------------------------------------------------------------------------|-------------------|
| source-table | Source table id, supports regular expressions | required |
| projection | Projection rule, supports syntax similar to the select clause in SQL | optional |
| filter | Filter rule, supports syntax similar to the where clause in SQL | optional |
| primary-keys | Sink table primary keys, separated by commas | optional |
| partition-keys | Sink table partition keys, separated by commas | optional |
| table-options             | used to configure the table creation statement when automatically creating tables  | optional          |
| converter-after-transform | used to add a converter to change DataChangeEvent after transform | optional |
| description | Transform rule description | optional |

Multiple rules can be declared in a single pipeline YAML file.

## converter-after-transform

`converter-after-transform` is used to add a converter that modifies the DataChangeEvent after the other transform operations are applied. The available values of this option are as follows.

- SOFT_DELETE: Delete events are converted to insert events. This converter should be used together with the metadata column `__data_event_type__` to implement a soft delete.

For example, with the following transform, data will not be deleted from the sink when a delete event occurs. Instead, the column `op_type` is set to `-D` and the event is converted into an insert record.

```yaml
transform:
- source-table: \.*.\.*
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
```
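
Conceptually, SOFT_DELETE rewrites the delete event itself rather than altering the sink schema. The sketch below illustrates that idea in plain Java; it is only a simplified illustration (assuming `DataChangeEvent`'s `op()`, `before()`, and `tableId()` accessors and its `insertEvent` factory method), not the converter implementation itself.

```java
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.OperationType;

/** Simplified illustration of the SOFT_DELETE conversion semantics. */
public final class SoftDeleteSketch {

    /**
     * Re-emits a delete event as an insert carrying the deleted row, so the
     * sink keeps the record instead of removing it. The projection
     * `__data_event_type__ AS op_type` has already materialized the original
     * operation type (-D) into the op_type column, which is how downstream
     * consumers can recognize soft-deleted rows.
     */
    public static DataChangeEvent convert(DataChangeEvent event) {
        if (event.op() == OperationType.DELETE) {
            // The "before" image of a delete event holds the row being removed.
            return DataChangeEvent.insertEvent(event.tableId(), event.before());
        }
        // Inserts and updates pass through unchanged.
        return event;
    }
}
```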

# Metadata Fields
## Fields definition
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.
@@ -78,6 +78,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private static final String TRANSFORM_PROJECTION_KEY = "projection";
private static final String TRANSFORM_FILTER_KEY = "filter";
private static final String TRANSFORM_DESCRIPTION_KEY = "description";
private static final String TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY =
"converter-after-transform";

// UDF related keys
private static final String UDF_KEY = "user-defined-function";
@@ -316,6 +318,10 @@ private TransformDef toTransformDef(JsonNode transformNode) {
Optional.ofNullable(transformNode.get(TRANSFORM_DESCRIPTION_KEY))
.map(JsonNode::asText)
.orElse(null);
String postTransformConverter =
Optional.ofNullable(transformNode.get(TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY))
.map(JsonNode::asText)
.orElse(null);

return new TransformDef(
sourceTable,
@@ -324,7 +330,8 @@ private TransformDef toTransformDef(JsonNode transformNode) {
primaryKeys,
partitionKeys,
tableOptions,
description);
description,
postTransformConverter);
}

private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
@@ -159,7 +159,7 @@ void testValidTimeZone() throws Exception {
}

@Test
void testInvalidTimeZone() throws Exception {
void testInvalidTimeZone() {
URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
assertThatThrownBy(
@@ -336,15 +336,17 @@ private void testSchemaEvolutionTypesParsing(
"id",
"product_name",
"comment=app order",
"project fields from source table"),
"project fields from source table",
"SOFT_DELETE"),
new TransformDef(
"mydb.web_order_.*",
"CONCAT(id, order_id) as uniq_id, *",
"uniq_id > 10",
null,
null,
null,
"add new uniq_id for each row")),
"add new uniq_id for each row",
null)),
Collections.emptyList(),
Collections.singletonList(
new ModelDef(
@@ -402,6 +404,7 @@ void testParsingFullDefinitionFromString() throws Exception {
+ " partition-keys: product_name\n"
+ " table-options: comment=app order\n"
+ " description: project fields from source table\n"
+ " converter-after-transform: SOFT_DELETE\n"
+ " - source-table: mydb.web_order_.*\n"
+ " projection: CONCAT(id, order_id) as uniq_id, *\n"
+ " filter: uniq_id > 10\n"
@@ -469,15 +472,17 @@ void testParsingFullDefinitionFromString() throws Exception {
"id",
"product_name",
"comment=app order",
"project fields from source table"),
"project fields from source table",
"SOFT_DELETE"),
new TransformDef(
"mydb.web_order_.*",
"CONCAT(id, order_id) as uniq_id, *",
"uniq_id > 10",
null,
null,
null,
"add new uniq_id for each row")),
"add new uniq_id for each row",
null)),
Collections.emptyList(),
Collections.singletonList(
new ModelDef(
@@ -606,15 +611,17 @@ void testParsingFullDefinitionFromString() throws Exception {
"id",
"product_name",
"comment=app order",
"project fields from source table"),
"project fields from source table",
"SOFT_DELETE"),
new TransformDef(
"mydb.web_order_.*",
"CONCAT(id, order_id) as uniq_id, *",
"uniq_id > 10",
null,
null,
null,
"add new uniq_id for each row")),
"add new uniq_id for each row",
null)),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
@@ -646,6 +653,7 @@ void testParsingFullDefinitionFromString() throws Exception {
null,
null,
null,
null,
null)),
Arrays.asList(
new UdfDef(
@@ -49,6 +49,7 @@ transform:
partition-keys: product_name
table-options: comment=app order
description: project fields from source table
converter-after-transform: SOFT_DELETE
- source-table: mydb.web_order_.*
projection: CONCAT(id, order_id) as uniq_id, *
filter: uniq_id > 10
@@ -47,6 +47,7 @@ transform:
partition-keys: product_name
table-options: comment=app order
description: project fields from source table
converter-after-transform: SOFT_DELETE
- source-table: mydb.web_order_.*
projection: CONCAT(id, order_id) as uniq_id, *
filter: uniq_id > 10
@@ -50,6 +50,7 @@ public class TransformDef {
private final String primaryKeys;
private final String partitionKeys;
private final String tableOptions;
private final String postTransformConverter;

public TransformDef(
String sourceTable,
@@ -58,14 +59,16 @@ public TransformDef(
String primaryKeys,
String partitionKeys,
String tableOptions,
String description) {
String description,
String postTransformConverter) {
this.sourceTable = sourceTable;
this.projection = projection;
this.filter = filter;
this.primaryKeys = primaryKeys;
this.partitionKeys = partitionKeys;
this.tableOptions = tableOptions;
this.description = description;
this.postTransformConverter = postTransformConverter;
}

public String getSourceTable() {
@@ -104,6 +107,10 @@ public String getTableOptions() {
return tableOptions;
}

public String getPostTransformConverter() {
return postTransformConverter;
}

@Override
public String toString() {
return "TransformDef{"
@@ -119,6 +126,9 @@ public String toString() {
+ ", description='"
+ description
+ '\''
+ ", postTransformConverter='"
+ postTransformConverter
+ '\''
+ '}';
}

@@ -137,7 +147,8 @@ public boolean equals(Object o) {
&& Objects.equals(description, that.description)
&& Objects.equals(primaryKeys, that.primaryKeys)
&& Objects.equals(partitionKeys, that.partitionKeys)
&& Objects.equals(tableOptions, that.tableOptions);
&& Objects.equals(tableOptions, that.tableOptions)
&& Objects.equals(postTransformConverter, that.postTransformConverter);
}

@Override
@@ -149,6 +160,7 @@ public int hashCode() {
description,
primaryKeys,
partitionKeys,
tableOptions);
tableOptions,
postTransformConverter);
}
}
@@ -60,7 +60,8 @@ public DataStream<Event> translatePreTransform(
transform.getFilter().orElse(null),
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions());
transform.getTableOptions(),
transform.getPostTransformConverter());
}

preTransformFunctionBuilder.addUdfFunctions(
@@ -91,7 +92,8 @@ public DataStream<Event> translatePostTransform(
transform.isValidFilter() ? transform.getFilter().get() : null,
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions());
transform.getTableOptions(),
transform.getPostTransformConverter());
}
}
postTransformFunctionBuilder.addTimezone(timezone);
@@ -325,7 +325,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
"col1",
"col12",
"key1=value1",
"");
"",
null);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
@@ -387,7 +388,8 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
"col1",
"col12",
"key1=value1",
"");
"",
null);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
@@ -449,7 +451,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
"col1",
"col12",
"key1=value1",
"");
"",
null);
TransformDef transformDef2 =
new TransformDef(
"default_namespace.default_schema.table1",
@@ -458,7 +461,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
null,
null,
null,
"");
"",
null);
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
@@ -986,7 +990,8 @@ void testTransformMergingWithRoute() throws Exception {
null,
null,
null,
""));
"",
null));

// Setup route
TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");
@@ -370,7 +370,8 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
"col1",
"col12",
"key1=value1",
"");
"",
null);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
@@ -430,7 +431,8 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
"col1",
"col12",
"key1=value1",
"");
"",
null);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
@@ -490,7 +492,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
"col1",
"col12",
"key1=value1",
"");
"",
null);
TransformDef transformDef2 =
new TransformDef(
"default_namespace.default_schema.table1",
@@ -499,7 +502,8 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
null,
null,
null,
"");
"",
null);
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
@@ -1020,7 +1024,8 @@ void testTransformMergingWithRoute() throws Exception {
null,
null,
null,
""));
"",
null));

// Setup route
TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");