Skip to content

Commit

Permalink
derp
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jul 24, 2024
1 parent d3a98a6 commit 3d9cf10
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
}

if (isTruncateSync) {
prepareStageForTruncate(destinationInitialStatus, stream)
rawTableSuffix = TMP_TABLE_SUFFIX
initialRawTableStatus = destinationInitialStatus.initialTempRawTableStatus
val (rawTableStatus, suffix) = prepareStageForTruncate(destinationInitialStatus, stream)
initialRawTableStatus = rawTableStatus
rawTableSuffix = suffix
} else {
rawTableSuffix = NO_SUFFIX
initialRawTableStatus = prepareStageForNormalSync(stream, destinationInitialStatus)
Expand Down Expand Up @@ -133,7 +133,17 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
private fun prepareStageForTruncate(
destinationInitialStatus: DestinationInitialStatus<DestinationState>,
stream: StreamConfig
) {
): Pair<InitialRawTableStatus, String> {
/*
tl;dr:
* if a temp raw table exists, check whether it belongs to the correct generation.
* if wrong generation, truncate it.
* regardless, write into the temp raw table.
* else, if a real raw table exists, check its generation.
* if wrong generation, write into a new temp raw table.
* else, write into the preexisting real raw table.
* else, create a new temp raw table and write into it.
*/
if (destinationInitialStatus.initialTempRawTableStatus.rawTableExists) {
val tempStageGeneration =
storageOperation.getStageGeneration(stream.id, TMP_TABLE_SUFFIX)
Expand All @@ -160,15 +170,43 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
}
// (if the existing temp stage is from the correct generation, then we're resuming
// a truncate refresh, and should keep the previous temp stage).
return Pair(destinationInitialStatus.initialTempRawTableStatus, TMP_TABLE_SUFFIX)
} else if (destinationInitialStatus.initialRawTableStatus.rawTableExists) {
// It's possible to "resume" a truncate sync that was previously already finalized.
// In this case, there is no existing temp raw table, and there is a real raw table
// which already belongs to the correct generation.
// Check for that case now.
val realStageGeneration =
storageOperation.getStageGeneration(stream.id, NO_SUFFIX)
if (realStageGeneration == null || realStageGeneration == stream.generationId) {
log.info {
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, no existing temp raw table, and existing real raw table belongs to generation $realStageGeneration (== current generation ${stream.generationId}). Retaining it."
}
// The real raw table is from the correct generation. Set up any other resources
// (staging file, etc.), but leave the table untouched.
storageOperation.prepareStage(stream.id, NO_SUFFIX)
return Pair(destinationInitialStatus.initialRawTableStatus, NO_SUFFIX)
} else {
log.info {
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, existing real raw table belongs to generation $realStageGeneration (!= current generation ${stream.generationId}), and no preexisting temp raw table. Creating a temp raw table."
}
// We're initiating a new truncate refresh. Create a new temp stage.
storageOperation.prepareStage(
stream.id,
TMP_TABLE_SUFFIX,
)
return Pair(destinationInitialStatus.initialTempRawTableStatus, TMP_TABLE_SUFFIX)
}
} else {
log.info {
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and no preexisting temp raw table. Creating it."
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and no preexisting temp or raw table. Creating a temp raw table."
}
// We're initiating a new truncate refresh. Create a new temp stage.
storageOperation.prepareStage(
stream.id,
TMP_TABLE_SUFFIX,
)
return Pair(destinationInitialStatus.initialTempRawTableStatus, TMP_TABLE_SUFFIX)
}
}

Expand Down Expand Up @@ -258,14 +296,14 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
// which is possible (`typeAndDedupe(streamConfig.id.copy(rawName = streamConfig.id.rawName
// + suffix))`
// but annoying and confusing.
if (isTruncateSync && streamSuccessful) {
if (isTruncateSync && streamSuccessful && rawTableSuffix.isNotEmpty()) {
log.info {
"Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync and we received a stream success message."
"Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync, we received a stream success message, and are using a temporary raw table."
}
storageOperation.overwriteStage(streamConfig.id, rawTableSuffix)
} else {
log.info {
"Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful"
"Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful; raw table suffix: \"$rawTableSuffix\""
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class AbstractStreamOperationTest {
val initialState =
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
every { streamConfig } returns this@Truncate.streamConfig
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
every { initialRawTableStatus.rawTableExists } returns false
every { initialTempRawTableStatus.rawTableExists } returns false
every { isFinalTablePresent } returns false
every {
Expand Down Expand Up @@ -122,7 +122,7 @@ class AbstractStreamOperationTest {
val initialState =
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
every { streamConfig } returns this@Truncate.streamConfig
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
every { initialRawTableStatus.rawTableExists } returns true
every { initialTempRawTableStatus.rawTableExists } returns false
every { isFinalTablePresent } returns true
every { isFinalTableEmpty } returns true
Expand All @@ -134,10 +134,12 @@ class AbstractStreamOperationTest {
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
} returns destinationState
}
every { storageOperation.getStageGeneration(streamId, "") } returns -1

val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.getStageGeneration(streamId, "")
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, replace = true)
}
Expand Down Expand Up @@ -172,7 +174,7 @@ class AbstractStreamOperationTest {
val initialState =
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
every { streamConfig } returns this@Truncate.streamConfig
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
every { initialRawTableStatus.rawTableExists } returns true
every { initialTempRawTableStatus.rawTableExists } returns false
every { isFinalTablePresent } returns true
every { isFinalTableEmpty } returns true
Expand All @@ -181,10 +183,12 @@ class AbstractStreamOperationTest {
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
} returns destinationState
}
every { storageOperation.getStageGeneration(streamId, "") } returns -1

val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.getStageGeneration(streamId, "")
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
// No table creation - we can just reuse the existing table.
}
Expand Down Expand Up @@ -218,18 +222,20 @@ class AbstractStreamOperationTest {
val initialState =
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
every { streamConfig } returns this@Truncate.streamConfig
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
every { initialRawTableStatus.rawTableExists } returns true
every { initialTempRawTableStatus.rawTableExists } returns false
every { isFinalTablePresent } returns true
every { isFinalTableEmpty } returns false
every {
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
} returns destinationState
}
every { storageOperation.getStageGeneration(streamId, "") } returns -1

val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.getStageGeneration(streamId, "")
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, replace = true)
}
Expand Down Expand Up @@ -264,14 +270,15 @@ class AbstractStreamOperationTest {
val initialState =
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
every { streamConfig } returns this@Truncate.streamConfig
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
every { initialRawTableStatus.rawTableExists } returns true
every { initialTempRawTableStatus.rawTableExists } returns false
every { isFinalTablePresent } returns true
every { isFinalTableEmpty } returns false
every {
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
} returns destinationState
}
every { storageOperation.getStageGeneration(streamId, "") } returns -1

val streamOperations = TestStreamOperation(storageOperation, initialState)
// No point in verifying setup, completely identical to existingNonEmptyTable
Expand All @@ -298,18 +305,20 @@ class AbstractStreamOperationTest {
val initialState =
mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
every { streamConfig } returns this@Truncate.streamConfig
every { initialRawTableStatus } returns mockk<InitialRawTableStatus>()
every { initialRawTableStatus.rawTableExists } returns true
every { initialTempRawTableStatus.rawTableExists } returns false
every { isFinalTablePresent } returns true
every { isFinalTableEmpty } returns false
every {
destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
} returns destinationState
}
every { storageOperation.getStageGeneration(streamId, "") } returns -1

val streamOperations = TestStreamOperation(storageOperation, initialState)

verifySequence {
storageOperation.getStageGeneration(streamId, "")
storageOperation.prepareStage(streamId, EXPECTED_SUFFIX)
storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, replace = true)
}
Expand Down Expand Up @@ -481,6 +490,93 @@ class AbstractStreamOperationTest {
confirmVerified(storageOperation)
checkUnnecessaryStub(initialState, initialState.destinationState)
}

/**
 * Truncate sync where only the real raw table exists (no temp raw table) and its stage
 * generation is one of the values supplied by the `generationIds` method source.
 *
 * Expected behavior, as pinned by the verifications below:
 * * setup queries the real table's generation, prepares the unsuffixed stage, and creates the
 *   final table without a suffix and without replacing;
 * * finalization cleans up the stage and runs typing/deduping against the unsuffixed table,
 *   and never calls `overwriteStage`.
 *
 * @param existingRealTableGeneration generation reported for the real raw table; presumably
 * covers both `null` and the current generation (TODO confirm against `generationIds`).
 */
@ParameterizedTest
@MethodSource(
    "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#generationIds"
)
fun existingRealRawTableMatchingGeneration(existingRealTableGeneration: Long?) {
    val initialState =
        mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
            every { streamConfig } returns this@Truncate.streamConfig
            // Only the real raw table is present; there is no temp raw table to resume from.
            every { initialRawTableStatus.rawTableExists } returns true
            every { initialTempRawTableStatus.rawTableExists } returns false
            every { isFinalTablePresent } returns false
            every {
                destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
            } returns destinationState
        }
    every { storageOperation.getStageGeneration(streamId, "") } returns
        existingRealTableGeneration

    // Constructing the operation performs the setup that is verified next.
    val streamOperations = TestStreamOperation(storageOperation, initialState)

    verifySequence {
        storageOperation.getStageGeneration(streamId, "")
        storageOperation.prepareStage(streamId, "")
        // Named argument for consistency with the sibling tests' `replace = ...` calls.
        storageOperation.createFinalTable(streamConfig, "", replace = false)
    }
    confirmVerified(storageOperation)

    // Reset recorded calls so that only finalization calls are verified below.
    clearMocks(storageOperation)
    streamOperations.finalizeTable(
        streamConfig,
        StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE)
    )

    // No overwriteStage call is expected: records went into the unsuffixed raw table.
    verifySequence {
        storageOperation.cleanupStage(streamId)
        storageOperation.typeAndDedupe(
            streamConfig,
            Optional.empty(),
            "",
        )
    }
    confirmVerified(storageOperation)
    checkUnnecessaryStub(initialState, initialState.destinationState)
}

/**
 * Truncate sync where only the real raw table exists and its stage generation is stubbed as
 * `-1`.
 *
 * The verifications below pin that setup prepares the suffixed ([EXPECTED_SUFFIX]) stage
 * instead of the unsuffixed one, and that a successful finalization overwrites the stage with
 * that suffix before typing/deduping.
 */
@Test
fun existingRealRawTableWrongGeneration() {
    val initialStatus =
        mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
            every { streamConfig } returns this@Truncate.streamConfig
            // Real raw table present, temp raw table absent, no final table yet.
            every { initialRawTableStatus.rawTableExists } returns true
            every { initialTempRawTableStatus.rawTableExists } returns false
            every { isFinalTablePresent } returns false
            every {
                destinationState.withSoftReset<MinimumDestinationState.Impl>(any())
            } returns destinationState
        }
    // Generation reported for the existing real raw table.
    every { storageOperation.getStageGeneration(streamId, "") } returns -1

    // Setup happens during construction; its calls are verified immediately afterwards.
    val streamOperation = TestStreamOperation(storageOperation, initialStatus)

    verifySequence {
        storageOperation.getStageGeneration(streamId, "")
        storageOperation.prepareStage(streamId, EXPECTED_SUFFIX, replace = false)
        storageOperation.createFinalTable(streamConfig, "", replace = false)
    }
    confirmVerified(storageOperation)

    // Drop the recorded setup calls before exercising finalization.
    clearMocks(storageOperation)
    val completedSummary = StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE)
    streamOperation.finalizeTable(streamConfig, completedSummary)

    verifySequence {
        storageOperation.cleanupStage(streamId)
        storageOperation.overwriteStage(streamId, EXPECTED_SUFFIX)
        storageOperation.typeAndDedupe(
            streamConfig,
            Optional.empty(),
            "",
        )
    }
    confirmVerified(storageOperation)
    checkUnnecessaryStub(initialStatus, initialStatus.destinationState)
}
}

@Nested
Expand Down

0 comments on commit 3d9cf10

Please sign in to comment.