diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/metadata_service/models/src/AllowedHosts.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/metadata_service/models/src/AllowedHosts.yaml index 27411af7b098..670cacfea193 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/metadata_service/models/src/AllowedHosts.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/metadata_service/models/src/AllowedHosts.yaml @@ -10,4 +10,4 @@ properties: type: array description: An array of hosts that this connector can connect to. AllowedHosts not being present for the source or destination means that access to all hosts is allowed. An empty list here means that no network access is granted. items: - type: string \ No newline at end of file + type: string diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_no_id_override.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_no_id_override.yaml index d953c372751c..7b4d4f78dbde 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_no_id_override.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_no_id_override.yaml @@ -18,4 +18,3 @@ data: oss: enabled: true definitionId: woohoo - diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_no_type_override.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_no_type_override.yaml index 46a5442a497d..e128eac11b1c 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_no_type_override.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_no_type_override.yaml @@ -18,4 +18,3 @@ data: oss: enabled: true connectorType: destination - diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_unknown_override.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_unknown_override.yaml index 280a79bb7fae..7cf82bbce81f 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_unknown_override.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_catalog_unknown_override.yaml @@ -18,4 +18,3 @@ data: oss: enabled: true what: is this? 
- diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_connector_type.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_connector_type.yaml index 35a64964cc3b..f245282f51c1 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_connector_type.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_connector_type.yaml @@ -9,4 +9,3 @@ data: sourceType: database releaseStage: generally_available license: MIT - diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_definition_id.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_definition_id.yaml index 3570e877d1c6..8201f66c3d7f 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_definition_id.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_definition_id.yaml @@ -13,4 +13,3 @@ data: hosts: - "${host}" - "${tunnel_method.tunnel_host}" - diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_docker_image.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_docker_image.yaml index 8441b7ffc67d..0653cd8bfd9f 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_docker_image.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_docker_image.yaml @@ -13,4 +13,3 @@ data: hosts: - "${host}" - "${tunnel_method.tunnel_host}" - diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_docker_version.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_docker_version.yaml index 8441b7ffc67d..0653cd8bfd9f 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_docker_version.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_docker_version.yaml @@ -13,4 +13,3 @@ data: hosts: - "${host}" - "${tunnel_method.tunnel_host}" - diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_version.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_version.yaml index 21a08d6216ac..629482fa830e 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_version.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/metadata_missing_version.yaml @@ -13,4 +13,3 @@ data: hosts: - "${host}" - "${tunnel_method.tunnel_host}" - diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/nonsense.json b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/nonsense.json index 28d44e9bf015..ad869b38385f 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/nonsense.json +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/nonsense.json @@ -1 +1 @@ -asdsad \ No newline at end of file +asdsad diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/nonsense.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/nonsense.yaml index 28d44e9bf015..ad869b38385f 100644 --- 
a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/nonsense.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/invalid/nonsense.yaml @@ -1 +1 @@ -asdsad \ No newline at end of file +asdsad diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_allowed_hosts.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_allowed_hosts.yaml index 8655982aab77..3888b001315e 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_allowed_hosts.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_allowed_hosts.yaml @@ -14,4 +14,3 @@ data: hosts: - "${host}" - "${tunnel_method.tunnel_host}" - diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_complex_override.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_complex_override.yaml index 994057e6204f..860c498537a0 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_complex_override.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_complex_override.yaml @@ -27,5 +27,3 @@ data: resourceRequirements: memory_request: 1Gi memory_limit: 1Gi - - diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_enabled.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_enabled.yaml index 4cd0d99e0688..5e76fa22b9f4 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_enabled.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_enabled.yaml @@ -17,4 +17,3 @@ data: catalogs: oss: enabled: true - diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_required_resources.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_required_resources.yaml index 994f2dee4113..fc25d8250265 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_required_resources.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_catalog_required_resources.yaml @@ -16,4 +16,3 @@ data: resourceRequirements: memory_request: 1Gi memory_limit: 1Gi - diff --git a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_simple.yaml b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_simple.yaml index cba1e20b4ae7..3bb2343e2631 100644 --- a/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_simple.yaml +++ b/airbyte-ci/connectors_ci/metadata_service/lib/tests/fixtures/valid/metadata_simple.yaml @@ -10,4 +10,3 @@ data: sourceType: database releaseStage: generally_available license: MIT - diff --git a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java index 4d79a1ca10fa..1d3cdcaa41b0 100644 --- a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java +++ b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java @@ -12,6 +12,7 @@ import 
com.google.cloud.bigquery.QueryParameterValue; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.Table; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; @@ -33,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -51,17 +51,6 @@ public class BigQuerySource extends AbstractDbSource getExcludedInternalNameSpaces() { - return Collections.emptySet(); - } - @Override protected List>> discoverInternal(final BigQueryDatabase database) throws Exception { - return discoverInternal(database, null); - } - - @Override - protected List>> discoverInternal(final BigQueryDatabase database, final String schema) { final String projectId = dbConfig.get(CONFIG_PROJECT_ID).asText(); final List tables = (isDatasetConfigured(database) ? database.getDatasetTables(getConfigDatasetId(database)) : database.getProjectTables(projectId)); final List>> result = new ArrayList<>(); tables.stream().map(table -> TableInfo.>builder() - .nameSpace(table.getTableId().getDataset()) - .name(table.getTableId().getTable()) - .fields(Objects.requireNonNull(table.getDefinition().getSchema()).getFields().stream() - .map(f -> { - final StandardSQLTypeName standardType = f.getType().getStandardType(); - return new CommonField<>(f.getName(), standardType); - }) - .collect(Collectors.toList())) - .build()) + .nameSpace(table.getTableId().getDataset()) + .name(table.getTableId().getTable()) + .fields(Objects.requireNonNull(table.getDefinition().getSchema()).getFields().stream() + .map(f -> { + final StandardSQLTypeName standardType = f.getType().getStandardType(); + return new CommonField<>(f.getName(), standardType); + }) + .collect(Collectors.toList())) + .build()) .forEach(result::add); return result; } @@ -135,8 +114,7 @@ protected Map> discoverPrimaryKeys(final BigQueryDatabase d return Collections.emptyMap(); } - @Override - protected String getQuoteString() { + private String getQuoteString() { return QUOTE; } @@ -192,6 +170,17 @@ private String getConfigDatasetId(final SqlDatabase database) { return (isDatasetConfigured(database) ? 
database.getSourceConfig().get(CONFIG_DATASET_ID).asText() : ""); } + @VisibleForTesting + protected JsonNode toDatabaseConfig(final JsonNode config) { + final var conf = ImmutableMap.builder() + .put(CONFIG_PROJECT_ID, config.get(CONFIG_PROJECT_ID).asText()) + .put(CONFIG_CREDS, config.get(CONFIG_CREDS).asText()); + if (config.hasNonNull(CONFIG_DATASET_ID)) { + conf.put(CONFIG_DATASET_ID, config.get(CONFIG_DATASET_ID).asText()); + } + return Jsons.jsonNode(conf.build()); + } + public static void main(final String[] args) throws Exception { final Source source = new BigQuerySource(); LOGGER.info("starting source: {}", BigQuerySource.class); diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 371e61739d3d..d1783830ee98 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -42,7 +42,7 @@ import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto; -import io.airbyte.integrations.source.relationaldb.AbstractDbSource; +import io.airbyte.integrations.source.relationaldb.AbstractJdbcBaseSource; import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.integrations.source.relationaldb.state.StateManager; @@ -79,7 +79,7 @@ * relational DB source which can be accessed via JDBC driver. If you are implementing a connector * for a relational DB which has a JDBC driver, make an effort to use this class. */ -public abstract class AbstractJdbcSource extends AbstractDbSource implements Source { +public abstract class AbstractJdbcSource extends AbstractJdbcBaseSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcSource.class); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java index 4510fb0bbbf7..f389c392ac07 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java @@ -29,7 +29,6 @@ import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.JsonSchemaType; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -54,18 +53,6 @@ public static void main(final String[] args) throws Exception { LOGGER.info("completed source: {}", MongoDbSource.class); } - @Override - public JsonNode toDatabaseConfig(final JsonNode config) { - final var credentials = config.has(MongoUtils.USER) && config.has(JdbcUtils.PASSWORD_KEY) - ? 
String.format("%s:%s@", config.get(MongoUtils.USER).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText()) - : StringUtils.EMPTY; - - return Jsons.jsonNode(ImmutableMap.builder() - .put("connectionString", buildConnectionString(config, credentials)) - .put(JdbcUtils.DATABASE_KEY, config.get(JdbcUtils.DATABASE_KEY).asText()) - .build()); - } - @Override protected MongoDatabase createDatabase(final JsonNode sourceConfig) throws Exception { final var dbConfig = toDatabaseConfig(sourceConfig); @@ -94,11 +81,6 @@ protected JsonSchemaType getAirbyteType(final BsonType fieldType) { return MongoUtils.getType(fieldType); } - @Override - public Set getExcludedInternalNameSpaces() { - return Collections.emptySet(); - } - @Override protected List>> discoverInternal(final MongoDatabase database) throws Exception { @@ -122,6 +104,45 @@ protected List>> discoverInternal(final MongoDat return tableInfos; } + @Override + protected Map> discoverPrimaryKeys(final MongoDatabase database, + final List>> tableInfos) { + return tableInfos.stream() + .collect(Collectors.toMap( + TableInfo::getName, + TableInfo::getPrimaryKeys)); + } + + @Override + public AutoCloseableIterator queryTableFullRefresh(final MongoDatabase database, + final List columnNames, + final String schemaName, + final String tableName) { + return queryTable(database, columnNames, tableName, null); + } + + @Override + public AutoCloseableIterator queryTableIncremental(final MongoDatabase database, + final List columnNames, + final String schemaName, + final String tableName, + final CursorInfo cursorInfo, + final BsonType cursorFieldType) { + final Bson greaterComparison = gt(cursorInfo.getCursorField(), MongoUtils.getBsonValue(cursorFieldType, cursorInfo.getCursor())); + return queryTable(database, columnNames, tableName, greaterComparison); + } + + private JsonNode toDatabaseConfig(final JsonNode config) { + final var credentials = config.has(MongoUtils.USER) && config.has(JdbcUtils.PASSWORD_KEY) + ? 
String.format("%s:%s@", config.get(MongoUtils.USER).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText()) + : StringUtils.EMPTY; + + return Jsons.jsonNode(ImmutableMap.builder() + .put("connectionString", buildConnectionString(config, credentials)) + .put(JdbcUtils.DATABASE_KEY, config.get(JdbcUtils.DATABASE_KEY).asText()) + .build()); + } + private Set getAuthorizedCollections(final MongoDatabase database) { /* * db.runCommand ({listCollections: 1.0, authorizedCollections: true, nameOnly: true }) the command @@ -150,45 +171,6 @@ private Set getAuthorizedCollections(final MongoDatabase database) { } } - @Override - protected List>> discoverInternal(final MongoDatabase database, final String schema) throws Exception { - // MondoDb doesn't support schemas - return discoverInternal(database); - } - - @Override - protected Map> discoverPrimaryKeys(final MongoDatabase database, - final List>> tableInfos) { - return tableInfos.stream() - .collect(Collectors.toMap( - TableInfo::getName, - TableInfo::getPrimaryKeys)); - } - - @Override - protected String getQuoteString() { - return ""; - } - - @Override - public AutoCloseableIterator queryTableFullRefresh(final MongoDatabase database, - final List columnNames, - final String schemaName, - final String tableName) { - return queryTable(database, columnNames, tableName, null); - } - - @Override - public AutoCloseableIterator queryTableIncremental(final MongoDatabase database, - final List columnNames, - final String schemaName, - final String tableName, - final CursorInfo cursorInfo, - final BsonType cursorFieldType) { - final Bson greaterComparison = gt(cursorInfo.getCursorField(), MongoUtils.getBsonValue(cursorFieldType, cursorInfo.getCursor())); - return queryTable(database, columnNames, tableName, greaterComparison); - } - @Override public boolean isCursorType(final BsonType bsonType) { // while reading from mongo primary key "id" is always added, so there will be no situation diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 090fd2968fd4..9c6044283192 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -21,7 +21,6 @@ import io.airbyte.config.helpers.StateMessageHelper; import io.airbyte.db.AbstractDatabase; import io.airbyte.db.IncrementalUtils; -import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Source; @@ -52,7 +51,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -106,7 +104,7 @@ public AirbyteConnectionStatus check(final JsonNode config) throws Exception { public AirbyteCatalog discover(final JsonNode config) throws Exception { try { final Database database = createDatabase(config); - final List>> tableInfos = discoverWithoutSystemTables(database); + final List>> tableInfos = discoverInternal(database); final Map> fullyQualifiedTableNameToPrimaryKeys = discoverPrimaryKeys( database, tableInfos); return 
DbSourceDiscoverUtil.convertTableInfosToAirbyteCatalog(tableInfos, fullyQualifiedTableNameToPrimaryKeys, this::getAirbyteType); @@ -131,14 +129,14 @@ public AutoCloseableIterator read(final JsonNode config, final JsonNode state) throws Exception { final StateManager stateManager = - StateManagerFactory.createStateManager(getSupportedStateType(config), + StateManagerFactory.createStateManager(AirbyteStateType.LEGACY, deserializeInitialState(state, config, featureFlags.useStreamCapableState()), catalog); final Instant emittedAt = Instant.now(); final Database database = createDatabase(config); final Map>> fullyQualifiedTableNameToInfo = - discoverWithoutSystemTables(database) + discoverInternal(database) .stream() .collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), Function .identity())); @@ -149,11 +147,22 @@ public AutoCloseableIterator read(final JsonNode config, DbSourceDiscoverUtil.logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog, this::getAirbyteType); final List> incrementalIterators = - getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, - emittedAt); + getSelectedIterators( + database, + catalog, + fullyQualifiedTableNameToInfo, + stateManager, + emittedAt, + SyncMode.INCREMENTAL); final List> fullRefreshIterators = - getFullRefreshIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, - emittedAt); + getSelectedIterators( + database, + catalog, + fullyQualifiedTableNameToInfo, + stateManager, + emittedAt, + SyncMode.FULL_REFRESH); + final List> iteratorList = Stream .of(incrementalIterators, fullRefreshIterators) .flatMap(Collection::stream) .collect(Collectors.toList()); @@ -230,72 +239,6 @@ protected boolean verifyCursorColumnValues(final Database database, final String return true; } - /** - * Estimates the total volume (rows and bytes) to sync and emits a - * {@link AirbyteEstimateTraceMessage} associated with the full refresh stream. - * - * @param database database - */ - protected void estimateFullRefreshSyncSize(final Database database, - final ConfiguredAirbyteStream configuredAirbyteStream) { - /* no-op */ - } - - /** - * Estimates the total volume (rows and bytes) to sync and emits a - * {@link AirbyteEstimateTraceMessage} associated with an incremental stream. - * - * @param database database - */ - protected void estimateIncrementalSyncSize(final Database database, - final ConfiguredAirbyteStream configuredAirbyteStream, - final CursorInfo cursorInfo, - final DataType dataType) { - /* no-op */ - } - - private List>> discoverWithoutSystemTables( - final Database database) - throws Exception { - final Set systemNameSpaces = getExcludedInternalNameSpaces(); - final Set systemViews = getExcludedViews(); - final List>> discoveredTables = discoverInternal(database); - return (systemNameSpaces == null || systemNameSpaces.isEmpty() ?
discoveredTables - : discoveredTables.stream() - .filter(table -> !systemNameSpaces.contains(table.getNameSpace()) && !systemViews.contains(table.getName())).collect( - Collectors.toList())); - } - - private List> getFullRefreshIterators( - final Database database, - final ConfiguredAirbyteCatalog catalog, - final Map>> tableNameToTable, - final StateManager stateManager, - final Instant emittedAt) { - return getSelectedIterators( - database, - catalog, - tableNameToTable, - stateManager, - emittedAt, - SyncMode.FULL_REFRESH); - } - - protected List> getIncrementalIterators( - final Database database, - final ConfiguredAirbyteCatalog catalog, - final Map>> tableNameToTable, - final StateManager stateManager, - final Instant emittedAt) { - return getSelectedIterators( - database, - catalog, - tableNameToTable, - stateManager, - emittedAt, - SyncMode.INCREMENTAL); - } - /** * Creates a list of read iterators for each stream within an ConfiguredAirbyteCatalog * @@ -384,8 +327,6 @@ private AutoCloseableIterator createReadIterator(final Database cursorInfo.get(), emittedAt); } else { - // if no cursor is present then this is the first read for is the same as doing a full refresh read. - estimateFullRefreshSyncSize(database, airbyteStream); airbyteMessageIterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt); } @@ -404,7 +345,6 @@ private AutoCloseableIterator createReadIterator(final Database getStateEmissionFrequency()), airbyteMessageIterator); } else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { - estimateFullRefreshSyncSize(database, airbyteStream); iterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt); } else if (airbyteStream.getSyncMode() == null) { @@ -454,7 +394,6 @@ private AutoCloseableIterator getIncrementalStream(final Databas table.getFields().stream().anyMatch(f -> f.getName().equals(cursorField)), String.format("Could not find cursor field %s in table %s", cursorField, table.getName())); - estimateIncrementalSyncSize(database, airbyteStream, cursorInfo, cursorType); final AutoCloseableIterator queryIterator = queryTableIncremental( database, selectedDatabaseFields, @@ -504,29 +443,6 @@ private static AutoCloseableIterator getMessageIterator( .withData(r))); } - /** - * @param database - The database where from privileges for tables will be consumed - * @param schema - The schema where from privileges for tables will be consumed - * @return Set with privileges for tables for current DB-session user The method is responsible for - * SELECT-ing the table with privileges. In some cases such SELECT doesn't require (e.g. in - * Oracle DB - the schema is the user, you cannot REVOKE a privilege on a table from its - * owner). - */ - protected Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, - final String schema) - throws SQLException { - return Collections.emptySet(); - } - - /** - * Map a database implementation-specific configuration to json object that adheres to the database - * config spec. See resources/spec.json. - * - * @param config database implementation-specific configuration. - * @return database spec config - */ - public abstract JsonNode toDatabaseConfig(JsonNode config); - /** * Creates a database instance using the database spec config. 
* @@ -552,22 +468,6 @@ protected abstract List> getCheckOperations */ protected abstract JsonSchemaType getAirbyteType(DataType columnType); - /** - * Get list of system namespaces(schemas) in order to exclude them from the `discover` result list. - * - * @return set of system namespaces(schemas) to be excluded - */ - protected abstract Set getExcludedInternalNameSpaces(); - - /** - * Get list of system views in order to exclude them from the `discover` result list. - * - * @return set of views to be excluded - */ - protected Set getExcludedViews() { - return Collections.emptySet(); - }; - /** * Discover all available tables in the source database. * @@ -579,19 +479,6 @@ protected abstract List>> discoverInternal( final Database database) throws Exception; - /** - * Discovers all available tables within a schema in the source database. - * - * @param database - source database - * @param schema - source schema - * @return list of source tables - * @throws Exception - access to the database might lead to exceptions. - */ - protected abstract List>> discoverInternal( - final Database database, - String schema) - throws Exception; - /** * Discover Primary keys for each table and @return a map of namespace.table name to their * associated list of primary key fields. @@ -603,13 +490,6 @@ protected abstract List>> discoverInternal( protected abstract Map> discoverPrimaryKeys(Database database, List>> tableInfos); - /** - * Returns quote symbol of the database - * - * @return quote symbol - */ - protected abstract String getQuoteString(); - /** * Read all data from a table. * @@ -665,42 +545,13 @@ protected List deserializeInitialState(final JsonNode initi final boolean useStreamCapableState) { final Optional typedState = StateMessageHelper.getTypedState(initialStateJson, useStreamCapableState); - return typedState.map((state) -> { - switch (state.getStateType()) { - case GLOBAL: - return List.of(StateGeneratorUtils.convertStateMessage(state.getGlobal())); - case STREAM: - return state.getStateMessages() - .stream() - .map(stateMessage -> StateGeneratorUtils.convertStateMessage(stateMessage)).toList(); - case LEGACY: - default: - return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY) - .withData(state.getLegacyState())); - } - }).orElse(generateEmptyInitialState(config)); - } - - /** - * Generates an empty, initial state for use by the connector. - * - * @param config The connector configuration. - * @return The empty, initial state. - */ - protected List generateEmptyInitialState(final JsonNode config) { - // For backwards compatibility with existing connectors - return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY) - .withData(Jsons.jsonNode(new DbState()))); - } - - /** - * Returns the {@link AirbyteStateType} supported by this connector. - * - * @param config The connector configuration. - * @return A {@link AirbyteStateType} representing the state supported by this connector. 
- */ - protected AirbyteStateType getSupportedStateType(final JsonNode config) { - return AirbyteStateType.LEGACY; + if (typedState.isPresent()) { + return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY) + .withData(typedState.get().getLegacyState())); + } else { + // For backwards compatibility with existing connectors + return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY) + .withData(Jsons.jsonNode(new DbState()))); + } } - } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractJdbcBaseSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractJdbcBaseSource.java new file mode 100644 index 000000000000..f71d907f53cc --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractJdbcBaseSource.java @@ -0,0 +1,707 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.relationaldb; + +import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import io.airbyte.commons.exceptions.ConfigErrorException; +import io.airbyte.commons.exceptions.ConnectionErrorException; +import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.functional.CheckedConsumer; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.lang.Exceptions; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.config.StateWrapper; +import io.airbyte.config.helpers.StateMessageHelper; +import io.airbyte.db.AbstractDatabase; +import io.airbyte.db.IncrementalUtils; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.BaseConnector; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; +import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.source.relationaldb.InvalidCursorInfoUtil.InvalidCursorInfo; +import io.airbyte.integrations.source.relationaldb.models.DbState; +import io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils; +import io.airbyte.integrations.source.relationaldb.state.StateManager; +import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory; +import io.airbyte.integrations.util.ConnectorExceptionUtil; +import io.airbyte.protocol.models.CommonField; +import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.v0.AirbyteCatalog; +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteMessage.Type; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; +import io.airbyte.protocol.models.v0.AirbyteStream; +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.CatalogHelpers; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import 
io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.v0.SyncMode; +import java.sql.SQLException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class contains helper functions and boilerplate for implementing a source connector for a DB + * source of both non-relational and relational type. Note that this is a fork of AbstractDbSource and functionality here will be + * moved to the parent class (AbstractJdbcSource.java) + */ +public abstract class AbstractJdbcBaseSource extends + BaseConnector implements Source, AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcBaseSource.class); + // TODO: Remove when the flag is not use anymore + private final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); + + @Override + public AirbyteConnectionStatus check(final JsonNode config) throws Exception { + try { + final Database database = createDatabase(config); + for (final CheckedConsumer checkOperation : getCheckOperations(config)) { + checkOperation.accept(database); + } + + return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } catch (final ConnectionErrorException ex) { + final String message = getErrorMessage(ex.getStateCode(), ex.getErrorCode(), + ex.getExceptionMessage(), ex); + AirbyteTraceMessageUtility.emitConfigErrorTrace(ex, message); + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage(message); + } catch (final Exception e) { + LOGGER.info("Exception while checking connection: ", e); + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage(String.format(ConnectorExceptionUtil.COMMON_EXCEPTION_MESSAGE_TEMPLATE, e.getMessage())); + } finally { + close(); + } + } + + @Override + public AirbyteCatalog discover(final JsonNode config) throws Exception { + try { + final Database database = createDatabase(config); + final List>> tableInfos = discoverWithoutSystemTables(database); + final Map> fullyQualifiedTableNameToPrimaryKeys = discoverPrimaryKeys( + database, tableInfos); + return DbSourceDiscoverUtil.convertTableInfosToAirbyteCatalog(tableInfos, fullyQualifiedTableNameToPrimaryKeys, this::getAirbyteType); + } finally { + close(); + } + } + + /** + * Creates a list of AirbyteMessageIterators with all the streams selected in a configured catalog + * + * @param config - integration-specific configuration object as json. e.g. { "username": "airbyte", + * "password": "super secure" } + * @param catalog - schema of the incoming messages. + * @param state - state of the incoming messages. 
+ * @return AirbyteMessageIterator with all the streams that are to be synced + * @throws Exception + */ + @Override + public AutoCloseableIterator read(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final JsonNode state) + throws Exception { + final StateManager stateManager = + StateManagerFactory.createStateManager(getSupportedStateType(config), + deserializeInitialState(state, config, featureFlags.useStreamCapableState()), catalog); + final Instant emittedAt = Instant.now(); + + final Database database = createDatabase(config); + + final Map>> fullyQualifiedTableNameToInfo = + discoverWithoutSystemTables(database) + .stream() + .collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), + Function + .identity())); + + validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog, database); + + DbSourceDiscoverUtil.logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog, this::getAirbyteType); + + final List> incrementalIterators = + getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, + emittedAt); + final List> fullRefreshIterators = + getFullRefreshIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, + emittedAt); + final List> iteratorList = Stream + .of(incrementalIterators, fullRefreshIterators) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + return AutoCloseableIterators + .appendOnClose(AutoCloseableIterators.concatWithEagerClose(iteratorList), () -> { + LOGGER.info("Closing database connection pool."); + Exceptions.toRuntime(this::close); + LOGGER.info("Closed database connection pool."); + }); + } + + private void validateCursorFieldForIncrementalTables( + final Map>> tableNameToTable, + final ConfiguredAirbyteCatalog catalog, + final Database database) + throws SQLException { + final List tablesWithInvalidCursor = new ArrayList<>(); + for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { + final AirbyteStream stream = airbyteStream.getStream(); + final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(), + stream.getName()); + final boolean hasSourceDefinedCursor = + !Objects.isNull(airbyteStream.getStream().getSourceDefinedCursor()) + && airbyteStream.getStream().getSourceDefinedCursor(); + if (!tableNameToTable.containsKey(fullyQualifiedTableName) + || airbyteStream.getSyncMode() != SyncMode.INCREMENTAL || hasSourceDefinedCursor) { + continue; + } + + final TableInfo> table = tableNameToTable + .get(fullyQualifiedTableName); + final Optional cursorField = IncrementalUtils.getCursorFieldOptional(airbyteStream); + if (cursorField.isEmpty()) { + continue; + } + final DataType cursorType = table.getFields().stream() + .filter(info -> info.getName().equals(cursorField.get())) + .map(CommonField::getType) + .findFirst() + .orElseThrow(); + + if (!isCursorType(cursorType)) { + tablesWithInvalidCursor.add( + new InvalidCursorInfo(fullyQualifiedTableName, cursorField.get(), + cursorType.toString(), "Unsupported cursor type")); + continue; + } + + if (!verifyCursorColumnValues(database, stream.getNamespace(), stream.getName(), cursorField.get())) { + tablesWithInvalidCursor.add( + new InvalidCursorInfo(fullyQualifiedTableName, cursorField.get(), + cursorType.toString(), "Cursor column contains NULL value")); + } + } + + if (!tablesWithInvalidCursor.isEmpty()) { + throw new ConfigErrorException( + 
InvalidCursorInfoUtil.getInvalidCursorConfigMessage(tablesWithInvalidCursor)); + } + } + + /** + * Verify that cursor column allows syncing to go through. + * + * @param database database + * @return true if syncing can go through. false otherwise + * @throws SQLException exception + */ + protected boolean verifyCursorColumnValues(final Database database, final String schema, final String tableName, final String columnName) + throws SQLException { + /* no-op */ + return true; + } + + /** + * Estimates the total volume (rows and bytes) to sync and emits a + * {@link AirbyteEstimateTraceMessage} associated with the full refresh stream. + * + * @param database database + */ + protected void estimateFullRefreshSyncSize(final Database database, + final ConfiguredAirbyteStream configuredAirbyteStream) { + /* no-op */ + } + + /** + * Estimates the total volume (rows and bytes) to sync and emits a + * {@link AirbyteEstimateTraceMessage} associated with an incremental stream. + * + * @param database database + */ + protected void estimateIncrementalSyncSize(final Database database, + final ConfiguredAirbyteStream configuredAirbyteStream, + final CursorInfo cursorInfo, + final DataType dataType) { + /* no-op */ + } + + private List>> discoverWithoutSystemTables( + final Database database) + throws Exception { + final Set systemNameSpaces = getExcludedInternalNameSpaces(); + final Set systemViews = getExcludedViews(); + final List>> discoveredTables = discoverInternal(database); + return (systemNameSpaces == null || systemNameSpaces.isEmpty() ? discoveredTables + : discoveredTables.stream() + .filter(table -> !systemNameSpaces.contains(table.getNameSpace()) && !systemViews.contains(table.getName())).collect( + Collectors.toList())); + } + + private List> getFullRefreshIterators( + final Database database, + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final StateManager stateManager, + final Instant emittedAt) { + return getSelectedIterators( + database, + catalog, + tableNameToTable, + stateManager, + emittedAt, + SyncMode.FULL_REFRESH); + } + + protected List> getIncrementalIterators( + final Database database, + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final StateManager stateManager, + final Instant emittedAt) { + return getSelectedIterators( + database, + catalog, + tableNameToTable, + stateManager, + emittedAt, + SyncMode.INCREMENTAL); + } + + /** + * Creates a list of read iterators for each stream within an ConfiguredAirbyteCatalog + * + * @param database Source Database + * @param catalog List of streams (e.g. 
database tables or API endpoints) with settings on sync mode + * @param tableNameToTable Mapping of table name to table + * @param stateManager Manager used to track the state of data synced by the connector + * @param emittedAt Time when data was emitted from the Source database + * @param syncMode the sync mode for which we want to grab the required iterators + * @return List of AirbyteMessageIterators containing all iterators for a catalog + */ + private List> getSelectedIterators( + final Database database, + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final StateManager stateManager, + final Instant emittedAt, + final SyncMode syncMode) { + final List> iteratorList = new ArrayList<>(); + for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { + if (airbyteStream.getSyncMode().equals(syncMode)) { + final AirbyteStream stream = airbyteStream.getStream(); + final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(stream.getNamespace(), + stream.getName()); + if (!tableNameToTable.containsKey(fullyQualifiedTableName)) { + LOGGER + .info("Skipping stream {} because it is not in the source", fullyQualifiedTableName); + continue; + } + + final TableInfo> table = tableNameToTable + .get(fullyQualifiedTableName); + final AutoCloseableIterator tableReadIterator = createReadIterator( + database, + airbyteStream, + table, + stateManager, + emittedAt); + iteratorList.add(tableReadIterator); + } + } + + return iteratorList; + } + + /** + * ReadIterator is used to retrieve records from a source connector + * + * @param database Source Database + * @param airbyteStream represents an ingestion source (e.g. API endpoint or database table) + * @param table information in tabular format + * @param stateManager Manager used to track the state of data synced by the connector + * @param emittedAt Time when data was emitted from the Source database + * @return + */ + private AutoCloseableIterator createReadIterator(final Database database, + final ConfiguredAirbyteStream airbyteStream, + final TableInfo> table, + final StateManager stateManager, + final Instant emittedAt) { + final String streamName = airbyteStream.getStream().getName(); + final String namespace = airbyteStream.getStream().getNamespace(); + final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamName, + namespace); + final Set selectedFieldsInCatalog = CatalogHelpers.getTopLevelFieldNames(airbyteStream); + final List selectedDatabaseFields = table.getFields() + .stream() + .map(CommonField::getName) + .filter(selectedFieldsInCatalog::contains) + .collect(Collectors.toList()); + + final AutoCloseableIterator iterator; + // checks for which sync mode we're using based on the configured airbytestream + // this is where the bifurcation between full refresh and incremental + if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) { + final String cursorField = IncrementalUtils.getCursorField(airbyteStream); + final Optional cursorInfo = stateManager.getCursorInfo(pair); + + final AutoCloseableIterator airbyteMessageIterator; + if (cursorInfo.map(CursorInfo::getCursor).isPresent()) { + airbyteMessageIterator = getIncrementalStream( + database, + airbyteStream, + selectedDatabaseFields, + table, + cursorInfo.get(), + emittedAt); + } else { + // if no cursor is present then this is the first read for is the same as doing a full refresh read. 
+ estimateFullRefreshSyncSize(database, airbyteStream); + airbyteMessageIterator = getFullRefreshStream(database, streamName, namespace, + selectedDatabaseFields, table, emittedAt); + } + + final JsonSchemaPrimitive cursorType = IncrementalUtils.getCursorType(airbyteStream, + cursorField); + + iterator = AutoCloseableIterators.transform( + autoCloseableIterator -> new StateDecoratingIterator( + autoCloseableIterator, + stateManager, + pair, + cursorField, + cursorInfo.map(CursorInfo::getCursor).orElse(null), + cursorType, + getStateEmissionFrequency()), + airbyteMessageIterator); + } else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) { + estimateFullRefreshSyncSize(database, airbyteStream); + iterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, + table, emittedAt); + } else if (airbyteStream.getSyncMode() == null) { + throw new IllegalArgumentException( + String.format("%s requires a source sync mode", this.getClass())); + } else { + throw new IllegalArgumentException( + String.format("%s does not support sync mode: %s.", this.getClass(), + airbyteStream.getSyncMode())); + } + + final AtomicLong recordCount = new AtomicLong(); + return AutoCloseableIterators.transform(iterator, r -> { + final long count = recordCount.incrementAndGet(); + if (count % 10000 == 0) { + LOGGER.info("Reading stream {}. Records read: {}", streamName, count); + } + return r; + }); + } + + /** + * @param database Source Database + * @param airbyteStream represents an ingestion source (e.g. API endpoint or database table) + * @param selectedDatabaseFields subset of database fields selected for replication + * @param table information in tabular format + * @param cursorInfo state of where to start the sync from + * @param emittedAt Time when data was emitted from the Source database + * @return AirbyteMessage Iterator that + */ + private AutoCloseableIterator getIncrementalStream(final Database database, + final ConfiguredAirbyteStream airbyteStream, + final List selectedDatabaseFields, + final TableInfo> table, + final CursorInfo cursorInfo, + final Instant emittedAt) { + final String streamName = airbyteStream.getStream().getName(); + final String namespace = airbyteStream.getStream().getNamespace(); + final String cursorField = IncrementalUtils.getCursorField(airbyteStream); + final DataType cursorType = table.getFields().stream() + .filter(info -> info.getName().equals(cursorField)) + .map(CommonField::getType) + .findFirst() + .orElseThrow(); + + Preconditions.checkState( + table.getFields().stream().anyMatch(f -> f.getName().equals(cursorField)), + String.format("Could not find cursor field %s in table %s", cursorField, table.getName())); + + estimateIncrementalSyncSize(database, airbyteStream, cursorInfo, cursorType); + final AutoCloseableIterator queryIterator = queryTableIncremental( + database, + selectedDatabaseFields, + table.getNameSpace(), + table.getName(), + cursorInfo, + cursorType); + + return getMessageIterator(queryIterator, streamName, namespace, emittedAt.toEpochMilli()); + } + + /** + * Creates a AirbyteMessageIterator that contains all records for a database source connection + * + * @param database Source Database + * @param streamName name of an individual stream in which a stream represents a source (e.g. API + * endpoint or database table) + * @param namespace Namespace of the database (e.g. 
public) + * @param selectedDatabaseFields List of all interested database column names + * @param table information in tabular format + * @param emittedAt Time when data was emitted from the Source database + * @return AirbyteMessageIterator with all records for a database source + */ + private AutoCloseableIterator getFullRefreshStream(final Database database, + final String streamName, + final String namespace, + final List selectedDatabaseFields, + final TableInfo> table, + final Instant emittedAt) { + final AutoCloseableIterator queryStream = + queryTableFullRefresh(database, selectedDatabaseFields, table.getNameSpace(), + table.getName()); + return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli()); + } + + private static AutoCloseableIterator getMessageIterator( + final AutoCloseableIterator recordIterator, + final String streamName, + final String namespace, + final long emittedAt) { + return AutoCloseableIterators.transform(recordIterator, r -> new AirbyteMessage() + .withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(streamName) + .withNamespace(namespace) + .withEmittedAt(emittedAt) + .withData(r))); + } + + /** + * @param database - The database where from privileges for tables will be consumed + * @param schema - The schema where from privileges for tables will be consumed + * @return Set with privileges for tables for current DB-session user The method is responsible for + * SELECT-ing the table with privileges. In some cases such SELECT doesn't require (e.g. in + * Oracle DB - the schema is the user, you cannot REVOKE a privilege on a table from its + * owner). + */ + protected Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, + final String schema) + throws SQLException { + return Collections.emptySet(); + } + + /** + * Map a database implementation-specific configuration to json object that adheres to the database + * config spec. See resources/spec.json. + * + * @param config database implementation-specific configuration. + * @return database spec config + */ + public abstract JsonNode toDatabaseConfig(JsonNode config); + + /** + * Creates a database instance using the database spec config. + * + * @param config database spec config + * @return database instance + * @throws Exception might throw an error during connection to database + */ + protected abstract Database createDatabase(JsonNode config) throws Exception; + + /** + * Configures a list of operations that can be used to check the connection to the source. + * + * @return list of consumers that run queries for the check command. + */ + protected abstract List> getCheckOperations(JsonNode config) + throws Exception; + + /** + * Map source types to Airbyte types + * + * @param columnType source data type + * @return airbyte data type + */ + protected abstract JsonSchemaType getAirbyteType(DataType columnType); + + /** + * Get list of system namespaces(schemas) in order to exclude them from the `discover` result list. + * + * @return set of system namespaces(schemas) to be excluded + */ + protected abstract Set getExcludedInternalNameSpaces(); + + /** + * Get list of system views in order to exclude them from the `discover` result list. + * + * @return set of views to be excluded + */ + protected Set getExcludedViews() { + return Collections.emptySet(); + }; + + /** + * Discover all available tables in the source database. 
+ * + * @param database source database + * @return list of the source tables + * @throws Exception access to the database might lead to exceptions. + */ + protected abstract List>> discoverInternal( + final Database database) + throws Exception; + + /** + * Discovers all available tables within a schema in the source database. + * + * @param database - source database + * @param schema - source schema + * @return list of source tables + * @throws Exception - access to the database might lead to exceptions. + */ + protected abstract List>> discoverInternal( + final Database database, + String schema) + throws Exception; + + /** + * Discover Primary keys for each table and @return a map of namespace.table name to their + * associated list of primary key fields. + * + * @param database source database + * @param tableInfos list of tables + * @return map of namespace.table and primary key fields. + */ + protected abstract Map> discoverPrimaryKeys(Database database, + List>> tableInfos); + + /** + * Returns quote symbol of the database + * + * @return quote symbol + */ + protected abstract String getQuoteString(); + + /** + * Read all data from a table. + * + * @param database source database + * @param columnNames interested column names + * @param schemaName table namespace + * @param tableName target table + * @return iterator with read data + */ + protected abstract AutoCloseableIterator queryTableFullRefresh(final Database database, + final List columnNames, + final String schemaName, + final String tableName); + + /** + * Read incremental data from a table. Incremental read should return only records where cursor + * column value is bigger than cursor. Note that if the connector needs to emit intermediate state + * (i.e. {@link AbstractJdbcBaseSource#getStateEmissionFrequency} > 0), the incremental query must be + * sorted by the cursor field. + * + * @return iterator with read data + */ + protected abstract AutoCloseableIterator queryTableIncremental(Database database, + List columnNames, + String schemaName, + String tableName, + CursorInfo cursorInfo, + DataType cursorFieldType); + + /** + * When larger than 0, the incremental iterator will emit intermediate state for every N records. + * Please note that if intermediate state emission is enabled, the incremental query must be ordered + * by the cursor field. + */ + protected int getStateEmissionFrequency() { + return 0; + } + + /** + * @return true if the given type can be used as a cursor + */ + protected abstract boolean isCursorType(DataType type); + + /** + * Deserializes the state represented as JSON into an object representation. + * + * @param initialStateJson The state as JSON. + * @param config The connector configuration. + * @return The deserialized object representation of the state. 
+ */ + protected List deserializeInitialState(final JsonNode initialStateJson, + final JsonNode config, + final boolean useStreamCapableState) { + final Optional typedState = StateMessageHelper.getTypedState(initialStateJson, + useStreamCapableState); + return typedState.map((state) -> { + switch (state.getStateType()) { + case GLOBAL: + return List.of(StateGeneratorUtils.convertStateMessage(state.getGlobal())); + case STREAM: + return state.getStateMessages() + .stream() + .map(stateMessage -> StateGeneratorUtils.convertStateMessage(stateMessage)).toList(); + case LEGACY: + default: + return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY) + .withData(state.getLegacyState())); + } + }).orElse(generateEmptyInitialState(config)); + } + + /** + * Generates an empty, initial state for use by the connector. + * + * @param config The connector configuration. + * @return The empty, initial state. + */ + protected List generateEmptyInitialState(final JsonNode config) { + // For backwards compatibility with existing connectors + return List.of(new AirbyteStateMessage().withType(AirbyteStateType.LEGACY) + .withData(Jsons.jsonNode(new DbState()))); + } + + /** + * Returns the {@link AirbyteStateType} supported by this connector. + * + * @param config The connector configuration. + * @return A {@link AirbyteStateType} representing the state supported by this connector. + */ + protected AirbyteStateType getSupportedStateType(final JsonNode config) { + return AirbyteStateType.LEGACY; + } + +} diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractDbSourceTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractDbSourceTest.java index b35e0cb65037..50f5aa6e46c3 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractDbSourceTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractDbSourceTest.java @@ -79,7 +79,7 @@ void testDeserializationOfNullState() throws IOException { final List result = dbSource.deserializeInitialState(null, config, false); assertEquals(1, result.size()); - assertEquals(dbSource.getSupportedStateType(config), result.get(0).getType()); + assertEquals(AirbyteStateType.LEGACY, result.get(0).getType()); } } diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractJdbcBaseSourceTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractJdbcBaseSourceTest.java new file mode 100644 index 000000000000..501d1544e066 --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/AbstractJdbcBaseSourceTest.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.relationaldb; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; +import java.io.IOException; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; +import uk.org.webcompere.systemstubs.jupiter.SystemStub; +import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; + +/** + * Test suite for the {@link AbstractJdbcBaseSource} class. + */ +@ExtendWith(SystemStubsExtension.class) +public class AbstractJdbcBaseSourceTest { + + @SystemStub + private EnvironmentVariables environmentVariables; + + @Test + void testDeserializationOfLegacyState() throws IOException { + final AbstractJdbcBaseSource dbSource = spy(AbstractJdbcBaseSource.class); + final JsonNode config = mock(JsonNode.class); + + final String legacyStateJson = MoreResources.readResource("states/legacy.json"); + final JsonNode legacyState = Jsons.deserialize(legacyStateJson); + + final List result = dbSource.deserializeInitialState(legacyState, config, false); + assertEquals(1, result.size()); + assertEquals(AirbyteStateType.LEGACY, result.get(0).getType()); + } + + @Test + void testDeserializationOfGlobalState() throws IOException { + environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true"); + final AbstractJdbcBaseSource dbSource = spy(AbstractJdbcBaseSource.class); + final JsonNode config = mock(JsonNode.class); + + final String globalStateJson = MoreResources.readResource("states/global.json"); + final JsonNode globalState = Jsons.deserialize(globalStateJson); + + final List result = dbSource.deserializeInitialState(globalState, config, true); + assertEquals(1, result.size()); + assertEquals(AirbyteStateType.GLOBAL, result.get(0).getType()); + } + + @Test + void testDeserializationOfStreamState() throws IOException { + environmentVariables.set(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true"); + final AbstractJdbcBaseSource dbSource = spy(AbstractJdbcBaseSource.class); + final JsonNode config = mock(JsonNode.class); + + final String streamStateJson = MoreResources.readResource("states/per_stream.json"); + final JsonNode streamState = Jsons.deserialize(streamStateJson); + + final List result = dbSource.deserializeInitialState(streamState, config, true); + assertEquals(2, result.size()); + assertEquals(AirbyteStateType.STREAM, result.get(0).getType()); + } + + @Test + void testDeserializationOfNullState() throws IOException { + final AbstractJdbcBaseSource dbSource = spy(AbstractJdbcBaseSource.class); + final JsonNode config = mock(JsonNode.class); + + final List result = dbSource.deserializeInitialState(null, config, false); + assertEquals(1, result.size()); + assertEquals(dbSource.getSupportedStateType(config), result.get(0).getType()); + } + +}
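Note on the behavioural change in this patch: the slimmed-down AbstractDbSource.read() now pins the state manager to AirbyteStateType.LEGACY, and its deserializeInitialState() collapses any typed state into a single legacy message, while the forked AbstractJdbcBaseSource keeps the old per-type handling (exercised by AbstractJdbcBaseSourceTest above). The snippet below is an illustrative extra test case, not part of the patch, that could be added to the existing AbstractDbSourceTest to pin down the new behaviour; the method name is hypothetical, it assumes the imports already present in that test class, and it reuses the states/per_stream.json fixture referenced elsewhere in this diff.

```java
  @Test
  void testStreamStateNowCollapsesToLegacy() throws IOException {
    // Hypothetical extra case for AbstractDbSourceTest (not part of this patch):
    // after this change, even stream-scoped state is deserialized by the
    // slimmed-down AbstractDbSource into a single LEGACY state message.
    final AbstractDbSource dbSource = spy(AbstractDbSource.class);
    final JsonNode config = mock(JsonNode.class);

    final JsonNode streamState = Jsons.deserialize(MoreResources.readResource("states/per_stream.json"));

    final List<AirbyteStateMessage> result = dbSource.deserializeInitialState(streamState, config, true);
    assertEquals(1, result.size());
    assertEquals(AirbyteStateType.LEGACY, result.get(0).getType());
  }
```

If that collapse is not intended for connectors that still rely on stream or global state, those connectors should extend AbstractJdbcBaseSource, which retains getSupportedStateType() and the per-type deserialization shown in the new file above.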