Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Snowflake: Fix for adding airbyte_meta to raw in overwrite mode #39399

Merged
merged 1 commit into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.10.0
dockerImageTag: 3.10.1
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,11 @@ class SnowflakeAbMetaAndGenIdMigration(private val database: JdbcDatabase) :
stream: StreamConfig,
state: DestinationInitialStatus<SnowflakeState>
): Migration.MigrationResult<SnowflakeState> {
if (state.destinationState.isAirbyteMetaPresentInRaw) {
log.info {
"Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} " +
"because previous destination state has isAirbyteMetaPresent"
}
return Migration.MigrationResult(state.destinationState, false)
}

if (!state.initialRawTableStatus.rawTableExists) {
// The raw table doesn't exist. No migration necessary. Update the state.
log.info {
"Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} because the raw table doesn't exist"
"Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} " +
"because the raw table doesn't exist for sync mode ${stream.destinationSyncMode}"
}
return Migration.MigrationResult(
state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.Union
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
import io.airbyte.integrations.destination.snowflake.SnowflakeDatabaseUtils
import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeState
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.sql.Connection
import java.sql.DatabaseMetaData
import java.sql.ResultSet
Expand Down Expand Up @@ -86,16 +85,8 @@ class SnowflakeDestinationHandler(
@Throws(Exception::class)
private fun getInitialRawTableState(
id: StreamId,
destinationSyncMode: DestinationSyncMode
): InitialRawTableStatus {
// Short-circuit for overwrite, table will be truncated anyway
if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
return InitialRawTableStatus(
rawTableExists = false,
hasUnprocessedRecords = false,
maxProcessedTimestamp = Optional.empty()
)
}
val tableExists =
database.executeMetadataQuery { databaseMetaData: DatabaseMetaData ->
LOGGER.info("Retrieving table from Db metadata: {} {}", id.rawNamespace, id.rawName)
Expand Down Expand Up @@ -397,8 +388,7 @@ class SnowflakeDestinationHandler(
!existingSchemaMatchesStreamConfig(streamConfig, existingTable!!)
isFinalTableEmpty = hasRowCount && tableRowCounts[namespace]!![name] == 0
}
val initialRawTableState =
getInitialRawTableState(streamConfig.id, streamConfig.destinationSyncMode)
val initialRawTableState = getInitialRawTableState(streamConfig.id)
val destinationState =
destinationStates.getOrDefault(
streamConfig.id.asPair(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import javax.sql.DataSource
import kotlin.concurrent.Volatile
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Test

abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() {
Expand Down Expand Up @@ -336,6 +337,82 @@ abstract class AbstractSnowflakeTypingDedupingTest : BaseTypingDedupingTest() {
}
}

@Test
fun testAirbyteMetaAndGenerationIdMigrationForOverwrite() {
gisripa marked this conversation as resolved.
Show resolved Hide resolved
val catalog =
ConfiguredAirbyteCatalog()
.withStreams(
listOf(
ConfiguredAirbyteStream()
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withSyncId(42L)
.withGenerationId(43L)
.withMinimumGenerationId(0L)
.withStream(
AirbyteStream()
.withNamespace(streamNamespace)
.withName(streamName)
.withJsonSchema(BaseTypingDedupingTest.Companion.SCHEMA),
),
),
)

// First sync
val messages1 = readMessages("dat/sync1_messages.jsonl")
runSync(catalog, messages1, "airbyte/destination-snowflake:3.9.1")

// Second sync
val messages2 = readMessages("dat/sync2_messages.jsonl")
runSync(catalog, messages2)

val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_overwrite_raw.jsonl")
val expectedFinalRecords2 =
readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl")
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
}

@Test
fun testAirbyteMetaAndGenerationIdMigrationForOverwrite310Broken() {
val catalog =
ConfiguredAirbyteCatalog()
.withStreams(
listOf(
ConfiguredAirbyteStream()
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withSyncId(42L)
.withGenerationId(43L)
.withMinimumGenerationId(0L)
.withStream(
AirbyteStream()
.withNamespace(streamNamespace)
.withName(streamName)
.withJsonSchema(BaseTypingDedupingTest.Companion.SCHEMA),
),
),
)

// First sync
val messages1 = readMessages("dat/sync1_messages.jsonl")
runSync(catalog, messages1, "airbyte/destination-snowflake:3.9.1")

// Second sync
// This throws exception due to a broken migration in connector
assertThrows(TestHarnessException::class.java) {
runSync(catalog, messages1, "airbyte/destination-snowflake:3.10.0")
}

// Third sync
val messages2 = readMessages("dat/sync2_messages.jsonl")
runSync(catalog, messages2)

val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_overwrite_raw.jsonl")
val expectedFinalRecords2 =
readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl")
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
}

private val defaultSchema: String
get() = config!!["schema"].asText()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Only sync2 messages present in overwrite mode
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43}
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ desired namespace.

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.10.1 | 2024-06-11 | [\#39399](https://github.com/airbytehq/airbyte/pull/39399) | Bug fix for _airbyte_meta not migrated in OVERWRITE mode |
| 3.10.0 | 2024-06-10 | [\#39107](https://github.com/airbytehq/airbyte/pull/39107) | _airbyte_meta and _airbyte_generation_id in Raw tables and final tables |
| 3.9.1 | 2024-06-05 | [\#39135](https://github.com/airbytehq/airbyte/pull/39135) | Improved error handling for Staging files |
| 3.9.0 | 2024-05-23 | [\#38658](https://github.com/airbytehq/airbyte/pull/38658) | Adapting to newer interfaces from #38107 |
Expand Down
Loading