diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/factory/ValuesDataFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/factory/ValuesDataFactory.java
index a964080caa7..8752781e32f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/factory/ValuesDataFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/factory/ValuesDataFactory.java
@@ -40,8 +40,8 @@ public class ValuesDataFactory implements DataSourceFactory, DataSinkFactory {
 
     @Override
     public DataSource createDataSource(Context context) {
-        ValuesDataSourceHelper.SourceEventType eventType =
-                context.getConfiguration().get(ValuesDataSourceOptions.SOURCE_EVENT_TYPE);
+        ValuesDataSourceHelper.EventSetId eventType =
+                context.getConfiguration().get(ValuesDataSourceOptions.EVENT_SET_ID);
         int failAtPos =
                 context.getConfiguration().get(ValuesDataSourceOptions.FAILURE_INJECTION_INDEX);
         return new ValuesDataSource(eventType, failAtPos);
@@ -66,7 +66,7 @@ public Set<ConfigOption<?>> requiredOptions() {
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(ValuesDataSourceOptions.SOURCE_EVENT_TYPE);
+        options.add(ValuesDataSourceOptions.EVENT_SET_ID);
         options.add(ValuesDataSourceOptions.FAILURE_INJECTION_INDEX);
         options.add(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY);
         return options;
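The factory now resolves the renamed `event-set.id` option into a `ValuesDataSourceHelper.EventSetId`. A minimal sketch of the caller's side of that rename, using Flink's typed `Configuration` API (the class name `EventSetIdConfigExample` is hypothetical; only the option and enum come from this patch):

```java
import org.apache.flink.configuration.Configuration;

import com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper;
import com.ververica.cdc.connectors.values.source.ValuesDataSourceOptions;

public class EventSetIdConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // EVENT_SET_ID replaces the former SOURCE_EVENT_TYPE option ("source.event.type").
        conf.set(
                ValuesDataSourceOptions.EVENT_SET_ID,
                ValuesDataSourceHelper.EventSetId.MULTI_SPLITS_SINGLE_TABLE);
        // The factory reads the same option back when it builds the ValuesDataSource.
        ValuesDataSourceHelper.EventSetId eventSetId =
                conf.get(ValuesDataSourceOptions.EVENT_SET_ID);
        System.out.println(eventSetId); // MULTI_SPLITS_SINGLE_TABLE
    }
}
```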
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSource.java
index bc2e9716056..57986fafa4a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSource.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSource.java
@@ -26,6 +26,7 @@
 import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
 import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
 import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.connector.base.source.hybrid.HybridSource;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -54,25 +55,29 @@ public class ValuesDataSource implements DataSource {
 
     /** index of testCase for {@link ValuesDataSourceHelper}. */
-    private final ValuesDataSourceHelper.SourceEventType eventType;
+    private final ValuesDataSourceHelper.EventSetId eventSetId;
 
     /** index for {@link EventIteratorReader} to fail when reading. */
     private final int failAtPos;
 
-    public ValuesDataSource(ValuesDataSourceHelper.SourceEventType eventType) {
-        this.eventType = eventType;
+    public ValuesDataSource(ValuesDataSourceHelper.EventSetId eventSetId) {
+        this.eventSetId = eventSetId;
         this.failAtPos = Integer.MAX_VALUE;
     }
 
-    public ValuesDataSource(ValuesDataSourceHelper.SourceEventType eventType, int failAtPos) {
-        this.eventType = eventType;
+    public ValuesDataSource(ValuesDataSourceHelper.EventSetId eventSetId, int failAtPos) {
+        this.eventSetId = eventSetId;
         this.failAtPos = failAtPos;
     }
 
     @Override
     public EventSourceProvider getEventSourceProvider() {
-        ValuesDataSourceHelper.setSourceEvents(eventType);
-        return FlinkSourceProvider.of(new ValuesSource(failAtPos));
+        ValuesDataSourceHelper.setSourceEvents(eventSetId);
+        HybridSource<Event> hybridSource =
+                HybridSource.builder(new ValuesSource(failAtPos, eventSetId, true))
+                        .addSource(new ValuesSource(failAtPos, eventSetId, false))
+                        .build();
+        return FlinkSourceProvider.of(hybridSource);
     }
 
     @Override
@@ -91,8 +96,18 @@ private static class ValuesSource
 
         private final int failAtPos;
 
-        public ValuesSource(int failAtPos) {
+        private final ValuesDataSourceHelper.EventSetId eventSetId;
+
+        /** True if this source is in the snapshot stage, otherwise in the incremental stage. */
+        private final boolean isInSnapshotPhase;
+
+        public ValuesSource(
+                int failAtPos,
+                ValuesDataSourceHelper.EventSetId eventSetId,
+                boolean isInSnapshotPhase) {
             this.failAtPos = failAtPos;
+            this.eventSetId = eventSetId;
+            this.isInSnapshotPhase = isInSnapshotPhase;
         }
 
         @Override
@@ -103,10 +118,17 @@ public Boundedness getBoundedness() {
         @Override
         public SplitEnumerator<EventIteratorSplit, Collection<EventIteratorSplit>> createEnumerator(
                 SplitEnumeratorContext<EventIteratorSplit> enumContext) {
+            ValuesDataSourceHelper.setSourceEvents(eventSetId);
             Collection<EventIteratorSplit> eventIteratorSplits = new ArrayList<>();
             List<List<Event>> eventWithSplits = ValuesDataSourceHelper.getSourceEvents();
-            for (int i = 0; i < eventWithSplits.size(); i++) {
-                eventIteratorSplits.add(new EventIteratorSplit(i, 0));
+            // treat the last EventIteratorSplit of eventWithSplits as the split of the
+            // incremental stage.
+            if (isInSnapshotPhase) {
+                for (int i = 0; i < eventWithSplits.size() - 1; i++) {
+                    eventIteratorSplits.add(new EventIteratorSplit(i, 0));
+                }
+            } else {
+                eventIteratorSplits.add(new EventIteratorSplit(eventWithSplits.size() - 1, 0));
             }
             return new IteratorSourceEnumerator<>(enumContext, eventIteratorSplits);
         }
@@ -133,7 +155,7 @@ public SimpleVersionedSerializer<EventIteratorSplit> getSplitSerializer() {
         @Override
         public SourceReader<Event, EventIteratorSplit> createReader(
                 SourceReaderContext readerContext) {
-            return new EventIteratorReader(readerContext, failAtPos);
+            return new EventIteratorReader(readerContext, failAtPos, eventSetId);
         }
 
         private static void serializeEventIteratorSplit(
@@ -238,11 +260,23 @@ private static class EventIteratorReader
 
         // position for this Split to fail
         private final int failAtPos;
 
+        private final ValuesDataSourceHelper.EventSetId eventSetId;
+
         private int numberOfEventsEmit = 0;
 
-        public EventIteratorReader(SourceReaderContext context, int failAtPos) {
+        public EventIteratorReader(
+                SourceReaderContext context,
+                int failAtPos,
+                ValuesDataSourceHelper.EventSetId eventSetId) {
             super(context);
             this.failAtPos = failAtPos;
+            this.eventSetId = eventSetId;
+        }
+
+        @Override
+        public void start() {
+            ValuesDataSourceHelper.setSourceEvents(eventSetId);
+            super.start();
         }
 
         @Override
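The key change above: instead of a single bounded `ValuesSource`, `getEventSourceProvider` now chains two of them in a `HybridSource`, so the snapshot-phase source is drained first and the incremental-phase source takes over. A self-contained sketch of the same switching behavior, assuming only Flink's connector-base `HybridSource` and the stock `NumberSequenceSource` standing in for the two phases:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceSketch {
    public static void main(String[] args) throws Exception {
        // The first source plays the "snapshot" role and is read to completion;
        // HybridSource then switches to the second, "incremental" source.
        HybridSource<Long> source =
                HybridSource.builder(new NumberSequenceSource(1L, 5L))
                        .addSource(new NumberSequenceSource(6L, 10L))
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "hybrid-sketch").print();
        env.execute("hybrid-source-sketch");
    }
}
```

Note that both `ValuesSource` instances carry the same `eventSetId`, and the enumerator and reader re-apply it via `setSourceEvents`, presumably because they can run in JVMs where the helper's static event sets were never initialized (for example after failover).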
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java
index 033215cedb0..ab9546fb9cb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java
@@ -37,16 +37,20 @@
 import java.util.List;
 import java.util.Map;
 
-/** A helper class for {@link ValuesDataSource} to build events of each split. */
+/**
+ * A helper class for {@link ValuesDataSource} to build events of each split.
+ *
+ * <p>The last list returned by the getSourceEvents method is defined as the split of the
+ * incremental stage.
+ */
 public class ValuesDataSourceHelper {
 
     /**
-     * Different situations for creating sourceEvents, {@link
-     * ValuesDataSourceOptions#SOURCE_EVENT_TYPE}.
+     * Different situations for creating sourceEvents, {@link ValuesDataSourceOptions#EVENT_SET_ID}.
      */
-    public enum SourceEventType {
+    public enum EventSetId {
         SINGLE_SPLIT_SINGLE_TABLE,
         SINGLE_SPLIT_MULTI_TABLES,
+        MULTI_SPLITS_SINGLE_TABLE,
         CUSTOM_SOURCE_EVENTS
     }
@@ -62,8 +66,8 @@ public enum SourceEventType {
 
     public static List<List<Event>> getSourceEvents() {
         if (sourceEvents == null) {
-            throw new IllegalArgumentException(
-                    "sourceEvents should be set by `setSourceEvents` method.");
+            // fall back to the default SINGLE_SPLIT_SINGLE_TABLE event set
+            sourceEvents = singleSplitSingleTable();
         }
         return sourceEvents;
     }
@@ -74,19 +78,23 @@ public static void setSourceEvents(List<List<Event>> customSourceEvents) {
     }
 
     /** set sourceEvents using predefined events. */
-    public static void setSourceEvents(SourceEventType eventType) {
+    public static void setSourceEvents(EventSetId eventType) {
         switch (eventType) {
             case SINGLE_SPLIT_SINGLE_TABLE:
                 {
                     sourceEvents = singleSplitSingleTable();
                     break;
                 }
-
             case SINGLE_SPLIT_MULTI_TABLES:
                 {
                     sourceEvents = singleSplitMultiTables();
                    break;
                 }
+            case MULTI_SPLITS_SINGLE_TABLE:
+                {
+                    sourceEvents = multiSplitsSingleTable();
+                    break;
+                }
             case CUSTOM_SOURCE_EVENTS:
                 {
                     break;
@@ -320,4 +328,162 @@ private static List<List<Event>> singleSplitMultiTables() {
         eventOfSplits.add(split1);
         return eventOfSplits;
     }
+
+    private static List<List<Event>> multiSplitsSingleTable() {
+        List<List<Event>> eventOfSplits = new ArrayList<>();
+        List<Event> split1 = new ArrayList<>();
+        // create table
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("col1", DataTypes.STRING())
+                        .physicalColumn("col2", DataTypes.STRING())
+                        .primaryKey("col1")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
+        split1.add(createTableEvent);
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
+
+        // create split1
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        table1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("1")
+                                }));
+        split1.add(insertEvent1);
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        table1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("2"),
+                                    BinaryStringData.fromString("2")
+                                }));
+        split1.add(insertEvent2);
+        eventOfSplits.add(split1);
+
+        // create split2
+        List<Event> split2 = new ArrayList<>();
+        split2.add(createTableEvent);
+        DataChangeEvent insertEvent3 =
+                DataChangeEvent.insertEvent(
+                        table1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("3"),
+                                    BinaryStringData.fromString("3")
+                                }));
+        split2.add(insertEvent3);
+        DataChangeEvent insertEvent4 =
+                DataChangeEvent.insertEvent(
+                        table1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("4"),
+                                    BinaryStringData.fromString("4")
+                                }));
+        split2.add(insertEvent4);
+        eventOfSplits.add(split2);
+
+        // create split3
+        List<Event> split3 = new ArrayList<>();
+        split3.add(createTableEvent);
+        DataChangeEvent insertEvent5 =
+                DataChangeEvent.insertEvent(
+                        table1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("5"),
+                                    BinaryStringData.fromString("5")
+                                }));
+        split3.add(insertEvent5);
+        DataChangeEvent insertEvent6 =
+                DataChangeEvent.insertEvent(
+                        table1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("6"),
+                                    BinaryStringData.fromString("6")
+                                }));
+        split3.add(insertEvent6);
+        eventOfSplits.add(split3);
+
+        // create split4
+        List<Event> split4 = new ArrayList<>();
+        split4.add(createTableEvent);
+        DataChangeEvent deleteEvent1 =
+                DataChangeEvent.deleteEvent(
+                        table1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("2"),
+                                    BinaryStringData.fromString("2")
+                                }));
+        split4.add(deleteEvent1);
+        DataChangeEvent deleteEvent2 =
+                DataChangeEvent.deleteEvent(
+                        table1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("4"),
+                                    BinaryStringData.fromString("4")
+                                }));
+        split4.add(deleteEvent2);
+        DataChangeEvent deleteEvent3 =
+                DataChangeEvent.deleteEvent(
+                        table1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("6"),
+                                    BinaryStringData.fromString("6")
+                                }));
+        split4.add(deleteEvent3);
+        AddColumnEvent.ColumnWithPosition columnWithPosition =
+                new AddColumnEvent.ColumnWithPosition(
+                        Column.physicalColumn("col3", DataTypes.STRING()));
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(table1, Collections.singletonList(columnWithPosition));
+        split4.add(addColumnEvent);
+        generator =
+                new BinaryRecordDataGenerator(
+                        RowType.of(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()));
+        DataChangeEvent updateEvent1 =
+                DataChangeEvent.updateEvent(
+                        table1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("")
+                                }),
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("x")
+                                }));
+        split4.add(updateEvent1);
+        DataChangeEvent updateEvent2 =
+                DataChangeEvent.updateEvent(
+                        table1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("3"),
+                                    BinaryStringData.fromString("3"),
+                                    BinaryStringData.fromString("")
+                                }),
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("3"),
+                                    BinaryStringData.fromString("3"),
+                                    BinaryStringData.fromString("x")
+                                }));
+        split4.add(updateEvent2);
+        eventOfSplits.add(split4);
+
+        return eventOfSplits;
+    }
 }
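The new `multiSplitsSingleTable` set follows the convention documented in the class javadoc: split1 through split3 are snapshot splits, while split4 (deletes, an `AddColumnEvent`, and updates) is served by the incremental stage. A hedged sketch of a custom event set honoring the same "last list is incremental" convention through `CUSTOM_SOURCE_EVENTS` (package locations mirror the imports used elsewhere in this connector; the class name is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper;

public class CustomEventSetExample {
    public static void main(String[] args) {
        TableId table = TableId.tableId("default", "default", "table1");
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("col1", DataTypes.STRING())
                        .primaryKey("col1")
                        .build();

        List<List<Event>> splits = new ArrayList<>();

        // Every list except the last is read during the snapshot phase.
        List<Event> snapshotSplit = new ArrayList<>();
        snapshotSplit.add(new CreateTableEvent(table, schema));
        splits.add(snapshotSplit);

        // The last list is, by convention, the incremental-stage split.
        List<Event> incrementalSplit = new ArrayList<>();
        incrementalSplit.add(new CreateTableEvent(table, schema));
        splits.add(incrementalSplit);

        ValuesDataSourceHelper.setSourceEvents(splits);
        // Then build the source with EventSetId.CUSTOM_SOURCE_EVENTS, as the tests below do.
    }
}
```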
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceOptions.java
index 4a844e1ba8b..26ebc7d6804 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceOptions.java
@@ -26,13 +26,14 @@
 /** Configurations for {@link ValuesDataSource}. */
 public class ValuesDataSourceOptions {
 
-    public static final ConfigOption<ValuesDataSourceHelper.SourceEventType> SOURCE_EVENT_TYPE =
-            ConfigOptions.key("source.event.type")
-                    .enumType(ValuesDataSourceHelper.SourceEventType.class)
-                    .defaultValue(ValuesDataSourceHelper.SourceEventType.SINGLE_SPLIT_SINGLE_TABLE)
+    public static final ConfigOption<ValuesDataSourceHelper.EventSetId> EVENT_SET_ID =
+            ConfigOptions.key("event-set.id")
+                    .enumType(ValuesDataSourceHelper.EventSetId.class)
+                    .defaultValue(ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE)
                     .withDescription(
                             Description.builder()
-                                    .text("Type of creating source change events. ")
+                                    .text(
+                                            "Id of the event set used to create source change events, see ValuesDataSourceHelper.EventSetId.")
                                     .linebreak()
                                     .add(
                                             ListElement.list(
@@ -40,6 +41,8 @@ public class ValuesDataSourceOptions {
                                                     text(
                                                             "SINGLE_SPLIT_SINGLE_TABLE: Default and predetermined case. Creating schema changes of single table and put them into one split."),
                                                     text(
                                                             "SINGLE_SPLIT_MULTI_TABLES: A predetermined case. Creating schema changes of multiple tables and put them into one split."),
+                                                    text(
+                                                            "MULTI_SPLITS_SINGLE_TABLE: A predetermined case. Creating schema changes of a single table and putting them into multiple splits."),
                                                     text(
                                                             "CUSTOM_SOURCE_EVENTS: Passed change events by the user through calling `setSourceEvents` method.")))
                                     .build());
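Because `EVENT_SET_ID` is declared with `enumType`, the new kebab-case key can also be supplied as a plain string, and an unset key falls back to the declared default. A small sketch assuming Flink `Configuration`'s usual string-to-enum conversion:

```java
import org.apache.flink.configuration.Configuration;

import com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper;
import com.ververica.cdc.connectors.values.source.ValuesDataSourceOptions;

public class EventSetIdStringKeySketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Unset key: the declared default applies.
        System.out.println(conf.get(ValuesDataSourceOptions.EVENT_SET_ID));
        // -> SINGLE_SPLIT_SINGLE_TABLE

        // The raw key is the renamed "event-set.id" (previously "source.event.type");
        // typed access converts the string to the EventSetId enum constant.
        conf.setString("event-set.id", "MULTI_SPLITS_SINGLE_TABLE");
        ValuesDataSourceHelper.EventSetId id = conf.get(ValuesDataSourceOptions.EVENT_SET_ID);
        System.out.println(id); // MULTI_SPLITS_SINGLE_TABLE
    }
}
```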
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelperTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceITCase.java
similarity index 67%
rename from flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelperTest.java
rename to flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceITCase.java
index 6b0f9de946a..321670339d5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelperTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceITCase.java
@@ -43,8 +43,11 @@
 import java.util.ArrayList;
 import java.util.List;
 
-/** Integration tests for different enumeration situations of {@link ValuesDataSourceHelper}. */
-public class ValuesDataSourceHelperTest {
+/**
+ * Integration tests for {@link ValuesDataSource} in different enumeration situations of {@link
+ * ValuesDataSourceHelper}.
+ */
+public class ValuesDataSourceITCase {
 
     @Before
     public void before() {
@@ -56,17 +59,13 @@ public void after() {
         ValuesDatabase.clear();
     }
 
-    @Test
-    public void testSingleSplitSingleTable() throws Exception {
+    /** Read events from {@link ValuesDataSource} and apply them to the ValuesDatabase. */
+    private void executeDataStreamJob(ValuesDataSourceHelper.EventSetId type) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(3000);
         env.setRestartStrategy(RestartStrategies.noRestart());
         FlinkSourceProvider sourceProvider =
-                (FlinkSourceProvider)
-                        new ValuesDataSource(
-                                        ValuesDataSourceHelper.SourceEventType
-                                                .SINGLE_SPLIT_SINGLE_TABLE)
-                                .getEventSourceProvider();
+                (FlinkSourceProvider) new ValuesDataSource(type).getEventSourceProvider();
         CloseableIterator<Event> events =
                 env.fromSource(
                                 sourceProvider.getSource(),
                                 WatermarkStrategy.noWatermarks(),
                                 ValuesDataFactory.IDENTIFIER,
                                 new EventTypeInfo())
                         .executeAndCollect();
         events.forEachRemaining(
                 (event) -> {
                     if (event instanceof DataChangeEvent) {
                         ValuesDatabase.applyDataChangeEvent((DataChangeEvent) event);
                     } else if (event instanceof SchemaChangeEvent) {
                         ValuesDatabase.applySchemaChangeEvent((SchemaChangeEvent) event);
                     }
                 });
+    }
@@ -82,7 +81,11 @@
+
+    @Test
+    public void testSingleSplitSingleTable() throws Exception {
+        executeDataStreamJob(ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE);
         List<String> results = new ArrayList<>();
         results.add("default.default.table1:col1=2;newCol3=x");
         results.add("default.default.table1:col1=3;newCol3=");
@@ -92,31 +95,7 @@
 
     @Test
     public void testSingleSplitMultiTables() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(3000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-        FlinkSourceProvider sourceProvider =
-                (FlinkSourceProvider)
-                        new ValuesDataSource(
-                                        ValuesDataSourceHelper.SourceEventType
-                                                .SINGLE_SPLIT_MULTI_TABLES)
-                                .getEventSourceProvider();
-        CloseableIterator<Event> events =
-                env.fromSource(
-                                sourceProvider.getSource(),
-                                WatermarkStrategy.noWatermarks(),
-                                ValuesDataFactory.IDENTIFIER,
-                                new EventTypeInfo())
-                        .executeAndCollect();
-        events.forEachRemaining(
-                (event) -> {
-                    if (event instanceof DataChangeEvent) {
-                        ValuesDatabase.applyDataChangeEvent((DataChangeEvent) event);
-                    } else if (event instanceof SchemaChangeEvent) {
-                        ValuesDatabase.applySchemaChangeEvent((SchemaChangeEvent) event);
-                    }
-                });
-
+        executeDataStreamJob(ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
         List<String> results = new ArrayList<>();
         results.add("default.default.table1:col1=2;newCol3=x");
         results.add("default.default.table1:col1=3;newCol3=");
@@ -133,10 +112,6 @@
 
     @Test
     public void testCustomSourceEvents() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(3000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-
         List<List<Event>> splits = new ArrayList<>();
         List<Event> split1 = new ArrayList<>();
         TableId table1 = TableId.tableId("default", "default", "table1");
@@ -146,11 +121,10 @@ public void testCustomSourceEvents() throws Exception {
                         .physicalColumn("col2", DataTypes.STRING())
                         .primaryKey("col1")
                         .build();
-        CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
-        split1.add(createTableEvent);
-
         BinaryRecordDataGenerator generator =
                 new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
+        CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
+        split1.add(createTableEvent);
         DataChangeEvent insertEvent1 =
                 DataChangeEvent.insertEvent(
                         table1,
                         generator.generate(
                                 new Object[] {
                                     BinaryStringData.fromString("1"),
                                     BinaryStringData.fromString("1")
                                 }));
@@ -171,27 +145,7 @@ public void testCustomSourceEvents() throws Exception {
         split1.add(insertEvent2);
         splits.add(split1);
         ValuesDataSourceHelper.setSourceEvents(splits);
-
-        FlinkSourceProvider sourceProvider =
-                (FlinkSourceProvider)
-                        new ValuesDataSource(
-                                        ValuesDataSourceHelper.SourceEventType.CUSTOM_SOURCE_EVENTS)
-                                .getEventSourceProvider();
-        CloseableIterator<Event> events =
-                env.fromSource(
-                                sourceProvider.getSource(),
-                                WatermarkStrategy.noWatermarks(),
-                                ValuesDataFactory.IDENTIFIER,
-                                new EventTypeInfo())
-                        .executeAndCollect();
-        events.forEachRemaining(
-                (event) -> {
-                    if (event instanceof DataChangeEvent) {
-                        ValuesDatabase.applyDataChangeEvent((DataChangeEvent) event);
-                    } else if (event instanceof SchemaChangeEvent) {
-                        ValuesDatabase.applySchemaChangeEvent((SchemaChangeEvent) event);
-                    }
-                });
+        executeDataStreamJob(ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
 
         List<String> results = new ArrayList<>();
         results.add("default.default.table1:col1=1;col2=1");
@@ -199,4 +153,15 @@ public void testCustomSourceEvents() throws Exception {
         Assert.assertEquals(
                 results, ValuesDatabase.getResults(TableId.parse("default.default.table1")));
     }
+
+    @Test
+    public void testMultiSplitsSingleTable() throws Exception {
+        executeDataStreamJob(ValuesDataSourceHelper.EventSetId.MULTI_SPLITS_SINGLE_TABLE);
+        List<String> results = new ArrayList<>();
+        results.add("default.default.table1:col1=1;col2=1;col3=x");
+        results.add("default.default.table1:col1=3;col2=3;col3=x");
+        results.add("default.default.table1:col1=5;col2=5;col3=");
+        Assert.assertEquals(
+                results, ValuesDatabase.getResults(TableId.parse("default.default.table1")));
+    }
 }
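The assertions in `testMultiSplitsSingleTable` reflect the end state after all four splits are applied across both phases: inserts from the snapshot splits, then the deletes, the added column, and the updates from the incremental split. One path the tests leave implicit is failure injection: the two-argument constructor arms the reader to fail at a given event position, which together with checkpointing exercises recovery across the hybrid phases. A hedged sketch (position 5 is an arbitrary illustrative value):

```java
import com.ververica.cdc.connectors.values.source.ValuesDataSource;
import com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper;

public class FailureInjectionSketch {
    public static void main(String[] args) {
        // failAtPos = 5: the EventIteratorReader fails while reading once it reaches
        // event position 5; the one-argument constructor defaults the position to
        // Integer.MAX_VALUE, which effectively disables injection.
        ValuesDataSource source =
                new ValuesDataSource(
                        ValuesDataSourceHelper.EventSetId.MULTI_SPLITS_SINGLE_TABLE, 5);
        // Wire into a job via source.getEventSourceProvider(), as executeDataStreamJob does.
    }
}
```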