Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ public String identifier() {
}

private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";

private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
Expand All @@ -221,7 +223,8 @@ private static StartupOptions getStartupOptions(Configuration config) {
switch (modeString.toLowerCase()) {
case SCAN_STARTUP_MODE_VALUE_INITIAL:
return StartupOptions.initial();

case SCAN_STARTUP_MODE_VALUE_SNAPSHOT:
return StartupOptions.snapshot();
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest();

Expand All @@ -238,9 +241,10 @@ private static StartupOptions getStartupOptions(Configuration config) {
default:
throw new ValidationException(
String.format(
"Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s], but was: %s",
"Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s, %s], but was: %s",
SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE_VALUE_INITIAL,
SCAN_STARTUP_MODE_VALUE_SNAPSHOT,
SCAN_STARTUP_MODE_VALUE_LATEST,
SCAN_STARTUP_MODE_VALUE_EARLIEST,
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ public JdbcSourceConfigFactory chunkKeyColumn(String chunkKeyColumn) {
public JdbcSourceConfigFactory startupOptions(StartupOptions startupOptions) {
switch (startupOptions.startupMode) {
case INITIAL:
case SNAPSHOT:
case LATEST_OFFSET:
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ public enum StartupMode {

SPECIFIC_OFFSETS,

TIMESTAMP
TIMESTAMP,
SNAPSHOT
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public static StartupOptions initial() {
return new StartupOptions(StartupMode.INITIAL, null, null, null);
}

/**
* Performs an initial snapshot on the monitored database tables upon first startup, and not
* read the binlog anymore .
*/
public static StartupOptions snapshot() {
return new StartupOptions(StartupMode.SNAPSHOT, null, null, null);
}

/**
* Never to perform snapshot on the monitored database tables upon first startup, just read from
* the beginning of the change log. This should be used with care, as it is only valid when the
Expand Down Expand Up @@ -89,6 +97,7 @@ private StartupOptions(

switch (startupMode) {
case INITIAL:
case SNAPSHOT:
case EARLIEST_OFFSET:
case LATEST_OFFSET:
break;
Expand All @@ -104,6 +113,17 @@ private StartupOptions(
}
}

public boolean isStreamOnly() {
return startupMode == StartupMode.EARLIEST_OFFSET
|| startupMode == StartupMode.LATEST_OFFSET
|| startupMode == StartupMode.SPECIFIC_OFFSETS
|| startupMode == StartupMode.TIMESTAMP;
}

public boolean isSnapshotOnly() {
return startupMode == StartupMode.SNAPSHOT;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.ververica.cdc.common.annotation.VisibleForTesting;
import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.options.StartupMode;
import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner;
import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner;
import com.ververica.cdc.connectors.base.source.assigner.StreamSplitAssigner;
Expand Down Expand Up @@ -101,7 +100,12 @@ public OffsetFactory getOffsetFactory() {

@Override
public Boundedness getBoundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;
C sourceConfig = configFactory.create(0);
if (sourceConfig.getStartupOptions().isSnapshotOnly()) {
return Boundedness.BOUNDED;
} else {
return Boundedness.CONTINUOUS_UNBOUNDED;
}
}

@Override
Expand Down Expand Up @@ -139,7 +143,7 @@ public SplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator(
SplitEnumeratorContext<SourceSplitBase> enumContext) {
C sourceConfig = configFactory.create(0);
final SplitAssigner splitAssigner;
if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) {
if (!sourceConfig.getStartupOptions().isStreamOnly()) {
try {
final List<TableId> remainingTables =
dataSourceDialect.discoverDataCollections(sourceConfig);
Expand All @@ -161,7 +165,8 @@ public SplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator(
splitAssigner = new StreamSplitAssigner(sourceConfig, dataSourceDialect, offsetFactory);
}

return new IncrementalSourceEnumerator(enumContext, sourceConfig, splitAssigner);
return new IncrementalSourceEnumerator(
enumContext, sourceConfig, splitAssigner, getBoundedness());
}

@Override
Expand Down Expand Up @@ -189,7 +194,8 @@ public SplitEnumerator<SourceSplitBase, PendingSplitsState> restoreEnumerator(
throw new UnsupportedOperationException(
"Unsupported restored PendingSplitsState: " + checkpoint);
}
return new IncrementalSourceEnumerator(enumContext, sourceConfig, splitAssigner);
return new IncrementalSourceEnumerator(
enumContext, sourceConfig, splitAssigner, getBoundedness());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigne
private static final String STREAM_SPLIT_ID = "stream-split";

private final int splitMetaGroupSize;
private final C sourceConfig;

private boolean isStreamSplitAssigned;

Expand All @@ -61,6 +62,7 @@ public HybridSplitAssigner(
DataSourceDialect<C> dialect,
OffsetFactory offsetFactory) {
this(
sourceConfig,
new SnapshotSplitAssigner<>(
sourceConfig,
currentParallelism,
Expand All @@ -80,6 +82,7 @@ public HybridSplitAssigner(
DataSourceDialect<C> dialect,
OffsetFactory offsetFactory) {
this(
sourceConfig,
new SnapshotSplitAssigner<>(
sourceConfig,
currentParallelism,
Expand All @@ -92,10 +95,12 @@ public HybridSplitAssigner(
}

private HybridSplitAssigner(
C sourceConfig,
SnapshotSplitAssigner<C> snapshotSplitAssigner,
boolean isStreamSplitAssigned,
int splitMetaGroupSize,
OffsetFactory offsetFactory) {
this.sourceConfig = sourceConfig;
this.snapshotSplitAssigner = snapshotSplitAssigner;
this.isStreamSplitAssigned = isStreamSplitAssigned;
this.splitMetaGroupSize = splitMetaGroupSize;
Expand Down Expand Up @@ -179,8 +184,8 @@ public void notifyCheckpointComplete(long checkpointId) {
}

@Override
public boolean isStreamSplitAssigned() {
return isStreamSplitAssigned;
public boolean noMoreSplits() {
return snapshotSplitAssigner.noMoreSplits() && isStreamSplitAssigned;
}

@Override
Expand All @@ -199,13 +204,17 @@ public StreamSplit createStreamSplit() {
Map<String, Offset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

Offset minOffset = null;
Offset minOffset = null, maxOffset = null;
for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
// find the min offset of change log
// find the min and max offset of change log
Offset changeLogOffset = splitFinishedOffsets.get(split.splitId());
if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
minOffset = changeLogOffset;
}
if (maxOffset == null || changeLogOffset.isAfter(maxOffset)) {
maxOffset = changeLogOffset;
}

finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
split.getTableId(),
Expand All @@ -216,14 +225,21 @@ public StreamSplit createStreamSplit() {
offsetFactory));
}

// If the source is running in snapshot mode, we use the highest watermark among
// snapshot splits as the ending offset to provide a consistent snapshot view at the moment
// of high watermark.
Offset stoppingOffset = offsetFactory.createNoStoppingOffset();
if (sourceConfig.getStartupOptions().isSnapshotOnly()) {
stoppingOffset = maxOffset;
}

// the finishedSnapshotSplitInfos is too large for transmission, divide it to groups and
// then transfer them

boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
return new StreamSplit(
STREAM_SPLIT_ID,
minOffset == null ? offsetFactory.createInitialOffset() : minOffset,
offsetFactory.createNoStoppingOffset(),
stoppingOffset,
divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
new HashMap<>(),
finishedSnapshotSplitInfos.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public void notifyCheckpointComplete(long checkpointId) {
@Override
public void close() {}

/** Indicates there is no more splits available in this assigner. */
@Override
public boolean noMoreSplits() {
return remainingTables.isEmpty() && remainingSplits.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ public interface SplitAssigner {
*/
boolean waitingForFinishedSplits();

/** Whether the split assigner is finished stream split assigning. */
default boolean isStreamSplitAssigned() {
throw new UnsupportedOperationException("Not support to assigning StreamSplit.");
}
/** Indicates there is no more splits available in this assigner. */
boolean noMoreSplits();

/**
* Gets the finished splits' information. This is useful metadata to generate a stream split
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void notifyCheckpointComplete(long checkpointId) {
}

@Override
public boolean isStreamSplitAssigned() {
public boolean noMoreSplits() {
return isStreamSplitAssigned;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.ververica.cdc.connectors.base.source.enumerator;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
Expand Down Expand Up @@ -66,14 +67,18 @@ public class IncrementalSourceEnumerator
private final TreeSet<Integer> readersAwaitingSplit;
private List<List<FinishedSnapshotSplitInfo>> finishedSnapshotSplitMeta;

private Boundedness boundedness;

public IncrementalSourceEnumerator(
SplitEnumeratorContext<SourceSplitBase> context,
SourceConfig sourceConfig,
SplitAssigner splitAssigner) {
SplitAssigner splitAssigner,
Boundedness boundedness) {
this.context = context;
this.sourceConfig = sourceConfig;
this.splitAssigner = splitAssigner;
this.readersAwaitingSplit = new TreeSet<>();
this.boundedness = boundedness;
}

@Override
Expand Down Expand Up @@ -163,7 +168,7 @@ private void assignSplits() {
continue;
}

if (splitAssigner.isStreamSplitAssigned() && sourceConfig.isCloseIdleReaders()) {
if (shouldCloseIdleReader()) {
// close idle readers when snapshot phase finished.
context.signalNoMoreSplits(nextAwaiting);
awaitingReader.remove();
Expand All @@ -184,6 +189,17 @@ private void assignSplits() {
}
}

private boolean shouldCloseIdleReader() {
// When no unassigned split anymore, Signal NoMoreSplitsEvent to awaiting reader in two
// situations:
// 1. When Set StartupMode = snapshot mode(also bounded), there's no more splits in the
// assigner.
// 2. When set scan.incremental.close-idle-reader.enabled = true, there's no more splits in
// the assigner.
return splitAssigner.noMoreSplits()
&& (boundedness == Boundedness.BOUNDED || (sourceConfig.isCloseIdleReaders()));
}

private int[] getRegisteredReader() {
return this.context.registeredReaders().keySet().stream()
.mapToInt(Integer::intValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
protected void onSplitFinished(Map<String, SourceSplitState> finishedSplitIds) {
for (SourceSplitState splitState : finishedSplitIds.values()) {
SourceSplitBase sourceSplit = splitState.toSourceSplit();
if (sourceConfig.getStartupOptions().isSnapshotOnly() && sourceSplit.isStreamSplit()) {
// when startupMode = SNAPSHOT. the stream split could finish.
continue;
}
checkState(
sourceSplit.isSnapshotSplit(),
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {
throw new IOException(e);
}
return dataIt == null
? finishedSnapshotSplit()
? finishedSplit()
: ChangeEventRecords.forRecords(currentSplitId, dataIt);
}

Expand Down Expand Up @@ -154,7 +154,7 @@ public boolean canAssignNextSplit() {
return currentFetcher == null || currentFetcher.isFinished();
}

private ChangeEventRecords finishedSnapshotSplit() {
private ChangeEventRecords finishedSplit() {
final ChangeEventRecords finishedRecords =
ChangeEventRecords.forFinishedSplit(currentSplitId);
currentSplitId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -88,13 +87,18 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
try {
streamFetchTask.execute(taskContext);
} catch (Exception e) {
this.currentTaskRunning = false;
LOG.error(
String.format(
"Execute stream read task for stream split %s fail",
currentStreamSplit),
e);
readException = e;
} finally {
try {
stopReadTask();
} catch (Exception e) {
throw new RuntimeException(e);
}
Comment on lines +96 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the stream split is unbounded before this PR, we set this.currentTaskRunning = false; when error exception in catch , and stream split never will stop for other reason.
Now in INCREMENTAL_SNAPSHOT_ONLY mode, the job will stop after reaching to the highwatermark.

}
});
}
Expand Down Expand Up @@ -122,7 +126,7 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
sourceRecordsSet.add(new SourceRecords(sourceRecords));
return sourceRecordsSet.iterator();
} else {
return Collections.emptyIterator();
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public MongoDBSourceConfigFactory startupOptions(StartupOptions startupOptions)
checkNotNull(startupOptions);
switch (startupOptions.startupMode) {
case INITIAL:
case SNAPSHOT:
case LATEST_OFFSET:
case TIMESTAMP:
this.startupOptions = startupOptions;
Expand Down
Loading