[cdc-composer][tests] Add integration tests for FlinkPipelineComposer (
PatrickRen authored and zhangchaoming.zcm committed Jan 3, 2025
1 parent 9655548 commit cc0e55a
Showing 7 changed files with 337 additions and 59 deletions.
flink-cdc-composer/pom.xml (18 additions, 0 deletions)
@@ -36,6 +36,24 @@ under the License.
<artifactId>flink-cdc-runtime</artifactId>
<version>${revision}</version>
</dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-pipeline-connector-values</artifactId>
+            <version>${revision}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
</dependencies>

</project>
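Note on these dependencies, read together with the new ITCase below: flink-cdc-pipeline-connector-values supplies the in-memory values source/sink the tests drive, flink-clients is needed to actually execute the composed job locally, and flink-test-utils provides the MiniClusterExtension the test registers. All three are test-scoped, so nothing leaks into the module's compile classpath.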
flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java
@@ -55,7 +55,8 @@ public static FlinkPipelineComposer ofRemoteCluster(
}

public static FlinkPipelineComposer ofMiniCluster() {
-        return new FlinkPipelineComposer(StreamExecutionEnvironment.createLocalEnvironment(), true);
+        return new FlinkPipelineComposer(
+                StreamExecutionEnvironment.getExecutionEnvironment(), true);
}

private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking) {
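A note on this change: StreamExecutionEnvironment.getExecutionEnvironment() returns whatever environment the current context provides, so a test running under MiniClusterExtension gets the shared mini-cluster environment instead of a freshly created local one. A minimal sketch of the intended test-side usage (pipelineDef is assumed to be built as in the ITCase below):

    // Sketch only: under MiniClusterExtension, getExecutionEnvironment() resolves
    // to the mini-cluster-backed environment, so the pipeline runs on that cluster;
    // outside any special context it falls back to a local environment.
    @Test
    void runsOnSharedMiniCluster() throws Exception {
        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
        PipelineExecution execution = composer.compose(pipelineDef); // pipelineDef built as in the ITCase
        execution.execute();
    }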
flink-cdc-composer/src/main/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtils.java
@@ -58,9 +58,10 @@ public static <T extends Factory> T getFactoryByIdentifier(
if (factoryList.isEmpty()) {
throw new RuntimeException(
String.format(
"No factory found in the classpath.\n\n"
"Cannot find factory with identifier \"%s\" in the classpath.\n\n"
+ "Available factory classes are:\n\n"
+ "%s",
identifier,
StreamSupport.stream(loader.spliterator(), false)
.map(f -> f.getClass().getName())
.sorted()
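For context, here is a simplified, self-contained sketch of the identifier-based lookup that produces this message. The nested Factory interface and the loading details are illustrative assumptions; only the filtering shape and the message format are taken from the diff:

    import java.util.List;
    import java.util.ServiceLoader;
    import java.util.stream.Collectors;
    import java.util.stream.StreamSupport;

    public class FactoryLookupSketch {

        /** Illustrative stand-in for the CDC Factory SPI. */
        public interface Factory {
            String identifier();
        }

        public static Factory getFactoryByIdentifier(String identifier) {
            // Discover all Factory implementations registered via META-INF/services.
            ServiceLoader<Factory> loader = ServiceLoader.load(Factory.class);
            List<Factory> factoryList =
                    StreamSupport.stream(loader.spliterator(), false)
                            .filter(factory -> identifier.equals(factory.identifier()))
                            .collect(Collectors.toList());
            if (factoryList.isEmpty()) {
                // Name the missing identifier first (the fix above), then list
                // every factory class the ServiceLoader actually found.
                throw new RuntimeException(
                        String.format(
                                "Cannot find factory with identifier \"%s\" in the classpath.\n\n"
                                        + "Available factory classes are:\n\n%s",
                                identifier,
                                StreamSupport.stream(loader.spliterator(), false)
                                        .map(f -> f.getClass().getName())
                                        .sorted()
                                        .collect(Collectors.joining("\n"))));
            }
            return factoryList.get(0);
        }
    }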
flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java (new file)
@@ -0,0 +1,251 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.composer.flink;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.pipeline.PipelineOptions;
import com.ververica.cdc.composer.PipelineExecution;
import com.ververica.cdc.composer.definition.PipelineDef;
import com.ververica.cdc.composer.definition.SinkDef;
import com.ververica.cdc.composer.definition.SourceDef;
import com.ververica.cdc.connectors.values.ValuesDatabase;
import com.ververica.cdc.connectors.values.factory.ValuesDataFactory;
import com.ververica.cdc.connectors.values.sink.ValuesDataSinkOptions;
import com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper;
import com.ververica.cdc.connectors.values.source.ValuesDataSourceOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Collections;
import java.util.List;

import static com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
import static com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2;
import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
import static org.assertj.core.api.Assertions.assertThat;

/** Integration test for {@link FlinkPipelineComposer}. */
class FlinkPipelineComposerITCase {
private static final int MAX_PARALLELISM = 4;

// Always use parent-first classloader for CDC classes.
    // The reason is that ValuesDatabase uses a static field to hold data, so we need to make
    // sure the class is loaded by the app classloader in order to verify the data in the test case.
private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG =
new org.apache.flink.configuration.Configuration();

static {
MINI_CLUSTER_CONFIG.set(
ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
Collections.singletonList("com.ververica.cdc"));
}

/**
* Use {@link MiniClusterExtension} to reduce the overhead of restarting the MiniCluster for
* every test case.
*/
@RegisterExtension
static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(MAX_PARALLELISM)
.setConfiguration(MINI_CLUSTER_CONFIG)
.build());

private final PrintStream standardOut = System.out;
private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();

@BeforeEach
void init() {
        // Take over STDOUT, as we need to check the output of the values sink
System.setOut(new PrintStream(outCaptor));
// Initialize in-memory database
ValuesDatabase.clear();
}

@AfterEach
void cleanup() {
System.setOut(standardOut);
}

@Test
void testSingleSplitSingleTable() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE);
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

// Check result in ValuesDatabase
List<String> results = ValuesDatabase.getResults(TABLE_1);
assertThat(results)
.contains(
"default_namespace.default_schema.table1:col1=2;newCol3=x",
"default_namespace.default_schema.table1:col1=3;newCol3=");

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ], after=[2, x], op=UPDATE, meta=()}");
}

@Test
void testSingleSplitMultipleTables() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

// Check result in ValuesDatabase
List<String> table1Results = ValuesDatabase.getResults(TABLE_1);
assertThat(table1Results)
.containsExactly(
"default_namespace.default_schema.table1:col1=2;newCol3=x",
"default_namespace.default_schema.table1:col1=3;newCol3=");
List<String> table2Results = ValuesDatabase.getResults(TABLE_2);
assertThat(table2Results)
.contains(
"default_namespace.default_schema.table2:col1=1;col2=1",
"default_namespace.default_schema.table2:col1=2;col2=2",
"default_namespace.default_schema.table2:col1=3;col2=3");

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}

@Test
void testMultiSplitsSingleTable() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.MULTI_SPLITS_SINGLE_TABLE);
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, MAX_PARALLELISM);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

// Check result in ValuesDatabase
List<String> table1Results = ValuesDatabase.getResults(TABLE_1);
assertThat(table1Results)
.contains(
"default_namespace.default_schema.table1:col1=1;col2=1;col3=x",
"default_namespace.default_schema.table1:col1=3;col2=3;col3=x",
"default_namespace.default_schema.table1:col1=5;col2=5;col3=");
}
}
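These tests verify two channels: materialized state through ValuesDatabase.getResults(...) and the exact event stream through captured stdout. The capture trick itself is plain JDK; a distilled, runnable sketch (the "hello" payload is hypothetical):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    public class StdoutCaptureSketch {
        public static void main(String[] args) {
            PrintStream originalOut = System.out;
            ByteArrayOutputStream captor = new ByteArrayOutputStream();
            System.setOut(new PrintStream(captor));
            try {
                System.out.println("hello"); // whatever the values sink prints...
            } finally {
                System.setOut(originalOut); // ...always restore, as @AfterEach cleanup() does
            }
            // ...becomes inspectable for assertions, line by line.
            String[] lines = captor.toString().trim().split("\n");
            System.out.println(lines.length + " line(s) captured: " + lines[0]);
        }
    }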
flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
@@ -43,15 +43,8 @@ void getFactoryByIdentifier() {
() ->
FactoryDiscoveryUtils.getFactoryByIdentifier(
"data-sink-factory-3", Factory.class))
-                .hasMessage(
-                        "No factory found in the classpath.\n"
-                                + "\n"
-                                + "Available factory classes are:\n"
-                                + "\n"
-                                + "com.ververica.cdc.composer.utils.factory.DataSinkFactory1\n"
-                                + "com.ververica.cdc.composer.utils.factory.DataSinkFactory2\n"
-                                + "com.ververica.cdc.composer.utils.factory.DataSourceFactory1\n"
-                                + "com.ververica.cdc.composer.utils.factory.DataSourceFactory2");
+                .hasMessageStartingWith(
+                        "Cannot find factory with identifier \"data-sink-factory-3\" in the classpath");
}

@Test
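Note: the assertion switches from hasMessage to hasMessageStartingWith, presumably because the tail of the new message lists whatever factories are on the test classpath, and that set is no longer fixed once this commit adds the values connector as a test dependency; pinning only the stable prefix keeps the test robust.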
flink-cdc-pipeline-connector-values: src/main/java/com/ververica/cdc/connectors/values/sink/ValuesDataSink.java
@@ -78,7 +78,11 @@ public ValuesSink(boolean materializedInMemory, boolean print) {

@Override
public SinkWriter<Event> createWriter(InitContext context) {
-            return new ValuesSinkWriter(materializedInMemory, print);
+            return new ValuesSinkWriter(
+                    materializedInMemory,
+                    print,
+                    context.getSubtaskId(),
+                    context.getNumberOfParallelSubtasks());
}
}

@@ -91,6 +95,10 @@ private static class ValuesSinkWriter implements SinkWriter<Event> {

private final boolean print;

+        private final int subtaskIndex;
+
+        private final int numSubtasks;
+
/**
* keep the relationship of TableId and Schema as write method may rely on the schema
* information of DataChangeEvent.
@@ -99,10 +107,13 @@ private static class ValuesSinkWriter implements SinkWriter<Event> {

private final Map<TableId, List<RecordData.FieldGetter>> fieldGetterMaps;

-        public ValuesSinkWriter(boolean materializedInMemory, boolean print) {
+        public ValuesSinkWriter(
+                boolean materializedInMemory, boolean print, int subtaskIndex, int numSubtasks) {
super();
this.materializedInMemory = materializedInMemory;
this.print = print;
+            this.subtaskIndex = subtaskIndex;
+            this.numSubtasks = numSubtasks;
schemaMaps = new HashMap<>();
fieldGetterMaps = new HashMap<>();
}
@@ -130,10 +141,13 @@ public void write(Event event, Context context) {
ValuesDatabase.applyDataChangeEvent((DataChangeEvent) event);
}
if (print) {
+                String prefix = numSubtasks > 1 ? subtaskIndex + "> " : "";
                 // print the detail message to console for verification.
                 System.out.println(
-                        ValuesDataSinkHelper.convertEventToStr(
-                                event, fieldGetterMaps.get(((ChangeEvent) event).tableId())));
+                        prefix
+                                + ValuesDataSinkHelper.convertEventToStr(
+                                        event,
+                                        fieldGetterMaps.get(((ChangeEvent) event).tableId())));
}
}

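The new subtaskIndex/numSubtasks pair only affects printed output: with parallelism greater than one each line gains an "N> " prefix, similar to DataStream#print(), while with parallelism one the output is unchanged, which keeps the single-parallelism stdout assertions in the ITCase stable. A tiny runnable illustration (the subtask coordinates are made up):

    public class PrefixRenderSketch {
        public static void main(String[] args) {
            // Hypothetical subtask coordinates, only to show how the prefix renders.
            int numSubtasks = 4;
            int subtaskIndex = 2;
            String prefix = numSubtasks > 1 ? subtaskIndex + "> " : "";
            System.out.println(prefix + "DataChangeEvent{tableId=..., op=INSERT, meta=()}");
            // prints: 2> DataChangeEvent{tableId=..., op=INSERT, meta=()}
        }
    }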
(The diff for the remaining changed file did not load and is not shown.)
