Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzy15 committed Jul 9, 2023
1 parent 8b4750a commit 6d7007b
Showing 1 changed file with 5 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
Expand All @@ -39,8 +38,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getTableId;

/**
* Fetcher to fetch data from table split, the split is the incremental split {@link
* IncrementalSplit}.
Expand Down Expand Up @@ -150,11 +147,12 @@ public void close() {
private boolean shouldEmit(SourceRecord sourceRecord) {
if (taskContext.isDataChangeRecord(sourceRecord)) {
Offset position = taskContext.getStreamOffset(sourceRecord);
TableId tableId = getTableId(sourceRecord);
// TableId tableId = getTableId(sourceRecord);
if (!taskContext.isExactlyOnce()) {
log.trace(
"The table {} is not support exactly-once, so ignore the watermark check",
tableId);
// log.trace(
// "The table {} is not support exactly-once, so ignore the
// watermark check",
// tableId);
return position.isAfter(splitStartWatermark);
}
// TODO only the table who captured snapshot splits need to filter( Used to support
Expand Down

0 comments on commit 6d7007b

Please sign in to comment.