diff --git a/docs/en/transform-v2/rowkind-extractor.md b/docs/en/transform-v2/rowkind-extractor.md
new file mode 100644
index 00000000000..fdd1f411a8d
--- /dev/null
+++ b/docs/en/transform-v2/rowkind-extractor.md
@@ -0,0 +1,113 @@
+# RowKindExtractor
+
+> RowKindExtractor transform plugin
+
+## Description
+
+transform cdc row to append only row that contains the cdc RowKind.
+Example:
+CDC row: -D 1, test1, test2
+transformed Row: +I 1,test1,test2,DELETE
+
+## Options
+
+| name | type | required | default value |
+|-------------------|--------|----------|---------------|
+| custom_field_name | string | yes | row_kind |
+| transform_type | enum | yes | SHORT |
+
+### custom_field_name [string]
+
+Custom field name of the RowKind field
+
+### transform_type [enum]
+
+the RowKind field value formatting , the option can be `SHORT` or `FULL`
+
+`SHORT` : +I, -U , +U, -D
+`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE
+
+## Examples
+
+
+```yaml
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [4, "D", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "F", 100]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [2, "B", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [2, "G", 100]
+ },
+ {
+ kind = DELETE
+ fields = [3, "C", 100]
+ },
+ {
+ kind = DELETE
+ fields = [4, "D", 100]
+ }
+ ]
+ }
+}
+
+transform {
+ RowKindExtractor {
+ custom_field_name = "custom_name"
+ transform_type = FULL
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "custom_name"
+ }
+}
+
+```
+
diff --git a/docs/zh/transform-v2/rowkind-extractor.md b/docs/zh/transform-v2/rowkind-extractor.md
new file mode 100644
index 00000000000..23494500058
--- /dev/null
+++ b/docs/zh/transform-v2/rowkind-extractor.md
@@ -0,0 +1,112 @@
+# RowKindExtractor
+
+> RowKindExtractor transform plugin
+
+## Description
+
+将CDC Row 转换为 Append only Row, 转换后的行扩展了RowKind字段
+Example:
+CDC row: -D 1, test1, test2
+transformed Row: +I 1,test1,test2,DELETE
+
+## Options
+
+| name | type | required | default value |
+|-------------------|--------|----------|---------------|
+| custom_field_name | string | yes | row_kind |
+| transform_type | enum | yes | SHORT |
+
+### custom_field_name [string]
+
+RowKind列的自定义名
+
+### transform_type [enum]
+
+格式化RowKind值 , 配置为 `SHORT` 或 `FULL`
+
+`SHORT` : +I, -U , +U, -D
+`FULL` : INSERT, UPDATE_BEFORE, UPDATE_AFTER , DELETE
+
+## Examples
+
+```yaml
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [4, "D", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "F", 100]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [2, "B", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [2, "G", 100]
+ },
+ {
+ kind = DELETE
+ fields = [3, "C", 100]
+ },
+ {
+ kind = DELETE
+ fields = [4, "D", 100]
+ }
+ ]
+ }
+}
+
+transform {
+ RowKindExtractor {
+ custom_field_name = "custom_name"
+ transform_type = FULL
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "custom_name"
+ }
+}
+
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 86c95bc3e22..e314ef86613 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -135,9 +135,11 @@ seatunnel.sink.ActiveMQ = connector-activemq
seatunnel.source.Qdrant = connector-qdrant
seatunnel.sink.Qdrant = connector-qdrant
seatunnel.source.Sls = connector-sls
+seatunnel.sink.Sls = connector-sls
seatunnel.source.Typesense = connector-typesense
seatunnel.sink.Typesense = connector-typesense
seatunnel.source.Opengauss-CDC = connector-cdc-opengauss
+
seatunnel.transform.Sql = seatunnel-transforms-v2
seatunnel.transform.FieldMapper = seatunnel-transforms-v2
seatunnel.transform.Filter = seatunnel-transforms-v2
@@ -149,5 +151,4 @@ seatunnel.transform.Copy = seatunnel-transforms-v2
seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
seatunnel.transform.LLM = seatunnel-transforms-v2
seatunnel.transform.Embedding = seatunnel-transforms-v2
-seatunnel.sink.Sls = connector-sls
-
+seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestRowKindExtractorTransformIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestRowKindExtractorTransformIT.java
new file mode 100644
index 00000000000..7f4c3fd6742
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestRowKindExtractorTransformIT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestRowKindExtractorTransformIT extends TestSuiteBase {
+
+ @TestTemplate
+ public void testRowKindExtractorTransform(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult1 =
+ container.executeJob("/rowkind_extractor_transform_case1.conf");
+ Assertions.assertEquals(0, execResult1.getExitCode());
+ Container.ExecResult execResult2 =
+ container.executeJob("/rowkind_extractor_transform_case2.conf");
+ Assertions.assertEquals(0, execResult2.getExitCode());
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case1.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case1.conf
new file mode 100644
index 00000000000..fd9efaafa66
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case1.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [4, "D", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "F", 100]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [2, "B", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [2, "G", 100]
+ },
+ {
+ kind = DELETE
+ fields = [3, "C", 100]
+ },
+ {
+ kind = DELETE
+ fields = [4, "D", 100]
+ }
+ ]
+ }
+}
+
+transform {
+ RowKindExtractor {
+ custom_field_name = "custom_name"
+ transform_type = FULL
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "trans_result"
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ]
+ field_rules = [
+ {
+ field_name = custom_name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case2.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case2.conf
new file mode 100644
index 00000000000..fc7c342c5ed
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/rowkind_extractor_transform_case2.conf
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [4, "D", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "F", 100]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [2, "B", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [2, "G", 100]
+ },
+ {
+ kind = DELETE
+ fields = [3, "C", 100]
+ },
+ {
+ kind = DELETE
+ fields = [4, "D", 100]
+ }
+ ]
+ }
+}
+
+transform {
+ RowKindExtractor {
+ transform_type = SHORT
+ result_table_name = "trans_result"
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "trans_result"
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ]
+ field_rules = [
+ {
+ field_name = row_kind
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
index c0ba3d1ca9a..13d25989aca 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java
@@ -137,4 +137,12 @@ protected TableIdentifier transformTableIdentifier() {
}
protected abstract Column getOutputColumn();
+
+ public int getFieldIndex() {
+ return fieldIndex;
+ }
+
+ public SeaTunnelRowContainerGenerator getRowContainerGenerator() {
+ return rowContainerGenerator;
+ }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
new file mode 100644
index 00000000000..ae7b1fcaa18
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rowkind;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.NonNull;
+
+import java.util.Arrays;
+
+public class RowKindExtractorTransform extends SingleFieldOutputTransform {
+
+ private final ReadonlyConfig config;
+
+ private final RowKindExtractorTransformType transformType;
+
+ public RowKindExtractorTransform(
+ @NonNull ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) {
+ super(inputCatalogTable);
+ this.config = config;
+ this.transformType = config.get(RowKindExtractorTransformConfig.TRANSFORM_TYPE);
+ }
+
+ @Override
+ public String getPluginName() {
+ return RowKindExtractorTransformConfig.PLUGIN_NAME;
+ }
+
+ @Override
+ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
+ Object fieldValue = getOutputFieldValue(new SeaTunnelRowAccessor(inputRow));
+ inputRow.setRowKind(RowKind.INSERT);
+ SeaTunnelRow outputRow = getRowContainerGenerator().apply(inputRow);
+ outputRow.setField(getFieldIndex(), fieldValue);
+ return outputRow;
+ }
+
+ @Override
+ protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
+ switch (transformType) {
+ case SHORT:
+ return inputRow.getRowKind().shortString();
+ case FULL:
+ return inputRow.getRowKind().name();
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported transform type %s", transformType));
+ }
+ }
+
+ @Override
+ protected Column getOutputColumn() {
+ String customFieldName = config.get(RowKindExtractorTransformConfig.CUSTOM_FIELD_NAME);
+ String[] fieldNames = inputCatalogTable.getTableSchema().getFieldNames();
+ boolean isExist = Arrays.asList(fieldNames).contains(customFieldName);
+ if (isExist) {
+ throw new IllegalArgumentException(
+ String.format("field name %s already exists", customFieldName));
+ }
+ return PhysicalColumn.of(
+ customFieldName,
+ BasicType.STRING_TYPE,
+ 13L,
+ false,
+ RowKind.INSERT.shortString(),
+ "Output column of RowKind");
+ }
+
+ @VisibleForTesting
+ public void initRowContainerGenerator() {
+ transformTableSchema();
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformConfig.java
new file mode 100644
index 00000000000..017e6292a92
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rowkind;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+
+@Getter
+@Setter
+public class RowKindExtractorTransformConfig implements Serializable {
+
+ public static final String PLUGIN_NAME = "RowKindExtractor";
+
+ public static final Option CUSTOM_FIELD_NAME =
+ Options.key("custom_field_name")
+ .stringType()
+ .defaultValue("row_kind")
+ .withDescription("Custom field name of the RowKind field");
+
+ public static final Option TRANSFORM_TYPE =
+ Options.key("transform_type")
+ .enumType(RowKindExtractorTransformType.class)
+ .defaultValue(RowKindExtractorTransformType.SHORT)
+ .withDescription("transform RowKind field value format");
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformFactory.java
new file mode 100644
index 00000000000..7228336da78
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rowkind;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class RowKindExtractorTransformFactory implements TableTransformFactory {
+ @Override
+ public String factoryIdentifier() {
+ return RowKindExtractorTransformConfig.PLUGIN_NAME;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(RowKindExtractorTransformConfig.CUSTOM_FIELD_NAME)
+ .build();
+ }
+
+ @Override
+ public TableTransform createTransform(TableTransformFactoryContext context) {
+ CatalogTable catalogTable = context.getCatalogTables().get(0);
+ return () -> new RowKindExtractorTransform(context.getOptions(), catalogTable);
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformType.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformType.java
new file mode 100644
index 00000000000..f3eb39d1fe0
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rowkind;
+
+public enum RowKindExtractorTransformType {
+ SHORT,
+ FULL
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/RowKindExtractorTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/RowKindExtractorTransformFactoryTest.java
new file mode 100644
index 00000000000..fcc73cc810a
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/RowKindExtractorTransformFactoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.transform.rowkind.RowKindExtractorTransformFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class RowKindExtractorTransformFactoryTest {
+
+ @Test
+ public void testOptionRule() throws Exception {
+ RowKindExtractorTransformFactory replaceTransformFactory =
+ new RowKindExtractorTransformFactory();
+ Assertions.assertNotNull(replaceTransformFactory.optionRule());
+ }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformTest.java
new file mode 100644
index 00000000000..b958dab7d04
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransformTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.rowkind;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+class RowKindExtractorTransformTest {
+
+ static CatalogTable catalogTable;
+
+ static Object[] values;
+
+ static SeaTunnelRow inputRow;
+
+ @BeforeAll
+ static void setUp() {
+ catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", TablePath.DEFAULT),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "key1",
+ BasicType.STRING_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key2",
+ BasicType.INT_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key3",
+ BasicType.LONG_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key4",
+ BasicType.DOUBLE_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "key5",
+ BasicType.FLOAT_TYPE,
+ 1L,
+ Boolean.FALSE,
+ null,
+ null))
+ .build(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ "comment");
+ values = new Object[] {"value1", 1, 896657703886127105L, 3.1415916, 3.14};
+ inputRow = new SeaTunnelRow(values);
+ }
+
+ @Test
+ void testCdcRowTransformShort() {
+ RowKindExtractorTransform rowKindExtractorTransform =
+ new RowKindExtractorTransform(
+ ReadonlyConfig.fromMap(new HashMap<>()), catalogTable);
+ rowKindExtractorTransform.initRowContainerGenerator();
+ SeaTunnelRow insertRow = inputRow.copy();
+ Assertions.assertEquals(
+ "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, +I]}",
+ rowKindExtractorTransform.transformRow(insertRow).toString());
+ SeaTunnelRow updateBeforeRow = inputRow.copy();
+ updateBeforeRow.setRowKind(RowKind.UPDATE_BEFORE);
+ Assertions.assertEquals(
+ "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, -U]}",
+ rowKindExtractorTransform.transformRow(updateBeforeRow).toString());
+ SeaTunnelRow updateAfterRow = inputRow.copy();
+ updateAfterRow.setRowKind(RowKind.UPDATE_AFTER);
+ Assertions.assertEquals(
+ "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, +U]}",
+ rowKindExtractorTransform.transformRow(updateAfterRow).toString());
+ SeaTunnelRow deleteRow = inputRow.copy();
+ deleteRow.setRowKind(RowKind.DELETE);
+ Assertions.assertEquals(
+ "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, -D]}",
+ rowKindExtractorTransform.transformRow(deleteRow).toString());
+ }
+
+ @Test
+ void testCdcRowTransformFull() {
+ HashMap conf = new HashMap<>();
+ conf.put("transform_type", "FULL");
+ RowKindExtractorTransform rowKindExtractorTransform =
+ new RowKindExtractorTransform(ReadonlyConfig.fromMap(conf), catalogTable);
+ rowKindExtractorTransform.initRowContainerGenerator();
+ SeaTunnelRow insertRow = inputRow.copy();
+ Assertions.assertEquals(
+ "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, INSERT]}",
+ rowKindExtractorTransform.transformRow(insertRow).toString());
+ SeaTunnelRow updateBeforeRow = inputRow.copy();
+ updateBeforeRow.setRowKind(RowKind.UPDATE_BEFORE);
+ Assertions.assertEquals(
+ "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, UPDATE_BEFORE]}",
+ rowKindExtractorTransform.transformRow(updateBeforeRow).toString());
+ SeaTunnelRow updateAfterRow = inputRow.copy();
+ updateAfterRow.setRowKind(RowKind.UPDATE_AFTER);
+ Assertions.assertEquals(
+ "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, UPDATE_AFTER]}",
+ rowKindExtractorTransform.transformRow(updateAfterRow).toString());
+ SeaTunnelRow deleteRow = inputRow.copy();
+ deleteRow.setRowKind(RowKind.DELETE);
+ Assertions.assertEquals(
+ "SeaTunnelRow{tableId=, kind=+I, fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, DELETE]}",
+ rowKindExtractorTransform.transformRow(deleteRow).toString());
+ }
+}