Skip to content

Commit

Permalink
Merge branch 'master' into leti/improve-hyperlinks-in-details-sections
Browse files Browse the repository at this point in the history
  • Loading branch information
letiescanciano authored Jul 23, 2024
2 parents 687e55f + 250c02d commit add8030
Show file tree
Hide file tree
Showing 171 changed files with 1,927 additions and 1,935 deletions.
8 changes: 8 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.43.1 | 2024-07-22 | [\#41622](https://github.com/airbytehq/airbyte/pull/41622) | Fix null safety bug in debezium event processing |
| 0.43.0 | 2024-07-17 | [\#41954](https://github.com/airbytehq/airbyte/pull/41954) | fix refreshes for connectors using the old SqlOperations |
| 0.43.0 | 2024-07-17 | [\#42017](https://github.com/airbytehq/airbyte/pull/42017) | bump postgres-jdbc version |
| 0.43.0 | 2024-07-17 | [\#42015](https://github.com/airbytehq/airbyte/pull/42015) | wait until migration before creating the Writeconfig objects |
| 0.43.0 | 2024-07-17 | [\#41953](https://github.com/airbytehq/airbyte/pull/41953) | add generationId and syncId to SqlOperations functions |
| 0.43.0 | 2024-07-17 | [\#41952](https://github.com/airbytehq/airbyte/pull/41952) | rename and add fields in WriteConfig |
| 0.43.0 | 2024-07-17 | [\#41951](https://github.com/airbytehq/airbyte/pull/41951) | remove nullables in JdbcBufferedConsumerFactory |
| 0.43.0 | 2024-07-17 | [\#41950](https://github.com/airbytehq/airbyte/pull/41950) | remove unused classes|
| 0.42.2 | 2024-07-21 | [\#42122](https://github.com/airbytehq/airbyte/pull/42122) | Support for Debezium resync and shutdown scenarios. |
| 0.42.2 | 2024-07-04 | [\#40208](https://github.com/airbytehq/airbyte/pull/40208) | Implement a new connector error handling and translation framework |
| 0.41.8 | 2024-07-18 | [\#42068](https://github.com/airbytehq/airbyte/pull/42068) | Add analytics message for WASS occurrence. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ abstract class AzureBlobStorageStreamCopier(
}

@Throws(Exception::class)
override fun createDestinationTable(): String? {
override fun createDestinationTable(): String {
@Suppress("DEPRECATION") val destTableName = nameTransformer.getRawTableName(streamName)
LOGGER.info { "Preparing table $destTableName in destination." }
sqlOperations.createTableIfNotExists(db, schemaName, destTableName)
Expand All @@ -167,7 +167,7 @@ abstract class AzureBlobStorageStreamCopier(
}

@Throws(Exception::class)
override fun generateMergeStatement(destTableName: String?): String {
override fun generateMergeStatement(destTableName: String): String {
LOGGER.info {
"Preparing to merge tmp table $tmpTableName to dest table: $destTableName, schema: $schemaName, in destination."
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/java/airbyte-cdk/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ dependencies {
testImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-datastore-postgres'))

testImplementation 'mysql:mysql-connector-java:8.0.33'
testImplementation 'org.postgresql:postgresql:42.6.0'
testImplementation 'org.postgresql:postgresql:42.7.3'
testImplementation 'org.testcontainers:mysql:1.19.0'
testImplementation 'org.testcontainers:postgresql:1.19.0'
testImplementation 'org.xbib.elasticsearch:joptsimple:6.3.2.1'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,27 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
@Throws(SQLException::class)
abstract fun execute(query: CheckedConsumer<Connection, SQLException>)

@Throws(SQLException::class)
/**
* We can't define a default parameter in the method below because "An overriding function is
* not allowed to specify default values for its parameters" in kotlin And the interface could
* have a default parameter, but is not allowed an @JvmOverload because it's abstract. So for
* java compat, we have 2 functions, the same way we would in java
*/
override fun execute(sql: String?) {
execute { connection: Connection -> connection.createStatement().execute(sql) }
execute(sql, true)
}

@Throws(SQLException::class)
fun execute(sql: String?, logStatements: Boolean) {
execute { connection: Connection ->
if (logStatements) {
LOGGER.info("executing statement: $sql")
}
connection.createStatement().execute(sql)
if (logStatements) {
LOGGER.info("statement successfully executed")
}
}
}

@Throws(SQLException::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ interface SqlOperations {
* @param tableName Name of table
* @return Query
*/
fun truncateTableQuery(database: JdbcDatabase?, schemaName: String?, tableName: String?): String
fun truncateTableQuery(database: JdbcDatabase, schemaName: String, tableName: String): String

/**
* Insert records into table. Assumes the table exists.
Expand All @@ -96,7 +96,9 @@ interface SqlOperations {
database: JdbcDatabase,
records: List<PartialAirbyteMessage>,
schemaName: String?,
tableName: String?
tableName: String?,
syncId: Long,
generationId: Long,
)

/**
Expand All @@ -115,7 +117,7 @@ interface SqlOperations {
database: JdbcDatabase?,
schemaName: String?,
sourceTableName: String?,
destinationTableName: String?
destinationTableName: String?,
): String?

/**
Expand All @@ -130,6 +132,22 @@ interface SqlOperations {
/** Check if the data record is valid and ok to be written to destination */
fun isValidData(data: JsonNode?): Boolean

/**
* check if there's any row in table {@code rawNamespace.rawName} for which the value of the
* _airbyte_generation_id column is different from {@code generationId}
*
* @returns true if the table exists and contains such a row, false otherwise
*/
fun isOtherGenerationIdInTable(
database: JdbcDatabase,
generationId: Long,
rawNamespace: String,
rawName: String
): Boolean

/** overwrite the raw table with the temporary raw table */
fun overwriteRawTable(database: JdbcDatabase, rawNamespace: String, rawName: String)

/**
* Denotes whether the destination has the concept of schema or not
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ interface StreamCopier {
*
* @return the name of the destination table
*/
@Throws(Exception::class) fun createDestinationTable(): String?
@Throws(Exception::class) fun createDestinationTable(): String

/** Generates a merge SQL statement from the temporary table to the final table. */
@Throws(Exception::class) fun generateMergeStatement(destTableName: String?): String
@Throws(Exception::class) fun generateMergeStatement(destTableName: String): String

/**
* Cleans up the copier by removing the staging file and dropping the temporary table after
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.42.3
version=0.43.1
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
listOf(dummyRecord),
outputSchema,
outputTableName,
-1,
-1,
)
}
} finally {
Expand Down Expand Up @@ -456,7 +458,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
.withRecord(
PartialAirbyteRecordMessage()
.withStream("stream1")
.withEmittedAt(1602637589000L),
.withEmittedAt(1602637589000L)
)
.withSerialized(dummyDataToInsert.toString())
}
Expand Down
Loading

0 comments on commit add8030

Please sign in to comment.