Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some more cleanup in abstract classes in preparation of splitting #24164

Merged
merged 3 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.integrations.source.relationaldb;

import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
Expand All @@ -28,11 +27,11 @@
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.InvalidCursorInfoUtil.InvalidCursorInfo;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory;
import io.airbyte.integrations.util.ConnectorExceptionUtil;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
Expand Down Expand Up @@ -61,7 +60,6 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -134,7 +132,7 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
throws Exception {
final StateManager stateManager =
StateManagerFactory.createStateManager(getSupportedStateType(config),
deserializeInitialState(state, config), catalog);
deserializeInitialState(state, config, featureFlags.useStreamCapableState()), catalog);
final Instant emittedAt = Instant.now();

final Database database = createDatabase(config);
Expand All @@ -148,7 +146,7 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,

validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog, database);

logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog);
DbSourceDiscoverUtil.logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog, this::getAirbyteType);

final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager,
Expand All @@ -169,36 +167,6 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
});
}

/**
 * Compares the actual source schema against the schema stored in the catalog and logs a warning
 * for every configured stream where the two differ. A user may have manually modified a source
 * table without refreshing and saving the catalog, which can lead to sync failure.
 *
 * @param fullyQualifiedTableNameToInfo mapping of fully qualified table name to the table info
 *        discovered from the source; streams without an entry in this map are skipped
 * @param catalog configured catalog whose stream schemas are checked
 */
private void logSourceSchemaChange(final Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
                                   final ConfiguredAirbyteCatalog catalog) {
  for (final ConfiguredAirbyteStream configuredStream : catalog.getStreams()) {
    final AirbyteStream stream = configuredStream.getStream();
    final String tableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
        stream.getName());
    if (fullyQualifiedTableNameToInfo.containsKey(tableName)) {
      final TableInfo<CommonField<DataType>> tableInfo = fullyQualifiedTableNameToInfo.get(tableName);
      // Schema as it would be generated from the table right now.
      final List<Field> actualFields = tableInfo.getFields()
          .stream()
          .map(this::toField)
          .distinct()
          .collect(Collectors.toList());
      final JsonNode actualSchema = fieldsToJsonSchema(actualFields);

      // Schema that was saved into the catalog.
      final JsonNode savedSchema = stream.getJsonSchema();
      final boolean schemaUnchanged = savedSchema.equals(actualSchema);
      if (!schemaUnchanged) {
        LOGGER.warn(
            "Source schema changed for table {}! Actual schema: {}. Catalog schema: {}",
            tableName,
            actualSchema,
            savedSchema);
      }
    }
  }
}

private void validateCursorFieldForIncrementalTables(
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
final ConfiguredAirbyteCatalog catalog,
Expand Down Expand Up @@ -310,7 +278,7 @@ private List<AutoCloseableIterator<AirbyteMessage>> getFullRefreshIterators(
tableNameToTable,
stateManager,
emittedAt,
configuredStream -> configuredStream.getSyncMode().equals(SyncMode.FULL_REFRESH));
SyncMode.FULL_REFRESH);
}

protected List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
Expand All @@ -325,7 +293,7 @@ protected List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
tableNameToTable,
stateManager,
emittedAt,
configuredStream -> configuredStream.getSyncMode().equals(SyncMode.INCREMENTAL));
SyncMode.INCREMENTAL);
}

/**
Expand All @@ -336,7 +304,7 @@ protected List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
* @param tableNameToTable Mapping of table name to table
* @param stateManager Manager used to track the state of data synced by the connector
* @param emittedAt Time when data was emitted from the Source database
* @param selector essentially a boolean that verifies if configuredStream has selected a sync mode
* @param syncMode the sync mode for which we want to grab the required iterators
* @return List of AirbyteMessageIterators containing all iterators for a catalog
*/
private List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
Expand All @@ -345,10 +313,10 @@ private List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
final StateManager stateManager,
final Instant emittedAt,
final Predicate<ConfiguredAirbyteStream> selector) {
final SyncMode syncMode) {
final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = new ArrayList<>();
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
if (selector.test(airbyteStream)) {
if (airbyteStream.getSyncMode().equals(syncMode)) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
Expand Down Expand Up @@ -522,11 +490,11 @@ private AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Databas
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
}

private AutoCloseableIterator<AirbyteMessage> getMessageIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final String streamName,
final String namespace,
final long emittedAt) {
private static AutoCloseableIterator<AirbyteMessage> getMessageIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final String streamName,
final String namespace,
final long emittedAt) {
return AutoCloseableIterators.transform(recordIterator, r -> new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
Expand All @@ -536,16 +504,6 @@ private AutoCloseableIterator<AirbyteMessage> getMessageIterator(
.withData(r)));
}

/**
 * Converts a {@link CommonField} into a {@link Field}, recursing into its properties when the
 * field maps to {@code JsonSchemaType.OBJECT} and has a non-empty list of properties.
 *
 * @param field source field to convert
 * @return the converted field, including nested properties for object-typed fields that have any
 */
private Field toField(final CommonField<DataType> field) {
  final boolean isObjectWithProperties = getAirbyteType(field.getType()) == JsonSchemaType.OBJECT
      && field.getProperties() != null
      && !field.getProperties().isEmpty();
  if (!isObjectWithProperties) {
    return Field.of(field.getName(), getAirbyteType(field.getType()));
  }
  // Convert the nested properties first, then build the enclosing object field.
  final List<Field> nestedFields = field.getProperties().stream().map(this::toField).toList();
  return Field.of(field.getName(), getAirbyteType(field.getType()), nestedFields);
}

/**
* @param database - The database where from privileges for tables will be consumed
* @param schema - The schema where from privileges for tables will be consumed
Expand Down Expand Up @@ -703,15 +661,18 @@ protected int getStateEmissionFrequency() {
* @return The deserialized object representation of the state.
*/
protected List<AirbyteStateMessage> deserializeInitialState(final JsonNode initialStateJson,
final JsonNode config) {
final JsonNode config,
final boolean useStreamCapableState) {
final Optional<StateWrapper> typedState = StateMessageHelper.getTypedState(initialStateJson,
featureFlags.useStreamCapableState());
useStreamCapableState);
return typedState.map((state) -> {
switch (state.getStateType()) {
case GLOBAL:
return List.of(convertStateMessage(state.getGlobal()));
return List.of(StateGeneratorUtils.convertStateMessage(state.getGlobal()));
case STREAM:
return state.getStateMessages().stream().map(this::convertStateMessage).toList();
return state.getStateMessages()
.stream()
.map(stateMessage -> StateGeneratorUtils.convertStateMessage(stateMessage)).toList();
case LEGACY:
default:
return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY)
Expand All @@ -720,10 +681,6 @@ protected List<AirbyteStateMessage> deserializeInitialState(final JsonNode initi
}).orElse(generateEmptyInitialState(config));
}

/**
 * Converts an {@code io.airbyte.protocol.models.AirbyteStateMessage} into the
 * {@code AirbyteStateMessage} type imported by this class by serializing it to a JSON tree and
 * reading that tree back as the target type.
 *
 * @param state state message to convert
 * @return the converted state message
 */
protected AirbyteStateMessage convertStateMessage(final io.airbyte.protocol.models.AirbyteStateMessage state) {
  final JsonNode stateAsJson = Jsons.jsonNode(state);
  return Jsons.object(stateAsJson, AirbyteStateMessage.class);
}

/**
* Generates an empty, initial state for use by the connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,66 @@

package io.airbyte.integrations.source.relationaldb;

import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Contains utilities and helper classes for discovering schemas in database sources.
*/
public class DbSourceDiscoverUtil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with leaving the testing of this class to black-box tests at the connector level for now, but we should unit test it at some point.
Also, making these methods static is appropriate here, but I have always run into issues when trying to test static methods.


private static final Logger LOGGER = LoggerFactory.getLogger(DbSourceDiscoverUtil.class);

// If a user manually modified the source table schema but did not refresh it and save it into the
// catalog, the sync can fail. This method compares the actual schema against the catalog schema.
public static <DataType> void logSourceSchemaChange(final Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
final ConfiguredAirbyteCatalog catalog,
final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) {
continue;
}
final TableInfo<CommonField<DataType>> table = fullyQualifiedTableNameToInfo.get(fullyQualifiedTableName);
final List<Field> fields = table.getFields()
.stream()
.map(commonField -> toField(commonField, airbyteTypeConverter))
.distinct()
.collect(Collectors.toList());
final JsonNode currentJsonSchema = fieldsToJsonSchema(fields);

final JsonNode catalogSchema = stream.getJsonSchema();
if (!catalogSchema.equals(currentJsonSchema)) {
LOGGER.warn(
"Source schema changed for table {}! Actual schema: {}. Catalog schema: {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra space between table {} and Catalog schema: {}

fullyQualifiedTableName,
currentJsonSchema,
catalogSchema);
}
}
}

public static <DataType> AirbyteCatalog convertTableInfosToAirbyteCatalog(final List<TableInfo<CommonField<DataType>>> tableInfos,
final Map<String, List<String>> fullyQualifiedTableNameToPrimaryKeys,
final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
Expand Down Expand Up @@ -71,7 +114,7 @@ public static String getFullyQualifiedTableName(final String nameSpace, final St
return nameSpace != null ? nameSpace + "." + tableName : tableName;
}

public static <DataType> Field toField(final CommonField<DataType> commonField, final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
private static <DataType> Field toField(final CommonField<DataType> commonField, final Function<DataType, JsonSchemaType> airbyteTypeConverter) {
if (airbyteTypeConverter.apply(commonField.getType()) == JsonSchemaType.OBJECT && commonField.getProperties() != null
&& !commonField.getProperties().isEmpty()) {
final var properties = commonField.getProperties().stream().map(commField -> toField(commField, airbyteTypeConverter)).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,8 @@ public static List<AirbyteStateMessage> convertLegacyStateToStreamState(final Ai
.collect(Collectors.toList());
}

/**
 * Converts an {@code io.airbyte.protocol.models.AirbyteStateMessage} into the
 * {@code AirbyteStateMessage} type imported by this class by serializing it to a JSON tree and
 * reading that tree back as the target type.
 *
 * @param state state message to convert
 * @return the converted state message
 */
public static AirbyteStateMessage convertStateMessage(final io.airbyte.protocol.models.AirbyteStateMessage state) {
  final var stateAsJson = Jsons.jsonNode(state);
  return Jsons.object(stateAsJson, AirbyteStateMessage.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void testDeserializationOfLegacyState() throws IOException {
final String legacyStateJson = MoreResources.readResource("states/legacy.json");
final JsonNode legacyState = Jsons.deserialize(legacyStateJson);

final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(legacyState, config);
final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(legacyState, config, false);
assertEquals(1, result.size());
assertEquals(AirbyteStateType.LEGACY, result.get(0).getType());
}
Expand All @@ -53,7 +53,7 @@ void testDeserializationOfGlobalState() throws IOException {
final String globalStateJson = MoreResources.readResource("states/global.json");
final JsonNode globalState = Jsons.deserialize(globalStateJson);

final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(globalState, config);
final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(globalState, config, true);
assertEquals(1, result.size());
assertEquals(AirbyteStateType.GLOBAL, result.get(0).getType());
}
Expand All @@ -67,7 +67,7 @@ void testDeserializationOfStreamState() throws IOException {
final String streamStateJson = MoreResources.readResource("states/per_stream.json");
final JsonNode streamState = Jsons.deserialize(streamStateJson);

final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(streamState, config);
final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(streamState, config, true);
assertEquals(2, result.size());
assertEquals(AirbyteStateType.STREAM, result.get(0).getType());
}
Expand All @@ -77,7 +77,7 @@ void testDeserializationOfNullState() throws IOException {
final AbstractDbSource dbSource = spy(AbstractDbSource.class);
final JsonNode config = mock(JsonNode.class);

final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(null, config);
final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(null, config, false);
assertEquals(1, result.size());
assertEquals(dbSource.getSupportedStateType(config), result.get(0).getType());
}
Expand Down