diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
index f839c4380e1..1e24f31d1a5 100644
--- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -28,8 +28,7 @@ under the License.
The Postgres CDC connector allows for reading snapshot data and incremental data from PostgreSQL database. This document describes how to setup the Postgres CDC connector to run SQL queries against PostgreSQL databases.
-Dependencies
-------------
+## Dependencies
In order to setup the Postgres CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
@@ -45,8 +44,7 @@ Download [flink-sql-connector-postgres-cdc-3.0.1.jar](https://repo1.maven.org/ma
**Note:** Refer to [flink-sql-connector-postgres-cdc](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc), more released versions will be available in the Maven central warehouse.
-How to create a Postgres CDC table
-----------------
+## How to create a Postgres CDC table
The Postgres CDC table can be defined as following:
@@ -76,8 +74,7 @@ CREATE TABLE shipments (
SELECT * FROM shipments;
```
-Connector Options
-----------------
+## Connector Options
@@ -236,12 +233,29 @@ Connector Options
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
+
+ | scan.lsn-commit.checkpoints-num-delay |
+ optional |
+ 3 |
+ Integer |
+ The number of checkpoint delays before starting to commit the LSN offsets.
+ The checkpoint LSN offsets will be committed in rolling fashion, the earliest checkpoint identifier will be committed first from the delayed checkpoints.
+ |
+
-Note: `slot.name` is recommended to set for different tables to avoid the potential `PSQLException: ERROR: replication slot "flink" is active for PID 974` error. See more [here](https://debezium.io/documentation/reference/2.0/connectors/postgresql.html#postgresql-property-slot-name).
+### Notes
+
+#### `slot.name` option
+
+The `slot.name` is recommended to set for different tables to avoid the potential `PSQLException: ERROR: replication slot "flink" is active for PID 974` error. See more [here](https://debezium.io/documentation/reference/2.0/connectors/postgresql.html#postgresql-property-slot-name).
+
+#### `scan.lsn-commit.checkpoints-num-delay` option
+
+When consuming PostgreSQL logs, the LSN offset must be committed to trigger the log data cleanup for the corresponding slot. However, once the LSN offset is committed, earlier offsets become invalid. To ensure access to earlier LSN offsets for job recovery, we delay the LSN commit by `scan.lsn-commit.checkpoints-num-delay` (default value is `3`) checkpoints. This feature is available when config option `scan.incremental.snapshot.enabled` is set to true.
### Incremental Snapshot Options
@@ -340,8 +354,7 @@ The following options is available only when `scan.incremental.snapshot.enabled=
-Available Metadata
-----------------
+## Available Metadata
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
@@ -377,8 +390,7 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
-Limitation
---------
+## Limitation
### Can't perform checkpoint during scanning snapshot of tables when incremental snapshot is disabled
@@ -417,8 +429,7 @@ CREATE TABLE products (
);
```
-Features
---------
+## Features
### Incremental Snapshot Reading (Experimental)
@@ -522,8 +533,7 @@ public class PostgreSQLSourceExample {
}
```
-Data Type Mapping
-----------------
+## Data Type Mapping
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml
index b0763b1d74f..4fc75d0ca55 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml
@@ -164,10 +164,16 @@ limitations under the License.
${json-path.version}
test
+
+ org.apache.flink
+ flink-connector-test-utils
+ ${flink.version}
+ test
+
-
\ No newline at end of file
+
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
index 5e83f6685b3..25a5186d175 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
@@ -63,13 +63,11 @@
/** The dialect for Postgres. */
public class PostgresDialect implements JdbcDataSourceDialect {
private static final long serialVersionUID = 1L;
-
private static final String CONNECTION_NAME = "postgres-cdc-connector";
+
private final PostgresSourceConfig sourceConfig;
private transient Tables.TableFilter filters;
-
private transient CustomPostgresSchema schema;
-
@Nullable private PostgresStreamFetchTask streamFetchTask;
public PostgresDialect(PostgresSourceConfig sourceConfig) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index 6fa882f5671..ee991a70f81 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -280,6 +280,12 @@ public PostgresSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAdde
return this;
}
+ /** Set the {@code LSN} checkpoints delay number for Postgres to commit the offsets. */
+ public PostgresSourceBuilder lsnCommitCheckpointsDelay(int lsnCommitDelay) {
+ this.configFactory.setLsnCommitCheckpointsDelay(lsnCommitDelay);
+ return this;
+ }
+
/**
* Build the {@link PostgresIncrementalSource}.
*
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index c20e57fabb7..a4e30da11d2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -37,6 +37,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
private static final long serialVersionUID = 1L;
private final int subtaskId;
+ private final int lsnCommitCheckpointsDelay;
public PostgresSourceConfig(
int subtaskId,
@@ -64,7 +65,8 @@ public PostgresSourceConfig(
int connectionPoolSize,
@Nullable String chunkKeyColumn,
boolean skipSnapshotBackfill,
- boolean isScanNewlyAddedTableEnabled) {
+ boolean isScanNewlyAddedTableEnabled,
+ int lsnCommitCheckpointsDelay) {
super(
startupOptions,
databaseList,
@@ -92,14 +94,34 @@ public PostgresSourceConfig(
skipSnapshotBackfill,
isScanNewlyAddedTableEnabled);
this.subtaskId = subtaskId;
+ this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}
+ /**
+ * Returns {@code subtaskId} value.
+ *
+ * @return subtask id
+ */
public int getSubtaskId() {
return subtaskId;
}
+ /**
+ * Returns {@code lsnCommitCheckpointsDelay} value.
+ *
+ * @return lsn commit checkpoint delay
+ */
+ public int getLsnCommitCheckpointsDelay() {
+ return this.lsnCommitCheckpointsDelay;
+ }
+
+ /**
+ * Returns the slot name for backfill task.
+ *
+ * @return backfill task slot name
+ */
public String getSlotNameForBackfillTask() {
- return getDbzProperties().getProperty(SLOT_NAME.name()) + "_" + subtaskId;
+ return getDbzProperties().getProperty(SLOT_NAME.name()) + "_" + getSubtaskId();
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 1490dd400e5..fb9054bd423 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -50,6 +50,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
private List schemaList;
+ private int lsnCommitCheckpointsDelay;
+
/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
@@ -100,7 +102,7 @@ public PostgresSourceConfig create(int subtaskId) {
// The PostgresSource will do snapshot according to its StartupMode.
// Do not need debezium to do the snapshot work.
- props.put("snapshot.mode", "never");
+ props.setProperty("snapshot.mode", "never");
Configuration dbzConfiguration = Configuration.from(props);
return new PostgresSourceConfig(
@@ -129,7 +131,8 @@ public PostgresSourceConfig create(int subtaskId) {
connectionPoolSize,
chunkKeyColumn,
skipSnapshotBackfill,
- scanNewlyAddedTableEnabled);
+ scanNewlyAddedTableEnabled,
+ lsnCommitCheckpointsDelay);
}
/**
@@ -173,4 +176,9 @@ public void slotName(String slotName) {
public void heartbeatInterval(Duration heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
}
+
+ /** The lsn commit checkpoints delay for Postgres. */
+ public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) {
+ this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
index 391458ded97..0443fa2e195 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
@@ -78,4 +78,14 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"Optional interval of sending heartbeat event for tracing the latest available replication slot offsets");
+
+ public static final ConfigOption SCAN_LSN_COMMIT_CHECKPOINTS_DELAY =
+ ConfigOptions.key("scan.lsn-commit.checkpoints-num-delay")
+ .intType()
+ .defaultValue(3)
+ .withDescription(
+ "The number of checkpoint delays before starting to commit the LSN offsets.\n"
+ + "By setting this to higher value, the offset that is consumed by global slot will be "
+ + "committed after multiple checkpoint delays instead of after each checkpoint completion.\n"
+ + "This allows continuous recycle of log files in stream phase.");
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
index f6d20100531..81bff8488ed 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
@@ -28,6 +28,7 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderWithCommit;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitAckEvent;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitEvent;
import org.apache.flink.configuration.Configuration;
@@ -38,6 +39,7 @@
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.PriorityQueue;
import java.util.function.Supplier;
/**
@@ -54,6 +56,9 @@ public class PostgresSourceReader extends IncrementalSourceReaderWithCommit {
/** whether to commit offset. */
private volatile boolean isCommitOffset = false;
+ private final PriorityQueue minHeap;
+ private final int lsnCommitCheckpointsDelay;
+
public PostgresSourceReader(
FutureCompletingBlockingQueue elementQueue,
Supplier supplier,
@@ -72,6 +77,9 @@ public PostgresSourceReader(
sourceConfig,
sourceSplitSerializer,
dialect);
+ this.lsnCommitCheckpointsDelay =
+ ((PostgresSourceConfig) sourceConfig).getLsnCommitCheckpointsDelay();
+ this.minHeap = new PriorityQueue<>();
}
@Override
@@ -104,12 +112,23 @@ public List snapshotState(long checkpointId) {
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ this.minHeap.add(checkpointId);
+ if (this.minHeap.size() <= this.lsnCommitCheckpointsDelay) {
+ LOG.info("Pending checkpoints '{}'.", this.minHeap);
+ return;
+ }
+ final long checkpointIdToCommit = this.minHeap.poll();
+ LOG.info(
+ "Pending checkpoints '{}', to be committed checkpoint id '{}'.",
+ this.minHeap,
+ checkpointIdToCommit);
+
// After all snapshot splits are finished, update stream split's metadata and reset start
// offset, which maybe smaller than before.
// In case that new start-offset of stream split has been recycled, don't commit offset
// during new table added phase.
if (isCommitOffset()) {
- super.notifyCheckpointComplete(checkpointId);
+ super.notifyCheckpointComplete(checkpointIdToCommit);
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
index e0e1b6b9be7..5674908ae9f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
@@ -57,6 +57,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME;
@@ -114,6 +115,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+ int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@@ -158,7 +160,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
chunkKeyColumn,
closeIdlerReaders,
skipSnapshotBackfill,
- isScanNewlyAddedTableEnabled);
+ isScanNewlyAddedTableEnabled,
+ lsnCommitCheckpointsDelay);
}
@Override
@@ -200,6 +203,7 @@ public Set> optionalOptions() {
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+ options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
return options;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
index add3d8f3dc0..9a9dd822da1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
@@ -82,10 +82,9 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
private final StartupOptions startupOptions;
private final String chunkKeyColumn;
private final boolean closeIdleReaders;
-
private final boolean skipSnapshotBackfill;
-
private final boolean scanNewlyAddedTableEnabled;
+ private final int lsnCommitCheckpointsDelay;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -124,7 +123,8 @@ public PostgreSQLTableSource(
@Nullable String chunkKeyColumn,
boolean closeIdleReaders,
boolean skipSnapshotBackfill,
- boolean isScanNewlyAddedTableEnabled) {
+ boolean isScanNewlyAddedTableEnabled,
+ int lsnCommitCheckpointsDelay) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -155,6 +155,7 @@ public PostgreSQLTableSource(
this.closeIdleReaders = closeIdleReaders;
this.skipSnapshotBackfill = skipSnapshotBackfill;
this.scanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled;
+ this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}
@Override
@@ -216,6 +217,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.closeIdleReaders(closeIdleReaders)
.skipSnapshotBackfill(skipSnapshotBackfill)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+ .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -283,7 +285,8 @@ public DynamicTableSource copy() {
chunkKeyColumn,
closeIdleReaders,
skipSnapshotBackfill,
- scanNewlyAddedTableEnabled);
+ scanNewlyAddedTableEnabled,
+ lsnCommitCheckpointsDelay);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
index 10d64f226b7..5cc8ba5c586 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
@@ -220,6 +220,7 @@ protected PostgresSourceConfigFactory getMockPostgresSourceConfigFactory(
postgresSourceConfigFactory.tableList(schemaName + "." + tableName);
postgresSourceConfigFactory.splitSize(splitSize);
postgresSourceConfigFactory.skipSnapshotBackfill(skipSnapshotBackfill);
+ postgresSourceConfigFactory.setLsnCommitCheckpointsDelay(1);
return postgresSourceConfigFactory;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 96fa103f939..8d1a6d97556 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -707,6 +707,7 @@ private List testBackfillWhenWritingEvents(
.tableList(tableId)
.startupOptions(startupOptions)
.skipSnapshotBackfill(skipSnapshotBackfill)
+ .lsnCommitCheckpointsDelay(1)
.deserializer(customerTable.getDeserializer())
.build();
@@ -816,7 +817,8 @@ private void testPostgresParallelSource(
+ " 'scan.startup.mode' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'slot.name' = '%s',"
- + " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ + " 'scan.incremental.snapshot.backfill.skip' = '%s',"
+ + " 'scan.lsn-commit.checkpoints-num-delay' = '1'"
+ ""
+ ")",
customDatabase.getHost(),
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
new file mode 100644
index 00000000000..1c855cc3ff4
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import org.apache.flink.cdc.connectors.postgres.source.MockPostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import org.apache.flink.cdc.connectors.postgres.testutils.TestTable;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PostgresSourceReader}. */
+class PostgresSourceReaderTest {
+
+ @Test
+ void testNotifyCheckpointWindowSizeOne() throws Exception {
+ final PostgresSourceReader reader = createReader(1);
+ final List completedCheckpointIds = new ArrayList<>();
+ MockPostgresDialect.setNotifyCheckpointCompleteCallback(
+ id -> completedCheckpointIds.add(id));
+ reader.notifyCheckpointComplete(11L);
+ assertThat(completedCheckpointIds).isEmpty();
+ reader.notifyCheckpointComplete(12L);
+ assertThat(completedCheckpointIds).containsExactly(11L);
+ reader.notifyCheckpointComplete(13L);
+ assertThat(completedCheckpointIds).containsExactly(11L, 12L);
+ }
+
+ @Test
+ void testNotifyCheckpointWindowSizeDefault() throws Exception {
+ final PostgresSourceReader reader = createReader(3);
+ final List completedCheckpointIds = new ArrayList<>();
+ MockPostgresDialect.setNotifyCheckpointCompleteCallback(
+ id -> completedCheckpointIds.add(id));
+ reader.notifyCheckpointComplete(103L);
+ assertThat(completedCheckpointIds).isEmpty();
+ reader.notifyCheckpointComplete(102L);
+ assertThat(completedCheckpointIds).isEmpty();
+ reader.notifyCheckpointComplete(101L);
+ assertThat(completedCheckpointIds).isEmpty();
+ reader.notifyCheckpointComplete(104L);
+ assertThat(completedCheckpointIds).containsExactly(101L);
+ }
+
+ private PostgresSourceReader createReader(final int lsnCommitCheckpointsDelay)
+ throws Exception {
+ final PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
+ final PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+ configFactory.hostname("host");
+ configFactory.database("pgdb");
+ configFactory.username("username");
+ configFactory.password("password");
+ configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
+ final TestTable customerTable =
+ new TestTable(
+ "pgdb",
+ "customer",
+ "customers",
+ ResolvedSchema.of(Column.physical("id", BIGINT())));
+ final DebeziumDeserializationSchema> deserializer = customerTable.getDeserializer();
+ MockPostgresDialect dialect = new MockPostgresDialect(configFactory.create(0));
+ final PostgresSourceBuilder.PostgresIncrementalSource> source =
+ new PostgresSourceBuilder.PostgresIncrementalSource<>(
+ configFactory, checkNotNull(deserializer), offsetFactory, dialect);
+ return source.createReader(new TestingReaderContext());
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java
index 9fe55b18f20..199ff24fe08 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java
@@ -64,7 +64,8 @@ public MockPostgreSQLTableSource(PostgreSQLTableSource postgreSQLTableSource) {
(String) get(postgreSQLTableSource, "chunkKeyColumn"),
(boolean) get(postgreSQLTableSource, "closeIdleReaders"),
(boolean) get(postgreSQLTableSource, "skipSnapshotBackfill"),
- (boolean) get(postgreSQLTableSource, "scanNewlyAddedTableEnabled"));
+ (boolean) get(postgreSQLTableSource, "scanNewlyAddedTableEnabled"),
+ (int) get(postgreSQLTableSource, "lsnCommitCheckpointsDelay"));
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
index eb7360689db..50cd9b7a4d1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
@@ -59,6 +59,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -151,7 +152,8 @@ public void testCommonProperties() {
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
- SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
+ SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
+ SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -196,7 +198,8 @@ public void testOptionalProperties() {
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
true,
- true);
+ true,
+ SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -239,7 +242,8 @@ public void testMetadataColumns() {
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
- SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
+ SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
+ SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "schema_name", "table_name");
@@ -292,7 +296,8 @@ public void testEnableParallelReadSource() {
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
- SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
+ SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
+ SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -335,7 +340,8 @@ public void testStartupFromLatestOffset() {
null,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
- SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
+ SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
+ SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
assertEquals(expectedSource, actualSource);
}