From 6bb40d8ce7383b8b01644bb57d87dbe51246da0c Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Thu, 3 Aug 2023 10:08:58 -0400 Subject: [PATCH 1/4] Set source defined cursor on discovered catalog --- .../source/mongodb/internal/MongoUtil.java | 11 ++++++++++- .../source/mongodb/internal/MongoDbSourceTest.java | 4 ++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java index ebd1beef81277..15c270b17b320 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java @@ -27,6 +27,8 @@ public class MongoUtil { + public static final String DEFAULT_CURSOR_FIELD = "_id"; + /** * Set of collection prefixes that should be ignored when performing operations, such as discover to * avoid access issues. @@ -83,11 +85,18 @@ public static List getAirbyteStreams(final MongoClient mongoClien final Set authorizedCollections = getAuthorizedCollections(mongoClient, databaseName); authorizedCollections.parallelStream().forEach(collectionName -> { final List fields = getFieldsInCollection(mongoClient.getDatabase(databaseName).getCollection(collectionName)); - streams.add(CatalogHelpers.createAirbyteStream(collectionName, databaseName, fields)); + streams.add(createAirbyteStream(collectionName, databaseName, fields)); }); return streams; } + private static AirbyteStream createAirbyteStream(final String collectionName, final String databaseName, final List fields) { + return CatalogHelpers.createAirbyteStream(collectionName, databaseName, fields) + .withSourceDefinedCursor(true) + .withDefaultCursorField(List.of(DEFAULT_CURSOR_FIELD)) + .withSourceDefinedPrimaryKey(List.of(List.of(DEFAULT_CURSOR_FIELD))); + } + private static List getFieldsInCollection(final MongoCollection collection) { final Map fieldsMap = Map.of("input", Map.of("$objectToArray", "$$ROOT"), "as", "each", diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java index fba770b133ed0..a9d51d8fefd23 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java @@ -29,6 +29,7 @@ import io.airbyte.protocol.models.v0.AirbyteStream; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import org.bson.Document; @@ -145,6 +146,9 @@ void testDiscoverOperation() throws IOException { assertEquals("number", stream.get().getJsonSchema().get("properties").get("price").get("type").asText()); assertEquals("array", stream.get().getJsonSchema().get("properties").get("items").get("type").asText()); assertEquals("object", stream.get().getJsonSchema().get("properties").get("owners").get("type").asText()); + assertEquals(true, stream.get().getSourceDefinedCursor()); + assertEquals(List.of(MongoUtil.DEFAULT_CURSOR_FIELD), stream.get().getDefaultCursorField()); + assertEquals(List.of(List.of(MongoUtil.DEFAULT_CURSOR_FIELD)), stream.get().getSourceDefinedPrimaryKey()); } @Test From 2b50f60302c216e68c95b58b44b8500815d65fc4 Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Thu, 3 Aug 2023 10:09:50 -0400 Subject: [PATCH 2/4] Formatting --- .../integrations/source/mongodb/internal/MongoUtil.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java index 15c270b17b320..b1954594f2579 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java @@ -92,9 +92,9 @@ public static List getAirbyteStreams(final MongoClient mongoClien private static AirbyteStream createAirbyteStream(final String collectionName, final String databaseName, final List fields) { return CatalogHelpers.createAirbyteStream(collectionName, databaseName, fields) - .withSourceDefinedCursor(true) - .withDefaultCursorField(List.of(DEFAULT_CURSOR_FIELD)) - .withSourceDefinedPrimaryKey(List.of(List.of(DEFAULT_CURSOR_FIELD))); + .withSourceDefinedCursor(true) + .withDefaultCursorField(List.of(DEFAULT_CURSOR_FIELD)) + .withSourceDefinedPrimaryKey(List.of(List.of(DEFAULT_CURSOR_FIELD))); } private static List getFieldsInCollection(final MongoCollection collection) { From 5e41482d9cd7b164be6f1f0301d9a06973d5d1aa Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Fri, 4 Aug 2023 10:09:48 -0400 Subject: [PATCH 3/4] Correct AirbyteStream settings for CDC --- .../source-mongodb-internal-poc/build.gradle | 2 + .../mongodb/internal/MongoCatalogHelper.java | 58 +++++++++++++++++++ .../source/mongodb/internal/MongoUtil.java | 10 ++-- .../internal/MongoCatalogHelperTest.java | 46 +++++++++++++++ .../mongodb/internal/MongoDbSourceTest.java | 24 +++++--- 5 files changed, 125 insertions(+), 15 deletions(-) create mode 100644 airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelper.java create mode 100644 airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelperTest.java diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/build.gradle b/airbyte-integrations/connectors/source-mongodb-internal-poc/build.gradle index a0d3a070b9635..1ef684dac2b0a 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/build.gradle @@ -16,11 +16,13 @@ dependencies { implementation libs.jackson.databind implementation project(':airbyte-db:db-lib') implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-integrations:bases:debezium') implementation libs.airbyte.protocol implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) implementation 'org.mongodb:mongodb-driver-sync:4.10.2' + testImplementation testFixtures(project(':airbyte-integrations:bases:debezium')) testImplementation "org.jetbrains.kotlinx:kotlinx-cli:0.3.5" integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelper.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelper.java new file mode 100644 index 0000000000000..5390690fda8f5 --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelper.java @@ -0,0 +1,58 @@ +package io.airbyte.integrations.source.mongodb.internal; + +import io.airbyte.integrations.debezium.internals.DebeziumEventUtils; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.v0.AirbyteCatalog; +import io.airbyte.protocol.models.v0.AirbyteStream; +import io.airbyte.protocol.models.v0.CatalogHelpers; +import io.airbyte.protocol.models.v0.SyncMode; + +import java.util.ArrayList; +import java.util.List; + +/** + * Collection of utility methods for generating the {@link AirbyteCatalog}. + */ +public class MongoCatalogHelper { + + /** + * The default cursor field name. + */ + public static final String DEFAULT_CURSOR_FIELD = "_id"; + + /** + * The list of supported sync modes for a given stream. + */ + public static final List SUPPORTED_SYNC_MODES = List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL); + + /** + * Builds an {@link AirbyteStream} with the correct configuration for this source. + * + * @param streamName The name of the stream. + * @param streamNamespace The namespace of the stream. + * @param fields The fields associated with the stream. + * @return The configured {@link AirbyteStream} for this source. + */ + public static AirbyteStream buildAirbyteStream(final String streamName, final String streamNamespace, final List fields) { + return CatalogHelpers.createAirbyteStream(streamName, streamNamespace, addCdcMetadataColumns(fields)) + .withSupportedSyncModes(SUPPORTED_SYNC_MODES) + .withSourceDefinedCursor(true) + .withDefaultCursorField(List.of(DEFAULT_CURSOR_FIELD)) + .withSourceDefinedPrimaryKey(List.of(List.of(DEFAULT_CURSOR_FIELD))); + } + + /** + * Adds the metadata columns required to use CDC to the list of discovered fields. + * + * @param fields The list of discovered fields. + * @return The modified list of discovered fields that includes the required CDC metadata columns. + */ + public static List addCdcMetadataColumns(final List fields) { + final List modifiedFields = new ArrayList<>(fields); + modifiedFields.add(new Field(DebeziumEventUtils.CDC_LSN, JsonSchemaType.NUMBER)); + modifiedFields.add(new Field(DebeziumEventUtils.CDC_UPDATED_AT, JsonSchemaType.STRING)); + modifiedFields.add(new Field(DebeziumEventUtils.CDC_DELETED_AT, JsonSchemaType.STRING)); + return modifiedFields; + } +} diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java index b1954594f2579..114227bfedea3 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.source.mongodb.internal; +import com.google.common.collect.Lists; import com.mongodb.MongoCommandException; import com.mongodb.MongoException; import com.mongodb.MongoSecurityException; @@ -23,12 +24,12 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; + +import io.airbyte.protocol.models.v0.SyncMode; import org.bson.Document; public class MongoUtil { - public static final String DEFAULT_CURSOR_FIELD = "_id"; - /** * Set of collection prefixes that should be ignored when performing operations, such as discover to * avoid access issues. @@ -91,10 +92,7 @@ public static List getAirbyteStreams(final MongoClient mongoClien } private static AirbyteStream createAirbyteStream(final String collectionName, final String databaseName, final List fields) { - return CatalogHelpers.createAirbyteStream(collectionName, databaseName, fields) - .withSourceDefinedCursor(true) - .withDefaultCursorField(List.of(DEFAULT_CURSOR_FIELD)) - .withSourceDefinedPrimaryKey(List.of(List.of(DEFAULT_CURSOR_FIELD))); + return MongoCatalogHelper.buildAirbyteStream(collectionName, databaseName, fields); } private static List getFieldsInCollection(final MongoCollection collection) { diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelperTest.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelperTest.java new file mode 100644 index 0000000000000..c1d67a11f6a35 --- /dev/null +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelperTest.java @@ -0,0 +1,46 @@ +package io.airbyte.integrations.source.mongodb.internal; + +import io.airbyte.integrations.debezium.internals.DebeziumEventUtils; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.v0.AirbyteStream; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static io.airbyte.integrations.source.mongodb.internal.MongoCatalogHelper.DEFAULT_CURSOR_FIELD; +import static io.airbyte.integrations.source.mongodb.internal.MongoCatalogHelper.SUPPORTED_SYNC_MODES; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class MongoCatalogHelperTest { + + @Test + void testBuildingAirbyteStream() { + final String streamName = "name"; + final String streamNamespace = "namespace"; + final List discoveredFields = List.of(new Field("field1", JsonSchemaType.STRING), + new Field("field2", JsonSchemaType.NUMBER)); + + final AirbyteStream airbyteStream = MongoCatalogHelper.buildAirbyteStream(streamName, streamNamespace, discoveredFields); + + assertNotNull(airbyteStream); + assertEquals(streamNamespace, airbyteStream.getNamespace()); + assertEquals(streamName, airbyteStream.getName()); + assertEquals(List.of(DEFAULT_CURSOR_FIELD), airbyteStream.getDefaultCursorField()); + assertEquals(true, airbyteStream.getSourceDefinedCursor()); + assertEquals(List.of(List.of(DEFAULT_CURSOR_FIELD)), airbyteStream.getSourceDefinedPrimaryKey()); + assertEquals(SUPPORTED_SYNC_MODES, airbyteStream.getSupportedSyncModes()); + assertEquals(5, airbyteStream.getJsonSchema().get("properties").size()); + + discoveredFields.forEach(f -> assertTrue(airbyteStream.getJsonSchema().get("properties").has(f.getName()))); + assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_LSN)); + assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_LSN).get("type").asText()); + assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_DELETED_AT)); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_DELETED_AT).get("type").asText()); + assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_UPDATED_AT)); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_UPDATED_AT).get("type").asText()); + + } +} diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java index a9d51d8fefd23..c33bda18cc3d4 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java @@ -24,6 +24,8 @@ import com.mongodb.connection.ClusterType; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.integrations.debezium.internals.DebeziumEventUtils; +import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteStream; @@ -139,16 +141,20 @@ void testDiscoverOperation() throws IOException { assertTrue(stream.isPresent()); assertEquals(DB_NAME, stream.get().getNamespace()); assertEquals("testCollection", stream.get().getName()); - assertEquals("string", stream.get().getJsonSchema().get("properties").get("_id").get("type").asText()); - assertEquals("string", stream.get().getJsonSchema().get("properties").get("name").get("type").asText()); - assertEquals("string", stream.get().getJsonSchema().get("properties").get("last_updated").get("type").asText()); - assertEquals("number", stream.get().getJsonSchema().get("properties").get("total").get("type").asText()); - assertEquals("number", stream.get().getJsonSchema().get("properties").get("price").get("type").asText()); - assertEquals("array", stream.get().getJsonSchema().get("properties").get("items").get("type").asText()); - assertEquals("object", stream.get().getJsonSchema().get("properties").get("owners").get("type").asText()); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("_id").get("type").asText()); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("name").get("type").asText()); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("last_updated").get("type").asText()); + assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("total").get("type").asText()); + assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("price").get("type").asText()); + assertEquals(JsonSchemaType.ARRAY.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("items").get("type").asText()); + assertEquals(JsonSchemaType.OBJECT.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("owners").get("type").asText()); + assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_LSN).get("type").asText()); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_DELETED_AT).get("type").asText()); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_UPDATED_AT).get("type").asText()); assertEquals(true, stream.get().getSourceDefinedCursor()); - assertEquals(List.of(MongoUtil.DEFAULT_CURSOR_FIELD), stream.get().getDefaultCursorField()); - assertEquals(List.of(List.of(MongoUtil.DEFAULT_CURSOR_FIELD)), stream.get().getSourceDefinedPrimaryKey()); + assertEquals(List.of(MongoCatalogHelper.DEFAULT_CURSOR_FIELD), stream.get().getDefaultCursorField()); + assertEquals(List.of(List.of(MongoCatalogHelper.DEFAULT_CURSOR_FIELD)), stream.get().getSourceDefinedPrimaryKey()); + assertEquals(MongoCatalogHelper.SUPPORTED_SYNC_MODES, stream.get().getSupportedSyncModes()); } @Test From ca34f573f5471f311cd67780a83c7ea2be569d08 Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Fri, 4 Aug 2023 10:10:29 -0400 Subject: [PATCH 4/4] Formatting --- .../mongodb/internal/MongoCatalogHelper.java | 84 ++++++++++--------- .../source/mongodb/internal/MongoUtil.java | 4 - .../internal/MongoCatalogHelperTest.java | 77 +++++++++-------- .../mongodb/internal/MongoDbSourceTest.java | 30 ++++--- 4 files changed, 106 insertions(+), 89 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelper.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelper.java index 5390690fda8f5..feb21dd85f383 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelper.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelper.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.source.mongodb.internal; import io.airbyte.integrations.debezium.internals.DebeziumEventUtils; @@ -7,7 +11,6 @@ import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.SyncMode; - import java.util.ArrayList; import java.util.List; @@ -16,43 +19,44 @@ */ public class MongoCatalogHelper { - /** - * The default cursor field name. - */ - public static final String DEFAULT_CURSOR_FIELD = "_id"; - - /** - * The list of supported sync modes for a given stream. - */ - public static final List SUPPORTED_SYNC_MODES = List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL); - - /** - * Builds an {@link AirbyteStream} with the correct configuration for this source. - * - * @param streamName The name of the stream. - * @param streamNamespace The namespace of the stream. - * @param fields The fields associated with the stream. - * @return The configured {@link AirbyteStream} for this source. - */ - public static AirbyteStream buildAirbyteStream(final String streamName, final String streamNamespace, final List fields) { - return CatalogHelpers.createAirbyteStream(streamName, streamNamespace, addCdcMetadataColumns(fields)) - .withSupportedSyncModes(SUPPORTED_SYNC_MODES) - .withSourceDefinedCursor(true) - .withDefaultCursorField(List.of(DEFAULT_CURSOR_FIELD)) - .withSourceDefinedPrimaryKey(List.of(List.of(DEFAULT_CURSOR_FIELD))); - } - - /** - * Adds the metadata columns required to use CDC to the list of discovered fields. - * - * @param fields The list of discovered fields. - * @return The modified list of discovered fields that includes the required CDC metadata columns. - */ - public static List addCdcMetadataColumns(final List fields) { - final List modifiedFields = new ArrayList<>(fields); - modifiedFields.add(new Field(DebeziumEventUtils.CDC_LSN, JsonSchemaType.NUMBER)); - modifiedFields.add(new Field(DebeziumEventUtils.CDC_UPDATED_AT, JsonSchemaType.STRING)); - modifiedFields.add(new Field(DebeziumEventUtils.CDC_DELETED_AT, JsonSchemaType.STRING)); - return modifiedFields; - } + /** + * The default cursor field name. + */ + public static final String DEFAULT_CURSOR_FIELD = "_id"; + + /** + * The list of supported sync modes for a given stream. + */ + public static final List SUPPORTED_SYNC_MODES = List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL); + + /** + * Builds an {@link AirbyteStream} with the correct configuration for this source. + * + * @param streamName The name of the stream. + * @param streamNamespace The namespace of the stream. + * @param fields The fields associated with the stream. + * @return The configured {@link AirbyteStream} for this source. + */ + public static AirbyteStream buildAirbyteStream(final String streamName, final String streamNamespace, final List fields) { + return CatalogHelpers.createAirbyteStream(streamName, streamNamespace, addCdcMetadataColumns(fields)) + .withSupportedSyncModes(SUPPORTED_SYNC_MODES) + .withSourceDefinedCursor(true) + .withDefaultCursorField(List.of(DEFAULT_CURSOR_FIELD)) + .withSourceDefinedPrimaryKey(List.of(List.of(DEFAULT_CURSOR_FIELD))); + } + + /** + * Adds the metadata columns required to use CDC to the list of discovered fields. + * + * @param fields The list of discovered fields. + * @return The modified list of discovered fields that includes the required CDC metadata columns. + */ + public static List addCdcMetadataColumns(final List fields) { + final List modifiedFields = new ArrayList<>(fields); + modifiedFields.add(new Field(DebeziumEventUtils.CDC_LSN, JsonSchemaType.NUMBER)); + modifiedFields.add(new Field(DebeziumEventUtils.CDC_UPDATED_AT, JsonSchemaType.STRING)); + modifiedFields.add(new Field(DebeziumEventUtils.CDC_DELETED_AT, JsonSchemaType.STRING)); + return modifiedFields; + } + } diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java index 114227bfedea3..349cf88d345f0 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/main/java/io/airbyte/integrations/source/mongodb/internal/MongoUtil.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.source.mongodb.internal; -import com.google.common.collect.Lists; import com.mongodb.MongoCommandException; import com.mongodb.MongoException; import com.mongodb.MongoSecurityException; @@ -16,7 +15,6 @@ import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.CatalogHelpers; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -24,8 +22,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; - -import io.airbyte.protocol.models.v0.SyncMode; import org.bson.Document; public class MongoUtil { diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelperTest.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelperTest.java index c1d67a11f6a35..f67e0e7f16454 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelperTest.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoCatalogHelperTest.java @@ -1,12 +1,8 @@ -package io.airbyte.integrations.source.mongodb.internal; +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ -import io.airbyte.integrations.debezium.internals.DebeziumEventUtils; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.v0.AirbyteStream; -import org.junit.jupiter.api.Test; - -import java.util.List; +package io.airbyte.integrations.source.mongodb.internal; import static io.airbyte.integrations.source.mongodb.internal.MongoCatalogHelper.DEFAULT_CURSOR_FIELD; import static io.airbyte.integrations.source.mongodb.internal.MongoCatalogHelper.SUPPORTED_SYNC_MODES; @@ -14,33 +10,44 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import io.airbyte.integrations.debezium.internals.DebeziumEventUtils; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.v0.AirbyteStream; +import java.util.List; +import org.junit.jupiter.api.Test; + class MongoCatalogHelperTest { - @Test - void testBuildingAirbyteStream() { - final String streamName = "name"; - final String streamNamespace = "namespace"; - final List discoveredFields = List.of(new Field("field1", JsonSchemaType.STRING), - new Field("field2", JsonSchemaType.NUMBER)); - - final AirbyteStream airbyteStream = MongoCatalogHelper.buildAirbyteStream(streamName, streamNamespace, discoveredFields); - - assertNotNull(airbyteStream); - assertEquals(streamNamespace, airbyteStream.getNamespace()); - assertEquals(streamName, airbyteStream.getName()); - assertEquals(List.of(DEFAULT_CURSOR_FIELD), airbyteStream.getDefaultCursorField()); - assertEquals(true, airbyteStream.getSourceDefinedCursor()); - assertEquals(List.of(List.of(DEFAULT_CURSOR_FIELD)), airbyteStream.getSourceDefinedPrimaryKey()); - assertEquals(SUPPORTED_SYNC_MODES, airbyteStream.getSupportedSyncModes()); - assertEquals(5, airbyteStream.getJsonSchema().get("properties").size()); - - discoveredFields.forEach(f -> assertTrue(airbyteStream.getJsonSchema().get("properties").has(f.getName()))); - assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_LSN)); - assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_LSN).get("type").asText()); - assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_DELETED_AT)); - assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_DELETED_AT).get("type").asText()); - assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_UPDATED_AT)); - assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_UPDATED_AT).get("type").asText()); - - } + @Test + void testBuildingAirbyteStream() { + final String streamName = "name"; + final String streamNamespace = "namespace"; + final List discoveredFields = List.of(new Field("field1", JsonSchemaType.STRING), + new Field("field2", JsonSchemaType.NUMBER)); + + final AirbyteStream airbyteStream = MongoCatalogHelper.buildAirbyteStream(streamName, streamNamespace, discoveredFields); + + assertNotNull(airbyteStream); + assertEquals(streamNamespace, airbyteStream.getNamespace()); + assertEquals(streamName, airbyteStream.getName()); + assertEquals(List.of(DEFAULT_CURSOR_FIELD), airbyteStream.getDefaultCursorField()); + assertEquals(true, airbyteStream.getSourceDefinedCursor()); + assertEquals(List.of(List.of(DEFAULT_CURSOR_FIELD)), airbyteStream.getSourceDefinedPrimaryKey()); + assertEquals(SUPPORTED_SYNC_MODES, airbyteStream.getSupportedSyncModes()); + assertEquals(5, airbyteStream.getJsonSchema().get("properties").size()); + + discoveredFields.forEach(f -> assertTrue(airbyteStream.getJsonSchema().get("properties").has(f.getName()))); + assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_LSN)); + assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), + airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_LSN).get("type").asText()); + assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_DELETED_AT)); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), + airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_DELETED_AT).get("type").asText()); + assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_UPDATED_AT)); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), + airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_UPDATED_AT).get("type").asText()); + + } + } diff --git a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java index c33bda18cc3d4..05ff40a11b681 100644 --- a/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-internal-poc/src/test/java/io/airbyte/integrations/source/mongodb/internal/MongoDbSourceTest.java @@ -141,16 +141,26 @@ void testDiscoverOperation() throws IOException { assertTrue(stream.isPresent()); assertEquals(DB_NAME, stream.get().getNamespace()); assertEquals("testCollection", stream.get().getName()); - assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("_id").get("type").asText()); - assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("name").get("type").asText()); - assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("last_updated").get("type").asText()); - assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("total").get("type").asText()); - assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("price").get("type").asText()); - assertEquals(JsonSchemaType.ARRAY.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("items").get("type").asText()); - assertEquals(JsonSchemaType.OBJECT.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get("owners").get("type").asText()); - assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_LSN).get("type").asText()); - assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_DELETED_AT).get("type").asText()); - assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_UPDATED_AT).get("type").asText()); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), + stream.get().getJsonSchema().get("properties").get("_id").get("type").asText()); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), + stream.get().getJsonSchema().get("properties").get("name").get("type").asText()); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), + stream.get().getJsonSchema().get("properties").get("last_updated").get("type").asText()); + assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), + stream.get().getJsonSchema().get("properties").get("total").get("type").asText()); + assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), + stream.get().getJsonSchema().get("properties").get("price").get("type").asText()); + assertEquals(JsonSchemaType.ARRAY.getJsonSchemaTypeMap().get("type"), + stream.get().getJsonSchema().get("properties").get("items").get("type").asText()); + assertEquals(JsonSchemaType.OBJECT.getJsonSchemaTypeMap().get("type"), + stream.get().getJsonSchema().get("properties").get("owners").get("type").asText()); + assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"), + stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_LSN).get("type").asText()); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), + stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_DELETED_AT).get("type").asText()); + assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"), + stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_UPDATED_AT).get("type").asText()); assertEquals(true, stream.get().getSourceDefinedCursor()); assertEquals(List.of(MongoCatalogHelper.DEFAULT_CURSOR_FIELD), stream.get().getDefaultCursorField()); assertEquals(List.of(List.of(MongoCatalogHelper.DEFAULT_CURSOR_FIELD)), stream.get().getSourceDefinedPrimaryKey());