diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 0f4950cd47e4..8b6a3fa1b85a 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.16.5 | 2024-02-07 | [\#34948](https://github.com/airbytehq/airbyte/pull/34948) | Fix source state stats counting logic | | 0.16.4 | 2024-02-01 | [\#34727](https://github.com/airbytehq/airbyte/pull/34727) | Add future based stdout consumer in BaseTypingDedupingTest | | 0.16.3 | 2024-01-30 | [\#34669](https://github.com/airbytehq/airbyte/pull/34669) | Fix org.apache.logging.log4j:log4j-slf4j-impl version conflicts. | | 0.16.2 | 2024-01-29 | [\#34630](https://github.com/airbytehq/airbyte/pull/34630) | expose NamingTransformer to sub-classes in destinations JdbcSqlGenerator. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 76838e896863..b7e05db2aecd 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.16.4 +version=0.16.5 diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.java index 667a0ceb8152..919d38b3bb50 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.java @@ -188,6 +188,10 @@ protected AirbyteMessage computeNext() { protected final Optional getIntermediateMessage() { if (emitIntermediateState && intermediateStateMessage != null) { final AirbyteMessage message = intermediateStateMessage; + if (message.getState() != null) { + message.getState().setSourceStats(new AirbyteStateStats().withRecordCount((double) recordCountInStateMessage)); + } + intermediateStateMessage = null; recordCountInStateMessage = 0; emitIntermediateState = false; diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIteratorTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIteratorTest.java index e2d64f849748..d927faa3f502 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIteratorTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIteratorTest.java @@ -6,12 +6,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.cdk.integrations.source.relationaldb.models.DbState; +import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager; +import io.airbyte.cdk.integrations.source.relationaldb.state.StreamStateManager; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.MoreIterators; import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive; @@ -20,11 +20,15 @@ import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage; import io.airbyte.protocol.models.v0.AirbyteStateStats; +import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.AirbyteStreamState; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.v0.StreamDescriptor; import java.sql.SQLException; import java.util.Collections; import java.util.Iterator; -import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -37,27 +41,22 @@ class StateDecoratingIteratorTest { private static final AirbyteStreamNameNamespacePair NAME_NAMESPACE_PAIR = new AirbyteStreamNameNamespacePair(STREAM_NAME, NAMESPACE); private static final String UUID_FIELD_NAME = "ascending_inventory_uuid"; - private static final AirbyteMessage EMPTY_STATE_MESSAGE = new AirbyteMessage().withType(Type.STATE); + private static final AirbyteMessage EMPTY_STATE_MESSAGE = createEmptyStateMessage(0.0); private static final String RECORD_VALUE_1 = "abc"; private static final AirbyteMessage RECORD_MESSAGE_1 = createRecordMessage(RECORD_VALUE_1); - private static final AirbyteMessage STATE_MESSAGE_1 = createStateMessage(RECORD_VALUE_1); private static final String RECORD_VALUE_2 = "def"; private static final AirbyteMessage RECORD_MESSAGE_2 = createRecordMessage(RECORD_VALUE_2); - private static final AirbyteMessage STATE_MESSAGE_2 = createStateMessage(RECORD_VALUE_2); private static final String RECORD_VALUE_3 = "ghi"; private static final AirbyteMessage RECORD_MESSAGE_3 = createRecordMessage(RECORD_VALUE_3); - private static final AirbyteMessage STATE_MESSAGE_3 = createStateMessage(RECORD_VALUE_3); private static final String RECORD_VALUE_4 = "jkl"; private static final AirbyteMessage RECORD_MESSAGE_4 = createRecordMessage(RECORD_VALUE_4); - private static final AirbyteMessage STATE_MESSAGE_4 = createStateMessage(RECORD_VALUE_4); private static final String RECORD_VALUE_5 = "xyz"; private static final AirbyteMessage RECORD_MESSAGE_5 = createRecordMessage(RECORD_VALUE_5); - private static final AirbyteMessage STATE_MESSAGE_5 = createStateMessage(RECORD_VALUE_5); private static AirbyteMessage createRecordMessage(final String recordValue) { return new AirbyteMessage() @@ -66,12 +65,43 @@ private static AirbyteMessage createRecordMessage(final String recordValue) { .withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, recordValue)))); } - private static AirbyteMessage createStateMessage(final String recordValue) { + private static AirbyteMessage createStateMessage(final String recordValue, final long cursorRecordCount, final double statsRecordCount) { + final DbStreamState dbStreamState = new DbStreamState() + .withCursorField(Collections.singletonList(UUID_FIELD_NAME)) + .withCursor(recordValue) + .withStreamName(STREAM_NAME) + .withStreamNamespace(NAMESPACE); + if (cursorRecordCount > 0) { + dbStreamState.withCursorRecordCount(cursorRecordCount); + } + final DbState dbState = new DbState().withCdc(false).withStreams(Collections.singletonList(dbStreamState)); return new AirbyteMessage() .withType(Type.STATE) .withState(new AirbyteStateMessage() - .withData(Jsons.jsonNode(ImmutableMap.of("cursor", recordValue))) - .withSourceStats(new AirbyteStateStats().withRecordCount(1.0))); + .withType(AirbyteStateMessage.AirbyteStateType.STREAM) + .withStream(new AirbyteStreamState() + .withStreamDescriptor(new StreamDescriptor().withName(STREAM_NAME).withNamespace(NAMESPACE)) + .withStreamState(Jsons.jsonNode(dbStreamState))) + .withData(Jsons.jsonNode(dbState)) + .withSourceStats(new AirbyteStateStats().withRecordCount(statsRecordCount))); + } + + private static AirbyteMessage createEmptyStateMessage(final double statsRecordCount) { + final DbStreamState dbStreamState = new DbStreamState() + .withCursorField(Collections.singletonList(UUID_FIELD_NAME)) + .withStreamName(STREAM_NAME) + .withStreamNamespace(NAMESPACE); + + final DbState dbState = new DbState().withCdc(false).withStreams(Collections.singletonList(dbStreamState)); + return new AirbyteMessage() + .withType(Type.STATE) + .withState(new AirbyteStateMessage() + .withType(AirbyteStateMessage.AirbyteStateType.STREAM) + .withStream(new AirbyteStreamState() + .withStreamDescriptor(new StreamDescriptor().withName(STREAM_NAME).withNamespace(NAMESPACE)) + .withStreamState(Jsons.jsonNode(dbStreamState))) + .withData(Jsons.jsonNode(dbState)) + .withSourceStats(new AirbyteStateStats().withRecordCount(statsRecordCount))); } private Iterator createExceptionIterator() { @@ -105,15 +135,13 @@ public AirbyteMessage next() { @BeforeEach void setup() { - stateManager = mock(StateManager.class); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, null, 0)).thenReturn(EMPTY_STATE_MESSAGE.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_1, 1L)).thenReturn(STATE_MESSAGE_1.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2, 1L)).thenReturn(STATE_MESSAGE_2.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3, 1L)).thenReturn(STATE_MESSAGE_3.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_4, 1L)).thenReturn(STATE_MESSAGE_4.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_5, 1L)).thenReturn(STATE_MESSAGE_5.getState()); - - when(stateManager.getCursorInfo(NAME_NAMESPACE_PAIR)).thenReturn(Optional.empty()); + final AirbyteStream airbyteStream = new AirbyteStream().withNamespace(NAMESPACE).withName(STREAM_NAME); + final ConfiguredAirbyteStream configuredAirbyteStream = new ConfiguredAirbyteStream() + .withStream(airbyteStream) + .withCursorField(Collections.singletonList(UUID_FIELD_NAME)); + + stateManager = new StreamStateManager(Collections.emptyList(), + new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(configuredAirbyteStream))); } @Test @@ -130,7 +158,7 @@ void testWithoutInitialCursor() { assertEquals(RECORD_MESSAGE_1, iterator.next()); assertEquals(RECORD_MESSAGE_2, iterator.next()); - assertEquals(STATE_MESSAGE_2, iterator.next()); + assertEquals(createStateMessage(RECORD_VALUE_2, 1, 2.0), iterator.next()); assertFalse(iterator.hasNext()); } @@ -138,7 +166,6 @@ void testWithoutInitialCursor() { void testWithInitialCursor() { // record 1 and 2 has smaller cursor value, so at the end, the initial cursor is emitted with 0 // record count - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_5, 0L)).thenReturn(STATE_MESSAGE_5.getState()); messageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2); final StateDecoratingIterator iterator = new StateDecoratingIterator( @@ -152,7 +179,7 @@ void testWithInitialCursor() { assertEquals(RECORD_MESSAGE_1, iterator.next()); assertEquals(RECORD_MESSAGE_2, iterator.next()); - assertEquals(STATE_MESSAGE_5, iterator.next()); + assertEquals(createStateMessage(RECORD_VALUE_5, 0, 2.0), iterator.next()); assertFalse(iterator.hasNext()); } @@ -173,19 +200,13 @@ void testCursorFieldIsEmpty() { assertEquals(recordMessage, iterator.next()); // null because no records with a cursor field were replicated for the stream. - assertNull(iterator.next().getState()); + assertEquals(createEmptyStateMessage(1.0), iterator.next()); assertFalse(iterator.hasNext()); } @Test void testIteratorCatchesExceptionWhenEmissionFrequencyNonZero() { final Iterator exceptionIterator = createExceptionIterator(); - - // The mock record count matches the number of records returned by the exception iterator. - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_1, 1L)).thenReturn(STATE_MESSAGE_1.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2, 2L)).thenReturn(STATE_MESSAGE_2.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3, 1L)).thenReturn(STATE_MESSAGE_3.getState()); - final StateDecoratingIterator iterator = new StateDecoratingIterator( exceptionIterator, stateManager, @@ -202,7 +223,7 @@ void testIteratorCatchesExceptionWhenEmissionFrequencyNonZero() { assertEquals(RECORD_MESSAGE_3, iterator.next()); // emits the first state message since the iterator has changed cursorFields (2 -> 3) and met the // frequency minimum of 1 record - assertEquals(STATE_MESSAGE_2, iterator.next()); + assertEquals(createStateMessage(RECORD_VALUE_2, 2, 4.0), iterator.next()); // no further records to read since Exception was caught above and marked iterator as endOfData() assertFalse(iterator.hasNext()); } @@ -250,8 +271,6 @@ void testUnicodeNull() { final AirbyteMessage recordMessageWithNull = createRecordMessage(recordValueWithNull); // UTF8 null \u0000 is removed from the cursor value in the state message - final AirbyteMessage stateMessageWithNull = STATE_MESSAGE_1; - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, recordValueWithNull, 1L)).thenReturn(stateMessageWithNull.getState()); messageIterator = MoreIterators.of(recordMessageWithNull); @@ -265,7 +284,7 @@ void testUnicodeNull() { 0); assertEquals(recordMessageWithNull, iterator.next()); - assertEquals(stateMessageWithNull, iterator.next()); + assertEquals(createStateMessage(RECORD_VALUE_1, 1, 1.0), iterator.next()); assertFalse(iterator.hasNext()); } @@ -287,16 +306,16 @@ void testStateEmissionFrequency1() { // records with the same cursor value, so no state is ready for emission assertEquals(RECORD_MESSAGE_2, iterator1.next()); // emit state 1 because it is the latest state ready for emission - assertEquals(STATE_MESSAGE_1, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_1, 1, 2.0), iterator1.next()); assertEquals(RECORD_MESSAGE_3, iterator1.next()); - assertEquals(STATE_MESSAGE_2, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_2, 1, 1.0), iterator1.next()); assertEquals(RECORD_MESSAGE_4, iterator1.next()); - assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_3, 1, 1.0), iterator1.next()); assertEquals(RECORD_MESSAGE_5, iterator1.next()); // state 4 is not emitted because there is no more record and only // the final state should be emitted at this point; also the final // state should only be emitted once - assertEquals(STATE_MESSAGE_5, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_5, 1, 1.0), iterator1.next()); assertFalse(iterator1.hasNext()); } @@ -316,13 +335,13 @@ void testStateEmissionFrequency2() { assertEquals(RECORD_MESSAGE_1, iterator1.next()); assertEquals(RECORD_MESSAGE_2, iterator1.next()); // emit state 1 because it is the latest state ready for emission - assertEquals(STATE_MESSAGE_1, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_1, 1, 2.0), iterator1.next()); assertEquals(RECORD_MESSAGE_3, iterator1.next()); assertEquals(RECORD_MESSAGE_4, iterator1.next()); // emit state 3 because it is the latest state ready for emission - assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_3, 1, 2.0), iterator1.next()); assertEquals(RECORD_MESSAGE_5, iterator1.next()); - assertEquals(STATE_MESSAGE_5, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_5, 1, 1.0), iterator1.next()); assertFalse(iterator1.hasNext()); } @@ -341,11 +360,11 @@ void testStateEmissionWhenInitialCursorIsNotNull() { assertEquals(RECORD_MESSAGE_2, iterator1.next()); assertEquals(RECORD_MESSAGE_3, iterator1.next()); - assertEquals(STATE_MESSAGE_2, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_2, 1, 2.0), iterator1.next()); assertEquals(RECORD_MESSAGE_4, iterator1.next()); - assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_3, 1, 1.0), iterator1.next()); assertEquals(RECORD_MESSAGE_5, iterator1.next()); - assertEquals(STATE_MESSAGE_5, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_5, 1, 1.0), iterator1.next()); assertFalse(iterator1.hasNext()); } @@ -379,10 +398,6 @@ void testStateEmissionWhenInitialCursorIsNotNull() { @Test @DisplayName("When there are multiple records with the same cursor value") void testStateEmissionForRecordsSharingSameCursorValue() { - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_2, 2L)).thenReturn(STATE_MESSAGE_2.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_3, 3L)).thenReturn(STATE_MESSAGE_3.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_4, 1L)).thenReturn(STATE_MESSAGE_4.getState()); - when(stateManager.updateAndEmit(NAME_NAMESPACE_PAIR, RECORD_VALUE_5, 2L)).thenReturn(STATE_MESSAGE_5.getState()); messageIterator = MoreIterators.of( RECORD_MESSAGE_2, RECORD_MESSAGE_2, @@ -403,15 +418,50 @@ void testStateEmissionForRecordsSharingSameCursorValue() { assertEquals(RECORD_MESSAGE_3, iterator1.next()); // state 2 is the latest state ready for emission because // all records with the same cursor value have been emitted - assertEquals(STATE_MESSAGE_2, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_2, 2, 3.0), iterator1.next()); assertEquals(RECORD_MESSAGE_3, iterator1.next()); assertEquals(RECORD_MESSAGE_3, iterator1.next()); assertEquals(RECORD_MESSAGE_4, iterator1.next()); - assertEquals(STATE_MESSAGE_3, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_3, 3, 3.0), iterator1.next()); assertEquals(RECORD_MESSAGE_5, iterator1.next()); - assertEquals(STATE_MESSAGE_4, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_4, 1, 1.0), iterator1.next()); assertEquals(RECORD_MESSAGE_5, iterator1.next()); - assertEquals(STATE_MESSAGE_5, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_5, 2, 1.0), iterator1.next()); + assertFalse(iterator1.hasNext()); + } + + @Test + void testStateEmissionForRecordsSharingSameCursorValueButDifferentStatsCount() { + messageIterator = MoreIterators.of( + RECORD_MESSAGE_2, RECORD_MESSAGE_2, + RECORD_MESSAGE_2, RECORD_MESSAGE_2, + RECORD_MESSAGE_3, RECORD_MESSAGE_3, RECORD_MESSAGE_3, + RECORD_MESSAGE_3, + RECORD_MESSAGE_3, RECORD_MESSAGE_3, RECORD_MESSAGE_3); + final StateDecoratingIterator iterator1 = new StateDecoratingIterator( + messageIterator, + stateManager, + NAME_NAMESPACE_PAIR, + UUID_FIELD_NAME, + RECORD_VALUE_1, + JsonSchemaPrimitive.STRING, + 10); + + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_2, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + // state 2 is the latest state ready for emission because + // all records with the same cursor value have been emitted + assertEquals(createStateMessage(RECORD_VALUE_2, 4, 10.0), iterator1.next()); + assertEquals(RECORD_MESSAGE_3, iterator1.next()); + assertEquals(createStateMessage(RECORD_VALUE_3, 7, 1.0), iterator1.next()); assertFalse(iterator1.hasNext()); } diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index b6acdb87dc87..55f7ec9a3b05 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -13,7 +13,7 @@ java { } airbyteJavaConnector { - cdkVersionRequired = '0.16.3' + cdkVersionRequired = '0.16.5' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index 34f03aaaccc8..53e6b5033932 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.3.4 + dockerImageTag: 3.3.5 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 94d8a3805b36..b1c0e42651b6 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -292,6 +292,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 | | 3.3.4 | 2024-01-31 | [34723](https://github.com/airbytehq/airbyte/pull/34723) | Adopt CDK v0.16.3 | | 3.3.3 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0 | | 3.3.2 | 2024-01-24 | [34465](https://github.com/airbytehq/airbyte/pull/34465) | Check xmin only if user selects xmin sync mode. |