Skip to content

Commit

Permalink
convert #36466 to kotlin
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Mar 27, 2024
1 parent 7b8b981 commit 52ed312
Showing 1 changed file with 23 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,6 @@ import io.airbyte.commons.json.Jsons
import io.airbyte.integrations.base.destination.typing_deduping.*
import io.airbyte.integrations.base.destination.typing_deduping.Struct
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
import java.sql.*
import java.time.Instant
import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.Predicate
import java.util.stream.Collectors.toMap
import lombok.extern.slf4j.Slf4j
import org.jooq.Condition
import org.jooq.DSLContext
Expand All @@ -37,6 +28,17 @@ import org.jooq.impl.DSL.quotedName
import org.jooq.impl.SQLDataType
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.sql.*
import java.time.Instant
import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.Predicate
import java.util.stream.Collectors.toMap
import java.util.HashMap


@Slf4j
abstract class JdbcDestinationHandler<DestinationState>(
Expand Down Expand Up @@ -254,12 +256,19 @@ abstract class JdbcDestinationHandler<DestinationState>(
// This is to handle any destinations that upcase the column names.
// For example - Snowflake with QUOTED_IDENTIFIERS_IGNORE_CASE=TRUE.
val record = recordJson as ObjectNode
record.fieldNames().forEachRemaining { fieldName: String ->
record.set<JsonNode>(
fieldName.lowercase(Locale.getDefault()),
record[fieldName]
)
val newFields: HashMap<String, JsonNode> = HashMap()

val it = record.fieldNames()
while (it.hasNext()) {
val fieldName = it.next()
// We can't directly call record.set here, because that will raise a
// ConcurrentModificationException on the fieldnames iterator.
// Instead, build up a map of new fields and set them all at once.
newFields.put(fieldName.lowercase(Locale.getDefault()), record[fieldName])
}


record.setAll<JsonNode>(newFields)
}
.collect(
toMap(
Expand Down

0 comments on commit 52ed312

Please sign in to comment.