Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a DbSourceDiscoverUtil for common utilities #23929

Merged
merged 4 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
Expand Down Expand Up @@ -109,24 +108,10 @@ public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
public AirbyteCatalog discover(final JsonNode config) throws Exception {
try {
final Database database = createDatabase(config);
final List<AirbyteStream> streams = getTables(database).stream()
.map(tableInfo -> {
final var primaryKeys = tableInfo.getPrimaryKeys().stream()
.filter(Objects::nonNull)
.map(Collections::singletonList)
.collect(Collectors.toList());

return CatalogHelpers
.createAirbyteStream(tableInfo.getName(), tableInfo.getNameSpace(),
tableInfo.getFields())
.withSupportedSyncModes(
tableInfo.getCursorFields() != null && tableInfo.getCursorFields().isEmpty()
? Lists.newArrayList(SyncMode.FULL_REFRESH)
: Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(primaryKeys);
})
.collect(Collectors.toList());
return new AirbyteCatalog().withStreams(streams);
final List<TableInfo<CommonField<DataType>>> tableInfos = discoverWithoutSystemTables(database);
final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys = discoverPrimaryKeys(
database, tableInfos);
return DbSourceDiscoverUtil.convertTableInfosToAirbyteCatalog(tableInfos, fullyQualifiedTableNameToPrimaryKeys, this::getAirbyteType);
} finally {
close();
}
Expand Down Expand Up @@ -186,11 +171,11 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,

// in case of user manually modified source table schema but did not refresh it and save into the
// catalog - it can lead to sync failure. This method compare actual schema vs catalog schema
private void logSourceSchemaChange(Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
ConfiguredAirbyteCatalog catalog) {
private void logSourceSchemaChange(final Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
final ConfiguredAirbyteCatalog catalog) {
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) {
continue;
Expand Down Expand Up @@ -222,7 +207,7 @@ private void validateCursorFieldForIncrementalTables(
final List<InvalidCursorInfo> tablesWithInvalidCursor = new ArrayList<>();
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
final boolean hasSourceDefinedCursor =
!Objects.isNull(airbyteStream.getStream().getSourceDefinedCursor())
Expand Down Expand Up @@ -365,7 +350,7 @@ private List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
if (selector.test(airbyteStream)) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
if (!tableNameToTable.containsKey(fullyQualifiedTableName)) {
LOGGER
Expand Down Expand Up @@ -537,10 +522,6 @@ private AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Databas
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
}

private String getFullyQualifiedTableName(final String nameSpace, final String tableName) {
return nameSpace != null ? nameSpace + "." + tableName : tableName;
}

private AutoCloseableIterator<AirbyteMessage> getMessageIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final String streamName,
Expand All @@ -555,42 +536,6 @@ private AutoCloseableIterator<AirbyteMessage> getMessageIterator(
.withData(r)));
}

/**
* Get list of source tables/data structures for schema discovery.
*
* @param database instance
* @return list of table/data structure info
* @throws Exception might throw an error during connection to database
*/
private List<TableInfo<Field>> getTables(final Database database) throws Exception {
final List<TableInfo<CommonField<DataType>>> tableInfos = discoverWithoutSystemTables(database);
final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys = discoverPrimaryKeys(
database, tableInfos);

return tableInfos.stream()
.map(t -> {
// some databases return multiple copies of the same record for a column (e.g. redshift) because
// they have at least once delivery guarantees. we want to dedupe these, but first we check that the
// records are actually the same and provide a good error message if they are not.
assertColumnsWithSameNameAreSame(t.getNameSpace(), t.getName(), t.getFields());
final List<Field> fields = t.getFields()
.stream()
.map(this::toField)
.distinct()
.collect(Collectors.toList());
final String fullyQualifiedTableName = getFullyQualifiedTableName(t.getNameSpace(),
t.getName());
final List<String> primaryKeys = fullyQualifiedTableNameToPrimaryKeys.getOrDefault(
fullyQualifiedTableName, Collections
.emptyList());
return TableInfo.<Field>builder().nameSpace(t.getNameSpace()).name(t.getName())
.fields(fields).primaryKeys(primaryKeys)
.cursorFields(t.getCursorFields())
.build();
})
.collect(Collectors.toList());
}

private Field toField(final CommonField<DataType> field) {
if (getAirbyteType(field.getType()) == JsonSchemaType.OBJECT && field.getProperties() != null
&& !field.getProperties().isEmpty()) {
Expand All @@ -601,25 +546,6 @@ private Field toField(final CommonField<DataType> field) {
}
}

private void assertColumnsWithSameNameAreSame(final String nameSpace,
final String tableName,
final List<CommonField<DataType>> columns) {
columns.stream()
.collect(Collectors.groupingBy(CommonField<DataType>::getName))
.values()
.forEach(columnsWithSameName -> {
final CommonField<DataType> comparisonColumn = columnsWithSameName.get(0);
columnsWithSameName.forEach(column -> {
if (!column.equals(comparisonColumn)) {
throw new RuntimeException(
String.format(
"Found multiple columns with same name: %s in table: %s.%s but the columns are not the same. columns: %s",
comparisonColumn.getName(), nameSpace, tableName, columns));
}
});
});
}

/**
* @param database - The database where from privileges for tables will be consumed
* @param schema - The schema where from privileges for tables will be consumed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.relationaldb;

import com.google.common.collect.Lists;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DbSourceDiscoverUtil {

public static <DataType> AirbyteCatalog convertTableInfosToAirbyteCatalog(final List<TableInfo<CommonField<DataType>>> tableInfos,
final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys,
final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
final List<TableInfo<Field>> tableInfoFieldList = tableInfos.stream()
.map(t -> {
// some databases return multiple copies of the same record for a column (e.g. redshift) because
// they have at least once delivery guarantees. we want to dedupe these, but first we check that the
// records are actually the same and provide a good error message if they are not.
assertColumnsWithSameNameAreSame(t.getNameSpace(), t.getName(), t.getFields());
final List<Field> fields = t.getFields()
.stream()
.map(commonField -> toField(commonField, airbyteTypeConverter))
.distinct()
.collect(Collectors.toList());
final String fullyQualifiedTableName = getFullyQualifiedTableName(t.getNameSpace(),
t.getName());
final List<String> primaryKeys = fullyQualifiedTableNameToPrimaryKeys.getOrDefault(
fullyQualifiedTableName, Collections
.emptyList());
return TableInfo.<Field>builder().nameSpace(t.getNameSpace()).name(t.getName())
.fields(fields).primaryKeys(primaryKeys)
.cursorFields(t.getCursorFields())
.build();
})
.collect(Collectors.toList());

final List<AirbyteStream> streams = tableInfoFieldList.stream()
.map(tableInfo -> {
final var primaryKeys = tableInfo.getPrimaryKeys().stream()
.filter(Objects::nonNull)
.map(Collections::singletonList)
.collect(Collectors.toList());

return CatalogHelpers
.createAirbyteStream(tableInfo.getName(), tableInfo.getNameSpace(),
tableInfo.getFields())
.withSupportedSyncModes(
tableInfo.getCursorFields() != null && tableInfo.getCursorFields().isEmpty()
? Lists.newArrayList(SyncMode.FULL_REFRESH)
: Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(primaryKeys);
})
.collect(Collectors.toList());
return new AirbyteCatalog().withStreams(streams);
}

public static String getFullyQualifiedTableName(final String nameSpace, final String tableName) {
return nameSpace != null ? nameSpace + "." + tableName : tableName;
}

public static <DataType> Field toField(final CommonField<DataType> commonField, final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
if (airbyteTypeConverter.apply(commonField.getType()) == JsonSchemaType.OBJECT && commonField.getProperties() != null
&& !commonField.getProperties().isEmpty()) {
final var properties = commonField.getProperties().stream().map(commField -> toField(commField, airbyteTypeConverter)).toList();
return Field.of(commonField.getName(), airbyteTypeConverter.apply(commonField.getType()), properties);
} else {
return Field.of(commonField.getName(), airbyteTypeConverter.apply(commonField.getType()));
}
}

private static <DataType> void assertColumnsWithSameNameAreSame(final String nameSpace,
final String tableName,
final List<CommonField<DataType>> columns) {
columns.stream()
.collect(Collectors.groupingBy(CommonField<DataType>::getName))
.values()
.forEach(columnsWithSameName -> {
final CommonField<DataType> comparisonColumn = columnsWithSameName.get(0);
columnsWithSameName.forEach(column -> {
if (!column.equals(comparisonColumn)) {
throw new RuntimeException(
String.format(
"Found multiple columns with same name: %s in table: %s.%s but the columns are not the same. columns: %s",
comparisonColumn.getName(), nameSpace, tableName, columns));
}
});
});
}

}