Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Source-Postgres] : Add config to throw an error on invalid CDC position #35304

Merged
merged 12 commits into from
Feb 20, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.10
dockerImageTag: 3.3.11
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.airbyte.integrations.source.postgres;

// Constants defined in airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json.
public class PostgresSpecConstants {
public static final String INVALID_CDC_CURSOR_POSITION_PROPERTY = "invalid_cdc_cursor_position_behavior";
public static final String FAIL_SYNC_OPTION = "Fail sync";
public static final String RESYNC_DATA_OPTION = "Re-sync data";

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.FAIL_SYNC_OPTION;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
import static io.airbyte.integrations.source.postgres.PostgresUtils.isDebugMode;
import static io.airbyte.integrations.source.postgres.PostgresUtils.prettyPrintConfiguredAirbyteStreamList;

Expand Down Expand Up @@ -112,6 +114,10 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin

if (!savedOffsetAfterReplicationSlotLSN) {
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get(
INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) {
throw new ConfigErrorException("Saved offset it before replication slot's confirmed lsn. Please increase WAL retention or reduce sync frequency. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo, s/Saved offset it/Saved offset is/

akashkulk marked this conversation as resolved.
Show resolved Hide resolved
}
LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch");
} else if (!isDebugMode(sourceConfig) && PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
// We do not want to acknowledge the WAL logs in debug mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,17 @@
"description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the <a href=\"https://docs.airbyte.com/integrations/sources/postgres/postgres-wal-disk-consumption-and-heartbeat-action-query\">setup guide</a> for how and when to configure this setting.",
"default": "",
"order": 8
},
"invalid_cdc_cursor_position_behavior": {
akashkulk marked this conversation as resolved.
Show resolved Hide resolved
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": [
"Fail sync",
"Re-sync data"
],
"default": "Fail sync",
"order": 9
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,17 @@
"description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the <a href=\"https://docs.airbyte.com/integrations/sources/postgres/postgres-wal-disk-consumption-and-heartbeat-action-query\">setup guide</a> for how and when to configure this setting.",
"default": "",
"order": 8
},
"invalid_cdc_cursor_position_behavior": {
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": [
"Fail sync",
"Re-sync data"
],
"default": "Fail sync",
"order": 9
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_DELETED_AT;
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_LSN;
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.FAIL_SYNC_OPTION;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.RESYNC_DATA_OPTION;
import static io.airbyte.integrations.source.postgres.ctid.CtidStateManager.STATE_TYPE_KEY;
import static io.airbyte.integrations.source.postgres.ctid.InitialSyncCtidIteratorConstants.USE_TEST_CHUNK_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -88,7 +91,7 @@ protected JsonNode config() {
return testdb.testConfigBuilder()
.withSchemas(modelsSchema(), modelsSchema() + "_random")
.withoutSsl()
.withCdcReplication("After loading Data in the destination")
.withCdcReplication()
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.with("heartbeat_action_query", "")
.build();
Expand All @@ -106,7 +109,7 @@ void testDebugMode() {
final JsonNode invalidDebugConfig = testdb.testConfigBuilder()
.withSchemas(modelsSchema(), modelsSchema() + "_random")
.withoutSsl()
.withCdcReplication("While reading Data")
.withCdcReplication("While reading Data", RESYNC_DATA_OPTION)
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.with("debug_mode", true)
.build();
Expand Down Expand Up @@ -587,8 +590,9 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception {
final JsonNode config = testdb.testConfigBuilder()
.withSchemas(modelsSchema(), modelsSchema() + "_random")
.withoutSsl()
.withCdcReplication()
.withCdcReplication("While reading Data", RESYNC_DATA_OPTION)
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.with(INVALID_CDC_CURSOR_POSITION_PROPERTY, RESYNC_DATA_OPTION)
.build();
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = source()
.read(config, getConfiguredCatalog(), null);
Expand Down Expand Up @@ -632,6 +636,47 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception {
assertEquals(MODEL_RECORDS.size() + recordsToCreate + 1, recordsFromThirdBatch.size());
}

@Test
void testSyncShouldFailPurgedLogs() throws Exception {
final int recordsToCreate = 20;

final JsonNode config = testdb.testConfigBuilder()
.withSchemas(modelsSchema(), modelsSchema() + "_random")
.withoutSsl()
.withCdcReplication()
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.build();
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = source()
.read(config, getConfiguredCatalog(), null);
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
.toListAndClose(firstBatchIterator);
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
assertExpectedStateMessages(stateAfterFirstBatch);
// second batch of records again 20 being created
bulkInsertRecords(recordsToCreate);

// Extract the last state message
final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1)));
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = source()
.read(config, getConfiguredCatalog(), state);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
.toListAndClose(secondBatchIterator);
final List<AirbyteStateMessage> stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch);
assertExpectedStateMessagesFromIncrementalSync(stateAfterSecondBatch);

for (int recordsCreated = 0; recordsCreated < 1; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 400 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"H-" + recordsCreated));
writeModelRecord(record);
}

// Triggering sync with the first sync's state only which would mimic a scenario that the second
// sync failed on destination end, and we didn't save state
assertThrows(ConfigErrorException.class, () -> source().read(config, getConfiguredCatalog(), state));
}

protected void assertStateForSyncShouldHandlePurgedLogsGracefully(final List<AirbyteStateMessage> stateMessages) {
assertEquals(28, stateMessages.size());
assertStateTypes(stateMessages, 25);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.FAIL_SYNC_OPTION;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;

import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
Expand Down Expand Up @@ -172,10 +175,10 @@ public PostgresConfigBuilder withStandardReplication() {
}

public PostgresConfigBuilder withCdcReplication() {
return withCdcReplication("While reading Data");
return withCdcReplication("While reading Data", FAIL_SYNC_OPTION);
}

public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour) {
public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour, String cdcCursorInvalidBehaviour) {
return this
.with("is_test", true)
.with("replication_method", Jsons.jsonNode(ImmutableMap.builder()
Expand All @@ -184,6 +187,7 @@ public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour) {
.put("publication", testDatabase.getPublicationName())
.put("initial_waiting_seconds", DEFAULT_CDC_REPLICATION_INITIAL_WAIT.getSeconds())
.put("lsn_commit_behaviour", LsnCommitBehaviour)
.put(INVALID_CDC_CURSOR_POSITION_PROPERTY, cdcCursorInvalidBehaviour)
.build()));
}

Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,10 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.11 | 2024-02-20 | [35304](https://github.com/airbytehq/airbyte/pull/35304) | Add config to throw an error on invalid CDC position and enable it by default. |
| 3.3.10 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |
| 3.3.9 | 2024-02-13 | [35224](https://github.com/airbytehq/airbyte/pull/35224) | Adopt CDK 0.20.4 |
| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 |
| 3.3.9 | 2024-02-13 | [35224](https://github.com/airbytehq/airbyte/pull/35224) | Adopt CDK 0.20.4 |
| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 |
| 3.3.7 | 2024-02-08 | [34781](https://github.com/airbytehq/airbyte/pull/34781) | Add a setting in the setup page to advance the LSN. |
| 3.3.6 | 2024-02-07 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.6 |
| 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 |
Expand Down
Loading