Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destinations CDK: Correctly detect when real raw/final table is correct generation during truncate sync #42503

Merged
merged 2 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:-------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.44.14 | 2024-08-19 | [\#42503](https://github.com/airbytehq/airbyte/pull/42503) | Destinations (refreshes) - correctly detect existing raw/final table of the correct generation during truncate sync |
| 0.44.13 | 2024-08-14 | [\#42579](https://github.com/airbytehq/airbyte/pull/42579) | S3 destination - OVERWRITE: keep files until successful sync of same generationId |
| 0.44.5 | 2024-08-09 | [\#43374](https://github.com/airbytehq/airbyte/pull/43374) | S3 destination V2 fields, conversion improvements, bugfixes |
| 0.44.4 | 2024-08-08 | [\#43410](https://github.com/airbytehq/airbyte/pull/43330) | Better logs for counting info to state message. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.44.13
version=0.44.14
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
}

if (isTruncateSync) {
prepareStageForTruncate(destinationInitialStatus, stream)
rawTableSuffix = TMP_TABLE_SUFFIX
initialRawTableStatus = destinationInitialStatus.initialTempRawTableStatus
val (rawTableStatus, suffix) = prepareStageForTruncate(destinationInitialStatus, stream)
initialRawTableStatus = rawTableStatus
rawTableSuffix = suffix
} else {
rawTableSuffix = NO_SUFFIX
initialRawTableStatus = prepareStageForNormalSync(stream, destinationInitialStatus)
Expand Down Expand Up @@ -132,7 +132,17 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
private fun prepareStageForTruncate(
destinationInitialStatus: DestinationInitialStatus<DestinationState>,
stream: StreamConfig
) {
): Pair<InitialRawTableStatus, String> {
edgao marked this conversation as resolved.
Show resolved Hide resolved
/*
tl;dr:
* if a temp raw table exists, check whether it belongs to the correct generation.
* if wrong generation, truncate it.
* regardless, write into the temp raw table.
* else, if a real raw table exists, check its generation.
* if wrong generation, write into a new temp raw table.
* else, write into the preexisting real raw table.
* else, create a new temp raw table and write into it.
*/
if (destinationInitialStatus.initialTempRawTableStatus.rawTableExists) {
val tempStageGeneration =
storageOperation.getStageGeneration(stream.id, TMP_TABLE_SUFFIX)
Expand All @@ -146,6 +156,7 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
stream.id,
TMP_TABLE_SUFFIX,
)
return Pair(destinationInitialStatus.initialTempRawTableStatus, TMP_TABLE_SUFFIX)
} else {
log.info {
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and existing temp raw table belongs to generation $tempStageGeneration (!= current generation ${stream.generationId}). Truncating it."
Expand All @@ -156,18 +167,67 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
TMP_TABLE_SUFFIX,
replace = true,
)
// We nuked the temp raw table, so create a new initial raw table status.
return Pair(
InitialRawTableStatus(
rawTableExists = true,
hasUnprocessedRecords = false,
maxProcessedTimestamp = Optional.empty(),
),
TMP_TABLE_SUFFIX,
)
}
} else if (destinationInitialStatus.initialRawTableStatus.rawTableExists) {
// It's possible to "resume" a truncate sync that was previously already finalized.
// In this case, there is no existing temp raw table, and there is a real raw table
// which already belongs to the correct generation.
// Check for that case now.
val realStageGeneration = storageOperation.getStageGeneration(stream.id, NO_SUFFIX)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait what ? 🤯

if (realStageGeneration == null || realStageGeneration == stream.generationId) {
log.info {
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, no existing temp raw table, and existing real raw table belongs to generation $realStageGeneration (== current generation ${stream.generationId}). Retaining it."
}
// The real raw table is from the correct generation. Set up any other resources
// (staging file, etc.), but leave the table untouched.
storageOperation.prepareStage(stream.id, NO_SUFFIX)
return Pair(destinationInitialStatus.initialRawTableStatus, NO_SUFFIX)
} else {
log.info {
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, existing real raw table belongs to generation $realStageGeneration (!= current generation ${stream.generationId}), and no preexisting temp raw table. Creating a temp raw table."
}
// We're initiating a new truncate refresh. Create a new temp stage.
storageOperation.prepareStage(
stream.id,
TMP_TABLE_SUFFIX,
)
return Pair(
// Create a fresh raw table status, since we created a fresh temp stage.
InitialRawTableStatus(
rawTableExists = true,
hasUnprocessedRecords = false,
maxProcessedTimestamp = Optional.empty(),
),
TMP_TABLE_SUFFIX,
)
}
// (if the existing temp stage is from the correct generation, then we're resuming
// a truncate refresh, and should keep the previous temp stage).
} else {
log.info {
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and no preexisting temp raw table. Creating it."
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and no preexisting temp or raw table. Creating a temp raw table."
}
// We're initiating a new truncate refresh. Create a new temp stage.
storageOperation.prepareStage(
stream.id,
TMP_TABLE_SUFFIX,
)
return Pair(
// Create a fresh raw table status, since we created a fresh temp stage.
InitialRawTableStatus(
rawTableExists = true,
hasUnprocessedRecords = false,
maxProcessedTimestamp = Optional.empty(),
),
TMP_TABLE_SUFFIX,
)
}
}

Expand All @@ -188,8 +248,39 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
// The table already exists. Decide whether we're writing to it directly, or
// using a tmp table.
if (isTruncateSync) {
// Truncate refresh. Use a temp final table.
return prepareFinalTableForOverwrite(initialStatus)
if (initialStatus.isFinalTableEmpty || initialStatus.finalTableGenerationId == null) {
if (!initialStatus.isSchemaMismatch) {
log.info {
"Truncate sync, and final table is empty and has correct schema. Writing to it directly."
}
return NO_SUFFIX
} else {
// No point soft resetting an empty table. We'll just do an overwrite later.
log.info {
"Truncate sync, and final table is empty, but has the wrong schema. Using a temp final table."
}
return prepareFinalTableForOverwrite(initialStatus)
}
} else if (
initialStatus.finalTableGenerationId >=
initialStatus.streamConfig.minimumGenerationId
) {
if (!initialStatus.isSchemaMismatch) {
log.info {
"Truncate sync, and final table matches our generation and has correct schema. Writing to it directly."
}
return NO_SUFFIX
} else {
log.info {
"Truncate sync, and final table matches our generation, but has the wrong schema. Writing to it directly, but triggering a soft reset first."
}
storageOperation.softResetFinalTable(stream)
return NO_SUFFIX
}
} else {
// The final table is in the wrong generation. Use a temp final table.
return prepareFinalTableForOverwrite(initialStatus)
}
} else {
if (initialStatus.isSchemaMismatch || initialStatus.destinationState.needsSoftReset()) {
// We're loading data directly into the existing table.
Expand Down Expand Up @@ -257,14 +348,14 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
// which is possible (`typeAndDedupe(streamConfig.id.copy(rawName = streamConfig.id.rawName
// + suffix))`
// but annoying and confusing.
if (isTruncateSync && streamSuccessful) {
if (isTruncateSync && streamSuccessful && rawTableSuffix.isNotEmpty()) {
log.info {
"Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync and we received a stream success message."
"Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync, we received a stream success message, and are using a temporary raw table."
}
storageOperation.overwriteStage(streamConfig.id, rawTableSuffix)
} else {
log.info {
"Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful"
"Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful; raw table suffix: \"$rawTableSuffix\""
}
}

Expand Down Expand Up @@ -303,10 +394,11 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
"Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} running as truncate sync. Stream success: $streamSuccessful; records written: ${syncSummary.recordsWritten}; temp raw table already existed: ${initialRawTableStatus.rawTableExists}; temp raw table had records: ${initialRawTableStatus.hasUnprocessedRecords}"
}
} else {
// In truncate mode, we want to read all the raw records. Typically, this is equivalent
// When targeting the temp final table, we want to read all the raw records
// because the temp final table is always a full rebuild. Typically, this is equivalent
// to filtering on timestamp, but might as well be explicit.
val timestampFilter =
if (!isTruncateSync) {
if (finalTmpTableSuffix.isEmpty()) {
initialRawTableStatus.maxProcessedTimestamp
} else {
Optional.empty()
Expand Down
Loading
Loading