diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md
index 0b97d5de7b80..450e0038d5d2 100644
--- a/airbyte-cdk/java/airbyte-cdk/README.md
+++ b/airbyte-cdk/java/airbyte-cdk/README.md
@@ -173,12 +173,12 @@ corresponds to that version.

 ### Java CDK

 | Version | Date       | Pull Request                                                | Subject |
-|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
+|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 0.35.6  | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer |
 | 0.35.5  | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 |
 | 0.35.2  | 2024-05-13 | [\#38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages |
 | 0.35.0  | 2024-05-13 | [\#38127](https://github.com/airbytehq/airbyte/pull/38127) | Destinations: Populate generation/sync ID on StreamConfig |
 | 0.34.4  | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
-| 0.34.4  | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
 | 0.34.3  | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
 | 0.34.1  | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
 | 0.34.0  | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt
index c43b00774590..5003c09c4acf 100644
--- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt
+++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt
@@ -57,8 +57,8 @@ object JavaBaseConstants {
     const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"

     enum class DestinationColumns(val rawColumns: List<String>) {
-        V2_WITH_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES),
-        V2_WITHOUT_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
-        LEGACY(JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS)
+        V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES),
+        V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
+        LEGACY(LEGACY_RAW_TABLE_COLUMNS)
     }
 }
diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StreamSyncSummary.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StreamSyncSummary.kt
index ba9688dcb80c..fd2bf0fdc785 100644
--- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StreamSyncSummary.kt
+++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StreamSyncSummary.kt
@@ -11,7 +11,7 @@ import java.util.*
  * destinations framework; new implementations should always provide this information). If this
  * value is empty, consumers should assume that the sync wrote nonzero records for this stream.
  */
-class StreamSyncSummary(val recordsWritten: Optional<Long>) {
+data class StreamSyncSummary(val recordsWritten: Optional<Long>) {

     companion object {
         @JvmField val DEFAULT: StreamSyncSummary = StreamSyncSummary(Optional.empty())
diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/operation/SyncOperation.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/operation/SyncOperation.kt
new file mode 100644
index 000000000000..ef24f959aa90
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/operation/SyncOperation.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.integrations.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.protocol.models.v0.StreamDescriptor
+import java.util.stream.Stream
+
+/**
+ * Destination connector sync operations. Any initialization required for the connector should be
+ * done as part of instantiation/init blocks.
+ */
+interface SyncOperation {
+
+    /**
+     * This function is a shim for
+     * [io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction]. After
+     * control returns from this method, it should be assumed that the data is committed to
+     * durable storage, and State message acknowledgements may be sent back.
+     */
+    fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)
+
+    /**
+     * Finalize streams, which could involve typing/deduping or any other required housekeeping
+     * tasks.
+     */
+    fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
+}
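For orientation, here is a minimal sketch of the calling contract this interface defines. The `syncOperation` and `batch` values are hypothetical, and in practice the async framework invokes `flushStream` through a `DestinationFlushFunction` (see `DefaultFlush` later in this diff):

```kotlin
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.Optional

// Hypothetical driver showing the call contract of SyncOperation.
fun runToySync(syncOperation: SyncOperation, batch: List<PartialAirbyteMessage>) {
    val descriptor = StreamDescriptor().withNamespace("public").withName("users")
    // Flush one batch for one stream; when this returns, the data must be durably committed.
    syncOperation.flushStream(descriptor, batch.stream())
    // At the end of the sync, hand over per-stream summaries so finalization (e.g. T+D) runs.
    syncOperation.finalizeStreams(
        mapOf(descriptor to StreamSyncSummary(Optional.of(batch.size.toLong())))
    )
}
```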
diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties
index f8cdf992edf1..1a4261f9b0f2 100644
--- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties
+++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties
@@ -1 +1 @@
-version=0.35.5
+version=0.35.6
diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingSerializedBufferFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingSerializedBufferFactory.kt
new file mode 100644
index 000000000000..9e7563f98065
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingSerializedBufferFactory.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.integrations.destination.staging
+
+import io.airbyte.cdk.integrations.base.JavaBaseConstants
+import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
+import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
+import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer
+import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator
+
+/**
+ * Factory which creates an instance of a concrete [SerializableBuffer] for one-time use before
+ * the buffer is closed. [io.airbyte.cdk.integrations.destination.s3.SerializedBufferFactory] is
+ * very similar and should eventually be unified with this one, but it doesn't work well with our
+ * DV2 staging destinations, which mostly support CSV only.
+ */
+object StagingSerializedBufferFactory {
+
+    fun initializeBuffer(
+        fileUploadFormat: FileUploadFormat,
+        destinationColumns: JavaBaseConstants.DestinationColumns
+    ): SerializableBuffer {
+        when (fileUploadFormat) {
+            FileUploadFormat.CSV -> {
+                return CsvSerializedBuffer(
+                    FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
+                    StagingDatabaseCsvSheetGenerator(destinationColumns),
+                    true,
+                )
+            }
+            else -> {
+                TODO("Only CSV is supported for Staging format")
+            }
+        }
+    }
+}
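As a usage sketch (not part of this PR): create a gzipped CSV buffer and write one row, mirroring how `StagingStreamOperations` below drives the buffer. The three-argument `accept(recordJson, airbyteMetaJson, emittedAt)` call and the sample JSON strings are assumptions inferred from that usage:

```kotlin
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.staging.StagingSerializedBufferFactory

fun writeOneRecordToStagingBuffer() {
    val buffer =
        StagingSerializedBufferFactory.initializeBuffer(
            FileUploadFormat.CSV,
            JavaBaseConstants.DestinationColumns.V2_WITH_META,
        )
    buffer.use {
        // Append one serialized record plus its airbyte_meta payload and emitted_at value.
        it.accept("""{"id": 1}""", """{"changes": []}""", System.currentTimeMillis())
        it.flush() // finalize the gzipped CSV so it can be uploaded to the stage
    }
}
```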
diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt
new file mode 100644
index 000000000000..eb7c193757c6
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.integrations.destination.staging.operation
+
+import io.airbyte.cdk.integrations.base.JavaBaseConstants
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
+import io.airbyte.cdk.integrations.destination.staging.StagingSerializedBufferFactory
+import io.airbyte.commons.json.Jsons
+import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
+import io.airbyte.integrations.base.destination.operation.StorageOperation
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.github.oshai.kotlinlogging.KotlinLogging
+import java.util.stream.Stream
+import org.apache.commons.io.FileUtils
+
+class StagingStreamOperations<DestinationState : MinimumDestinationState>(
+    private val storageOperation: StorageOperation<SerializableBuffer>,
+    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
+    private val fileUploadFormat: FileUploadFormat,
+    private val destinationColumns: JavaBaseConstants.DestinationColumns,
+    disableTypeDedupe: Boolean = false
+) :
+    AbstractStreamOperation<DestinationState, SerializableBuffer>(
+        storageOperation,
+        destinationInitialStatus,
+        disableTypeDedupe
+    ) {
+
+    private val log = KotlinLogging.logger {}
+
+    override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
+        val writeBuffer =
+            StagingSerializedBufferFactory.initializeBuffer(fileUploadFormat, destinationColumns)
+
+        writeBuffer.use {
+            stream.forEach { record: PartialAirbyteMessage ->
+                it.accept(
+                    record.serialized!!,
+                    Jsons.serialize(record.record!!.meta),
+                    record.record!!.emittedAt
+                )
+            }
+            it.flush()
+            log.info {
+                "Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging"
+            }
+            storageOperation.writeToStage(streamConfig.id, writeBuffer)
+        }
+    }
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/build.gradle b/airbyte-cdk/java/airbyte-cdk/typing-deduping/build.gradle
index bb8f72624672..4f5cd32179fe 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/build.gradle
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/build.gradle
@@ -22,4 +22,5 @@ dependencies {
     testFixturesApi testFixtures(project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core'))
     testFixturesImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
     testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
+    testImplementation "io.mockk:mockk:1.13.11"
 }
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt
new file mode 100644
index 000000000000..2238359029ad
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import io.github.oshai.kotlinlogging.KotlinLogging
+import java.util.Optional
+import java.util.stream.Stream
+
+abstract class AbstractStreamOperation<DestinationState : MinimumDestinationState, Data>(
+    private val storageOperation: StorageOperation<Data>,
+    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
+    private val disableTypeDedupe: Boolean = false
+) : StreamOperation<DestinationState> {
+    private val log = KotlinLogging.logger {}
+
+    // State maintained to make decisions between async calls
+    private val finalTmpTableSuffix: String
+    private val initialRawTableStatus: InitialRawTableStatus =
+        destinationInitialStatus.initialRawTableStatus
+
+    /**
+     * After running any sync setup code, we may update the destination state. This field holds
+     * that updated destination state.
+     */
+    final override val updatedDestinationState: DestinationState
+
+    init {
+        val stream = destinationInitialStatus.streamConfig
+        storageOperation.prepareStage(stream.id, stream.destinationSyncMode)
+        if (!disableTypeDedupe) {
+            storageOperation.createFinalNamespace(stream.id)
+            // Prepare final tables based on sync mode.
+            finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
+        } else {
+            log.info { "Typing and deduping disabled, skipping final table initialization" }
+            finalTmpTableSuffix = NO_SUFFIX
+        }
+        updatedDestinationState = destinationInitialStatus.destinationState.withSoftReset(false)
+    }
+
+    companion object {
+        private const val NO_SUFFIX = ""
+        private const val TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"
+    }
+
+    private fun prepareFinalTable(
+        initialStatus: DestinationInitialStatus<DestinationState>
+    ): String {
+        val stream = initialStatus.streamConfig
+        // No special handling if the final table doesn't exist: just create it and return.
+        if (!initialStatus.isFinalTablePresent) {
+            log.info {
+                "Final table does not exist for stream ${initialStatus.streamConfig.id.finalName}, creating."
+            }
+            storageOperation.createFinalTable(stream, NO_SUFFIX, false)
+            return NO_SUFFIX
+        }
+
+        log.info { "Final Table exists for stream ${stream.id.finalName}" }
+        // The table already exists. Decide whether we're writing to it directly, or
+        // using a tmp table.
+        when (stream.destinationSyncMode) {
+            DestinationSyncMode.OVERWRITE -> return prepareFinalTableForOverwrite(initialStatus)
+            DestinationSyncMode.APPEND,
+            DestinationSyncMode.APPEND_DEDUP -> {
+                if (
+                    initialStatus.isSchemaMismatch ||
+                        initialStatus.destinationState.needsSoftReset()
+                ) {
+                    // We're loading data directly into the existing table.
+                    // Make sure it has the right schema.
+                    // Also, if a raw table migration wants us to do a soft reset, do that here.
+                    log.info { "Executing soft-reset on final table of stream $stream" }
+                    storageOperation.softResetFinalTable(stream)
+                }
+                return NO_SUFFIX
+            }
+        }
+    }
+
+    private fun prepareFinalTableForOverwrite(
+        initialStatus: DestinationInitialStatus<DestinationState>
+    ): String {
+        val stream = initialStatus.streamConfig
+        if (!initialStatus.isFinalTableEmpty || initialStatus.isSchemaMismatch) {
+            // Overwrite an existing tmp table if needed.
+            storageOperation.createFinalTable(stream, TMP_OVERWRITE_TABLE_SUFFIX, true)
+            log.info {
+                "Using temp final table for table ${stream.id.finalName}, this will be overwritten at end of sync"
+            }
+            // We want to overwrite an existing table. Write into a tmp table;
+            // we'll overwrite the real table at the end of the sync.
+            return TMP_OVERWRITE_TABLE_SUFFIX
+        }
+
+        log.info {
+            "Final Table for stream ${stream.id.finalName} is empty and matches the expected v2 format, writing to table directly"
+        }
+        return NO_SUFFIX
+    }
+
+    /** Writing records is destination specific: direct inserts vs. staging, based on format. */
+    abstract override fun writeRecords(
+        streamConfig: StreamConfig,
+        stream: Stream<PartialAirbyteMessage>
+    )
+
+    override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {
+        // Delete the staging directory; each implementation decides whether this is
+        // actually needed or a no-op.
+        storageOperation.cleanupStage(streamConfig.id)
+        if (disableTypeDedupe) {
+            log.info {
+                "Typing and deduping disabled, skipping final table finalization. " +
+                    "Raw records can be found at ${streamConfig.id.rawNamespace}.${streamConfig.id.rawName}"
+            }
+            return
+        }
+
+        val isNotOverwriteSync = streamConfig.destinationSyncMode != DestinationSyncMode.OVERWRITE
+        // Legacy logic: if recordsWritten is empty (i.e. not tracked), assume it could be
+        // non-zero. Old unprocessed records only matter for non-OVERWRITE syncs.
+        val shouldRunTypingDeduping =
+            syncSummary.recordsWritten.map { it > 0 }.orElse(true) ||
+                (initialRawTableStatus.hasUnprocessedRecords && isNotOverwriteSync)
+        if (!shouldRunTypingDeduping) {
+            log.info {
+                "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
+                    "because it had no records during this sync and no unprocessed records from a previous sync."
+            }
+        } else {
+            // In overwrite mode, we want to read all the raw records. Typically, this is
+            // equivalent to filtering on timestamp, but might as well be explicit.
+            val timestampFilter =
+                if (isNotOverwriteSync) {
+                    initialRawTableStatus.maxProcessedTimestamp
+                } else {
+                    Optional.empty()
+                }
+            storageOperation.typeAndDedupe(streamConfig, timestampFilter, finalTmpTableSuffix)
+        }
+
+        // For overwrite syncs it's wasteful to T+D the old records, so we don't soft-reset in
+        // prepare. Instead, we type-and-dedupe into a suffixed table and swap it in here when
+        // we had to use one (schema mismatch or non-empty final table).
+        if (
+            streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE &&
+                finalTmpTableSuffix.isNotBlank()
+        ) {
+            storageOperation.overwriteFinalTable(streamConfig, finalTmpTableSuffix)
+        }
+    }
+}
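To make the branching above easier to scan, here is an illustrative restatement (not CDK code) of how `prepareFinalTable` picks the suffix that `finalizeTable` later swaps on; soft resets for the append modes are handled separately:

```kotlin
import io.airbyte.protocol.models.v0.DestinationSyncMode

// Illustrative restatement of prepareFinalTable's suffix decision.
fun finalTableSuffix(
    syncMode: DestinationSyncMode,
    finalTableExists: Boolean,
    finalTableEmpty: Boolean,
    schemaMismatch: Boolean,
): String =
    when {
        // No final table yet: create it and write directly, no suffix.
        !finalTableExists -> ""
        // Overwrite of a non-empty or mismatched table: write to an "_airbyte_tmp"
        // table, which overwriteFinalTable swaps in at the end of the sync.
        syncMode == DestinationSyncMode.OVERWRITE && (!finalTableEmpty || schemaMismatch) ->
            "_airbyte_tmp"
        // Append modes and clean overwrites write directly to the real table.
        else -> ""
    }
```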
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultFlush.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultFlush.kt
new file mode 100644
index 000000000000..b4516a0a331f
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultFlush.kt
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.operation.SyncOperation
+import io.airbyte.protocol.models.v0.StreamDescriptor
+import java.util.stream.Stream
+
+class DefaultFlush(
+    override val optimalBatchSizeBytes: Long,
+    private val syncOperation: SyncOperation
+) : DestinationFlushFunction {
+    override fun flush(streamDescriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
+        syncOperation.flushStream(streamDescriptor, stream)
+    }
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt
new file mode 100644
index 000000000000..cb49d82da382
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.operation.SyncOperation
+import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil as exceptions
+import io.airbyte.commons.concurrency.CompletableFutures.allOf
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
+import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.airbyte.protocol.models.v0.StreamDescriptor
+import io.github.oshai.kotlinlogging.KotlinLogging
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.stream.Stream
+import org.apache.commons.lang3.concurrent.BasicThreadFactory
+
+class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
+    private val parsedCatalog: ParsedCatalog,
+    private val destinationHandler: DestinationHandler<DestinationState>,
+    private val defaultNamespace: String,
+    private val streamOperationFactory: StreamOperationFactory<DestinationState>,
+    private val migrations: List<Migration<DestinationState>>,
+    private val executorService: ExecutorService =
+        Executors.newFixedThreadPool(
+            10,
+            BasicThreadFactory.Builder().namingPattern("sync-operations-%d").build(),
+        )
+) : SyncOperation {
+    companion object {
+        // Use a companion object so the logger is accessible during instantiation/init.
+        private val log = KotlinLogging.logger {}
+    }
+
+    private val streamOpsMap: Map<StreamId, StreamOperation<DestinationState>>
+    init {
+        streamOpsMap = createPerStreamOpClients()
+    }
+
+    private fun createPerStreamOpClients(): Map<StreamId, StreamOperation<DestinationState>> {
+        log.info { "Preparing required schemas and tables for all streams" }
+        val streamsInitialStates = destinationHandler.gatherInitialState(parsedCatalog.streams)
+
+        val postMigrationInitialStates =
+            tdutils.executeRawTableMigrations(
+                executorService,
+                destinationHandler,
+                migrations,
+                streamsInitialStates
+            )
+        destinationHandler.commitDestinationStates(
+            postMigrationInitialStates.associate { it.streamConfig.id to it.destinationState }
+        )
+
+        val initializationFutures =
+            postMigrationInitialStates
+                .map {
+                    CompletableFuture.supplyAsync(
+                        { Pair(it.streamConfig.id, streamOperationFactory.createInstance(it)) },
+                        executorService,
+                    )
+                }
+                .toList()
+        val futuresResult = allOf(initializationFutures).toCompletableFuture().get()
+        val result =
+            exceptions.getResultsOrLogAndThrowFirst(
+                "Following exceptions occurred during sync initialization",
+                futuresResult,
+            )
+        destinationHandler.commitDestinationStates(
+            futuresResult
+                // If we're here, then all the futures were successful, so we're in the Right
+                // case of every Either.
+                .map { it.right!! }
+                .associate { (id, streamOps) -> id to streamOps.updatedDestinationState }
+        )
+        return result.toMap()
+    }
+
+    override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
+        val streamConfig =
+            parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
+        streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
+    }
+
+    override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
+        try {
+            // Only finalize streams that have a sync summary; the rest are skipped.
+            val finalizeFutures =
+                streamSyncSummaries.entries
+                    .map {
+                        CompletableFuture.supplyAsync(
+                            {
+                                val streamConfig =
+                                    parsedCatalog.getStream(
+                                        it.key.namespace ?: defaultNamespace,
+                                        it.key.name,
+                                    )
+                                streamOpsMap[streamConfig.id]?.finalizeTable(streamConfig, it.value)
+                            },
+                            executorService,
+                        )
+                    }
+                    .toList()
+            val futuresResult = allOf(finalizeFutures).toCompletableFuture().join()
+            exceptions.getResultsOrLogAndThrowFirst(
+                "Following exceptions occurred while finalizing the sync",
+                futuresResult,
+            )
+        } finally {
+            log.info { "Cleaning up sync operation thread pools" }
+            executorService.shutdown()
+        }
+    }
+}
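A sketch of how a destination connector might wire these pieces together. `StandardStreamOperation` is defined just below in this diff; the catalog, handler, and storage values are assumed to come from the connector's existing setup, and the batch size is arbitrary:

```kotlin
package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import java.util.stream.Stream

fun <S : MinimumDestinationState> buildFlushFunction(
    parsedCatalog: ParsedCatalog,
    destinationHandler: DestinationHandler<S>,
    storageOperation: StorageOperation<Stream<PartialAirbyteMessage>>,
): DestinationFlushFunction {
    val syncOperation =
        DefaultSyncOperation(
            parsedCatalog,
            destinationHandler,
            "public", // default namespace; connector specific
            // Invoked once per stream with that stream's initial status.
            StreamOperationFactory<S> { initialStatus ->
                StandardStreamOperation(storageOperation, initialStatus)
            },
            migrations = emptyList(),
        )
    // The AsyncStreamConsumer calls this function to flush batched records per stream.
    return DefaultFlush(50L * 1024 * 1024, syncOperation)
}
```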
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt
new file mode 100644
index 000000000000..e437f1a63f52
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import java.util.stream.Stream
+
+/**
+ * A pass-through stream operation which hands the [Stream] of messages directly to
+ * [StorageOperation.writeToStage].
+ */
+class StandardStreamOperation<DestinationState : MinimumDestinationState>(
+    private val storageOperation: StorageOperation<Stream<PartialAirbyteMessage>>,
+    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
+    disableTypeDedupe: Boolean = false
+) :
+    AbstractStreamOperation<DestinationState, Stream<PartialAirbyteMessage>>(
+        storageOperation,
+        destinationInitialStatus,
+        disableTypeDedupe
+    ) {
+    override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
+        storageOperation.writeToStage(streamConfig.id, stream)
+    }
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt
new file mode 100644
index 000000000000..359706dd04e7
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import java.time.Instant
+import java.util.Optional
+
+interface StorageOperation<Data> {
+    /*
+     * ==================== Staging or Raw table Operations ================================
+     */
+
+    /**
+     * Prepare the staging area, which could mean creating object storage, temp tables, or file
+     * storage.
+     */
+    fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode)
+
+    /** Delete previously staged data, using deterministic information from streamId. */
+    fun cleanupStage(streamId: StreamId)
+
+    /** Write data to stage. */
+    fun writeToStage(streamId: StreamId, data: Data)
+
+    /*
+     * ==================== Final Table Operations ================================
+     */
+
+    /** Create the final namespace extracted from [StreamId]. */
+    fun createFinalNamespace(streamId: StreamId)
+
+    /** Create the final table extracted from [StreamId]. */
+    fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean)
+
+    /** Reset the final table using a temp table, or ALTER the existing table's columns. */
+    fun softResetFinalTable(streamConfig: StreamConfig)
+
+    /**
+     * Attempt to atomically swap the final table (name and namespace extracted from [StreamId]).
+     * The mechanism is destination specific, e.g. INSERT INTO ... SELECT * followed by DROP
+     * TABLE, or CREATE OR REPLACE ... AS SELECT * followed by DROP TABLE.
+     */
+    fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String)
+
+    /**
+     * Copy raw records into the final table (with the given suffix), typing and deduplicating
+     * them; only records with extracted_at later than [maxProcessedTimestamp] need processing.
+     */
+    fun typeAndDedupe(
+        streamConfig: StreamConfig,
+        maxProcessedTimestamp: Optional<Instant>,
+        finalTableSuffix: String
+    )
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperation.kt
new file mode 100644
index 000000000000..fea5df12bb58
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperation.kt
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import java.util.stream.Stream
+
+/** Operations on individual streams. */
+interface StreamOperation<T> {
+
+    val updatedDestinationState: T
+
+    fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>)
+
+    fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary)
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperationFactory.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperationFactory.kt
new file mode 100644
index 000000000000..60f0daec2388
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperationFactory.kt
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+
+fun interface StreamOperationFactory<DestinationState> {
+
+    /**
+     * Create an instance with the required dependencies injected, using a concrete factory
+     * implementation.
+     */
+    fun createInstance(
+        destinationInitialStatus: DestinationInitialStatus<DestinationState>
+    ): StreamOperation<DestinationState>
+}
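For tests or prototyping, a toy in-memory implementation of the `StorageOperation` interface above might look like the following sketch; real destinations issue SQL and object-storage calls from each of these hooks, and all the final-table hooks are deliberately left as no-ops here:

```kotlin
package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.time.Instant
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.stream.Stream

class InMemoryStorageOperation : StorageOperation<Stream<PartialAirbyteMessage>> {
    private val staged = ConcurrentHashMap<StreamId, MutableList<PartialAirbyteMessage>>()

    override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
        staged.putIfAbsent(streamId, CopyOnWriteArrayList())
    }

    override fun cleanupStage(streamId: StreamId) {
        staged.remove(streamId)
    }

    override fun writeToStage(streamId: StreamId, data: Stream<PartialAirbyteMessage>) {
        // Assumes prepareStage ran first; getValue throws otherwise.
        data.forEach { staged.getValue(streamId).add(it) }
    }

    override fun createFinalNamespace(streamId: StreamId) {}
    override fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean) {}
    override fun softResetFinalTable(streamConfig: StreamConfig) {}
    override fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String) {}
    override fun typeAndDedupe(
        streamConfig: StreamConfig,
        maxProcessedTimestamp: Optional<Instant>,
        finalTableSuffix: String
    ) {}
}
```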
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt
index 903ab16df20e..79585dca47c5 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt
@@ -7,9 +7,9 @@ import io.airbyte.cdk.integrations.base.IntegrationRunner
 import io.airbyte.cdk.integrations.destination.StreamSyncSummary
 import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
 import io.airbyte.commons.concurrency.CompletableFutures
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeRawTableMigrations
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeWeirdMigrations
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.prepareSchemas
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeRawTableMigrations
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeWeirdMigrations
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.prepareSchemas
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
 import io.airbyte.protocol.models.v0.DestinationSyncMode
@@ -186,7 +186,7 @@ class DefaultTyperDeduper<DestinationState : MinimumDestinationState>(
                     // Make sure it has the right schema.
                     // Also, if a raw table migration wants us to do a soft reset, do that
                     // here.
-                    TypeAndDedupeTransaction.executeSoftReset(
+                    TyperDeduperUtil.executeSoftReset(
                         sqlGenerator,
                         destinationHandler,
                         stream
                     )
@@ -267,7 +267,7 @@ class DefaultTyperDeduper<DestinationState : MinimumDestinationState>(
                 val initialRawTableStatus =
                     initialRawTableStateByStream.getValue(streamConfig.id)
-                TypeAndDedupeTransaction.executeTypeAndDedupe(
+                TyperDeduperUtil.executeTypeAndDedupe(
                     sqlGenerator,
                     destinationHandler,
                     streamConfig,
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.kt
index 6a55c364852b..7d9206f8810c 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.kt
@@ -5,9 +5,9 @@ package io.airbyte.integrations.base.destination.typing_deduping
 import io.airbyte.cdk.integrations.base.IntegrationRunner
 import io.airbyte.cdk.integrations.destination.StreamSyncSummary
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeRawTableMigrations
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeWeirdMigrations
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.prepareSchemas
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeRawTableMigrations
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeWeirdMigrations
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.prepareSchemas
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
 import io.airbyte.protocol.models.v0.StreamDescriptor
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt
index c37b25926467..50d318b5933c 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt
@@ -95,7 +95,7 @@ interface SqlGenerator {
      * @return
      */
     fun prepareTablesForSoftReset(stream: StreamConfig): Sql {
-        val createTempTable = createTable(stream, TypeAndDedupeTransaction.SOFT_RESET_SUFFIX, true)
+        val createTempTable = createTable(stream, TyperDeduperUtil.SOFT_RESET_SUFFIX, true)
         val clearLoadedAt = clearLoadedAt(stream.id)
         return Sql.Companion.concat(createTempTable, clearLoadedAt)
     }
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.kt
deleted file mode 100644
index ca36a51f9233..000000000000
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.kt
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
- */
-package io.airbyte.integrations.base.destination.typing_deduping
-
-import java.time.Instant
-import java.util.*
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-
-object TypeAndDedupeTransaction {
-    const val SOFT_RESET_SUFFIX: String = "_ab_soft_reset"
-    private val LOGGER: Logger = LoggerFactory.getLogger(TypeAndDedupeTransaction::class.java)
-
-    /**
-     * It can be expensive to build the errors array in the airbyte_meta column, so we first attempt
-     * an 'unsafe' transaction which assumes everything is typed correctly. If that fails, we will
-     * run a more expensive query which handles casting errors
-     *
-     * @param sqlGenerator for generating sql for the destination
-     * @param destinationHandler for executing sql created
-     * @param streamConfig which stream to operate on
-     * @param minExtractedAt to reduce the amount of data in the query
-     * @param suffix table suffix for temporary tables
-     * @throws Exception if the safe query fails
-     */
-    @JvmStatic
-    @Throws(Exception::class)
-    fun executeTypeAndDedupe(
-        sqlGenerator: SqlGenerator,
-        destinationHandler: DestinationHandler<*>,
-        streamConfig: StreamConfig?,
-        minExtractedAt: Optional<Instant>,
-        suffix: String
-    ) {
-        try {
-            LOGGER.info(
-                "Attempting typing and deduping for {}.{} with suffix {}",
-                streamConfig!!.id.originalNamespace,
-                streamConfig.id.originalName,
-                suffix
-            )
-            val unsafeSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, false)
-            destinationHandler.execute(unsafeSql)
-        } catch (e: Exception) {
-            if (sqlGenerator.shouldRetry(e)) {
-                // TODO Destination specific non-retryable exceptions should be added.
-                LOGGER.error(
-                    "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, attempting with error handling",
-                    streamConfig!!.id.originalNamespace,
-                    streamConfig.id.originalName,
-                    suffix,
-                    e
-                )
-                val saferSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, true)
-                destinationHandler.execute(saferSql)
-            } else {
-                LOGGER.error(
-                    "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, Retry is skipped",
-                    streamConfig!!.id.originalNamespace,
-                    streamConfig.id.originalName,
-                    suffix,
-                    e
-                )
-                throw e
-            }
-        }
-    }
-
-    /**
-     * Everything in [TypeAndDedupeTransaction.executeTypeAndDedupe] but with a little extra prep
-     * work for the soft reset temp tables
-     *
-     * @param sqlGenerator for generating sql for the destination
-     * @param destinationHandler for executing sql created
-     * @param streamConfig which stream to operate on
-     * @throws Exception if the safe query fails
-     */
-    @JvmStatic
-    @Throws(Exception::class)
-    fun executeSoftReset(
-        sqlGenerator: SqlGenerator,
-        destinationHandler: DestinationHandler<*>,
-        streamConfig: StreamConfig
-    ) {
-        LOGGER.info(
-            "Attempting soft reset for stream {} {}",
-            streamConfig.id.originalNamespace,
-            streamConfig.id.originalName
-        )
-        destinationHandler.execute(sqlGenerator.prepareTablesForSoftReset(streamConfig))
-        executeTypeAndDedupe(
-            sqlGenerator,
-            destinationHandler,
-            streamConfig,
-            Optional.empty(),
-            SOFT_RESET_SUFFIX
-        )
-        destinationHandler.execute(
-            sqlGenerator.overwriteFinalTable(streamConfig.id, SOFT_RESET_SUFFIX)
-        )
-    }
-}
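The practical effect of this deletion: `SOFT_RESET_SUFFIX`, `executeTypeAndDedupe`, and `executeSoftReset` move onto the `TyperDeduperUtil` object (next diff), so connector call sites only swap the receiver. For example:

```kotlin
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil

fun softResetCallSite(
    sqlGenerator: SqlGenerator,
    destinationHandler: DestinationHandler<*>,
    streamConfig: StreamConfig
) {
    // Before this PR: TypeAndDedupeTransaction.executeSoftReset(...)
    // After: the identical member now lives on TyperDeduperUtil.
    TyperDeduperUtil.executeSoftReset(sqlGenerator, destinationHandler, streamConfig)
}
```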
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt
index 961e1ff5b9e6..cfcb1f92106f 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt
@@ -8,6 +8,7 @@ import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAn
 import io.airbyte.commons.concurrency.CompletableFutures
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import java.time.Instant
 import java.util.*
 import java.util.concurrent.CompletableFuture
 import java.util.concurrent.CompletionStage
@@ -16,190 +17,274 @@ import java.util.stream.Collectors.toMap
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory

-class TyperDeduperUtil {
-    companion object {
-        private val LOGGER: Logger = LoggerFactory.getLogger(TyperDeduperUtil::class.java)
-
-        @JvmStatic
-        fun <DestinationState : MinimumDestinationState> executeRawTableMigrations(
-            executorService: ExecutorService,
-            destinationHandler: DestinationHandler<DestinationState>,
-            migrations: List<Migration<DestinationState>>,
-            initialStates: List<DestinationInitialStatus<DestinationState>>
-        ): List<DestinationInitialStatus<DestinationState>> {
-            // TODO: Either the migrations run the soft reset and create v2 tables or the actual
-            // prepare tables.
-            // unify the logic
-            // with current state of raw tables & final tables. This is done first before gather
-            // initial state
-            // to avoid recreating
-            // final tables later again.
-
-            // Run migrations in lockstep. Some migrations may require us to refetch the initial
-            // state.
-            // We want to be able to batch those calls together across streams.
-            // If a migration runs on one stream, it's likely to also run on other streams.
-            // So we can bundle the gatherInitialState calls together.
-            var currentStates = initialStates
-            for (migration in migrations) {
-                // Execute the migration on all streams in parallel
-                val futures:
-                    Map<StreamId, CompletionStage<Migration.MigrationResult<DestinationState>>> =
-                    currentStates
-                        .stream()
-                        .collect(
-                            toMap(
-                                { it.streamConfig.id },
-                                { initialState ->
-                                    runMigrationsAsync(
-                                        executorService,
-                                        destinationHandler,
-                                        migration,
-                                        initialState
-                                    )
-                                }
-                            )
-                        )
-                val migrationResultFutures =
-                    CompletableFutures.allOf(futures.values.toList()).toCompletableFuture().join()
-                getResultsOrLogAndThrowFirst(
-                    "The following exceptions were thrown attempting to run migrations:\n",
-                    migrationResultFutures
-                )
-                val migrationResults: Map<StreamId, Migration.MigrationResult<DestinationState>> =
-                    futures.mapValues { it.value.toCompletableFuture().join() }
-
-                // Check if we need to refetch DestinationInitialState
-                val invalidatedStreams: Set<StreamId> =
-                    migrationResults.filter { it.value.invalidateInitialState }.keys
-                val updatedStates: List<DestinationInitialStatus<DestinationState>>
-                if (invalidatedStreams.isNotEmpty()) {
-                    LOGGER.info("Refetching initial state for streams: $invalidatedStreams")
-                    updatedStates =
-                        destinationHandler.gatherInitialState(
-                            currentStates
-                                .filter { invalidatedStreams.contains(it.streamConfig.id) }
-                                .map { it.streamConfig }
-                        )
-                    LOGGER.info("Updated states: $updatedStates")
-                } else {
-                    updatedStates = emptyList()
-                }
-
-                // Update the DestinationInitialStates with the new DestinationStates,
-                // and also update initialStates with the refetched states.
-                currentStates =
-                    currentStates.map { initialState ->
-                        // migrationResults will always contain an entry for each stream, so we can
-                        // safely use !!
-                        val updatedDestinationState =
-                            migrationResults[initialState.streamConfig.id]!!.updatedDestinationState
-                        if (invalidatedStreams.contains(initialState.streamConfig.id)) {
-                            // We invalidated this stream's DestinationInitialState.
-                            // Find the updated DestinationInitialState, and update it with our new
-                            // DestinationState
-                            return@map updatedStates
-                                .filter { updatedState ->
-                                    updatedState.streamConfig.id.equals(
-                                        initialState.streamConfig.id
-                                    )
-                                }
-                                .first()
-                                .copy(destinationState = updatedDestinationState)
-                        } else {
-                            // Just update the original DestinationInitialState with the new
-                            // DestinationState.
-                            return@map initialState.copy(destinationState = updatedDestinationState)
-                        }
-                    }
-            }
-            return currentStates
-        }
-
-        /**
-         * The legacy-style migrations (V1V2Migrator, V2TableMigrator) need to run before we gather
-         * initial state, because they're dumb and weird. (specifically: SnowflakeV2TableMigrator
-         * inspects the final tables and triggers a soft reset directly within the migration). TODO:
-         * Migrate these migrations to the new migration system. This will also reduce the number of
-         * times we need to query DB metadata, since (a) we can rely on the gatherInitialState
-         * values, and (b) we can add a DestinationState field for these migrations. It also enables
-         * us to not trigger multiple soft resets in a single sync.
-         */
-        @JvmStatic
-        fun <DestinationState : MinimumDestinationState> executeWeirdMigrations(
-            executorService: ExecutorService,
-            sqlGenerator: SqlGenerator,
-            destinationHandler: DestinationHandler<DestinationState>,
-            v1V2Migrator: DestinationV1V2Migrator,
-            v2TableMigrator: V2TableMigrator,
-            parsedCatalog: ParsedCatalog
-        ) {
-            val futures =
-                parsedCatalog.streams.map {
-                    CompletableFuture.supplyAsync(
-                        {
-                            v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, it)
-                            v2TableMigrator.migrateIfNecessary(it)
-                        },
-                        executorService
-                    )
-                }
-            getResultsOrLogAndThrowFirst(
-                "The following exceptions were thrown attempting to run migrations:\n",
-                CompletableFutures.allOf(futures.toList()).toCompletableFuture().join()
-            )
-        }
-
-        /**
-         * Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures
-         * they exist in the Destination Database.
-         */
-        @JvmStatic
-        fun <DestinationState> prepareSchemas(
-            sqlGenerator: SqlGenerator,
-            destinationHandler: DestinationHandler<DestinationState>,
-            parsedCatalog: ParsedCatalog
-        ) {
-            val rawSchema = parsedCatalog.streams.map { it.id.rawNamespace }
-            val finalSchema = parsedCatalog.streams.map { it.id.finalNamespace }
-            val createAllSchemasSql =
-                (rawSchema + finalSchema).distinct().map { sqlGenerator.createSchema(it) }
-            destinationHandler.execute(Sql.concat(createAllSchemasSql))
-        }
-
-        private fun <DestinationState : MinimumDestinationState> runMigrationsAsync(
-            executorService: ExecutorService,
-            destinationHandler: DestinationHandler<DestinationState>,
-            migration: Migration<DestinationState>,
-            initialStatus: DestinationInitialStatus<DestinationState>
-        ): CompletionStage<Migration.MigrationResult<DestinationState>> {
-            return CompletableFuture.supplyAsync(
-                {
-                    LOGGER.info(
-                        "Maybe executing ${migration.javaClass.simpleName} migration for stream ${initialStatus.streamConfig.id.originalNamespace}.${initialStatus.streamConfig.id.originalName}."
-                    )
-
-                    // We technically don't need to track this, but might as well hedge against
-                    // migrations
-                    // accidentally setting softReset=false
-                    val softReset = initialStatus.destinationState.needsSoftReset()
-                    val migrationResult =
-                        migration.migrateIfNecessary(
-                            destinationHandler,
-                            initialStatus.streamConfig,
-                            initialStatus
-                        )
-                    val updatedNeedsSoftReset =
-                        softReset || migrationResult.updatedDestinationState.needsSoftReset()
-                    return@supplyAsync migrationResult.copy(
-                        updatedDestinationState =
-                            migrationResult.updatedDestinationState.withSoftReset(
-                                updatedNeedsSoftReset
-                            )
-                    )
-                },
-                executorService
-            )
-        }
-    }
-}
+object TyperDeduperUtil {
+    const val SOFT_RESET_SUFFIX: String = "_ab_soft_reset"
+    private val LOGGER: Logger = LoggerFactory.getLogger(TyperDeduperUtil::class.java)
+
+    @JvmStatic
+    fun <DestinationState : MinimumDestinationState> executeRawTableMigrations(
+        executorService: ExecutorService,
+        destinationHandler: DestinationHandler<DestinationState>,
+        migrations: List<Migration<DestinationState>>,
+        initialStates: List<DestinationInitialStatus<DestinationState>>
+    ): List<DestinationInitialStatus<DestinationState>> {
+        // TODO: Either the migrations run the soft reset and create v2 tables or the actual
+        // prepare tables.
+        // unify the logic
+        // with current state of raw tables & final tables. This is done first before gather
+        // initial state
+        // to avoid recreating
+        // final tables later again.
+
+        // Run migrations in lockstep. Some migrations may require us to refetch the initial
+        // state.
+        // We want to be able to batch those calls together across streams.
+        // If a migration runs on one stream, it's likely to also run on other streams.
+        // So we can bundle the gatherInitialState calls together.
+        var currentStates = initialStates
+        for (migration in migrations) {
+            // Execute the migration on all streams in parallel
+            val futures:
+                Map<StreamId, CompletionStage<Migration.MigrationResult<DestinationState>>> =
+                currentStates
+                    .stream()
+                    .collect(
+                        toMap(
+                            { it.streamConfig.id },
+                            { initialState ->
+                                runMigrationsAsync(
+                                    executorService,
+                                    destinationHandler,
+                                    migration,
+                                    initialState
+                                )
+                            }
+                        )
+                    )
+            val migrationResultFutures =
+                CompletableFutures.allOf(futures.values.toList()).toCompletableFuture().join()
+            getResultsOrLogAndThrowFirst(
+                "The following exceptions were thrown attempting to run migrations:\n",
+                migrationResultFutures
+            )
+            val migrationResults: Map<StreamId, Migration.MigrationResult<DestinationState>> =
+                futures.mapValues { it.value.toCompletableFuture().join() }
+
+            // Check if we need to refetch DestinationInitialState
+            val invalidatedStreams: Set<StreamId> =
+                migrationResults.filter { it.value.invalidateInitialState }.keys
+            val updatedStates: List<DestinationInitialStatus<DestinationState>>
+            if (invalidatedStreams.isNotEmpty()) {
+                LOGGER.info("Refetching initial state for streams: $invalidatedStreams")
+                updatedStates =
+                    destinationHandler.gatherInitialState(
+                        currentStates
+                            .filter { invalidatedStreams.contains(it.streamConfig.id) }
+                            .map { it.streamConfig }
+                    )
+                LOGGER.info("Updated states: $updatedStates")
+            } else {
+                updatedStates = emptyList()
+            }
+
+            // Update the DestinationInitialStates with the new DestinationStates,
+            // and also update initialStates with the refetched states.
+            currentStates =
+                currentStates.map { initialState ->
+                    // migrationResults will always contain an entry for each stream, so we can
+                    // safely use !!
+                    val updatedDestinationState =
+                        migrationResults[initialState.streamConfig.id]!!.updatedDestinationState
+                    if (invalidatedStreams.contains(initialState.streamConfig.id)) {
+                        // We invalidated this stream's DestinationInitialState.
+                        // Find the updated DestinationInitialState, and update it with our new
+                        // DestinationState
+                        return@map updatedStates
+                            .filter { updatedState ->
+                                updatedState.streamConfig.id.equals(initialState.streamConfig.id)
+                            }
+                            .first()
+                            .copy(destinationState = updatedDestinationState)
+                    } else {
+                        // Just update the original DestinationInitialState with the new
+                        // DestinationState.
+                        return@map initialState.copy(destinationState = updatedDestinationState)
+                    }
+                }
+        }
+        return currentStates
+    }
+
+    /**
+     * The legacy-style migrations (V1V2Migrator, V2TableMigrator) need to run before we gather
+     * initial state, because they're dumb and weird. (specifically: SnowflakeV2TableMigrator
+     * inspects the final tables and triggers a soft reset directly within the migration). TODO:
+     * Migrate these migrations to the new migration system. This will also reduce the number of
+     * times we need to query DB metadata, since (a) we can rely on the gatherInitialState values,
+     * and (b) we can add a DestinationState field for these migrations. It also enables us to not
+     * trigger multiple soft resets in a single sync.
+     */
+    @JvmStatic
+    fun <DestinationState : MinimumDestinationState> executeWeirdMigrations(
+        executorService: ExecutorService,
+        sqlGenerator: SqlGenerator,
+        destinationHandler: DestinationHandler<DestinationState>,
+        v1V2Migrator: DestinationV1V2Migrator,
+        v2TableMigrator: V2TableMigrator,
+        parsedCatalog: ParsedCatalog
+    ) {
+        val futures =
+            parsedCatalog.streams.map {
+                CompletableFuture.supplyAsync(
+                    {
+                        v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, it)
+                        v2TableMigrator.migrateIfNecessary(it)
+                    },
+                    executorService
+                )
+            }
+        getResultsOrLogAndThrowFirst(
+            "The following exceptions were thrown attempting to run migrations:\n",
+            CompletableFutures.allOf(futures.toList()).toCompletableFuture().join()
+        )
+    }
+
+    /**
+     * Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they
+     * exist in the Destination Database.
+     */
+    @JvmStatic
+    fun <DestinationState> prepareSchemas(
+        sqlGenerator: SqlGenerator,
+        destinationHandler: DestinationHandler<DestinationState>,
+        parsedCatalog: ParsedCatalog
+    ) {
+        val rawSchema = parsedCatalog.streams.map { it.id.rawNamespace }
+        val finalSchema = parsedCatalog.streams.map { it.id.finalNamespace }
+        val createAllSchemasSql =
+            (rawSchema + finalSchema).distinct().map { sqlGenerator.createSchema(it) }
+        destinationHandler.execute(Sql.concat(createAllSchemasSql))
+    }
+
+    private fun <DestinationState : MinimumDestinationState> runMigrationsAsync(
+        executorService: ExecutorService,
+        destinationHandler: DestinationHandler<DestinationState>,
+        migration: Migration<DestinationState>,
+        initialStatus: DestinationInitialStatus<DestinationState>
+    ): CompletionStage<Migration.MigrationResult<DestinationState>> {
+        return CompletableFuture.supplyAsync(
+            {
+                LOGGER.info(
+                    "Maybe executing ${migration.javaClass.simpleName} migration for stream ${initialStatus.streamConfig.id.originalNamespace}.${initialStatus.streamConfig.id.originalName}."
+                )
+
+                // We technically don't need to track this, but might as well hedge against
+                // migrations
+                // accidentally setting softReset=false
+                val softReset = initialStatus.destinationState.needsSoftReset()
+                val migrationResult =
+                    migration.migrateIfNecessary(
+                        destinationHandler,
+                        initialStatus.streamConfig,
+                        initialStatus
+                    )
+                val updatedNeedsSoftReset =
+                    softReset || migrationResult.updatedDestinationState.needsSoftReset()
+                return@supplyAsync migrationResult.copy(
+                    updatedDestinationState =
+                        migrationResult.updatedDestinationState.withSoftReset(updatedNeedsSoftReset)
+                )
+            },
+            executorService
+        )
+    }
+
+    /**
+     * It can be expensive to build the errors array in the airbyte_meta column, so we first attempt
+     * an 'unsafe' transaction which assumes everything is typed correctly. If that fails, we will
+     * run a more expensive query which handles casting errors
+     *
+     * @param sqlGenerator for generating sql for the destination
+     * @param destinationHandler for executing sql created
+     * @param streamConfig which stream to operate on
+     * @param minExtractedAt to reduce the amount of data in the query
+     * @param suffix table suffix for temporary tables
+     * @throws Exception if the safe query fails
+     */
+    @JvmStatic
+    @Throws(Exception::class)
+    fun executeTypeAndDedupe(
+        sqlGenerator: SqlGenerator,
+        destinationHandler: DestinationHandler<*>,
+        streamConfig: StreamConfig?,
+        minExtractedAt: Optional<Instant>,
+        suffix: String
+    ) {
+        try {
+            LOGGER.info(
+                "Attempting typing and deduping for {}.{} with suffix {}",
+                streamConfig!!.id.originalNamespace,
+                streamConfig.id.originalName,
+                suffix
+            )
+            val unsafeSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, false)
+            destinationHandler.execute(unsafeSql)
+        } catch (e: Exception) {
+            if (sqlGenerator.shouldRetry(e)) {
+                // TODO Destination specific non-retryable exceptions should be added.
+                LOGGER.error(
+                    "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, attempting with error handling",
+                    streamConfig!!.id.originalNamespace,
+                    streamConfig.id.originalName,
+                    suffix,
+                    e
+                )
+                val saferSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, true)
+                destinationHandler.execute(saferSql)
+            } else {
+                LOGGER.error(
+                    "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, Retry is skipped",
+                    streamConfig!!.id.originalNamespace,
+                    streamConfig.id.originalName,
+                    suffix,
+                    e
+                )
+                throw e
+            }
+        }
+    }
+
+    /**
+     * Everything in [TyperDeduperUtil.executeTypeAndDedupe] but with a little extra prep work for
+     * the soft reset temp tables
+     *
+     * @param sqlGenerator for generating sql for the destination
+     * @param destinationHandler for executing sql created
+     * @param streamConfig which stream to operate on
+     * @throws Exception if the safe query fails
+     */
+    @JvmStatic
+    @Throws(Exception::class)
+    fun executeSoftReset(
+        sqlGenerator: SqlGenerator,
+        destinationHandler: DestinationHandler<*>,
+        streamConfig: StreamConfig
+    ) {
+        LOGGER.info(
+            "Attempting soft reset for stream {} {}",
+            streamConfig.id.originalNamespace,
+            streamConfig.id.originalName
+        )
+        destinationHandler.execute(sqlGenerator.prepareTablesForSoftReset(streamConfig))
+        executeTypeAndDedupe(
+            sqlGenerator,
+            destinationHandler,
+            streamConfig,
+            Optional.empty(),
+            SOFT_RESET_SUFFIX
+        )
+        destinationHandler.execute(
+            sqlGenerator.overwriteFinalTable(streamConfig.id, SOFT_RESET_SUFFIX)
+        )
+    }
+}
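Since `executeRawTableMigrations` is the driver connectors now rely on, here is a sketch of a trivial migration. The exact `Migration`/`MigrationResult` shape is assumed from the calls visible above (`migrateIfNecessary`, `updatedDestinationState`, `invalidateInitialState`), not shown in this diff:

```kotlin
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState

// A no-op raw-table migration, for illustration only.
class NoopMigration<S : MinimumDestinationState> : Migration<S> {
    override fun migrateIfNecessary(
        destinationHandler: DestinationHandler<S>,
        stream: StreamConfig,
        state: DestinationInitialStatus<S>
    ): Migration.MigrationResult<S> =
        // Report the unchanged state and don't force an initial-state refetch.
        Migration.MigrationResult(state.destinationState, invalidateInitialState = false)
}
```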
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt
new file mode 100644
index 000000000000..0baed27a6f82
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt
@@ -0,0 +1,595 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
+import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
+import io.airbyte.integrations.base.destination.typing_deduping.ColumnId
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import io.mockk.checkUnnecessaryStub
+import io.mockk.clearMocks
+import io.mockk.confirmVerified
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.verifySequence
+import java.time.Instant
+import java.util.Optional
+import java.util.stream.Stream
+import org.junit.jupiter.api.Nested
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.Arguments
+import org.junit.jupiter.params.provider.MethodSource
+import org.junit.jupiter.params.provider.ValueSource
+
+/**
+ * Verify that [AbstractStreamOperation] behaves correctly, given various initial states. We
+ * intentionally mock the [DestinationInitialStatus]. This allows us to verify that the stream ops
+ * only look at specific fields - the mocked initial statuses will throw exceptions for unstubbed
+ * methods.
+ *
+ * For example, we don't need to write separate test cases for "final table does not exist and
+ * destination state has softReset=true/false" - instead we have a single test case for "final
+ * table does not exist", and it doesn't stub the `needsSoftReset` method. If we introduce a bug in
+ * stream ops and it starts checking needsSoftReset even though the final table doesn't exist, then
+ * these tests will start failing.
+ */
+class AbstractStreamOperationTest {
+    class TestStreamOperation(
+        storageOperation: StorageOperation<Stream<PartialAirbyteMessage>>,
+        destinationInitialStatus: DestinationInitialStatus<MinimumDestinationState.Impl>
+    ) :
+        AbstractStreamOperation<MinimumDestinationState.Impl, Stream<PartialAirbyteMessage>>(
+            storageOperation,
+            destinationInitialStatus,
+        ) {
+        override fun writeRecords(
+            streamConfig: StreamConfig,
+            stream: Stream<PartialAirbyteMessage>
+        ) {
+            // noop
+        }
+    }
+
+    // This mock is purely for verification. Set relaxed=true so we don't need to stub every call.
+    // Our tests use confirmVerified() to check that we didn't miss any actions.
+    private val storageOperation =
+        mockk<StorageOperation<Stream<PartialAirbyteMessage>>>(relaxed = true)
+
+    @Nested
+    inner class Overwrite {
+        private val streamConfig =
+            StreamConfig(
+                streamId,
+                DestinationSyncMode.OVERWRITE,
+                listOf(),
+                Optional.empty(),
+                columns,
+                // TODO currently these values are unused. Eventually we should restructure this
+                // class to test based on generation ID instead of sync mode.
+                0,
+                0,
+                0
+            )
+
+        @Test
+        fun emptyDestination() {
+            val initialState =
+                mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
+                    every { streamConfig } returns this@Overwrite.streamConfig
+                    every { initialRawTableStatus } returns mockk()
+                    every { isFinalTablePresent } returns false
+                    every { destinationState.withSoftReset(any()) } returns destinationState
+                }
+
+            val streamOperations = TestStreamOperation(storageOperation, initialState)
+
+            verifySequence {
+                storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperation.createFinalNamespace(streamId)
+                storageOperation.createFinalTable(streamConfig, "", false)
+            }
+            confirmVerified(storageOperation)
+
+            clearMocks(storageOperation)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42)))
+
+            verifySequence {
+                storageOperation.cleanupStage(streamId)
+                storageOperation.typeAndDedupe(
+                    streamConfig,
+                    Optional.empty(),
+                    "",
+                )
+            }
+            confirmVerified(storageOperation)
+            checkUnnecessaryStub(
+                initialState,
+                initialState.initialRawTableStatus,
+                initialState.destinationState
+            )
+        }
+
+        @Test
+        fun existingEmptyTableSchemaMismatch() {
+            val initialState =
+                mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
+                    every { streamConfig } returns this@Overwrite.streamConfig
+                    every { initialRawTableStatus } returns mockk()
+                    every { isFinalTablePresent } returns true
+                    every { isFinalTableEmpty } returns true
+                    // Even though there's a schema mismatch, we're running in overwrite mode,
+                    // so we shouldn't execute a soft reset.
+                    // We do need to use a temp final table though.
+                    every { isSchemaMismatch } returns true
+                    every { destinationState.withSoftReset(any()) } returns destinationState
+                }
+
+            val streamOperations = TestStreamOperation(storageOperation, initialState)
+
+            verifySequence {
+                storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperation.createFinalNamespace(streamId)
+                storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
+            }
+            confirmVerified(storageOperation)
+
+            clearMocks(storageOperation)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42)))
+
+            verifySequence {
+                storageOperation.cleanupStage(streamId)
+                storageOperation.typeAndDedupe(
+                    streamConfig,
+                    Optional.empty(),
+                    EXPECTED_OVERWRITE_SUFFIX,
+                )
+                storageOperation.overwriteFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX)
+            }
+            confirmVerified(storageOperation)
+            checkUnnecessaryStub(
+                initialState,
+                initialState.initialRawTableStatus,
+                initialState.destinationState
+            )
+        }
+
+        @Test
+        fun existingEmptyTableMatchingSchema() {
+            val initialState =
+                mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
+                    every { streamConfig } returns this@Overwrite.streamConfig
+                    every { initialRawTableStatus } returns mockk()
+                    every { isFinalTablePresent } returns true
+                    every { isFinalTableEmpty } returns true
+                    every { isSchemaMismatch } returns false
+                    every { destinationState.withSoftReset(any()) } returns destinationState
+                }
+
+            val streamOperations = TestStreamOperation(storageOperation, initialState)
+
+            verifySequence {
+                storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperation.createFinalNamespace(streamId)
+                // No table creation - we can just reuse the existing table.
+            }
+            confirmVerified(storageOperation)
+
+            clearMocks(storageOperation)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42)))
+
+            verifySequence {
+                storageOperation.cleanupStage(streamId)
+                storageOperation.typeAndDedupe(
+                    streamConfig,
+                    Optional.empty(),
+                    "",
+                )
+            }
+            confirmVerified(storageOperation)
+            checkUnnecessaryStub(
+                initialState,
+                initialState.initialRawTableStatus,
+                initialState.destinationState
+            )
+        }
+
+        @Test
+        fun existingNonEmptyTable() {
+            val initialState =
+                mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
+                    every { streamConfig } returns this@Overwrite.streamConfig
+                    every { initialRawTableStatus } returns mockk()
+                    every { isFinalTablePresent } returns true
+                    every { isFinalTableEmpty } returns false
+                    every { destinationState.withSoftReset(any()) } returns destinationState
+                }
+
+            val streamOperations = TestStreamOperation(storageOperation, initialState)
+
+            verifySequence {
+                storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperation.createFinalNamespace(streamId)
+                storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
+            }
+            confirmVerified(storageOperation)
+
+            clearMocks(storageOperation)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42)))
+
+            verifySequence {
+                storageOperation.cleanupStage(streamId)
+                storageOperation.typeAndDedupe(
+                    streamConfig,
+                    Optional.empty(),
+                    EXPECTED_OVERWRITE_SUFFIX,
+                )
+                storageOperation.overwriteFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX)
+            }
+            confirmVerified(storageOperation)
+            checkUnnecessaryStub(
+                initialState,
+                initialState.initialRawTableStatus,
+                initialState.destinationState
+            )
+        }
+
+        @ParameterizedTest
+        @ValueSource(booleans = [true, false])
+        fun existingNonEmptyTableNoNewRecords(hasUnprocessedRecords: Boolean) {
+            val initialState =
+                mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
+                    every { streamConfig } returns this@Overwrite.streamConfig
+                    every { initialRawTableStatus } returns mockk()
+                    // This is an overwrite sync, so we can ignore the old raw records.
+                    // We should skip T+D if the current sync emitted 0 records.
+ every { initialRawTableStatus.hasUnprocessedRecords } returns + hasUnprocessedRecords + every { isFinalTablePresent } returns true + every { isFinalTableEmpty } returns false + every { + destinationState.withSoftReset(any()) + } returns destinationState + } + + val streamOperations = TestStreamOperation(storageOperation, initialState) + + verifySequence { + storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.createFinalNamespace(streamId) + storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true) + } + confirmVerified(storageOperation) + + clearMocks(storageOperation) + streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(0))) + + verifySequence { + storageOperation.cleanupStage(streamId) + storageOperation.overwriteFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX) + } + confirmVerified(storageOperation) + checkUnnecessaryStub( + initialState, + initialState.initialRawTableStatus, + initialState.destinationState + ) + } + } + + @Nested + inner class NonOverwrite { + @ParameterizedTest + @MethodSource( + "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs" + ) + fun emptyDestination(streamConfig: StreamConfig) { + val initialState = + mockk> { + every { this@mockk.streamConfig } returns streamConfig + every { initialRawTableStatus.maxProcessedTimestamp } returns Optional.empty() + every { isFinalTablePresent } returns false + every { + destinationState.withSoftReset(any()) + } returns destinationState + } + + val streamOperations = TestStreamOperation(storageOperation, initialState) + + verifySequence { + storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.createFinalNamespace(streamId) + storageOperation.createFinalTable(streamConfig, "", false) + } + confirmVerified(storageOperation) + + clearMocks(storageOperation) + streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + + verifySequence { + storageOperation.cleanupStage(streamId) + storageOperation.typeAndDedupe( + streamConfig, + Optional.empty(), + "", + ) + } + confirmVerified(storageOperation) + checkUnnecessaryStub( + initialState, + initialState.initialRawTableStatus, + initialState.destinationState + ) + } + + @ParameterizedTest + @MethodSource( + "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs" + ) + fun existingTableSchemaMismatch(streamConfig: StreamConfig) { + val initialState = + mockk> { + every { this@mockk.streamConfig } returns streamConfig + every { initialRawTableStatus.maxProcessedTimestamp } returns Optional.empty() + every { isFinalTablePresent } returns true + every { isSchemaMismatch } returns true + every { + destinationState.withSoftReset(any()) + } returns destinationState + } + + val streamOperations = TestStreamOperation(storageOperation, initialState) + + verifySequence { + storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.createFinalNamespace(streamId) + storageOperation.softResetFinalTable(streamConfig) + } + confirmVerified(storageOperation) + + clearMocks(storageOperation) + streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42))) + + verifySequence { + storageOperation.cleanupStage(streamId) + storageOperation.typeAndDedupe( + streamConfig, + Optional.empty(), + "", + ) + } + confirmVerified(storageOperation) + checkUnnecessaryStub( + initialState, + 
+                initialState.initialRawTableStatus,
+                initialState.destinationState
+            )
+        }
+
+        @ParameterizedTest
+        @MethodSource(
+            "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs"
+        )
+        fun existingTableSchemaMatch(streamConfig: StreamConfig) {
+            val initialState =
+                mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
+                    every { this@mockk.streamConfig } returns streamConfig
+                    every { initialRawTableStatus.maxProcessedTimestamp } returns Optional.empty()
+                    every { isFinalTablePresent } returns true
+                    every { isSchemaMismatch } returns false
+                    every { destinationState } returns MinimumDestinationState.Impl(false)
+                }
+
+            val streamOperations = TestStreamOperation(storageOperation, initialState)
+
+            verifySequence {
+                storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperation.createFinalNamespace(streamId)
+                // No soft reset - we can just reuse the existing table.
+            }
+            confirmVerified(storageOperation)
+
+            clearMocks(storageOperation)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42)))
+
+            verifySequence {
+                storageOperation.cleanupStage(streamId)
+                storageOperation.typeAndDedupe(
+                    streamConfig,
+                    Optional.empty(),
+                    "",
+                )
+            }
+            confirmVerified(storageOperation)
+            checkUnnecessaryStub(initialState, initialState.initialRawTableStatus)
+        }
+
+        @ParameterizedTest
+        @MethodSource(
+            "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs"
+        )
+        fun existingTableAndStateRequiresSoftReset(streamConfig: StreamConfig) {
+            val initialState =
+                mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
+                    every { this@mockk.streamConfig } returns streamConfig
+                    every { initialRawTableStatus.maxProcessedTimestamp } returns Optional.empty()
+                    every { isFinalTablePresent } returns true
+                    every { isSchemaMismatch } returns false
+                    every { destinationState } returns MinimumDestinationState.Impl(true)
+                }
+
+            val streamOperations = TestStreamOperation(storageOperation, initialState)
+
+            verifySequence {
+                storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperation.createFinalNamespace(streamId)
+                storageOperation.softResetFinalTable(streamConfig)
+            }
+            confirmVerified(storageOperation)
+
+            clearMocks(storageOperation)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(42)))
+
+            verifySequence {
+                storageOperation.cleanupStage(streamId)
+                storageOperation.typeAndDedupe(
+                    streamConfig,
+                    Optional.empty(),
+                    "",
+                )
+            }
+            confirmVerified(storageOperation)
+            checkUnnecessaryStub(initialState, initialState.initialRawTableStatus)
+        }
+
+        @ParameterizedTest
+        @MethodSource(
+            "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigsAndBoolean"
+        )
+        fun existingNonEmptyTableNoNewRecords(
+            streamConfig: StreamConfig,
+            hasUnprocessedRecords: Boolean
+        ) {
+            val initialState =
+                mockk<DestinationInitialStatus<MinimumDestinationState.Impl>> {
+                    every { this@mockk.streamConfig } returns streamConfig
+                    // Unlike an overwrite sync, we can't simply ignore old raw records here:
+                    // if a previous sync left unprocessed records, T+D must still run even
+                    // though the current sync emitted 0 records.
+                    every { initialRawTableStatus.hasUnprocessedRecords } returns
+                        hasUnprocessedRecords
+                    if (hasUnprocessedRecords) {
+                        // We only care about this value if we're executing T+D.
+                        // If there are no unprocessed records from a previous sync, and no new
+                        // records from this sync, we don't need to set it.
+                        every { initialRawTableStatus.maxProcessedTimestamp } returns
+                            maxProcessedTimestamp
+                    }
+                    every { isFinalTablePresent } returns true
+                    every { isSchemaMismatch } returns false
+                    every { destinationState } returns MinimumDestinationState.Impl(false)
+                }
+
+            val streamOperations = TestStreamOperation(storageOperation, initialState)
+
+            verifySequence {
+                storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
+                storageOperation.createFinalNamespace(streamId)
+            }
+            confirmVerified(storageOperation)
+
+            clearMocks(storageOperation)
+            streamOperations.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(0)))
+
+            verifySequence {
+                storageOperation.cleanupStage(streamId)
+                // If this sync emitted no records, we only need to run T+D if a previous sync
+                // emitted some records but failed to run T+D.
+                if (hasUnprocessedRecords) {
+                    storageOperation.typeAndDedupe(streamConfig, maxProcessedTimestamp, "")
+                }
+            }
+            confirmVerified(storageOperation)
+            checkUnnecessaryStub(initialState, initialState.initialRawTableStatus)
+        }
+    }
+
+    companion object {
+        val streamId =
+            StreamId(
+                "final_namespace",
+                "final_name",
+                "raw_namespace",
+                "raw_name",
+                "original_namespace",
+                "original_name",
+            )
+        private val pk1 = ColumnId("pk1", "pk1_original_name", "pk1_canonical_name")
+        private val pk2 = ColumnId("pk2", "pk2_original_name", "pk2_canonical_name")
+        private val cursor = ColumnId("cursor", "cursor_original_name", "cursor_canonical_name")
+        val columns: LinkedHashMap<ColumnId, AirbyteType> =
+            linkedMapOf(
+                pk1 to AirbyteProtocolType.INTEGER,
+                pk2 to AirbyteProtocolType.STRING,
+                cursor to AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE,
+                ColumnId(
+                    "username",
+                    "username_original_name",
+                    "username_canonical_name",
+                ) to AirbyteProtocolType.STRING,
+            )
+
+        const val EXPECTED_OVERWRITE_SUFFIX = "_airbyte_tmp"
+        val maxProcessedTimestamp = Optional.of(Instant.parse("2024-01-23T12:34:56Z"))
+
+        private val appendStreamConfig =
+            StreamConfig(
+                streamId,
+                DestinationSyncMode.APPEND,
+                listOf(),
+                Optional.empty(),
+                columns,
+                // TODO currently these values are unused. Eventually we should restructure
+                // this class to test based on generation ID instead of sync mode.
+                0,
+                0,
+                0
+            )
+        private val dedupStreamConfig =
+            StreamConfig(
+                streamId,
+                DestinationSyncMode.APPEND_DEDUP,
+                listOf(pk1, pk2),
+                Optional.of(cursor),
+                columns,
+                // TODO currently these values are unused. Eventually we should restructure
+                // this class to test based on generation ID instead of sync mode.
+                0,
+                0,
+                0
+            )
+
+        // junit 5 doesn't support class-level parameterization...
+        // so we have to hack this in a somewhat dumb way.
+        // append and dedup should behave identically from StreamOperations' POV,
+        // so just shove them together.
+        @JvmStatic
+        fun nonOverwriteStreamConfigs(): Stream<Arguments> =
+            Stream.of(
+                Arguments.of(appendStreamConfig),
+                Arguments.of(dedupStreamConfig),
+            )
+
+        // Some tests are further parameterized, which this method supports.
+        @JvmStatic
+        fun nonOverwriteStreamConfigsAndBoolean(): Stream<Arguments> =
+            Stream.of(
+                Arguments.of(appendStreamConfig, true),
+                Arguments.of(appendStreamConfig, false),
+                Arguments.of(dedupStreamConfig, true),
+                Arguments.of(dedupStreamConfig, false),
+            )
+    }
+}
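Taken together, these cases pin down the stream-operation lifecycle: construction performs the one-time setup (stage preparation, final namespace/table creation, or a soft reset), `writeRecords` lands the sync's data, and `finalizeTable` runs the closing type-and-dedupe or overwrite swap. A minimal caller-side sketch of that contract, assuming the generic bound on `StreamOperation` shown here (the driver function itself is illustrative; only the lifecycle calls come from the code above):

```kotlin
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.integrations.base.destination.operation.StreamOperation
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import java.util.Optional
import java.util.stream.Stream

// Illustrative driver for any StreamOperation implementation.
fun <S : MinimumDestinationState> runStreamLifecycle(
    streamOp: StreamOperation<S>, // setup already ran in its constructor/init block
    streamConfig: StreamConfig,
    records: Stream<PartialAirbyteMessage>,
    recordCount: Long,
) {
    // Land this sync's records in the raw/staging layer.
    streamOp.writeRecords(streamConfig, records)
    // Close out the stream: stage cleanup, then T+D or the overwrite swap.
    streamOp.finalizeTable(streamConfig, StreamSyncSummary(Optional.of(recordCount)))
}
```

diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt
new file mode 100644
index 000000000000..31ab363852ff
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt
@@ -0,0 +1,307 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
+import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
+import io.airbyte.integrations.base.destination.typing_deduping.Sql
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import io.mockk.clearMocks
+import io.mockk.confirmVerified
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
+import io.mockk.verifySequence
+import java.util.Optional
+import java.util.stream.Stream
+import kotlin.test.assertEquals
+import org.junit.jupiter.api.Test
+
+/**
+ * All of these tests use APPEND sync mode for simplicity.
+ * [io.airbyte.cdk.integrations.destination.operation.SyncOperation] doesn't care about sync mode;
+ * all of that logic lives in [StreamOperation].
+ */
+class DefaultSyncOperationTest {
+    private data class MockState(
+        val needsSoftReset: Boolean,
+        val softResetMigrationCompleted: Boolean,
+        val nonSoftResetMigrationCompleted: Boolean
+    ) : MinimumDestinationState {
+        override fun needsSoftReset(): Boolean = needsSoftReset
+
+        override fun <T : MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T {
+            @Suppress("UNCHECKED_CAST") return copy(needsSoftReset = needsSoftReset) as T
+        }
+    }
+    private class TestStreamOperation(destinationState: MockState) : StreamOperation<MockState> {
+        // Simulate a StreamOperation implementation that triggers a soft reset upon initialization.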
+ override val updatedDestinationState: MockState = destinationState.withSoftReset(false) + override fun writeRecords( + streamConfig: StreamConfig, + stream: Stream + ) {} + override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {} + } + private val streamOperations: MutableMap> = + mutableMapOf() + private val streamOperationFactory: StreamOperationFactory = + StreamOperationFactory { initialStatus: DestinationInitialStatus -> + streamOperations.computeIfAbsent(initialStatus.streamConfig) { + spyk(TestStreamOperation(initialStatus.destinationState)) + } + } + + private val destinationHandler = mockk>(relaxed = true) + + @Test + fun multipleMigrations() { + val appendInitialStatus = + DestinationInitialStatus( + appendStreamConfig, + isFinalTablePresent = true, + initialRawTableStatus = + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = false, + maxProcessedTimestamp = Optional.empty(), + ), + isSchemaMismatch = true, + isFinalTableEmpty = false, + destinationState = + MockState( + needsSoftReset = false, + softResetMigrationCompleted = false, + nonSoftResetMigrationCompleted = false, + ) + ) + every { destinationHandler.gatherInitialState(any()) } returns listOf(appendInitialStatus) + + val syncOperation = + DefaultSyncOperation( + parsedCatalog, + destinationHandler, + "default_ns", + streamOperationFactory, + listOf(migrationWithSoftReset, migrationWithoutSoftReset, badMigration), + ) + + assertEquals(setOf(appendStreamConfig), streamOperations.keys) + verifySequence { + destinationHandler.gatherInitialState(any()) + destinationHandler.execute( + Sql.of("MIGRATE WITH SOFT_RESET airbyte_internal.append_stream;") + ) + destinationHandler.gatherInitialState(any()) + destinationHandler.execute( + Sql.of("MIGRATE WITHOUT SOFT_RESET airbyte_internal.append_stream;") + ) + destinationHandler.execute(Sql.of("BAD MIGRATE airbyte_internal.append_stream;")) + destinationHandler.commitDestinationStates( + mapOf( + appendStreamConfig.id to + MockState( + needsSoftReset = true, + softResetMigrationCompleted = true, + nonSoftResetMigrationCompleted = true, + ), + ), + ) + streamOperations.values.onEach { it.updatedDestinationState } + destinationHandler.commitDestinationStates( + mapOf( + appendStreamConfig.id to + MockState( + needsSoftReset = false, + softResetMigrationCompleted = true, + nonSoftResetMigrationCompleted = true, + ), + ), + ) + } + confirmVerified(destinationHandler) + streamOperations.values.onEach { confirmVerified(it) } + + clearMocks(destinationHandler) + streamOperations.values.onEach { clearMocks(it) } + + syncOperation.finalizeStreams( + mapOf(appendStreamConfig.id.asStreamDescriptor() to StreamSyncSummary(Optional.of(42))) + ) + + verify(exactly = 1) { + streamOperations.values.onEach { + it.finalizeTable(appendStreamConfig, StreamSyncSummary(Optional.of(42))) + } + } + confirmVerified(destinationHandler) + streamOperations.values.onEach { confirmVerified(it) } + } + + /** + * Verify that with an initial DestinationState containing needsSoftReset=true, even if no + * migrations trigger a soft reset, we still retain the soft reset status. 
+     */
+    @Test
+    fun initialSoftReset() {
+        val appendInitialStatus =
+            DestinationInitialStatus(
+                appendStreamConfig,
+                isFinalTablePresent = true,
+                initialRawTableStatus =
+                    InitialRawTableStatus(
+                        rawTableExists = true,
+                        hasUnprocessedRecords = false,
+                        maxProcessedTimestamp = Optional.empty(),
+                    ),
+                isSchemaMismatch = true,
+                isFinalTableEmpty = false,
+                destinationState =
+                    MockState(
+                        // Note the needsSoftReset=true here.
+                        needsSoftReset = true,
+                        softResetMigrationCompleted = true,
+                        nonSoftResetMigrationCompleted = true,
+                    )
+            )
+        every { destinationHandler.gatherInitialState(any()) } returns listOf(appendInitialStatus)
+
+        DefaultSyncOperation(
+            parsedCatalog,
+            destinationHandler,
+            "default_ns",
+            streamOperationFactory,
+            listOf(migrationWithSoftReset, migrationWithoutSoftReset, badMigration),
+        )
+
+        assertEquals(setOf(appendStreamConfig), streamOperations.keys)
+        verifySequence {
+            destinationHandler.gatherInitialState(any())
+            // The "badly-written" migration doesn't check anything in the state,
+            // so it always executes.
+            destinationHandler.execute(Sql.of("BAD MIGRATE airbyte_internal.append_stream;"))
+            destinationHandler.commitDestinationStates(
+                mapOf(
+                    appendStreamConfig.id to
+                        MockState(
+                            needsSoftReset = true,
+                            softResetMigrationCompleted = true,
+                            nonSoftResetMigrationCompleted = true,
+                        ),
+                ),
+            )
+            streamOperations.values.onEach { it.updatedDestinationState }
+            destinationHandler.commitDestinationStates(
+                mapOf(
+                    appendStreamConfig.id to
+                        MockState(
+                            needsSoftReset = false,
+                            softResetMigrationCompleted = true,
+                            nonSoftResetMigrationCompleted = true,
+                        ),
+                ),
+            )
+        }
+        confirmVerified(destinationHandler)
+        streamOperations.values.onEach { confirmVerified(it) }
+    }
+
+    companion object {
+        // A migration that wants a soft reset, and also requires us to refetch initial state.
+        private val migrationWithSoftReset: Migration<MockState> =
+            object : Migration<MockState> {
+                override fun migrateIfNecessary(
+                    destinationHandler: DestinationHandler<MockState>,
+                    stream: StreamConfig,
+                    state: DestinationInitialStatus<MockState>
+                ): Migration.MigrationResult<MockState> {
+                    if (!state.destinationState.softResetMigrationCompleted) {
+                        destinationHandler.execute(
+                            Sql.of("MIGRATE WITH SOFT_RESET ${stream.id.rawTableId("")}"),
+                        )
+                        return Migration.MigrationResult(
+                            state.destinationState.copy(
+                                needsSoftReset = true,
+                                softResetMigrationCompleted = true,
+                            ),
+                            true,
+                        )
+                    } else {
+                        return Migration.MigrationResult(
+                            state.destinationState,
+                            false,
+                        )
+                    }
+                }
+            }
+
+        // A migration that doesn't do anything interesting.
+        private val migrationWithoutSoftReset: Migration<MockState> =
+            object : Migration<MockState> {
+                override fun migrateIfNecessary(
+                    destinationHandler: DestinationHandler<MockState>,
+                    stream: StreamConfig,
+                    state: DestinationInitialStatus<MockState>
+                ): Migration.MigrationResult<MockState> {
+                    if (!state.destinationState.nonSoftResetMigrationCompleted) {
+                        destinationHandler.execute(
+                            Sql.of("MIGRATE WITHOUT SOFT_RESET ${stream.id.rawTableId("")}"),
+                        )
+                    }
+                    return Migration.MigrationResult(
+                        state.destinationState.copy(nonSoftResetMigrationCompleted = true),
+                        false,
+                    )
+                }
+            }
+
+        // A migration that incorrectly _unsets_ needsSoftReset.
+        private val badMigration: Migration<MockState> =
+            object : Migration<MockState> {
+                override fun migrateIfNecessary(
+                    destinationHandler: DestinationHandler<MockState>,
+                    stream: StreamConfig,
+                    state: DestinationInitialStatus<MockState>
+                ): Migration.MigrationResult<MockState> {
+                    destinationHandler.execute(
+                        Sql.of("BAD MIGRATE ${stream.id.rawTableId("")}"),
+                    )
+                    return Migration.MigrationResult(
+                        state.destinationState.copy(needsSoftReset = false),
+                        false,
+                    )
+                }
+            }
+
+        private val appendStreamConfig =
+            StreamConfig(
+                StreamId(
+                    "append_ns",
+                    "append_stream",
+                    "airbyte_internal",
+                    "append_stream",
+                    "append_ns",
+                    "append_stream"
+                ),
+                DestinationSyncMode.APPEND,
+                listOf(),
+                Optional.empty(),
+                linkedMapOf(),
+                0,
+                0,
+                0,
+            )
+        private val parsedCatalog = ParsedCatalog(listOf(appendStreamConfig))
+    }
+}
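These tests drive `finalizeStreams` with a hand-built map; in a real connector, the async framework produces that map at the end of a sync. A small sketch of the hand-off, assuming the caller has already tallied committed record counts per stream (the function name and the count map are illustrative; `finalizeStreams` and `StreamSyncSummary` come from this change):

```kotlin
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.Optional

// Illustrative shutdown hook: hand per-stream record counts to the SyncOperation
// so each stream runs its finalizeTable exactly once.
fun onSyncClose(
    syncOperation: SyncOperation,
    recordsWrittenPerStream: Map<StreamDescriptor, Long>,
) {
    val summaries =
        recordsWrittenPerStream.mapValues { (_, count) -> StreamSyncSummary(Optional.of(count)) }
    syncOperation.finalizeStreams(summaries)
}
```

diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
index ac9e81c8d9be..08c2ec8ef980 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
@@ -7,8 +7,8 @@ import com.fasterxml.jackson.databind.JsonNode
 import com.google.common.collect.Streams
 import io.airbyte.commons.json.Jsons
 import io.airbyte.commons.string.Strings
-import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction.executeSoftReset
-import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction.executeTypeAndDedupe
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeSoftReset
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeTypeAndDedupe
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
 import io.airbyte.protocol.models.v0.AirbyteStream
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
@@ -1628,11 +1628,7 @@ abstract class BaseSqlGeneratorIntegrationTest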