Skip to content

Commit

Permalink
[Feature][transforms-v2] Support append only stream from cdc source
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Oct 10, 2024
1 parent 749b2fe commit 15395c2
Show file tree
Hide file tree
Showing 14 changed files with 917 additions and 2 deletions.
113 changes: 113 additions & 0 deletions docs/en/transform-v2/cdc-row.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# CdcRow

> CdcRow transform plugin

## Description

Transform a CDC row into an append-only row that contains the CDC RowKind. <br />
Example: <br />
CDC row: -D 1, test1, test2 <br />
transformed Row: +I 1,test1,test2,DELETE

## Options

| name | type | required | default value |
|-------------------|--------|----------|---------------|
| custom_field_name | string | no       | operate_type  |
| transform_type    | enum   | no       | SHORT         |

### custom_field_name [string]

Custom field name of the RowKind field

### transform_type [enum]

The format of the RowKind field value; the option can be `SHORT` or `FULL`.

`SHORT` : +I, -U , +U, -D
`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE

## Examples


```yaml

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [4, "D", 100]
},
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
fields = [1, "F", 100]
},
{
kind = UPDATE_BEFORE
fields = [2, "B", 100]
},
{
kind = UPDATE_AFTER
fields = [2, "G", 100]
},
{
kind = DELETE
fields = [3, "C", 100]
},
{
kind = DELETE
fields = [4, "D", 100]
}
]
}
}

transform {
CdcRow {
custom_field_name = "custom_name"
transform_type = FULL
result_table_name = "trans_result"
}
}

sink {
Console {
source_table_name = "trans_result"
}
}

```

112 changes: 112 additions & 0 deletions docs/zh/transform-v2/cdc-row.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# CdcRow

> CdcRow transform plugin

## Description

将 CDC Row 转换为 Append only Row，转换后的行扩展了 RowKind 字段 <br />
Example: <br />
CDC row: -D 1, test1, test2 <br />
transformed Row: +I 1,test1,test2,DELETE

## Options

| name | type | required | default value |
|-------------------|--------|----------|---------------|
| custom_field_name | string | no       | operate_type  |
| transform_type    | enum   | no       | SHORT         |

### custom_field_name [string]

RowKind列的自定义名

### transform_type [enum]

格式化 RowKind 值，可配置为 `SHORT` 或 `FULL`

`SHORT` : +I, -U , +U, -D
`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE

## Examples

```yaml

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [4, "D", 100]
},
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
fields = [1, "F", 100]
},
{
kind = UPDATE_BEFORE
fields = [2, "B", 100]
},
{
kind = UPDATE_AFTER
fields = [2, "G", 100]
},
{
kind = DELETE
fields = [3, "C", 100]
},
{
kind = DELETE
fields = [4, "D", 100]
}
]
}
}

transform {
CdcRow {
custom_field_name = "custom_name"
transform_type = FULL
result_table_name = "trans_result"
}
}

sink {
Console {
source_table_name = "trans_result"
}
}

```

2 changes: 1 addition & 1 deletion plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,4 @@ seatunnel.transform.Copy = seatunnel-transforms-v2
seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
seatunnel.transform.LLM = seatunnel-transforms-v2
seatunnel.transform.Embedding = seatunnel-transforms-v2

seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.transform;

import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

import java.io.IOException;

@DisabledOnContainer(
        value = {},
        // Both SPARK and FLINK are excluded; the original reason text only mentioned SPARK.
        type = {EngineType.SPARK, EngineType.FLINK},
        disabledReason = "Currently SPARK and FLINK do not support this transform adaptation")
public class TestRowKindExtractorTransformIT extends TestSuiteBase {

    /**
     * Runs the two RowKindExtractor transform e2e jobs inside the test container and
     * asserts that each job exits with code 0 (success).
     *
     * @param container the engine container the test template is parameterized with
     * @throws IOException if the job submission fails at the I/O level
     * @throws InterruptedException if waiting for the job is interrupted
     */
    @TestTemplate
    public void testRowKindExtractorTransform(TestContainer container)
            throws IOException, InterruptedException {
        Container.ExecResult execResult1 =
                container.executeJob("/rowkind_Extractor_transform_case1.conf");
        Assertions.assertEquals(0, execResult1.getExitCode());
        Container.ExecResult execResult2 =
                container.executeJob("/rowkind_Extractor_transform_case2.conf");
        Assertions.assertEquals(0, execResult2.getExitCode());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in seatunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [4, "D", 100]
},
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
fields = [1, "F", 100]
},
{
kind = UPDATE_BEFORE
fields = [2, "B", 100]
},
{
kind = UPDATE_AFTER
fields = [2, "G", 100]
},
{
kind = DELETE
fields = [3, "C", 100]
},
{
kind = DELETE
fields = [4, "D", 100]
}
]
}
}

transform {
CdcRow {
custom_field_name = "custom_name"
transform_type = FULL
result_table_name = "trans_result"
}
}

sink {
Assert {
source_table_name = "trans_result"
rules {
row_rules = [
{
rule_type = MAX_ROW
rule_value = 10
},
{
rule_type = MIN_ROW
rule_value = 10
}
]
field_rules = [
{
field_name = custom_name
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
}
}
Loading

0 comments on commit 15395c2

Please sign in to comment.