40 changes: 25 additions & 15 deletions docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -28,8 +28,7 @@ under the License.

The Postgres CDC connector allows for reading snapshot data and incremental data from a PostgreSQL database. This document describes how to set up the Postgres CDC connector to run SQL queries against PostgreSQL databases.

## Dependencies

In order to set up the Postgres CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.

@@ -45,8 +44,7 @@ Download [flink-sql-connector-postgres-cdc-3.0.1.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/3.0.1/flink-sql-connector-postgres-cdc-3.0.1.jar)

**Note:** Refer to [flink-sql-connector-postgres-cdc](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc); more released versions are available in Maven Central.

## How to create a Postgres CDC table

The Postgres CDC table can be defined as follows:

@@ -76,8 +74,7 @@ CREATE TABLE shipments (
SELECT * FROM shipments;
```

## Connector Options

<div class="highlight">
<table class="colwidths-auto docutils">
@@ -236,12 +233,29 @@ Connector Options
so there is no need to explicitly configure 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
<tr>
<td>scan.lsn-commit.checkpoints-num-delay</td>
<td>optional</td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>The number of checkpoints to delay before starting to commit the LSN offsets. <br>
The checkpoint LSN offsets are committed in a rolling fashion: among the delayed checkpoints, the offset of the earliest one is committed first.
</td>
</tr>
</tbody>
</table>
</div>
<div>

### Notes

#### `slot.name` option

It is recommended to set a different `slot.name` for each table to avoid the potential `PSQLException: ERROR: replication slot "flink" is active for PID 974` error. See more [here](https://debezium.io/documentation/reference/2.0/connectors/postgresql.html#postgresql-property-slot-name).
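
For illustration, a minimal sketch (not part of this change) that registers two CDC tables against the same database, each with its own replication slot. The class name, column definitions, connection settings, and slot names are illustrative placeholders:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DistinctSlotNamesExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // One replication slot per source table: reusing a single slot such as
        // "flink" across sources is what triggers the "replication slot ... is
        // active for PID ..." error described above.
        tEnv.executeSql(
                "CREATE TABLE shipments (shipment_id INT, PRIMARY KEY (shipment_id) NOT ENFORCED) WITH ("
                        + " 'connector' = 'postgres-cdc',"
                        + " 'hostname' = 'localhost',"
                        + " 'port' = '5432',"
                        + " 'username' = 'postgres',"
                        + " 'password' = 'postgres',"
                        + " 'database-name' = 'postgres',"
                        + " 'schema-name' = 'public',"
                        + " 'table-name' = 'shipments',"
                        + " 'slot.name' = 'flink_shipments')");
        tEnv.executeSql(
                "CREATE TABLE orders (order_id INT, PRIMARY KEY (order_id) NOT ENFORCED) WITH ("
                        + " 'connector' = 'postgres-cdc',"
                        + " 'hostname' = 'localhost',"
                        + " 'port' = '5432',"
                        + " 'username' = 'postgres',"
                        + " 'password' = 'postgres',"
                        + " 'database-name' = 'postgres',"
                        + " 'schema-name' = 'public',"
                        + " 'table-name' = 'orders',"
                        // A second, distinct slot for the second table.
                        + " 'slot.name' = 'flink_orders')");
    }
}
```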

#### `scan.lsn-commit.checkpoints-num-delay` option

When consuming PostgreSQL logs, the LSN offset must be committed to trigger the cleanup of log data for the corresponding slot. However, once an LSN offset is committed, earlier offsets become invalid. To keep earlier LSN offsets accessible for job recovery, the LSN commit is delayed by `scan.lsn-commit.checkpoints-num-delay` checkpoints (default: `3`). For example, with the default value, the LSN offset of checkpoint `1` is committed only after checkpoint `4` completes. This feature is only available when the config option `scan.incremental.snapshot.enabled` is set to `true`.
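
For the DataStream API, the same delay can be configured through the `lsnCommitCheckpointsDelay` builder method added by this change. Below is a minimal sketch assuming the `PostgresSourceBuilder` and `JsonDebeziumDeserializationSchema` APIs used elsewhere in these docs; the class name and connection values are illustrative placeholders:

```java
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class LsnCommitDelayExample {
    public static void main(String[] args) {
        PostgresIncrementalSource<String> source =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .hostname("localhost") // placeholder connection settings
                        .port(5432)
                        .database("postgres")
                        .schemaList("public")
                        .tableList("public.shipments")
                        .username("postgres")
                        .password("postgres")
                        .slotName("flink_shipments")
                        .decodingPluginName("pgoutput")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        // Hold back LSN commits for 5 completed checkpoints instead
                        // of the default 3, keeping more of the recent checkpoints'
                        // offsets usable for recovery.
                        .lsnCommitCheckpointsDelay(5)
                        .build();
        // Attach `source` to a StreamExecutionEnvironment via
        // env.fromSource(source, WatermarkStrategy.noWatermarks(), "PostgresSource").
    }
}
```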

### Incremental Snapshot Options

@@ -340,8 +354,7 @@ The following options are available only when `scan.incremental.snapshot.enabled=true`:
</table>
</div>

## Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

@@ -377,8 +390,7 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
</tbody>
</table>

## Limitation

### Can't perform checkpoint during scanning snapshot of tables when incremental snapshot is disabled

@@ -417,8 +429,7 @@ CREATE TABLE products (
);
```

## Features

### Incremental Snapshot Reading (Experimental)

@@ -522,8 +533,7 @@ public class PostgreSQLSourceExample {
}
```

## Data Type Mapping

<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
pom.xml
@@ -164,10 +164,16 @@ limitations under the License.
<version>${json-path.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>


<!-- tests will have log4j as the default logging framework available -->

</dependencies>

</project>
PostgresDialect.java
@@ -63,13 +63,11 @@
/** The dialect for Postgres. */
public class PostgresDialect implements JdbcDataSourceDialect {
private static final long serialVersionUID = 1L;

private static final String CONNECTION_NAME = "postgres-cdc-connector";

private final PostgresSourceConfig sourceConfig;
private transient Tables.TableFilter filters;

private transient CustomPostgresSchema schema;

@Nullable private PostgresStreamFetchTask streamFetchTask;

public PostgresDialect(PostgresSourceConfig sourceConfig) {
PostgresSourceBuilder.java
@@ -280,6 +280,12 @@ public PostgresSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
return this;
}

/** Set the number of checkpoints by which Postgres delays committing the {@code LSN} offsets. */
public PostgresSourceBuilder<T> lsnCommitCheckpointsDelay(int lsnCommitDelay) {
this.configFactory.setLsnCommitCheckpointsDelay(lsnCommitDelay);
return this;
}

/**
* Build the {@link PostgresIncrementalSource}.
*
PostgresSourceConfig.java
Expand Up @@ -37,6 +37,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
private static final long serialVersionUID = 1L;

private final int subtaskId;
private final int lsnCommitCheckpointsDelay;

public PostgresSourceConfig(
int subtaskId,
@@ -64,7 +65,8 @@ public PostgresSourceConfig(
int connectionPoolSize,
@Nullable String chunkKeyColumn,
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay) {
super(
startupOptions,
databaseList,
@@ -92,14 +94,34 @@ public PostgresSourceConfig(
skipSnapshotBackfill,
isScanNewlyAddedTableEnabled);
this.subtaskId = subtaskId;
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}

/**
* Returns {@code subtaskId} value.
*
* @return subtask id
*/
public int getSubtaskId() {
return subtaskId;
}

/**
* Returns {@code lsnCommitCheckpointsDelay} value.
*
* @return the LSN commit checkpoints delay
*/
public int getLsnCommitCheckpointsDelay() {
return this.lsnCommitCheckpointsDelay;
}

/**
* Returns the slot name for backfill task.
*
* @return backfill task slot name
*/
public String getSlotNameForBackfillTask() {
return getDbzProperties().getProperty(SLOT_NAME.name()) + "_" + getSubtaskId();
}

@Override
PostgresSourceConfigFactory.java
@@ -50,6 +50,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {

private List<String> schemaList;

private int lsnCommitCheckpointsDelay;

/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
@@ -100,7 +102,7 @@ public PostgresSourceConfig create(int subtaskId) {

// The PostgresSource will do the snapshot according to its StartupMode,
// so Debezium does not need to do the snapshot work.
props.setProperty("snapshot.mode", "never");

Configuration dbzConfiguration = Configuration.from(props);
return new PostgresSourceConfig(
@@ -129,7 +131,8 @@ public PostgresSourceConfig create(int subtaskId) {
connectionPoolSize,
chunkKeyColumn,
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay);
}

/**
@@ -173,4 +176,9 @@ public void slotName(String slotName) {
public void heartbeatInterval(Duration heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
}

/** Sets the LSN commit checkpoints delay for Postgres. */
public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) {
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}
}
PostgresSourceOptions.java
@@ -78,4 +78,14 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"Optional interval of sending heartbeat event for tracing the latest available replication slot offsets");

public static final ConfigOption<Integer> SCAN_LSN_COMMIT_CHECKPOINTS_DELAY =
ConfigOptions.key("scan.lsn-commit.checkpoints-num-delay")
.intType()
.defaultValue(3)
.withDescription(
"The number of checkpoint delays before starting to commit the LSN offsets.\n"
+ "By setting this to higher value, the offset that is consumed by global slot will be "
+ "committed after multiple checkpoint delays instead of after each checkpoint completion.\n"
+ "This allows continuous recycle of log files in stream phase.");
}
PostgresSourceReader.java
@@ -28,6 +28,7 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderWithCommit;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitAckEvent;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitEvent;
import org.apache.flink.configuration.Configuration;
@@ -38,6 +39,7 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Supplier;

/**
@@ -54,6 +56,9 @@ public class PostgresSourceReader extends IncrementalSourceReaderWithCommit {
/** whether to commit offset. */
private volatile boolean isCommitOffset = false;

private final PriorityQueue<Long> minHeap;
private final int lsnCommitCheckpointsDelay;

public PostgresSourceReader(
FutureCompletingBlockingQueue elementQueue,
Supplier supplier,
@@ -72,6 +77,9 @@ public PostgresSourceReader(
sourceConfig,
sourceSplitSerializer,
dialect);
this.lsnCommitCheckpointsDelay =
((PostgresSourceConfig) sourceConfig).getLsnCommitCheckpointsDelay();
this.minHeap = new PriorityQueue<>();
}

@Override
@@ -104,12 +112,23 @@ public List<SourceSplitBase> snapshotState(long checkpointId) {

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
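        // Buffer completed checkpoint ids in a min-heap; the LSN offset is only
        // committed once more than `lsnCommitCheckpointsDelay` checkpoints are
        // pending, and then only for the oldest pending checkpoint.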
this.minHeap.add(checkpointId);
if (this.minHeap.size() <= this.lsnCommitCheckpointsDelay) {
LOG.info("Pending checkpoints '{}'.", this.minHeap);
return;
}
final long checkpointIdToCommit = this.minHeap.poll();
LOG.info(
"Pending checkpoints '{}', to be committed checkpoint id '{}'.",
this.minHeap,
checkpointIdToCommit);

// After all snapshot splits are finished, update the stream split's metadata and reset its
// start offset, which may be smaller than before.
// In case the new start offset of the stream split has been recycled, don't commit offsets
// during the newly-added-table phase.
if (isCommitOffset()) {
super.notifyCheckpointComplete(checkpointIdToCommit);
}
}

PostgreSQLTableFactory.java
@@ -57,6 +57,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME;
@@ -114,6 +115,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);

if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@@ -158,7 +160,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
chunkKeyColumn,
closeIdlerReaders,
skipSnapshotBackfill,
isScanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay);
}

@Override
@@ -200,6 +203,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
return options;
}

PostgreSQLTableSource.java
@@ -82,10 +82,9 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {
private final StartupOptions startupOptions;
private final String chunkKeyColumn;
private final boolean closeIdleReaders;

private final boolean skipSnapshotBackfill;

private final boolean scanNewlyAddedTableEnabled;
private final int lsnCommitCheckpointsDelay;

// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -124,7 +123,8 @@ public PostgreSQLTableSource(
@Nullable String chunkKeyColumn,
boolean closeIdleReaders,
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -155,6 +155,7 @@ public PostgreSQLTableSource(
this.closeIdleReaders = closeIdleReaders;
this.skipSnapshotBackfill = skipSnapshotBackfill;
this.scanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled;
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}

@Override
@@ -216,6 +217,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.closeIdleReaders(closeIdleReaders)
.skipSnapshotBackfill(skipSnapshotBackfill)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -283,7 +285,8 @@ public DynamicTableSource copy() {
chunkKeyColumn,
closeIdleReaders,
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;