Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc-pipeline-connector] add MULTI_SPLITS_SINGLE_TABLE to provide multiple splits case of ValuesDataSourceHelper. #2737

Merged
merged 2 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public class ValuesDataFactory implements DataSourceFactory, DataSinkFactory {

@Override
public DataSource createDataSource(Context context) {
ValuesDataSourceHelper.SourceEventType eventType =
context.getConfiguration().get(ValuesDataSourceOptions.SOURCE_EVENT_TYPE);
ValuesDataSourceHelper.EventSetId eventType =
context.getConfiguration().get(ValuesDataSourceOptions.EVENT_SET_ID);
int failAtPos =
context.getConfiguration().get(ValuesDataSourceOptions.FAILURE_INJECTION_INDEX);
return new ValuesDataSource(eventType, failAtPos);
Expand All @@ -66,7 +66,7 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(ValuesDataSourceOptions.SOURCE_EVENT_TYPE);
options.add(ValuesDataSourceOptions.EVENT_SET_ID);
options.add(ValuesDataSourceOptions.FAILURE_INJECTION_INDEX);
options.add(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY);
return options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
Expand Down Expand Up @@ -54,25 +55,29 @@
public class ValuesDataSource implements DataSource {

/** index of testCase for {@link ValuesDataSourceHelper}. */
private final ValuesDataSourceHelper.SourceEventType eventType;
private final ValuesDataSourceHelper.EventSetId eventSetId;

/** index for {@link EventIteratorReader} to fail when reading. */
private final int failAtPos;

public ValuesDataSource(ValuesDataSourceHelper.SourceEventType eventType) {
this.eventType = eventType;
public ValuesDataSource(ValuesDataSourceHelper.EventSetId eventSetId) {
this.eventSetId = eventSetId;
this.failAtPos = Integer.MAX_VALUE;
}

public ValuesDataSource(ValuesDataSourceHelper.SourceEventType eventType, int failAtPos) {
this.eventType = eventType;
public ValuesDataSource(ValuesDataSourceHelper.EventSetId eventSetId, int failAtPos) {
this.eventSetId = eventSetId;
this.failAtPos = failAtPos;
}

@Override
public EventSourceProvider getEventSourceProvider() {
ValuesDataSourceHelper.setSourceEvents(eventType);
return FlinkSourceProvider.of(new ValuesSource(failAtPos));
ValuesDataSourceHelper.setSourceEvents(eventSetId);
HybridSource<Event> hybridSource =
HybridSource.builder(new ValuesSource(failAtPos, eventSetId, true))
.addSource(new ValuesSource(failAtPos, eventSetId, false))
.build();
return FlinkSourceProvider.of(hybridSource);
}

@Override
Expand All @@ -91,8 +96,18 @@ private static class ValuesSource

private final int failAtPos;

public ValuesSource(int failAtPos) {
private final ValuesDataSourceHelper.EventSetId eventSetId;

/** True if this source is in the snapshot stage; otherwise it is in the incremental stage. */
private final boolean isInSnapshotPhase;

/**
 * Creates a source for one phase of the given event set.
 *
 * @param failAtPos position handed to the reader as the point at which it should fail
 * @param eventSetId identifier of the predefined event set to serve
 * @param isInSnapshotPhase {@code true} for the snapshot stage, {@code false} for the
 *     incremental stage
 */
public ValuesSource(
        int failAtPos,
        ValuesDataSourceHelper.EventSetId eventSetId,
        boolean isInSnapshotPhase) {
    this.isInSnapshotPhase = isInSnapshotPhase;
    this.eventSetId = eventSetId;
    this.failAtPos = failAtPos;
}

@Override
Expand All @@ -103,10 +118,17 @@ public Boundedness getBoundedness() {
@Override
public SplitEnumerator<EventIteratorSplit, Collection<EventIteratorSplit>> createEnumerator(
SplitEnumeratorContext<EventIteratorSplit> enumContext) {
ValuesDataSourceHelper.setSourceEvents(eventSetId);
Collection<EventIteratorSplit> eventIteratorSplits = new ArrayList<>();
List<List<Event>> eventWithSplits = ValuesDataSourceHelper.getSourceEvents();
for (int i = 0; i < eventWithSplits.size(); i++) {
eventIteratorSplits.add(new EventIteratorSplit(i, 0));
// Treat the last split of eventWithSplits as the incremental split; every earlier
// split belongs to the snapshot phase.
if (isInSnapshotPhase) {
for (int i = 0; i < eventWithSplits.size() - 1; i++) {
eventIteratorSplits.add(new EventIteratorSplit(i, 0));
}
} else {
eventIteratorSplits.add(new EventIteratorSplit(eventWithSplits.size() - 1, 0));
}
return new IteratorSourceEnumerator<>(enumContext, eventIteratorSplits);
}
Expand All @@ -133,7 +155,7 @@ public SimpleVersionedSerializer<EventIteratorSplit> getSplitSerializer() {
@Override
public SourceReader<Event, EventIteratorSplit> createReader(
SourceReaderContext readerContext) {
return new EventIteratorReader(readerContext, failAtPos);
return new EventIteratorReader(readerContext, failAtPos, eventSetId);
}

private static void serializeEventIteratorSplit(
Expand Down Expand Up @@ -238,11 +260,23 @@ private static class EventIteratorReader
// position for this Split to fail
private final int failAtPos;

private final ValuesDataSourceHelper.EventSetId eventSetId;

private int numberOfEventsEmit = 0;

public EventIteratorReader(SourceReaderContext context, int failAtPos) {
/**
 * Creates a reader for the given event set.
 *
 * @param context reader context passed through to the parent reader
 * @param failAtPos position at which this reader should fail
 * @param eventSetId identifier of the predefined event set this reader serves
 */
public EventIteratorReader(
        SourceReaderContext context,
        int failAtPos,
        ValuesDataSourceHelper.EventSetId eventSetId) {
    super(context);
    this.eventSetId = eventSetId;
    this.failAtPos = failAtPos;
}

/**
 * Registers this reader's event set with {@link ValuesDataSourceHelper} and then runs the
 * parent reader's start-up logic.
 */
@Override
public void start() {
// NOTE(review): presumably the helper's static events must be re-populated here because the
// reader may run in a different process than the one that built the source — confirm.
ValuesDataSourceHelper.setSourceEvents(eventSetId);
super.start();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,20 @@
import java.util.List;
import java.util.Map;

/** A helper class for {@link ValuesDataSource} to build events of each split. */
/**
* A helper class for {@link ValuesDataSource} to build events of each split.
*
* <p>The last list returned by {@code getSourceEvents} is defined as the split for the incremental stage.
*/
public class ValuesDataSourceHelper {

/**
* Different situations for creating sourceEvents, {@link
* ValuesDataSourceOptions#SOURCE_EVENT_TYPE}.
* Different situations for creating sourceEvents, {@link ValuesDataSourceOptions#EVENT_SET_ID}.
*/
public enum SourceEventType {
public enum EventSetId {
/** Predefined events of a single table in one split; also the configured default. */
SINGLE_SPLIT_SINGLE_TABLE,
/** Predefined events of multiple tables in one split. */
SINGLE_SPLIT_MULTI_TABLES,
/** Predefined events of a single table spread over multiple splits. */
MULTI_SPLITS_SINGLE_TABLE,
/**
 * Events supplied by the caller through {@code setSourceEvents(List)}; selecting this id
 * leaves the currently stored events untouched.
 */
CUSTOM_SOURCE_EVENTS
}

Expand All @@ -62,8 +66,8 @@ public enum SourceEventType {

public static List<List<Event>> getSourceEvents() {
if (sourceEvents == null) {
throw new IllegalArgumentException(
"sourceEvents should be set by `setSourceEvents` method.");
// use default enum of SINGLE_SPLIT_SINGLE_TABLE
sourceEvents = singleSplitSingleTable();
}
return sourceEvents;
}
Expand All @@ -74,19 +78,23 @@ public static void setSourceEvents(List<List<Event>> customSourceEvents) {
}

/** set sourceEvents using predefined events. */
public static void setSourceEvents(SourceEventType eventType) {
public static void setSourceEvents(EventSetId eventType) {
switch (eventType) {
case SINGLE_SPLIT_SINGLE_TABLE:
{
sourceEvents = singleSplitSingleTable();
break;
}

case SINGLE_SPLIT_MULTI_TABLES:
{
sourceEvents = singleSplitMultiTables();
break;
}
case MULTI_SPLITS_SINGLE_TABLE:
{
sourceEvents = multiSplitsSingleTable();
break;
}
case CUSTOM_SOURCE_EVENTS:
{
break;
Expand Down Expand Up @@ -320,4 +328,162 @@ private static List<List<Event>> singleSplitMultiTables() {
eventOfSplits.add(split1);
return eventOfSplits;
}

/**
 * Builds the predefined events for {@link EventSetId#MULTI_SPLITS_SINGLE_TABLE}: four splits
 * that all target {@code table1}.
 *
 * <p>Every split starts with the same {@link CreateTableEvent}. Splits 1-3 each insert two rows
 * (keys "1" through "6"). Split 4 deletes rows "2", "4" and "6", adds column {@code col3}, and
 * then updates rows "1" and "3" so that {@code col3} changes from "" to "x".
 *
 * @return the events grouped by split, in split order
 */
private static List<List<Event>> multiSplitsSingleTable() {
    // Table definition shared by every split.
    Schema tableSchema =
            Schema.newBuilder()
                    .physicalColumn("col1", DataTypes.STRING())
                    .physicalColumn("col2", DataTypes.STRING())
                    .primaryKey("col1")
                    .build();
    CreateTableEvent createTable = new CreateTableEvent(table1, tableSchema);
    BinaryRecordDataGenerator twoColumnRows =
            new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));

    // split 1: create table, insert rows "1" and "2"
    List<Event> firstSplit = new ArrayList<>();
    firstSplit.add(createTable);
    firstSplit.add(
            DataChangeEvent.insertEvent(table1, twoColumnRows.generate(stringFields("1", "1"))));
    firstSplit.add(
            DataChangeEvent.insertEvent(table1, twoColumnRows.generate(stringFields("2", "2"))));

    // split 2: create table, insert rows "3" and "4"
    List<Event> secondSplit = new ArrayList<>();
    secondSplit.add(createTable);
    secondSplit.add(
            DataChangeEvent.insertEvent(table1, twoColumnRows.generate(stringFields("3", "3"))));
    secondSplit.add(
            DataChangeEvent.insertEvent(table1, twoColumnRows.generate(stringFields("4", "4"))));

    // split 3: create table, insert rows "5" and "6"
    List<Event> thirdSplit = new ArrayList<>();
    thirdSplit.add(createTable);
    thirdSplit.add(
            DataChangeEvent.insertEvent(table1, twoColumnRows.generate(stringFields("5", "5"))));
    thirdSplit.add(
            DataChangeEvent.insertEvent(table1, twoColumnRows.generate(stringFields("6", "6"))));

    // split 4: create table, delete rows "2", "4", "6", add col3, update rows "1" and "3"
    List<Event> fourthSplit = new ArrayList<>();
    fourthSplit.add(createTable);
    fourthSplit.add(
            DataChangeEvent.deleteEvent(table1, twoColumnRows.generate(stringFields("2", "2"))));
    fourthSplit.add(
            DataChangeEvent.deleteEvent(table1, twoColumnRows.generate(stringFields("4", "4"))));
    fourthSplit.add(
            DataChangeEvent.deleteEvent(table1, twoColumnRows.generate(stringFields("6", "6"))));

    AddColumnEvent.ColumnWithPosition newColumn =
            new AddColumnEvent.ColumnWithPosition(
                    Column.physicalColumn("col3", DataTypes.STRING()));
    fourthSplit.add(new AddColumnEvent(table1, Collections.singletonList(newColumn)));

    // Rows written after the schema change carry three string columns.
    BinaryRecordDataGenerator threeColumnRows =
            new BinaryRecordDataGenerator(
                    RowType.of(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()));
    fourthSplit.add(
            DataChangeEvent.updateEvent(
                    table1,
                    threeColumnRows.generate(stringFields("1", "1", "")),
                    threeColumnRows.generate(stringFields("1", "1", "x"))));
    fourthSplit.add(
            DataChangeEvent.updateEvent(
                    table1,
                    threeColumnRows.generate(stringFields("3", "3", "")),
                    threeColumnRows.generate(stringFields("3", "3", "x"))));

    List<List<Event>> splits = new ArrayList<>();
    splits.add(firstSplit);
    splits.add(secondSplit);
    splits.add(thirdSplit);
    splits.add(fourthSplit);
    return splits;
}

/** Wraps each given string as {@link BinaryStringData}, keeping the argument order. */
private static Object[] stringFields(String... values) {
    Object[] fields = new Object[values.length];
    for (int i = 0; i < values.length; i++) {
        fields[i] = BinaryStringData.fromString(values[i]);
    }
    return fields;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,23 @@
/** Configurations for {@link ValuesDataSource}. */
public class ValuesDataSourceOptions {

public static final ConfigOption<ValuesDataSourceHelper.SourceEventType> SOURCE_EVENT_TYPE =
ConfigOptions.key("source.event.type")
.enumType(ValuesDataSourceHelper.SourceEventType.class)
.defaultValue(ValuesDataSourceHelper.SourceEventType.SINGLE_SPLIT_SINGLE_TABLE)
public static final ConfigOption<ValuesDataSourceHelper.EventSetId> EVENT_SET_ID =
ConfigOptions.key("event-set.id")
.enumType(ValuesDataSourceHelper.EventSetId.class)
.defaultValue(ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE)
.withDescription(
Description.builder()
.text("Type of creating source change events. ")
.text(
"Id for creating source change events from ValuesDataSourceHelper.EventSetId.")
.linebreak()
.add(
ListElement.list(
text(
"SINGLE_SPLIT_SINGLE_TABLE: Default and predetermined case. Creating schema changes of single table and put them into one split."),
text(
"SINGLE_SPLIT_MULTI_TABLES: A predetermined case. Creating schema changes of multiple tables and put them into one split."),
text(
"MULTI_SPLITS_SINGLE_TABLE: A predetermined case. Creating schema changes of single table and put them into multiple splits."),
text(
"CUSTOM_SOURCE_EVENTS: Passed change events by the user through calling `setSourceEvents` method.")))
.build());
Expand Down
Loading
Loading