Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make efficient table discovery during read #52556

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.48.7 | 2025-01-26 | [\#51596](https://github.com/airbytehq/airbyte/pull/51596) | Make efficient table discovery during read |
| 0.48.6 | 2025-01-26 | [\#51596](https://github.com/airbytehq/airbyte/pull/51596) | Fix flaky source mssql tests |
| 0.48.5 | 2025-01-16 | [\#51583](https://github.com/airbytehq/airbyte/pull/51583) | Also save SSL key to /tmp in destination-postgres |
| 0.48.4 | 2024-12-24 | [\#50410](https://github.com/airbytehq/airbyte/pull/50410) | Save SSL key to /tmp |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.48.6
version=0.48.7
Original file line number Diff line number Diff line change
Expand Up @@ -389,31 +389,7 @@ abstract class AbstractJdbcSource<Datatype>(
)
}
.values
.map { fields: List<JsonNode> ->
TableInfo<CommonField<Datatype>>(
nameSpace = fields[0].get(INTERNAL_SCHEMA_NAME).asText(),
name = fields[0].get(INTERNAL_TABLE_NAME).asText(),
fields =
fields
// read the column metadata Json object, and determine its
// type
.map { f: JsonNode ->
val datatype = sourceOperations.getDatabaseFieldType(f)
val jsonType = getAirbyteType(datatype)
LOGGER.debug {
"Table ${fields[0].get(INTERNAL_TABLE_NAME).asText()} column ${f.get(INTERNAL_COLUMN_NAME).asText()}" +
"(type ${f.get(INTERNAL_COLUMN_TYPE_NAME).asText()}[${f.get(INTERNAL_COLUMN_SIZE).asInt()}], " +
"nullable ${f.get(INTERNAL_IS_NULLABLE).asBoolean()}) -> $jsonType"
}
object :
CommonField<Datatype>(
f.get(INTERNAL_COLUMN_NAME).asText(),
datatype
) {}
},
cursorFields = extractCursorFields(fields)
)
}
.map { fields: List<JsonNode> -> jsonFieldListToTableInfo(fields) }
}

private fun extractCursorFields(fields: List<JsonNode>): List<String> {
Expand Down Expand Up @@ -579,6 +555,53 @@ abstract class AbstractJdbcSource<Datatype>(
)
}

override fun discoverTable(
database: JdbcDatabase,
schema: String,
tableName: String
): TableInfo<CommonField<Datatype>>? {
LOGGER.info { "Discover table: $schema.$tableName" }
return database
.bufferedResultSetQuery<JsonNode>(
{ connection: Connection ->
connection.metaData.getColumns(getCatalog(database), schema, tableName, null)
},
{ resultSet: ResultSet -> this.getColumnMetadata(resultSet) }
)
.groupBy { t: JsonNode ->
ImmutablePair.of<String, String>(
t.get(INTERNAL_SCHEMA_NAME).asText(),
t.get(INTERNAL_TABLE_NAME).asText()
)
}
.values
.map { fields: List<JsonNode> -> jsonFieldListToTableInfo(fields) }
.firstOrNull()
}

private fun jsonFieldListToTableInfo(fields: List<JsonNode>): TableInfo<CommonField<Datatype>> {
return TableInfo<CommonField<Datatype>>(
nameSpace = fields[0].get(INTERNAL_SCHEMA_NAME).asText(),
name = fields[0].get(INTERNAL_TABLE_NAME).asText(),
fields =
fields
// read the column metadata Json object, and determine its
// type
.map { f: JsonNode ->
val datatype = sourceOperations.getDatabaseFieldType(f)
val jsonType = getAirbyteType(datatype)
LOGGER.debug {
"Table ${fields[0].get(INTERNAL_TABLE_NAME).asText()} column ${f.get(INTERNAL_COLUMN_NAME).asText()}" +
"(type ${f.get(INTERNAL_COLUMN_TYPE_NAME).asText()}[${f.get(INTERNAL_COLUMN_SIZE).asInt()}], " +
"nullable ${f.get(INTERNAL_IS_NULLABLE).asBoolean()}) -> $jsonType"
}
object :
CommonField<Datatype>(f.get(INTERNAL_COLUMN_NAME).asText(), datatype) {}
},
cursorFields = extractCursorFields(fields)
)
}

public override fun isCursorType(type: Datatype): Boolean {
return sourceOperations.isCursorType(type)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected constructor(driverClassName: String) :
logPreSyncDebugData(database, catalog)

val fullyQualifiedTableNameToInfo =
discoverWithoutSystemTables(database).associateBy {
discoverWithoutSystemTables(database, catalog).associateBy {
String.format("%s.%s", it.nameSpace, it.name)
}

Expand Down Expand Up @@ -289,6 +289,22 @@ protected constructor(driverClassName: String) :
/* no-op */
}

@Throws(Exception::class)
protected fun discoverWithoutSystemTables(
database: Database,
catalog: ConfiguredAirbyteCatalog,
): List<TableInfo<CommonField<DataType>>> {
var result = mutableListOf<TableInfo<CommonField<DataType>>>()
catalog.streams.forEach { airbyteStream: ConfiguredAirbyteStream ->
val stream = airbyteStream.stream
discoverTable(database, stream.namespace, stream.name)?.let {
LOGGER.info { "Discovered table: ${it.nameSpace}.${it.name}: $it" }
result.add(it)
}
}
return result
}

@Throws(Exception::class)
protected fun discoverWithoutSystemTables(
database: Database
Expand Down Expand Up @@ -723,6 +739,23 @@ protected constructor(driverClassName: String) :
tableInfos: List<TableInfo<CommonField<DataType>>>
): Map<String, MutableList<String>>

/**
* Discovers a table in the source database.
*
* @param database
* - source database
* @param schema
* - source schema
* @param tableName
* - source table name
* @return table information
*/
protected abstract fun discoverTable(
database: Database,
schema: String,
tableName: String
): TableInfo<CommonField<DataType>>?

protected abstract val quoteString: String?

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.48.6'
cdkVersionRequired = '0.48.7'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.19
dockerImageTag: 4.1.20
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :---------------------------------------------------------------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.20 | 2025-01-26 | [52556](https://github.com/airbytehq/airbyte/pull/52556) | Improve tables discovery during read. |
| 4.1.19 | 2025-01-16 | [51596](https://github.com/airbytehq/airbyte/pull/51596) | Bump driver versions to latest (jdbc, debezium, cdk) |
| 4.1.18 | 2025-01-06 | [50943](https://github.com/airbytehq/airbyte/pull/50943) | Use airbyte/java-connector-base:2.0.0. This makes the image rootless. The connector will be incompatible with Airbyte < 0.64. |
| 4.1.17 | 2024-12-17 | [49840](https://github.com/airbytehq/airbyte/pull/49840) | Use a base image: airbyte/java-connector-base:1.0.0 |
Expand Down
Loading