Skip to content

Commit

Permalink
mysql stream statuses (#38198)
Browse files Browse the repository at this point in the history
Co-authored-by: Xiaohan Song <xiaohan@airbyte.io>
  • Loading branch information
rodireich and xiaohansong authored May 23, 2024
1 parent 503e397 commit 7bd9423
Show file tree
Hide file tree
Showing 18 changed files with 911 additions and 289 deletions.
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.35.11 | 2024-05-23 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | This release fixes an error on the previous release. |
| 0.35.10 | 2024-05-23 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Add shared code for db sources stream status trace messages and testing. |
| 0.35.8 | 2024-05-22 | [\#38572](https://github.com/airbytehq/airbyte/pull/38572) | Add a temporary static method to decouple SnowflakeDestination from AbstractJdbcDestination |
| 0.35.7 | 2024-05-20 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Decouple create namespace from per stream operation interface. |
| 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ object AirbyteTraceMessageUtility {
)
}

private fun makeStreamStatusTraceAirbyteMessage(
fun makeStreamStatusTraceAirbyteMessage(
airbyteStreamStatusHolder: AirbyteStreamStatusHolder
): AirbyteMessage {
return makeAirbyteMessageFromTraceMessage(airbyteStreamStatusHolder.toTraceMessage())
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.35.8
version=0.35.11
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.SyncMode
import io.airbyte.protocol.models.v0.SyncMode.*
import io.github.oshai.kotlinlogging.KotlinLogging
import java.sql.Connection
import java.sql.PreparedStatement
Expand Down Expand Up @@ -126,31 +127,45 @@ abstract class AbstractJdbcSource<Datatype>(
syncMode: SyncMode,
cursorField: Optional<String>
): AutoCloseableIterator<AirbyteMessage> {
if (
supportResumableFullRefresh(database, airbyteStream) &&
syncMode == SyncMode.FULL_REFRESH
) {
if (supportResumableFullRefresh(database, airbyteStream) && syncMode == FULL_REFRESH) {
val initialLoadHandler =
getInitialLoadHandler(database, airbyteStream, catalog, stateManager)
?: throw IllegalStateException(
"Must provide initialLoadHandler for resumable full refresh."
)
return initialLoadHandler.getIteratorForStream(airbyteStream, table, Instant.now())
return augmentWithStreamStatus(
airbyteStream,
initialLoadHandler.getIteratorForStream(airbyteStream, table, Instant.now())
)
}

// If flag is off, fall back to legacy non-resumable refresh
return super.getFullRefreshStream(
database,
airbyteStream,
catalog,
stateManager,
namespace,
selectedDatabaseFields,
table,
emittedAt,
syncMode,
cursorField,
)
var iterator =
super.getFullRefreshStream(
database,
airbyteStream,
catalog,
stateManager,
namespace,
selectedDatabaseFields,
table,
emittedAt,
syncMode,
cursorField,
)

return when (airbyteStream.syncMode) {
FULL_REFRESH -> augmentWithStreamStatus(airbyteStream, iterator)
else -> iterator
}
}

open fun augmentWithStreamStatus(
airbyteStream: ConfiguredAirbyteStream,
streamItrator: AutoCloseableIterator<AirbyteMessage>
): AutoCloseableIterator<AirbyteMessage> {
// no-op
return streamItrator
}

override fun queryTableFullRefresh(
Expand Down Expand Up @@ -192,9 +207,7 @@ abstract class AbstractJdbcSource<Datatype>(
// if the connector emits intermediate states, the incremental query
// must be sorted by the cursor
// field
if (
syncMode == SyncMode.INCREMENTAL && stateEmissionFrequency > 0
) {
if (syncMode == INCREMENTAL && stateEmissionFrequency > 0) {
val quotedCursorField: String =
enquoteIdentifier(cursorField.get(), quoteString)
sql.append(" ORDER BY $quotedCursorField ASC")
Expand Down Expand Up @@ -704,7 +717,7 @@ abstract class AbstractJdbcSource<Datatype>(
HashSet(Sets.difference(allStreams, alreadySyncedStreams))

return catalog.streams
.filter { c: ConfiguredAirbyteStream -> c.syncMode == SyncMode.INCREMENTAL }
.filter { c: ConfiguredAirbyteStream -> c.syncMode == INCREMENTAL }
.filter { stream: ConfiguredAirbyteStream ->
newlyAddedStreams.contains(
AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.stream)
Expand Down Expand Up @@ -737,4 +750,27 @@ abstract class AbstractJdbcSource<Datatype>(
return result
}
}

override fun createReadIterator(
database: JdbcDatabase,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
table: TableInfo<CommonField<Datatype>>,
stateManager: StateManager?,
emittedAt: Instant
): AutoCloseableIterator<AirbyteMessage> {
val iterator =
super.createReadIterator(
database,
airbyteStream,
catalog,
table,
stateManager,
emittedAt
)
return when (airbyteStream.syncMode) {
INCREMENTAL -> augmentWithStreamStatus(airbyteStream, iterator)
else -> iterator
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ protected constructor(driverClassName: String) :
* @param emittedAt Time when data was emitted from the Source database
* @return
*/
private fun createReadIterator(
protected open fun createReadIterator(
database: Database,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.source.relationaldb.streamstatus

import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage
import io.airbyte.commons.stream.AirbyteStreamStatusHolder
import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.protocol.models.v0.AirbyteMessage
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class StreamStatusTraceEmitterIterator(private val statusHolder: AirbyteStreamStatusHolder) :
AutoCloseableIterator<AirbyteMessage?> {
private var emitted = false

override fun hasNext(): Boolean {
return !emitted
}

override fun next(): AirbyteMessage {
emitted = true
return makeStreamStatusTraceAirbyteMessage(statusHolder)
}

@Throws(Exception::class)
override fun close() {
// no-op
}

override fun remove() {
// no-op
}

companion object {
private val LOGGER: Logger =
LoggerFactory.getLogger(StreamStatusTraceEmitterIterator::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,20 @@ import io.airbyte.cdk.integrations.util.HostPortResolver.resolveHost
import io.airbyte.cdk.integrations.util.HostPortResolver.resolvePort
import io.airbyte.cdk.testutils.TestDatabase
import io.airbyte.commons.json.Jsons
import io.airbyte.commons.util.MoreIterators
import io.airbyte.protocol.models.Field
import io.airbyte.protocol.models.JsonSchemaType
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.CatalogHelpers
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.SyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.sql.JDBCType
import java.util.function.Consumer
import java.util.function.Supplier
import java.util.stream.Stream
import org.jooq.SQLDialect
Expand Down Expand Up @@ -196,6 +207,134 @@ internal class DefaultJdbcSourceAcceptanceTest :
}
}

@Throws(Exception::class)
override fun incrementalCursorCheck(
initialCursorField: String?,
cursorField: String,
initialCursorValue: String?,
endCursorValue: String?,
expectedRecordMessages: List<AirbyteMessage>,
airbyteStream: ConfiguredAirbyteStream
) {
airbyteStream.syncMode = SyncMode.INCREMENTAL
airbyteStream.cursorField = java.util.List.of(cursorField)
airbyteStream.destinationSyncMode = DestinationSyncMode.APPEND

val configuredCatalog =
ConfiguredAirbyteCatalog().withStreams(java.util.List.of(airbyteStream))

val dbStreamState = buildStreamState(airbyteStream, initialCursorField, initialCursorValue)

val actualMessages =
MoreIterators.toList(
source()!!.read(
config(),
configuredCatalog,
Jsons.jsonNode(createState(java.util.List.of(dbStreamState))),
),
)

setEmittedAtToNull(actualMessages)

val expectedStreams =
java.util.List.of(buildStreamState(airbyteStream, cursorField, endCursorValue))

val expectedMessages: MutableList<AirbyteMessage> = ArrayList(expectedRecordMessages)
expectedMessages.addAll(
createExpectedTestMessages(expectedStreams, expectedRecordMessages.size.toLong()),
)

setTraceEmittedAtToNull(actualMessages)
setTraceEmittedAtToNull(expectedMessages)
Assertions.assertEquals(expectedMessages.size, actualMessages.size)
Assertions.assertTrue(expectedMessages.containsAll(actualMessages))
Assertions.assertTrue(actualMessages.containsAll(expectedMessages))
}

override open fun assertStreamStatusTraceMessageIndex(
idx: Int,
allMessages: List<AirbyteMessage>,
expectedStreamStatus: AirbyteStreamStatusTraceMessage
) {
// no-op
}

@Test
@Throws(Exception::class)
override fun testReadOneColumn() {
val catalog =
CatalogHelpers.createConfiguredAirbyteCatalog(
streamName(),
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
)
val actualMessages = MoreIterators.toList(source().read(config(), catalog, null))

setEmittedAtToNull(actualMessages)

val expectedMessages: MutableList<AirbyteMessage> = airbyteMessagesReadOneColumn

Assertions.assertEquals(expectedMessages.size, actualMessages.size)
Assertions.assertTrue(expectedMessages.containsAll(actualMessages))
Assertions.assertTrue(actualMessages.containsAll(expectedMessages))
}

@Test
@Throws(Exception::class)
override fun testReadOneTableIncrementallyTwice() {
val config = config()
val namespace = defaultNamespace
val configuredCatalog = getConfiguredCatalogWithOneStream(namespace)
configuredCatalog.streams.forEach(
Consumer { airbyteStream: ConfiguredAirbyteStream ->
airbyteStream.syncMode = SyncMode.INCREMENTAL
airbyteStream.cursorField = java.util.List.of(COL_ID)
airbyteStream.destinationSyncMode = DestinationSyncMode.APPEND
},
)

val actualMessagesFirstSync =
MoreIterators.toList(
source()!!.read(
config,
configuredCatalog,
createEmptyState(streamName(), namespace),
),
)

val stateAfterFirstSyncOptional =
actualMessagesFirstSync
.filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE }
.first()

executeStatementReadIncrementallyTwice()

val actualMessagesSecondSync =
MoreIterators.toList(
source()!!.read(
config,
configuredCatalog,
extractState(stateAfterFirstSyncOptional),
),
)

Assertions.assertEquals(
2,
actualMessagesSecondSync
.filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD }
.count()
.toInt(),
)
val expectedMessages: MutableList<AirbyteMessage> =
getExpectedAirbyteMessagesSecondSync(namespace)

setEmittedAtToNull(actualMessagesSecondSync)

Assertions.assertEquals(expectedMessages.size, actualMessagesSecondSync.size)
Assertions.assertTrue(expectedMessages.containsAll(actualMessagesSecondSync))
Assertions.assertTrue(actualMessagesSecondSync.containsAll(expectedMessages))
}

companion object {
private lateinit var PSQL_CONTAINER: PostgreSQLContainer<*>

Expand Down
Loading

0 comments on commit 7bd9423

Please sign in to comment.