Skip to content

Commit

Permalink
Some more cleanup in abstract classes in preparation of splitting (ai…
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored and adriennevermorel committed Mar 17, 2023
1 parent 6c02fb4 commit fa92e32
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.integrations.source.relationaldb;

import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
Expand All @@ -28,11 +27,11 @@
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.InvalidCursorInfoUtil.InvalidCursorInfo;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory;
import io.airbyte.integrations.util.ConnectorExceptionUtil;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
Expand Down Expand Up @@ -61,7 +60,6 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -134,7 +132,7 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
throws Exception {
final StateManager stateManager =
StateManagerFactory.createStateManager(getSupportedStateType(config),
deserializeInitialState(state, config), catalog);
deserializeInitialState(state, config, featureFlags.useStreamCapableState()), catalog);
final Instant emittedAt = Instant.now();

final Database database = createDatabase(config);
Expand All @@ -148,7 +146,7 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,

validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog, database);

logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog);
DbSourceDiscoverUtil.logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog, this::getAirbyteType);

final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager,
Expand All @@ -169,36 +167,6 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
});
}

// in case of user manually modified source table schema but did not refresh it and save into the
// catalog - it can lead to sync failure. This method compare actual schema vs catalog schema
private void logSourceSchemaChange(final Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
final ConfiguredAirbyteCatalog catalog) {
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) {
continue;
}
final TableInfo<CommonField<DataType>> table = fullyQualifiedTableNameToInfo.get(fullyQualifiedTableName);
final List<Field> fields = table.getFields()
.stream()
.map(this::toField)
.distinct()
.collect(Collectors.toList());
final JsonNode currentJsonSchema = fieldsToJsonSchema(fields);

final JsonNode catalogSchema = stream.getJsonSchema();
if (!catalogSchema.equals(currentJsonSchema)) {
LOGGER.warn(
"Source schema changed for table {}! Actual schema: {}. Catalog schema: {}",
fullyQualifiedTableName,
currentJsonSchema,
catalogSchema);
}
}
}

private void validateCursorFieldForIncrementalTables(
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
final ConfiguredAirbyteCatalog catalog,
Expand Down Expand Up @@ -310,7 +278,7 @@ private List<AutoCloseableIterator<AirbyteMessage>> getFullRefreshIterators(
tableNameToTable,
stateManager,
emittedAt,
configuredStream -> configuredStream.getSyncMode().equals(SyncMode.FULL_REFRESH));
SyncMode.FULL_REFRESH);
}

protected List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
Expand All @@ -325,7 +293,7 @@ protected List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
tableNameToTable,
stateManager,
emittedAt,
configuredStream -> configuredStream.getSyncMode().equals(SyncMode.INCREMENTAL));
SyncMode.INCREMENTAL);
}

/**
Expand All @@ -336,7 +304,7 @@ protected List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
* @param tableNameToTable Mapping of table name to table
* @param stateManager Manager used to track the state of data synced by the connector
* @param emittedAt Time when data was emitted from the Source database
* @param selector essentially a boolean that verifies if configuredStream has selected a sync mode
* @param syncMode the sync mode for which we want to grab the required iterators
* @return List of AirbyteMessageIterators containing all iterators for a catalog
*/
private List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
Expand All @@ -345,10 +313,10 @@ private List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
final StateManager stateManager,
final Instant emittedAt,
final Predicate<ConfiguredAirbyteStream> selector) {
final SyncMode syncMode) {
final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = new ArrayList<>();
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
if (selector.test(airbyteStream)) {
if (airbyteStream.getSyncMode().equals(syncMode)) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
Expand Down Expand Up @@ -522,11 +490,11 @@ private AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Databas
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
}

private AutoCloseableIterator<AirbyteMessage> getMessageIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final String streamName,
final String namespace,
final long emittedAt) {
private static AutoCloseableIterator<AirbyteMessage> getMessageIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final String streamName,
final String namespace,
final long emittedAt) {
return AutoCloseableIterators.transform(recordIterator, r -> new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
Expand All @@ -536,16 +504,6 @@ private AutoCloseableIterator<AirbyteMessage> getMessageIterator(
.withData(r)));
}

private Field toField(final CommonField<DataType> field) {
if (getAirbyteType(field.getType()) == JsonSchemaType.OBJECT && field.getProperties() != null
&& !field.getProperties().isEmpty()) {
final var properties = field.getProperties().stream().map(this::toField).toList();
return Field.of(field.getName(), getAirbyteType(field.getType()), properties);
} else {
return Field.of(field.getName(), getAirbyteType(field.getType()));
}
}

/**
* @param database - The database where from privileges for tables will be consumed
* @param schema - The schema where from privileges for tables will be consumed
Expand Down Expand Up @@ -703,15 +661,18 @@ protected int getStateEmissionFrequency() {
* @return The deserialized object representation of the state.
*/
protected List<AirbyteStateMessage> deserializeInitialState(final JsonNode initialStateJson,
final JsonNode config) {
final JsonNode config,
final boolean useStreamCapableState) {
final Optional<StateWrapper> typedState = StateMessageHelper.getTypedState(initialStateJson,
featureFlags.useStreamCapableState());
useStreamCapableState);
return typedState.map((state) -> {
switch (state.getStateType()) {
case GLOBAL:
return List.of(convertStateMessage(state.getGlobal()));
return List.of(StateGeneratorUtils.convertStateMessage(state.getGlobal()));
case STREAM:
return state.getStateMessages().stream().map(this::convertStateMessage).toList();
return state.getStateMessages()
.stream()
.map(stateMessage -> StateGeneratorUtils.convertStateMessage(stateMessage)).toList();
case LEGACY:
default:
return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY)
Expand All @@ -720,10 +681,6 @@ protected List<AirbyteStateMessage> deserializeInitialState(final JsonNode initi
}).orElse(generateEmptyInitialState(config));
}

protected AirbyteStateMessage convertStateMessage(final io.airbyte.protocol.models.AirbyteStateMessage state) {
return Jsons.object(Jsons.jsonNode(state), AirbyteStateMessage.class);
}

/**
* Generates an empty, initial state for use by the connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,66 @@

package io.airbyte.integrations.source.relationaldb;

import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Contains utilities and helper classes for discovering schemas in database sources.
*/
public class DbSourceDiscoverUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(DbSourceDiscoverUtil.class);

// In case of user manually modified source table schema but did not refresh it and save into the
// catalog - it can lead to sync failure. This method compare actual schema vs catalog schema
public static <DataType> void logSourceSchemaChange(final Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
final ConfiguredAirbyteCatalog catalog,
final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) {
continue;
}
final TableInfo<CommonField<DataType>> table = fullyQualifiedTableNameToInfo.get(fullyQualifiedTableName);
final List<Field> fields = table.getFields()
.stream()
.map(commonField -> toField(commonField, airbyteTypeConverter))
.distinct()
.collect(Collectors.toList());
final JsonNode currentJsonSchema = fieldsToJsonSchema(fields);

final JsonNode catalogSchema = stream.getJsonSchema();
if (!catalogSchema.equals(currentJsonSchema)) {
LOGGER.warn(
"Source schema changed for table {}! Actual schema: {}. Catalog schema: {}",
fullyQualifiedTableName,
currentJsonSchema,
catalogSchema);
}
}
}

public static <DataType> AirbyteCatalog convertTableInfosToAirbyteCatalog(final List<TableInfo<CommonField<DataType>>> tableInfos,
final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys,
final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
Expand Down Expand Up @@ -71,7 +114,7 @@ public static String getFullyQualifiedTableName(final String nameSpace, final St
return nameSpace != null ? nameSpace + "." + tableName : tableName;
}

public static <DataType> Field toField(final CommonField<DataType> commonField, final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
private static <DataType> Field toField(final CommonField<DataType> commonField, final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
if (airbyteTypeConverter.apply(commonField.getType()) == JsonSchemaType.OBJECT && commonField.getProperties() != null
&& !commonField.getProperties().isEmpty()) {
final var properties = commonField.getProperties().stream().map(commField -> toField(commField, airbyteTypeConverter)).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,8 @@ public static List<AirbyteStateMessage> convertLegacyStateToStreamState(final Ai
.collect(Collectors.toList());
}

public static AirbyteStateMessage convertStateMessage(final io.airbyte.protocol.models.AirbyteStateMessage state) {
return Jsons.object(Jsons.jsonNode(state), AirbyteStateMessage.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void testDeserializationOfLegacyState() throws IOException {
final String legacyStateJson = MoreResources.readResource("states/legacy.json");
final JsonNode legacyState = Jsons.deserialize(legacyStateJson);

final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(legacyState, config);
final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(legacyState, config, false);
assertEquals(1, result.size());
assertEquals(AirbyteStateType.LEGACY, result.get(0).getType());
}
Expand All @@ -53,7 +53,7 @@ void testDeserializationOfGlobalState() throws IOException {
final String globalStateJson = MoreResources.readResource("states/global.json");
final JsonNode globalState = Jsons.deserialize(globalStateJson);

final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(globalState, config);
final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(globalState, config, true);
assertEquals(1, result.size());
assertEquals(AirbyteStateType.GLOBAL, result.get(0).getType());
}
Expand All @@ -67,7 +67,7 @@ void testDeserializationOfStreamState() throws IOException {
final String streamStateJson = MoreResources.readResource("states/per_stream.json");
final JsonNode streamState = Jsons.deserialize(streamStateJson);

final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(streamState, config);
final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(streamState, config, true);
assertEquals(2, result.size());
assertEquals(AirbyteStateType.STREAM, result.get(0).getType());
}
Expand All @@ -77,7 +77,7 @@ void testDeserializationOfNullState() throws IOException {
final AbstractDbSource dbSource = spy(AbstractDbSource.class);
final JsonNode config = mock(JsonNode.class);

final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(null, config);
final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(null, config, false);
assertEquals(1, result.size());
assertEquals(dbSource.getSupportedStateType(config), result.get(0).getType());
}
Expand Down

0 comments on commit fa92e32

Please sign in to comment.