From 80920d1394a62b3d0f85acdd53e280850f80c813 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Fri, 10 May 2024 09:13:37 -0700 Subject: [PATCH] Postgres on Resumable full refresh (#37112) --- .../src/main/resources/version.properties | 2 +- .../integrations/debezium/CdcSourceTest.kt | 4 +- .../jdbc/test/JdbcSourceAcceptanceTest.kt | 9 +- .../connectors/source-postgres/build.gradle | 2 +- .../connectors/source-postgres/metadata.yaml | 2 +- .../source/postgres/PostgresSource.java | 300 +++++++++++----- .../PostgresCdcConnectorMetadataInjector.java | 2 +- .../cdc/PostgresCdcCtidInitializer.java | 325 ++++++++++-------- .../postgres/cdc/PostgresCdcCtidUtils.java | 3 - .../postgres/ctid/CtidGlobalStateManager.java | 89 +++-- .../ctid/CtidPerStreamStateManager.java | 5 + .../postgres/ctid/CtidStateManager.java | 34 +- .../source/postgres/ctid/CtidUtils.java | 38 +- .../postgres/ctid/PostgresCtidHandler.java | 60 ++-- .../cursor_based/CursorBasedCtidUtils.java | 6 +- .../source/postgres/xmin/XminCtidUtils.java | 2 +- .../postgres/CdcPostgresSourceTest.java | 9 + .../PostgresJdbcSourceAcceptanceTest.java | 8 +- .../postgres/PostgresSourceSSLTest.java | 4 +- .../source/postgres/PostgresSourceTest.java | 251 +++++++++++++- .../CursorBasedCtidUtilsTest.java | 3 +- .../postgres/utils/PostgresUnitTestsUtil.java | 6 + .../postgres/xmin/XminCtidUtilsTest.java | 3 +- docs/integrations/sources/postgres.md | 5 +- .../postgres/postgres-troubleshooting.md | 2 +- 25 files changed, 843 insertions(+), 331 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index e592c949f14f..e62fca3ae545 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.34.1 \ No newline at end of file +version=0.34.2 \ No newline at end of file diff --git 
a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt index 29a4bb008380..17830786a720 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt @@ -715,7 +715,7 @@ abstract class CdcSourceTest> { // sync, the // data is replicated as expected. @Throws(Exception::class) - fun testCdcAndNonResumableFullRefreshInSameSync() { + protected open fun testCdcAndNonResumableFullRefreshInSameSync() { val configuredCatalog = Jsons.clone(configuredCatalog) val MODEL_RECORDS_2: List = @@ -734,7 +734,7 @@ abstract class CdcSourceTest> { createTableSqlFmt(), modelsSchema(), MODELS_STREAM_NAME_2, - columnClause(columns, Optional.of(COL_ID)), + columnClause(columns, Optional.empty()), ) for (recordJson in MODEL_RECORDS_2) { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt index 8a629add2778..1efb963ff0ce 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt @@ -420,9 +420,10 @@ abstract class JdbcSourceAcceptanceTest> { setEmittedAtToNull(actualMessages) val expectedMessages = airbyteMessagesReadOneColumn - Assertions.assertEquals(expectedMessages.size, actualMessages.size) - 
Assertions.assertTrue(expectedMessages.containsAll(actualMessages)) - Assertions.assertTrue(actualMessages.containsAll(expectedMessages)) + val actualRecordMessages = filterRecords(actualMessages) + Assertions.assertEquals(expectedMessages.size, actualRecordMessages.size) + Assertions.assertTrue(expectedMessages.containsAll(actualRecordMessages)) + Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages)) } protected open val airbyteMessagesReadOneColumn: List @@ -507,8 +508,6 @@ abstract class JdbcSourceAcceptanceTest> { expectedMessages.addAll(getAirbyteMessagesSecondSync(streamName2)) - System.out.println("catalog: " + catalog) - val actualMessages = MoreIterators.toList(source()!!.read(config(), catalog, null)) val actualRecordMessages = filterRecords(actualMessages) diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index c8ec7eded691..3e21444195c4 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -12,7 +12,7 @@ java { } airbyteJavaConnector { - cdkVersionRequired = '0.34.1' + cdkVersionRequired = '0.34.2' features = ['db-sources', 'datastore-postgres'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index 594c010e9175..a54a1acd5196 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.3.33 + dockerImageTag: 3.4.0 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git 
a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index ebc43eed54b6..f173123d3cef 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -26,8 +26,13 @@ import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.filterStreamsUnderVacuumForCtidSync; import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.getCursorBasedSyncStatusForStreams; import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum; +import static io.airbyte.integrations.source.postgres.PostgresUtils.isCdc; +import static io.airbyte.integrations.source.postgres.PostgresUtils.isXmin; import static io.airbyte.integrations.source.postgres.PostgresUtils.prettyPrintConfiguredAirbyteStreamList; import static io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidInitializer.cdcCtidIteratorsCombined; +import static io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidInitializer.getCtidInitialLoadGlobalStateManager; +import static io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidInitializer.getSavedOffsetAfterReplicationSlotLSN; +import static io.airbyte.integrations.source.postgres.ctid.CtidUtils.createInitialLoader; import static io.airbyte.integrations.source.postgres.cursor_based.CursorBasedCtidUtils.categoriseStreams; import static io.airbyte.integrations.source.postgres.cursor_based.CursorBasedCtidUtils.reclassifyCategorisedCtidStreams; import static io.airbyte.integrations.source.postgres.xmin.XminCtidUtils.categoriseStreams; @@ -58,6 +63,7 @@ import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils; 
import io.airbyte.cdk.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode; import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto; +import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler; import io.airbyte.cdk.integrations.source.relationaldb.TableInfo; import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager; import io.airbyte.commons.exceptions.ConfigErrorException; @@ -67,12 +73,10 @@ import io.airbyte.commons.map.MoreMaps; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.integrations.source.postgres.PostgresQueryUtils.ResultWithFailed; -import io.airbyte.integrations.source.postgres.PostgresQueryUtils.TableBlockSize; import io.airbyte.integrations.source.postgres.cdc.PostgresReplicationConnection; +import io.airbyte.integrations.source.postgres.ctid.CtidGlobalStateManager; import io.airbyte.integrations.source.postgres.ctid.CtidPerStreamStateManager; -import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations; import io.airbyte.integrations.source.postgres.ctid.CtidStateManager; -import io.airbyte.integrations.source.postgres.ctid.CtidUtils; import io.airbyte.integrations.source.postgres.ctid.CtidUtils.StreamsCategorised; import io.airbyte.integrations.source.postgres.ctid.FileNodeHandler; import io.airbyte.integrations.source.postgres.ctid.PostgresCtidHandler; @@ -95,6 +99,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.ConnectorSpecification; +import io.airbyte.protocol.models.v0.SyncMode; import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; @@ -280,7 +285,7 @@ protected void logPreSyncDebugData(final JdbcDatabase database, final Configured public AirbyteCatalog discover(final JsonNode config) throws Exception { final AirbyteCatalog catalog = super.discover(config); - if (PostgresUtils.isCdc(config)) { + if 
(isCdc(config)) { final List streams = catalog.getStreams().stream() .map(PostgresCatalogHelper::overrideSyncModes) .map(PostgresCatalogHelper::removeIncrementalWithoutPk) @@ -293,7 +298,7 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception { .collect(toList()); catalog.setStreams(streams); - } else if (PostgresUtils.isXmin(config)) { + } else if (isXmin(config)) { // Xmin replication has a source-defined cursor (the xmin column). This is done to prevent the user // from being able to pick their own cursor. final List streams = catalog.getStreams().stream() @@ -416,7 +421,7 @@ public List> getCheckOperations(final J final List> checkOperations = new ArrayList<>( super.getCheckOperations(config)); - if (PostgresUtils.isCdc(config)) { + if (isCdc(config)) { checkOperations.add(database -> { final List matchingSlots = getReplicationSlot(database, config); @@ -471,13 +476,13 @@ public List> getIncrementalIterators(final final StateManager stateManager, final Instant emittedAt) { final JsonNode sourceConfig = database.getSourceConfig(); - if (PostgresUtils.isCdc(sourceConfig) && isAnyStreamIncrementalSyncMode(catalog)) { + if (isCdc(sourceConfig) && isAnyStreamIncrementalSyncMode(catalog)) { LOGGER.info("Using ctid + CDC"); return cdcCtidIteratorsCombined(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString(), - getReplicationSlot(database, sourceConfig).get(0)); + (CtidGlobalStateManager) ctidStateManager, savedOffsetAfterReplicationSlotLSN); } - if (isAnyStreamIncrementalSyncMode(catalog) && PostgresUtils.isXmin(sourceConfig)) { + if (isAnyStreamIncrementalSyncMode(catalog) && isXmin(sourceConfig)) { // Log and save the xmin status final XminStatus xminStatus; try { @@ -485,67 +490,43 @@ public List> getIncrementalIterators(final } catch (SQLException e) { throw new RuntimeException(e); } - LOGGER.info(String.format("Xmin Status : {Number of wraparounds: %s, Xmin Transaction Value: %s, Xmin Raw Value: %s", - 
xminStatus.getNumWraparound(), xminStatus.getXminXidValue(), xminStatus.getXminRawValue())); - final StreamsCategorised streamsCategorised = categoriseStreams(stateManager, catalog, xminStatus); - final ResultWithFailed> streamsUnderVacuum = streamsUnderVacuum(database, - streamsCategorised.ctidStreams().streamsForCtidSync(), - getQuoteString()); - - // Streams we failed to query for Vacuum - such as in the case of an unsupported postgres server - // are reclassified as xmin since we cannot guarantee that ctid will be possible. - reclassifyCategorisedCtidStreams(streamsCategorised, streamsUnderVacuum.failed()); - - List finalListOfStreamsToBeSyncedViaCtid = - filterStreamsUnderVacuumForCtidSync(streamsUnderVacuum.result(), streamsCategorised.ctidStreams()); + + finalListOfStreamsToBeSyncedViaCtid = finalListOfStreamsToBeSyncedViaCtid.stream() + .filter(streamUnderCheck -> streamUnderCheck.getSyncMode() == SyncMode.INCREMENTAL).collect(toList()); + final FileNodeHandler fileNodeHandler = PostgresQueryUtils.fileNodeForStreams(database, finalListOfStreamsToBeSyncedViaCtid, getQuoteString()); - // In case we failed to query for fileNode, streams will get reclassified as xmin - if (!fileNodeHandler.getFailedToQuery().isEmpty()) { - reclassifyCategorisedCtidStreams(streamsCategorised, fileNodeHandler.getFailedToQuery()); - finalListOfStreamsToBeSyncedViaCtid = - filterStreamsUnderVacuumForCtidSync(streamsUnderVacuum.result(), streamsCategorised.ctidStreams()); - } - - final CtidStateManager ctidStateManager = new CtidPerStreamStateManager(streamsCategorised.ctidStreams().statesFromCtidSync(), fileNodeHandler); - final Map tableBlockSizes = - PostgresQueryUtils.getTableBlockSizeForStreams( - database, - finalListOfStreamsToBeSyncedViaCtid, - getQuoteString()); + ctidStateManager.setStreamStateIteratorFields(namespacePair -> Jsons.jsonNode(xminStatus)); + final PostgresCtidHandler ctidHandler = + createInitialLoader(database, finalListOfStreamsToBeSyncedViaCtid, 
fileNodeHandler, getQuoteString(), ctidStateManager, + Optional.empty()); - final Map tablesMaxTuple = - CtidUtils.isTidRangeScanCapableDBServer(database) ? null - : PostgresQueryUtils.getTableMaxTupleForStreams(database, finalListOfStreamsToBeSyncedViaCtid, getQuoteString()); - if (!streamsCategorised.ctidStreams().streamsForCtidSync().isEmpty()) { + if (!xminStreamsCategorised.ctidStreams().streamsForCtidSync().isEmpty()) { LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size()); LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(finalListOfStreamsToBeSyncedViaCtid)); } else { LOGGER.info("No Streams will be synced via ctid."); } - if (!streamsCategorised.remainingStreams().streamsForXminSync().isEmpty()) { - LOGGER.info("Streams to be synced via xmin : {}", streamsCategorised.remainingStreams().streamsForXminSync().size()); - LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(streamsCategorised.remainingStreams().streamsForXminSync())); + var xminStreams = xminStreamsCategorised.remainingStreams(); + + if (!xminStreams.streamsForXminSync().isEmpty()) { + LOGGER.info("Streams to be synced via xmin : {}", xminStreams.streamsForXminSync().size()); + LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(xminStreams.streamsForXminSync())); } else { LOGGER.info("No Streams will be synced via xmin."); } - final XminStateManager xminStateManager = new XminStateManager(streamsCategorised.remainingStreams().statesFromXminSync()); + final XminStateManager xminStateManager = new XminStateManager(xminStreams.statesFromXminSync()); final PostgresXminHandler xminHandler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager); - final PostgresCtidHandler ctidHandler = - new PostgresCtidHandler(sourceConfig, database, new CtidPostgresSourceOperations(Optional.empty()), getQuoteString(), - fileNodeHandler, tableBlockSizes, tablesMaxTuple, ctidStateManager, 
- namespacePair -> Jsons.jsonNode(xminStatus)); - final List> initialSyncCtidIterators = new ArrayList<>(ctidHandler.getInitialSyncCtidIterator( new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt)); final List> xminIterators = new ArrayList<>(xminHandler.getIncrementalIterators( - new ConfiguredAirbyteCatalog().withStreams(streamsCategorised.remainingStreams().streamsForXminSync()), tableNameToTable, emittedAt)); + new ConfiguredAirbyteCatalog().withStreams(xminStreams.streamsForXminSync()), tableNameToTable, emittedAt)); return Stream .of(initialSyncCtidIterators, xminIterators) @@ -555,40 +536,15 @@ public List> getIncrementalIterators(final } else if (isAnyStreamIncrementalSyncMode(catalog)) { final PostgresCursorBasedStateManager postgresCursorBasedStateManager = new PostgresCursorBasedStateManager(stateManager.getRawStateMessages(), catalog); - final StreamsCategorised streamsCategorised = categoriseStreams(postgresCursorBasedStateManager, catalog); - final ResultWithFailed> streamsUnderVacuum = streamsUnderVacuum(database, - streamsCategorised.ctidStreams().streamsForCtidSync(), - getQuoteString()); + recategoriseForCursorBased(database, catalog, stateManager, true); - // Streams we failed to query for Vacuum - such as in the case of an unsupported postgres server - // are reclassified as standard since we cannot guarantee that ctid will be possible. 
- reclassifyCategorisedCtidStreams(streamsCategorised, streamsUnderVacuum.failed()); - - List finalListOfStreamsToBeSyncedViaCtid = - filterStreamsUnderVacuumForCtidSync(streamsUnderVacuum.result(), streamsCategorised.ctidStreams()); final FileNodeHandler fileNodeHandler = PostgresQueryUtils.fileNodeForStreams(database, finalListOfStreamsToBeSyncedViaCtid, getQuoteString()); - // Streams we failed to query for fileNode - such as in the case of Views are reclassified as - // standard - if (!fileNodeHandler.getFailedToQuery().isEmpty()) { - reclassifyCategorisedCtidStreams(streamsCategorised, fileNodeHandler.getFailedToQuery()); - finalListOfStreamsToBeSyncedViaCtid = - filterStreamsUnderVacuumForCtidSync(streamsUnderVacuum.result(), streamsCategorised.ctidStreams()); - } - final CtidStateManager ctidStateManager = - new CtidPerStreamStateManager(streamsCategorised.ctidStreams().statesFromCtidSync(), fileNodeHandler); - final Map tableBlockSizes = - PostgresQueryUtils.getTableBlockSizeForStreams( - database, - finalListOfStreamsToBeSyncedViaCtid, - getQuoteString()); - - final Map tablesMaxTuple = - CtidUtils.isTidRangeScanCapableDBServer(database) ? 
null - : PostgresQueryUtils.getTableMaxTupleForStreams(database, finalListOfStreamsToBeSyncedViaCtid, getQuoteString()); + final Map cursorBasedStatusMap = + getCursorBasedSyncStatusForStreams(database, finalListOfStreamsToBeSyncedViaCtid, postgresCursorBasedStateManager, getQuoteString()); if (finalListOfStreamsToBeSyncedViaCtid.isEmpty()) { LOGGER.info("No Streams will be synced via ctid."); @@ -597,26 +553,17 @@ public List> getIncrementalIterators(final LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(finalListOfStreamsToBeSyncedViaCtid)); } - if (!streamsCategorised.remainingStreams().streamsForCursorBasedSync().isEmpty()) { - LOGGER.info("Streams to be synced via cursor : {}", streamsCategorised.remainingStreams().streamsForCursorBasedSync().size()); - LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(streamsCategorised.remainingStreams().streamsForCursorBasedSync())); + if (!cursorBasedStreamsCategorised.remainingStreams().streamsForCursorBasedSync().isEmpty()) { + LOGGER.info("Streams to be synced via cursor : {}", cursorBasedStreamsCategorised.remainingStreams().streamsForCursorBasedSync().size()); + LOGGER.info("Streams: {}", + prettyPrintConfiguredAirbyteStreamList(cursorBasedStreamsCategorised.remainingStreams().streamsForCursorBasedSync())); } else { LOGGER.info("No streams to be synced via cursor"); } - final Map cursorBasedStatusMap = - getCursorBasedSyncStatusForStreams(database, finalListOfStreamsToBeSyncedViaCtid, postgresCursorBasedStateManager, getQuoteString()); - + ctidStateManager.setStreamStateIteratorFields(namespacePair -> Jsons.jsonNode(cursorBasedStatusMap.get(namespacePair))); final PostgresCtidHandler cursorBasedCtidHandler = - new PostgresCtidHandler(sourceConfig, - database, - new CtidPostgresSourceOperations(Optional.empty()), - getQuoteString(), - fileNodeHandler, - tableBlockSizes, - tablesMaxTuple, - ctidStateManager, - namespacePair -> Jsons.jsonNode(cursorBasedStatusMap.get(namespacePair))); + 
createInitialLoader(database, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, getQuoteString(), ctidStateManager, Optional.empty()); final List> initialSyncCtidIterators = new ArrayList<>( cursorBasedCtidHandler.getInitialSyncCtidIterator(new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), @@ -624,7 +571,7 @@ public List> getIncrementalIterators(final emittedAt)); final List> cursorBasedIterators = new ArrayList<>(super.getIncrementalIterators(database, new ConfiguredAirbyteCatalog().withStreams( - streamsCategorised.remainingStreams() + cursorBasedStreamsCategorised.remainingStreams() .streamsForCursorBasedSync()), tableNameToTable, postgresCursorBasedStateManager, emittedAt)); @@ -694,7 +641,7 @@ protected boolean isNotInternalSchema(final JsonNode jsonNode, final Set @Override protected AirbyteStateType getSupportedStateType(final JsonNode config) { - return PostgresUtils.isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM; + return isCdc(config) ? 
AirbyteStateType.GLOBAL : AirbyteStateType.STREAM; } @Override @@ -736,7 +683,7 @@ public AirbyteConnectionStatus check(final JsonNode config) throws Exception { } } } - if (PostgresUtils.isCdc(config)) { + if (isCdc(config)) { if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)) { final String sslModeValue = config.get(SSL_MODE).get(MODE).asText(); if (INVALID_CDC_SSL_MODES.contains(sslModeValue)) { @@ -751,6 +698,169 @@ public AirbyteConnectionStatus check(final JsonNode config) throws Exception { return super.check(config); } + private CtidStateManager ctidStateManager = null; + private boolean savedOffsetAfterReplicationSlotLSN = false; + private List finalListOfStreamsToBeSyncedViaCtid; + + private StreamsCategorised cursorBasedStreamsCategorised; + private StreamsCategorised xminStreamsCategorised; + + private void recategoriseStreamsForXmin(final JdbcDatabase database, + final ConfiguredAirbyteCatalog catalog, + final StateManager stateManager, + final boolean incrementalModeOnly) { + final XminStatus xminStatus; + try { + xminStatus = PostgresQueryUtils.getXminStatus(database); + } catch (SQLException e) { + throw new RuntimeException(e); + } + LOGGER.info(String.format("Xmin Status : {Number of wraparounds: %s, Xmin Transaction Value: %s, Xmin Raw Value: %s", + xminStatus.getNumWraparound(), xminStatus.getXminXidValue(), xminStatus.getXminRawValue())); + xminStreamsCategorised = categoriseStreams(stateManager, catalog, xminStatus); + final ResultWithFailed> streamsUnderVacuum = streamsUnderVacuum(database, + xminStreamsCategorised.ctidStreams().streamsForCtidSync(), + getQuoteString()); + + // Streams we failed to query for Vacuum - such as in the case of an unsupported postgres server + // are reclassified as xmin since we cannot guarantee that ctid will be possible. 
+ reclassifyCategorisedCtidStreams(xminStreamsCategorised, streamsUnderVacuum.failed()); + + finalListOfStreamsToBeSyncedViaCtid = + filterStreamsUnderVacuumForCtidSync(streamsUnderVacuum.result(), xminStreamsCategorised.ctidStreams()); + final FileNodeHandler fileNodeHandler = + PostgresQueryUtils.fileNodeForStreams(database, + finalListOfStreamsToBeSyncedViaCtid, + getQuoteString()); + if (!fileNodeHandler.getFailedToQuery().isEmpty()) { + reclassifyCategorisedCtidStreams(xminStreamsCategorised, fileNodeHandler.getFailedToQuery()); + finalListOfStreamsToBeSyncedViaCtid = + filterStreamsUnderVacuumForCtidSync(streamsUnderVacuum.result(), xminStreamsCategorised.ctidStreams()); + } + if (incrementalModeOnly) { + finalListOfStreamsToBeSyncedViaCtid = filterIncrementalSyncModeStreams(finalListOfStreamsToBeSyncedViaCtid); + } + } + + private void recategoriseForCursorBased(final JdbcDatabase database, + final ConfiguredAirbyteCatalog catalog, + final StateManager postgresCursorBasedStateManager, + final boolean incrementalModeOnly) { + + cursorBasedStreamsCategorised = categoriseStreams(postgresCursorBasedStateManager, catalog); + + final ResultWithFailed> streamsUnderVacuum = streamsUnderVacuum(database, + cursorBasedStreamsCategorised.ctidStreams().streamsForCtidSync(), + getQuoteString()); + + // Streams we failed to query for Vacuum - such as in the case of an unsupported postgres server + // are reclassified as standard since we cannot guarantee that ctid will be possible. 
+ reclassifyCategorisedCtidStreams(cursorBasedStreamsCategorised, streamsUnderVacuum.failed()); + + finalListOfStreamsToBeSyncedViaCtid = + filterStreamsUnderVacuumForCtidSync(streamsUnderVacuum.result(), cursorBasedStreamsCategorised.ctidStreams()); + + final FileNodeHandler fileNodeHandler = + PostgresQueryUtils.fileNodeForStreams(database, + finalListOfStreamsToBeSyncedViaCtid, + getQuoteString()); + + // Streams we failed to query for fileNode - such as in the case of Views are reclassified as + // standard + if (!fileNodeHandler.getFailedToQuery().isEmpty()) { + reclassifyCategorisedCtidStreams(cursorBasedStreamsCategorised, fileNodeHandler.getFailedToQuery()); + finalListOfStreamsToBeSyncedViaCtid = + filterStreamsUnderVacuumForCtidSync(streamsUnderVacuum.result(), cursorBasedStreamsCategorised.ctidStreams()); + } + if (incrementalModeOnly) { + finalListOfStreamsToBeSyncedViaCtid = filterIncrementalSyncModeStreams(finalListOfStreamsToBeSyncedViaCtid); + } + } + + private List filterIncrementalSyncModeStreams(final List allStreams) { + return allStreams.stream().filter(streamUnderCheck -> streamUnderCheck.getSyncMode() == SyncMode.INCREMENTAL).collect(toList()); + } + + @Override + protected void initializeForStateManager(final JdbcDatabase database, + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final StateManager stateManager) { + if (ctidStateManager != null) { + return; + } + var sourceConfig = database.getSourceConfig(); + + if (isCdc(sourceConfig)) { + savedOffsetAfterReplicationSlotLSN = + getSavedOffsetAfterReplicationSlotLSN(database, catalog, stateManager, getReplicationSlot(database, sourceConfig).get(0)); + ctidStateManager = getCtidInitialLoadGlobalStateManager(database, catalog, stateManager, getQuoteString(), savedOffsetAfterReplicationSlotLSN); + } else { + if (isXmin(sourceConfig)) { + recategoriseStreamsForXmin(database, catalog, stateManager, /* incrementalOnly= */false); + final FileNodeHandler fileNodeHandler = 
+ PostgresQueryUtils.fileNodeForStreams(database, + finalListOfStreamsToBeSyncedViaCtid, + getQuoteString()); + ctidStateManager = new CtidPerStreamStateManager(xminStreamsCategorised.ctidStreams().statesFromCtidSync(), fileNodeHandler); + ctidStateManager.setFileNodeHandler(fileNodeHandler); + } else { + recategoriseForCursorBased(database, catalog, stateManager, /* incrementalOnly= */false); + final FileNodeHandler fileNodeHandler = + PostgresQueryUtils.fileNodeForStreams(database, + finalListOfStreamsToBeSyncedViaCtid, + getQuoteString()); + + ctidStateManager = + new CtidPerStreamStateManager(cursorBasedStreamsCategorised.ctidStreams().statesFromCtidSync(), fileNodeHandler); + ctidStateManager.setFileNodeHandler(fileNodeHandler); + } + } + } + + @Override + public boolean supportResumableFullRefresh(final JdbcDatabase database, final ConfiguredAirbyteStream airbyteStream) { + // finalListOfStreamsToBeSyncedViaCtid will be initialized as part of state manager initialization + // for non CDC only. + if (!ctidStateManager.getFileNodeHandler().hasFileNode(new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair( + airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()))) { + LOGGER.info("stream " + airbyteStream + " will not sync in resumeable full refresh mode."); + return false; + + } + + final FileNodeHandler fileNodeHandler = + PostgresQueryUtils.fileNodeForStreams(database, + List.of(airbyteStream), + getQuoteString()); + + // We do not support RFR on views. 
+ if (!fileNodeHandler.getFailedToQuery().isEmpty()) { + if (fileNodeHandler.getFailedToQuery() + .contains(new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()))) { + LOGGER.info("stream " + airbyteStream + " will not sync in resumeable full refresh mode."); + return false; + } + } + + LOGGER.info("stream " + airbyteStream + " will sync in resumeable full refresh mode."); + + return true; + } + + @Override + public InitialLoadHandler getInitialLoadHandler(final JdbcDatabase database, + final ConfiguredAirbyteStream stream, + final ConfiguredAirbyteCatalog catalog, + final StateManager stateManager) { + final FileNodeHandler fileNodeHandler = + PostgresQueryUtils.fileNodeForStreams(database, + List.of(stream), + getQuoteString()); + + return createInitialLoader(database, List.of(stream), fileNodeHandler, getQuoteString(), ctidStateManager, Optional.empty()); + } + protected String toSslJdbcParam(final SslMode sslMode) { return toSslJdbcParamInternal(sslMode); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcConnectorMetadataInjector.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcConnectorMetadataInjector.java index 13439a522c63..11f4eb2753c6 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcConnectorMetadataInjector.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcConnectorMetadataInjector.java @@ -23,7 +23,7 @@ public class PostgresCdcConnectorMetadataInjector implements CdcMetadataInjector this.lsn = null; } - PostgresCdcConnectorMetadataInjector(final String transactionTimestamp, final Long lsn) { + public PostgresCdcConnectorMetadataInjector(final String transactionTimestamp, final Long 
lsn) { this.transactionTimestamp = transactionTimestamp; this.lsn = lsn; } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java index 5675561689b1..21c3d1ba78b9 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java @@ -10,6 +10,7 @@ import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY; import static io.airbyte.integrations.source.postgres.PostgresUtils.isDebugMode; import static io.airbyte.integrations.source.postgres.PostgresUtils.prettyPrintConfiguredAirbyteStreamList; +import static io.airbyte.integrations.source.postgres.ctid.CtidUtils.createInitialLoader; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; @@ -26,14 +27,10 @@ import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.integrations.source.postgres.PostgresQueryUtils; -import io.airbyte.integrations.source.postgres.PostgresQueryUtils.TableBlockSize; import io.airbyte.integrations.source.postgres.PostgresType; import io.airbyte.integrations.source.postgres.PostgresUtils; import io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidUtils.CtidStreams; import io.airbyte.integrations.source.postgres.ctid.CtidGlobalStateManager; -import io.airbyte.integrations.source.postgres.ctid.CtidPostgresSourceOperations; -import io.airbyte.integrations.source.postgres.ctid.CtidStateManager; -import io.airbyte.integrations.source.postgres.ctid.CtidUtils; import 
io.airbyte.integrations.source.postgres.ctid.FileNodeHandler; import io.airbyte.integrations.source.postgres.ctid.PostgresCtidHandler; import io.airbyte.protocol.models.CommonField; @@ -61,158 +58,210 @@ public class PostgresCdcCtidInitializer { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCdcCtidInitializer.class); + public static boolean getSavedOffsetAfterReplicationSlotLSN(final JdbcDatabase database, + final ConfiguredAirbyteCatalog catalog, + final StateManager stateManager, + final JsonNode replicationSlot) { + final PostgresDebeziumStateUtil postgresDebeziumStateUtil = new PostgresDebeziumStateUtil(); + + final CdcState defaultCdcState = getDefaultCdcState(postgresDebeziumStateUtil, database); + + final JsonNode state = + (stateManager.getCdcStateManager().getCdcState() == null || stateManager.getCdcStateManager().getCdcState().getState() == null) + ? defaultCdcState.getState() + : Jsons.clone(stateManager.getCdcStateManager().getCdcState().getState()); + + final OptionalLong savedOffset = postgresDebeziumStateUtil.savedOffset( + Jsons.clone(PostgresCdcProperties.getDebeziumDefaultProperties(database)), + catalog, + state, + database.getSourceConfig()); + return postgresDebeziumStateUtil.isSavedOffsetAfterReplicationSlotLSN( + // We can assume that there will be only 1 replication slot cause before the sync starts for + // Postgres CDC, + // we run all the check operations and one of the check validates that the replication slot exists + // and has only 1 entry + replicationSlot, + savedOffset); + } + + public static CtidGlobalStateManager getCtidInitialLoadGlobalStateManager(final JdbcDatabase database, + final ConfiguredAirbyteCatalog catalog, + final StateManager stateManager, + final String quoteString, + final boolean savedOffsetAfterReplicationSlotLSN) { + final PostgresDebeziumStateUtil postgresDebeziumStateUtil = new PostgresDebeziumStateUtil(); + + final CtidStreams ctidStreams = 
PostgresCdcCtidUtils.streamsToSyncViaCtid(stateManager.getCdcStateManager(), catalog, + savedOffsetAfterReplicationSlotLSN); + final List streamsUnderVacuum = new ArrayList<>(); + streamsUnderVacuum.addAll(streamsUnderVacuum(database, + ctidStreams.streamsForCtidSync(), quoteString).result()); + + final List finalListOfStreamsToBeSyncedViaCtid = + streamsUnderVacuum.isEmpty() ? ctidStreams.streamsForCtidSync() + : ctidStreams.streamsForCtidSync().stream() + .filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c))) + .toList(); + LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size()); + LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(finalListOfStreamsToBeSyncedViaCtid)); + final FileNodeHandler fileNodeHandler = PostgresQueryUtils.fileNodeForStreams(database, + finalListOfStreamsToBeSyncedViaCtid, + quoteString); + final CdcState defaultCdcState = getDefaultCdcState(postgresDebeziumStateUtil, database); + + final CtidGlobalStateManager ctidStateManager = + new CtidGlobalStateManager(ctidStreams, fileNodeHandler, stateManager, catalog, savedOffsetAfterReplicationSlotLSN, defaultCdcState); + return ctidStateManager; + + } + + private static CdcState getDefaultCdcState(final PostgresDebeziumStateUtil postgresDebeziumStateUtil, final JdbcDatabase database) { + var sourceConfig = database.getSourceConfig(); + final JsonNode initialDebeziumState = postgresDebeziumStateUtil.constructInitialDebeziumState(database, + sourceConfig.get(JdbcUtils.DATABASE_KEY).asText()); + return new CdcState().withState(initialDebeziumState); + } + public static List> cdcCtidIteratorsCombined(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog, final Map>> tableNameToTable, final StateManager stateManager, final Instant emittedAt, final String quoteString, - final JsonNode replicationSlot) { - try { - final JsonNode sourceConfig = database.getSourceConfig(); - final Duration 
firstRecordWaitTime = PostgresUtils.getFirstRecordWaitTime(sourceConfig); - final Duration subsequentRecordWaitTime = PostgresUtils.getSubsequentRecordWaitTime(sourceConfig); - final int queueSize = PostgresUtils.getQueueSize(sourceConfig); - LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds()); - LOGGER.info("Queue size: {}", queueSize); - - if (isDebugMode(sourceConfig) && !PostgresUtils.shouldFlushAfterSync(sourceConfig)) { - throw new ConfigErrorException("WARNING: The config indicates that we are clearing the WAL while reading data. This will mutate the WAL" + - " associated with the source being debugged and is not advised."); - } + final CtidGlobalStateManager ctidStateManager, + final boolean savedOffsetAfterReplicationSlotLSN) { + final JsonNode sourceConfig = database.getSourceConfig(); + final Duration firstRecordWaitTime = PostgresUtils.getFirstRecordWaitTime(sourceConfig); + final Duration subsequentRecordWaitTime = PostgresUtils.getSubsequentRecordWaitTime(sourceConfig); + final int queueSize = PostgresUtils.getQueueSize(sourceConfig); + LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds()); + LOGGER.info("Queue size: {}", queueSize); - final PostgresDebeziumStateUtil postgresDebeziumStateUtil = new PostgresDebeziumStateUtil(); + if (isDebugMode(sourceConfig) && !PostgresUtils.shouldFlushAfterSync(sourceConfig)) { + throw new ConfigErrorException("WARNING: The config indicates that we are clearing the WAL while reading data. 
This will mutate the WAL" + + " associated with the source being debugged and is not advised."); + } - final JsonNode initialDebeziumState = postgresDebeziumStateUtil.constructInitialDebeziumState(database, - sourceConfig.get(JdbcUtils.DATABASE_KEY).asText()); + final PostgresDebeziumStateUtil postgresDebeziumStateUtil = new PostgresDebeziumStateUtil(); - final JsonNode state = - (stateManager.getCdcStateManager().getCdcState() == null || stateManager.getCdcStateManager().getCdcState().getState() == null) - ? initialDebeziumState - : Jsons.clone(stateManager.getCdcStateManager().getCdcState().getState()); + final JsonNode initialDebeziumState = postgresDebeziumStateUtil.constructInitialDebeziumState(database, + sourceConfig.get(JdbcUtils.DATABASE_KEY).asText()); - final OptionalLong savedOffset = postgresDebeziumStateUtil.savedOffset( - Jsons.clone(PostgresCdcProperties.getDebeziumDefaultProperties(database)), - catalog, - state, - sourceConfig); + final JsonNode state = + (stateManager.getCdcStateManager().getCdcState() == null || stateManager.getCdcStateManager().getCdcState().getState() == null) + ? initialDebeziumState + : Jsons.clone(stateManager.getCdcStateManager().getCdcState().getState()); - // We should always be able to extract offset out of state if it's not null - if (state != null && savedOffset.isEmpty()) { - throw new RuntimeException( - "Unable extract the offset out of state, State mutation might not be working. 
" + state.asText()); - } + final OptionalLong savedOffset = postgresDebeziumStateUtil.savedOffset( + Jsons.clone(PostgresCdcProperties.getDebeziumDefaultProperties(database)), + catalog, + state, + sourceConfig); - final boolean savedOffsetAfterReplicationSlotLSN = postgresDebeziumStateUtil.isSavedOffsetAfterReplicationSlotLSN( - // We can assume that there will be only 1 replication slot cause before the sync starts for - // Postgres CDC, - // we run all the check operations and one of the check validates that the replication slot exists - // and has only 1 entry - replicationSlot, - savedOffset); - - if (!savedOffsetAfterReplicationSlotLSN) { - AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); - if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get( - INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) { - throw new ConfigErrorException( - "Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details."); - } - LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch"); - } else if (!isDebugMode(sourceConfig) && PostgresUtils.shouldFlushAfterSync(sourceConfig)) { - // We do not want to acknowledge the WAL logs in debug mode. 
- postgresDebeziumStateUtil.commitLSNToPostgresDatabase(database.getDatabaseConfig(), - savedOffset, - sourceConfig.get("replication_method").get("replication_slot").asText(), - sourceConfig.get("replication_method").get("publication").asText(), - PostgresUtils.getPluginValue(sourceConfig.get("replication_method"))); - } - final CdcState stateToBeUsed = (!savedOffsetAfterReplicationSlotLSN || stateManager.getCdcStateManager().getCdcState() == null - || stateManager.getCdcStateManager().getCdcState().getState() == null) ? new CdcState().withState(initialDebeziumState) - : stateManager.getCdcStateManager().getCdcState(); - final CtidStreams ctidStreams = PostgresCdcCtidUtils.streamsToSyncViaCtid(stateManager.getCdcStateManager(), catalog, - savedOffsetAfterReplicationSlotLSN); - final List> initialSyncCtidIterators = new ArrayList<>(); - final List streamsUnderVacuum = new ArrayList<>(); - if (!ctidStreams.streamsForCtidSync().isEmpty()) { - streamsUnderVacuum.addAll(streamsUnderVacuum(database, - ctidStreams.streamsForCtidSync(), quoteString).result()); - - final List finalListOfStreamsToBeSyncedViaCtid = - streamsUnderVacuum.isEmpty() ? 
ctidStreams.streamsForCtidSync() - : ctidStreams.streamsForCtidSync().stream() - .filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c))) - .toList(); - LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size()); - LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(finalListOfStreamsToBeSyncedViaCtid)); - final FileNodeHandler fileNodeHandler = PostgresQueryUtils.fileNodeForStreams(database, - finalListOfStreamsToBeSyncedViaCtid, - quoteString); - final CtidStateManager ctidStateManager = new CtidGlobalStateManager(ctidStreams, fileNodeHandler, stateToBeUsed, catalog); - final CtidPostgresSourceOperations ctidPostgresSourceOperations = new CtidPostgresSourceOperations( - Optional.of(new PostgresCdcConnectorMetadataInjector(emittedAt.toString(), io.airbyte.cdk.db.PostgresUtils.getLsn(database).asLong()))); - final Map tableBlockSizes = - PostgresQueryUtils.getTableBlockSizeForStreams( - database, - finalListOfStreamsToBeSyncedViaCtid, - quoteString); - - final Map tablesMaxTuple = - CtidUtils.isTidRangeScanCapableDBServer(database) ? 
null - : PostgresQueryUtils.getTableMaxTupleForStreams(database, finalListOfStreamsToBeSyncedViaCtid, quoteString); - - final PostgresCtidHandler ctidHandler = new PostgresCtidHandler(sourceConfig, database, - ctidPostgresSourceOperations, - quoteString, - fileNodeHandler, - tableBlockSizes, - tablesMaxTuple, - ctidStateManager, - namespacePair -> Jsons.emptyObject()); - - initialSyncCtidIterators.addAll(ctidHandler.getInitialSyncCtidIterator( - new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt)); - } else { - LOGGER.info("No streams will be synced via ctid"); - } + // We should always be able to extract offset out of state if it's not null + if (state != null && savedOffset.isEmpty()) { + throw new RuntimeException( + "Unable extract the offset out of state, State mutation might not be working. " + state.asText()); + } - // Gets the target position. - final var targetPosition = PostgresCdcTargetPosition.targetPosition(database); - // Attempt to advance LSN past the target position. For versions of Postgres before PG15, this - // ensures that there is an event that debezium will - // receive that is after the target LSN. 
- PostgresUtils.advanceLsn(database); - final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(sourceConfig, - targetPosition, false, firstRecordWaitTime, queueSize, false); - final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager); - final var propertiesManager = new RelationalDbDebeziumPropertiesManager( - PostgresCdcProperties.getDebeziumDefaultProperties(database), sourceConfig, catalog); - final var eventConverter = new RelationalDbDebeziumEventConverter(new PostgresCdcConnectorMetadataInjector(), emittedAt); - - final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators( - propertiesManager, eventConverter, new PostgresCdcSavedInfoFetcher(stateToBeUsed), postgresCdcStateHandler); - - if (initialSyncCtidIterators.isEmpty()) { - return Collections.singletonList(incrementalIteratorSupplier.get()); + if (!savedOffsetAfterReplicationSlotLSN) { + AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); + if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get( + INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) { + throw new ConfigErrorException( + "Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details."); } + LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch"); + } else if (!isDebugMode(sourceConfig) && PostgresUtils.shouldFlushAfterSync(sourceConfig)) { + // We do not want to acknowledge the WAL logs in debug mode. 
+ postgresDebeziumStateUtil.commitLSNToPostgresDatabase(database.getDatabaseConfig(), + savedOffset, + sourceConfig.get("replication_method").get("replication_slot").asText(), + sourceConfig.get("replication_method").get("publication").asText(), + PostgresUtils.getPluginValue(sourceConfig.get("replication_method"))); + } + final CdcState stateToBeUsed = ctidStateManager.getCdcState(); + final CtidStreams ctidStreams = PostgresCdcCtidUtils.streamsToSyncViaCtid(stateManager.getCdcStateManager(), catalog, + savedOffsetAfterReplicationSlotLSN); + final List> initialSyncCtidIterators = new ArrayList<>(); + final List streamsUnderVacuum = new ArrayList<>(); + if (!ctidStreams.streamsForCtidSync().isEmpty()) { + streamsUnderVacuum.addAll(streamsUnderVacuum(database, + ctidStreams.streamsForCtidSync(), quoteString).result()); - if (streamsUnderVacuum.isEmpty()) { - // This starts processing the WAL as soon as initial sync is complete, this is a bit different from - // the current cdc syncs. - // We finish the current CDC once the initial snapshot is complete and the next sync starts - // processing the WAL - return Stream - .of(initialSyncCtidIterators, Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null))) - .flatMap(Collection::stream) - .collect(Collectors.toList()); - } else { - LOGGER.warn("Streams are under vacuuming, not going to process WAL"); - return initialSyncCtidIterators; + final List finalListOfStreamsToBeSyncedViaCtid = + streamsUnderVacuum.isEmpty() ? 
ctidStreams.streamsForCtidSync() + : ctidStreams.streamsForCtidSync().stream() + .filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c))) + .toList(); + LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size()); + final FileNodeHandler fileNodeHandler = PostgresQueryUtils.fileNodeForStreams(database, + finalListOfStreamsToBeSyncedViaCtid, + quoteString); + final PostgresCtidHandler ctidHandler; + try { + ctidHandler = + createInitialLoader(database, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, quoteString, ctidStateManager, + Optional.of( + new PostgresCdcConnectorMetadataInjector(emittedAt.toString(), io.airbyte.cdk.db.PostgresUtils.getLsn(database).asLong()))); + } catch (SQLException e) { + throw new RuntimeException(e); } - } catch (final SQLException e) { - throw new RuntimeException(e); + initialSyncCtidIterators.addAll(ctidHandler.getInitialSyncCtidIterator( + new ConfiguredAirbyteCatalog().withStreams(finalListOfStreamsToBeSyncedViaCtid), tableNameToTable, emittedAt)); + } else { + LOGGER.info("No streams will be synced via ctid"); + } + + // Gets the target position. + final var targetPosition = PostgresCdcTargetPosition.targetPosition(database); + // Attempt to advance LSN past the target position. For versions of Postgres before PG15, this + // ensures that there is an event that debezium will + // receive that is after the target LSN. 
+ PostgresUtils.advanceLsn(database); + final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(sourceConfig, + targetPosition, false, firstRecordWaitTime, queueSize, false); + final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager); + final var propertiesManager = new RelationalDbDebeziumPropertiesManager( + PostgresCdcProperties.getDebeziumDefaultProperties(database), sourceConfig, catalog); + final var eventConverter = new RelationalDbDebeziumEventConverter(new PostgresCdcConnectorMetadataInjector(), emittedAt); + + final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators( + propertiesManager, eventConverter, new PostgresCdcSavedInfoFetcher(stateToBeUsed), postgresCdcStateHandler); + + if (initialSyncCtidIterators.isEmpty()) { + return Collections.singletonList(incrementalIteratorSupplier.get()); } + + if (streamsUnderVacuum.isEmpty()) { + // This starts processing the WAL as soon as initial sync is complete, this is a bit different from + // the current cdc syncs. 
+ // We finish the current CDC once the initial snapshot is complete and the next sync starts + // processing the WAL + return Stream + .of(initialSyncCtidIterators, Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } else { + LOGGER.warn("Streams are under vacuuming, not going to process WAL"); + return initialSyncCtidIterators; + } + } + + public static CdcState getCdcState(final JdbcDatabase database, + final StateManager stateManager) { + + final JsonNode sourceConfig = database.getSourceConfig(); + final PostgresDebeziumStateUtil postgresDebeziumStateUtil = new PostgresDebeziumStateUtil(); + + final JsonNode initialDebeziumState = postgresDebeziumStateUtil.constructInitialDebeziumState(database, + sourceConfig.get(JdbcUtils.DATABASE_KEY).asText()); + + return (stateManager.getCdcStateManager().getCdcState() == null + || stateManager.getCdcStateManager().getCdcState().getState() == null) ? 
new CdcState().withState(initialDebeziumState) + : stateManager.getCdcStateManager().getCdcState(); } } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidUtils.java index 0c8ebe4aa354..454d932fd3f8 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidUtils.java @@ -16,7 +16,6 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.StreamDescriptor; -import io.airbyte.protocol.models.v0.SyncMode; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -34,7 +33,6 @@ public static CtidStreams streamsToSyncViaCtid(final CdcStateManager stateManage return new CtidStreams( fullCatalog.getStreams() .stream() - .filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL) .collect(Collectors.toList()), new HashMap<>()); } @@ -78,7 +76,6 @@ private static List identifyStreamsToSnapshot(final Con final Set allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog); final Set newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySyncedStreams)); return catalog.getStreams().stream() - .filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL) .filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))).map(Jsons::clone) .collect(Collectors.toList()); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java 
b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java index 5def38b9240b..659740f32629 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState; import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState; +import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidUtils.CtidStreams; import io.airbyte.integrations.source.postgres.internal.models.CtidStatus; @@ -33,29 +34,41 @@ public class CtidGlobalStateManager extends CtidStateManager { private static final Logger LOGGER = LoggerFactory.getLogger(CtidGlobalStateManager.class); - private final CdcState cdcState; - private final Set streamsThatHaveCompletedSnapshot; + private final StateManager stateManager; + private Set resumableFullRefreshStreams; + private Set streamsThatHaveCompletedSnapshot; + private final boolean savedOffsetAfterReplicationSlotLSN; + private final CdcState defaultCdcState; public CtidGlobalStateManager(final CtidStreams ctidStreams, final FileNodeHandler fileNodeHandler, - final CdcState cdcState, - final ConfiguredAirbyteCatalog catalog) { + final StateManager stateManager, + final ConfiguredAirbyteCatalog catalog, + final boolean savedOffsetAfterReplicationSlotLSN, + final CdcState defaultCdcState) { super(filterOutExpiredFileNodes(ctidStreams.pairToCtidStatus(), fileNodeHandler)); - this.cdcState = cdcState; - this.streamsThatHaveCompletedSnapshot = initStreamsCompletedSnapshot(ctidStreams, catalog); + 
this.stateManager = stateManager; + this.savedOffsetAfterReplicationSlotLSN = savedOffsetAfterReplicationSlotLSN; + this.defaultCdcState = defaultCdcState; + initStream(ctidStreams, catalog); + this.fileNodeHandler = fileNodeHandler; } - private static Set initStreamsCompletedSnapshot(final CtidStreams ctidStreams, - final ConfiguredAirbyteCatalog catalog) { - final Set streamsThatHaveCompletedSnapshot = new HashSet<>(); + private void initStream(final CtidStreams ctidStreams, + final ConfiguredAirbyteCatalog catalog) { + this.streamsThatHaveCompletedSnapshot = new HashSet<>(); + this.resumableFullRefreshStreams = new HashSet<>(); catalog.getStreams().forEach(configuredAirbyteStream -> { - if (ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream) || configuredAirbyteStream.getSyncMode() != SyncMode.INCREMENTAL) { - return; + if (!ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream) && configuredAirbyteStream.getSyncMode() == SyncMode.INCREMENTAL) { + streamsThatHaveCompletedSnapshot.add( + new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace())); + } + if (ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream) + && configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { + this.resumableFullRefreshStreams.add( + new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace())); } - streamsThatHaveCompletedSnapshot.add( - new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace())); }); - return streamsThatHaveCompletedSnapshot; } private static Map filterOutExpiredFileNodes( @@ -79,37 +92,65 @@ private static Map filterOutExpiredF public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespacePair pair, final CtidStatus ctidStatus) { pairToCtidStatus.put(pair, ctidStatus); final 
List streamStates = new ArrayList<>(); + streamsThatHaveCompletedSnapshot.forEach(stream -> { final DbStreamState state = getFinalState(stream); streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(state))); }); - streamStates.add(getAirbyteStreamState(pair, (Jsons.jsonNode(ctidStatus)))); - final AirbyteGlobalState globalState = new AirbyteGlobalState(); - globalState.setSharedState(Jsons.jsonNode(cdcState)); - globalState.setStreamStates(streamStates); + + resumableFullRefreshStreams.forEach(stream -> { + final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(stream); + streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(ctidStatusForFullRefreshStream)))); + }); + + if (!resumableFullRefreshStreams.contains(pair)) { + streamStates.add(getAirbyteStreamState(pair, (Jsons.jsonNode(ctidStatus)))); + } return new AirbyteStateMessage() .withType(AirbyteStateType.GLOBAL) - .withGlobal(globalState); + .withGlobal(generateGlobalState(streamStates)); + } + + public AirbyteGlobalState generateGlobalState(final List streamStates) { + final CdcState stateToBeUsed = getCdcState(); + final AirbyteGlobalState globalState = new AirbyteGlobalState(); + globalState.setSharedState(Jsons.jsonNode(stateToBeUsed)); + globalState.setStreamStates(streamStates); + return globalState; + + } + + public CdcState getCdcState() { + final CdcState stateManagerCdcState = stateManager.getCdcStateManager().getCdcState(); + + return !savedOffsetAfterReplicationSlotLSN || stateManagerCdcState == null + || stateManagerCdcState.getState() == null ? defaultCdcState + : stateManagerCdcState; + } @Override public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) { - streamsThatHaveCompletedSnapshot.add(pair); + // Only incremental streams can be transformed into the next phase. 
+ if (!resumableFullRefreshStreams.contains(pair)) { + streamsThatHaveCompletedSnapshot.add(pair); + } final List streamStates = new ArrayList<>(); streamsThatHaveCompletedSnapshot.forEach(stream -> { final DbStreamState state = getFinalState(stream); streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(state))); }); - final AirbyteGlobalState globalState = new AirbyteGlobalState(); - globalState.setSharedState(Jsons.jsonNode(cdcState)); - globalState.setStreamStates(streamStates); + resumableFullRefreshStreams.forEach(stream -> { + final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(pair); + streamStates.add(getAirbyteStreamState(pair, Jsons.jsonNode(ctidStatusForFullRefreshStream))); + }); return new AirbyteStateMessage() .withType(AirbyteStateType.GLOBAL) - .withGlobal(globalState); + .withGlobal(generateGlobalState(streamStates)); } private AirbyteStreamState getAirbyteStreamState(final AirbyteStreamNameNamespacePair pair, final JsonNode stateData) { diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPerStreamStateManager.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPerStreamStateManager.java index c3a514c74006..3a26ecbd6081 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPerStreamStateManager.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPerStreamStateManager.java @@ -81,6 +81,11 @@ public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespa @Override public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) { + if (streamStateForIncrementalRun == null || streamStateForIncrementalRun.isEmpty()) { + // resumeable full refresh for 
cursor based stream. + var ctidStatus = generateCtidStatusForState(pair); + return createCtidStateMessage(pair, ctidStatus); + } return XminStateManager.getAirbyteStateMessage(pair, Jsons.object(streamStateForIncrementalRun, XminStatus.class)); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java index d3f89529fbe6..f95b0a1f0a84 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidStateManager.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer; +import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.source.postgres.internal.models.CtidStatus; import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType; import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; @@ -27,13 +28,14 @@ public abstract class CtidStateManager implements SourceStateMessageProducer pairToCtidStatus; - private Function streamStateForIncrementalRunSupplier; + protected Function streamStateForIncrementalRunSupplier; - private String lastCtid; - private FileNodeHandler fileNodeHandler; + protected String lastCtid; + protected FileNodeHandler fileNodeHandler; protected CtidStateManager(final Map pairToCtidStatus) { this.pairToCtidStatus = pairToCtidStatus; + this.streamStateForIncrementalRunSupplier = namespacePair -> Jsons.emptyObject(); } public CtidStatus getCtidStatus(final AirbyteStreamNameNamespacePair pair) { @@ -55,26 +57,39 @@ public static boolean validateRelationFileNode(final CtidStatus ctidstatus, public 
abstract AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun); - public void setStreamStateIteratorFields(Function streamStateForIncrementalRunSupplier, - FileNodeHandler fileNodeHandler) { + public void setStreamStateIteratorFields(Function streamStateForIncrementalRunSupplier) { this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier; + } + + public void setFileNodeHandler(final FileNodeHandler fileNodeHandler) { this.fileNodeHandler = fileNodeHandler; } + public FileNodeHandler getFileNodeHandler() { + return fileNodeHandler; + } + @Override public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirbyteStream stream) { final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()); + final CtidStatus ctidStatus = generateCtidStatusForState(pair); + LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus); + return createCtidStateMessage(pair, ctidStatus); + } + + protected CtidStatus generateCtidStatusForState(final AirbyteStreamNameNamespacePair pair) { final Long fileNode = fileNodeHandler.getFileNode(pair); assert fileNode != null; - final CtidStatus ctidStatus = new CtidStatus() + // If the table is empty, lastCtid will be set to zero for the final state message. + final String lastCtidInState = (Objects.nonNull(lastCtid) + && StringUtils.isNotBlank(lastCtid)) ? 
lastCtid : Ctid.ZERO.toString(); + return new CtidStatus() .withVersion(CTID_STATUS_VERSION) .withStateType(StateType.CTID) - .withCtid(lastCtid) + .withCtid(lastCtidInState) .withIncrementalState(getStreamState(pair)) .withRelationFilenode(fileNode); - LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus); - return createCtidStateMessage(pair, ctidStatus); } /** @@ -112,6 +127,7 @@ public boolean shouldEmitStateMessage(final ConfiguredAirbyteStream stream) { private JsonNode getStreamState(final AirbyteStreamNameNamespacePair pair) { final CtidStatus currentCtidStatus = getCtidStatus(pair); + return (currentCtidStatus == null || currentCtidStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair) : currentCtidStatus.getIncrementalState(); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidUtils.java index e98d46025d4f..bd4ec9b00099 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidUtils.java @@ -4,9 +4,13 @@ package io.airbyte.integrations.source.postgres.ctid; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Sets; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.postgres.PostgresQueryUtils; +import io.airbyte.integrations.source.postgres.PostgresQueryUtils.TableBlockSize; +import io.airbyte.integrations.source.postgres.cdc.PostgresCdcConnectorMetadataInjector; import io.airbyte.protocol.models.v0.AirbyteStateMessage; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; import 
io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; @@ -14,6 +18,8 @@ import io.airbyte.protocol.models.v0.SyncMode; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -25,14 +31,12 @@ public class CtidUtils { public static final int POSTGRESQL_VERSION_TID_RANGE_SCAN_CAPABLE = 14; public static List identifyNewlyAddedStreams(final ConfiguredAirbyteCatalog fullCatalog, - final Set alreadySeenStreams, - final SyncMode syncMode) { + final Set alreadySeenStreams) { final Set allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(fullCatalog); final Set newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySeenStreams)); return fullCatalog.getStreams().stream() - .filter(stream -> stream.getSyncMode() == syncMode) .filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))) .map(Jsons::clone) .collect(Collectors.toList()); @@ -75,4 +79,32 @@ public static boolean isTidRangeScanCapableDBServer(final JdbcDatabase database) return true; } + public static PostgresCtidHandler createInitialLoader(final JdbcDatabase database, + final List finalListOfStreamsToBeSyncedViaCtid, + final FileNodeHandler fileNodeHandler, + final String quoteString, + final CtidStateManager ctidStateManager, + Optional optionalMetadataInjector) { + final JsonNode sourceConfig = database.getSourceConfig(); + + final Map tableBlockSizes = + PostgresQueryUtils.getTableBlockSizeForStreams( + database, + finalListOfStreamsToBeSyncedViaCtid, + quoteString); + + final Map tablesMaxTuple = + CtidUtils.isTidRangeScanCapableDBServer(database) ? 
null + : PostgresQueryUtils.getTableMaxTupleForStreams(database, finalListOfStreamsToBeSyncedViaCtid, quoteString); + + return new PostgresCtidHandler(sourceConfig, + database, + new CtidPostgresSourceOperations(optionalMetadataInjector), + quoteString, + fileNodeHandler, + tableBlockSizes, + tablesMaxTuple, + ctidStateManager); + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/PostgresCtidHandler.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/PostgresCtidHandler.java index 4454d7c4c8d7..1338984d4702 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/PostgresCtidHandler.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/PostgresCtidHandler.java @@ -12,6 +12,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants; import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil; +import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler; import io.airbyte.cdk.integrations.source.relationaldb.TableInfo; import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator; import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency; @@ -39,11 +40,11 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PostgresCtidHandler { +public class PostgresCtidHandler implements InitialLoadHandler { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCtidHandler.class); @@ -55,7 +56,6 @@ public class PostgresCtidHandler { private final FileNodeHandler fileNodeHandler; final 
Map tableBlockSizes; final Optional> tablesMaxTuple; - private final Function streamStateForIncrementalRunSupplier; private final boolean tidRangeScanCapableDBServer; public PostgresCtidHandler(final JsonNode config, @@ -65,8 +65,7 @@ public PostgresCtidHandler(final JsonNode config, final FileNodeHandler fileNodeHandler, final Map tableBlockSizes, final Map tablesMaxTuple, - final CtidStateManager ctidStateManager, - final Function streamStateForIncrementalRunSupplier) { + final CtidStateManager ctidStateManager) { this.config = config; this.database = database; this.sourceOperations = sourceOperations; @@ -75,10 +74,36 @@ public PostgresCtidHandler(final JsonNode config, this.tableBlockSizes = tableBlockSizes; this.tablesMaxTuple = Optional.ofNullable(tablesMaxTuple); this.ctidStateManager = ctidStateManager; - this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier; this.tidRangeScanCapableDBServer = CtidUtils.isTidRangeScanCapableDBServer(database); } + @Override + public AutoCloseableIterator getIteratorForStream(@NotNull ConfiguredAirbyteStream airbyteStream, + @NotNull TableInfo> table, + @NotNull Instant emittedAt) { + final AirbyteStream stream = airbyteStream.getStream(); + final String streamName = stream.getName(); + final String namespace = stream.getNamespace(); + final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamName, namespace); + + final List selectedDatabaseFields = table.getFields() + .stream() + .map(CommonField::getName) + .filter(CatalogHelpers.getTopLevelFieldNames(airbyteStream)::contains) + .toList(); + final AutoCloseableIterator queryStream = queryTableCtid( + selectedDatabaseFields, + table.getNameSpace(), + table.getName(), + tableBlockSizes.get(pair).tableSize(), + tableBlockSizes.get(pair).blockSize(), + tablesMaxTuple.orElseGet(() -> Map.of(pair, -1)).get(pair)); + final AutoCloseableIterator recordIterator = + getRecordIterator(queryStream, streamName, namespace, 
emittedAt.toEpochMilli()); + final AutoCloseableIterator recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream); + return augmentWithLogs(recordAndMessageIterator, pair, streamName); + } + public List> getInitialSyncCtidIterator( final ConfiguredAirbyteCatalog catalog, final Map>> tableNameToTable, @@ -88,7 +113,6 @@ public List> getInitialSyncCtidIterator( final AirbyteStream stream = airbyteStream.getStream(); final String streamName = stream.getName(); final String namespace = stream.getNamespace(); - final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamName, namespace); final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(namespace, streamName); if (!tableNameToTable.containsKey(fullyQualifiedTableName)) { LOGGER.info("Skipping stream {} because it is not in the source", fullyQualifiedTableName); @@ -98,24 +122,8 @@ public List> getInitialSyncCtidIterator( // Grab the selected fields to sync final TableInfo> table = tableNameToTable .get(fullyQualifiedTableName); - final List selectedDatabaseFields = table.getFields() - .stream() - .map(CommonField::getName) - .filter(CatalogHelpers.getTopLevelFieldNames(airbyteStream)::contains) - .toList(); - final AutoCloseableIterator queryStream = queryTableCtid( - selectedDatabaseFields, - table.getNameSpace(), - table.getName(), - tableBlockSizes.get(pair).tableSize(), - tableBlockSizes.get(pair).blockSize(), - tablesMaxTuple.orElseGet(() -> Map.of(pair, -1)).get(pair)); - final AutoCloseableIterator recordIterator = - getRecordIterator(queryStream, streamName, namespace, emmitedAt.toEpochMilli()); - final AutoCloseableIterator recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream); - final AutoCloseableIterator logAugmented = augmentWithLogs(recordAndMessageIterator, pair, streamName); - iteratorList.add(logAugmented); - + final var iterator = getIteratorForStream(airbyteStream, table, emmitedAt); + 
iteratorList.add(iterator); } } return iteratorList; @@ -181,8 +189,6 @@ private AutoCloseableIterator augmentWithState(final AutoCloseab final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong() : DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS; - ctidStateManager.setStreamStateIteratorFields(streamStateForIncrementalRunSupplier, fileNodeHandler); - final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); return AutoCloseableIterators.transformIterator( diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cursor_based/CursorBasedCtidUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cursor_based/CursorBasedCtidUtils.java index 3b679bc981f8..c4b284121f05 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cursor_based/CursorBasedCtidUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cursor_based/CursorBasedCtidUtils.java @@ -80,10 +80,14 @@ public static StreamsCategorised categoriseStreams(final Sta } final List newlyAddedIncrementalStreams = - identifyNewlyAddedStreams(fullCatalog, alreadySeenStreamPairs, SyncMode.INCREMENTAL); + identifyNewlyAddedStreams(fullCatalog, alreadySeenStreamPairs); final List streamsForCtidSync = getStreamsFromStreamPairs(fullCatalog, stillInCtidStreamPairs, SyncMode.INCREMENTAL); + final List fullRefreshStreamsForCtidSync = + getStreamsFromStreamPairs(fullCatalog, stillInCtidStreamPairs, SyncMode.FULL_REFRESH); + final List streamsForCursorBasedSync = getStreamsFromStreamPairs(fullCatalog, cursorBasedSyncStreamPairs, SyncMode.INCREMENTAL); + 
streamsForCtidSync.addAll(fullRefreshStreamsForCtidSync); streamsForCtidSync.addAll(newlyAddedIncrementalStreams); return new StreamsCategorised<>(new CtidStreams(streamsForCtidSync, statesFromCtidSync), diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtils.java index d8f5857c4c2d..39fcd4e4085a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtils.java @@ -83,7 +83,7 @@ public static StreamsCategorised categoriseStreams(final StateManag } final List newlyAddedIncrementalStreams = - identifyNewlyAddedStreams(fullCatalog, alreadySeenStreams, SyncMode.INCREMENTAL); + identifyNewlyAddedStreams(fullCatalog, alreadySeenStreams); final List streamsForCtidSync = new ArrayList<>(); fullCatalog.getStreams().stream() .filter(stream -> streamsStillInCtidSync.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 6f208fbca3b1..85f7f2c47635 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -101,6 +101,11 @@ protected PostgresSource source() { return new PostgresSource(); } + @Override + protected boolean supportResumableFullRefresh() { + 
return true; + } + @Override protected JsonNode config() { return testdb.testConfigBuilder() @@ -241,6 +246,10 @@ private void assertStateTypes(final List stateMes } } + @Override + @Test + protected void testCdcAndNonResumableFullRefreshInSameSync() throws Exception {} + @Override protected void assertStateMessagesForNewTableSnapshotTest(final List stateMessages, final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion) { diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index dc69d2713f81..1bd837081a38 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -78,9 +78,15 @@ protected JsonNode config() { .build(); } + private PostgresSource postgresSource = null; + @Override protected PostgresSource source() { - return new PostgresSource(); + if (postgresSource != null) { + postgresSource.close(); + } + postgresSource = new PostgresSource(); + return postgresSource; } @Override diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java index b4bad09de924..7a246017ff97 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java @@ -5,6 +5,7 @@ 
package io.airbyte.integrations.source.postgres; import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.createRecord; +import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.filterRecords; import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.map; import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.setEmittedAtToNull; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -122,7 +123,8 @@ void testReadSuccess() throws Exception { final Set actualMessages = MoreIterators.toSet(new PostgresSource().read(getConfig(), configuredCatalog, null)); setEmittedAtToNull(actualMessages); - assertEquals(ASCII_MESSAGES, actualMessages); + var actualRecordMessage = filterRecords(actualMessages); + assertEquals(ASCII_MESSAGES, actualRecordMessage); } @Test diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index bfb5bad07639..17b2e8d3bbac 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -4,7 +4,9 @@ package io.airbyte.integrations.source.postgres; +import static io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY; import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.createRecord; +import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.filterRecords; import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.map; import static 
io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.setEmittedAtToNull; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -150,10 +152,20 @@ void setup() { @AfterEach void tearDown() { testdb.close(); + if (postgresSource != null) { + postgresSource.close(); + } + postgresSource = null; } - public PostgresSource source() { - return new PostgresSource(); + private PostgresSource postgresSource = null; + + protected PostgresSource source() { + if (postgresSource != null) { + postgresSource.close(); + } + postgresSource = new PostgresSource(); + return postgresSource; } private static DSLContext getDslContextWithSpecifiedUser(final JsonNode config, final String username, final String password) { @@ -189,6 +201,7 @@ private JsonNode getConfig(final String dbName, final String user, final String .put(JdbcUtils.USERNAME_KEY, user) .put(JdbcUtils.PASSWORD_KEY, password) .put(JdbcUtils.SSL_KEY, false) + .put(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1) .build()); } @@ -213,7 +226,8 @@ public void testCanReadTablesAndColumnsWithDoubleQuotes() throws Exception { CatalogHelpers.toDefaultConfiguredCatalog(airbyteCatalog), null)); setEmittedAtToNull(actualMessages); - assertEquals(DOUBLE_QUOTED_MESSAGES, actualMessages); + final var actualRecordMessages = filterRecords(actualMessages); + assertEquals(DOUBLE_QUOTED_MESSAGES, actualRecordMessages); testdb.query(ctx -> ctx.execute("DROP TABLE \"\"\"test_dq_table\"\"\";")); } @@ -227,7 +241,8 @@ public void testCanReadUtf8() throws Exception { final var config = asciiTestDB.testConfigBuilder().withSchemas(SCHEMA_NAME).withoutSsl().build(); final Set actualMessages = MoreIterators.toSet(source().read(config, CONFIGURED_CATALOG, null)); setEmittedAtToNull(actualMessages); - assertEquals(UTF8_MESSAGES, actualMessages); + final var actualRecordMessages = filterRecords(actualMessages); + assertEquals(UTF8_MESSAGES, actualRecordMessages); } } @@ -263,8 +278,11 @@ void 
testUserDoesntHasPrivilegesToSelectTable() throws Exception { final Set actualMessages = MoreIterators.toSet(source().read(anotherUserConfig, CONFIGURED_CATALOG, null)); setEmittedAtToNull(actualMessages); - assertEquals(6, actualMessages.size()); - assertEquals(PRIVILEGE_TEST_CASE_EXPECTED_MESSAGES, actualMessages); + // expect 6 records and 3 state messages (view does not have its own state message because it goes + // to non resumable full refresh path). + assertEquals(9, actualMessages.size()); + final var actualRecordMessages = filterRecords(actualMessages); + assertEquals(PRIVILEGE_TEST_CASE_EXPECTED_MESSAGES, actualRecordMessages); } @Test @@ -432,8 +450,9 @@ void testReadSuccess() throws Exception { Collectors.toList())); final Set actualMessages = MoreIterators.toSet(source().read(getConfig(), configuredCatalog, null)); setEmittedAtToNull(actualMessages); + final var actualRecordMessages = filterRecords(actualMessages); - assertEquals(ASCII_MESSAGES, actualMessages); + assertEquals(ASCII_MESSAGES, actualRecordMessages); } @Test @@ -466,7 +485,7 @@ void testReadIncrementalSuccess() throws Exception { createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("3.0"), "name", "vegeta", "power", 222.1))); // Assert that the correct number of messages are emitted. - assertEquals(actualMessages.size(), expectedOutput.size() + 1); + assertEquals(actualMessages.size(), expectedOutput.size() + 3); assertThat(actualMessages.contains(expectedOutput)); // Assert that the Postgres source is emitting records & state messages in the correct order. 
assertCorrectRecordOrderForIncrementalSync(actualMessages, "id", JsonSchemaPrimitive.NUMBER, configuredCatalog, @@ -489,6 +508,214 @@ void testReadIncrementalSuccess() throws Exception { assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", "5.0", "name", "piccolo", "power", 100.0)))); } + @Test + void testReadFullRefreshEmptyTable() throws Exception { + // Delete all data from id_and_name table. + testdb.query(ctx -> { + ctx.fetch("DELETE FROM id_and_name WHERE id = 'NaN';"); + ctx.fetch("DELETE FROM id_and_name WHERE id = '1';"); + ctx.fetch("DELETE FROM id_and_name WHERE id = '2';"); + return null; + }); + + final ConfiguredAirbyteCatalog configuredCatalog = + CONFIGURED_CATALOG + .withStreams(CONFIGURED_CATALOG.getStreams() + .stream() + .filter(s -> s.getStream().getName().equals(STREAM_NAME)) + .toList()); + final PostgresSource source = source(); + source.setStateEmissionFrequencyForDebug(1); + final List actualMessages = MoreIterators.toList(source.read(getConfig(), configuredCatalog, null)); + setEmittedAtToNull(actualMessages); + + final List stateAfterFirstBatch = extractStateMessage(actualMessages); + + setEmittedAtToNull(actualMessages); + + // Assert that the correct number of messages are emitted - final state message. + assertEquals(1, actualMessages.size()); + assertEquals(1, stateAfterFirstBatch.size()); + + AirbyteStateMessage stateMessage = stateAfterFirstBatch.get(0); + assertEquals("ctid", stateMessage.getStream().getStreamState().get("state_type").asText()); + assertEquals("(0,0)", stateMessage.getStream().getStreamState().get("ctid").asText()); + } + + @Test + void testReadFullRefreshSuccessWithSecondAttempt() throws Exception { + // We want to test ordering, so we can delete the NaN entry and add a 3. 
+ testdb.query(ctx -> { + ctx.fetch("DELETE FROM id_and_name WHERE id = 'NaN';"); + ctx.fetch("INSERT INTO id_and_name (id, name, power) VALUES (3, 'gohan', 222.1);"); + return null; + }); + + final ConfiguredAirbyteCatalog configuredCatalog = + CONFIGURED_CATALOG + .withStreams(CONFIGURED_CATALOG.getStreams() + .stream() + .filter(s -> s.getStream().getName().equals(STREAM_NAME)) + .toList()); + final PostgresSource source = source(); + source.setStateEmissionFrequencyForDebug(1); + final List actualMessages = MoreIterators.toList(source.read(getConfig(), configuredCatalog, null)); + setEmittedAtToNull(actualMessages); + + final List stateAfterFirstBatch = extractStateMessage(actualMessages); + + setEmittedAtToNull(actualMessages); + + final Set expectedOutput = Sets.newHashSet( + createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("1.0"), "name", "goku", "power", null)), + createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("2.0"), "name", "vegeta", "power", 9000.1)), + createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("3.0"), "name", "vegeta", "power", 222.1))); + + // Assert that the correct number of messages are emitted. + assertEquals(expectedOutput.size() + 3, actualMessages.size()); + assertThat(actualMessages.contains(expectedOutput)); + // Assert that the Postgres source is emitting records & state messages in the correct order. 
+ assertCorrectRecordOrderForIncrementalSync(actualMessages, "id", JsonSchemaPrimitive.NUMBER, configuredCatalog, + new AirbyteStreamNameNamespacePair("id_and_name", "public")); + + final AirbyteStateMessage lastEmittedState = stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1); + final JsonNode state = Jsons.jsonNode(List.of(lastEmittedState)); + + testdb.query(ctx -> { + ctx.fetch("INSERT INTO id_and_name (id, name, power) VALUES (5, 'piccolo', 100.0);"); + return null; + }); + // 2nd sync should reread state checkpoint mark and one new message (where id = '5.0') + final Set nextSyncMessages = + MoreIterators.toSet(source.read(getConfig(), configuredCatalog, state)); + setEmittedAtToNull(nextSyncMessages); + + // A state message is emitted, in addition to the new record messages. + assertEquals(nextSyncMessages.size(), 2); + assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", "5.0", "name", "piccolo", "power", 100.0)))); + } + + @Test + void testReadFullRefreshSuccessWithSecondAttemptWithVacuum() throws Exception { + // We want to test ordering, so we can delete the NaN entry and add a 3. 
+ testdb.query(ctx -> { + ctx.fetch("DELETE FROM id_and_name WHERE id = 'NaN';"); + ctx.fetch("INSERT INTO id_and_name (id, name, power) VALUES (3, 'gohan', 222.1);"); + return null; + }); + + final ConfiguredAirbyteCatalog configuredCatalog = + CONFIGURED_CATALOG + .withStreams(CONFIGURED_CATALOG.getStreams() + .stream() + .filter(s -> s.getStream().getName().equals(STREAM_NAME)) + .toList()); + final PostgresSource source = source(); + source.setStateEmissionFrequencyForDebug(1); + final List actualMessages = MoreIterators.toList(source.read(getConfig(), configuredCatalog, null)); + setEmittedAtToNull(actualMessages); + + final List stateAfterFirstBatch = extractStateMessage(actualMessages); + + setEmittedAtToNull(actualMessages); + + final Set expectedOutput = Sets.newHashSet( + createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("1.0"), "name", "goku", "power", null)), + createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("2.0"), "name", "vegeta", "power", 9000.1)), + createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("3.0"), "name", "vegeta", "power", 222.1))); + + // Assert that the correct number of messages are emitted. + assertEquals(expectedOutput.size() + 3, actualMessages.size()); + assertThat(actualMessages.contains(expectedOutput)); + // Assert that the Postgres source is emitting records & state messages in the correct order. 
+ assertCorrectRecordOrderForIncrementalSync(actualMessages, "id", JsonSchemaPrimitive.NUMBER, configuredCatalog, + new AirbyteStreamNameNamespacePair("id_and_name", "public")); + + final AirbyteStateMessage lastEmittedState = stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1); + final JsonNode state = Jsons.jsonNode(List.of(lastEmittedState)); + + testdb.query(ctx -> { + ctx.fetch("VACUUM full id_and_name"); + ctx.fetch("INSERT INTO id_and_name (id, name, power) VALUES (5, 'piccolo', 100.0);"); + return null; + }); + // 2nd sync should reread state checkpoint mark and one new message (where id = '5.0') + final List nextSyncMessages = + MoreIterators.toList(source().read(getConfig(), configuredCatalog, state)); + setEmittedAtToNull(nextSyncMessages); + + // All record messages will be re-read. + assertEquals(8, nextSyncMessages.size()); + assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", "5.0", "name", "piccolo", "power", 100.0)))); + assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("3.0"), "name", "vegeta", "power", 222.1)))); + } + + @Test + void testReadIncrementalSuccessWithFullRefresh() throws Exception { + // We want to test ordering, so we can delete the NaN entry and add a 3. 
+ testdb.query(ctx -> { + ctx.fetch("DELETE FROM id_and_name WHERE id = 'NaN';"); + ctx.fetch("INSERT INTO id_and_name (id, name, power) VALUES (3, 'gohan', 222.1);"); + ctx.fetch("DELETE FROM id_and_name2 WHERE id = 'NaN';"); + ctx.fetch("INSERT INTO id_and_name2 (id, name, power) VALUES (3, 'gohan', 222.1);"); + return null; + }); + + final ConfiguredAirbyteCatalog configuredCatalog = + CONFIGURED_INCR_CATALOG + .withStreams(List.of(CONFIGURED_INCR_CATALOG.getStreams().get(0), CONFIGURED_CATALOG.getStreams().get(1))); + final PostgresSource source = source(); + source.setStateEmissionFrequencyForDebug(1); + final List actualMessages = MoreIterators.toList(source.read(getConfig(), configuredCatalog, null)); + setEmittedAtToNull(actualMessages); + + final List stateAfterFirstBatch = extractStateMessage(actualMessages); + + setEmittedAtToNull(actualMessages); + + final Set expectedOutput = Sets.newHashSet( + createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("1.0"), "name", "goku", "power", null)), + createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("2.0"), "name", "vegeta", "power", 9000.1)), + createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("3.0"), "name", "vegeta", "power", 222.1))); + + // Assert that the correct number of messages are emitted. 6 for incremental streams, 6 for full + // refresh streams. + assertEquals(actualMessages.size(), 12); + assertThat(actualMessages.contains(expectedOutput)); + + // For per stream, platform will collect states for all streams and compose a new state. Thus, in + // the test since we want to reset full refresh, + // we need to get the last state for the "incremental stream", which is not necessarily the last + // state message of the batch. 
+ final AirbyteStateMessage lastEmittedState = getLastStateMessageOfStream(stateAfterFirstBatch, STREAM_NAME); + + final JsonNode state = Jsons.jsonNode(List.of(lastEmittedState)); + + testdb.query(ctx -> { + ctx.fetch("INSERT INTO id_and_name (id, name, power) VALUES (5, 'piccolo', 100.0);"); + return null; + }); + // Incremental sync should only read one new message (where id = '5.0') + final List nextSyncMessages = + MoreIterators.toList(source().read(getConfig(), configuredCatalog, state)); + setEmittedAtToNull(nextSyncMessages); + + // Incremental stream: An extra state message is emitted, in addition to the record messages. + // Full refresh stream: expect 6 messages (3 records and 3 states, since the config checkpoints state after every record) + // Thus, we expect 8 messages. + assertEquals(8, nextSyncMessages.size()); + assertThat(nextSyncMessages.contains(createRecord(STREAM_NAME, SCHEMA_NAME, map("id", "5.0", "name", "piccolo", "power", 100.0)))); + } + + private AirbyteStateMessage getLastStateMessageOfStream(List stateMessages, final String streamName) { + for (int i = stateMessages.size() - 1; i >= 0; i--) { + if (stateMessages.get(i).getStream().getStreamDescriptor().getName().equals(streamName)) { + return stateMessages.get(i); + } + } + throw new RuntimeException("stream not found in state message. stream name: " + streamName); + } + /* * The messages that are emitted from an incremental sync should follow certain invariants. They * should : (i) Be emitted in increasing order of the defined cursor. (ii) A record that is emitted @@ -728,12 +955,14 @@ public void testJdbcOptionsParameter() throws Exception { sourceConfig, CatalogHelpers.toDefaultConfiguredCatalog(airbyteCatalog), null)); - setEmittedAtToNull(actualMessages); + final var actualRecordMessages = filterRecords(actualMessages); + + setEmittedAtToNull(actualRecordMessages); // Check that the 'options' JDBC URL parameter was parsed correctly // and that the bytea value is not in the default 'hex' format. 
- assertEquals(1, actualMessages.size()); - final AirbyteMessage actualMessage = actualMessages.stream().findFirst().get(); + assertEquals(1, actualRecordMessages.size()); + final AirbyteMessage actualMessage = actualRecordMessages.stream().findFirst().get(); assertTrue(actualMessage.getRecord().getData().has("bytes")); assertEquals("\\336\\255\\276\\357", actualMessage.getRecord().getData().get("bytes").asText()); } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/cursor_based/CursorBasedCtidUtilsTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/cursor_based/CursorBasedCtidUtilsTest.java index 6bf511681823..c2fe98004530 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/cursor_based/CursorBasedCtidUtilsTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/cursor_based/CursorBasedCtidUtilsTest.java @@ -152,9 +152,8 @@ public void fullRefreshStreamCategorisationTest() { final StreamStateManager streamStateManager = new StreamStateManager(List.of(stream1CtidState, stream2StandardState), configuredCatalog); final StreamsCategorised streamsCategorised = categoriseStreams(streamStateManager, configuredCatalog); - assertEquals(streamsCategorised.ctidStreams().streamsForCtidSync().size(), 1); + assertEquals(streamsCategorised.ctidStreams().streamsForCtidSync().size(), 2); assertEquals(streamsCategorised.remainingStreams().streamsForCursorBasedSync().size(), 1); - assertEquals(streamsCategorised.ctidStreams().streamsForCtidSync().stream().findFirst().get(), STREAM_1); assertTrue(streamsCategorised.remainingStreams().streamsForCursorBasedSync().contains(STREAM_2)); assertFalse(streamsCategorised.remainingStreams().streamsForCursorBasedSync().contains(STREAM_3_FULL_REFRESH)); } diff --git 
a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/utils/PostgresUnitTestsUtil.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/utils/PostgresUnitTestsUtil.java index 189705298cbd..563661d84bb3 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/utils/PostgresUnitTestsUtil.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/utils/PostgresUnitTestsUtil.java @@ -16,6 +16,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; public class PostgresUnitTestsUtil { @@ -80,6 +81,11 @@ public static List filterRecords(final List mess .collect(Collectors.toList()); } + public static Set filterRecords(final Set messages) { + return messages.stream().filter(r -> r.getType() == Type.RECORD) + .collect(Collectors.toSet()); + } + public static List extractSpecificFieldFromCombinedMessages(final List messages, final String streamName, final String field) { diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtilsTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtilsTest.java index 01a40e3b2bad..94cbb0514f89 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtilsTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtilsTest.java @@ -132,8 +132,7 @@ public void fullRefreshStreamCategorisationTest() { assertEquals(1, streamsCategorised.remainingStreams().statesFromXminSync().size()); assertEquals(xminState, 
streamsCategorised.remainingStreams().statesFromXminSync().get(0)); - assertEquals(1, streamsCategorised.ctidStreams().streamsForCtidSync().size()); - assertEquals(MODELS_STREAM_2, streamsCategorised.ctidStreams().streamsForCtidSync().get(0)); + assertEquals(2, streamsCategorised.ctidStreams().streamsForCtidSync().size()); assertEquals(1, streamsCategorised.ctidStreams().statesFromCtidSync().size()); assertEquals(ctidState, streamsCategorised.ctidStreams().statesFromCtidSync().get(0)); diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 7f6b13fcc650..68c3e43e7b58 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -9,6 +9,8 @@ Airbyte's certified Postgres connector offers the following features: The contents below include a 'Quick Start' guide, advanced setup steps, and reference information (data type mapping, and changelogs). See [here](https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting) to troubleshooting issues with the Postgres connector. 
+**Please note the required minimum platform version is v0.58.0 for this connector.** + ![Airbyte Postgres Connection](https://raw.githubusercontent.com/airbytehq/airbyte/c078e8ed6703020a584d9362efa5665fbe8db77f/docs/integrations/sources/postgres/assets/airbyte_postgres_source.png?raw=true) ## Quick Start @@ -305,7 +307,8 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp ## Changelog | Version | Date | Pull Request | Subject | -| ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.4.0 | 2024-04-29 | [37112](https://github.com/airbytehq/airbyte/pull/37112) | resumeable full refresh. | | 3.3.33 | 2024-05-07 | [38030](https://github.com/airbytehq/airbyte/pull/38030) | Mark PG hot standby error as transient. 
| | 3.3.32 | 2024-04-30 | [37758](https://github.com/airbytehq/airbyte/pull/37758) | Correct previous release to disable debezium retries | | 3.3.31 | 2024-04-30 | [37754](https://github.com/airbytehq/airbyte/pull/37754) | Add CDC logs | diff --git a/docs/integrations/sources/postgres/postgres-troubleshooting.md b/docs/integrations/sources/postgres/postgres-troubleshooting.md index b28770c5d5a2..f66d06451a38 100644 --- a/docs/integrations/sources/postgres/postgres-troubleshooting.md +++ b/docs/integrations/sources/postgres/postgres-troubleshooting.md @@ -14,7 +14,7 @@ ### Version Requirements -- For Airbyte Open Source users, [upgrade](https://docs.airbyte.com/operator-guides/upgrading-airbyte/) your Airbyte platform to version `v0.40.0-alpha` or newer +- For Airbyte Open Source users, [upgrade](https://docs.airbyte.com/operator-guides/upgrading-airbyte/) your Airbyte platform to version `v0.58.0` or newer - Use Postgres v9.3.x or above for non-CDC workflows and Postgres v10 or above for CDC workflows - For Airbyte Cloud (and optionally for Airbyte Open Source), ensure SSL is enabled in your environment