destination-snowflake: speed up metadata queries #45422

Merged
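In brief: this bumps the connector from 3.12.0 to 3.13.0 and swaps the metadata lookups in the typing/deduping path — table discovery, column introspection, row counts, and schema-existence checks — from JDBC `DatabaseMetaData` / `information_schema` queries over to Snowflake's `SHOW TABLES`, `DESCRIBE TABLE`, and `SHOW SCHEMAS` commands. Those commands are answered from Snowflake's metadata layer and (per Snowflake's documentation) don't require a running warehouse, which is presumably where the speedup comes from.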
metadata.yaml
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.12.0
+  dockerImageTag: 3.13.0
   dockerRepository: airbyte/destination-snowflake
   documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
   githubIssueLabel: destination-snowflake
SnowflakeDestinationHandler.kt
@@ -28,7 +28,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
 import io.airbyte.integrations.destination.snowflake.SnowflakeDatabaseUtils
 import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeState
 import java.sql.Connection
-import java.sql.DatabaseMetaData
 import java.sql.ResultSet
 import java.sql.SQLException
 import java.time.Instant
@@ -64,34 +63,57 @@ class SnowflakeDestinationHandler(
     // Postgres is close enough to Snowflake SQL for our purposes.
     // We don't quote the database name in any queries, so just upcase it.
     private val databaseName = databaseName.uppercase(Locale.getDefault())
+
+    private data class SnowflakeTableInfo(
+        val schemaName: String,
+        val tableName: String,
+        val rowCount: Int
+    )
+
+    private fun queryTable(schemaName: String, tableName: String): List<SnowflakeTableInfo> {
+        val showTablesQuery =
+            """
+            SHOW TABLES LIKE '$tableName' IN "$databaseName"."$schemaName";
+            """.trimIndent()
+        try {
+            val showTablesResult = database.queryJsons(showTablesQuery)
+            return showTablesResult.map {
+                SnowflakeTableInfo(
+                    it["schema_name"].asText(),
+                    it["name"].asText(),
+                    it["rows"].asText().toInt()
+                )
+            }
+        } catch (e: SnowflakeSQLException) {
+            val message = e.message
+            if (
+                message != null &&
+                    message.contains("does not exist, or operation cannot be performed.")
+            ) {
+                return emptyList()
+            } else {
+                throw e
+            }
+        }
+    }
 
     @Throws(SQLException::class)
     private fun getFinalTableRowCount(
         streamIds: List<StreamId>
     ): LinkedHashMap<String, LinkedHashMap<String, Int>> {
-        val tableRowCounts = LinkedHashMap<String, LinkedHashMap<String, Int>>()
-        // convert list stream to array
-        val namespaces = streamIds.map { it.finalNamespace }.toTypedArray()
-        val names = streamIds.map { it.finalName }.toTypedArray()
-        val query =
-            """
-            |SELECT table_schema, table_name, row_count
-            |FROM information_schema.tables
-            |WHERE table_catalog = ?
-            |AND table_schema IN (${IntRange(1, streamIds.size).joinToString { "?" }})
-            |AND table_name IN (${IntRange(1, streamIds.size).joinToString { "?" }})
-            |""".trimMargin()
-        val bindValues = arrayOf(databaseName) + namespaces + names
-        val results: List<JsonNode> = database.queryJsons(query, *bindValues)
-        for (result in results) {
-            val tableSchema = result["TABLE_SCHEMA"].asText()
-            val tableName = result["TABLE_NAME"].asText()
-            val rowCount = result["ROW_COUNT"].asInt()
-            tableRowCounts
-                .computeIfAbsent(tableSchema) { _: String? -> LinkedHashMap() }[tableName] =
-                rowCount
+        val tableRowCountsFromShowQuery = LinkedHashMap<String, LinkedHashMap<String, Int>>()
+        for (stream in streamIds) {
+            val tables = queryTable(stream.finalNamespace, stream.finalName)
+            tables.forEach {
+                if (it.tableName == stream.finalName) {
+                    tableRowCountsFromShowQuery
+                        .computeIfAbsent(it.schemaName) { LinkedHashMap() }[it.tableName] =
+                        it.rowCount
+                }
+            }
         }
-        return tableRowCounts
+        return tableRowCountsFromShowQuery
     }
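A note on the mechanics here: `SHOW TABLES LIKE ... IN <db>.<schema>` is answered from Snowflake's metadata layer and returns one row per matching table, including a `rows` column, so the handler gets existence and row count in a single warehouse-free call. Below is a minimal standalone sketch of the per-row mapping the new `queryTable` performs (assumes jackson-databind on the classpath; the sample values are made up):

```kotlin
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper

// Mirrors the private data class in the diff above.
data class SnowflakeTableInfo(val schemaName: String, val tableName: String, val rowCount: Int)

fun main() {
    // One result row shaped like queryJsons' rendering of SHOW TABLES output.
    val row: JsonNode = ObjectMapper().readTree(
        """{"schema_name": "PUBLIC", "name": "USERS_RAW", "rows": "42"}"""
    )
    // Same field reads as the new queryTable: schema_name, name, rows.
    val info = SnowflakeTableInfo(
        row["schema_name"].asText(),
        row["name"].asText(),
        row["rows"].asText().toInt()
    )
    println(info) // SnowflakeTableInfo(schemaName=PUBLIC, tableName=USERS_RAW, rowCount=42)
}
```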

     @Throws(Exception::class)
@@ -101,36 +123,15 @@ class SnowflakeDestinationHandler(
     ): InitialRawTableStatus {
         val rawTableName = id.rawName + suffix
         val tableExists =
-            database.executeMetadataQuery { databaseMetaData: DatabaseMetaData ->
-                LOGGER.info(
-                    "Retrieving table from Db metadata: {} {}",
-                    id.rawNamespace,
-                    rawTableName
-                )
-                try {
-                    val rs =
-                        databaseMetaData.getTables(
-                            databaseName,
-                            id.rawNamespace,
-                            rawTableName,
-                            null
-                        )
-                    // When QUOTED_IDENTIFIERS_IGNORE_CASE is set to true, the raw table is
-                    // interpreted as uppercase in db metadata calls. Check for both.
-                    val rsUppercase =
-                        databaseMetaData.getTables(
-                            databaseName,
-                            id.rawNamespace.uppercase(),
-                            rawTableName.uppercase(),
-                            null
-                        )
-                    rs.next() || rsUppercase.next()
-                } catch (e: SQLException) {
-                    LOGGER.error("Failed to retrieve table metadata", e)
-                    throw RuntimeException(e)
-                }
-            }
+            queryTable(id.rawNamespace, rawTableName).any {
+                // When QUOTED_IDENTIFIERS_IGNORE_CASE is set to true, the raw table is
+                // interpreted as uppercase in db metadata calls. Check for both.
+                (it.schemaName == id.rawNamespace && it.tableName == rawTableName) ||
+                    (it.schemaName == id.rawNamespace.uppercase() &&
+                        it.tableName == rawTableName.uppercase())
+            }
 
         if (!tableExists) {
             return InitialRawTableStatus(
                 rawTableExists = false,
@@ -388,7 +389,7 @@ class SnowflakeDestinationHandler(
         val destinationStates = getAllDestinationStates()
 
         val streamIds = streamConfigs.map(StreamConfig::id).toList()
-        val existingTables = findExistingTables(database, databaseName, streamIds)
+        val existingTables = findExistingTables(database, streamIds)
         val tableRowCounts = getFinalTableRowCount(streamIds)
         return streamConfigs
             .stream()
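The raw-table existence check above folds the old double `DatabaseMetaData.getTables` lookup into a single `queryTable` call plus an in-memory predicate. A standalone sketch of that predicate (the helper name and sample identifiers are hypothetical; the uppercase-fallback rule is the one from the diff):

```kotlin
// With QUOTED_IDENTIFIERS_IGNORE_CASE=true, Snowflake reports identifiers uppercased,
// so the check accepts either the exact pair or the uppercased pair.
fun matchesRawTable(
    foundSchema: String,
    foundTable: String,
    wantedSchema: String,
    wantedTable: String
): Boolean =
    (foundSchema == wantedSchema && foundTable == wantedTable) ||
        (foundSchema == wantedSchema.uppercase() && foundTable == wantedTable.uppercase())

fun main() {
    // Table reported uppercased by SHOW TABLES, requested in lowercase:
    println(matchesRawTable("AIRBYTE_INTERNAL", "USERS_RAW", "airbyte_internal", "users_raw")) // true
    // Different schema entirely:
    println(matchesRawTable("OTHER", "USERS_RAW", "airbyte_internal", "users_raw")) // false
}
```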
@@ -536,42 +537,52 @@ class SnowflakeDestinationHandler(
         @Throws(SQLException::class)
         fun findExistingTables(
             database: JdbcDatabase,
-            databaseName: String,
             streamIds: List<StreamId>
-        ): LinkedHashMap<String, LinkedHashMap<String, TableDefinition>> {
-            val existingTables = LinkedHashMap<String, LinkedHashMap<String, TableDefinition>>()
-            // convert list stream to array
-            val namespaces = streamIds.map { it.finalNamespace }.toTypedArray()
-            val names = streamIds.map { it.finalName }.toTypedArray()
-            val query =
-                """
-                |SELECT table_schema, table_name, column_name, data_type, is_nullable
-                |FROM information_schema.columns
-                |WHERE table_catalog = ?
-                |AND table_schema IN (${IntRange(1, streamIds.size).joinToString { "?" }})
-                |AND table_name IN (${IntRange(1, streamIds.size).joinToString { "?" }})
-                |ORDER BY table_schema, table_name, ordinal_position;
-                |""".trimMargin()
-
-            val bindValues =
-                arrayOf(databaseName.uppercase(Locale.getDefault())) + namespaces + names
-            val results: List<JsonNode> = database.queryJsons(query, *bindValues)
-            for (result in results) {
-                val tableSchema = result["TABLE_SCHEMA"].asText()
-                val tableName = result["TABLE_NAME"].asText()
-                val columnName = result["COLUMN_NAME"].asText()
-                val dataType = result["DATA_TYPE"].asText()
-                val isNullable = result["IS_NULLABLE"].asText()
-                val tableDefinition =
-                    existingTables
-                        .computeIfAbsent(tableSchema) { _: String? -> LinkedHashMap() }
-                        .computeIfAbsent(tableName) { _: String? ->
-                            TableDefinition(LinkedHashMap())
-                        }
-                tableDefinition.columns[columnName] =
-                    ColumnDefinition(columnName, dataType, 0, fromIsNullableIsoString(isNullable))
+        ): Map<String, Map<String, TableDefinition>> {
+            val existingTables = HashMap<String, HashMap<String, TableDefinition>>()
+            for (stream in streamIds) {
+                val schemaName = stream.finalNamespace
+                val tableName = stream.finalName
+                val table = getTable(database, schemaName, tableName)
+                if (table != null) {
+                    existingTables
+                        .computeIfAbsent(schemaName) { _: String? -> HashMap() }
+                        .computeIfAbsent(tableName) { _: String? -> table }
+                }
             }
             return existingTables
         }
 
+        fun getTable(
+            database: JdbcDatabase,
+            schemaName: String,
+            tableName: String,
+        ): TableDefinition? {
+            try {
+                val columns = LinkedHashMap<String, ColumnDefinition>()
+                database.queryJsons("DESCRIBE TABLE \"$schemaName\".\"$tableName\"").map {
+                    val columnName = it["name"].asText()
+                    val dataType =
+                        when (
+                            val snowflakeDataType =
+                                it["type"].asText().takeWhile { char -> char != '(' }
+                        ) {
+                            "VARCHAR" -> "TEXT"
+                            else -> snowflakeDataType
+                        }
+                    val isNullable = it["null?"].asText() == "Y"
+                    columns[columnName] =
+                        ColumnDefinition(columnName, dataType, columnSize = 0, isNullable)
+                }
+                return TableDefinition(columns)
+            } catch (e: SnowflakeSQLException) {
+                if (e.message != null && e.message!!.contains("does not exist")) {
+                    return null
+                } else {
+                    throw e
+                }
+            }
+        }
     }
 }
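`DESCRIBE TABLE` reports types with precision and scale baked in (`VARCHAR(16777216)`, `NUMBER(38,0)`, ...), whereas the old `information_schema.columns` query returned bare type names with `VARCHAR` surfaced as `TEXT`. The `when` inside `getTable` normalizes the former into the latter so downstream schema comparison keeps working. A runnable sketch of just that normalization (the helper name is mine, not the PR's):

```kotlin
// Strip a parenthesized precision suffix and map VARCHAR to TEXT, matching the
// conversion getTable applies to each DESCRIBE TABLE "type" value.
fun normalizeSnowflakeType(rawType: String): String =
    when (val base = rawType.takeWhile { it != '(' }) {
        "VARCHAR" -> "TEXT"
        else -> base
    }

fun main() {
    println(normalizeSnowflakeType("VARCHAR(16777216)")) // TEXT
    println(normalizeSnowflakeType("NUMBER(38,0)"))      // NUMBER
    println(normalizeSnowflakeType("TIMESTAMP_TZ(9)"))   // TIMESTAMP_TZ
    println(normalizeSnowflakeType("VARIANT"))           // VARIANT
}
```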
SnowflakeV1V2Migrator.kt
@@ -3,19 +3,20 @@
  */
 package io.airbyte.integrations.destination.snowflake.typing_deduping
 
 import com.fasterxml.jackson.databind.JsonNode
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
 import io.airbyte.cdk.db.jdbc.JdbcDatabase
 import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
 import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
 import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
 import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler.Companion.fromIsNullableIsoString
 import io.airbyte.integrations.base.destination.typing_deduping.BaseDestinationV1V2Migrator
 import io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase
 import io.airbyte.integrations.base.destination.typing_deduping.NamespacedTableName
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.util.*
 import lombok.SneakyThrows
+import net.snowflake.client.jdbc.SnowflakeSQLException
 
 private val LOGGER = KotlinLogging.logger {}

 @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE")
 class SnowflakeV1V2Migrator(
@@ -26,19 +27,19 @@ class SnowflakeV1V2Migrator(
     @SneakyThrows
     @Throws(Exception::class)
     override fun doesAirbyteInternalNamespaceExist(streamConfig: StreamConfig?): Boolean {
-        return database
-            .queryJsons(
-                """
-                SELECT SCHEMA_NAME
-                FROM information_schema.schemata
-                WHERE schema_name = ?
-                AND catalog_name = ?;
-                """.trimIndent(),
-                streamConfig!!.id.rawNamespace,
-                databaseName
-            )
-            .isNotEmpty()
+        try {
+            return database
+                .queryJsons(
+                    "SHOW SCHEMAS LIKE '${streamConfig!!.id.rawNamespace}' IN DATABASE \"$databaseName\";",
+                )
+                .isNotEmpty()
+        } catch (e: SnowflakeSQLException) {
+            if (e.message != null && e.message!!.contains("does not exist")) {
+                return false
+            } else {
+                throw e
+            }
+        }
     }
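Two details worth noting on this rewrite: SHOW commands don't take JDBC bind parameters (as far as I know), which is presumably why the namespace is interpolated into the statement instead of bound the way the old `information_schema.schemata` query bound it; and in `SHOW ... LIKE` patterns, `_` and `%` act as SQL wildcards, so a namespace containing underscores can in principle match similarly named schemas, since only non-emptiness is checked. A hypothetical helper mirroring the statement shape used above:

```kotlin
// Builds the same statement shape as the inline string above; names are placeholders.
// NB: the LIKE pattern is not a bind parameter, and '_' / '%' act as wildcards in it.
fun buildShowSchemasQuery(rawNamespace: String, databaseName: String): String =
    "SHOW SCHEMAS LIKE '$rawNamespace' IN DATABASE \"$databaseName\";"

fun main() {
    println(buildShowSchemasQuery("airbyte_internal", "ANALYTICS"))
    // SHOW SCHEMAS LIKE 'airbyte_internal' IN DATABASE "ANALYTICS";
}
```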

     override fun schemaMatchesExpectation(
@@ -54,50 +55,9 @@ class SnowflakeV1V2Migrator(
         namespace: String?,
         tableName: String?
     ): Optional<TableDefinition> {
-        // TODO this looks similar to SnowflakeDestinationHandler#findExistingTables, with a twist;
-        // databaseName not upper-cased and rawNamespace and rawTableName as-is (no uppercase).
-        // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC
-        // translates VARIANT as VARCHAR
-        val columns =
-            database
-                .queryJsons(
-                    """
-                    SELECT column_name, data_type, is_nullable
-                    FROM information_schema.columns
-                    WHERE table_catalog = ?
-                    AND table_schema = ?
-                    AND table_name = ?
-                    ORDER BY ordinal_position;
-                    """.trimIndent(),
-                    databaseName,
-                    namespace!!,
-                    tableName!!
-                )
-                .stream()
-                .collect(
-                    { LinkedHashMap() },
-                    { map: java.util.LinkedHashMap<String, ColumnDefinition>, row: JsonNode ->
-                        map[row["COLUMN_NAME"].asText()] =
-                            ColumnDefinition(
-                                row["COLUMN_NAME"].asText(),
-                                row["DATA_TYPE"].asText(),
-                                0,
-                                fromIsNullableIsoString(row["IS_NULLABLE"].asText())
-                            )
-                    },
-                    {
-                        obj: java.util.LinkedHashMap<String, ColumnDefinition>,
-                        m: java.util.LinkedHashMap<String, ColumnDefinition>? ->
-                        obj.putAll(m!!)
-                    }
-                )
-        return if (columns.isEmpty()) {
-            Optional.empty()
-        } else {
-            Optional.of(TableDefinition(columns))
-        }
+        return Optional.ofNullable(
+            SnowflakeDestinationHandler.getTable(database, namespace!!, tableName!!)
+        )
     }
 
     override fun convertToV1RawName(streamConfig: StreamConfig): NamespacedTableName {
SnowflakeInternalStagingLowercaseDatabaseTypingDedupingTest.kt
@@ -38,4 +38,9 @@ class SnowflakeInternalStagingLowercaseDatabaseTypingDedupingTest :
     override fun testV1V2Migration() {
         super.testV1V2Migration()
     }
+
+    @Test
+    override fun identicalNameSimultaneousSync() {
+        super.identicalNameSimultaneousSync()
+    }
 }
docs/integrations/destinations/snowflake.md (3 additions, 2 deletions)
@@ -268,9 +268,10 @@ desired namespace.
 
 | Version         | Date       | Pull Request                                                | Subject                                                                                     |
 | :-------------- | :--------- | :---------------------------------------------------------- | :------------------------------------------------------------------------------------------ |
+| 3.13.0 | 2024-09-17 | [\#45422](https://github.com/airbytehq/airbyte/pull/45422) | speed up metadata queries |
 | 3.12.0          | 2024-09-17 | [\#38585](https://github.com/airbytehq/airbyte/pull/38585) | force UTF8 collation when creating schemas and tables |
-| 3.11.12         | 2024-09-12 | [\#45370](https://github.com/airbytehq/airbyte/pull/45370) | fix a race condition in our orphanedThreadFilter |
-| 3.11.11         | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb |
+| 3.11.12 | 2024-09-12 | [\#45370](https://github.com/airbytehq/airbyte/pull/45370) | fix a race condition in our orphanedThreadFilter |
+| 3.11.11 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb |
 | 3.11.10         | 2024-08-22 | [\#44526](https://github.com/airbytehq/airbyte/pull/44526) | Revert protocol compliance fix |
 | 3.11.9          | 2024-08-19 | [\#43367](https://github.com/airbytehq/airbyte/pull/43367) | Add opt in using MERGE statement for upserts and deletes |
 | 3.11.8          | 2024-08-16 | [\#42505](https://github.com/airbytehq/airbyte/pull/42505) | Fix bug in refreshes logic (already mitigated in platform, just fixing protocol compliance) |