From a4d667f1df62f051b68fb57cd5543d3fb2d5b995 Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Tue, 10 Sep 2024 13:30:18 -0700 Subject: [PATCH] destination-snowflake: bump CDK --- .../destination-snowflake/build.gradle | 2 +- .../destination-snowflake/gradle.properties | 3 +- .../destination-snowflake/metadata.yaml | 2 +- .../snowflake/SnowflakeDatabaseUtils.kt | 2 +- .../snowflake/SnowflakeDestination.kt | 24 ++-- .../SnowflakeDestinationIntegrationTest.kt | 13 +- ...actSnowflakeSqlGeneratorIntegrationTest.kt | 16 --- .../AbstractSnowflakeTypingDedupingTest.kt | 131 +++++++++++++----- docs/integrations/destinations/snowflake.md | 1 + 9 files changed, 120 insertions(+), 74 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index efa6fbd2dd74..df64c083f272 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.44.19' + cdkVersionRequired = '0.45.0' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-snowflake/gradle.properties b/airbyte-integrations/connectors/destination-snowflake/gradle.properties index 061eb2c399d9..666ff08b0834 100644 --- a/airbyte-integrations/connectors/destination-snowflake/gradle.properties +++ b/airbyte-integrations/connectors/destination-snowflake/gradle.properties @@ -1 +1,2 @@ -JunitMethodExecutionTimeout=30 m +JunitMethodExecutionTimeout=20 m +testExecutionConcurrency=2 diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index 99d68c9cb339..ea0985e32869 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.11.11 + dockerImageTag: 3.11.12 dockerRepository: airbyte/destination-snowflake documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake githubIssueLabel: destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDatabaseUtils.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDatabaseUtils.kt index cb8ce1f2ffd2..f0e70884fea5 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDatabaseUtils.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDatabaseUtils.kt @@ -57,7 +57,7 @@ object SnowflakeDatabaseUtils { private const val IP_NOT_IN_WHITE_LIST_ERR_MSG = "not allowed to access Snowflake" @JvmStatic - fun createDataSource(config: JsonNode, airbyteEnvironment: String?): HikariDataSource { + fun createDataSource(config: JsonNode, airbyteEnvironment: String?): DataSource { val dataSource = HikariDataSource() diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt index 3cf78b14c453..aebb0389d9ef 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt @@ -316,19 +316,17 @@ constructor( } fun main(args: Array) { - IntegrationRunner.addOrphanedThreadFilter { t: Thread -> - if (IntegrationRunner.getThreadCreationInfo(t) != null) { - for (stackTraceElement in IntegrationRunner.getThreadCreationInfo(t)!!.stack) { - val stackClassName = stackTraceElement.className - val stackMethodName = stackTraceElement.methodName - if ( - SFStatement::class.java.canonicalName == stackClassName && - "close" == stackMethodName || - SFSession::class.java.canonicalName == stackClassName && - "callHeartBeatWithQueryTimeout" == stackMethodName - ) { - return@addOrphanedThreadFilter false - } + IntegrationRunner.addOrphanedThreadFilter { threadInfo: IntegrationRunner.OrphanedThreadInfo -> + for (stackTraceElement in threadInfo.threadCreationInfo.stack) { + val stackClassName = stackTraceElement.className + val stackMethodName = stackTraceElement.methodName + if ( + SFStatement::class.java.canonicalName == stackClassName && + "close" == stackMethodName || + SFSession::class.java.canonicalName == stackClassName && + "callHeartBeatWithQueryTimeout" == stackMethodName + ) { + return@addOrphanedThreadFilter false } } true diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.kt index 38be9827db01..baca1c92752b 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.kt @@ -33,15 +33,20 @@ internal class SnowflakeDestinationIntegrationTest { @Throws(Exception::class) fun testCheckFailsWithInvalidPermissions() { // TODO(sherifnada) this test case is assumes config.json does not have permission to access - // the - // schema + // the schema RESTRICTED_SCHEMA was created by the user AIRBYTETESTER, and then permissions + // were removed with + // 'REVOKE ALL ON SCHEMA restricted_schema FROM ROLE integration_tester_destination;' // this connector should be updated with multiple credentials, each with a clear purpose - // (valid, - // invalid: insufficient permissions, invalid: wrong password, etc..) + // (valid, invalid: insufficient permissions, invalid: wrong password, etc..) val credentialsJsonString = deserialize(Files.readString(Paths.get("secrets/config.json"))) val check = SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(credentialsJsonString) Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, check!!.status) + Assertions.assertEquals( + "Could not connect with provided configuration. Encountered Error with Snowflake Configuration: " + + "Current role does not have permissions on the target schema please verify your privileges", + check.message + ) } @Test diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeSqlGeneratorIntegrationTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeSqlGeneratorIntegrationTest.kt index 0ff0dab36112..b5cf7aa14f7f 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeSqlGeneratorIntegrationTest.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeSqlGeneratorIntegrationTest.kt @@ -6,7 +6,6 @@ package io.airbyte.integrations.destination.snowflake.typing_deduping import com.fasterxml.jackson.databind.JsonNode import com.google.common.collect.ImmutableMap import edu.umd.cs.findbugs.annotations.SuppressFBWarnings -import io.airbyte.cdk.db.factory.DataSourceFactory import io.airbyte.cdk.db.jdbc.JdbcDatabase import io.airbyte.cdk.db.jdbc.JdbcUtils import io.airbyte.cdk.integrations.base.JavaBaseConstants @@ -1857,20 +1856,5 @@ abstract class AbstractSnowflakeSqlGeneratorIntegrationTest : private var dataSource: DataSource = SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS) private var database: JdbcDatabase = SnowflakeDatabaseUtils.getDatabase(dataSource) - - @JvmStatic - @BeforeAll - fun setupSnowflake(): Unit { - dataSource = - SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS) - database = SnowflakeDatabaseUtils.getDatabase(dataSource) - } - - @JvmStatic - @AfterAll - @Throws(Exception::class) - fun teardownSnowflake(): Unit { - DataSourceFactory.close(dataSource) - } } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt index 66eae99f3bd3..56777528d6f2 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt @@ -21,9 +21,13 @@ import io.airbyte.workers.exception.TestHarnessException import io.github.oshai.kotlinlogging.KotlinLogging import java.nio.file.Path import java.sql.SQLException +import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.* +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger import javax.sql.DataSource -import kotlin.concurrent.Volatile import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Disabled @@ -56,7 +60,7 @@ abstract class AbstractSnowflakeTypingDedupingTest( dataSource = SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS) database = SnowflakeDatabaseUtils.getDatabase(dataSource) - cleanAirbyteInternalTable(databaseName, database, forceUppercaseIdentifiers) + cleanAirbyteInternalTable(database) return config } @@ -419,48 +423,101 @@ abstract class AbstractSnowflakeTypingDedupingTest( "_AIRBYTE_GENERATION_ID", ) - @Volatile private var cleanedAirbyteInternalTable = false + private val cleanedAirbyteInternalTable = AtomicBoolean(false) + private val threadId = AtomicInteger(0) @Throws(SQLException::class) - private fun cleanAirbyteInternalTable( - databaseName: String, - database: JdbcDatabase?, - forceUppercase: Boolean, - ) { - if (!cleanedAirbyteInternalTable) { - synchronized(AbstractSnowflakeTypingDedupingTest::class.java) { - if (!cleanedAirbyteInternalTable) { - val destinationStateTableExists = - database!!.executeMetadataQuery { - it.getTables( - databaseName, - if (forceUppercase) { - "AIRBYTE_INTERNAL" - } else { - "airbyte_internal" - }, - if (forceUppercase) { - "_AIRBYTE_DESTINATION_STATE" - } else { - "_airbyte_destination_state" - }, - null - ) - .next() - } - if (destinationStateTableExists) { - database.execute( - """DELETE FROM "airbyte_internal"."_airbyte_destination_state" WHERE "updated_at" < current_date() - 7""", + private fun cleanAirbyteInternalTable(database: JdbcDatabase?) { + if ( + database!! + .queryJsons("SHOW PARAMETERS LIKE 'QUOTED_IDENTIFIERS_IGNORE_CASE';") + .first() + .get("value") + .asText() + .toBoolean() + ) { + return + } + + if (!cleanedAirbyteInternalTable.getAndSet(true)) { + val cleanupCutoffHours = 6 + LOGGER.info { "tableCleaner running" } + val executor = + Executors.newSingleThreadExecutor { + val thread = Executors.defaultThreadFactory().newThread(it) + thread.name = + "airbyteInternalTableCleanupThread-${threadId.incrementAndGet()}" + thread.isDaemon = true + thread + } + executor.execute { + database.execute( + "DELETE FROM \"airbyte_internal\".\"_airbyte_destination_state\" WHERE \"updated_at\" < timestampadd('hours', -$cleanupCutoffHours, current_timestamp())", + ) + } + executor.execute { + database.execute( + "DELETE FROM \"AIRBYTE_INTERNAL\".\"_AIRBYTE_DESTINATION_STATE\" WHERE \"UPDATED_AT\" < timestampadd('hours', -$cleanupCutoffHours, current_timestamp())", + ) + } + executor.execute { + val schemaList = + database.queryJsons( + "SHOW SCHEMAS IN DATABASE INTEGRATION_TEST_DESTINATION;", + ) + LOGGER.info( + "tableCleaner found ${schemaList.size} schemas in database INTEGRATION_TEST_DESTINATION" + ) + schemaList + .associate { + it.get("name").asText() to Instant.parse(it.get("created_on").asText()) + } + .filter { + it.value.isBefore( + Instant.now().minus(cleanupCutoffHours.toLong(), ChronoUnit.HOURS) ) } - cleanedAirbyteInternalTable = true + .filter { + it.key.startsWith("SQL_GENERATOR", ignoreCase = true) || + it.key.startsWith("TDTEST", ignoreCase = true) || + it.key.startsWith("TYPING_DEDUPING", ignoreCase = true) + } + .forEach { + executor.execute { + database.execute( + "DROP SCHEMA INTEGRATION_TEST_DESTINATION.\"${it.key}\" /* created at ${it.value} */;" + ) + } + } + } + for (schemaName in + listOf("AIRBYTE_INTERNAL", "airbyte_internal", "overridden_raw_dataset")) { + executor.execute { + val sql = + "SHOW TABLES IN schema INTEGRATION_TEST_DESTINATION.\"$schemaName\";" + val tableList = database.queryJsons(sql) + LOGGER.info { + "tableCleaner found ${tableList.size} tables in schema $schemaName" + } + tableList + .associate { + it.get("name").asText() to + Instant.parse(it.get("created_on").asText()) + } + .filter { + it.value.isBefore(Instant.now().minus(6, ChronoUnit.HOURS)) && + it.key.startsWith("TDTEST", ignoreCase = true) + } + .forEach { + executor.execute { + database.execute( + "DROP TABLE INTEGRATION_TEST_DESTINATION.\"$schemaName\".\"${it.key}\" /* created at ${it.value} */;" + ) + } + } } } } } } } - -open class Batch(val name: String) - -class LocalFileBatch(name: String) : Batch(name) diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index a93742b26206..4423065896cc 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -268,6 +268,7 @@ desired namespace. | Version | Date | Pull Request | Subject | | :-------------- | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 3.11.12 | 2024-09-12 | [45370](https://github.com/airbytehq/airbyte/pull/45370) | fix a race condition in our orphanedThreadFilter | | 3.11.11 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb | | 3.11.10 | 2024-08-22 | [\#44526](https://github.com/airbytehq/airbyte/pull/44526) | Revert protocol compliance fix | | 3.11.9 | 2024-08-19 | [\#43367](https://github.com/airbytehq/airbyte/pull/43367) | Add opt in using MERGE statement for upserts and deletes |