start implementing refreshes orchestration
edgao committed Jun 14, 2024
1 parent 32fe73a commit 19237b7
Showing 13 changed files with 711 additions and 164 deletions.
@@ -325,6 +325,10 @@ abstract class JdbcDestinationHandler<DestinationState>(
streamConfig,
finalTableDefinition.isPresent,
initialRawTableState,
// TODO fix this
// for now, no JDBC destinations actually do refreshes
// so this is just to make our code compile
InitialRawTableStatus(false, false, Optional.empty()),
isSchemaMismatch,
isFinalTableEmpty,
destinationState
@@ -33,7 +33,11 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
) {

private val log = KotlinLogging.logger {}
override fun writeRecordsImpl(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
override fun writeRecordsImpl(
streamConfig: StreamConfig,
suffix: String,
stream: Stream<PartialAirbyteMessage>
) {
val writeBuffer =
StagingSerializedBufferFactory.initializeBuffer(fileUploadFormat, destinationColumns)

@@ -51,7 +55,7 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
"Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging"
}
if (it.byteCount != 0L) {
storageOperation.writeToStage(streamConfig, writeBuffer)
storageOperation.writeToStage(streamConfig, suffix, writeBuffer)
} else {
log.info { "Skipping writing to storage since there are no bytes to write" }
}

Large diffs are not rendered by default.

@@ -24,7 +24,11 @@ class StandardStreamOperation<DestinationState : MinimumDestinationState>(
destinationInitialStatus,
disableTypeDedupe
) {
override fun writeRecordsImpl(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
storageOperation.writeToStage(streamConfig, stream)
override fun writeRecordsImpl(
streamConfig: StreamConfig,
suffix: String,
stream: Stream<PartialAirbyteMessage>
) {
storageOperation.writeToStage(streamConfig, suffix, stream)
}
}
@@ -16,14 +16,17 @@ interface StorageOperation<Data> {

/**
* Prepare staging area which could be creating any object storage, temp tables or file storage.
* Similar to [createFinalTable], accepts a [suffix] parameter, which should be used in conjunction
* with [overwriteStage].
* Similar to [createFinalTable], accepts a [suffix] parameter, which should be used in
* conjunction with [overwriteStage].
*
* @param replace If true, then replace existing resources with empty e.g. tables. If false,
* then leave existing resources untouched.
*/
fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean = false)

/**
* Swap the "temporary" stage into the "real" stage. For example,
* `DROP TABLE airbyte_internal.foo; ALTER TABLE airbyte_internal.foo_tmp RENAME TO foo`.
* Swap the "temporary" stage into the "real" stage. For example, `DROP TABLE IF EXISTS
* airbyte_internal.foo; ALTER TABLE airbyte_internal.foo_tmp RENAME TO foo`.
*/
fun overwriteStage(streamId: StreamId, suffix: String)

@@ -35,19 +38,21 @@ interface StorageOperation<Data> {
fun transferFromTempStage(streamId: StreamId, suffix: String)

/**
* Get the generation of a single record in the stage. Not necessarily the min or max generation,
* just _any_ record.
* Get the generation of a single record in the stage. Not necessarily the min or max
* generation, just _any_ record.
*
* [AbstractStreamOperation] is responsible for orchestrating the stages so that the temp stage
* always contains exactly one generation.
*
* @return The generation ID of a record in the stage, or `null` if the stage is empty.
*/
fun getStageGeneration(streamId: StreamId, suffix: String): Long
fun getStageGeneration(streamId: StreamId, suffix: String): Long?

/** Delete previously staged data, using deterministic information from streamId. */
fun cleanupStage(streamId: StreamId)

/** Write data to stage. */
fun writeToStage(streamConfig: StreamConfig, data: Data)
fun writeToStage(streamConfig: StreamConfig, suffix: String, data: Data)

/*
* ==================== Final Table Operations ================================
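A rough sketch of how a connector might drive these staging methods during a truncate-style refresh. The `_airbyte_tmp` suffix, helper names, and control flow below are illustrative assumptions, not the actual `AbstractStreamOperation` logic (whose diff is not rendered above):

```kotlin
// Illustrative sketch only; assumes the same package as StorageOperation and StreamId.
private const val TMP_SUFFIX = "_airbyte_tmp" // assumed temp-stage suffix

fun <Data> startRefresh(
    storageOperation: StorageOperation<Data>,
    streamId: StreamId,
    syncGenerationId: Long,
) {
    // If a temp stage survives from a failed attempt, keep it only when it already
    // holds this sync's generation; otherwise wipe it and start fresh.
    val existingGeneration = storageOperation.getStageGeneration(streamId, TMP_SUFFIX)
    val wipeExisting = existingGeneration != null && existingGeneration != syncGenerationId
    storageOperation.prepareStage(streamId, TMP_SUFFIX, replace = wipeExisting)
}

fun <Data> finishRefresh(storageOperation: StorageOperation<Data>, streamId: StreamId) {
    // After a successful sync, atomically swap the temp stage into the real stage.
    storageOperation.overwriteStage(streamId, TMP_SUFFIX)
}
```

Between these two calls, records would flow through `writeToStage(streamConfig, suffix, data)` with the same suffix, so they land in the temp raw table rather than the real one.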
@@ -7,7 +7,13 @@ package io.airbyte.integrations.base.destination.typing_deduping
data class DestinationInitialStatus<DestinationState>(
val streamConfig: StreamConfig,
val isFinalTablePresent: Boolean,
// TODO we should probably make this nullable, then delete InitialRawTableStatus.rawTableExists
val initialRawTableStatus: InitialRawTableStatus,
/**
* The state of the temp raw table, or null if there is no temp raw table at the start of the
* sync.
*/
val initialTempRawTableStatus: InitialRawTableStatus,
val isSchemaMismatch: Boolean,
val isFinalTableEmpty: Boolean,
val destinationState: DestinationState,
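A minimal sketch of how the new `initialTempRawTableStatus` field might be consulted; the helper name is hypothetical and not part of this commit:

```kotlin
// Hypothetical helper: does a leftover temp raw table still contain records
// that need typing/deduping at the start of the sync?
fun <S> tempStageNeedsProcessing(status: DestinationInitialStatus<S>): Boolean =
    status.initialTempRawTableStatus.rawTableExists &&
        status.initialTempRawTableStatus.hasUnprocessedRecords
```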
@@ -8,6 +8,18 @@ import java.util.*

data class InitialRawTableStatus(
val rawTableExists: Boolean,
/**
* Whether there were any records with null `_airbyte_loaded_at`, at the time that this status
* was fetched.
*/
val hasUnprocessedRecords: Boolean,
// TODO Make maxProcessedTimestamp just `Instant?` instead of Optional
/**
* The highest timestamp such that all records in `SELECT * FROM raw_table WHERE
* _airbyte_extracted_at <= ?` have a nonnull `_airbyte_loaded_at`.
*
* Destinations MAY use this value to only run T+D on records with `_airbyte_extracted_at > ?`
* (note the strictly-greater comparison).
*/
val maxProcessedTimestamp: Optional<Instant>
)
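A minimal sketch, assuming a SQL-based destination, of how `maxProcessedTimestamp` could be turned into the strictly-greater filter described above; the function name and SQL shape are illustrative, not any particular connector's implementation:

```kotlin
import java.time.Instant

// Hypothetical helper: build the WHERE clause limiting T+D to unprocessed records.
fun buildExtractedAtFilter(status: InitialRawTableStatus): String =
    status.maxProcessedTimestamp
        // Strictly greater: everything at or before this timestamp is already loaded.
        .map { ts: Instant -> "WHERE _airbyte_extracted_at > TIMESTAMP '$ts'" }
        .orElse("") // no fully-processed prefix yet, so scan the whole raw table
```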
@@ -42,8 +42,9 @@ data class StreamId(
return "$quote$finalNamespace$quote.$quote$finalName$suffix$quote"
}

fun rawTableId(quote: String): String {
return "$quote$rawNamespace$quote.$quote$rawName$quote"
@JvmOverloads
fun rawTableId(quote: String, suffix: String = ""): String {
return "$quote$rawNamespace$quote.$quote$rawName$suffix$quote"
}

fun finalName(quote: String): String {
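A small usage sketch for the new `rawTableId` overload; the `_airbyte_tmp` suffix value and helper name are assumed for illustration:

```kotlin
// Hypothetical helper pairing the temp and real raw table identifiers.
fun tempAndRealRawTables(streamId: StreamId, quote: String = "\""): Pair<String, String> {
    val tempRawTable = streamId.rawTableId(quote, "_airbyte_tmp") // e.g. "airbyte_internal"."users_raw_airbyte_tmp"
    val realRawTable = streamId.rawTableId(quote)                 // e.g. "airbyte_internal"."users_raw"
    return tempRawTable to realRawTable
}
```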