From fa92e3286c52decaff3fcc125a197a7c60d90673 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Thu, 16 Mar 2023 14:56:32 -0700 Subject: [PATCH] Some more cleanup in abstract classes in preparation of splitting (#24164) --- .../source/relationaldb/AbstractDbSource.java | 83 +++++-------------- .../relationaldb/DbSourceDiscoverUtil.java | 45 +++++++++- .../state/StateGeneratorUtils.java | 4 + .../relationaldb/AbstractDbSourceTest.java | 8 +- 4 files changed, 72 insertions(+), 68 deletions(-) diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 8bb7fa4de1cd..090fd2968fd4 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -5,7 +5,6 @@ package io.airbyte.integrations.source.relationaldb; import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; -import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; @@ -28,11 +27,11 @@ import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.relationaldb.InvalidCursorInfoUtil.InvalidCursorInfo; import io.airbyte.integrations.source.relationaldb.models.DbState; +import io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils; import io.airbyte.integrations.source.relationaldb.state.StateManager; import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory; import io.airbyte.integrations.util.ConnectorExceptionUtil; import io.airbyte.protocol.models.CommonField; -import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteCatalog; @@ -61,7 +60,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -134,7 +132,7 @@ public AutoCloseableIterator read(final JsonNode config, throws Exception { final StateManager stateManager = StateManagerFactory.createStateManager(getSupportedStateType(config), - deserializeInitialState(state, config), catalog); + deserializeInitialState(state, config, featureFlags.useStreamCapableState()), catalog); final Instant emittedAt = Instant.now(); final Database database = createDatabase(config); @@ -148,7 +146,7 @@ public AutoCloseableIterator read(final JsonNode config, validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog, database); - logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog); + DbSourceDiscoverUtil.logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog, this::getAirbyteType); final List> incrementalIterators = getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, @@ -169,36 +167,6 @@ public AutoCloseableIterator read(final JsonNode config, }); } - // in case of user manually modified source table schema but did not refresh it and save into the - // catalog - it can lead to sync failure. This method compare actual schema vs catalog schema - private void logSourceSchemaChange(final Map>> fullyQualifiedTableNameToInfo, - final ConfiguredAirbyteCatalog catalog) { - for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { - final AirbyteStream stream = airbyteStream.getStream(); - final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(), - stream.getName()); - if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) { - continue; - } - final TableInfo> table = fullyQualifiedTableNameToInfo.get(fullyQualifiedTableName); - final List fields = table.getFields() - .stream() - .map(this::toField) - .distinct() - .collect(Collectors.toList()); - final JsonNode currentJsonSchema = fieldsToJsonSchema(fields); - - final JsonNode catalogSchema = stream.getJsonSchema(); - if (!catalogSchema.equals(currentJsonSchema)) { - LOGGER.warn( - "Source schema changed for table {}! Actual schema: {}. Catalog schema: {}", - fullyQualifiedTableName, - currentJsonSchema, - catalogSchema); - } - } - } - private void validateCursorFieldForIncrementalTables( final Map>> tableNameToTable, final ConfiguredAirbyteCatalog catalog, @@ -310,7 +278,7 @@ private List> getFullRefreshIterators( tableNameToTable, stateManager, emittedAt, - configuredStream -> configuredStream.getSyncMode().equals(SyncMode.FULL_REFRESH)); + SyncMode.FULL_REFRESH); } protected List> getIncrementalIterators( @@ -325,7 +293,7 @@ protected List> getIncrementalIterators( tableNameToTable, stateManager, emittedAt, - configuredStream -> configuredStream.getSyncMode().equals(SyncMode.INCREMENTAL)); + SyncMode.INCREMENTAL); } /** @@ -336,7 +304,7 @@ protected List> getIncrementalIterators( * @param tableNameToTable Mapping of table name to table * @param stateManager Manager used to track the state of data synced by the connector * @param emittedAt Time when data was emitted from the Source database - * @param selector essentially a boolean that verifies if configuredStream has selected a sync mode + * @param syncMode the sync mode for which we want to grab the required iterators * @return List of AirbyteMessageIterators containing all iterators for a catalog */ private List> getSelectedIterators( @@ -345,10 +313,10 @@ private List> getSelectedIterators( final Map>> tableNameToTable, final StateManager stateManager, final Instant emittedAt, - final Predicate selector) { + final SyncMode syncMode) { final List> iteratorList = new ArrayList<>(); for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { - if (selector.test(airbyteStream)) { + if (airbyteStream.getSyncMode().equals(syncMode)) { final AirbyteStream stream = airbyteStream.getStream(); final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(), stream.getName()); @@ -522,11 +490,11 @@ private AutoCloseableIterator getFullRefreshStream(final Databas return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli()); } - private AutoCloseableIterator getMessageIterator( - final AutoCloseableIterator recordIterator, - final String streamName, - final String namespace, - final long emittedAt) { + private static AutoCloseableIterator getMessageIterator( + final AutoCloseableIterator recordIterator, + final String streamName, + final String namespace, + final long emittedAt) { return AutoCloseableIterators.transform(recordIterator, r -> new AirbyteMessage() .withType(Type.RECORD) .withRecord(new AirbyteRecordMessage() @@ -536,16 +504,6 @@ private AutoCloseableIterator getMessageIterator( .withData(r))); } - private Field toField(final CommonField field) { - if (getAirbyteType(field.getType()) == JsonSchemaType.OBJECT && field.getProperties() != null - && !field.getProperties().isEmpty()) { - final var properties = field.getProperties().stream().map(this::toField).toList(); - return Field.of(field.getName(), getAirbyteType(field.getType()), properties); - } else { - return Field.of(field.getName(), getAirbyteType(field.getType())); - } - } - /** * @param database - The database where from privileges for tables will be consumed * @param schema - The schema where from privileges for tables will be consumed @@ -703,15 +661,18 @@ protected int getStateEmissionFrequency() { * @return The deserialized object representation of the state. */ protected List deserializeInitialState(final JsonNode initialStateJson, - final JsonNode config) { + final JsonNode config, + final boolean useStreamCapableState) { final Optional typedState = StateMessageHelper.getTypedState(initialStateJson, - featureFlags.useStreamCapableState()); + useStreamCapableState); return typedState.map((state) -> { switch (state.getStateType()) { case GLOBAL: - return List.of(convertStateMessage(state.getGlobal())); + return List.of(StateGeneratorUtils.convertStateMessage(state.getGlobal())); case STREAM: - return state.getStateMessages().stream().map(this::convertStateMessage).toList(); + return state.getStateMessages() + .stream() + .map(stateMessage -> StateGeneratorUtils.convertStateMessage(stateMessage)).toList(); case LEGACY: default: return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY) @@ -720,10 +681,6 @@ protected List deserializeInitialState(final JsonNode initi }).orElse(generateEmptyInitialState(config)); } - protected AirbyteStateMessage convertStateMessage(final io.airbyte.protocol.models.AirbyteStateMessage state) { - return Jsons.object(Jsons.jsonNode(state), AirbyteStateMessage.class); - } - /** * Generates an empty, initial state for use by the connector. * diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/DbSourceDiscoverUtil.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/DbSourceDiscoverUtil.java index 0dd3bbfc8db4..15aa0239abef 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/DbSourceDiscoverUtil.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/DbSourceDiscoverUtil.java @@ -4,6 +4,9 @@ package io.airbyte.integrations.source.relationaldb; +import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema; + +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.Field; @@ -11,6 +14,8 @@ import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.CatalogHelpers; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.SyncMode; import java.util.Collections; import java.util.List; @@ -18,9 +23,47 @@ import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * Contains utilities and helper classes for discovering schemas in database sources. + */ public class DbSourceDiscoverUtil { + private static final Logger LOGGER = LoggerFactory.getLogger(DbSourceDiscoverUtil.class); + + // In case of user manually modified source table schema but did not refresh it and save into the + // catalog - it can lead to sync failure. This method compare actual schema vs catalog schema + public static void logSourceSchemaChange(final Map>> fullyQualifiedTableNameToInfo, + final ConfiguredAirbyteCatalog catalog, + final Function airbyteTypeConverter) { + for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { + final AirbyteStream stream = airbyteStream.getStream(); + final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(), + stream.getName()); + if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) { + continue; + } + final TableInfo> table = fullyQualifiedTableNameToInfo.get(fullyQualifiedTableName); + final List fields = table.getFields() + .stream() + .map(commonField -> toField(commonField, airbyteTypeConverter)) + .distinct() + .collect(Collectors.toList()); + final JsonNode currentJsonSchema = fieldsToJsonSchema(fields); + + final JsonNode catalogSchema = stream.getJsonSchema(); + if (!catalogSchema.equals(currentJsonSchema)) { + LOGGER.warn( + "Source schema changed for table {}! Actual schema: {}. Catalog schema: {}", + fullyQualifiedTableName, + currentJsonSchema, + catalogSchema); + } + } + } + public static AirbyteCatalog convertTableInfosToAirbyteCatalog(final List>> tableInfos, final Map> fullyQualifiedTableNameToPrimaryKeys, final Function airbyteTypeConverter) { @@ -71,7 +114,7 @@ public static String getFullyQualifiedTableName(final String nameSpace, final St return nameSpace != null ? nameSpace + "." + tableName : tableName; } - public static Field toField(final CommonField commonField, final Function airbyteTypeConverter) { + private static Field toField(final CommonField commonField, final Function airbyteTypeConverter) { if (airbyteTypeConverter.apply(commonField.getType()) == JsonSchemaType.OBJECT && commonField.getProperties() != null && !commonField.getProperties().isEmpty()) { final var properties = commonField.getProperties().stream().map(commField -> toField(commField, airbyteTypeConverter)).toList(); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java index 9fdfa2ac0d67..9fb2da0445da 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java @@ -209,4 +209,8 @@ public static List convertLegacyStateToStreamState(final Ai .collect(Collectors.toList()); } + public static AirbyteStateMessage convertStateMessage(final io.airbyte.protocol.models.AirbyteStateMessage state) { + return Jsons.object(Jsons.jsonNode(state), AirbyteStateMessage.class); + } + } diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractDbSourceTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractDbSourceTest.java index 27b04b5770c2..b35e0cb65037 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractDbSourceTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractDbSourceTest.java @@ -39,7 +39,7 @@ void testDeserializationOfLegacyState() throws IOException { final String legacyStateJson = MoreResources.readResource("states/legacy.json"); final JsonNode legacyState = Jsons.deserialize(legacyStateJson); - final List result = dbSource.deserializeInitialState(legacyState, config); + final List result = dbSource.deserializeInitialState(legacyState, config, false); assertEquals(1, result.size()); assertEquals(AirbyteStateType.LEGACY, result.get(0).getType()); } @@ -53,7 +53,7 @@ void testDeserializationOfGlobalState() throws IOException { final String globalStateJson = MoreResources.readResource("states/global.json"); final JsonNode globalState = Jsons.deserialize(globalStateJson); - final List result = dbSource.deserializeInitialState(globalState, config); + final List result = dbSource.deserializeInitialState(globalState, config, true); assertEquals(1, result.size()); assertEquals(AirbyteStateType.GLOBAL, result.get(0).getType()); } @@ -67,7 +67,7 @@ void testDeserializationOfStreamState() throws IOException { final String streamStateJson = MoreResources.readResource("states/per_stream.json"); final JsonNode streamState = Jsons.deserialize(streamStateJson); - final List result = dbSource.deserializeInitialState(streamState, config); + final List result = dbSource.deserializeInitialState(streamState, config, true); assertEquals(2, result.size()); assertEquals(AirbyteStateType.STREAM, result.get(0).getType()); } @@ -77,7 +77,7 @@ void testDeserializationOfNullState() throws IOException { final AbstractDbSource dbSource = spy(AbstractDbSource.class); final JsonNode config = mock(JsonNode.class); - final List result = dbSource.deserializeInitialState(null, config); + final List result = dbSource.deserializeInitialState(null, config, false); assertEquals(1, result.size()); assertEquals(dbSource.getSupportedStateType(config), result.get(0).getType()); }