Skip to content

Commit

Permalink
destination-snowflake-oc-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa committed Jun 11, 2024
1 parent 9f0ce4f commit 26b07ed
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.10.0
dockerImageTag: 3.10.1
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,11 @@ class SnowflakeAbMetaAndGenIdMigration(private val database: JdbcDatabase) :
stream: StreamConfig,
state: DestinationInitialStatus<SnowflakeState>
): Migration.MigrationResult<SnowflakeState> {
if (state.destinationState.isAirbyteMetaPresentInRaw) {
log.info {
"Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} " +
"because previous destination state has isAirbyteMetaPresent"
}
return Migration.MigrationResult(state.destinationState, false)
}

if (!state.initialRawTableStatus.rawTableExists) {
// The raw table doesn't exist. No migration necessary. Update the state.
log.info {
"Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} because the raw table doesn't exist"
"Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} " +
"because the raw table doesn't exist for sync mode ${stream.destinationSyncMode}"
}
return Migration.MigrationResult(
state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.Union
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
import io.airbyte.integrations.destination.snowflake.SnowflakeDatabaseUtils
import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeState
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.sql.Connection
import java.sql.DatabaseMetaData
import java.sql.ResultSet
Expand Down Expand Up @@ -86,16 +85,8 @@ class SnowflakeDestinationHandler(
@Throws(Exception::class)
private fun getInitialRawTableState(
id: StreamId,
destinationSyncMode: DestinationSyncMode
): InitialRawTableStatus {
// Short-circuit for overwrite, table will be truncated anyway
if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
return InitialRawTableStatus(
rawTableExists = false,
hasUnprocessedRecords = false,
maxProcessedTimestamp = Optional.empty()
)
}
val tableExists =
database.executeMetadataQuery { databaseMetaData: DatabaseMetaData ->
LOGGER.info("Retrieving table from Db metadata: {} {}", id.rawNamespace, id.rawName)
Expand Down Expand Up @@ -397,8 +388,7 @@ class SnowflakeDestinationHandler(
!existingSchemaMatchesStreamConfig(streamConfig, existingTable!!)
isFinalTableEmpty = hasRowCount && tableRowCounts[namespace]!![name] == 0
}
val initialRawTableState =
getInitialRawTableState(streamConfig.id, streamConfig.destinationSyncMode)
val initialRawTableState = getInitialRawTableState(streamConfig.id)
val destinationState =
destinationStates.getOrDefault(
streamConfig.id.asPair(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import javax.sql.DataSource
import kotlin.concurrent.Volatile
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Test

abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() {
Expand Down Expand Up @@ -336,6 +337,82 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() {
}
}

@Test
fun testAirbyteMetaAndGenerationIdMigrationForOverwrite() {
    // One FULL_REFRESH / OVERWRITE stream; sync id 42 and generation id 43 are the values
    // the expected-records fixtures are checked against.
    val airbyteStream =
        AirbyteStream()
            .withNamespace(streamNamespace)
            .withName(streamName)
            .withJsonSchema(BaseTypingDedupingTest.Companion.SCHEMA)
    val overwriteStream =
        ConfiguredAirbyteStream()
            .withSyncMode(SyncMode.FULL_REFRESH)
            .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
            .withSyncId(42L)
            .withGenerationId(43L)
            .withMinimumGenerationId(0L)
            .withStream(airbyteStream)
    val overwriteCatalog = ConfiguredAirbyteCatalog().withStreams(listOf(overwriteStream))

    // Sync 1 runs on the pinned 3.9.1 image, i.e. a release from before the
    // _airbyte_meta / _airbyte_generation_id change.
    runSync(
        overwriteCatalog,
        readMessages("dat/sync1_messages.jsonl"),
        "airbyte/destination-snowflake:3.9.1",
    )

    // Sync 2 runs without an explicit image tag
    // (presumably the connector build under test -- confirm against runSync's default).
    runSync(overwriteCatalog, readMessages("dat/sync2_messages.jsonl"))

    // Verify against the overwrite fixtures, with the final-table comparison disabled.
    val rawExpected = readRecords("dat/sync2_expectedrecords_overwrite_raw.jsonl")
    val finalExpected =
        readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl")
    verifySyncResult(rawExpected, finalExpected, disableFinalTableComparison())
}

@Test
fun testAirbyteMetaAndGenerationIdMigrationForOverwrite310Broken() {
    // One FULL_REFRESH / OVERWRITE stream; sync id 42 and generation id 43 are the values
    // the expected-records fixtures are checked against.
    val airbyteStream =
        AirbyteStream()
            .withNamespace(streamNamespace)
            .withName(streamName)
            .withJsonSchema(BaseTypingDedupingTest.Companion.SCHEMA)
    val overwriteStream =
        ConfiguredAirbyteStream()
            .withSyncMode(SyncMode.FULL_REFRESH)
            .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
            .withSyncId(42L)
            .withGenerationId(43L)
            .withMinimumGenerationId(0L)
            .withStream(airbyteStream)
    val overwriteCatalog = ConfiguredAirbyteCatalog().withStreams(listOf(overwriteStream))

    // The same sync-1 payload is used for both the 3.9.1 run and the failing 3.10.0 run.
    val firstSyncMessages = readMessages("dat/sync1_messages.jsonl")

    // Sync 1: pinned 3.9.1 image.
    runSync(overwriteCatalog, firstSyncMessages, "airbyte/destination-snowflake:3.9.1")

    // Sync 2: the 3.10.0 image is expected to fail here because of its broken migration.
    assertThrows(TestHarnessException::class.java) {
        runSync(overwriteCatalog, firstSyncMessages, "airbyte/destination-snowflake:3.10.0")
    }

    // Sync 3: runs without an explicit image tag
    // (presumably the connector build under test -- confirm against runSync's default)
    // and must succeed after the failed 3.10.0 attempt.
    runSync(overwriteCatalog, readMessages("dat/sync2_messages.jsonl"))

    // Verify against the overwrite fixtures, with the final-table comparison disabled.
    val rawExpected = readRecords("dat/sync2_expectedrecords_overwrite_raw.jsonl")
    val finalExpected =
        readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl")
    verifySyncResult(rawExpected, finalExpected, disableFinalTableComparison())
}

private val defaultSchema: String
get() = config!!["schema"].asText()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Only sync2 messages present in overwrite mode
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ desired namespace.

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.10.1 | 2024-06-11 | [\#39399](https://github.com/airbytehq/airbyte/pull/39399) | Bug fix for _airbyte_meta not migrated in OVERWRITE mode |
| 3.10.0 | 2024-06-10 | [\#39107](https://github.com/airbytehq/airbyte/pull/39107) | _airbyte_meta and _airbyte_generation_id in Raw tables and final tables |
| 3.9.1 | 2024-06-05 | [\#39135](https://github.com/airbytehq/airbyte/pull/39135) | Improved error handling for Staging files |
| 3.9.0 | 2024-05-23 | [\#38658](https://github.com/airbytehq/airbyte/pull/38658) | Adapting to newer interfaces from #38107 |
Expand Down

0 comments on commit 26b07ed

Please sign in to comment.