diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index dfd7af01273..33372132764 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -203,6 +203,8 @@ public String identifier() { } private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; + + private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot"; private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset"; @@ -221,7 +223,8 @@ private static StartupOptions getStartupOptions(Configuration config) { switch (modeString.toLowerCase()) { case SCAN_STARTUP_MODE_VALUE_INITIAL: return StartupOptions.initial(); - + case SCAN_STARTUP_MODE_VALUE_SNAPSHOT: + return StartupOptions.snapshot(); case SCAN_STARTUP_MODE_VALUE_LATEST: return StartupOptions.latest(); @@ -238,9 +241,10 @@ private static StartupOptions getStartupOptions(Configuration config) { default: throw new ValidationException( String.format( - "Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s], but was: %s", + "Invalid value for option '%s'. 
Supported values are [%s, %s, %s, %s, %s, %s], but was: %s", SCAN_STARTUP_MODE.key(), SCAN_STARTUP_MODE_VALUE_INITIAL, + SCAN_STARTUP_MODE_VALUE_SNAPSHOT, SCAN_STARTUP_MODE_VALUE_LATEST, SCAN_STARTUP_MODE_VALUE_EARLIEST, SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java index 3c1474874e6..61607a7160f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java @@ -201,6 +201,7 @@ public JdbcSourceConfigFactory chunkKeyColumn(String chunkKeyColumn) { public JdbcSourceConfigFactory startupOptions(StartupOptions startupOptions) { switch (startupOptions.startupMode) { case INITIAL: + case SNAPSHOT: case LATEST_OFFSET: break; default: diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupMode.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupMode.java index ef6381fc856..67a3285fda9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupMode.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupMode.java @@ -30,5 +30,6 @@ public enum StartupMode { SPECIFIC_OFFSETS, - TIMESTAMP + TIMESTAMP, + SNAPSHOT } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java index d365b9e91d3..880d09210ce 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/StartupOptions.java @@ -38,6 +38,14 @@ public static StartupOptions initial() { return new StartupOptions(StartupMode.INITIAL, null, null, null); } + /** + * Performs an initial snapshot on the monitored database tables upon first startup, and not + * read the binlog anymore . + */ + public static StartupOptions snapshot() { + return new StartupOptions(StartupMode.SNAPSHOT, null, null, null); + } + /** * Never to perform snapshot on the monitored database tables upon first startup, just read from * the beginning of the change log. 
This should be used with care, as it is only valid when the @@ -89,6 +97,7 @@ private StartupOptions( switch (startupMode) { case INITIAL: + case SNAPSHOT: case EARLIEST_OFFSET: case LATEST_OFFSET: break; @@ -104,6 +113,17 @@ private StartupOptions( } } + public boolean isStreamOnly() { + return startupMode == StartupMode.EARLIEST_OFFSET + || startupMode == StartupMode.LATEST_OFFSET + || startupMode == StartupMode.SPECIFIC_OFFSETS + || startupMode == StartupMode.TIMESTAMP; + } + + public boolean isSnapshotOnly() { + return startupMode == StartupMode.SNAPSHOT; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java index 55f676a7222..aeb847433f8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java @@ -33,7 +33,6 @@ import com.ververica.cdc.common.annotation.VisibleForTesting; import com.ververica.cdc.connectors.base.config.SourceConfig; import com.ververica.cdc.connectors.base.dialect.DataSourceDialect; -import com.ververica.cdc.connectors.base.options.StartupMode; import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner; import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner; import com.ververica.cdc.connectors.base.source.assigner.StreamSplitAssigner; @@ -101,7 +100,12 @@ public OffsetFactory getOffsetFactory() { @Override public Boundedness getBoundedness() { - return Boundedness.CONTINUOUS_UNBOUNDED; + C sourceConfig = configFactory.create(0); + if (sourceConfig.getStartupOptions().isSnapshotOnly()) { + return Boundedness.BOUNDED; + } else { + return Boundedness.CONTINUOUS_UNBOUNDED; + } } @Override @@ -139,7 +143,7 @@ public SplitEnumerator createEnumerator( SplitEnumeratorContext enumContext) { C sourceConfig = configFactory.create(0); final SplitAssigner splitAssigner; - if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) { + if (!sourceConfig.getStartupOptions().isStreamOnly()) { try { final List remainingTables = dataSourceDialect.discoverDataCollections(sourceConfig); @@ -161,7 +165,8 @@ public SplitEnumerator createEnumerator( splitAssigner = new StreamSplitAssigner(sourceConfig, dataSourceDialect, offsetFactory); } - return new IncrementalSourceEnumerator(enumContext, sourceConfig, splitAssigner); + return new IncrementalSourceEnumerator( + enumContext, sourceConfig, splitAssigner, getBoundedness()); } @Override @@ -189,7 +194,8 @@ public SplitEnumerator restoreEnumerator( throw new UnsupportedOperationException( "Unsupported restored PendingSplitsState: " + checkpoint); } - return new IncrementalSourceEnumerator(enumContext, sourceConfig, splitAssigner); + return new IncrementalSourceEnumerator( + enumContext, sourceConfig, splitAssigner, getBoundedness()); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java index 
98bfbfeb8cb..db7f1d1fc2f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/HybridSplitAssigner.java @@ -46,6 +46,7 @@ public class HybridSplitAssigner implements SplitAssigne private static final String STREAM_SPLIT_ID = "stream-split"; private final int splitMetaGroupSize; + private final C sourceConfig; private boolean isStreamSplitAssigned; @@ -61,6 +62,7 @@ public HybridSplitAssigner( DataSourceDialect dialect, OffsetFactory offsetFactory) { this( + sourceConfig, new SnapshotSplitAssigner<>( sourceConfig, currentParallelism, @@ -80,6 +82,7 @@ public HybridSplitAssigner( DataSourceDialect dialect, OffsetFactory offsetFactory) { this( + sourceConfig, new SnapshotSplitAssigner<>( sourceConfig, currentParallelism, @@ -92,10 +95,12 @@ public HybridSplitAssigner( } private HybridSplitAssigner( + C sourceConfig, SnapshotSplitAssigner snapshotSplitAssigner, boolean isStreamSplitAssigned, int splitMetaGroupSize, OffsetFactory offsetFactory) { + this.sourceConfig = sourceConfig; this.snapshotSplitAssigner = snapshotSplitAssigner; this.isStreamSplitAssigned = isStreamSplitAssigned; this.splitMetaGroupSize = splitMetaGroupSize; @@ -179,8 +184,8 @@ public void notifyCheckpointComplete(long checkpointId) { } @Override - public boolean isStreamSplitAssigned() { - return isStreamSplitAssigned; + public boolean noMoreSplits() { + return snapshotSplitAssigner.noMoreSplits() && isStreamSplitAssigned; } @Override @@ -199,13 +204,17 @@ public StreamSplit createStreamSplit() { Map splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets(); final List finishedSnapshotSplitInfos = new ArrayList<>(); - Offset minOffset = null; + Offset minOffset = null, maxOffset = null; for (SchemalessSnapshotSplit split : assignedSnapshotSplit) { - // find the min offset of change log + // find the min and max offset of change log Offset changeLogOffset = splitFinishedOffsets.get(split.splitId()); if (minOffset == null || changeLogOffset.isBefore(minOffset)) { minOffset = changeLogOffset; } + if (maxOffset == null || changeLogOffset.isAfter(maxOffset)) { + maxOffset = changeLogOffset; + } + finishedSnapshotSplitInfos.add( new FinishedSnapshotSplitInfo( split.getTableId(), @@ -216,14 +225,21 @@ public StreamSplit createStreamSplit() { offsetFactory)); } + // If the source is running in snapshot mode, we use the highest watermark among + // snapshot splits as the ending offset to provide a consistent snapshot view at the moment + // of high watermark. + Offset stoppingOffset = offsetFactory.createNoStoppingOffset(); + if (sourceConfig.getStartupOptions().isSnapshotOnly()) { + stoppingOffset = maxOffset; + } + // the finishedSnapshotSplitInfos is too large for transmission, divide it to groups and // then transfer them - boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize; return new StreamSplit( STREAM_SPLIT_ID, minOffset == null ? offsetFactory.createInitialOffset() : minOffset, - offsetFactory.createNoStoppingOffset(), + stoppingOffset, divideMetaToGroups ? 
new ArrayList<>() : finishedSnapshotSplitInfos, new HashMap<>(), finishedSnapshotSplitInfos.size()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java index 5dea133fa63..673cb51bf59 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java @@ -290,7 +290,7 @@ public void notifyCheckpointComplete(long checkpointId) { @Override public void close() {} - /** Indicates there is no more splits available in this assigner. */ + @Override public boolean noMoreSplits() { return remainingTables.isEmpty() && remainingSplits.isEmpty(); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java index 4fca7cffd9e..bd776b26246 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/SplitAssigner.java @@ -55,10 +55,8 @@ public interface SplitAssigner { */ boolean waitingForFinishedSplits(); - /** Whether the split assigner is finished stream split assigning. */ - default boolean isStreamSplitAssigned() { - throw new UnsupportedOperationException("Not support to assigning StreamSplit."); - } + /** Indicates there is no more splits available in this assigner. */ + boolean noMoreSplits(); /** * Gets the finished splits' information. 
This is useful metadata to generate a stream split diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java index 82cbd23b8f0..9c5f8d62a24 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/assigner/StreamSplitAssigner.java @@ -119,7 +119,7 @@ public void notifyCheckpointComplete(long checkpointId) { } @Override - public boolean isStreamSplitAssigned() { + public boolean noMoreSplits() { return isStreamSplitAssigned; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java index 291f6d973f1..46544629937 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.base.source.enumerator; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; @@ -66,14 +67,18 @@ public class IncrementalSourceEnumerator private final TreeSet readersAwaitingSplit; private List> finishedSnapshotSplitMeta; + private Boundedness boundedness; + public IncrementalSourceEnumerator( SplitEnumeratorContext context, SourceConfig sourceConfig, - SplitAssigner splitAssigner) { + SplitAssigner splitAssigner, + Boundedness boundedness) { this.context = context; this.sourceConfig = sourceConfig; this.splitAssigner = splitAssigner; this.readersAwaitingSplit = new TreeSet<>(); + this.boundedness = boundedness; } @Override @@ -163,7 +168,7 @@ private void assignSplits() { continue; } - if (splitAssigner.isStreamSplitAssigned() && sourceConfig.isCloseIdleReaders()) { + if (shouldCloseIdleReader()) { // close idle readers when snapshot phase finished. context.signalNoMoreSplits(nextAwaiting); awaitingReader.remove(); @@ -184,6 +189,17 @@ private void assignSplits() { } } + private boolean shouldCloseIdleReader() { + // When there are no unassigned splits left, signal NoMoreSplitsEvent to the awaiting + // reader in two situations: + // 1. The startup mode is snapshot (the source is bounded) and the assigner has no more + // splits. + // 2. scan.incremental.close-idle-reader.enabled is set to true and the assigner has no + // more splits. 
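+ // For example, a source created with StartupOptions.snapshot() reports BOUNDED, so its + // awaiting readers are released here as soon as every snapshot split and the bounded + // stream split have been assigned, letting the job finish. 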
+ return splitAssigner.noMoreSplits() + && (boundedness == Boundedness.BOUNDED || (sourceConfig.isCloseIdleReaders())); + } + private int[] getRegisteredReader() { return this.context.registeredReaders().keySet().stream() .mapToInt(Integer::intValue) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReader.java index a9f943b14e8..e88086a869e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReader.java @@ -141,6 +141,10 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { protected void onSplitFinished(Map finishedSplitIds) { for (SourceSplitState splitState : finishedSplitIds.values()) { SourceSplitBase sourceSplit = splitState.toSourceSplit(); + if (sourceConfig.getStartupOptions().isSnapshotOnly() && sourceSplit.isStreamSplit()) { + // when startupMode = SNAPSHOT. the stream split could finish. + continue; + } checkState( sourceSplit.isSnapshotSplit(), String.format( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java index 849cad1bc33..be6c3f36a55 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java @@ -84,7 +84,7 @@ public RecordsWithSplitIds fetch() throws IOException { throw new IOException(e); } return dataIt == null - ? finishedSnapshotSplit() + ? 
finishedSplit() : ChangeEventRecords.forRecords(currentSplitId, dataIt); } @@ -154,7 +154,7 @@ public boolean canAssignNextSplit() { return currentFetcher == null || currentFetcher.isFinished(); } - private ChangeEventRecords finishedSnapshotSplit() { + private ChangeEventRecords finishedSplit() { final ChangeEventRecords finishedRecords = ChangeEventRecords.forFinishedSplit(currentSplitId); currentSplitId = null; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java index 28755ff5148..9a0c55e17a9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -35,7 +35,6 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -88,13 +87,18 @@ public void submitTask(FetchTask fetchTask) { try { streamFetchTask.execute(taskContext); } catch (Exception e) { - this.currentTaskRunning = false; LOG.error( String.format( "Execute stream read task for stream split %s fail", currentStreamSplit), e); readException = e; + } finally { + try { + stopReadTask(); + } catch (Exception e) { + throw new RuntimeException(e); + } } }); } @@ -122,7 +126,7 @@ public Iterator pollSplitRecords() throws InterruptedException { sourceRecordsSet.add(new SourceRecords(sourceRecords)); return sourceRecordsSet.iterator(); } else { - return Collections.emptyIterator(); + return null; } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java index cc2c33c602e..2ca4ff16712 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java @@ -164,6 +164,7 @@ public MongoDBSourceConfigFactory startupOptions(StartupOptions startupOptions) checkNotNull(startupOptions); switch (startupOptions.startupMode) { case INITIAL: + case SNAPSHOT: case LATEST_OFFSET: case TIMESTAMP: this.startupOptions = startupOptions; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java index d4265bb937d..20e13b4eb07 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java @@ -153,6 +153,8 @@ private void checkPrimaryKey(UniqueConstraint pk, String message) { } private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; + + private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot"; private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp"; @@ -162,6 +164,8 @@ private static StartupOptions getStartupOptions(ReadableConfig config) { switch (modeString.toLowerCase()) { case SCAN_STARTUP_MODE_VALUE_INITIAL: return StartupOptions.initial(); + case SCAN_STARTUP_MODE_VALUE_SNAPSHOT: + return StartupOptions.snapshot(); case SCAN_STARTUP_MODE_VALUE_LATEST: return StartupOptions.latest(); case SCAN_STARTUP_MODE_VALUE_TIMESTAMP: @@ -174,9 +178,10 @@ private static StartupOptions getStartupOptions(ReadableConfig config) { default: throw new ValidationException( String.format( - "Invalid value for option '%s'. Supported values are [%s, %s, %s], but was: %s", + "Invalid value for option '%s'. Supported values are [%s, %s, %s, %s], but was: %s", SCAN_STARTUP_MODE.key(), SCAN_STARTUP_MODE_VALUE_INITIAL, + SCAN_STARTUP_MODE_VALUE_SNAPSHOT, SCAN_STARTUP_MODE_VALUE_LATEST, SCAN_STARTUP_MODE_VALUE_TIMESTAMP, modeString)); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java index 3986d3edf35..02306b10358 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java @@ -219,13 +219,87 @@ public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exc true); } + @Test + public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception { + if (!parallelismSnapshot) { + return; + } + // There are 21 records in total; set fetchSize = 22 to verify that the job is bounded. 
+ List records = + testBackfillWhenWritingEvents( + false, 22, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.snapshot()); + List expectedRecords = + Arrays.asList( + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]"); + assertEqualsInAnyOrder(expectedRecords, records); + } + + @Test + public void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception { + if (!parallelismSnapshot) { + return; + } + // The data num is 21, set fetchSize = 22 to test whether the job is bounded + List records = + testBackfillWhenWritingEvents( + false, 22, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.snapshot()); + List expectedRecords = + Arrays.asList( + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[2000, user_21, Pittsburgh, 123567891234]", + "+I[15213, user_15213, Shanghai, 123567891234]"); + // when enable backfill, the wal log between (snapshot, high_watermark) will be + // applied as snapshot image + assertEqualsInAnyOrder(expectedRecords, records); + } + @Test public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception { if (!parallelismSnapshot) { return; } - List records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -261,7 +335,9 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception { return; } - List records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + false, 21, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -297,7 +373,8 @@ public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception { return; } List records = - 
testBackfillWhenWritingEvents(false, 25, USE_POST_HIGHWATERMARK_HOOK); + testBackfillWhenWritingEvents( + false, 25, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( "+I[101, user_1, Shanghai, 123567891234]", @@ -334,7 +411,9 @@ public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception { return; } - List records = testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + true, 25, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -374,7 +453,9 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { return; } - List records = testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + true, 25, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -410,7 +491,8 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { } private List testBackfillWhenWritingEvents( - boolean skipBackFill, int fetchSize, int hookType) throws Exception { + boolean skipBackFill, int fetchSize, int hookType, StartupOptions startupOptions) + throws Exception { String customerDatabase = "customer_" + Integer.toUnsignedString(new Random().nextInt(), 36); @@ -447,7 +529,7 @@ private List testBackfillWhenWritingEvents( .databaseList(customerDatabase) .username(FLINK_USER) .password(FLINK_USER_PASSWORD) - .startupOptions(StartupOptions.initial()) + .startupOptions(startupOptions) .scanFullChangelog(true) .collectionList( getCollectionNameRegex( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java index 44e35926dd1..a7b52be5290 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java @@ -148,10 +148,78 @@ public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exc true); } + @Test + public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception { + // There are 21 records in total; set fetchSize = 22 to verify that the job is bounded. 
+ List records = + testBackfillWhenWritingEvents( + false, 22, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.snapshot()); + List expectedRecords = + Arrays.asList( + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]"); + assertEqualsInAnyOrder(expectedRecords, records); + } + + @Test + public void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception { + // The data num is 21, set fetchSize = 22 to test whether the job is bounded + List records = + testBackfillWhenWritingEvents( + false, 22, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.snapshot()); + List expectedRecords = + Arrays.asList( + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[2000, user_21, Pittsburgh, 123567891234]", + "+I[15213, user_15213, Shanghai, 123567891234]"); + // when enable backfill, the wal log between (snapshot, high_watermark) will be + // applied as snapshot image + assertEqualsInAnyOrder(expectedRecords, records); + } + @Test public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception { - List records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -184,7 +252,9 @@ public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception { @Test public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception { - List records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + false, 21, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -218,7 +288,8 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception { public void 
testEnableBackfillWithDMLPostHighWaterMark() throws Exception { List records = - testBackfillWhenWritingEvents(false, 24, USE_POST_HIGHWATERMARK_HOOK); + testBackfillWhenWritingEvents( + false, 24, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( "+I[101, user_1, Shanghai, 123567891234]", @@ -253,7 +324,9 @@ public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception { @Test public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception { - List records = testBackfillWhenWritingEvents(true, 24, USE_PRE_HIGHWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + true, 24, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -291,7 +364,9 @@ public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception { @Test public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { - List records = testBackfillWhenWritingEvents(true, 24, USE_POST_LOWWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + true, 24, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -328,7 +403,8 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { } private List testBackfillWhenWritingEvents( - boolean skipBackFill, int fetchSize, int hookType) throws Exception { + boolean skipBackFill, int fetchSize, int hookType, StartupOptions startupOptions) + throws Exception { String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); @@ -357,6 +433,7 @@ private List testBackfillWhenWritingEvents( customerDatabase, new String[] {"customers"})) .deserializer(customerTable.getDeserializer(false)) .skipSnapshotBackfill(skipBackFill) + .startupOptions(startupOptions) .build(); // Do some database operations during hook in snapshot phase. 
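Note for reviewers: outside the test harness, the new mode is driven entirely through the builder shown above. A minimal usage sketch (illustrative only; host, credentials, and collection names are placeholders, and the builder methods are the ones exercised by the tests in this diff):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import com.ververica.cdc.connectors.base.options.StartupOptions;
    import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

    public class MongoDBSnapshotOnlyExample {
        public static void main(String[] args) throws Exception {
            // Snapshot-only source: reads a consistent snapshot of the collection,
            // then finishes because getBoundedness() returns BOUNDED.
            MongoDBSource<String> source =
                    MongoDBSource.<String>builder()
                            .hosts("localhost:27017") // placeholder
                            .databaseList("customer_db") // placeholder
                            .collectionList("customer_db.customers") // placeholder
                            .username("flinkuser") // placeholder
                            .password("flinkpw") // placeholder
                            .startupOptions(StartupOptions.snapshot())
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            .build();

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB Snapshot Source")
                    .print();
            env.execute("MongoDB Snapshot-Only Example");
        }
    }

With StartupOptions.snapshot() the job terminates once the stream split reaches the max high watermark recorded during the snapshot phase, which is what the bounded fetchSize assertions above rely on.
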
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index f8fc488fef4..857b2639c2c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -37,7 +37,6 @@ import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics; import io.debezium.pipeline.DataChangeEvent; -import io.debezium.pipeline.source.spi.ChangeEventSource; import io.debezium.relational.TableId; import io.debezium.relational.Tables; import org.apache.kafka.connect.data.Struct; @@ -87,6 +86,8 @@ public class BinlogSplitReader implements DebeziumReader maxSplitHighWatermarkMap; private final Set pureBinlogPhaseTables; private Tables.TableFilter capturedTableFilter; + private final StoppableChangeEventSourceContext changeEventSourceContext = + new StoppableChangeEventSourceContext(); private static final long READER_CLOSE_TIMEOUT = 30L; @@ -124,29 +125,22 @@ public void submitSplit(MySqlSplit mySqlSplit) { () -> { try { binlogSplitReadTask.execute( - new BinlogSplitChangeEventSourceContextImpl(), + changeEventSourceContext, statefulTaskContext.getMySqlPartition(), statefulTaskContext.getOffsetContext()); } catch (Exception e) { - currentTaskRunning = false; LOG.error( String.format( "Execute binlog read task for mysql split %s fail", currentBinlogSplit), e); readException = e; + } finally { + stopBinlogReadTask(); } }); } - private class BinlogSplitChangeEventSourceContextImpl - implements ChangeEventSource.ChangeEventSourceContext { - @Override - public boolean isRunning() { - return currentTaskRunning; - } - } - @Override public boolean isFinished() { return currentBinlogSplit == null || !currentTaskRunning; @@ -191,9 +185,8 @@ public void close() { if (statefulTaskContext.getBinaryLogClient() != null) { statefulTaskContext.getBinaryLogClient().disconnect(); } - // set currentTaskRunning to false to terminate the - // while loop in MySqlStreamingChangeEventSource's execute method - currentTaskRunning = false; + + stopBinlogReadTask(); if (executorService != null) { executorService.shutdown(); if (!executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { @@ -328,7 +321,9 @@ private Predicate createEventFilter(BinlogOffset startingOffset) { } public void stopBinlogReadTask() { - this.currentTaskRunning = false; + currentTaskRunning = false; + // Terminate the while loop in MySqlStreamingChangeEventSource's execute method + changeEventSourceContext.stopChangeEventSource(); } @VisibleForTesting diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index 70e37f010b5..a34bd8691dd 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -92,6 +92,9 @@ public class SnapshotSplitReader implements DebeziumReader createEnumerator( validator.validate(); final MySqlSplitAssigner splitAssigner; - if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) { + if (!sourceConfig.getStartupOptions().isStreamOnly()) { try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc); splitAssigner = @@ -214,7 +218,8 @@ public SplitEnumerator createEnumerator( splitAssigner = new MySqlBinlogSplitAssigner(sourceConfig); } - return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner); + return new MySqlSourceEnumerator( + enumContext, sourceConfig, splitAssigner, getBoundedness()); } @Override @@ -238,7 +243,8 @@ public SplitEnumerator restoreEnumerator( throw new UnsupportedOperationException( "Unsupported restored PendingSplitsState: " + checkpoint); } - return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner); + return new MySqlSourceEnumerator( + enumContext, sourceConfig, splitAssigner, getBoundedness()); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java index ca4eedf61bb..d3b6ad3c634 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java @@ -110,7 +110,7 @@ public AssignerStatus getAssignerStatus() { } @Override - public boolean isStreamSplitAssigned() { + public boolean noMoreSplits() { return isBinlogSplitAssigned; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java index 4c7b26eba42..e100c97f675 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java @@ -51,6 +51,7 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner { private static final String BINLOG_SPLIT_ID = "binlog-split"; private final int splitMetaGroupSize; + private final MySqlSourceConfig sourceConfig; private boolean isBinlogSplitAssigned; @@ -62,6 +63,7 @@ public MySqlHybridSplitAssigner( List remainingTables, boolean isTableIdCaseSensitive) { this( + sourceConfig, new MySqlSnapshotSplitAssigner( sourceConfig, 
currentParallelism, remainingTables, isTableIdCaseSensitive), false, @@ -73,6 +75,7 @@ public MySqlHybridSplitAssigner( int currentParallelism, HybridPendingSplitsState checkpoint) { this( + sourceConfig, new MySqlSnapshotSplitAssigner( sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()), checkpoint.isBinlogSplitAssigned(), @@ -80,9 +83,11 @@ public MySqlHybridSplitAssigner( } private MySqlHybridSplitAssigner( + MySqlSourceConfig sourceConfig, MySqlSnapshotSplitAssigner snapshotSplitAssigner, boolean isBinlogSplitAssigned, int splitMetaGroupSize) { + this.sourceConfig = sourceConfig; this.snapshotSplitAssigner = snapshotSplitAssigner; this.isBinlogSplitAssigned = isBinlogSplitAssigned; this.splitMetaGroupSize = splitMetaGroupSize; @@ -99,7 +104,7 @@ public Optional getNext() { // do not assign split until the adding table process finished return Optional.empty(); } - if (snapshotSplitAssigner.noMoreSnapshotSplits()) { + if (snapshotSplitAssigner.noMoreSplits()) { // binlog split assigning if (isBinlogSplitAssigned) { // no more splits for the assigner @@ -129,11 +134,6 @@ public boolean waitingForFinishedSplits() { return snapshotSplitAssigner.waitingForFinishedSplits(); } - @Override - public boolean isStreamSplitAssigned() { - return isBinlogSplitAssigned; - } - @Override public List getFinishedSplitInfos() { return snapshotSplitAssigner.getFinishedSplitInfos(); @@ -174,6 +174,11 @@ public AssignerStatus getAssignerStatus() { return snapshotSplitAssigner.getAssignerStatus(); } + @Override + public boolean noMoreSplits() { + return snapshotSplitAssigner.noMoreSplits() && isBinlogSplitAssigned; + } + @Override public void startAssignNewlyAddedTables() { snapshotSplitAssigner.startAssignNewlyAddedTables(); @@ -189,10 +194,6 @@ public void close() { snapshotSplitAssigner.close(); } - public boolean noMoreSnapshotSplits() { - return snapshotSplitAssigner.noMoreSnapshotSplits(); - } - // -------------------------------------------------------------------------------------------- private MySqlBinlogSplit createBinlogSplit() { @@ -206,12 +207,17 @@ private MySqlBinlogSplit createBinlogSplit() { final List finishedSnapshotSplitInfos = new ArrayList<>(); BinlogOffset minBinlogOffset = null; + BinlogOffset maxBinlogOffset = null; for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplit) { - // find the min binlog offset + // find the min and max binlog offset BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId()); if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) { minBinlogOffset = binlogOffset; } + if (maxBinlogOffset == null || binlogOffset.isAfter(maxBinlogOffset)) { + maxBinlogOffset = binlogOffset; + } + finishedSnapshotSplitInfos.add( new FinishedSnapshotSplitInfo( split.getTableId(), @@ -221,14 +227,21 @@ private MySqlBinlogSplit createBinlogSplit() { binlogOffset)); } + // If the source is running in snapshot mode, we use the highest watermark among + // snapshot splits as the ending offset to provide a consistent snapshot view at the moment + // of high watermark. + BinlogOffset stoppingOffset = BinlogOffset.ofNonStopping(); + if (sourceConfig.getStartupOptions().isSnapshotOnly()) { + stoppingOffset = maxBinlogOffset; + } + // the finishedSnapshotSplitInfos is too large for transmission, divide it to groups and // then transfer them - boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize; return new MySqlBinlogSplit( BINLOG_SPLIT_ID, minBinlogOffset == null ? 
BinlogOffset.ofEarliest() : minBinlogOffset, - BinlogOffset.ofNonStopping(), + stoppingOffset, divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos, new HashMap<>(), finishedSnapshotSplitInfos.size()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index 0536a2621fe..8753f8aebed 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -216,7 +216,9 @@ else if (!isRemainingTablesCheckpointed && !isSnapshotAssigningFinished(assigner } private void captureNewlyAddedTables() { - if (sourceConfig.isScanNewlyAddedTableEnabled()) { + // Don't scan newly added table in snapshot mode. + if (sourceConfig.isScanNewlyAddedTableEnabled() + && !sourceConfig.getStartupOptions().isSnapshotOnly()) { // check whether we got newly added tables try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { final List currentCapturedTables = @@ -517,7 +519,7 @@ private void waitTableDiscoveryReady() { } /** Indicates there is no more splits available in this assigner. */ - public boolean noMoreSnapshotSplits() { + public boolean noMoreSplits() { return !needToDiscoveryTables() && remainingTables.isEmpty() && remainingSplits.isEmpty(); } @@ -547,7 +549,7 @@ public Map getSplitFinishedOffsets() { * are finished. */ private boolean allSnapshotSplitsFinished() { - return noMoreSnapshotSplits() && assignedSplits.size() == splitFinishedOffsets.size(); + return noMoreSplits() && assignedSplits.size() == splitFinishedOffsets.size(); } private void splitChunksForRemainingTables() { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java index 52dfcd325c9..57f003eb95b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java @@ -18,6 +18,7 @@ import org.apache.flink.api.common.state.CheckpointListener; +import com.ververica.cdc.common.annotation.Internal; import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; @@ -32,6 +33,7 @@ * The {@code MySqlSplitAssigner} is responsible for deciding what split should be processed. It * determines split processing order. 
*/ +@Internal public interface MySqlSplitAssigner { /** @@ -53,11 +55,6 @@ public interface MySqlSplitAssigner { */ boolean waitingForFinishedSplits(); - /** Whether the split assigner is finished stream split assigning. */ - default boolean isStreamSplitAssigned() { - throw new UnsupportedOperationException("Not support to assigning StreamSplit."); - } - /** * Gets the finished splits' information. This is useful metadata to generate a binlog split * that considering finished snapshot splits. @@ -110,6 +107,9 @@ default boolean isStreamSplitAssigned() { /** Starts assign newly added tables. */ void startAssignNewlyAddedTables(); + /** Indicates there is no more splits available in this assigner. */ + boolean noMoreSplits(); + /** * Callback to handle the binlog split has been updated in the newly added tables process. This * is useful to check the newly added tables has been finished or not. diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java index 3b46bd521c8..f475ed60b80 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.mysql.source.enumerator; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; @@ -24,7 +25,6 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; import com.ververica.cdc.common.annotation.Internal; -import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner; import com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner; import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner; import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState; @@ -71,6 +71,8 @@ public class MySqlSourceEnumerator implements SplitEnumerator readersAwaitingSplit; private List> binlogSplitMeta; @@ -80,10 +82,12 @@ public class MySqlSourceEnumerator implements SplitEnumerator context, MySqlSourceConfig sourceConfig, - MySqlSplitAssigner splitAssigner) { + MySqlSplitAssigner splitAssigner, + Boundedness boundedness) { this.context = context; this.sourceConfig = sourceConfig; this.splitAssigner = splitAssigner; + this.boundedness = boundedness; this.readersAwaitingSplit = new TreeSet<>(); } @@ -204,10 +208,7 @@ private void assignSplits() { continue; } - if (splitAssigner.isStreamSplitAssigned() - && sourceConfig.isCloseIdleReaders() - && noMoreSnapshotSplits() - && (binlogSplitTaskId != null && !binlogSplitTaskId.equals(nextAwaiting))) { + if (shouldCloseIdleReader(nextAwaiting)) { // close idle readers when snapshot phase finished. 
context.signalNoMoreSplits(nextAwaiting); awaitingReader.remove(); @@ -232,14 +233,18 @@ && noMoreSnapshotSplits() } } - private boolean noMoreSnapshotSplits() { - if (splitAssigner instanceof MySqlHybridSplitAssigner) { - return ((MySqlHybridSplitAssigner) splitAssigner).noMoreSnapshotSplits(); - } else if (splitAssigner instanceof MySqlBinlogSplitAssigner) { - return true; - } - throw new IllegalStateException( - "Unexpected subtype of MySqlSplitAssigner class when invoking noMoreSnapshotSplits."); + private boolean shouldCloseIdleReader(int nextAwaiting) { + // When there are no unassigned splits left, signal NoMoreSplitsEvent to the awaiting + // reader in two situations: + // 1. The startup mode is snapshot (the source is bounded) and the assigner has no more + // splits. + // 2. scan.incremental.close-idle-reader.enabled is set to true and the assigner has no + // more splits. + return splitAssigner.noMoreSplits() + && (boundedness == Boundedness.BOUNDED + || (sourceConfig.isCloseIdleReaders() + && (binlogSplitTaskId != null + && !binlogSplitTaskId.equals(nextAwaiting)))); } private int[] getRegisteredReader() { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java index f42bf2ab0b0..e38ebed5fa9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java @@ -176,15 +176,26 @@ protected void onSplitFinished(Map finishedSplitIds) { for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) { MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit(); if (mySqlSplit.isBinlogSplit()) { - suspendedBinlogSplit = toSuspendedBinlogSplit(mySqlSplit.asBinlogSplit()); - LOG.info( - "Source reader {} suspended binlog split reader success after the newly added table process, current offset {}", - subtaskId, - suspendedBinlogSplit.getStartingOffset()); - context.sendSourceEventToCoordinator( - new LatestFinishedSplitsNumberRequestEvent()); - // do not request next split when the reader is suspended - requestNextSplit = false; + // Two possibilities that finish a binlog split: + // + // 1. The binlog reader is suspended by the enumerator because newly added tables + // have finished their snapshot reading. + // In this case mySqlSourceReaderContext.isBinlogSplitReaderSuspended() is true + // and we need to request the latest finished splits number. + // + // 2. The binlog reader reaches the ending offset of the split. Nothing needs to + // be done in this case. 
+ if (mySqlSourceReaderContext.isBinlogSplitReaderSuspended()) { + suspendedBinlogSplit = toSuspendedBinlogSplit(mySqlSplit.asBinlogSplit()); + LOG.info( + "Source reader {} suspended binlog split reader successfully after the newly added table process, current offset {}", + subtaskId, + suspendedBinlogSplit.getStartingOffset()); + context.sendSourceEventToCoordinator( + new LatestFinishedSplitsNumberRequestEvent()); + // do not request next split when the reader is suspended + requestNextSplit = false; + } } else { finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit()); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 755cea74fc0..f136b243d0c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -213,6 +213,7 @@ public Set> optionalOptions() { } private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; + private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot"; private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset"; @@ -224,6 +225,8 @@ private static StartupOptions getStartupOptions(ReadableConfig config) { switch (modeString.toLowerCase()) { case SCAN_STARTUP_MODE_VALUE_INITIAL: return StartupOptions.initial(); + case SCAN_STARTUP_MODE_VALUE_SNAPSHOT: + return StartupOptions.snapshot(); case SCAN_STARTUP_MODE_VALUE_LATEST: return StartupOptions.latest(); @@ -241,9 +244,10 @@ private static StartupOptions getStartupOptions(ReadableConfig config) { default: throw new ValidationException( String.format( - "Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s], but was: %s", + "Invalid value for option '%s'. 
Supported values are [%s, %s, %s, %s, %s, %s], but was: %s", SCAN_STARTUP_MODE.key(), SCAN_STARTUP_MODE_VALUE_INITIAL, + SCAN_STARTUP_MODE_VALUE_SNAPSHOT, SCAN_STARTUP_MODE_VALUE_LATEST, SCAN_STARTUP_MODE_VALUE_EARLIEST, SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/StartupMode.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/StartupMode.java index 6cf91932188..e41ee248e17 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/StartupMode.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/StartupMode.java @@ -30,5 +30,6 @@ public enum StartupMode { SPECIFIC_OFFSETS, - TIMESTAMP + TIMESTAMP, + SNAPSHOT } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/StartupOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/StartupOptions.java index 9e90502b021..03a64d07f2a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/StartupOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/StartupOptions.java @@ -40,6 +40,14 @@ public static StartupOptions initial() { return new StartupOptions(StartupMode.INITIAL, null); } + /** + * Performs an initial snapshot on the monitored database tables upon first startup, and does + * not read the binlog afterwards. + */ + public static StartupOptions snapshot() { + return new StartupOptions(StartupMode.SNAPSHOT, null); + } + /** * Never to perform snapshot on the monitored database tables upon first startup, just read from * the beginning of the binlog. 
This should be used with care, as it is only valid when the @@ -92,12 +100,23 @@ public static StartupOptions timestamp(long startupTimestampMillis) { private StartupOptions(StartupMode startupMode, BinlogOffset binlogOffset) { this.startupMode = startupMode; this.binlogOffset = binlogOffset; - if (startupMode != StartupMode.INITIAL) { + if (isStreamOnly()) { checkNotNull( binlogOffset, "Binlog offset is required if startup mode is %s", startupMode); } } + public boolean isStreamOnly() { + return startupMode == StartupMode.EARLIEST_OFFSET + || startupMode == StartupMode.LATEST_OFFSET + || startupMode == StartupMode.SPECIFIC_OFFSETS + || startupMode == StartupMode.TIMESTAMP; + } + + public boolean isSnapshotOnly() { + return startupMode == StartupMode.SNAPSHOT; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java index 50bd27978a3..32cd0a21031 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -160,6 +160,8 @@ public class MySqlSourceITCase extends MySqlSourceTestBase { private static final int USE_POST_LOWWATERMARK_HOOK = 1; private static final int USE_PRE_HIGHWATERMARK_HOOK = 2; + private static final int USE_POST_HIGHWATERMARK_HOOK = 3; + @Parameterized.Parameters(name = "table: {0}, chunkColumn: {1}") public static Collection parameters() { return Arrays.asList( @@ -350,10 +352,76 @@ public void testStartFromLatestOffset() throws Exception { testStartingOffset(StartupOptions.latest(), Collections.emptyList()); } + @Test + public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception { + List records = + testBackfillWhenWritingEvents( + false, 21, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.snapshot()); + List expectedRecords = + Arrays.asList( + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]"); + assertEqualsInAnyOrder(expectedRecords, records); + } + + @Test + public void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception { + List records = + testBackfillWhenWritingEvents( + false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.snapshot()); + List expectedRecords = + 
Arrays.asList( + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[2000, user_21, Pittsburgh, 123567891234]", + "+I[15213, user_15213, Shanghai, 123567891234]"); + // when backfill is enabled, the binlog events between (snapshot, high_watermark) will + // be applied as the snapshot image + assertEqualsInAnyOrder(expectedRecords, records); + } + @Test public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception { - List records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -386,7 +454,9 @@ public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception { @Test public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception { - List records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + false, 21, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -419,7 +489,9 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception { @Test public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception { - List records = testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + true, 25, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -456,7 +528,9 @@ public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception { @Test public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { - List records = testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + true, 25, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -492,7 +566,11 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { } private List testBackfillWhenWritingEvents( - boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception { + boolean skipSnapshotBackfill, + int fetchSize, + int hookType, + StartupOptions startupOptions) + throws Exception { customDatabase.createAndInitialize(); TestTable customerTable = new TestTable(customDatabase, "customers", TestTableSchemas.CUSTOMERS); @@ -509,6 +587,7 @@ private List testBackfillWhenWritingEvents( .tableList(customerTable.getTableId()) .deserializer(customerTable.getDeserializer()) .skipSnapshotBackfill(skipSnapshotBackfill) + .startupOptions(startupOptions) .build(); String[] statements = @@ -529,10 +608,17 @@ private List testBackfillWhenWritingEvents( 
connection.execute(statements); connection.commit(); }; - if (hookType == USE_PRE_HIGHWATERMARK_HOOK) { - hooks.setPreHighWatermarkAction(snapshotPhaseHook); - } else if (hookType == USE_POST_LOWWATERMARK_HOOK) { - hooks.setPostLowWatermarkAction(snapshotPhaseHook); + + switch (hookType) { + case USE_POST_LOWWATERMARK_HOOK: + hooks.setPostLowWatermarkAction(snapshotPhaseHook); + break; + case USE_PRE_HIGHWATERMARK_HOOK: + hooks.setPreHighWatermarkAction(snapshotPhaseHook); + break; + case USE_POST_HIGHWATERMARK_HOOK: + hooks.setPostHighWatermarkAction(snapshotPhaseHook); + break; } source.setSnapshotHooks(hooks); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java index 2bd4c1ff9d5..8c303ce852b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java @@ -31,6 +31,7 @@ import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit; +import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; @@ -49,6 +50,7 @@ import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** Tests for {@link MySqlHybridSplitAssigner}. */ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase { @@ -65,7 +67,8 @@ public static void init() { public void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() { final String captureTable = "customers"; - MySqlSourceConfig configuration = getConfig(new String[] {captureTable}); + MySqlSourceConfig configuration = + getConfig(new String[] {captureTable}, StartupOptions.initial()); // Step 1. 
Mock MySqlHybridSplitAssigner Object TableId tableId = new TableId(null, customerDatabase.getDatabaseName(), captureTable); @@ -139,14 +142,54 @@ public void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() { assigner.close(); } - private MySqlSourceConfig getConfig(String[] captureTables) { + @Test + public void testAssigningInSnapshotOnlyMode() { + final String captureTable = "customers"; + + MySqlSourceConfig sourceConfig = + getConfig(new String[] {captureTable}, StartupOptions.snapshot()); + + // Create and initialize assigner + MySqlHybridSplitAssigner assigner = + new MySqlHybridSplitAssigner(sourceConfig, 1, new ArrayList<>(), false); + assigner.open(); + + // Get all snapshot splits + List snapshotSplits = drainSnapshotSplits(assigner); + + // Generate fake finished offsets from 0 to snapshotSplits.size() - 1 + int i = 0; + Map finishedOffsets = new HashMap<>(); + for (MySqlSnapshotSplit snapshotSplit : snapshotSplits) { + BinlogOffset binlogOffset = + BinlogOffset.builder().setBinlogFilePosition("foo", i++).build(); + finishedOffsets.put(snapshotSplit.splitId(), binlogOffset); + } + assigner.onFinishedSplits(finishedOffsets); + + // Get the binlog split + Optional split = assigner.getNext(); + assertTrue(split.isPresent()); + assertTrue(split.get() instanceof MySqlBinlogSplit); + MySqlBinlogSplit binlogSplit = split.get().asBinlogSplit(); + + // Validate if the stopping offset of the binlog split is the maximum among all finished + // offsets, which should be snapshotSplits.size() - 1 + assertEquals( + BinlogOffset.builder() + .setBinlogFilePosition("foo", snapshotSplits.size() - 1) + .build(), + binlogSplit.getEndingOffset()); + } + + private MySqlSourceConfig getConfig(String[] captureTables, StartupOptions startupOptions) { String[] captureTableIds = Arrays.stream(captureTables) .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName) .toArray(String[]::new); return new MySqlSourceConfigFactory() - .startupOptions(StartupOptions.initial()) + .startupOptions(startupOptions) .databaseList(customerDatabase.getDatabaseName()) .tableList(captureTableIds) .hostname(MYSQL_CONTAINER.getHost()) @@ -156,4 +199,17 @@ private MySqlSourceConfig getConfig(String[] captureTables) { .serverTimeZone(ZoneId.of("UTC").toString()) .createConfig(0); } + + private List drainSnapshotSplits(MySqlHybridSplitAssigner assigner) { + List snapshotSplits = new ArrayList<>(); + while (true) { + Optional split = assigner.getNext(); + if (!split.isPresent()) { + break; + } + assertTrue(split.get() instanceof MySqlSnapshotSplit); + snapshotSplits.add(split.get().asSnapshotSplit()); + } + return snapshotSplits; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index f9da62ccfc2..4f2b7cfa446 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -720,7 +720,7 @@ public void testValidation() { } catch (Throwable t) { String msg = "Invalid value for option 'scan.startup.mode'. 
Supported values are " - + "[initial, latest-offset, earliest-offset, specific-offset, timestamp], " + + "[initial, snapshot, latest-offset, earliest-offset, specific-offset, timestamp], " + "but was: abc"; assertTrue(ExceptionUtils.findThrowableWithMessage(t, msg).isPresent()); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java index c3ec1ee0e60..42f4f7e0b6e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java @@ -21,7 +21,6 @@ import org.apache.flink.util.FlinkRuntimeException; import com.ververica.cdc.common.annotation.Experimental; -import com.ververica.cdc.connectors.base.options.StartupMode; import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner; import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner; @@ -291,7 +290,7 @@ public SplitEnumerator createEnumerator( SplitEnumeratorContext enumContext) { final SplitAssigner splitAssigner; PostgresSourceConfig sourceConfig = (PostgresSourceConfig) configFactory.create(0); - if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) { + if (!sourceConfig.getStartupOptions().isStreamOnly()) { try { final List remainingTables = dataSourceDialect.discoverDataCollections(sourceConfig); @@ -315,7 +314,11 @@ public SplitEnumerator createEnumerator( } return new PostgresSourceEnumerator( - enumContext, sourceConfig, splitAssigner, (PostgresDialect) dataSourceDialect); + enumContext, + sourceConfig, + splitAssigner, + (PostgresDialect) dataSourceDialect, + this.getBoundedness()); } public static PostgresSourceBuilder builder() { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java index 40164b9e462..485efbdbea3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java @@ -16,9 +16,11 @@ package com.ververica.cdc.connectors.postgres.source.enumerator; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.util.FlinkRuntimeException; +import com.ververica.cdc.common.annotation.Internal; import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner; import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; @@ -32,6 +34,7 @@ * The Postgres source 
enumerator that receives split requests and assigns splits to * source readers. */ +@Internal public class PostgresSourceEnumerator extends IncrementalSourceEnumerator { private final PostgresDialect postgresDialect; @@ -40,8 +43,9 @@ public PostgresSourceEnumerator( SplitEnumeratorContext context, PostgresSourceConfig sourceConfig, SplitAssigner splitAssigner, - PostgresDialect postgresDialect) { - super(context, sourceConfig, splitAssigner); + PostgresDialect postgresDialect, + Boundedness boundedness) { + super(context, sourceConfig, splitAssigner, boundedness); this.postgresDialect = postgresDialect; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java index 24425bcf5d6..c01afc1c4e4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java @@ -36,7 +36,6 @@ import io.debezium.connector.postgresql.spi.SlotState; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; -import io.debezium.pipeline.source.spi.ChangeEventSource; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.spi.SnapshotResult; import io.debezium.relational.RelationalSnapshotChangeEventSource; @@ -104,8 +103,8 @@ protected void executeDataSnapshot(Context context) throws Exception { ctx.getSnapshotChangeEventSourceMetrics(), snapshotSplit); - PostgresChangeEventSourceContext changeEventSourceContext = - new PostgresChangeEventSourceContext(); + StoppableChangeEventSourceContext changeEventSourceContext = + new StoppableChangeEventSourceContext(); SnapshotResult snapshotResult = snapshotSplitReadTask.execute( changeEventSourceContext, ctx.getPartition(), ctx.getOffsetContext()); @@ -146,7 +145,7 @@ protected void executeBackfillTask(Context context, StreamSplit backfillStreamSp snapshotSplit, ((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask()); backfillReadTask.execute( - new PostgresChangeEventSourceContext(), ctx.getPartition(), postgresOffsetContext); + new StoppableChangeEventSourceContext(), ctx.getPartition(), postgresOffsetContext); } /** @@ -205,18 +204,6 @@ private void maybeDropSlotForBackFillReadTask( } } - class PostgresChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext { - - public void finished() { - taskRunning = false; - } - - @Override - public boolean isRunning() { - return taskRunning; - } - } - /** A SnapshotChangeEventSource implementation for Postgres to read snapshot split. 
*/ public static class PostgresSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java index 40ed521fe35..da1b3e3c1fa 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java @@ -90,8 +90,8 @@ public void execute(Context context) throws Exception { sourceFetchContext.getTaskContext(), sourceFetchContext.getReplicationConnection(), split); - StreamSplitChangeEventSourceContext changeEventSourceContext = - new StreamSplitChangeEventSourceContext(); + StoppableChangeEventSourceContext changeEventSourceContext = + new StoppableChangeEventSourceContext(); streamSplitReadTask.execute( changeEventSourceContext, sourceFetchContext.getPartition(), @@ -102,7 +102,8 @@ public void execute(Context context) throws Exception { public void close() { LOG.debug("stopping StreamFetchTask for split: {}", split); if (streamSplitReadTask != null) { - ((StreamSplitChangeEventSourceContext) streamSplitReadTask.context).finished(); + ((StoppableChangeEventSourceContext) (streamSplitReadTask.context)) + .stopChangeEventSource(); } stopped = true; taskRunning = false; @@ -144,19 +145,6 @@ public void commitCurrentOffset() { } } - private class StreamSplitChangeEventSourceContext - implements ChangeEventSource.ChangeEventSourceContext { - - public void finished() { - taskRunning = false; - } - - @Override - public boolean isRunning() { - return taskRunning; - } - } - /** A {@link ChangeEventSource} implementation for Postgres to read streaming changes. */ public static class StreamSplitReadTask extends PostgresStreamingChangeEventSource { private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class); @@ -228,7 +216,7 @@ public void execute( new FlinkRuntimeException("Error processing WAL signal event", e)); } - ((PostgresScanFetchTask.PostgresChangeEventSourceContext) context).finished(); + ((StoppableChangeEventSourceContext) context).stopChangeEventSource(); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/StoppableChangeEventSourceContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/StoppableChangeEventSourceContext.java new file mode 100644 index 00000000000..37ce7564fca --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/StoppableChangeEventSourceContext.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.postgres.source.fetch; + +import io.debezium.pipeline.source.spi.ChangeEventSource; + +/** + * A change event source context that can stop the running source by invoking {@link + * #stopChangeEventSource()}. + */ +public class StoppableChangeEventSourceContext + implements ChangeEventSource.ChangeEventSourceContext { + + private volatile boolean isRunning = true; + + public void stopChangeEventSource() { + isRunning = false; + } + + @Override + public boolean isRunning() { + return isRunning; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java index c388c28b5b1..aa8ba051daa 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java @@ -200,6 +200,8 @@ public Set> optionalOptions() { } private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; + + private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot"; private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; private static StartupOptions getStartupOptions(ReadableConfig config) { @@ -208,16 +210,18 @@ private static StartupOptions getStartupOptions(ReadableConfig config) { switch (modeString.toLowerCase()) { case SCAN_STARTUP_MODE_VALUE_INITIAL: return StartupOptions.initial(); - + case SCAN_STARTUP_MODE_VALUE_SNAPSHOT: + return StartupOptions.snapshot(); case SCAN_STARTUP_MODE_VALUE_LATEST: return StartupOptions.latest(); default: throw new ValidationException( String.format( - "Invalid value for option '%s'. Supported values are [%s, %s], but was: %s", + "Invalid value for option '%s'. 
Supported values are [%s, %s, %s], but was: %s", SCAN_STARTUP_MODE.key(), SCAN_STARTUP_MODE_VALUE_INITIAL, + SCAN_STARTUP_MODE_VALUE_SNAPSHOT, SCAN_STARTUP_MODE_VALUE_LATEST, modeString)); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java index f5ce180e283..5681881d623 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.ExceptionUtils; +import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook; import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks; import com.ververica.cdc.connectors.postgres.PostgresTestBase; @@ -89,6 +90,7 @@ public class PostgresSourceITCase extends PostgresTestBase { private static final int USE_POST_LOWWATERMARK_HOOK = 1; private static final int USE_PRE_HIGHWATERMARK_HOOK = 2; + private static final int USE_POST_HIGHWATERMARK_HOOK = 3; private final String scanStartupMode; @@ -323,13 +325,81 @@ public void testDebeziumSlotDropOnStop() throws Exception { tableResult.getJobClient().get().cancel().get(); } + @Test + public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception { + // There are 21 records in total; set fetchSize = 22 to verify that the job is bounded. 
+ List records = + testBackfillWhenWritingEvents( + false, 22, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.snapshot()); + List expectedRecords = + Arrays.asList( + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]"); + assertEqualsInAnyOrder(expectedRecords, records); + } + + @Test + public void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception { + // There are 21 records in total; set fetchSize = 22 to verify that the job is bounded + List records = + testBackfillWhenWritingEvents( + false, 22, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.snapshot()); + List expectedRecords = + Arrays.asList( + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[2000, user_21, Pittsburgh, 123567891234]", + "+I[15213, user_15213, Shanghai, 123567891234]"); + // when backfill is enabled, the WAL events between (snapshot, high_watermark) will + // be applied as the snapshot image + assertEqualsInAnyOrder(expectedRecords, records); + } @Test public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception { if (!DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) { return; } - List records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -365,7 +435,9 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception { return; } - List records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + false, 21, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -401,7 +473,9 @@ public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception { return; } - List records = testBackfillWhenWritingEvents(true, 
25, USE_PRE_HIGHWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + true, 25, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -441,7 +515,9 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { return; } - List records = testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK); + List records = + testBackfillWhenWritingEvents( + true, 25, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial()); List expectedRecords = Arrays.asList( @@ -477,7 +553,11 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { } private List testBackfillWhenWritingEvents( - boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception { + boolean skipSnapshotBackfill, + int fetchSize, + int hookType, + StartupOptions startupOptions) + throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); env.setParallelism(1); @@ -504,6 +584,7 @@ private List testBackfillWhenWritingEvents( .database(customDatabase.getDatabaseName()) .slotName(slotName) .tableList(tableId) + .startupOptions(startupOptions) .skipSnapshotBackfill(skipSnapshotBackfill) .deserializer(customerTable.getDeserializer()) .build(); @@ -528,10 +609,16 @@ private List testBackfillWhenWritingEvents( } }; - if (hookType == USE_POST_LOWWATERMARK_HOOK) { - hooks.setPostLowWatermarkAction(snapshotPhaseHook); - } else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) { - hooks.setPreHighWatermarkAction(snapshotPhaseHook); + switch (hookType) { + case USE_POST_LOWWATERMARK_HOOK: + hooks.setPostLowWatermarkAction(snapshotPhaseHook); + break; + case USE_PRE_HIGHWATERMARK_HOOK: + hooks.setPreHighWatermarkAction(snapshotPhaseHook); + break; + case USE_POST_HIGHWATERMARK_HOOK: + hooks.setPostHighWatermarkAction(snapshotPhaseHook); + break; } source.setSnapshotHooks(hooks); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/StartupMode.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/StartupMode.java deleted file mode 100644 index c0f62da60b2..00000000000 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/StartupMode.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2023 Ververica Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.sqlserver.table; - -/** - * Startup modes for the SqlServer CDC Consumer. - * - * @see StartupOptions - */ -public enum StartupMode { - INITIAL, - INITIAL_ONLY, - LATEST_OFFSET, -}
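
Usage sketch (reviewer reference, not part of this patch): the snippet below is a minimal, hypothetical example of wiring up the new snapshot-only startup mode through the MySQL DataStream source. Only StartupOptions.snapshot() comes from this change; the hostname, port, credentials, and database/table names are placeholder assumptions.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySqlSnapshotOnlyExample {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost") // placeholder host
                        .port(3306)
                        .databaseList("app_db") // placeholder database
                        .tableList("app_db.customers") // placeholder table
                        .username("flink_user") // placeholder credentials
                        .password("flink_pw")
                        // the mode added by this change: take a consistent snapshot of
                        // the captured tables, then stop instead of tailing the binlog
                        .startupOptions(StartupOptions.snapshot())
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // In snapshot mode the enumerator is constructed with Boundedness.BOUNDED, so
        // shouldCloseIdleReader() signals NoMoreSplitsEvent once the assigner reports
        // noMoreSplits(), and the job finishes on its own.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Snapshot Source")
                .print();
        env.execute("mysql-snapshot-only-example");
    }
}

The equivalent table option is 'scan.startup.mode' = 'snapshot', which MySqlDataSourceFactory, MySqlTableSourceFactory, and PostgreSQLTableFactory above all map to StartupOptions.snapshot().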