Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convert #36396 to kotlin #36473

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ interface JdbcCompatibleSourceOperations<SourceType> : SourceOperations<ResultSe
preparedStatement: PreparedStatement,
parameterIndex: Int,
cursorFieldType: SourceType?,
value: String
value: String?
)

/** Determine the database specific type of the input field based on its column metadata. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class JdbcSourceOperations :
preparedStatement: PreparedStatement,
parameterIndex: Int,
cursorFieldType: JDBCType?,
value: String
value: String?
) {
when (cursorFieldType) {
JDBCType.TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value)
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.28.12
version=0.28.13
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
Expand All @@ -23,16 +24,16 @@ import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.Function
import java.util.function.Predicate
import java.util.stream.Collectors
import kotlin.collections.LinkedHashMap
import java.util.stream.Collectors.toMap
import lombok.extern.slf4j.Slf4j
import org.jooq.Condition
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.jooq.conf.ParamType
import org.jooq.impl.DSL
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.quotedName
import org.jooq.impl.SQLDataType
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -207,60 +208,73 @@ abstract class JdbcDestinationHandler<DestinationState>(
@get:Throws(SQLException::class)
protected val allDestinationStates: Map<AirbyteStreamNameNamespacePair, DestinationState>
get() {

// Guarantee the table exists.
jdbcDatabase.execute(
dslContext
.createTableIfNotExists(
DSL.quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)
)
.column(
DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME),
SQLDataType.VARCHAR
quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)
)
.column(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME), SQLDataType.VARCHAR)
.column(
DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE),
quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE),
SQLDataType.VARCHAR
) // Just use a string type, even if the destination has a json type.
// We're never going to query this column in a fancy way - all our processing
// can happen
// client-side.
.column(
DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE),
quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE),
SQLDataType.VARCHAR
) // Add an updated_at field. We don't actually need it yet, but it can't hurt!
.column(
DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT),
quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT),
SQLDataType.TIMESTAMPWITHTIMEZONE
)
.getSQL(ParamType.INLINED)
)

// Fetch all records from it. We _could_ filter down to just our streams... but meh.
// This is small
// data.
return jdbcDatabase
.queryJsons(
dslContext
.select(
DSL.field(DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)),
DSL.field(DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)),
DSL.field(DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE))
field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)),
field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)),
field(quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE))
)
.from(DSL.quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))
.sql
.from(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))
.getSQL()
)
.stream()
.peek { recordJson: JsonNode ->
// Forcibly downcase all key names.
// This is to handle any destinations that upcase the column names.
// For example - Snowflake with QUOTED_IDENTIFIERS_IGNORE_CASE=TRUE.
val record = recordJson as ObjectNode
record.fieldNames().forEachRemaining { fieldName: String ->
record.set<JsonNode>(
fieldName.lowercase(Locale.getDefault()),
record[fieldName]
)
}
}
.collect(
Collectors.toMap(
Function { record: JsonNode ->
val nameNode = record[DESTINATION_STATE_TABLE_COLUMN_NAME]
val namespaceNode = record[DESTINATION_STATE_TABLE_COLUMN_NAMESPACE]
toMap(
{ record ->
val nameNode: JsonNode = record.get(DESTINATION_STATE_TABLE_COLUMN_NAME)
val namespaceNode: JsonNode =
record.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)
AirbyteStreamNameNamespacePair(
nameNode?.asText(),
namespaceNode?.asText()
if (nameNode != null) nameNode.asText() else null,
if (namespaceNode != null) namespaceNode.asText() else null
)
},
Function { record: JsonNode ->
val stateNode = record[DESTINATION_STATE_TABLE_COLUMN_STATE]
{ record ->
val stateNode: JsonNode =
record.get(DESTINATION_STATE_TABLE_COLUMN_STATE)
val state =
if (stateNode != null) Jsons.deserialize(stateNode.asText())
else Jsons.emptyObject()
Expand Down

This file was deleted.

This file was deleted.

Loading