Skip to content

Commit

Permalink
[Feature][transforms-v2] Support append only stream from cdc source
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Sep 28, 2024
1 parent a95be09 commit ddc4331
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,9 @@ public class TestCdcRowTransformIT extends TestSuiteBase {
@TestTemplate
public void testCdcRowTransform(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/cdc_row_transform_case1.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertTrue(
execResult
.getStdout()
.contains(
"subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=default.default.default SeaTunnelRow#kind=INSERT : 1, A, 100, INSERT"));
Container.ExecResult execResult1 = container.executeJob("/cdc_row_transform_case1.conf");
Assertions.assertEquals(0, execResult1.getExitCode());
Container.ExecResult execResult2 = container.executeJob("/cdc_row_transform_case2.conf");
Assertions.assertEquals(0, execResult2.getExitCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,3 @@ sink {
}
}
}

sink {
Console {
source_table_name = "trans_result"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,3 @@ sink {
}
}
}

sink {
Console {
source_table_name = "trans_result"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.common.SingleFieldOutputTransform;

import com.google.common.annotations.VisibleForTesting;
import lombok.NonNull;

import java.util.Arrays;
Expand Down Expand Up @@ -83,4 +84,9 @@ protected Column getOutputColumn() {
return PhysicalColumn.of(
customFieldName, BasicType.STRING_TYPE, (Long) null, true, null, null);
}

@VisibleForTesting
public void initRowContainerGenerator() {
transformTableSchema();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.transform;

import org.apache.seatunnel.transform.cdc.CdcRowTransformFactory;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class CdcRowTransformFactoryTest {

@Test
public void testOptionRule() throws Exception {
CdcRowTransformFactory replaceTransformFactory = new CdcRowTransformFactory();
Assertions.assertNotNull(replaceTransformFactory.optionRule());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashMap;

public class CdcRowTransformTest {
class CdcRowTransformTest {

static CatalogTable catalogTable;

Expand Down Expand Up @@ -95,13 +97,56 @@ static void setUp() {
}

@Test
void testConfig() {
// test not set
void testCdcRowTransformShort() {
CdcRowTransform cdcRowTransform =
new CdcRowTransform(ReadonlyConfig.fromMap(new HashMap<>()), catalogTable);
cdcRowTransform.initRowContainerGenerator();
SeaTunnelRow insertRow = inputRow.copy();
Assertions.assertEquals(
"SeaTunnelRow{tableId=, kind=+I, fields=[value1, value2, value3, value4, value5, +I]}",
cdcRowTransform.transformRow(insertRow).toString());
SeaTunnelRow updateBeforeRow = inputRow.copy();
updateBeforeRow.setRowKind(RowKind.UPDATE_BEFORE);
Assertions.assertEquals(
"SeaTunnelRow{tableId=, kind=+I, fields=[value1, value2, value3, value4, value5, -U]}",
cdcRowTransform.transformRow(updateBeforeRow).toString());
SeaTunnelRow updateAfterRow = inputRow.copy();
updateAfterRow.setRowKind(RowKind.UPDATE_AFTER);
Assertions.assertEquals(
"SeaTunnelRow{tableId=, kind=+I, fields=[value1, value2, value3, value4, value5, +U]}",
cdcRowTransform.transformRow(updateAfterRow).toString());
SeaTunnelRow deleteRow = inputRow.copy();
deleteRow.setRowKind(RowKind.DELETE);
Assertions.assertEquals(
"SeaTunnelRow{tableId=, kind=+I, fields=[value1, value2, value3, value4, value5, -D]}",
cdcRowTransform.transformRow(deleteRow).toString());
}

@Test
void testCdcRowTransformFull() {
HashMap<String, Object> conf = new HashMap<>();
// conf.put("custom_field_name","custom_field_name");
conf.put("transform_type", "FULL");
CdcRowTransform cdcRowTransform =
new CdcRowTransform(ReadonlyConfig.fromMap(new HashMap<>(conf)), catalogTable);
SeaTunnelRow map = cdcRowTransform.map(inputRow);
System.out.println(map);
new CdcRowTransform(ReadonlyConfig.fromMap(conf), catalogTable);
cdcRowTransform.initRowContainerGenerator();
SeaTunnelRow insertRow = inputRow.copy();
Assertions.assertEquals(
"SeaTunnelRow{tableId=, kind=+I, fields=[value1, value2, value3, value4, value5, INSERT]}",
cdcRowTransform.transformRow(insertRow).toString());
SeaTunnelRow updateBeforeRow = inputRow.copy();
updateBeforeRow.setRowKind(RowKind.UPDATE_BEFORE);
Assertions.assertEquals(
"SeaTunnelRow{tableId=, kind=+I, fields=[value1, value2, value3, value4, value5, UPDATE_BEFORE]}",
cdcRowTransform.transformRow(updateBeforeRow).toString());
SeaTunnelRow updateAfterRow = inputRow.copy();
updateAfterRow.setRowKind(RowKind.UPDATE_AFTER);
Assertions.assertEquals(
"SeaTunnelRow{tableId=, kind=+I, fields=[value1, value2, value3, value4, value5, UPDATE_AFTER]}",
cdcRowTransform.transformRow(updateAfterRow).toString());
SeaTunnelRow deleteRow = inputRow.copy();
deleteRow.setRowKind(RowKind.DELETE);
Assertions.assertEquals(
"SeaTunnelRow{tableId=, kind=+I, fields=[value1, value2, value3, value4, value5, DELETE]}",
cdcRowTransform.transformRow(deleteRow).toString());
}
}

0 comments on commit ddc4331

Please sign in to comment.