Skip to content

Commit

Permalink
Verify catalog schema fieldNames is a subset of DB schema (#24207)
Browse files Browse the repository at this point in the history
* Update current schema vs catalog validation to omit airbyte metadata fields
* Bumped versions for postgres, mssql, and mySQL + changelog
* Bumped dockerfile version of strict encrypt
* Manually generate definitions for mysql
* Manually generate definitions for mssql

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
Duy Nguyen and octavia-squidington-iii authored Mar 22, 2023
1 parent bb62d80 commit c1323f6
Show file tree
Hide file tree
Showing 13 changed files with 318 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 1.0.4
dockerImageTag: 1.0.5
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down Expand Up @@ -1253,7 +1253,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 2.0.6
dockerImageTag: 2.0.7
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down Expand Up @@ -1565,7 +1565,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 2.0.6
dockerImageTag: 2.0.7
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
6 changes: 3 additions & 3 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8460,7 +8460,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:1.0.4"
- dockerImage: "airbyte/source-mssql:1.0.5"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/mssql"
connectionSpecification:
Expand Down Expand Up @@ -9335,7 +9335,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:2.0.6"
- dockerImage: "airbyte/source-mysql:2.0.7"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql"
connectionSpecification:
Expand Down Expand Up @@ -11911,7 +11911,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:2.0.6"
- dockerImage: "airbyte/source-postgres:2.0.7"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.4
LABEL io.airbyte.version=1.0.5
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.4
LABEL io.airbyte.version=1.0.5
LABEL io.airbyte.name=airbyte/source-mssql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.6
LABEL io.airbyte.version=2.0.7

LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.6
LABEL io.airbyte.version=2.0.7

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.6
LABEL io.airbyte.version=2.0.7
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.6
LABEL io.airbyte.version=2.0.7
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.relationaldb;

import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema;
import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
Expand All @@ -17,6 +18,8 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -32,16 +35,22 @@
public class DbSourceDiscoverUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(DbSourceDiscoverUtil.class);
private static final List<String> AIRBYTE_METADATA = Arrays.asList("_ab_cdc_lsn",
"_ab_cdc_updated_at",
"_ab_cdc_deleted_at");

// In case of user manually modified source table schema but did not refresh it and save into the
// catalog - it can lead to sync failure. This method compare actual schema vs catalog schema
/*
* This method logs schema drift between source table and the catalog. This can happen if
* (i) underlying table schema changed between syncs
* (ii) The source connector's mapping of datatypes to Airbyte types changed between runs
*/
public static <DataType> void logSourceSchemaChange(final Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
final ConfiguredAirbyteCatalog catalog,
final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
stream.getName());
if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) {
continue;
}
Expand All @@ -50,14 +59,30 @@ public static <DataType> void logSourceSchemaChange(final Map<String, TableInfo<
.stream()
.map(commonField -> toField(commonField, airbyteTypeConverter))
.distinct()
.collect(Collectors.toList());
.collect(toList());
final JsonNode currentJsonSchema = fieldsToJsonSchema(fields);

final JsonNode catalogSchema = stream.getJsonSchema();
if (!catalogSchema.equals(currentJsonSchema)) {
final JsonNode currentSchemaProperties = currentJsonSchema.get("properties");
final JsonNode catalogProperties = catalogSchema.get("properties");
final List<String> mismatchedFields = new ArrayList<>();
catalogProperties.fieldNames().forEachRemaining(fieldName -> {
// Ignoring metadata fields since those are automatically added onto the catalog schema by Airbyte
// and don't exist in the source schema. They should not be considered a change
if (AIRBYTE_METADATA.contains(fieldName)) {
return;
}

if (!currentSchemaProperties.has(fieldName) ||
!currentSchemaProperties.get(fieldName).equals(catalogProperties.get(fieldName))) {
mismatchedFields.add(fieldName);
}
});

if (!mismatchedFields.isEmpty()) {
LOGGER.warn(
"Source schema changed for table {}! Actual schema: {}. Catalog schema: {}",
"Source schema changed for table {}! Potential mismatches: {}. Actual schema: {}. Catalog schema: {}",
fullyQualifiedTableName,
String.join(", ", mismatchedFields.toString()),
currentJsonSchema,
catalogSchema);
}
Expand All @@ -77,9 +102,9 @@ public static <DataType> AirbyteCatalog convertTableInfosToAirbyteCatalog(final
.stream()
.map(commonField -> toField(commonField, airbyteTypeConverter))
.distinct()
.collect(Collectors.toList());
.collect(toList());
final String fullyQualifiedTableName = getFullyQualifiedTableName(t.getNameSpace(),
t.getName());
t.getName());
final List<String> primaryKeys = fullyQualifiedTableNameToPrimaryKeys.getOrDefault(
fullyQualifiedTableName, Collections
.emptyList());
Expand All @@ -88,25 +113,25 @@ public static <DataType> AirbyteCatalog convertTableInfosToAirbyteCatalog(final
.cursorFields(t.getCursorFields())
.build();
})
.collect(Collectors.toList());
.collect(toList());

final List<AirbyteStream> streams = tableInfoFieldList.stream()
.map(tableInfo -> {
final var primaryKeys = tableInfo.getPrimaryKeys().stream()
.filter(Objects::nonNull)
.map(Collections::singletonList)
.collect(Collectors.toList());
.collect(toList());

return CatalogHelpers
.createAirbyteStream(tableInfo.getName(), tableInfo.getNameSpace(),
tableInfo.getFields())
tableInfo.getFields())
.withSupportedSyncModes(
tableInfo.getCursorFields() != null && tableInfo.getCursorFields().isEmpty()
? Lists.newArrayList(SyncMode.FULL_REFRESH)
: Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
? Lists.newArrayList(SyncMode.FULL_REFRESH)
: Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(primaryKeys);
})
.collect(Collectors.toList());
.collect(toList());
return new AirbyteCatalog().withStreams(streams);
}

Expand Down Expand Up @@ -142,5 +167,4 @@ private static <DataType> void assertColumnsWithSameNameAreSame(final String nam
});
});
}

}
}
Loading

0 comments on commit c1323f6

Please sign in to comment.