diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedReader.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedReader.kt index fd8fdcf482c2..c37f0bf469c6 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedReader.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedReader.kt @@ -41,6 +41,9 @@ class FeedReader( log.info { "no more partitions to read for '${feed.label}' in round $partitionsCreatorID" } + // Publish a checkpoint if applicable. + maybeCheckpoint() + // Publish stream completion. emitStreamStatus(AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE) break } @@ -279,11 +282,7 @@ class FeedReader( } } finally { // Publish a checkpoint if applicable. - val stateMessages: List = root.stateManager.checkpoint() - if (stateMessages.isNotEmpty()) { - log.info { "checkpoint of ${stateMessages.size} state message(s)" } - stateMessages.forEach(root.outputConsumer::accept) - } + maybeCheckpoint() } } } @@ -291,6 +290,17 @@ class FeedReader( private suspend fun ctx(nameSuffix: String): CoroutineContext = coroutineContext + ThreadRenamingCoroutineName("${feed.label}-$nameSuffix") + Dispatchers.IO + private fun maybeCheckpoint() { + val stateMessages: List = root.stateManager.checkpoint() + if (stateMessages.isEmpty()) { + return + } + log.info { "checkpoint of ${stateMessages.size} state message(s)" } + for (stateMessage in stateMessages) { + root.outputConsumer.accept(stateMessage) + } + } + private fun emitStreamStatus(status: AirbyteStreamStatusTraceMessage.AirbyteStreamStatus) { if (feed is Stream) { root.outputConsumer.accept( diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt index cb4c54493737..97716c9f4574 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt @@ -96,9 +96,19 @@ class StateManager( initialState: OpaqueStateValue?, private val isCheckpointUnique: Boolean = true, ) : StateManagerScopedToFeed { - private var current: OpaqueStateValue? = initialState - private var pending: OpaqueStateValue? = initialState - private var pendingNumRecords: Long = 0L + private var current: OpaqueStateValue? + private var pending: OpaqueStateValue? + private var isPending: Boolean + private var pendingNumRecords: Long + + init { + synchronized(this) { + current = initialState + pending = initialState + isPending = initialState != null + pendingNumRecords = 0L + } + } override fun current(): OpaqueStateValue? = synchronized(this) { current } @@ -108,13 +118,14 @@ class StateManager( ) { synchronized(this) { pending = state + isPending = true pendingNumRecords += numRecords } } fun swap(): Pair? { synchronized(this) { - if (isCheckpointUnique && pendingNumRecords == 0L && pending == current) { + if (isCheckpointUnique && !isPending) { return null } val returnValue: Pair = pending to pendingNumRecords diff --git a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManagerFactory.kt b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManagerFactory.kt index 82b0c527ea3d..5f906213d7be 100644 --- a/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManagerFactory.kt +++ b/airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManagerFactory.kt @@ -22,11 +22,15 @@ import io.airbyte.cdk.output.FieldTypeMismatch import io.airbyte.cdk.output.InvalidIncrementalSyncMode import io.airbyte.cdk.output.InvalidPrimaryKey import io.airbyte.cdk.output.MultipleStreamsFound +import io.airbyte.cdk.output.OutputConsumer import io.airbyte.cdk.output.StreamHasNoFields import io.airbyte.cdk.output.StreamNotFound +import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage import io.airbyte.protocol.models.v0.AirbyteStream +import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream +import io.airbyte.protocol.models.v0.StreamDescriptor import io.airbyte.protocol.models.v0.SyncMode import jakarta.inject.Singleton @@ -37,6 +41,7 @@ import jakarta.inject.Singleton @Singleton class StateManagerFactory( val metadataQuerierFactory: MetadataQuerier.Factory, + val outputConsumer: OutputConsumer, val handler: CatalogValidationFailureHandler, ) { /** Generates a [StateManager] instance based on the provided inputs. */ @@ -101,14 +106,28 @@ class StateManagerFactory( val jsonSchemaProperties: JsonNode = stream.jsonSchema["properties"] val name: String = stream.name!! val namespace: String? = stream.namespace + val streamDescriptor = StreamDescriptor().withName(name).withNamespace(namespace) + val streamLabel: String = AirbyteStreamNameNamespacePair(name, namespace).toString() when (metadataQuerier.streamNames(namespace).filter { it == name }.size) { 0 -> { handler.accept(StreamNotFound(name, namespace)) + outputConsumer.accept( + AirbyteErrorTraceMessage() + .withStreamDescriptor(streamDescriptor) + .withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR) + .withMessage("Stream '$streamLabel' not found or not accessible in source.") + ) return null } 1 -> Unit else -> { handler.accept(MultipleStreamsFound(name, namespace)) + outputConsumer.accept( + AirbyteErrorTraceMessage() + .withStreamDescriptor(streamDescriptor) + .withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR) + .withMessage("Multiple streams '$streamLabel' found in source.") + ) return null } } @@ -153,6 +172,12 @@ class StateManagerFactory( } if (streamFields.isEmpty()) { handler.accept(StreamHasNoFields(name, namespace)) + outputConsumer.accept( + AirbyteErrorTraceMessage() + .withStreamDescriptor(streamDescriptor) + .withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR) + .withMessage("Stream '$streamLabel' has no accessible fields.") + ) return null } diff --git a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/command/SyncsTestFixture.kt b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/command/SyncsTestFixture.kt index 25a394cff21a..3bf5015426b4 100644 --- a/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/command/SyncsTestFixture.kt +++ b/airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/command/SyncsTestFixture.kt @@ -95,10 +95,11 @@ data object SyncsTestFixture { connectionSupplier: Supplier, prelude: (Connection) -> Unit, configuredCatalog: ConfiguredAirbyteCatalog, + initialState: List = listOf(), vararg afterRead: AfterRead, ) { connectionSupplier.get().use(prelude) - var state: List = listOf() + var state: List = initialState for (step in afterRead) { val readOutput: BufferingOutputConsumer = CliRunner.runSource("read", configPojo, configuredCatalog, state) @@ -113,6 +114,7 @@ data object SyncsTestFixture { connectionSupplier: Supplier, prelude: (Connection) -> Unit, configuredCatalogResource: String, + initialStateResource: String?, vararg afterRead: AfterRead, ) { testReads( @@ -120,6 +122,7 @@ data object SyncsTestFixture { connectionSupplier, prelude, configuredCatalogFromResource(configuredCatalogResource), + initialStateFromResource(initialStateResource), *afterRead, ) } @@ -169,6 +172,14 @@ data object SyncsTestFixture { ConfiguredAirbyteCatalog::class.java, ) + fun initialStateFromResource(initialStateResource: String?): List = + if (initialStateResource == null) { + listOf() + } else { + val initialStateJson: String = ResourceUtils.readResource(initialStateResource) + ValidatedJsonUtils.parseList(AirbyteStateMessage::class.java, initialStateJson) + } + interface AfterRead { fun validate(actualOutput: BufferingOutputConsumer) @@ -182,7 +193,7 @@ data object SyncsTestFixture { object : AfterRead { override fun validate(actualOutput: BufferingOutputConsumer) { // State messages are timing-sensitive and therefore non-deterministic. - // Ignore them. + // Ignore them for now. val expectedWithoutStates: List = expectedMessages .filterNot { it.type == AirbyteMessage.Type.STATE } @@ -193,6 +204,19 @@ data object SyncsTestFixture { .filterNot { it.type == AirbyteMessage.Type.STATE } .sortedBy { Jsons.writeValueAsString(it) } Assertions.assertIterableEquals(expectedWithoutStates, actualWithoutStates) + // Check for state message counts (null if no state messages). + val expectedCount: Double? = + expectedMessages + .filter { it.type == AirbyteMessage.Type.STATE } + .mapNotNull { it.state?.sourceStats?.recordCount } + .reduceRightOrNull { a: Double, b: Double -> a + b } + val actualCount: Double? = + actualOutput + .messages() + .filter { it.type == AirbyteMessage.Type.STATE } + .mapNotNull { it.state?.sourceStats?.recordCount } + .reduceRightOrNull { a: Double, b: Double -> a + b } + Assertions.assertEquals(expectedCount, actualCount) } override fun update(connection: Connection) { diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/h2source/H2SourceIntegrationTest.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/h2source/H2SourceIntegrationTest.kt index 271723e628c2..e00b0d50c7bf 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/h2source/H2SourceIntegrationTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/h2source/H2SourceIntegrationTest.kt @@ -146,6 +146,50 @@ class H2SourceIntegrationTest { } } + @Test + fun testReadStreamStateTooFarAhead() { + H2TestFixture().use { h2: H2TestFixture -> + val configPojo = + H2SourceConfigurationJsonObject().apply { + port = h2.port + database = h2.database + resumablePreferred = true + } + SyncsTestFixture.testReads( + configPojo, + h2::createConnection, + Companion::prelude, + "h2source/incremental-only-catalog.json", + "h2source/state-too-far-ahead.json", + SyncsTestFixture.AfterRead.Companion.fromExpectedMessages( + "h2source/expected-messages-stream-too-far-ahead.json", + ), + ) + } + } + + @Test + fun testReadBadCatalog() { + H2TestFixture().use { h2: H2TestFixture -> + val configPojo = + H2SourceConfigurationJsonObject().apply { + port = h2.port + database = h2.database + resumablePreferred = true + } + SyncsTestFixture.testReads( + configPojo, + h2::createConnection, + Companion::prelude, + "h2source/bad-catalog.json", + initialStateResource = null, + SyncsTestFixture.AfterRead.Companion.fromExpectedMessages( + "h2source/expected-messages-stream-bad-catalog.json", + ), + ) + } + } + companion object { @JvmStatic fun prelude(connection: Connection) { diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/bad-catalog.json b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/bad-catalog.json new file mode 100644 index 000000000000..ed520e8d66d0 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/bad-catalog.json @@ -0,0 +1,27 @@ +{ + "streams": [ + { + "stream": { + "name": "FOO", + "json_schema": { + "type": "object", + "properties": { + "BAR": { + "type": "string" + } + } + }, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": [], + "source_defined_primary_key": [], + "is_resumable": false, + "namespace": "PUBLIC" + }, + "sync_mode": "incremental", + "cursor_field": ["BAR"], + "destination_sync_mode": "overwrite", + "primary_key": [] + } + ] +} diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-bad-catalog.json b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-bad-catalog.json new file mode 100644 index 000000000000..7a887e33eee0 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-bad-catalog.json @@ -0,0 +1,24 @@ +[ + { + "type": "LOG", + "log": { + "level": "WARN", + "message": "StreamNotFound(streamName=FOO, streamNamespace=PUBLIC)" + } + }, + { + "type": "TRACE", + "trace": { + "type": "ERROR", + "emitted_at": 3.1336416e12, + "error": { + "stream_descriptor": { + "name": "FOO", + "namespace": "PUBLIC" + }, + "message": "Stream 'PUBLIC_FOO' not found or not accessible in source.", + "failure_type": "config_error" + } + } + } +] diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-too-far-ahead.json b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-too-far-ahead.json new file mode 100644 index 000000000000..5d1136aa32dc --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-too-far-ahead.json @@ -0,0 +1,51 @@ +[ + { + "type": "TRACE", + "trace": { + "type": "STREAM_STATUS", + "emitted_at": 3.1336416e12, + "stream_status": { + "stream_descriptor": { + "name": "EVENTS", + "namespace": "PUBLIC" + }, + "status": "STARTED" + } + } + }, + { + "type": "STATE", + "state": { + "type": "STREAM", + "stream": { + "stream_descriptor": { + "name": "EVENTS", + "namespace": "PUBLIC" + }, + "stream_state": { + "primary_key": {}, + "cursors": { + "TS": "2024-04-30T00:00:00.000000-04:00" + } + } + }, + "sourceStats": { + "recordCount": 0.0 + } + } + }, + { + "type": "TRACE", + "trace": { + "type": "STREAM_STATUS", + "emitted_at": 3.1336416e12, + "stream_status": { + "stream_descriptor": { + "name": "EVENTS", + "namespace": "PUBLIC" + }, + "status": "COMPLETE" + } + } + } +] diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-warm-start.json b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-warm-start.json index a9323f871874..561b7014eace 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-warm-start.json +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-warm-start.json @@ -57,7 +57,7 @@ } }, "sourceStats": { - "recordCount": 2.0 + "recordCount": 1.0 } } }, diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/incremental-only-catalog.json b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/incremental-only-catalog.json new file mode 100644 index 000000000000..2be1a79d7560 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/incremental-only-catalog.json @@ -0,0 +1,36 @@ +{ + "streams": [ + { + "stream": { + "name": "EVENTS", + "json_schema": { + "type": "object", + "properties": { + "ID": { + "type": "string", + "contentEncoding": "base64" + }, + "TS": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "MSG": { + "type": "string" + } + } + }, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": [], + "source_defined_primary_key": [["ID"]], + "is_resumable": true, + "namespace": "PUBLIC" + }, + "sync_mode": "incremental", + "cursor_field": ["TS"], + "destination_sync_mode": "overwrite", + "primary_key": [["ID"]] + } + ] +} diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/state-too-far-ahead.json b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/state-too-far-ahead.json new file mode 100644 index 000000000000..63551381a6e7 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/state-too-far-ahead.json @@ -0,0 +1,17 @@ +[ + { + "type": "STREAM", + "stream": { + "stream_descriptor": { + "name": "EVENTS", + "namespace": "PUBLIC" + }, + "stream_state": { + "primary_key": {}, + "cursors": { + "TS": "2049-04-30T00:00:00.000000-04:00" + } + } + } + } +]