Snowflake Connector - Reduce computing resources used for metadata queries #43452

Status: Open
Wants to merge 76 commits into base: master
Showing changes from 62 of 76 commits
397fa5c
Added initial caching for metadata queries
Vee7574 Aug 2, 2024
78e4b62
Added initial caching for metadata queries
Vee7574 Aug 3, 2024
f32b489
Added initial caching for metadata queries
Vee7574 Aug 6, 2024
9e53d82
Added initial caching for metadata queries
Vee7574 Aug 6, 2024
5a706ac
Added initial caching for metadata queries
Vee7574 Aug 6, 2024
7fb31fa
Added initial caching for metadata queries
Vee7574 Aug 7, 2024
48af04c
Added initial caching for metadata queries
Vee7574 Aug 7, 2024
12dc83a
Added initial caching for metadata queries
Vee7574 Aug 7, 2024
69ce596
Added initial caching for metadata queries
Vee7574 Aug 7, 2024
2c57f15
Added initial caching for metadata queries
Vee7574 Aug 7, 2024
575598d
Added initial caching for metadata queries
Vee7574 Aug 7, 2024
bd91e6f
Added initial caching for metadata queries
Vee7574 Aug 8, 2024
edb487e
Added initial version of SHOW queries
Vee7574 Aug 8, 2024
88814e9
Added initial version of SHOW queries
Vee7574 Aug 8, 2024
cf3cc98
Added initial version of SHOW queries
Vee7574 Aug 8, 2024
89ab6e5
Added initial version of SHOW queries
Vee7574 Aug 8, 2024
45cd7d0
Cleaned up the testing code to prepare for creating the initial PR
Vee7574 Aug 9, 2024
95725e2
Cleaned up the testing code to prepare for creating the initial PR
Vee7574 Aug 9, 2024
6167403
Cleaned up the testing code to prepare for creating the initial PR
Vee7574 Aug 9, 2024
cc68c4f
Cleaned up the testing code to prepare for creating the initial PR
Vee7574 Aug 9, 2024
29c6785
Cleaned up the testing code to prepare for creating the initial PR
Vee7574 Aug 10, 2024
5deb78f
Added logging for verifying the results from show queries
Vee7574 Aug 19, 2024
640049c
Added logging for verifying the results from show queries
Vee7574 Aug 19, 2024
e0bcf2d
Added logging for verifying the results from show queries
Vee7574 Aug 19, 2024
37302ee
Added logging for verifying the results from show queries
Vee7574 Aug 19, 2024
a688e17
Added logging for verifying the results from show queries
Vee7574 Aug 19, 2024
40acab0
Added logging for verifying the results from show queries
Vee7574 Aug 19, 2024
7372bd2
Added logging for verifying the results from show queries
Vee7574 Aug 19, 2024
29b4260
Added logging for verifying the results from show queries
Vee7574 Aug 20, 2024
7bed233
Cleaning up the test code to prepare an initial PR
Vee7574 Aug 20, 2024
c6a37de
Cleaning up the test code to prepare an initial PR
Vee7574 Aug 20, 2024
4b33e38
Cleaning up the test code to prepare an initial PR
Vee7574 Aug 20, 2024
08c30a2
Cleaning up the test code to prepare an initial PR
Vee7574 Aug 20, 2024
9946cfb
Merge branch 'master' into snowflake-reduce-metadata-queries
Vee7574 Aug 20, 2024
0bec496
Cleaning up the test code to prepare an initial PR
Vee7574 Aug 20, 2024
9cbcc60
Updated connector version
Vee7574 Aug 21, 2024
54f7721
Updated connector version
Vee7574 Aug 21, 2024
5cd96c5
Updated connector version
Vee7574 Aug 21, 2024
b332563
Merged conflicts from master
Vee7574 Aug 21, 2024
2adc898
Merge branch 'master' into snowflake-reduce-metadata-queries
Vee7574 Aug 21, 2024
976612e
Merged conflicts from master
Vee7574 Aug 21, 2024
e61387b
Replaced the DatabaseMetaData query with a SHOW TABLES query
Vee7574 Aug 21, 2024
b4b7f7b
Improved exception handling
Vee7574 Aug 21, 2024
8a5ed25
Improved exception handling
Vee7574 Aug 21, 2024
0a3f801
Changing the use of DatabaseMetadata to use Show tables query
Vee7574 Aug 22, 2024
82e3eb3
Changing the use of DatabaseMetadata to use Show tables query
Vee7574 Aug 22, 2024
cf82e90
Merge branch 'master' into snowflake-reduce-metadata-queries
Vee7574 Aug 22, 2024
a096193
Updated version number in metadata.yaml
Vee7574 Aug 22, 2024
1d4a713
Added exception handling to handle closing of connections
Vee7574 Aug 23, 2024
6462915
Added exception handling to handle closing of connections
Vee7574 Aug 23, 2024
b732c83
Added exception handling to handle closing of connections
Vee7574 Aug 23, 2024
c9096a2
Added exception handling to handle closing of connections
Vee7574 Aug 26, 2024
51efa7c
Added exception handling to handle closing of connections
Vee7574 Aug 26, 2024
5c729c1
Added exception handling to handle closing of connections
Vee7574 Aug 26, 2024
3a0b8a9
Added exception handling to handle closing of connections
Vee7574 Aug 26, 2024
0cd8c53
Added exception handling to handle closing of connections
Vee7574 Aug 27, 2024
418bb6e
Removing temporary code that was added for troubleshooting
Vee7574 Aug 27, 2024
3121009
Removing temporary code that was added for troubleshooting
Vee7574 Aug 27, 2024
e82b761
Removing temporary code that was added for troubleshooting
Vee7574 Aug 27, 2024
7d66e12
Removing temporary code that was added for troubleshooting
Vee7574 Aug 27, 2024
671e9a8
Removing temporary code that was added for troubleshooting
Vee7574 Aug 27, 2024
ed3dbdc
Cleaning up extra whitespace
Vee7574 Aug 27, 2024
ff65462
Testing the addition of .use for managing the dataSource.connection
Vee7574 Aug 28, 2024
38b76d4
Testing the addition of .use for managing the dataSource.connection
Vee7574 Aug 28, 2024
679e259
Testing the addition of .use for managing the dataSource.connection
Vee7574 Aug 28, 2024
b1f5b81
Updated the exception handling code
Vee7574 Aug 28, 2024
701bc55
Changed string.format to use Kotlin templates
Vee7574 Aug 28, 2024
ede80bb
Changed string.format to use Kotlin templates
Vee7574 Aug 28, 2024
e37c092
Changing exception handling to handle sql exceptions
Vee7574 Aug 28, 2024
468fbe2
Updating docker image tag
Vee7574 Aug 28, 2024
f1bd975
Updating docker image tag
Vee7574 Aug 28, 2024
3a8fe08
Updating docker image tag
Vee7574 Aug 28, 2024
f3635bb
Removed commented code
Vee7574 Aug 28, 2024
3c96f94
Removed commented code
Vee7574 Aug 28, 2024
77832a4
Removed commented code
Vee7574 Aug 28, 2024
b62b6ef
Incorporated code review comments
Vee7574 Sep 6, 2024
@@ -115,20 +115,29 @@ constructor(
statementCreator: CheckedFunction<Connection, PreparedStatement, SQLException>,
recordTransform: CheckedFunction<ResultSet, T, SQLException>
): Stream<T> {
val connection = dataSource.connection
return JdbcDatabase.Companion.toUnsafeStream<T>(
var connection = dataSource.connection

try {
Review comment (Contributor): connection.use. Also, keep connection a val instead of a var.

Reply (Contributor, author): Sure, I have changed the connection to a val. The code won't be changed to connection.use as we discussed, since the connection needs to be open when the result set is returned.

return JdbcDatabase.Companion.toUnsafeStream<T>(
statementCreator.apply(connection).executeQuery(),
recordTransform
)
.onClose(
Runnable {
try {
LOGGER.info { "closing connection" }
connection.close()
} catch (e: SQLException) {
throw RuntimeException(e)
.onClose(
Runnable {
try {
LOGGER.info { "closing connection" }
connection.close()
} catch (e: SQLException) {
throw RuntimeException(e)
}
}
}
)
)
} catch (e: Throwable) {
//Close the connection and rethrow the exception
if (connection != null) {
connection.close()
}
throw e
}
}
}
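The lifecycle under discussion in this hunk — the connection must outlive the method so the caller can consume the returned stream, yet must be closed both when the stream closes and when setup fails — can be sketched in isolation. `FakeConnection` and `streamWithConnection` below are hypothetical stand-ins for illustration, not the PR's actual code:

```kotlin
import java.util.stream.Stream

// Hypothetical stand-in for a JDBC Connection; only close() matters here.
class FakeConnection : AutoCloseable {
    var closed = false
        private set
    override fun close() { closed = true }
}

// Mirrors the hunk's shape: open a connection, hand back a Stream whose
// onClose handler closes it, and close eagerly if anything throws during setup.
fun <T> streamWithConnection(
    openConnection: () -> FakeConnection,
    rows: (FakeConnection) -> List<T>,
): Pair<Stream<T>, FakeConnection> {
    val connection = openConnection()
    try {
        val stream = rows(connection).stream().onClose { connection.close() }
        return stream to connection
    } catch (e: Throwable) {
        connection.close() // don't leak the connection when setup fails
        throw e
    }
}

fun main() {
    val (stream, conn) = streamWithConnection({ FakeConnection() }) { listOf("a", "b") }
    val count = stream.use { it.count() } // Stream is AutoCloseable; use() triggers onClose
    println(count)        // 2
    println(conn.closed)  // true
}
```

This is why the reviewer's `connection.use` suggestion was declined in the thread above: `use` would close the connection before the caller ever reads from the stream, so the close has to be deferred to `onClose` instead.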
@@ -5,7 +5,8 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.44.14'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
//TODO: Change to false before merging to master
useLocalCdk = true
}

java {
@@ -44,4 +45,5 @@ integrationTestJava {
dependencies {
implementation 'net.snowflake:snowflake-jdbc:3.14.1'
implementation 'org.apache.commons:commons-text:1.10.0'
implementation 'org.json:json:20210307'
Review comment (Contributor): We don't need this. We already have a Jackson dependency in our dependency chain.

}
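The reviewer's point is that the `data_type` blob returned by SHOW COLUMNS can be parsed with Jackson, which is already on the connector's classpath, making `org.json` redundant. A minimal sketch, assuming `jackson-databind` is available as the reviewer states:

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper

// SHOW COLUMNS returns data_type as a small JSON document, e.g.
// {"type":"FIXED","precision":38,"scale":0,"nullable":true}.
// Jackson's ObjectMapper can read it directly; no org.json needed.
private val mapper = ObjectMapper()

fun extractColumnType(dataTypeJson: String): String =
    mapper.readTree(dataTypeJson).get("type").asText()

fun main() {
    println(extractColumnType("""{"type":"FIXED","precision":38,"scale":0,"nullable":true}""")) // FIXED
}
```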
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.11.9
Review comment (Contributor): why the change? If unneeded, I'd rather keep this outside of the current PR.

dockerImageTag: 3.11.11
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
@@ -148,4 +148,4 @@ data:
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
metadataSpecVersion: "1.0"
metadataSpecVersion: "1.0"
@@ -293,4 +293,8 @@ object SnowflakeDatabaseUtils {
AirbyteProtocolType.UNKNOWN -> "VARIANT"
}
}

fun fromIsNullableSnowflakeString(isNullable: String?): Boolean {
return "true".equals(isNullable, ignoreCase = true)
Review comment (Contributor): remove and use String.toBoolean()

Reply (Contributor, author): Removed the function and used String.toBoolean.

}
}
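The helper under review compares its argument against "true" case-insensitively, which is exactly what Kotlin's String.toBoolean() does; for a nullable input, a safe call with an elvis default reproduces the helper's null handling. A small sketch of the equivalence:

```kotlin
// The helper as written in the diff, kept here for comparison.
fun fromIsNullableSnowflakeString(isNullable: String?): Boolean =
    "true".equals(isNullable, ignoreCase = true)

fun main() {
    println(fromIsNullableSnowflakeString("TRUE"))  // true
    println("TRUE".toBoolean())                     // true
    val missing: String? = null
    println(fromIsNullableSnowflakeString(missing)) // false
    println(missing?.toBoolean() ?: false)          // false
}
```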
@@ -26,17 +26,18 @@ import io.airbyte.integrations.base.destination.typing_deduping.Struct
import io.airbyte.integrations.base.destination.typing_deduping.Union
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
import io.airbyte.integrations.destination.snowflake.SnowflakeDatabaseUtils
import io.airbyte.integrations.destination.snowflake.SnowflakeDatabaseUtils.fromIsNullableSnowflakeString
import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeState
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator.Companion.QUOTE
import java.sql.Connection
import java.sql.DatabaseMetaData
import java.sql.ResultSet
import java.sql.SQLException
import java.time.Instant
import java.util.*
import java.util.stream.Collectors
import net.snowflake.client.jdbc.SnowflakeSQLException
import org.apache.commons.text.StringSubstitutor
import org.json.JSONObject
import org.jooq.SQLDialect
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -70,75 +71,83 @@ class SnowflakeDestinationHandler(
private fun getFinalTableRowCount(
streamIds: List<StreamId>
): LinkedHashMap<String, LinkedHashMap<String, Int>> {
val tableRowCounts = LinkedHashMap<String, LinkedHashMap<String, Int>>()
// convert list stream to array
val namespaces = streamIds.map { it.finalNamespace }.toTypedArray()
val names = streamIds.map { it.finalName }.toTypedArray()
val query =
"""
|SELECT table_schema, table_name, row_count
|FROM information_schema.tables
|WHERE table_catalog = ?
|AND table_schema IN (${IntRange(1, streamIds.size).joinToString { "?" }})
|AND table_name IN (${IntRange(1, streamIds.size).joinToString { "?" }})
|""".trimMargin()
val bindValues = arrayOf(databaseName) + namespaces + names
val results: List<JsonNode> = database.queryJsons(query, *bindValues)
for (result in results) {
val tableSchema = result["TABLE_SCHEMA"].asText()
val tableName = result["TABLE_NAME"].asText()
val rowCount = result["ROW_COUNT"].asInt()
tableRowCounts
.computeIfAbsent(tableSchema) { _: String? -> LinkedHashMap() }[tableName] =
rowCount

val tableRowCountsFromShowQuery = LinkedHashMap<String, LinkedHashMap<String, Int>>()
var showColumnsResult: List<JsonNode> = listOf()
Review comment (Contributor): remove this and set the val inside your try block


try {
for (stream in streamIds) {
val showColumnsQuery =
String.format(
"""
Review comment (Contributor): use Kotlin templates instead of String.format

SHOW TABLES LIKE '%s' IN "%s"."%s";
""".trimIndent(),
stream.finalName,
databaseName,
stream.finalNamespace,
)
showColumnsResult = database.queryJsons(
showColumnsQuery,
)
for (result in showColumnsResult) {
val tableSchema = result["schema_name"].asText()
val tableName = result["name"].asText()
val rowCount = result["rows"].asText()

tableRowCountsFromShowQuery
Review comment (Contributor): The indentation is super confusing here (probably enforced by our format command). Any way to change that, or is our formatter going to bark at you? Also, you can simplify with map.computeIfAbsent(tableSchema) { LinkedHashMap() }.

Review comment (Contributor, stephane-airbyte, Aug 28, 2024): I think we can also use a linkedlist.withDefault, which would simplify this further.

.computeIfAbsent(tableSchema) { _: String? -> LinkedHashMap() }[tableName] =
rowCount.toInt()
}
}
} catch (e: SQLException) {
showColumnsResult.stream().close()
Review comment (Contributor): not sure why we need to close the stream here

//Not re-throwing the exception since the SQLException occurs when the table does not exist
//throw e
Review comment (Contributor): remove the commented throw e?

}
return tableRowCounts
return tableRowCountsFromShowQuery
}
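The review thread on this hunk suggests simplifying the nested-map accumulation. In Kotlin, `getOrPut` expresses `computeIfAbsent` without the unused-parameter lambda. A sketch with hypothetical (schema, table, row count) rows, not the PR's exact code:

```kotlin
// Accumulates per-schema, per-table row counts into nested LinkedHashMaps,
// preserving insertion order as the original code does.
fun accumulateRowCounts(
    rows: List<Triple<String, String, Int>>
): LinkedHashMap<String, LinkedHashMap<String, Int>> {
    val counts = LinkedHashMap<String, LinkedHashMap<String, Int>>()
    for ((schema, table, rowCount) in rows) {
        // getOrPut creates the inner map on first use of each schema.
        counts.getOrPut(schema) { LinkedHashMap() }[table] = rowCount
    }
    return counts
}

fun main() {
    val counts = accumulateRowCounts(
        listOf(Triple("PUBLIC", "USERS", 42), Triple("PUBLIC", "ORDERS", 7))
    )
    println(counts) // {PUBLIC={USERS=42, ORDERS=7}}
}
```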


@Throws(Exception::class)
private fun getInitialRawTableState(
id: StreamId,
suffix: String,
): InitialRawTableStatus {

val rawTableName = id.rawName + suffix
val tableExists =
database.executeMetadataQuery { databaseMetaData: DatabaseMetaData ->
LOGGER.info(
"Retrieving table from Db metadata: {} {}",
var tableExists = false
var showTablesResult: List<JsonNode> = listOf()

try {
val showTablesQuery =
String.format(
"""
SHOW TABLES LIKE '%s' IN "%s"."%s";
Review comment (Contributor): use Kotlin templates instead of String.format

Review comment (Contributor): does that work when the QUOTED_IDENTIFIERS_IGNORE_CASE is set to true? We have a test class that sets that for the testDatabase

""".trimIndent(),
rawTableName,
databaseName,
id.rawNamespace,
rawTableName
)
try {
val rs =
databaseMetaData.getTables(
databaseName,
id.rawNamespace,
rawTableName,
null
)
// When QUOTED_IDENTIFIERS_IGNORE_CASE is set to true, the raw table is
// interpreted as uppercase
// in db metadata calls. check for both
val rsUppercase =
databaseMetaData.getTables(
databaseName,
id.rawNamespace.uppercase(),
rawTableName.uppercase(),
null
)
rs.next() || rsUppercase.next()
} catch (e: SQLException) {
LOGGER.error("Failed to retrieve table metadata", e)
throw RuntimeException(e)
}
)
showTablesResult = database.queryJsons(
showTablesQuery,
)
if(showTablesResult.size > 0) {
tableExists = true
}
} catch (e: SQLException) {
showTablesResult.stream().close()
//Not re-throwing the exception since the SQLException occurs when the table does not exist
//throw e
}

if (!tableExists) {
return InitialRawTableStatus(
rawTableExists = false,
hasUnprocessedRecords = false,
maxProcessedTimestamp = Optional.empty()
)
}

// Snowflake timestamps have nanosecond precision, so decrement by 1ns
// And use two explicit queries because COALESCE doesn't short-circuit.
// This first query tries to find the oldest raw record with loaded_at = NULL
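The reviewers repeatedly ask for Kotlin string templates in place of String.format; the SHOW TABLES construction could be sketched as follows (function name and sample identifiers are illustrative, not the PR's code):

```kotlin
// Builds the SHOW TABLES existence check with a string template instead of
// String.format; the quoting of database and schema identifiers matches the diff.
fun showTablesQuery(tableName: String, databaseName: String, schemaName: String): String =
    """SHOW TABLES LIKE '$tableName' IN "$databaseName"."$schemaName";"""

fun main() {
    println(showTablesQuery("airbyte_raw_users", "MY_DB", "AIRBYTE_INTERNAL"))
}
```

Templates also sidestep the positional-argument mismatch risk that String.format carries when the argument list is reordered.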
@@ -560,6 +569,7 @@ class SnowflakeDestinationHandler(
}

companion object {

private val LOGGER: Logger =
LoggerFactory.getLogger(SnowflakeDestinationHandler::class.java)
const val EXCEPTION_COMMON_PREFIX: String =
@@ -573,39 +583,63 @@
databaseName: String,
streamIds: List<StreamId>
): LinkedHashMap<String, LinkedHashMap<String, TableDefinition>> {
val existingTables = LinkedHashMap<String, LinkedHashMap<String, TableDefinition>>()
// convert list stream to array
val namespaces = streamIds.map { it.finalNamespace }.toTypedArray()
val names = streamIds.map { it.finalName }.toTypedArray()
val query =
"""
|SELECT table_schema, table_name, column_name, data_type, is_nullable
|FROM information_schema.columns
|WHERE table_catalog = ?
|AND table_schema IN (${IntRange(1, streamIds.size).joinToString { "?" }})
|AND table_name IN (${IntRange(1, streamIds.size).joinToString { "?" }})
|ORDER BY table_schema, table_name, ordinal_position;
|""".trimMargin()

val bindValues =
arrayOf(databaseName.uppercase(Locale.getDefault())) + namespaces + names
val results: List<JsonNode> = database.queryJsons(query, *bindValues)
for (result in results) {
val tableSchema = result["TABLE_SCHEMA"].asText()
val tableName = result["TABLE_NAME"].asText()
val columnName = result["COLUMN_NAME"].asText()
val dataType = result["DATA_TYPE"].asText()
val isNullable = result["IS_NULLABLE"].asText()
val tableDefinition =
existingTables
.computeIfAbsent(tableSchema) { _: String? -> LinkedHashMap() }
.computeIfAbsent(tableName) { _: String? ->
TableDefinition(LinkedHashMap())

val existingTablesFromShowQuery =
LinkedHashMap<String, LinkedHashMap<String, TableDefinition>>()
var showColumnsResult: List<JsonNode> = listOf()

try {
for (stream in streamIds) {
val showColumnsQuery =
String.format(
"""
SHOW COLUMNS IN TABLE "%s"."%s"."%s";
""".trimIndent(),
databaseName,
stream.finalNamespace,
stream.finalName,
)
showColumnsResult = database.queryJsons(
showColumnsQuery,
)

for (result in showColumnsResult) {
val tableSchema = result["schema_name"].asText()
val tableName = result["table_name"].asText()
val columnName = result["column_name"].asText()
var dataType = JSONObject(result["data_type"].asText()).getString("type")

//TODO: Need to check if there are other datatype differences
// between the original approach and the new approach with SHOW queries

Review comment (Contributor): that should probably be checked before merging this PR. Do we have any automated test that checks this?

Review comment (Contributor): per the doc, it seems those are the only differences. I'd rather have a comment that points to https://docs.snowflake.com/en/sql-reference/sql/show-columns#output rather than a TODO.
if(dataType.equals("FIXED")) {
dataType = "NUMBER"
} else if(dataType.equals("REAL")) {
dataType = "FLOAT"
}
tableDefinition.columns[columnName] =
ColumnDefinition(columnName, dataType, 0, fromIsNullableIsoString(isNullable))

val isNullable = result["null?"].asText()
val tableDefinition =
Review comment (Contributor): let's create the table definition outside of the loop on showColumnsResult

existingTablesFromShowQuery
.computeIfAbsent(tableSchema) { _: String? -> LinkedHashMap() }
.computeIfAbsent(tableName) { _: String? ->
TableDefinition(LinkedHashMap())
}
tableDefinition.columns[columnName] =
ColumnDefinition(
columnName,
dataType,
0,
fromIsNullableSnowflakeString(isNullable),
)
}
}
} catch (e: SQLException) {
showColumnsResult.stream().close()
//Not re-throwing the exception since the SQLException occurs when the table does not exist
//throw e
}
return existingTables
return existingTablesFromShowQuery
}
}
}
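Per the review discussion on the TODO, SHOW COLUMNS reports NUMBER columns as FIXED and FLOAT columns as REAL (the Snowflake SHOW COLUMNS output documentation linked above lists these as the only type-name differences). The mapping buried in the hunk can be isolated as a single when-expression; this is a sketch, not the PR's exact code:

```kotlin
// Normalizes SHOW COLUMNS type names to the information_schema.columns
// spellings the rest of the handler expects: FIXED -> NUMBER, REAL -> FLOAT,
// everything else passes through unchanged.
fun normalizeShowColumnsType(raw: String): String = when (raw) {
    "FIXED" -> "NUMBER"
    "REAL" -> "FLOAT"
    else -> raw
}

fun main() {
    println(normalizeShowColumnsType("FIXED")) // NUMBER
    println(normalizeShowColumnsType("REAL"))  // FLOAT
    println(normalizeShowColumnsType("TEXT"))  // TEXT
}
```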
