
Commit abf84f8
stephane-airbyte authored and nurikk-sa committed Apr 4, 2024
1 parent 23ad87d commit abf84f8
Showing 6 changed files with 27 additions and 167 deletions.
@@ -1 +1 @@
-version=0.28.13
+version=0.28.14

This file was deleted.

@@ -17,22 +17,13 @@ import java.sql.SQLException
 import java.sql.Timestamp
 import java.time.Instant
 import java.util.*
-import java.util.function.Consumer
 import org.apache.commons.csv.CSVFormat
 import org.apache.commons.csv.CSVPrinter
 
 abstract class JdbcSqlOperations : SqlOperations {
-    // this adapter modifies record message before inserting them to the destination
-    protected val dataAdapter: Optional<DataAdapter>
     protected val schemaSet: MutableSet<String?> = HashSet()
 
-    protected constructor() {
-        this.dataAdapter = Optional.empty()
-    }
-
-    protected constructor(dataAdapter: DataAdapter) {
-        this.dataAdapter = Optional.of(dataAdapter)
-    }
+    protected constructor() {}
 
     @Throws(Exception::class)
     override fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String?) {
@@ -146,11 +137,8 @@ abstract class JdbcSqlOperations : SqlOperations {
             CSVPrinter(writer, CSVFormat.DEFAULT).use { csvPrinter ->
                 for (record in records) {
                     val uuid = UUID.randomUUID().toString()
-                    // TODO we only need to do this is formatData is overridden. If not, we can just
-                    // do jsonData =
-                    // record.getSerialized()
-                    val jsonData =
-                        Jsons.serialize(formatData(Jsons.deserializeExact(record.serialized)))
+
+                    val jsonData = record.serialized
                     val airbyteMeta = Jsons.serialize(record.record!!.meta)
                     val extractedAt =
                         Timestamp.from(Instant.ofEpochMilli(record.record!!.emittedAt))
@@ -233,15 +221,6 @@ abstract class JdbcSqlOperations : SqlOperations {
         schemaName: String?,
         tableName: String?
     ) {
-        dataAdapter.ifPresent { adapter: DataAdapter ->
-            records!!.forEach(
-                Consumer { airbyteRecordMessage: PartialAirbyteMessage? ->
-                    val data = Jsons.deserializeExact(airbyteRecordMessage!!.serialized)
-                    adapter.adapt(data)
-                    airbyteRecordMessage.serialized = Jsons.serialize(data)
-                }
-            )
-        }
         if (isDestinationV2) {
             insertRecordsInternalV2(database, records, schemaName, tableName)
         } else {
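With the DataAdapter hook gone, the staging path writes record.serialized into the CSV buffer verbatim instead of deserializing, adapting, and re-serializing every record. A minimal sketch of that write path built on commons-csv, using a hypothetical StagedRecord stand-in for PartialAirbyteMessage and a reduced column set:

import java.io.StringWriter
import java.sql.Timestamp
import java.time.Instant
import java.util.UUID
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter

// Hypothetical stand-in for PartialAirbyteMessage: just the fields this path touches.
data class StagedRecord(val serialized: String, val emittedAt: Long)

// Sketch of the post-change behavior: the serialized JSON is written as-is, with no
// per-record deserialize/adapt/re-serialize round trip.
fun writeRecordsAsCsv(records: List<StagedRecord>): String {
    val writer = StringWriter()
    CSVPrinter(writer, CSVFormat.DEFAULT).use { csvPrinter ->
        for (record in records) {
            val uuid = UUID.randomUUID().toString()
            val jsonData = record.serialized // passed through verbatim
            val extractedAt = Timestamp.from(Instant.ofEpochMilli(record.emittedAt))
            csvPrinter.printRecord(uuid, jsonData, extractedAt)
        }
    }
    return writer.toString()
}

fun main() {
    println(writeRecordsAsCsv(listOf(StagedRecord("""{"id": 1}""", 1712188800000L))))
}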
@@ -22,6 +22,7 @@ import java.time.Instant
 import java.time.OffsetDateTime
 import java.time.temporal.ChronoUnit
 import java.util.*
+import java.util.HashMap
 import java.util.concurrent.CompletableFuture
 import java.util.concurrent.CompletionStage
 import java.util.function.Predicate
@@ -254,12 +255,18 @@ abstract class JdbcDestinationHandler<DestinationState>(
                         // This is to handle any destinations that upcase the column names.
                         // For example - Snowflake with QUOTED_IDENTIFIERS_IGNORE_CASE=TRUE.
                         val record = recordJson as ObjectNode
-                        record.fieldNames().forEachRemaining { fieldName: String ->
-                            record.set<JsonNode>(
-                                fieldName.lowercase(Locale.getDefault()),
-                                record[fieldName]
-                            )
-                        }
+                        val newFields: HashMap<String, JsonNode> = HashMap()
+
+                        val it = record.fieldNames()
+                        while (it.hasNext()) {
+                            val fieldName = it.next()
+                            // We can't directly call record.set here, because that will raise a
+                            // ConcurrentModificationException on the fieldnames iterator.
+                            // Instead, build up a map of new fields and set them all at once.
+                            newFields.put(fieldName.lowercase(Locale.getDefault()), record[fieldName])
+                        }
+
+                        record.setAll<JsonNode>(newFields)
                     }
                     .collect(
                         toMap(
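The old forEachRemaining loop mutated the ObjectNode while its own fieldNames() iterator was still live, which is what raised the ConcurrentModificationException; the fix buffers the lowercased entries and applies them in a single setAll call. A self-contained sketch of the same pattern against plain Jackson (the helper function and sample input are illustrative, not the CDK's actual code):

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import java.util.Locale

// Collect lowercased keys into a separate map first, then apply them with setAll,
// so the node is never modified while fieldNames() is being iterated.
// Note: like the CDK change, this adds lowercase aliases next to the original keys.
fun addLowercasedFieldNames(record: ObjectNode): ObjectNode {
    val newFields = HashMap<String, JsonNode>()
    val it = record.fieldNames()
    while (it.hasNext()) {
        val fieldName = it.next()
        newFields[fieldName.lowercase(Locale.getDefault())] = record[fieldName]
    }
    record.setAll<JsonNode>(newFields)
    return record
}

fun main() {
    val node = ObjectMapper().createObjectNode()
    node.put("TABLE_NAME", "users")
    node.put("COLUMN_NAME", "ID")
    // Prints both the original upper-case keys and their lower-case aliases.
    println(addLowercasedFieldNames(node))
}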

This file was deleted.

@@ -11,6 +11,7 @@ import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase
 import io.airbyte.cdk.db.jdbc.JdbcDatabase
 import io.airbyte.cdk.db.jdbc.JdbcUtils
 import io.airbyte.cdk.integrations.base.JavaBaseConstants
+import io.airbyte.commons.text.Names
 import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId.Companion.concatenateRawTableName
 import javax.sql.DataSource
@@ -23,7 +24,7 @@ import org.jooq.impl.DSL
  * anything. At some point we might (?) want to do a refactor to combine them.
  */
 abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
-    private var database: JdbcDatabase? = null
+    protected var database: JdbcDatabase? = null
     private var dataSource: DataSource? = null
 
     protected abstract val baseConfig: ObjectNode
@@ -83,7 +84,11 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
         if (streamNamespace == null) {
             streamNamespace = getDefaultSchema(config!!)
         }
-        val tableName = concatenateRawTableName(streamNamespace, streamName!!)
+        val tableName =
+            concatenateRawTableName(
+                streamNamespace,
+                Names.toAlphanumericAndUnderscore(streamName!!)
+            )
         val schema = rawSchema
         return database!!.queryJsons(DSL.selectFrom(DSL.name(schema, tableName)).sql)
     }
@@ -97,7 +102,10 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
         if (streamNamespace == null) {
             streamNamespace = getDefaultSchema(config!!)
         }
-        return database!!.queryJsons(DSL.selectFrom(DSL.name(streamNamespace, streamName)).sql)
+        return database!!.queryJsons(
+            DSL.selectFrom(DSL.name(streamNamespace, Names.toAlphanumericAndUnderscore(streamName)))
+                .sql
+        )
     }
 
     @Throws(Exception::class)
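Both lookups now run the stream name through Names.toAlphanumericAndUnderscore before building the raw and final table identifiers, so the test queries the same sanitized name the destination used when it created the tables. A rough sketch of the kind of normalization that name suggests, assuming characters outside the alphanumeric-and-underscore set are replaced with underscores (an illustration, not Airbyte's actual implementation):

// Hypothetical approximation of Names.toAlphanumericAndUnderscore: every character
// outside [A-Za-z0-9_] is replaced with an underscore. The real Airbyte helper may
// differ in edge cases (leading digits, empty names, etc.).
fun toAlphanumericAndUnderscore(name: String): String =
    name.replace(Regex("[^A-Za-z0-9_]"), "_")

fun main() {
    // A stream named "users-2024.daily" would be looked up as "users_2024_daily".
    println(toAlphanumericAndUnderscore("users-2024.daily"))
}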
