diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index dd724538d47f..ac88a7192d43 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.source.relationaldb; import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; +import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; @@ -161,6 +162,8 @@ public AutoCloseableIterator read(final JsonNode config, validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog, database); + logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog); + final List> incrementalIterators = getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt); @@ -180,6 +183,36 @@ public AutoCloseableIterator read(final JsonNode config, }); } + // in case of user manually modified source table schema but did not refresh it and save into the + // catalog - it can lead to sync failure. This method compare actual schema vs catalog schema + private void logSourceSchemaChange(Map>> fullyQualifiedTableNameToInfo, + ConfiguredAirbyteCatalog catalog) { + for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { + final AirbyteStream stream = airbyteStream.getStream(); + final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(), + stream.getName()); + if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) { + continue; + } + final TableInfo> table = fullyQualifiedTableNameToInfo.get(fullyQualifiedTableName); + final List fields = table.getFields() + .stream() + .map(this::toField) + .distinct() + .collect(Collectors.toList()); + final JsonNode currentJsonSchema = fieldsToJsonSchema(fields); + + final JsonNode catalogSchema = stream.getJsonSchema(); + if (!catalogSchema.equals(currentJsonSchema)) { + LOGGER.warn( + "Source schema changed for table {}! Actual schema: {}. Catalog schema: {}", + fullyQualifiedTableName, + currentJsonSchema, + catalogSchema); + } + } + } + private void validateCursorFieldForIncrementalTables( final Map>> tableNameToTable, final ConfiguredAirbyteCatalog catalog,