From 5f515fea97191d9cc70db75524a201468e5653b9 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Wed, 14 Feb 2024 16:56:39 -0800 Subject: [PATCH 1/7] test fix attempt --- .../DebeziumStateDecoratingIterator.java | 1 + .../integrations/debezium/CdcSourceTest.java | 3 +++ .../connectors/source-postgres/build.gradle | 2 +- .../source/postgres/PostgresUtils.java | 2 +- .../CdcPostgresSourceLegacyCtidTest.java | 12 +++++++-- .../postgres/CdcPostgresSourceTest.java | 26 ++++++++++++++----- .../source/postgres/PostgresTestDatabase.java | 1 + 7 files changed, 37 insertions(+), 10 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java index 71dfee610a3e..b330c69e22e4 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java @@ -104,6 +104,7 @@ public DebeziumStateDecoratingIterator(final Iterator c this.syncCheckpointDuration = checkpointDuration; this.syncCheckpointRecords = checkpointRecords; this.previousCheckpointOffset = (HashMap) offsetManager.read(); + LOGGER.atInfo().log("offset previous: " + previousCheckpointOffset); this.initialOffset = new HashMap<>(this.previousCheckpointOffset); resetCheckpointValues(); } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java index 26544fe47fbd..1bbe190477d2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java @@ -706,6 +706,7 @@ public void newTableSnapshotTest() throws Exception { } final JsonNode state2 = stateAfterSecondBatch.get(stateAfterSecondBatch.size() - 1).getData(); + LOGGER.atInfo().log("State2: " + stateAfterSecondBatch); final AutoCloseableIterator thirdBatchIterator = source() .read(config(), updatedCatalog, state2); final List dataFromThirdBatch = AutoCloseableIterators @@ -714,6 +715,8 @@ public void newTableSnapshotTest() throws Exception { final List stateAfterThirdBatch = extractStateMessages(dataFromThirdBatch); assertTrue(stateAfterThirdBatch.size() >= 1); + LOGGER.atInfo().log("state 3: " + stateAfterThirdBatch); + final AirbyteStateMessage stateMessageEmittedAfterThirdSyncCompletion = stateAfterThirdBatch.get(stateAfterThirdBatch.size() - 1); assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterThirdSyncCompletion.getType()); assertNotEquals(stateMessageEmittedAfterThirdSyncCompletion.getGlobal().getSharedState(), diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index d68232e1e409..48fb2c37a872 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -14,7 +14,7 @@ java { airbyteJavaConnector { cdkVersionRequired = '0.20.6' features = ['db-sources', 'datastore-postgres'] - useLocalCdk = false + useLocalCdk = true } application { diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java index bfd4903cef9f..7e0e76a48889 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java @@ -198,7 +198,7 @@ public static String prettyPrintConfiguredAirbyteStreamList(final List stateMessages, final int indexTillWhichExpectCtidState) { JsonNode sharedState = null; for (int i = 0; i < stateMessages.size(); i++) { @@ -196,7 +200,11 @@ private void assertStateTypes(final List stateMessages, fin if (Objects.isNull(sharedState)) { sharedState = global.getSharedState(); } else { - assertEquals(sharedState, global.getSharedState()); + // This validation is only true for versions on or after postgres 15. We execute EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS for earlier versions of + // Postgres. See https://github.com/airbytehq/airbyte/pull/33605 for details. + if (getPostgresVersion() >= 15) { + assertEquals(sharedState, global.getSharedState()); + } } assertEquals(1, global.getStreamStates().size()); final AirbyteStreamState streamState = global.getStreamStates().get(0); @@ -324,7 +332,10 @@ public void testTwoStreamSync() throws Exception { if (Objects.isNull(sharedState)) { sharedState = global.getSharedState(); } else { - assertEquals(sharedState, global.getSharedState()); + // LSN will be advanced for postgres version before 15. See https://github.com/airbytehq/airbyte/pull/33605 + if (getPostgresVersion() >= 15) { + assertEquals(sharedState, global.getSharedState()); + } } if (Objects.isNull(firstStreamInState)) { @@ -736,6 +747,8 @@ protected void syncShouldIncrementLSN() throws Exception { final List dataFromFourthBatch = AutoCloseableIterators .toListAndClose(fourthBatchIterator); + System.out.println("states after 3rd batch: " + stateAfterThirdBatch); + final List stateAfterFourthBatch = extractStateMessages(dataFromFourthBatch); assertExpectedStateMessagesFromIncrementalSync(stateAfterFourthBatch); final Set recordsFromFourthBatch = extractRecordMessages( @@ -746,7 +759,7 @@ protected void syncShouldIncrementLSN() throws Exception { // Fourth sync should again move the replication slot ahead assertEquals(1, replicationSlotAfterFourthSync.compareTo(replicationSlotAfterThirdSync)); - assertEquals(1, recordsFromFourthBatch.size()); + assertEquals(1, recordsFromFourthBatch.size(), "all messages: " + dataFromFourthBatch); } protected void assertLsnPositionForSyncShouldIncrementLSN(final Long lsnPosition1, @@ -755,7 +768,10 @@ protected void assertLsnPositionForSyncShouldIncrementLSN(final Long lsnPosition if (syncNumber == 1) { assertEquals(1, lsnPosition2.compareTo(lsnPosition1)); } else if (syncNumber == 2) { - assertEquals(0, lsnPosition2.compareTo(lsnPosition1)); + // Earlier Postgres version will advance lsn even if there is no sync records. See https://github.com/airbytehq/airbyte/pull/33605. + if (getPostgresVersion() >= 15) { + assertEquals(0, lsnPosition2.compareTo(lsnPosition1)); + } } else { throw new RuntimeException("Unknown sync number " + syncNumber); } @@ -791,7 +807,6 @@ protected void verifyCheckpointStatesByRecords() throws Exception { .toListAndClose(secondBatchIterator); assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size()); final List stateMessagesCDC = extractStateMessages(dataFromSecondBatch); - assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state."); assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states."); } @@ -830,7 +845,6 @@ protected void verifyCheckpointStatesBySeconds() throws Exception { assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size()); final List stateMessagesCDC = extractStateMessages(dataFromSecondBatch); - assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state."); assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states."); } diff --git a/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java b/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java index 155b649e96a8..9bd332613f83 100644 --- a/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java +++ b/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java @@ -22,6 +22,7 @@ public class PostgresTestDatabase extends public static enum BaseImage { POSTGRES_16("postgres:16-bullseye"), + POSTGRES_13("postgres:13-bullseye"), POSTGRES_12("postgres:12-bullseye"), POSTGRES_9("postgres:9-alpine"), POSTGRES_SSL_DEV("marcosmarxm/postgres-ssl:dev"); From b67aa78828003869e07f30463473824d039be40a Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Thu, 15 Feb 2024 10:11:30 -0800 Subject: [PATCH 2/7] clean up --- .../internals/DebeziumStateDecoratingIterator.java | 1 - .../cdk/integrations/debezium/CdcSourceTest.java | 5 +---- .../connectors/source-postgres/build.gradle | 2 +- .../integrations/source/postgres/PostgresUtils.java | 2 +- .../postgres/CdcPostgresSourceLegacyCtidTest.java | 13 +++++++++++++ .../source/postgres/CdcPostgresSourceTest.java | 13 +++++++------ 6 files changed, 23 insertions(+), 13 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java index b330c69e22e4..71dfee610a3e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateDecoratingIterator.java @@ -104,7 +104,6 @@ public DebeziumStateDecoratingIterator(final Iterator c this.syncCheckpointDuration = checkpointDuration; this.syncCheckpointRecords = checkpointRecords; this.previousCheckpointOffset = (HashMap) offsetManager.read(); - LOGGER.atInfo().log("offset previous: " + previousCheckpointOffset); this.initialOffset = new HashMap<>(this.previousCheckpointOffset); resetCheckpointValues(); } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java index 1bbe190477d2..10f41a160669 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java @@ -217,6 +217,7 @@ protected void writeRecords( idCol, makeIdCol, modelCol, recordJson.get(idCol).asInt(), recordJson.get(makeIdCol).asInt(), recordJson.get(modelCol).asText()); + } protected void deleteMessageOnIdCol(final String streamName, final String idCol, final int idValue) { @@ -662,7 +663,6 @@ public void newTableSnapshotTest() throws Exception { .read(config(), updatedCatalog, state); final List dataFromSecondBatch = AutoCloseableIterators .toListAndClose(secondBatchIterator); - final List stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch); assertStateMessagesForNewTableSnapshotTest(stateAfterSecondBatch, stateMessageEmittedAfterFirstSyncCompletion); @@ -706,7 +706,6 @@ public void newTableSnapshotTest() throws Exception { } final JsonNode state2 = stateAfterSecondBatch.get(stateAfterSecondBatch.size() - 1).getData(); - LOGGER.atInfo().log("State2: " + stateAfterSecondBatch); final AutoCloseableIterator thirdBatchIterator = source() .read(config(), updatedCatalog, state2); final List dataFromThirdBatch = AutoCloseableIterators @@ -715,8 +714,6 @@ public void newTableSnapshotTest() throws Exception { final List stateAfterThirdBatch = extractStateMessages(dataFromThirdBatch); assertTrue(stateAfterThirdBatch.size() >= 1); - LOGGER.atInfo().log("state 3: " + stateAfterThirdBatch); - final AirbyteStateMessage stateMessageEmittedAfterThirdSyncCompletion = stateAfterThirdBatch.get(stateAfterThirdBatch.size() - 1); assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterThirdSyncCompletion.getType()); assertNotEquals(stateMessageEmittedAfterThirdSyncCompletion.getGlobal().getSharedState(), diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index 48fb2c37a872..d68232e1e409 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -14,7 +14,7 @@ java { airbyteJavaConnector { cdkVersionRequired = '0.20.6' features = ['db-sources', 'datastore-postgres'] - useLocalCdk = true + useLocalCdk = false } application { diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java index 7e0e76a48889..bfd4903cef9f 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java @@ -198,7 +198,7 @@ public static String prettyPrintConfiguredAirbyteStreamList(final List stateMessages, fin if (Objects.isNull(sharedState)) { sharedState = global.getSharedState(); } else { - // This validation is only true for versions on or after postgres 15. We execute EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS for earlier versions of + // This validation is only true for versions on or after postgres 15. We execute + // EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS for earlier versions of // Postgres. See https://github.com/airbytehq/airbyte/pull/33605 for details. if (getPostgresVersion() >= 15) { assertEquals(sharedState, global.getSharedState()); @@ -332,7 +333,8 @@ public void testTwoStreamSync() throws Exception { if (Objects.isNull(sharedState)) { sharedState = global.getSharedState(); } else { - // LSN will be advanced for postgres version before 15. See https://github.com/airbytehq/airbyte/pull/33605 + // LSN will be advanced for postgres version before 15. See + // https://github.com/airbytehq/airbyte/pull/33605 if (getPostgresVersion() >= 15) { assertEquals(sharedState, global.getSharedState()); } @@ -746,9 +748,7 @@ protected void syncShouldIncrementLSN() throws Exception { Jsons.jsonNode(Collections.singletonList(stateAfterThirdBatch.get(stateAfterThirdBatch.size() - 1)))); final List dataFromFourthBatch = AutoCloseableIterators .toListAndClose(fourthBatchIterator); - - System.out.println("states after 3rd batch: " + stateAfterThirdBatch); - + final List stateAfterFourthBatch = extractStateMessages(dataFromFourthBatch); assertExpectedStateMessagesFromIncrementalSync(stateAfterFourthBatch); final Set recordsFromFourthBatch = extractRecordMessages( @@ -768,7 +768,8 @@ protected void assertLsnPositionForSyncShouldIncrementLSN(final Long lsnPosition if (syncNumber == 1) { assertEquals(1, lsnPosition2.compareTo(lsnPosition1)); } else if (syncNumber == 2) { - // Earlier Postgres version will advance lsn even if there is no sync records. See https://github.com/airbytehq/airbyte/pull/33605. + // Earlier Postgres version will advance lsn even if there is no sync records. See + // https://github.com/airbytehq/airbyte/pull/33605. if (getPostgresVersion() >= 15) { assertEquals(0, lsnPosition2.compareTo(lsnPosition1)); } From b1177cd809634a36ac4110afa9046ede9a864560 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Thu, 15 Feb 2024 10:14:31 -0800 Subject: [PATCH 3/7] remove unrelated change --- .../io/airbyte/cdk/integrations/debezium/CdcSourceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java index 10f41a160669..26544fe47fbd 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java @@ -217,7 +217,6 @@ protected void writeRecords( idCol, makeIdCol, modelCol, recordJson.get(idCol).asInt(), recordJson.get(makeIdCol).asInt(), recordJson.get(modelCol).asText()); - } protected void deleteMessageOnIdCol(final String streamName, final String idCol, final int idValue) { @@ -663,6 +662,7 @@ public void newTableSnapshotTest() throws Exception { .read(config(), updatedCatalog, state); final List dataFromSecondBatch = AutoCloseableIterators .toListAndClose(secondBatchIterator); + final List stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch); assertStateMessagesForNewTableSnapshotTest(stateAfterSecondBatch, stateMessageEmittedAfterFirstSyncCompletion); From ca3817c1cc1f011df2601581a8f6f1b78c9275ba Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Thu, 15 Feb 2024 10:45:55 -0800 Subject: [PATCH 4/7] format fix --- .../source/postgres/CdcPostgresSourceLegacyCtidTest.java | 1 - .../integrations/source/postgres/CdcPostgresSourceTest.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceLegacyCtidTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceLegacyCtidTest.java index 1bb267da3a32..f6f8984c5389 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceLegacyCtidTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceLegacyCtidTest.java @@ -33,5 +33,4 @@ public void syncShouldIncrementLSN() { } - } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index e4b074ebb1cf..5641ee251e86 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -748,7 +748,7 @@ protected void syncShouldIncrementLSN() throws Exception { Jsons.jsonNode(Collections.singletonList(stateAfterThirdBatch.get(stateAfterThirdBatch.size() - 1)))); final List dataFromFourthBatch = AutoCloseableIterators .toListAndClose(fourthBatchIterator); - + final List stateAfterFourthBatch = extractStateMessages(dataFromFourthBatch); assertExpectedStateMessagesFromIncrementalSync(stateAfterFourthBatch); final Set recordsFromFourthBatch = extractRecordMessages( From 29d8a255341b14890a7fc6b98deca73ac9b3059d Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Thu, 15 Feb 2024 11:34:28 -0800 Subject: [PATCH 5/7] fix comments --- .../CdcPostgresSourceLegacyCtidTest.java | 15 ++++------- .../postgres/CdcPostgresSourceTest.java | 27 ++++++++++++------- .../source/postgres/PostgresTestDatabase.java | 17 +++++++----- 3 files changed, 34 insertions(+), 25 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceLegacyCtidTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceLegacyCtidTest.java index f6f8984c5389..53ae215c4da4 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceLegacyCtidTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceLegacyCtidTest.java @@ -5,30 +5,25 @@ package io.airbyte.integrations.source.postgres; import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage; -import io.airbyte.integrations.source.postgres.PostgresTestDatabase.ContainerModifier; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Order; @Order(2) public class CdcPostgresSourceLegacyCtidTest extends CdcPostgresSourceTest { @Override - protected PostgresTestDatabase createTestDatabase() { - return PostgresTestDatabase.in(BaseImage.POSTGRES_13, ContainerModifier.CONF).withReplicationSlot(); + protected void setBaseImage() { + this.postgresImage = BaseImage.POSTGRES_12; } @Override - protected int getPostgresVersion() { - return 13; - } - - // TODO: https://github.com/airbytehq/airbyte/issues/35267 - // Fix the following tests. - @Override + @Disabled("https://github.com/airbytehq/airbyte/issues/35267") public void newTableSnapshotTest() { } @Override + @Disabled("https://github.com/airbytehq/airbyte/issues/35267") public void syncShouldIncrementLSN() { } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 5641ee251e86..7ddfa3afdc0f 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -73,9 +73,16 @@ @Order(1) public class CdcPostgresSourceTest extends CdcSourceTest { + protected BaseImage postgresImage; + + protected void setBaseImage() { + this.postgresImage = getServerImage(); + } + @Override protected PostgresTestDatabase createTestDatabase() { - return PostgresTestDatabase.in(getServerImage(), ContainerModifier.CONF).withReplicationSlot(); + setBaseImage(); + return PostgresTestDatabase.in(this.postgresImage, ContainerModifier.CONF).withReplicationSlot(); } @Override @@ -186,10 +193,6 @@ protected void assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync(f assertStateTypes(stateAfterFirstBatch, 24); } - protected int getPostgresVersion() { - return 16; - } - private void assertStateTypes(final List stateMessages, final int indexTillWhichExpectCtidState) { JsonNode sharedState = null; for (int i = 0; i < stateMessages.size(); i++) { @@ -203,7 +206,7 @@ private void assertStateTypes(final List stateMessages, fin // This validation is only true for versions on or after postgres 15. We execute // EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS for earlier versions of // Postgres. See https://github.com/airbytehq/airbyte/pull/33605 for details. - if (getPostgresVersion() >= 15) { + if (postgresImage.getMajorVersion() >= 15) { assertEquals(sharedState, global.getSharedState()); } } @@ -335,7 +338,7 @@ public void testTwoStreamSync() throws Exception { } else { // LSN will be advanced for postgres version before 15. See // https://github.com/airbytehq/airbyte/pull/33605 - if (getPostgresVersion() >= 15) { + if (postgresImage.getMajorVersion() >= 15) { assertEquals(sharedState, global.getSharedState()); } } @@ -759,7 +762,7 @@ protected void syncShouldIncrementLSN() throws Exception { // Fourth sync should again move the replication slot ahead assertEquals(1, replicationSlotAfterFourthSync.compareTo(replicationSlotAfterThirdSync)); - assertEquals(1, recordsFromFourthBatch.size(), "all messages: " + dataFromFourthBatch); + assertEquals(1, recordsFromFourthBatch.size()); } protected void assertLsnPositionForSyncShouldIncrementLSN(final Long lsnPosition1, @@ -770,7 +773,7 @@ protected void assertLsnPositionForSyncShouldIncrementLSN(final Long lsnPosition } else if (syncNumber == 2) { // Earlier Postgres version will advance lsn even if there is no sync records. See // https://github.com/airbytehq/airbyte/pull/33605. - if (getPostgresVersion() >= 15) { + if (postgresImage.getMajorVersion() >= 15) { assertEquals(0, lsnPosition2.compareTo(lsnPosition1)); } } else { @@ -808,6 +811,9 @@ protected void verifyCheckpointStatesByRecords() throws Exception { .toListAndClose(secondBatchIterator); assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size()); final List stateMessagesCDC = extractStateMessages(dataFromSecondBatch); + if (postgresImage.getMajorVersion() >= 15) { + assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state."); + } assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states."); } @@ -846,6 +852,9 @@ protected void verifyCheckpointStatesBySeconds() throws Exception { assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size()); final List stateMessagesCDC = extractStateMessages(dataFromSecondBatch); + if (postgresImage.getMajorVersion() >= 15) { + assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state."); + } assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states."); } diff --git a/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java b/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java index 9bd332613f83..00fd4d262b9d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java +++ b/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java @@ -21,18 +21,23 @@ public class PostgresTestDatabase extends public static enum BaseImage { - POSTGRES_16("postgres:16-bullseye"), - POSTGRES_13("postgres:13-bullseye"), - POSTGRES_12("postgres:12-bullseye"), - POSTGRES_9("postgres:9-alpine"), - POSTGRES_SSL_DEV("marcosmarxm/postgres-ssl:dev"); + POSTGRES_16("postgres:16-bullseye", 16), + POSTGRES_12("postgres:12-bullseye", 12), + POSTGRES_9("postgres:9-alpine", 9), + POSTGRES_SSL_DEV("marcosmarxm/postgres-ssl:dev", 16); private final String reference; + private final int majorVersion; - private BaseImage(String reference) { + private BaseImage(String reference, int majorVersion) { this.reference = reference; + this.majorVersion = majorVersion; }; + public int getMajorVersion() { + return majorVersion; + } + } public static enum ContainerModifier { From fd0a72bba7a3dd6635098561f4fdfe355cd785f8 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Thu, 15 Feb 2024 11:58:32 -0800 Subject: [PATCH 6/7] fix --- .../source/postgres/CdcPostgresSourceTest.java | 18 +++++++++++++----- .../source/postgres/PostgresTestDatabase.java | 8 ++------ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 7ddfa3afdc0f..e9a6c317d659 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -108,6 +108,14 @@ protected void setup() { testdb.withPublicationForAllTables(); } + // For legacy Postgres we will call advanceLsn() after we retrieved target LSN, so that debezium + // would not drop any record. + // However, that might cause unexpected state and cause failure in the test. Thus we need to bypass some check if they are on legacy postgres + // versions. + private boolean isOnLegacyPostgres() { + return postgresImage.majorVersion < 15; + } + @Test void testDebugMode() { final JsonNode invalidDebugConfig = testdb.testConfigBuilder() @@ -206,7 +214,7 @@ private void assertStateTypes(final List stateMessages, fin // This validation is only true for versions on or after postgres 15. We execute // EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS for earlier versions of // Postgres. See https://github.com/airbytehq/airbyte/pull/33605 for details. - if (postgresImage.getMajorVersion() >= 15) { + if (!isOnLegacyPostgres()) { assertEquals(sharedState, global.getSharedState()); } } @@ -338,7 +346,7 @@ public void testTwoStreamSync() throws Exception { } else { // LSN will be advanced for postgres version before 15. See // https://github.com/airbytehq/airbyte/pull/33605 - if (postgresImage.getMajorVersion() >= 15) { + if (!isOnLegacyPostgres()) { assertEquals(sharedState, global.getSharedState()); } } @@ -773,7 +781,7 @@ protected void assertLsnPositionForSyncShouldIncrementLSN(final Long lsnPosition } else if (syncNumber == 2) { // Earlier Postgres version will advance lsn even if there is no sync records. See // https://github.com/airbytehq/airbyte/pull/33605. - if (postgresImage.getMajorVersion() >= 15) { + if (!isOnLegacyPostgres()) { assertEquals(0, lsnPosition2.compareTo(lsnPosition1)); } } else { @@ -811,7 +819,7 @@ protected void verifyCheckpointStatesByRecords() throws Exception { .toListAndClose(secondBatchIterator); assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size()); final List stateMessagesCDC = extractStateMessages(dataFromSecondBatch); - if (postgresImage.getMajorVersion() >= 15) { + if (!isOnLegacyPostgres()) { assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state."); } assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states."); @@ -852,7 +860,7 @@ protected void verifyCheckpointStatesBySeconds() throws Exception { assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size()); final List stateMessagesCDC = extractStateMessages(dataFromSecondBatch); - if (postgresImage.getMajorVersion() >= 15) { + if (!isOnLegacyPostgres()) { assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state."); } assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states."); diff --git a/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java b/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java index 00fd4d262b9d..a86dbd39c351 100644 --- a/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java +++ b/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java @@ -26,18 +26,14 @@ public static enum BaseImage { POSTGRES_9("postgres:9-alpine", 9), POSTGRES_SSL_DEV("marcosmarxm/postgres-ssl:dev", 16); - private final String reference; - private final int majorVersion; + public final String reference; + public final int majorVersion; private BaseImage(String reference, int majorVersion) { this.reference = reference; this.majorVersion = majorVersion; }; - public int getMajorVersion() { - return majorVersion; - } - } public static enum ContainerModifier { From 771778d55c3b8ce6c04bf4e22780620034a8f8fa Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Thu, 15 Feb 2024 15:17:33 -0800 Subject: [PATCH 7/7] format --- .../integrations/source/postgres/CdcPostgresSourceTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index e9a6c317d659..551b81ac22f3 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -110,7 +110,8 @@ protected void setup() { // For legacy Postgres we will call advanceLsn() after we retrieved target LSN, so that debezium // would not drop any record. - // However, that might cause unexpected state and cause failure in the test. Thus we need to bypass some check if they are on legacy postgres + // However, that might cause unexpected state and cause failure in the test. Thus we need to bypass + // some check if they are on legacy postgres // versions. private boolean isOnLegacyPostgres() { return postgresImage.majorVersion < 15;