diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md index 4aca6f884ae..be3d826d0d8 100644 --- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md @@ -387,6 +387,13 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the change stream, the value is always 0. + + row_kind + STRING NOT NULL + It indicates the row kind of the changelog,Note: The downstream SQL operator may fail to compare due to this new added column when processing the row retraction if +the source operator chooses to output the 'row_kind' column for each record. It is recommended to use this metadata column only in simple synchronization jobs. +
'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message. + diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java index 1d4c2b1b766..743411c5230 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java @@ -18,7 +18,9 @@ package org.apache.flink.cdc.connectors.postgres.table; import org.apache.flink.cdc.debezium.table.MetadataConverter; +import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; @@ -95,6 +97,28 @@ public Object read(SourceRecord record) { return TimestampData.fromEpochMillis( (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY)); } + }), + + /** + * It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE + * message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message + */ + ROW_KIND( + "row_kind", + DataTypes.STRING().notNull(), + new RowDataMetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(RowData rowData) { + return StringData.fromString(rowData.getRowKind().shortString()); + } + + @Override + public Object read(SourceRecord record) { + throw new UnsupportedOperationException( + "Please call read(RowData rowData) method instead."); + } }); private final String key; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index c28646aa0ae..4b355cac027 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -165,20 +165,24 @@ public void testConsumingAllEvents() * The final database table looks like this: * * > SELECT * FROM products; - * +-----+--------------------+---------------------------------------------------------+--------+ - * | id | name | description | weight | - * +-----+--------------------+---------------------------------------------------------+--------+ - * | 101 | scooter | Small 2-wheel scooter | 3.14 | - * | 102 | car battery | 12V car battery | 8.1 | - * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | - * | 104 | hammer | 12oz carpenter's hammer | 0.75 | - * | 105 | hammer | 14oz carpenter's hammer | 0.875 | - * | 106 | hammer | 18oz carpenter hammer | 1 | - * | 107 | rocks | box of assorted rocks | 5.1 | - * | 108 | jacket | water resistent black wind breaker | 0.1 | - * | 109 | spare tire | 24 inch spare tire | 22.2 | - * | 110 | jacket | new water resistent white wind breaker | 0.5 | - * +-----+--------------------+---------------------------------------------------------+--------+ + * +-----+--------------------+------------------------------------------------- + * --------+--------+ + * | id | name | description | weight | + * +-----+--------------------+------------------------------------------------- + * --------+--------+ + * | 101 | scooter | Small 2-wheel scooter | 3.14 | + * | 102 | car battery | 12V car battery | 8.1 | + * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from + * #40 to #3 | 0.8 | + * | 104 | hammer | 12oz carpenter's hammer | 0.75 | + * | 105 | hammer | 14oz carpenter's hammer | 0.875 | + * | 106 | hammer | 18oz carpenter hammer | 1 | + * | 107 | rocks | box of assorted rocks | 5.1 | + * | 108 | jacket | water resistent black wind breaker | 0.1 | + * | 109 | spare tire | 24 inch spare tire | 22.2 | + * | 110 | jacket | new water resistent white wind breaker | 0.5 | + * +-----+--------------------+------------------------------------------------- + * --------+--------+ * */ @@ -246,7 +250,8 @@ public void testStartupFromLatestOffset() throws Exception { // async submit job TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); - // wait for the source startup, we don't have a better way to wait it, use sleep for now + // wait for the source startup, we don't have a better way to wait it, use sleep + // for now Thread.sleep(10000L); try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); @@ -469,6 +474,7 @@ public void testMetadataColumns() throws Throwable { + " db_name STRING METADATA FROM 'database_name' VIRTUAL," + " schema_name STRING METADATA VIRTUAL," + " table_name STRING METADATA VIRTUAL," + + " row_kind STRING METADATA FROM 'row_kind' VIRTUAL," + " id INT NOT NULL," + " name STRING," + " description STRING," @@ -501,6 +507,7 @@ public void testMetadataColumns() throws Throwable { + " database_name STRING," + " schema_name STRING," + " table_name STRING," + + " row_kind STRING," + " id INT," + " name STRING," + " description STRING," @@ -546,52 +553,52 @@ public void testMetadataColumns() throws Throwable { Arrays.asList( "+I(" + databaseName - + ",inventory,products,101,scooter,Small 2-wheel scooter,3.140)", + + ",inventory,products,+I,101,scooter,Small 2-wheel scooter,3.140)", "+I(" + databaseName - + ",inventory,products,102,car battery,12V car battery,8.100)", + + ",inventory,products,+I,102,car battery,12V car battery,8.100)", "+I(" + databaseName - + ",inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)", + + ",inventory,products,+I,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)", "+I(" + databaseName - + ",inventory,products,104,hammer,12oz carpenter's hammer,0.750)", + + ",inventory,products,+I,104,hammer,12oz carpenter's hammer,0.750)", "+I(" + databaseName - + ",inventory,products,105,hammer,14oz carpenter's hammer,0.875)", + + ",inventory,products,+I,105,hammer,14oz carpenter's hammer,0.875)", "+I(" + databaseName - + ",inventory,products,106,hammer,16oz carpenter's hammer,1.000)", + + ",inventory,products,+I,106,hammer,16oz carpenter's hammer,1.000)", "+I(" + databaseName - + ",inventory,products,107,rocks,box of assorted rocks,5.300)", + + ",inventory,products,+I,107,rocks,box of assorted rocks,5.300)", "+I(" + databaseName - + ",inventory,products,108,jacket,water resistent black wind breaker,0.100)", + + ",inventory,products,+I,108,jacket,water resistent black wind breaker,0.100)", "+I(" + databaseName - + ",inventory,products,109,spare tire,24 inch spare tire,22.200)", + + ",inventory,products,+I,109,spare tire,24 inch spare tire,22.200)", "+I(" + databaseName - + ",inventory,products,110,jacket,water resistent white wind breaker,0.200)", + + ",inventory,products,+I,110,jacket,water resistent white wind breaker,0.200)", "+I(" + databaseName - + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.180)", + + ",inventory,products,+I,111,scooter,Big 2-wheel scooter ,5.180)", "+U(" + databaseName - + ",inventory,products,106,hammer,18oz carpenter hammer,1.000)", + + ",inventory,products,+U,106,hammer,18oz carpenter hammer,1.000)", "+U(" + databaseName - + ",inventory,products,107,rocks,box of assorted rocks,5.100)", + + ",inventory,products,+U,107,rocks,box of assorted rocks,5.100)", "+U(" + databaseName - + ",inventory,products,110,jacket,new water resistent white wind breaker,0.500)", + + ",inventory,products,+U,110,jacket,new water resistent white wind breaker,0.500)", "+U(" + databaseName - + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)", + + ",inventory,products,+U,111,scooter,Big 2-wheel scooter ,5.170)", "-D(" + databaseName - + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)"); + + ",inventory,products,-D,111,scooter,Big 2-wheel scooter ,5.170)"); List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); Collections.sort(actual); Collections.sort(expected); @@ -679,20 +686,24 @@ public void testUpsertMode() throws Exception { * The final database table looks like this: * * > SELECT * FROM products; - * +-----+--------------------+---------------------------------------------------------+--------+ - * | id | name | description | weight | - * +-----+--------------------+---------------------------------------------------------+--------+ - * | 101 | scooter | Small 2-wheel scooter | 3.14 | - * | 102 | car battery | 12V car battery | 8.1 | - * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | - * | 104 | hammer | 12oz carpenter's hammer | 0.75 | - * | 105 | hammer | 14oz carpenter's hammer | 0.875 | - * | 106 | hammer | 18oz carpenter hammer | 1 | - * | 107 | rocks | box of assorted rocks | 5.1 | - * | 108 | jacket | water resistent black wind breaker | 0.1 | - * | 109 | spare tire | 24 inch spare tire | 22.2 | - * | 110 | jacket | new water resistent white wind breaker | 0.5 | - * +-----+--------------------+---------------------------------------------------------+--------+ + * +-----+--------------------+------------------------------------------------- + * --------+--------+ + * | id | name | description | weight | + * +-----+--------------------+------------------------------------------------- + * --------+--------+ + * | 101 | scooter | Small 2-wheel scooter | 3.14 | + * | 102 | car battery | 12V car battery | 8.1 | + * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from + * #40 to #3 | 0.8 | + * | 104 | hammer | 12oz carpenter's hammer | 0.75 | + * | 105 | hammer | 14oz carpenter's hammer | 0.875 | + * | 106 | hammer | 18oz carpenter hammer | 1 | + * | 107 | rocks | box of assorted rocks | 5.1 | + * | 108 | jacket | water resistent black wind breaker | 0.1 | + * | 109 | spare tire | 24 inch spare tire | 22.2 | + * | 110 | jacket | new water resistent white wind breaker | 0.5 | + * +-----+--------------------+------------------------------------------------- + * --------+--------+ * */ diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java index 50cd9b7a4d1..7b8f88675da 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java @@ -50,19 +50,19 @@ import java.util.Map; import java.util.Properties; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECTION_POOL_SIZE; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_MAX_RETRIES; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_TIMEOUT; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -100,6 +100,7 @@ public class PostgreSQLTableFactoryTest { Column.physical("name", DataTypes.STRING()), Column.physical("count", DataTypes.DECIMAL(38, 18)), Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true), + Column.metadata("row_kind", DataTypes.STRING(), "row_kind", true), Column.metadata( "database_name", DataTypes.STRING(), "database_name", true), Column.metadata("schema_name", DataTypes.STRING(), "schema_name", true), @@ -211,7 +212,7 @@ public void testMetadataColumns() { DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties); PostgreSQLTableSource postgreSQLTableSource = (PostgreSQLTableSource) actualSource; postgreSQLTableSource.applyReadableMetadata( - Arrays.asList("op_ts", "database_name", "schema_name", "table_name"), + Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"), SCHEMA_WITH_METADATA.toSourceRowDataType()); actualSource = postgreSQLTableSource.copy(); PostgreSQLTableSource expectedSource = @@ -246,7 +247,7 @@ public void testMetadataColumns() { SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = - Arrays.asList("op_ts", "database_name", "schema_name", "table_name"); + Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"); assertEquals(expectedSource, actualSource);