Skip to content

Commit

Permalink
JDBC Sources: validate actual source schema (#21844)
Browse files Browse the repository at this point in the history
* JDBC Sources: validate actual source schema

* add unit test

* updated test cases

* refactoring
  • Loading branch information
VitaliiMaltsev authored Feb 8, 2023
1 parent ba41bba commit 04b9703
Showing 1 changed file with 33 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.relationaldb;

import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -161,6 +162,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,

validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog, database);

logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog);

final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager,
emittedAt);
Expand All @@ -180,6 +183,36 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
});
}

// in case of user manually modified source table schema but did not refresh it and save into the
// catalog - it can lead to sync failure. This method compare actual schema vs catalog schema
private void logSourceSchemaChange(Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
ConfiguredAirbyteCatalog catalog) {
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) {
continue;
}
final TableInfo<CommonField<DataType>> table = fullyQualifiedTableNameToInfo.get(fullyQualifiedTableName);
final List<Field> fields = table.getFields()
.stream()
.map(this::toField)
.distinct()
.collect(Collectors.toList());
final JsonNode currentJsonSchema = fieldsToJsonSchema(fields);

final JsonNode catalogSchema = stream.getJsonSchema();
if (!catalogSchema.equals(currentJsonSchema)) {
LOGGER.warn(
"Source schema changed for table {}! Actual schema: {}. Catalog schema: {}",
fullyQualifiedTableName,
currentJsonSchema,
catalogSchema);
}
}
}

private void validateCursorFieldForIncrementalTables(
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
final ConfiguredAirbyteCatalog catalog,
Expand Down

0 comments on commit 04b9703

Please sign in to comment.