Skip to content

Commit

Permalink
[Bugfix][CDC base] Fix CDC job cannot consume incremental data After …
Browse files Browse the repository at this point in the history
…restore run (apache#625) (apache#6094)
  • Loading branch information
ic4y authored and DESKTOP-GHPCOV0\dingaolong committed Jan 4, 2024
1 parent 622c37b commit 0f255dd
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
Expand Down Expand Up @@ -129,6 +130,16 @@ private SnapshotSplitAssigner(
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.dialect = dialect;

LOG.info("SnapshotSplitAssigner created with remaining tables: {}", this.remainingTables);
LOG.info(
"SnapshotSplitAssigner created with remaining splits: [{}]",
this.remainingSplits.stream()
.map(SnapshotSplit::splitId)
.collect(Collectors.joining(",")));
LOG.info(
"SnapshotSplitAssigner created with assigned splits: {}",
this.assignedSplits.keySet());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand All @@ -67,6 +68,8 @@ public class IncrementalSourceReader<T, C extends SourceConfig>
private final C sourceConfig;
private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;

private final AtomicBoolean needSendSplitRequest = new AtomicBoolean(false);

public IncrementalSourceReader(
BlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue,
Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier,
Expand Down Expand Up @@ -95,6 +98,10 @@ public void pollNext(Collector<T> output) throws Exception {
}
running = true;
}
if (needSendSplitRequest.get()) {
context.sendSplitRequest();
needSendSplitRequest.compareAndSet(true, false);
}
super.pollNext(output);
}

Expand All @@ -105,11 +112,19 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {}
public void addSplits(List<SourceSplitBase> splits) {
// restore for finishedUnackedSplits
List<SourceSplitBase> unfinishedSplits = new ArrayList<>();
log.info(
"subtask {} add splits: {}",
subtaskId,
splits.stream().map(SourceSplitBase::splitId).collect(Collectors.joining(",")));
for (SourceSplitBase split : splits) {
if (split.isSnapshotSplit()) {
SnapshotSplit snapshotSplit = split.asSnapshotSplit();
if (snapshotSplit.isSnapshotReadFinished()) {
finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
log.info(
"subtask {} add finished split: {}",
subtaskId,
snapshotSplit.splitId());
} else {
unfinishedSplits.add(split);
}
Expand All @@ -122,6 +137,12 @@ public void addSplits(List<SourceSplitBase> splits) {
// add all un-finished splits (including incremental split) to SourceReaderBase
if (!unfinishedSplits.isEmpty()) {
super.addSplits(unfinishedSplits);
} else {
// If the split received is 'isSnapshotReadFinished', we will not run this split, hence
// we need to send the split request.
// We cannot directly execute context.sendSplitRequest() here, as it is a synchronous
// call and can lead to a deadlock.
needSendSplitRequest.set(true);
}
}

Expand Down

0 comments on commit 0f255dd

Please sign in to comment.