Skip to content

Commit

Permalink
🐞 Postgres source: fix bug in intermediate state emission (#15496)
Browse files Browse the repository at this point in the history
* Rename record counter

* Rename method

* Emit intermediate state after all cursor records

* Emit intermediate state only when it is ready

* Merge two checks

* Add a testing message

* Fix unit tests

* Add one more testing record and add comments

* Add test case for multiple records with the same cursor value

* Revert irrelevant change

* Add explanation in javadoc

* Format code

* Rename testing methods

* Fix comment

* Bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
tuliren and octavia-squidington-iii authored Aug 10, 2022
1 parent f540499 commit 2f17e99
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.0
dockerImageTag: 1.0.1
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7140,7 +7140,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.0"
- dockerImage: "airbyte/source-postgres:1.0.1"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.version=1.0.1
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.version=1.0.1
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.util.Iterator;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,10 +28,16 @@ public class StateDecoratingIterator extends AbstractIterator<AirbyteMessage> im
private final JsonSchemaPrimitive cursorType;
private final int stateEmissionFrequency;

private final String initialCursor;
private String maxCursor;
private AirbyteMessage intermediateStateMessage;
private boolean hasEmittedFinalState;
private int recordCount;

// The intermediateStateMessage is set to the latest state message.
// For every stateEmissionFrequency messages, emitIntermediateState is set to true and
// the latest intermediateStateMessage will be emitted.
private int totalRecordCount = 0;
private boolean emitIntermediateState = false;
private AirbyteMessage intermediateStateMessage = null;

/**
* @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every
Expand All @@ -49,6 +56,7 @@ public StateDecoratingIterator(final Iterator<AirbyteMessage> messageIterator,
this.pair = pair;
this.cursorField = cursorField;
this.cursorType = cursorType;
this.initialCursor = initialCursor;
this.maxCursor = initialCursor;
this.stateEmissionFrequency = stateEmissionFrequency;
}
Expand All @@ -60,36 +68,41 @@ private String getCursorCandidate(final AirbyteMessage message) {

@Override
protected AirbyteMessage computeNext() {
if (intermediateStateMessage != null) {
final AirbyteMessage message = intermediateStateMessage;
intermediateStateMessage = null;
return message;
} else if (messageIterator.hasNext()) {
recordCount++;
if (messageIterator.hasNext()) {
if (emitIntermediateState && intermediateStateMessage != null) {
final AirbyteMessage message = intermediateStateMessage;
intermediateStateMessage = null;
emitIntermediateState = false;
return message;
}

totalRecordCount++;
final AirbyteMessage message = messageIterator.next();
if (message.getRecord().getData().hasNonNull(cursorField)) {
final String cursorCandidate = getCursorCandidate(message);
if (IncrementalUtils.compareCursors(maxCursor, cursorCandidate, cursorType) < 0) {
if (stateEmissionFrequency > 0 && !Objects.equals(maxCursor, initialCursor) && messageIterator.hasNext()) {
// Only emit an intermediate state when it is not the first or last record message,
// because the last state message will be taken care of in a different branch.
intermediateStateMessage = createStateMessage(false);
}
maxCursor = cursorCandidate;
}
}

if (stateEmissionFrequency > 0 && recordCount % stateEmissionFrequency == 0) {
// Mark the state as final in case this intermediate state happens to be the last one.
// This is not necessary, but avoid sending the final states twice and prevent any edge case.
final boolean isFinalState = !messageIterator.hasNext();
intermediateStateMessage = emitStateMessage(isFinalState);
if (stateEmissionFrequency > 0 && totalRecordCount % stateEmissionFrequency == 0) {
emitIntermediateState = true;
}

return message;
} else if (!hasEmittedFinalState) {
return emitStateMessage(true);
return createStateMessage(true);
} else {
return endOfData();
}
}

public AirbyteMessage emitStateMessage(final boolean isFinalState) {
public AirbyteMessage createStateMessage(final boolean isFinalState) {
final AirbyteStateMessage stateMessage = stateManager.updateAndEmit(pair, maxCursor);
LOGGER.info("State Report: stream name: {}, original cursor field: {}, original cursor value {}, cursor field: {}, new cursor value: {}",
pair,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,18 @@ class StateDecoratingIteratorTest {
private static final AirbyteMessage RECORD_MESSAGE_2 = createRecordMessage(RECORD_VALUE_2);
private static final AirbyteMessage STATE_MESSAGE_2 = createStateMessage(RECORD_VALUE_2);

private static final String RECORD_VALUE_3 = "xyz";
private static final String RECORD_VALUE_3 = "ghi";
private static final AirbyteMessage RECORD_MESSAGE_3 = createRecordMessage(RECORD_VALUE_3);
private static final AirbyteMessage STATE_MESSAGE_3 = createStateMessage(RECORD_VALUE_3);

private static final String RECORD_VALUE_4 = "jkl";
private static final AirbyteMessage RECORD_MESSAGE_4 = createRecordMessage(RECORD_VALUE_4);
private static final AirbyteMessage STATE_MESSAGE_4 = createStateMessage(RECORD_VALUE_4);

private static final String RECORD_VALUE_5 = "xyz";
private static final AirbyteMessage RECORD_MESSAGE_5 = createRecordMessage(RECORD_VALUE_5);
private static final AirbyteMessage STATE_MESSAGE_5 = createStateMessage(RECORD_VALUE_5);

private static AirbyteMessage createRecordMessage(final String recordValue) {
return new AirbyteMessage()
.withType(Type.RECORD)
Expand All @@ -73,6 +81,8 @@ void setup() {
when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_1)).thenReturn(STATE_MESSAGE_1.getState());
when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2)).thenReturn(STATE_MESSAGE_2.getState());
when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3)).thenReturn(STATE_MESSAGE_3.getState());
when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_4)).thenReturn(STATE_MESSAGE_4.getState());
when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_5)).thenReturn(STATE_MESSAGE_5.getState());

when(stateManager.getOriginalCursorField(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty());
when(stateManager.getOriginalCursor(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty());
Expand Down Expand Up @@ -106,13 +116,13 @@ void testWithInitialCursor() {
stateManager,
NAME_NAMESPACE_PAIR,
UUID_FIELD_NAME,
RECORD_VALUE_3,
RECORD_VALUE_5,
JsonSchemaPrimitive.STRING,
0);

assertEquals(RECORD_MESSAGE_1, iterator.next());
assertEquals(RECORD_MESSAGE_2, iterator.next());
assertEquals(STATE_MESSAGE_3, iterator.next());
assertEquals(STATE_MESSAGE_5, iterator.next());
assertFalse(iterator.hasNext());
}

Expand Down Expand Up @@ -179,8 +189,8 @@ void testUnicodeNull() {

@Test
@DisplayName("When initial cursor is null, and emit state for every record")
void testStateEmission1() {
messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3);
void testStateEmissionFrequency1() {
messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5);
final StateDecoratingIterator iterator1 = new StateDecoratingIterator(
messageIterator,
stateManager,
Expand All @@ -191,19 +201,27 @@ void testStateEmission1() {
1);

assertEquals(RECORD_MESSAGE_1, iterator1.next());
assertEquals(STATE_MESSAGE_1, iterator1.next());
// should emit state 1, but it is unclear whether there will be more
// records with the same cursor value, so no state is ready for emission
assertEquals(RECORD_MESSAGE_2, iterator1.next());
assertEquals(STATE_MESSAGE_2, iterator1.next());
// emit state 1 because it is the latest state ready for emission
assertEquals(STATE_MESSAGE_1, iterator1.next());
assertEquals(RECORD_MESSAGE_3, iterator1.next());
// final state message should only be emitted once
assertEquals(STATE_MESSAGE_2, iterator1.next());
assertEquals(RECORD_MESSAGE_4, iterator1.next());
assertEquals(STATE_MESSAGE_3, iterator1.next());
assertEquals(RECORD_MESSAGE_5, iterator1.next());
// state 4 is not emitted because there is no more record and only
// the final state should be emitted at this point; also the final
// state should only be emitted once
assertEquals(STATE_MESSAGE_5, iterator1.next());
assertFalse(iterator1.hasNext());
}

@Test
@DisplayName("When initial cursor is null, and emit state for every 2 records")
void testStateEmission2() {
messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3);
void testStateEmissionFrequency2() {
messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5);
final StateDecoratingIterator iterator1 = new StateDecoratingIterator(
messageIterator,
stateManager,
Expand All @@ -215,16 +233,74 @@ void testStateEmission2() {

assertEquals(RECORD_MESSAGE_1, iterator1.next());
assertEquals(RECORD_MESSAGE_2, iterator1.next());
assertEquals(STATE_MESSAGE_2, iterator1.next());
// emit state 1 because it is the latest state ready for emission
assertEquals(STATE_MESSAGE_1, iterator1.next());
assertEquals(RECORD_MESSAGE_3, iterator1.next());
assertEquals(RECORD_MESSAGE_4, iterator1.next());
// emit state 3 because it is the latest state ready for emission
assertEquals(STATE_MESSAGE_3, iterator1.next());
assertEquals(RECORD_MESSAGE_5, iterator1.next());
assertEquals(STATE_MESSAGE_5, iterator1.next());
assertFalse(iterator1.hasNext());
}

@Test
@DisplayName("When initial cursor is not null")
void testStateEmission3() {
messageIterator = MoreIterators.of(RECORD_MESSAGE_2, RECORD_MESSAGE_3);
void testStateEmissionWhenInitialCursorIsNotNull() {
messageIterator = MoreIterators.of(RECORD_MESSAGE_2, RECORD_MESSAGE_3, RECORD_MESSAGE_4, RECORD_MESSAGE_5);
final StateDecoratingIterator iterator1 = new StateDecoratingIterator(
messageIterator,
stateManager,
NAME_NAMESPACE_PAIR,
UUID_FIELD_NAME,
RECORD_VALUE_1,
JsonSchemaPrimitive.STRING,
1);

assertEquals(RECORD_MESSAGE_2, iterator1.next());
assertEquals(RECORD_MESSAGE_3, iterator1.next());
assertEquals(STATE_MESSAGE_2, iterator1.next());
assertEquals(RECORD_MESSAGE_4, iterator1.next());
assertEquals(STATE_MESSAGE_3, iterator1.next());
assertEquals(RECORD_MESSAGE_5, iterator1.next());
assertEquals(STATE_MESSAGE_5, iterator1.next());
assertFalse(iterator1.hasNext());
}

/**
* Incremental syncs will sort the table with the cursor field, and emit the max cursor for every N
* records. The purpose is to emit the states frequently, so that if any transient failure occurs
* during a long sync, the next run does not need to start from the beginning, but can resume from
* the last successful intermediate state committed on the destination. The next run will start with
* `cursorField > cursor`. However, it is possible that there are multiple records with the same
* cursor value. If the intermediate state is emitted before all these records have been synced to
* the destination, some of these records may be lost.
* <p/>
* Here is an example:
*
* <pre>
* | Record ID | Cursor Field | Other Field | Note |
* | --------- | ------------ | ----------- | ----------------------------- |
* | 1 | F1=16 | F2="abc" | |
* | 2 | F1=16 | F2="def" | <- state emission and failure |
* | 3 | F1=16 | F2="ghi" | |
* </pre>
*
* If the intermediate state is emitted for record 2 and the sync fails immediately such that the
* cursor value `16` is committed, but only record 1 and 2 are actually synced, the next run will
* start with `F1 > 16` and skip record 3.
* <p/>
* So intermediate state emission should only happen when all records with the same cursor value has
* been synced to destination. Reference: https://github.com/airbytehq/airbyte/issues/15427
*/
@Test
@DisplayName("When there are multiple records with the same cursor value")
void testStateEmissionForRecordsSharingSameCursorValue() {
messageIterator = MoreIterators.of(
RECORD_MESSAGE_2, RECORD_MESSAGE_2,
RECORD_MESSAGE_3, RECORD_MESSAGE_3, RECORD_MESSAGE_3,
RECORD_MESSAGE_4,
RECORD_MESSAGE_5, RECORD_MESSAGE_5);
final StateDecoratingIterator iterator1 = new StateDecoratingIterator(
messageIterator,
stateManager,
Expand All @@ -235,9 +311,19 @@ void testStateEmission3() {
1);

assertEquals(RECORD_MESSAGE_2, iterator1.next());
assertEquals(RECORD_MESSAGE_2, iterator1.next());
assertEquals(RECORD_MESSAGE_3, iterator1.next());
// state 2 is the latest state ready for emission because
// all records with the same cursor value have been emitted
assertEquals(STATE_MESSAGE_2, iterator1.next());
assertEquals(RECORD_MESSAGE_3, iterator1.next());
assertEquals(RECORD_MESSAGE_3, iterator1.next());
assertEquals(RECORD_MESSAGE_4, iterator1.next());
assertEquals(STATE_MESSAGE_3, iterator1.next());
assertEquals(RECORD_MESSAGE_5, iterator1.next());
assertEquals(STATE_MESSAGE_4, iterator1.next());
assertEquals(RECORD_MESSAGE_5, iterator1.next());
assertEquals(STATE_MESSAGE_5, iterator1.next());
assertFalse(iterator1.hasNext());
}

Expand Down
6 changes: 4 additions & 2 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -371,18 +371,20 @@ Possible solutions include:

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- |:----------------------------------------------------------------------------------------------------------------|
| 1.0.1 | 2022-08-10 | [15496](https://github.com/airbytehq/airbyte/pull/15496) | Fix state emission in incremental sync |
| | 2022-08-10 | [15481](https://github.com/airbytehq/airbyte/pull/15481) | Fix data handling from WAL logs in CDC mode |
| 1.0.0 | 2022-08-05 | [15380](https://github.com/airbytehq/airbyte/pull/15380) | Change connector label to generally_available |
| 0.4.44 | 2022-08-05 | [15342](https://github.com/airbytehq/airbyte/pull/15342) | Adjust titles and descriptions in spec.json |
| 0.4.43 | 2022-08-03 | [15226](https://github.com/airbytehq/airbyte/pull/15226) | Make connectionTimeoutMs configurable through JDBC url parameters |
| 0.4.42 | 2022-08-03 | [15273](https://github.com/airbytehq/airbyte/pull/15273) | Fix a bug in `0.4.36` and correctly parse the CDC initial record waiting time |
| 0.4.41 | 2022-08-03 | [15077](https://github.com/airbytehq/airbyte/pull/15077) | Sync data from beginning if the LSN is no longer valid in CDC |
| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently |
| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently (⛔ this version has a bug; use `1.0.1` instead) |
| 0.4.40 | 2022-08-03 | [15187](https://github.com/airbytehq/airbyte/pull/15187) | Add support for BCE dates/timestamps |
| | 2022-08-03 | [14534](https://github.com/airbytehq/airbyte/pull/14534) | Align regular and CDC integration tests and data mappers |
| 0.4.39 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings |
| 0.4.38 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. |
| 0.4.37 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected |
| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) |
| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) |
| 0.4.35 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors |
| 0.4.34 | 2022-07-17 | [13840](https://github.com/airbytehq/airbyte/pull/13840) | Added the ability to connect using different SSL modes and SSL certificates. |
| 0.4.33 | 2022-07-14 | [14586](https://github.com/airbytehq/airbyte/pull/14586) | Validate source JDBC url parameters |
Expand Down

0 comments on commit 2f17e99

Please sign in to comment.