From 2f17e99073427391419cc116ce7bd4ea7edd6f78 Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Wed, 10 Aug 2022 13:02:01 -0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9E=20Postgres=20source:=20fix=20bug?= =?UTF-8?q?=20in=20intermediate=20state=20emission=20(#15496)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Rename record counter * Rename method * Emit intermediate state after all cursor records * Emit intermediate state only when it is ready * Merge two checks * Add a testing message * Fix unit tests * Add one more testing record and add comments * Add test case for multiple records with the same cursor value * Revert irrelevant change * Add explanation in javadoc * Format code * Rename testing methods * Fix comment * Bump version * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../source-postgres-strict-encrypt/Dockerfile | 2 +- .../connectors/source-postgres/Dockerfile | 2 +- .../relationaldb/StateDecoratingIterator.java | 43 ++++--- .../StateDecoratingIteratorTest.java | 112 ++++++++++++++++-- docs/integrations/sources/postgres.md | 6 +- 7 files changed, 135 insertions(+), 34 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 444de55646cd..51501cd02aa1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -762,7 +762,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 1.0.0 + dockerImageTag: 1.0.1 documentationUrl: https://docs.airbyte.io/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git 
a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 3ba04f127e8e..d5bf94d8c27d 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7140,7 +7140,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:1.0.0" +- dockerImage: "airbyte/source-postgres:1.0.1" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 5cde4f85fd45..51f5cce85e18 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.0 +LABEL io.airbyte.version=1.0.1 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 9dfb9767a391..9dae6b7cff80 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.0 +LABEL io.airbyte.version=1.0.1 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java 
index d2880e26a3cd..12370d9468b6 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIterator.java @@ -13,6 +13,7 @@ import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.JsonSchemaPrimitive; import java.util.Iterator; +import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,10 +28,16 @@ public class StateDecoratingIterator extends AbstractIterator im private final JsonSchemaPrimitive cursorType; private final int stateEmissionFrequency; + private final String initialCursor; private String maxCursor; - private AirbyteMessage intermediateStateMessage; private boolean hasEmittedFinalState; - private int recordCount; + + // The intermediateStateMessage is set to the latest state message. + // For every stateEmissionFrequency messages, emitIntermediateState is set to true and + // the latest intermediateStateMessage will be emitted. 
+ private int totalRecordCount = 0; + private boolean emitIntermediateState = false; + private AirbyteMessage intermediateStateMessage = null; /** * @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every @@ -49,6 +56,7 @@ public StateDecoratingIterator(final Iterator messageIterator, this.pair = pair; this.cursorField = cursorField; this.cursorType = cursorType; + this.initialCursor = initialCursor; this.maxCursor = initialCursor; this.stateEmissionFrequency = stateEmissionFrequency; } @@ -60,36 +68,41 @@ private String getCursorCandidate(final AirbyteMessage message) { @Override protected AirbyteMessage computeNext() { - if (intermediateStateMessage != null) { - final AirbyteMessage message = intermediateStateMessage; - intermediateStateMessage = null; - return message; - } else if (messageIterator.hasNext()) { - recordCount++; + if (messageIterator.hasNext()) { + if (emitIntermediateState && intermediateStateMessage != null) { + final AirbyteMessage message = intermediateStateMessage; + intermediateStateMessage = null; + emitIntermediateState = false; + return message; + } + + totalRecordCount++; final AirbyteMessage message = messageIterator.next(); if (message.getRecord().getData().hasNonNull(cursorField)) { final String cursorCandidate = getCursorCandidate(message); if (IncrementalUtils.compareCursors(maxCursor, cursorCandidate, cursorType) < 0) { + if (stateEmissionFrequency > 0 && !Objects.equals(maxCursor, initialCursor) && messageIterator.hasNext()) { + // Only emit an intermediate state when it is not the first or last record message, + // because the last state message will be taken care of in a different branch. + intermediateStateMessage = createStateMessage(false); + } maxCursor = cursorCandidate; } } - if (stateEmissionFrequency > 0 && recordCount % stateEmissionFrequency == 0) { - // Mark the state as final in case this intermediate state happens to be the last one. 
- // This is not necessary, but avoid sending the final states twice and prevent any edge case. - final boolean isFinalState = !messageIterator.hasNext(); - intermediateStateMessage = emitStateMessage(isFinalState); + if (stateEmissionFrequency > 0 && totalRecordCount % stateEmissionFrequency == 0) { + emitIntermediateState = true; } return message; } else if (!hasEmittedFinalState) { - return emitStateMessage(true); + return createStateMessage(true); } else { return endOfData(); } } - public AirbyteMessage emitStateMessage(final boolean isFinalState) { + public AirbyteMessage createStateMessage(final boolean isFinalState) { final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor); LOGGER.info("State Report: stream name: {}, original cursor field: {}, original cursor value {}, cursor field: {}, new cursor value: {}", pair, diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java index 8f16a7d5a11f..474d553d3a1d 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/StateDecoratingIteratorTest.java @@ -45,10 +45,18 @@ class StateDecoratingIteratorTest { private static final AirbyteMessage RECORD_MESSAGE_2 = createRecordMessage(RECORD_VALUE_2); private static final AirbyteMessage STATE_MESSAGE_2 = createStateMessage(RECORD_VALUE_2); - private static final String RECORD_VALUE_3 = "xyz"; + private static final String RECORD_VALUE_3 = "ghi"; private static final AirbyteMessage RECORD_MESSAGE_3 = createRecordMessage(RECORD_VALUE_3); private static final AirbyteMessage STATE_MESSAGE_3 = 
createStateMessage(RECORD_VALUE_3); + private static final String RECORD_VALUE_4 = "jkl"; + private static final AirbyteMessage RECORD_MESSAGE_4 = createRecordMessage(RECORD_VALUE_4); + private static final AirbyteMessage STATE_MESSAGE_4 = createStateMessage(RECORD_VALUE_4); + + private static final String RECORD_VALUE_5 = "xyz"; + private static final AirbyteMessage RECORD_MESSAGE_5 = createRecordMessage(RECORD_VALUE_5); + private static final AirbyteMessage STATE_MESSAGE_5 = createStateMessage(RECORD_VALUE_5); + private static AirbyteMessage createRecordMessage(final String recordValue) { return new AirbyteMessage() .withType(Type.RECORD) @@ -73,6 +81,8 @@ void setup() { when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_1)).thenReturn(STATE_MESSAGE_1.getState()); when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2)).thenReturn(STATE_MESSAGE_2.getState()); when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3)).thenReturn(STATE_MESSAGE_3.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_4)).thenReturn(STATE_MESSAGE_4.getState()); + when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_5)).thenReturn(STATE_MESSAGE_5.getState()); when(stateManager.getOriginalCursorField(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); when(stateManager.getOriginalCursor(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); @@ -106,13 +116,13 @@ void testWithInitialCursor() { stateManager, NAME_NAMESPACE_PAIR, UUID_FIELD_NAME, - RECORD_VALUE_3, + RECORD_VALUE_5, JsonSchemaPrimitive.STRING, 0); assertEquals(RECORD_MESSAGE_1, iterator.next()); assertEquals(RECORD_MESSAGE_2, iterator.next()); - assertEquals(STATE_MESSAGE_3, iterator.next()); + assertEquals(STATE_MESSAGE_5, iterator.next()); assertFalse(iterator.hasNext()); } @@ -179,8 +189,8 @@ void testUnicodeNull() { @Test @DisplayName("When initial cursor is null, and emit state for every record") - void testStateEmission1() { - messageIterator 
= MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3); + void testStateEmissionFrequency1() { + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5); final StateDecoratingIterator iterator1 = new StateDecoratingIterator( messageIterator, stateManager, @@ -191,19 +201,27 @@ void testStateEmission1() { 1); assertEquals(RECORD_MESSAGE_1, iterator1.next()); - assertEquals(STATE_MESSAGE_1, iterator1.next()); + // should emit state 1, but it is unclear whether there will be more + // records with the same cursor value, so no state is ready for emission assertEquals(RECORD_MESSAGE_2, iterator1.next()); - assertEquals(STATE_MESSAGE_2, iterator1.next()); + // emit state 1 because it is the latest state ready for emission + assertEquals(STATE_MESSAGE_1, iterator1.next()); assertEquals(RECORD_MESSAGE_3, iterator1.next()); - // final state message should only be emitted once + assertEquals(STATE_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_4, iterator1.next()); assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_5, iterator1.next()); + // state 4 is not emitted because there is no more record and only + // the final state should be emitted at this point; also the final + // state should only be emitted once + assertEquals(STATE_MESSAGE_5, iterator1.next()); assertFalse(iterator1.hasNext()); } @Test @DisplayName("When initial cursor is null, and emit state for every 2 records") - void testStateEmission2() { - messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3); + void testStateEmissionFrequency2() { + messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5); final StateDecoratingIterator iterator1 = new StateDecoratingIterator( messageIterator, stateManager, @@ -215,16 +233,74 @@ void testStateEmission2() { assertEquals(RECORD_MESSAGE_1, 
iterator1.next()); assertEquals(RECORD_MESSAGE_2, iterator1.next()); - assertEquals(STATE_MESSAGE_2, iterator1.next()); + // emit state 1 because it is the latest state ready for emission + assertEquals(STATE_MESSAGE_1, iterator1.next()); assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_4, iterator1.next()); + // emit state 3 because it is the latest state ready for emission assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_5, iterator1.next()); + assertEquals(STATE_MESSAGE_5, iterator1.next()); assertFalse(iterator1.hasNext()); } @Test @DisplayName("When initial cursor is not null") - void testStateEmission3() { - messageIterator = MoreIterators.of(RECORD_MESSAGE_2, RECORD_MESSAGE_3); + void testStateEmissionWhenInitialCursorIsNotNull() { + messageIterator = MoreIterators.of(RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5); + final StateDecoratingIterator iterator1 = new StateDecoratingIterator( + messageIterator, + stateManager, + NAME_NAMESPACE_PAIR, + UUID_FIELD_NAME, + RECORD_VALUE_1, + JsonSchemaPrimitive.STRING, + 1); + + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(STATE_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_4, iterator1.next()); + assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_5, iterator1.next()); + assertEquals(STATE_MESSAGE_5, iterator1.next()); + assertFalse(iterator1.hasNext()); + } + + /** + * Incremental syncs will sort the table with the cursor field, and emit the max cursor for every N + * records. The purpose is to emit the states frequently, so that if any transient failure occurs + * during a long sync, the next run does not need to start from the beginning, but can resume from + * the last successful intermediate state committed on the destination. The next run will start with + * `cursorField > cursor`. 
However, it is possible that there are multiple records with the same + * cursor value. If the intermediate state is emitted before all these records have been synced to + * the destination, some of these records may be lost. + * <p>
+ * Here is an example: + * + * <pre>
+   * | Record ID | Cursor Field | Other Field | Note                          |
+   * | --------- | ------------ | ----------- | ----------------------------- |
+   * | 1         | F1=16        | F2="abc"    |                               |
+   * | 2         | F1=16        | F2="def"    | <- state emission and failure |
+   * | 3         | F1=16        | F2="ghi"    |                               |
+   * </pre>
+ * + * If the intermediate state is emitted for record 2 and the sync fails immediately such that the + * cursor value `16` is committed, but only records 1 and 2 are actually synced, the next run will + * start with `F1 > 16` and skip record 3. + * <p>
+ * So intermediate state emission should only happen when all records with the same cursor value have + * been synced to the destination. Reference: https://github.com/airbytehq/airbyte/issues/15427 + */ + @Test + @DisplayName("When there are multiple records with the same cursor value") + void testStateEmissionForRecordsSharingSameCursorValue() { + messageIterator = MoreIterators.of( + RECORD_MESSAGE_2, RECORD_MESSAGE_2, + RECORD_MESSAGE_3, RECORD_MESSAGE_3, RECORD_MESSAGE_3, + RECORD_MESSAGE_4, + RECORD_MESSAGE_5, RECORD_MESSAGE_5); final StateDecoratingIterator iterator1 = new StateDecoratingIterator( messageIterator, stateManager, @@ -235,9 +311,19 @@ void testStateEmission3() { 1); assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + // state 2 is the latest state ready for emission because + // all records with the same cursor value have been emitted assertEquals(STATE_MESSAGE_2, iterator1.next()); assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_4, iterator1.next()); assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_5, iterator1.next()); + assertEquals(STATE_MESSAGE_4, iterator1.next()); + assertEquals(RECORD_MESSAGE_5, iterator1.next()); + assertEquals(STATE_MESSAGE_5, iterator1.next()); assertFalse(iterator1.hasNext()); } diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 1f66efb72ac3..58e8a83b0b2c 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -371,18 +371,20 @@ Possible solutions include: | Version | Date | Pull Request | Subject | |:--------| :--- | :--- |:----------------------------------------------------------------------------------------------------------------| +| 1.0.1 | 2022-08-10 | [15496](https://github.com/airbytehq/airbyte/pull/15496) | 
Fix state emission in incremental sync | +| | 2022-08-10 | [15481](https://github.com/airbytehq/airbyte/pull/15481) | Fix data handling from WAL logs in CDC mode | | 1.0.0 | 2022-08-05 | [15380](https://github.com/airbytehq/airbyte/pull/15380) | Change connector label to generally_available | | 0.4.44 | 2022-08-05 | [15342](https://github.com/airbytehq/airbyte/pull/15342) | Adjust titles and descriptions in spec.json | | 0.4.43 | 2022-08-03 | [15226](https://github.com/airbytehq/airbyte/pull/15226) | Make connectionTimeoutMs configurable through JDBC url parameters | | 0.4.42 | 2022-08-03 | [15273](https://github.com/airbytehq/airbyte/pull/15273) | Fix a bug in `0.4.36` and correctly parse the CDC initial record waiting time | | 0.4.41 | 2022-08-03 | [15077](https://github.com/airbytehq/airbyte/pull/15077) | Sync data from beginning if the LSN is no longer valid in CDC | -| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently | +| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently (⛔ this version has a bug; use `1.0.1` instead) | | 0.4.40 | 2022-08-03 | [15187](https://github.com/airbytehq/airbyte/pull/15187) | Add support for BCE dates/timestamps | | | 2022-08-03 | [14534](https://github.com/airbytehq/airbyte/pull/14534) | Align regular and CDC integration tests and data mappers | | 0.4.39 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings | | 0.4.38 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. 
| | 0.4.37 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected | -| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) | +| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) | | 0.4.35 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors | | 0.4.34 | 2022-07-17 | [13840](https://github.com/airbytehq/airbyte/pull/13840) | Added the ability to connect using different SSL modes and SSL certificates. | | 0.4.33 | 2022-07-14 | [14586](https://github.com/airbytehq/airbyte/pull/14586) | Validate source JDBC url parameters |