Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Source MongoDB Internal POC: Source Defined Cursor #29044

Merged
merged 9 commits into from
Aug 4, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ dependencies {
implementation libs.jackson.databind
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:bases:debezium')
implementation libs.airbyte.protocol
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

implementation 'org.mongodb:mongodb-driver-sync:4.10.2'

testImplementation testFixtures(project(':airbyte-integrations:bases:debezium'))
testImplementation "org.jetbrains.kotlinx:kotlinx-cli:0.3.5"

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mongodb.internal;

import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.List;

/**
* Collection of utility methods for generating the {@link AirbyteCatalog}.
*/
public class MongoCatalogHelper {

/**
* The default cursor field name.
*/
public static final String DEFAULT_CURSOR_FIELD = "_id";

/**
* The list of supported sync modes for a given stream.
*/
public static final List<SyncMode> SUPPORTED_SYNC_MODES = List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL);

/**
* Builds an {@link AirbyteStream} with the correct configuration for this source.
*
* @param streamName The name of the stream.
* @param streamNamespace The namespace of the stream.
* @param fields The fields associated with the stream.
* @return The configured {@link AirbyteStream} for this source.
*/
public static AirbyteStream buildAirbyteStream(final String streamName, final String streamNamespace, final List<Field> fields) {
return CatalogHelpers.createAirbyteStream(streamName, streamNamespace, addCdcMetadataColumns(fields))
.withSupportedSyncModes(SUPPORTED_SYNC_MODES)
.withSourceDefinedCursor(true)
.withDefaultCursorField(List.of(DEFAULT_CURSOR_FIELD))
.withSourceDefinedPrimaryKey(List.of(List.of(DEFAULT_CURSOR_FIELD)));
}

/**
* Adds the metadata columns required to use CDC to the list of discovered fields.
*
* @param fields The list of discovered fields.
* @return The modified list of discovered fields that includes the required CDC metadata columns.
*/
public static List<Field> addCdcMetadataColumns(final List<Field> fields) {
final List<Field> modifiedFields = new ArrayList<>(fields);
modifiedFields.add(new Field(DebeziumEventUtils.CDC_LSN, JsonSchemaType.NUMBER));
modifiedFields.add(new Field(DebeziumEventUtils.CDC_UPDATED_AT, JsonSchemaType.STRING));
modifiedFields.add(new Field(DebeziumEventUtils.CDC_DELETED_AT, JsonSchemaType.STRING));
return modifiedFields;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -83,11 +82,15 @@ public static List<AirbyteStream> getAirbyteStreams(final MongoClient mongoClien
final Set<String> authorizedCollections = getAuthorizedCollections(mongoClient, databaseName);
authorizedCollections.parallelStream().forEach(collectionName -> {
final List<Field> fields = getFieldsInCollection(mongoClient.getDatabase(databaseName).getCollection(collectionName));
streams.add(CatalogHelpers.createAirbyteStream(collectionName, databaseName, fields));
streams.add(createAirbyteStream(collectionName, databaseName, fields));
});
return streams;
}

private static AirbyteStream createAirbyteStream(final String collectionName, final String databaseName, final List<Field> fields) {
return MongoCatalogHelper.buildAirbyteStream(collectionName, databaseName, fields);
}

private static List<Field> getFieldsInCollection(final MongoCollection collection) {
final Map<String, Object> fieldsMap = Map.of("input", Map.of("$objectToArray", "$$ROOT"),
"as", "each",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mongodb.internal;

import static io.airbyte.integrations.source.mongodb.internal.MongoCatalogHelper.DEFAULT_CURSOR_FIELD;
import static io.airbyte.integrations.source.mongodb.internal.MongoCatalogHelper.SUPPORTED_SYNC_MODES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import java.util.List;
import org.junit.jupiter.api.Test;

class MongoCatalogHelperTest {

@Test
void testBuildingAirbyteStream() {
final String streamName = "name";
final String streamNamespace = "namespace";
final List<Field> discoveredFields = List.of(new Field("field1", JsonSchemaType.STRING),
new Field("field2", JsonSchemaType.NUMBER));

final AirbyteStream airbyteStream = MongoCatalogHelper.buildAirbyteStream(streamName, streamNamespace, discoveredFields);

assertNotNull(airbyteStream);
assertEquals(streamNamespace, airbyteStream.getNamespace());
assertEquals(streamName, airbyteStream.getName());
assertEquals(List.of(DEFAULT_CURSOR_FIELD), airbyteStream.getDefaultCursorField());
assertEquals(true, airbyteStream.getSourceDefinedCursor());
assertEquals(List.of(List.of(DEFAULT_CURSOR_FIELD)), airbyteStream.getSourceDefinedPrimaryKey());
assertEquals(SUPPORTED_SYNC_MODES, airbyteStream.getSupportedSyncModes());
assertEquals(5, airbyteStream.getJsonSchema().get("properties").size());

discoveredFields.forEach(f -> assertTrue(airbyteStream.getJsonSchema().get("properties").has(f.getName())));
assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_LSN));
assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"),
airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_LSN).get("type").asText());
assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_DELETED_AT));
assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"),
airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_DELETED_AT).get("type").asText());
assertTrue(airbyteStream.getJsonSchema().get("properties").has(DebeziumEventUtils.CDC_UPDATED_AT));
assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"),
airbyteStream.getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_UPDATED_AT).get("type").asText());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import com.mongodb.connection.ClusterType;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.bson.Document;
Expand Down Expand Up @@ -138,13 +141,30 @@ void testDiscoverOperation() throws IOException {
assertTrue(stream.isPresent());
assertEquals(DB_NAME, stream.get().getNamespace());
assertEquals("testCollection", stream.get().getName());
assertEquals("string", stream.get().getJsonSchema().get("properties").get("_id").get("type").asText());
assertEquals("string", stream.get().getJsonSchema().get("properties").get("name").get("type").asText());
assertEquals("string", stream.get().getJsonSchema().get("properties").get("last_updated").get("type").asText());
assertEquals("number", stream.get().getJsonSchema().get("properties").get("total").get("type").asText());
assertEquals("number", stream.get().getJsonSchema().get("properties").get("price").get("type").asText());
assertEquals("array", stream.get().getJsonSchema().get("properties").get("items").get("type").asText());
assertEquals("object", stream.get().getJsonSchema().get("properties").get("owners").get("type").asText());
assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"),
stream.get().getJsonSchema().get("properties").get("_id").get("type").asText());
assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"),
stream.get().getJsonSchema().get("properties").get("name").get("type").asText());
assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"),
stream.get().getJsonSchema().get("properties").get("last_updated").get("type").asText());
assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"),
stream.get().getJsonSchema().get("properties").get("total").get("type").asText());
assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"),
stream.get().getJsonSchema().get("properties").get("price").get("type").asText());
assertEquals(JsonSchemaType.ARRAY.getJsonSchemaTypeMap().get("type"),
stream.get().getJsonSchema().get("properties").get("items").get("type").asText());
assertEquals(JsonSchemaType.OBJECT.getJsonSchemaTypeMap().get("type"),
stream.get().getJsonSchema().get("properties").get("owners").get("type").asText());
assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"),
stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_LSN).get("type").asText());
assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"),
stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_DELETED_AT).get("type").asText());
assertEquals(JsonSchemaType.STRING.getJsonSchemaTypeMap().get("type"),
stream.get().getJsonSchema().get("properties").get(DebeziumEventUtils.CDC_UPDATED_AT).get("type").asText());
assertEquals(true, stream.get().getSourceDefinedCursor());
assertEquals(List.of(MongoCatalogHelper.DEFAULT_CURSOR_FIELD), stream.get().getDefaultCursorField());
assertEquals(List.of(List.of(MongoCatalogHelper.DEFAULT_CURSOR_FIELD)), stream.get().getSourceDefinedPrimaryKey());
assertEquals(MongoCatalogHelper.SUPPORTED_SYNC_MODES, stream.get().getSupportedSyncModes());
}

@Test
Expand Down