start implementing refreshes orchestration
edgao committed May 28, 2024
1 parent 3aa0964 commit d7fd028
Showing 11 changed files with 607 additions and 146 deletions.
@@ -325,6 +325,10 @@ abstract class JdbcDestinationHandler<DestinationState>(
streamConfig,
finalTableDefinition.isPresent,
initialRawTableState,
// TODO fix this
// for now, no JDBC destinations actually do refreshes
// so this is just to make our code compile
InitialRawTableStatus(false, false, Optional.empty()),
isSchemaMismatch,
isFinalTableEmpty,
destinationState
@@ -11,8 +11,8 @@ import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableS
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import java.util.Optional
import java.util.stream.Stream

@@ -24,10 +24,11 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
private val log = KotlinLogging.logger {}

// State maintained to make decision between async calls
private val isTruncateSync: Boolean
private val rawTableSuffix: String
private val finalTmpTableSuffix: String
private val initialRawTableStatus: InitialRawTableStatus =
destinationInitialStatus.initialRawTableStatus
// null for truncate syncs, where we don't care at all about the initial status.
private val initialRawTableStatus: InitialRawTableStatus?

/**
* After running any sync setup code, we may update the destination state. This field holds that
@@ -37,8 +38,85 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat

init {
val stream = destinationInitialStatus.streamConfig
rawTableSuffix = NO_SUFFIX
storageOperation.prepareStage(stream.id, NO_SUFFIX)

isTruncateSync = when (stream.minimumGenerationId) {
0L -> false
stream.generationId -> true
else -> {
// This is technically already handled in CatalogParser.
throw IllegalArgumentException("Hybrid refreshes are not yet supported.")
}
}

if (isTruncateSync) {
if (destinationInitialStatus.initialTempRawTableStatus.rawTableExists) {
val tempStageGeneration =
storageOperation.getStageGeneration(stream.id, TMP_TABLE_SUFFIX)
if (tempStageGeneration != null && tempStageGeneration != stream.generationId) {
// The temp stage is from the wrong generation. Nuke it.
storageOperation.prepareStage(
stream.id,
TMP_TABLE_SUFFIX,
replace = true,
)
} else {
// The temp table is from the correct generation. Set up any other resources
// (staging file, etc.), but leave the table untouched.
storageOperation.prepareStage(
stream.id,
TMP_TABLE_SUFFIX,
)
}
// (if the existing temp stage is from the correct generation, then we're resuming
// a truncate refresh, and should keep the previous temp stage).
} else {
// We're initiating a new truncate refresh. Create a new temp stage.
storageOperation.prepareStage(
stream.id,
TMP_TABLE_SUFFIX,
)
}
rawTableSuffix = TMP_TABLE_SUFFIX
initialRawTableStatus = null
} else {
rawTableSuffix = NO_SUFFIX
storageOperation.prepareStage(stream.id, NO_SUFFIX)
if (destinationInitialStatus.initialTempRawTableStatus.rawTableExists) {
// There was a previous truncate refresh attempt, which failed, and left some
// records behind.
// Retrieve those records and put them in the real stage.
// This is necessary to avoid certain data loss scenarios.
// (specifically: a user initiates a truncate sync, which fails, but emits some records.
// It also emits a state message for "resumable" full refresh.
// The user then initiates an incremental sync, which runs using that state.
// In this case, we MUST retain the records from the truncate attempt.)
storageOperation.transferFromTempStage(stream.id, TMP_TABLE_SUFFIX)

// We need to combine the raw table statuses from the real and temp raw tables.
val hasUnprocessedRecords =
destinationInitialStatus.initialTempRawTableStatus.hasUnprocessedRecords ||
destinationInitialStatus.initialRawTableStatus.hasUnprocessedRecords
// Pick the earlier min timestamp.
val maxProcessedTimestamp: Optional<Instant> =
destinationInitialStatus.initialRawTableStatus.maxProcessedTimestamp.flatMap { realRawTableTimestamp ->
destinationInitialStatus.initialTempRawTableStatus.maxProcessedTimestamp.flatMap { tempRawTableTimestamp ->
if (realRawTableTimestamp.isBefore(tempRawTableTimestamp)) {
Optional.of(realRawTableTimestamp)
} else {
Optional.of(tempRawTableTimestamp)
}
}.or { Optional.of(realRawTableTimestamp) }
}.or { destinationInitialStatus.initialTempRawTableStatus.maxProcessedTimestamp }
initialRawTableStatus = InitialRawTableStatus(
rawTableExists = true,
hasUnprocessedRecords = hasUnprocessedRecords,
maxProcessedTimestamp = maxProcessedTimestamp
)
} else {
initialRawTableStatus = destinationInitialStatus.initialRawTableStatus
}
}

if (!disableTypeDedupe) {
// Prepare final tables based on sync mode.
finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
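
The Optional-based timestamp merge above is dense; the intent is just "take the earlier of the two max-processed timestamps, falling back to whichever one is present". A minimal standalone sketch of the same behavior, assuming only java.time and java.util (the helper name is illustrative, not part of this commit):

import java.time.Instant
import java.util.Optional

// Illustrative helper: pick the earlier of the real and temp raw tables'
// max-processed timestamps; if either side is empty, return the other.
fun combineMaxProcessedTimestamps(
    real: Optional<Instant>,
    temp: Optional<Instant>,
): Optional<Instant> =
    real
        .flatMap { r -> temp.map { t -> if (r.isBefore(t)) r else t }.or { Optional.of(r) } }
        .or { temp }

For example, combining 10:00 and 10:05 yields 10:00; combining empty and 10:05 yields 10:05.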
@@ -51,7 +129,7 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat

companion object {
private const val NO_SUFFIX = ""
private const val TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"
const val TMP_TABLE_SUFFIX = "_airbyte_tmp"
}

private fun prepareFinalTable(
@@ -70,27 +148,22 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
log.info { "Final Table exists for stream ${stream.id.finalName}" }
// The table already exists. Decide whether we're writing to it directly, or
// using a tmp table.
when (stream.destinationSyncMode) {
// For overwrite, it's wasteful to do T+D, so we don't do soft-reset in prepare.
// Instead,
// we do type-dedupe on a suffixed table and do a swap in finalizeTable when we have to
// for schema mismatches.
DestinationSyncMode.OVERWRITE -> return prepareFinalTableForOverwrite(initialStatus)
DestinationSyncMode.APPEND,
DestinationSyncMode.APPEND_DEDUP -> {
if (
initialStatus.isSchemaMismatch ||
initialStatus.destinationState.needsSoftReset()
) {
// We're loading data directly into the existing table.
// Make sure it has the right schema.
// Also, if a raw table migration wants us to do a soft reset, do that
// here.
log.info { "Executing soft-reset on final table of stream $stream" }
storageOperation.softResetFinalTable(stream)
}
return NO_SUFFIX
if (isTruncateSync) {
// Truncate refresh. Use a temp final table.
return prepareFinalTableForOverwrite(initialStatus)
} else {
if (
initialStatus.isSchemaMismatch ||
initialStatus.destinationState.needsSoftReset()
) {
// We're loading data directly into the existing table.
// Make sure it has the right schema.
// Also, if a raw table migration wants us to do a soft reset, do that
// here.
log.info { "Executing soft-reset on final table of stream $stream" }
storageOperation.softResetFinalTable(stream)
}
return NO_SUFFIX
}
}

@@ -100,14 +173,13 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
val stream = initialStatus.streamConfig
if (!initialStatus.isFinalTableEmpty || initialStatus.isSchemaMismatch) {
// overwrite an existing tmp table if needed.
storageOperation.createFinalTable(stream, TMP_OVERWRITE_TABLE_SUFFIX, true)
storageOperation.createFinalTable(stream, TMP_TABLE_SUFFIX, true)
log.info {
"Using temp final table for table ${stream.id.finalName}, this will be overwritten at end of sync"
}
// We want to overwrite an existing table. Write into a tmp table.
// We'll overwrite the table at the
// end of the sync.
return TMP_OVERWRITE_TABLE_SUFFIX
// We'll overwrite the table at the end of the sync.
return TMP_TABLE_SUFFIX
}

log.info {
@@ -133,6 +205,20 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {
// Delete staging directory, implementation will handle if it has to do it or not or a No-OP
storageOperation.cleanupStage(streamConfig.id)

val streamSuccessful = syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE
// Overwrite the raw table before doing anything else.
// This ensures that if T+D fails, we can easily retain the records on the next sync.
// It also means we don't need to run T+D using the temp raw table,
// which is possible (`typeAndDedupe(streamConfig.id.copy(rawName = streamConfig.id.rawName + suffix))`
// but annoying and confusing.
if (isTruncateSync && streamSuccessful) {
log.info { "Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync and we received a stream success message." }
storageOperation.overwriteStage(streamConfig.id, rawTableSuffix)
} else {
log.info { "Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful" }
}

if (disableTypeDedupe) {
log.info {
"Typing and deduping disabled, skipping final table finalization. " +
@@ -141,51 +227,41 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
return
}

// Legacy logic that if recordsWritten or not tracked then it could be non-zero
val isNotOverwriteSync = streamConfig.destinationSyncMode != DestinationSyncMode.OVERWRITE
// Non-overwrite syncs should T+D regardless of status,
// so the user sees progress after every attempt.
// But overwrite syncs should only run T+D if the stream was successful
// (since we're T+Ding into a temp final table anyway).
val streamStatusRequiresTd =
isNotOverwriteSync || syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE
val shouldRunTypingDeduping: Boolean =
if (streamStatusRequiresTd) {
// Legacy logic that if recordsWritten or not tracked then it could be non-zero.
// But for OVERWRITE syncs, we don't need to look at old records.
val hasRecordsNeedingTd =
syncSummary.recordsWritten > 0 ||
(isNotOverwriteSync && initialRawTableStatus.hasUnprocessedRecords)
hasRecordsNeedingTd
} else {
false
}
val shouldRunTypingDeduping =
// Normal syncs should T+D regardless of status, so the user sees progress after
(!isTruncateSync &&
(syncSummary.recordsWritten > 0 ||
// We know this is a normal sync, so initialRawTableStatus is nonnull.
initialRawTableStatus!!.hasUnprocessedRecords)) ||
// But truncate syncs should only T+D if the sync was successful, since we're T+Ding
// into a temp final table anyway. And we only need to check if _this_ sync emitted
// records, since we've nuked the old raw data.
(isTruncateSync && streamSuccessful && syncSummary.recordsWritten > 0)
if (!shouldRunTypingDeduping) {
log.info {
"Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
"because it had no records during this sync and no unprocessed records from a previous sync."
}
} else {
// In overwrite mode, we want to read all the raw records. Typically, this is equivalent
// In truncate mode, we want to read all the raw records. Typically, this is equivalent
// to filtering on timestamp, but might as well be explicit.
val timestampFilter =
if (isNotOverwriteSync) {
initialRawTableStatus.maxProcessedTimestamp
if (!isTruncateSync) {
initialRawTableStatus!!.maxProcessedTimestamp
} else {
Optional.empty()
}
storageOperation.typeAndDedupe(streamConfig, timestampFilter, finalTmpTableSuffix)
}

if (
streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE &&
finalTmpTableSuffix.isNotBlank()
// We should only overwrite the final table if the stream was successful.
// This prevents data downtime if the stream didn't emit all the data.
&&
syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE
) {
// We want to run this independently of whether we ran T+D.
// E.g. it's valid for a sync to emit 0 records (e.g. the source table is legitimately
// empty), in which case we want to overwrite the final table with an empty table.
if (isTruncateSync && streamSuccessful && finalTmpTableSuffix.isNotBlank()) {
log.info { "Overwriting final table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync, we received a stream success message, and we are using a temp final table.." }
storageOperation.overwriteFinalTable(streamConfig, finalTmpTableSuffix)
} else {
log.info { "Not overwriting final table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful; final table suffix not blank: ${finalTmpTableSuffix.isNotBlank()}" }
}
}
}
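
The typing-and-deduping gate in finalizeTable collapses into two cases. A compact restatement as a standalone function, hedged as a sketch (the function and parameter names are illustrative, not part of this commit):

// Sketch: when finalizeTable should run typing & deduping.
fun shouldRunTypingDeduping(
    isTruncateSync: Boolean,
    streamSuccessful: Boolean,
    recordsWritten: Long,
    hadUnprocessedRecords: Boolean, // from the (nonnull) initial raw table status of a normal sync
): Boolean =
    if (isTruncateSync) {
        // A truncate sync wiped the old raw data, so only this attempt's records matter,
        // and T+D lands in a temp final table anyway -- only bother if the stream succeeded.
        streamSuccessful && recordsWritten > 0
    } else {
        // Normal syncs run T+D after every attempt so the user sees progress,
        // including leftover unprocessed records from earlier attempts.
        recordsWritten > 0 || hadUnprocessedRecords
    }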
@@ -18,6 +18,9 @@ interface StorageOperation<Data> {
* Prepare the staging area, which could mean creating object storage, temp tables, or file storage.
* Similar to [createFinalTable], accepts a [suffix] parameter, which should be used in conjunction
* with [overwriteStage].
*
* @param replace If true, replace existing resources (e.g. tables) with empty ones. If false,
* leave existing resources untouched.
*/
fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean = false)

@@ -40,8 +43,10 @@ interface StorageOperation<Data> {
*
* [AbstractStreamOperation] is responsible for orchestrating the stages so that the temp stage
* always contains exactly one generation.
*
* @return The generation ID of a record in the stage, or `null` if the stage is empty.
*/
fun getStageGeneration(streamId: StreamId, suffix: String): Long
fun getStageGeneration(streamId: StreamId, suffix: String): Long?

/** Delete previously staged data, using deterministic information from streamId. */
fun cleanupStage(streamId: StreamId)
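
For context, the nullable return pairs with the new replace flag on prepareStage: a caller keeps a temp stage whose generation matches the current sync and recreates it otherwise, as the orchestration above does. A hedged sketch of that call pattern (storageOperation, streamId, and generationId are assumed to be in scope; the suffix literal mirrors TMP_TABLE_SUFFIX):

// Sketch: reuse the temp stage only if it holds the current generation.
val tempGeneration: Long? = storageOperation.getStageGeneration(streamId, "_airbyte_tmp")
val isWrongGeneration = tempGeneration != null && tempGeneration != generationId
// replace = true nukes the stage; replace = false leaves existing data in place.
storageOperation.prepareStage(streamId, "_airbyte_tmp", replace = isWrongGeneration)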
@@ -7,7 +7,13 @@ package io.airbyte.integrations.base.destination.typing_deduping
data class DestinationInitialStatus<DestinationState>(
val streamConfig: StreamConfig,
val isFinalTablePresent: Boolean,
// TODO we should probably make this nullable, then delete InitialRawTableStatus.rawTableExists
val initialRawTableStatus: InitialRawTableStatus,
/**
* The state of the temp raw table, or null if there is no temp raw table
* at the start of the sync.
*/
val initialTempRawTableStatus: InitialRawTableStatus,
val isSchemaMismatch: Boolean,
val isFinalTableEmpty: Boolean,
val destinationState: DestinationState,
@@ -8,6 +8,19 @@ import java.util.*

data class InitialRawTableStatus(
val rawTableExists: Boolean,
/**
* Whether there were any records with null `_airbyte_loaded_at`, at the time
* that this status was fetched.
*/
val hasUnprocessedRecords: Boolean,
// TODO Make maxProcessedTimestamp just `Instant?` instead of Optional
/**
* The highest timestamp such that all records in
* `SELECT * FROM raw_table WHERE _airbyte_extracted_at <= ?`
* have a nonnull `_airbyte_loaded_at`.
*
* Destinations MAY use this value to only run T+D on records with
* `_airbyte_extracted_at > ?` (note the strictly-greater comparison).
*/
val maxProcessedTimestamp: Optional<Instant>
)
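
To make the doc comment on maxProcessedTimestamp concrete: a destination might turn the optional timestamp into a strictly-greater extracted_at filter for its T+D query. A hedged sketch; the helper name, column quoting, and timestamp formatting are illustrative, not part of this commit:

import java.time.Instant
import java.util.Optional

// Sketch: only type-and-dedupe records newer than the already-processed watermark.
fun extractedAtFilter(maxProcessedTimestamp: Optional<Instant>): String =
    maxProcessedTimestamp
        .map { ts -> "WHERE _airbyte_extracted_at > '$ts'" } // strictly greater, per the comment above
        .orElse("") // no watermark: process every record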
@@ -42,8 +42,9 @@ data class StreamId(
return "$quote$finalNamespace$quote.$quote$finalName$suffix$quote"
}

fun rawTableId(quote: String): String {
return "$quote$rawNamespace$quote.$quote$rawName$quote"
@JvmOverloads
fun rawTableId(quote: String, suffix: String = ""): String {
return "$quote$rawNamespace$quote.$quote$rawName$suffix$quote"
}

fun finalName(quote: String): String {
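
For reference, the new optional suffix lands inside the quoted raw table name; the values below are illustrative and streamId is assumed to be in scope, not taken from this commit:

// Assuming a streamId whose rawNamespace is "airbyte_internal" and rawName is "users_raw":
val unsuffixed = streamId.rawTableId("\"")                   // "airbyte_internal"."users_raw"
val tempSuffixed = streamId.rawTableId("\"", "_airbyte_tmp") // "airbyte_internal"."users_raw_airbyte_tmp"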