Skip to content

Commit

Permalink
[FLINK-36742][cdc-base][oracle] Filter unacked split for no capture t…
Browse files Browse the repository at this point in the history
…ables when task restore from state
  • Loading branch information
gong committed Nov 18, 2024
1 parent 8e6c361 commit 9e160b3
Show file tree
Hide file tree
Showing 3 changed files with 379 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsAckEvent;
Expand Down Expand Up @@ -261,13 +262,14 @@ private void addSplits(List<SourceSplitBase> splits, boolean checkTableChangeFor
for (SourceSplitBase split : splits) {
if (split.isSnapshotSplit()) {
SnapshotSplit snapshotSplit = split.asSnapshotSplit();
if (snapshotSplit.isSnapshotReadFinished()) {
finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
} else if (dialect.isIncludeDataCollection(
sourceConfig, snapshotSplit.getTableId())) {
unfinishedSplits.add(split);
if (dialect.isIncludeDataCollection(sourceConfig, snapshotSplit.getTableId())) {
if (snapshotSplit.isSnapshotReadFinished()) {
finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
} else {
unfinishedSplits.add(split);
}
} else {
LOG.debug(
LOG.info(
"The subtask {} is skipping split {} because it does not match new table filter.",
subtaskId,
split.splitId());
Expand Down Expand Up @@ -320,8 +322,9 @@ private void addSplits(List<SourceSplitBase> splits, boolean checkTableChangeFor
// add all un-finished splits (including binlog split) to SourceReaderBase
if (!unfinishedSplits.isEmpty()) {
super.addSplits(unfinishedSplits);
} else if (suspendedStreamSplit
!= null) { // only request new snapshot split if the stream split is suspended
} else if (suspendedStreamSplit != null
|| getNumberOfCurrentlyAssignedSplits()
<= 1) { // only request new snapshot split if the stream split is suspended
context.sendSplitRequest();
}
}
Expand Down Expand Up @@ -541,4 +544,9 @@ private void logCurrentStreamOffsets(List<SourceSplitBase> splits, long checkpoi
LOG.info("Stream split offset on checkpoint {}: {}", checkpointId, offset);
}
}

@VisibleForTesting
public Map<String, SnapshotSplit> getFinishedUnackedSplits() {
return finishedUnackedSplits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ limitations under the License.
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Loading

0 comments on commit 9e160b3

Please sign in to comment.