diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index ce55901c8736e..2b047d3c9958e 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.48.7 | 2025-01-26 | [\#52556](https://github.com/airbytehq/airbyte/pull/52556) | Make table discovery during read more efficient | | 0.48.6 | 2025-01-26 | [\#51596](https://github.com/airbytehq/airbyte/pull/51596) | Fix flaky source mssql tests | | 0.48.5 | 2025-01-16 | [\#51583](https://github.com/airbytehq/airbyte/pull/51583) | Also save SSL key to /tmp in destination-postgres | | 0.48.4 | 2024-12-24 | [\#50410](https://github.com/airbytehq/airbyte/pull/50410) | Save SSL key to /tmp | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index d4f4e334d431c..688b253e3d1c3 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.48.6 +version=0.48.7 diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index e38de51cc107d..3c534722a18c2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ 
b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -389,31 +389,7 @@ abstract class AbstractJdbcSource( ) } .values - .map { fields: List -> - TableInfo>( - nameSpace = fields[0].get(INTERNAL_SCHEMA_NAME).asText(), - name = fields[0].get(INTERNAL_TABLE_NAME).asText(), - fields = - fields - // read the column metadata Json object, and determine its - // type - .map { f: JsonNode -> - val datatype = sourceOperations.getDatabaseFieldType(f) - val jsonType = getAirbyteType(datatype) - LOGGER.debug { - "Table ${fields[0].get(INTERNAL_TABLE_NAME).asText()} column ${f.get(INTERNAL_COLUMN_NAME).asText()}" + - "(type ${f.get(INTERNAL_COLUMN_TYPE_NAME).asText()}[${f.get(INTERNAL_COLUMN_SIZE).asInt()}], " + - "nullable ${f.get(INTERNAL_IS_NULLABLE).asBoolean()}) -> $jsonType" - } - object : - CommonField( - f.get(INTERNAL_COLUMN_NAME).asText(), - datatype - ) {} - }, - cursorFields = extractCursorFields(fields) - ) - } + .map { fields: List -> jsonFieldListToTableInfo(fields) } } private fun extractCursorFields(fields: List): List { @@ -579,6 +555,70 @@ abstract class AbstractJdbcSource( ) } + /** + * Discovers a single table by reading its column metadata from the JDBC driver. + * + * `DatabaseMetaData.getColumns` treats the schema and table arguments as LIKE patterns, so a + * `_` or `%` in either name may also match other tables. The group whose schema and table + * name equal the requested ones is preferred; otherwise the first group is used. + * + * @return the table information, or null when no column metadata was returned + */ + override fun discoverTable( + database: JdbcDatabase, + schema: String, + tableName: String + ): TableInfo>? 
{ + LOGGER.info { "Discover table: $schema.$tableName" } + val columnsByTable = + database + .bufferedResultSetQuery( + { connection: Connection -> + connection.metaData.getColumns(getCatalog(database), schema, tableName, null) + }, + { resultSet: ResultSet -> this.getColumnMetadata(resultSet) } + ) + .groupBy { t: JsonNode -> + ImmutablePair.of( + t.get(INTERNAL_SCHEMA_NAME).asText(), + t.get(INTERNAL_TABLE_NAME).asText() + ) + } + /* Prefer the exact (schema, table) group; otherwise keep the first group returned. */ + val matched = + columnsByTable[ImmutablePair.of(schema, tableName)] ?: columnsByTable.values.firstOrNull() + return matched?.let { fields -> jsonFieldListToTableInfo(fields) } + } + + /** + * Builds the [TableInfo] for one table from its column metadata rows. + * + * @param fields column metadata of a single table; must be non-empty, since the schema and + * table name are read from the first entry + */ + private fun jsonFieldListToTableInfo(fields: List): TableInfo> { + return TableInfo>( + nameSpace = fields[0].get(INTERNAL_SCHEMA_NAME).asText(), + name = fields[0].get(INTERNAL_TABLE_NAME).asText(), + fields = + fields + // read the column metadata Json object, and determine its + // type + .map { f: JsonNode -> + val datatype = sourceOperations.getDatabaseFieldType(f) + val jsonType = getAirbyteType(datatype) + LOGGER.debug { + "Table ${fields[0].get(INTERNAL_TABLE_NAME).asText()} column ${f.get(INTERNAL_COLUMN_NAME).asText()}" + + "(type ${f.get(INTERNAL_COLUMN_TYPE_NAME).asText()}[${f.get(INTERNAL_COLUMN_SIZE).asInt()}], " + + "nullable ${f.get(INTERNAL_IS_NULLABLE).asBoolean()}) -> $jsonType" + } + object : + CommonField(f.get(INTERNAL_COLUMN_NAME).asText(), datatype) {} + }, + cursorFields = extractCursorFields(fields) + ) + } + public override fun isCursorType(type: Datatype): Boolean { return sourceOperations.isCursorType(type) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt index 6372c1be6ded0..1dbaa152c27b0 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt +++ 
b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt @@ -131,7 +131,7 @@ protected constructor(driverClassName: String) : logPreSyncDebugData(database, catalog) val fullyQualifiedTableNameToInfo = - discoverWithoutSystemTables(database).associateBy { + discoverWithoutSystemTables(database, catalog).associateBy { String.format("%s.%s", it.nameSpace, it.name) } @@ -289,6 +289,27 @@ protected constructor(driverClassName: String) : /* no-op */ } + /** + * Discovers only the tables backing the streams of [catalog], one [discoverTable] call per + * stream. Streams whose table is not found are skipped. + * + * @param database source database + * @param catalog configured catalog whose streams are looked up + * @return table information for every stream whose table was found + */ + @Throws(Exception::class) + protected fun discoverWithoutSystemTables( + database: Database, + catalog: ConfiguredAirbyteCatalog, + ): List>> { + return catalog.streams.mapNotNull { airbyteStream: ConfiguredAirbyteStream -> + val stream = airbyteStream.stream + discoverTable(database, stream.namespace, stream.name)?.also { + LOGGER.info { "Discovered table: ${it.nameSpace}.${it.name}: $it" } + } + } + } + @Throws(Exception::class) protected fun discoverWithoutSystemTables( database: Database @@ -723,6 +744,23 @@ protected constructor(driverClassName: String) : tableInfos: List>> ): Map> + /** + * Discovers a table in the source database. + * + * @param database + * - source database + * @param schema + * - source schema + * @param tableName + * - source table name + * @return table information + */ + protected abstract fun discoverTable( + database: Database, + schema: String, + tableName: String + ): TableInfo>? + protected abstract val quoteString: String? 
/** diff --git a/airbyte-integrations/connectors/source-mssql/build.gradle b/airbyte-integrations/connectors/source-mssql/build.gradle index ec9a5eb821d0d..cedb8d967ee0b 100644 --- a/airbyte-integrations/connectors/source-mssql/build.gradle +++ b/airbyte-integrations/connectors/source-mssql/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.48.6' + cdkVersionRequired = '0.48.7' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mssql/metadata.yaml b/airbyte-integrations/connectors/source-mssql/metadata.yaml index b94714c8538b5..d45ecbd108566 100644 --- a/airbyte-integrations/connectors/source-mssql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mssql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 - dockerImageTag: 4.1.19 + dockerImageTag: 4.1.20 dockerRepository: airbyte/source-mssql documentationUrl: https://docs.airbyte.com/integrations/sources/mssql githubIssueLabel: source-mssql diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index b88db9a64521e..33ab346d3934a 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -424,6 +424,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------| :---------------------------------------------------------------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------| +| 4.1.20 | 2025-01-26 | [52556](https://github.com/airbytehq/airbyte/pull/52556) | Improve table discovery during read. 
| | 4.1.19 | 2025-01-16 | [51596](https://github.com/airbytehq/airbyte/pull/51596) | Bump driver versions to latest (jdbc, debezium, cdk) | | 4.1.18 | 2025-01-06 | [50943](https://github.com/airbytehq/airbyte/pull/50943) | Use airbyte/java-connector-base:2.0.0. This makes the image rootless. The connector will be incompatible with Airbyte < 0.64. | | 4.1.17 | 2024-12-17 | [49840](https://github.com/airbytehq/airbyte/pull/49840) | Use a base image: airbyte/java-connector-base:1.0.0 |