Skip to content

Commit

Permalink
Create a DbSourceDiscoverUtil for common utilities (#23929)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Mar 14, 2023
1 parent ace229f commit 16e81b7
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
Expand Down Expand Up @@ -109,24 +108,10 @@ public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
public AirbyteCatalog discover(final JsonNode config) throws Exception {
try {
final Database database = createDatabase(config);
final List<AirbyteStream> streams = getTables(database).stream()
.map(tableInfo -> {
final var primaryKeys = tableInfo.getPrimaryKeys().stream()
.filter(Objects::nonNull)
.map(Collections::singletonList)
.collect(Collectors.toList());

return CatalogHelpers
.createAirbyteStream(tableInfo.getName(), tableInfo.getNameSpace(),
tableInfo.getFields())
.withSupportedSyncModes(
tableInfo.getCursorFields() != null && tableInfo.getCursorFields().isEmpty()
? Lists.newArrayList(SyncMode.FULL_REFRESH)
: Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(primaryKeys);
})
.collect(Collectors.toList());
return new AirbyteCatalog().withStreams(streams);
final List<TableInfo<CommonField<DataType>>> tableInfos = discoverWithoutSystemTables(database);
final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys = discoverPrimaryKeys(
database, tableInfos);
return DbSourceDiscoverUtil.convertTableInfosToAirbyteCatalog(tableInfos, fullyQualifiedTableNameToPrimaryKeys, this::getAirbyteType);
} finally {
close();
}
Expand Down Expand Up @@ -186,11 +171,11 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,

// in case of user manually modified source table schema but did not refresh it and save into the
// catalog - it can lead to sync failure. This method compare actual schema vs catalog schema
private void logSourceSchemaChange(Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
ConfiguredAirbyteCatalog catalog) {
private void logSourceSchemaChange(final Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
final ConfiguredAirbyteCatalog catalog) {
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) {
continue;
Expand Down Expand Up @@ -222,7 +207,7 @@ private void validateCursorFieldForIncrementalTables(
final List<InvalidCursorInfo> tablesWithInvalidCursor = new ArrayList<>();
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
final boolean hasSourceDefinedCursor =
!Objects.isNull(airbyteStream.getStream().getSourceDefinedCursor())
Expand Down Expand Up @@ -365,7 +350,7 @@ private List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
if (selector.test(airbyteStream)) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
if (!tableNameToTable.containsKey(fullyQualifiedTableName)) {
LOGGER
Expand Down Expand Up @@ -537,10 +522,6 @@ private AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Databas
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
}

private String getFullyQualifiedTableName(final String nameSpace, final String tableName) {
return nameSpace != null ? nameSpace + "." + tableName : tableName;
}

private AutoCloseableIterator<AirbyteMessage> getMessageIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final String streamName,
Expand All @@ -555,42 +536,6 @@ private AutoCloseableIterator<AirbyteMessage> getMessageIterator(
.withData(r)));
}

/**
* Get list of source tables/data structures for schema discovery.
*
* @param database instance
* @return list of table/data structure info
* @throws Exception might throw an error during connection to database
*/
private List<TableInfo<Field>> getTables(final Database database) throws Exception {
final List<TableInfo<CommonField<DataType>>> tableInfos = discoverWithoutSystemTables(database);
final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys = discoverPrimaryKeys(
database, tableInfos);

return tableInfos.stream()
.map(t -> {
// some databases return multiple copies of the same record for a column (e.g. redshift) because
// they have at least once delivery guarantees. we want to dedupe these, but first we check that the
// records are actually the same and provide a good error message if they are not.
assertColumnsWithSameNameAreSame(t.getNameSpace(), t.getName(), t.getFields());
final List<Field> fields = t.getFields()
.stream()
.map(this::toField)
.distinct()
.collect(Collectors.toList());
final String fullyQualifiedTableName = getFullyQualifiedTableName(t.getNameSpace(),
t.getName());
final List<String> primaryKeys = fullyQualifiedTableNameToPrimaryKeys.getOrDefault(
fullyQualifiedTableName, Collections
.emptyList());
return TableInfo.<Field>builder().nameSpace(t.getNameSpace()).name(t.getName())
.fields(fields).primaryKeys(primaryKeys)
.cursorFields(t.getCursorFields())
.build();
})
.collect(Collectors.toList());
}

private Field toField(final CommonField<DataType> field) {
if (getAirbyteType(field.getType()) == JsonSchemaType.OBJECT && field.getProperties() != null
&& !field.getProperties().isEmpty()) {
Expand All @@ -601,25 +546,6 @@ private Field toField(final CommonField<DataType> field) {
}
}

private void assertColumnsWithSameNameAreSame(final String nameSpace,
final String tableName,
final List<CommonField<DataType>> columns) {
columns.stream()
.collect(Collectors.groupingBy(CommonField<DataType>::getName))
.values()
.forEach(columnsWithSameName -> {
final CommonField<DataType> comparisonColumn = columnsWithSameName.get(0);
columnsWithSameName.forEach(column -> {
if (!column.equals(comparisonColumn)) {
throw new RuntimeException(
String.format(
"Found multiple columns with same name: %s in table: %s.%s but the columns are not the same. columns: %s",
comparisonColumn.getName(), nameSpace, tableName, columns));
}
});
});
}

/**
* @param database - The database where from privileges for tables will be consumed
* @param schema - The schema where from privileges for tables will be consumed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.relationaldb;

import com.google.common.collect.Lists;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DbSourceDiscoverUtil {

public static <DataType> AirbyteCatalog convertTableInfosToAirbyteCatalog(final List<TableInfo<CommonField<DataType>>> tableInfos,
final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys,
final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
final List<TableInfo<Field>> tableInfoFieldList = tableInfos.stream()
.map(t -> {
// some databases return multiple copies of the same record for a column (e.g. redshift) because
// they have at least once delivery guarantees. we want to dedupe these, but first we check that the
// records are actually the same and provide a good error message if they are not.
assertColumnsWithSameNameAreSame(t.getNameSpace(), t.getName(), t.getFields());
final List<Field> fields = t.getFields()
.stream()
.map(commonField -> toField(commonField, airbyteTypeConverter))
.distinct()
.collect(Collectors.toList());
final String fullyQualifiedTableName = getFullyQualifiedTableName(t.getNameSpace(),
t.getName());
final List<String> primaryKeys = fullyQualifiedTableNameToPrimaryKeys.getOrDefault(
fullyQualifiedTableName, Collections
.emptyList());
return TableInfo.<Field>builder().nameSpace(t.getNameSpace()).name(t.getName())
.fields(fields).primaryKeys(primaryKeys)
.cursorFields(t.getCursorFields())
.build();
})
.collect(Collectors.toList());

final List<AirbyteStream> streams = tableInfoFieldList.stream()
.map(tableInfo -> {
final var primaryKeys = tableInfo.getPrimaryKeys().stream()
.filter(Objects::nonNull)
.map(Collections::singletonList)
.collect(Collectors.toList());

return CatalogHelpers
.createAirbyteStream(tableInfo.getName(), tableInfo.getNameSpace(),
tableInfo.getFields())
.withSupportedSyncModes(
tableInfo.getCursorFields() != null && tableInfo.getCursorFields().isEmpty()
? Lists.newArrayList(SyncMode.FULL_REFRESH)
: Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(primaryKeys);
})
.collect(Collectors.toList());
return new AirbyteCatalog().withStreams(streams);
}

public static String getFullyQualifiedTableName(final String nameSpace, final String tableName) {
return nameSpace != null ? nameSpace + "." + tableName : tableName;
}

public static <DataType> Field toField(final CommonField<DataType> commonField, final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
if (airbyteTypeConverter.apply(commonField.getType()) == JsonSchemaType.OBJECT && commonField.getProperties() != null
&& !commonField.getProperties().isEmpty()) {
final var properties = commonField.getProperties().stream().map(commField -> toField(commField, airbyteTypeConverter)).toList();
return Field.of(commonField.getName(), airbyteTypeConverter.apply(commonField.getType()), properties);
} else {
return Field.of(commonField.getName(), airbyteTypeConverter.apply(commonField.getType()));
}
}

private static <DataType> void assertColumnsWithSameNameAreSame(final String nameSpace,
final String tableName,
final List<CommonField<DataType>> columns) {
columns.stream()
.collect(Collectors.groupingBy(CommonField<DataType>::getName))
.values()
.forEach(columnsWithSameName -> {
final CommonField<DataType> comparisonColumn = columnsWithSameName.get(0);
columnsWithSameName.forEach(column -> {
if (!column.equals(comparisonColumn)) {
throw new RuntimeException(
String.format(
"Found multiple columns with same name: %s in table: %s.%s but the columns are not the same. columns: %s",
comparisonColumn.getName(), nameSpace, tableName, columns));
}
});
});
}

}

0 comments on commit 16e81b7

Please sign in to comment.