Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove log4j from java cdk #38583

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import io.airbyte.cdk.integrations.destination.jdbc.copy.StreamCopier
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.*
import java.nio.charset.StandardCharsets
import java.sql.SQLException
Expand All @@ -26,8 +27,8 @@ import java.util.*
import java.util.function.Consumer
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter
import org.slf4j.Logger
import org.slf4j.LoggerFactory

private val LOGGER = KotlinLogging.logger {}

abstract class AzureBlobStorageStreamCopier(
protected val stagingFolder: String,
Expand Down Expand Up @@ -104,38 +105,32 @@ abstract class AzureBlobStorageStreamCopier(

@Throws(Exception::class)
override fun closeStagingUploader(hasFailed: Boolean) {
LOGGER.info("Uploading remaining data for {} stream.", streamName)
LOGGER.info { "Uploading remaining data for $streamName stream." }
for (csvPrinter in csvPrinters.values) {
csvPrinter.close()
}
LOGGER.info("All data for {} stream uploaded.", streamName)
LOGGER.info { "All data for $streamName stream uploaded." }
}

@Throws(Exception::class)
override fun createDestinationSchema() {
LOGGER.info("Creating schema in destination if it doesn't exist: {}", schemaName)
LOGGER.info { "Creating schema in destination if it doesn't exist: $schemaName" }
sqlOperations.createSchemaIfNotExists(db, schemaName)
}

@Throws(Exception::class)
override fun createTemporaryTable() {
LOGGER.info(
"Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}.",
streamName,
schemaName,
tmpTableName
)
LOGGER.info {
"Preparing tmp table in destination for stream: $streamName, schema: $schemaName, tmp table name: $tmpTableName."
}
sqlOperations.createTableIfNotExists(db, schemaName, tmpTableName)
}

@Throws(Exception::class)
override fun copyStagingFileToTemporaryTable() {
LOGGER.info(
"Starting copy to tmp table: {} in destination for stream: {}, schema: {}.",
tmpTableName,
streamName,
schemaName
)
LOGGER.info {
"Starting copy to tmp table: $tmpTableName in destination for stream: $streamName, schema: $schemaName."
}
for (azureStagingFile in azureStagingFiles) {
copyAzureBlobCsvFileIntoTable(
db,
Expand All @@ -145,11 +140,9 @@ abstract class AzureBlobStorageStreamCopier(
azureBlobConfig
)
}
LOGGER.info(
"Copy to tmp table {} in destination for stream {} complete.",
tmpTableName,
streamName
)
LOGGER.info {
"Copy to tmp table $tmpTableName in destination for stream $streamName complete."
}
}

private fun getFullAzurePath(azureStagingFile: String?): String {
Expand All @@ -166,50 +159,45 @@ abstract class AzureBlobStorageStreamCopier(
@Throws(Exception::class)
override fun createDestinationTable(): String? {
@Suppress("DEPRECATION") val destTableName = nameTransformer.getRawTableName(streamName)
LOGGER.info("Preparing table {} in destination.", destTableName)
LOGGER.info { "Preparing table $destTableName in destination." }
sqlOperations.createTableIfNotExists(db, schemaName, destTableName)
LOGGER.info("Table {} in destination prepared.", tmpTableName)
LOGGER.info { "Table $tmpTableName in destination prepared." }

return destTableName
}

@Throws(Exception::class)
override fun generateMergeStatement(destTableName: String?): String {
LOGGER.info(
"Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.",
tmpTableName,
destTableName,
schemaName
)
LOGGER.info {
"Preparing to merge tmp table $tmpTableName to dest table: $destTableName, schema: $schemaName, in destination."
}
val queries = StringBuilder()
if (destSyncMode == DestinationSyncMode.OVERWRITE) {
queries.append(sqlOperations.truncateTableQuery(db, schemaName, destTableName))
LOGGER.info(
"Destination OVERWRITE mode detected. Dest table: {}, schema: {}, truncated.",
destTableName,
schemaName
)
LOGGER.info {
"Destination OVERWRITE mode detected. Dest table: $destTableName, schema: $schemaName, truncated."
}
}
queries.append(sqlOperations.insertTableQuery(db, schemaName, tmpTableName, destTableName))
return queries.toString()
}

@Throws(Exception::class)
override fun removeFileAndDropTmpTable() {
LOGGER.info("Begin cleaning azure blob staging files.")
LOGGER.info { "Begin cleaning azure blob staging files." }
for (appendBlobClient in blobClients.values) {
appendBlobClient.delete()
}
LOGGER.info("Azure Blob staging files cleaned.")
LOGGER.info { "Azure Blob staging files cleaned." }

LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName)
LOGGER.info { "Begin cleaning $tmpTableName tmp table in destination." }
sqlOperations.dropTableIfExists(db, schemaName, tmpTableName)
LOGGER.info("{} tmp table in destination cleaned.", tmpTableName)
LOGGER.info { "$tmpTableName tmp table in destination cleaned." }
}

@Throws(Exception::class)
override fun closeNonCurrentStagingFileWriters() {
LOGGER.info("Begin closing non current file writers")
LOGGER.info { "Begin closing non current file writers" }
val removedKeys: MutableSet<String> = HashSet()
for (key in activeStagingWriterFileNames) {
if (key != currentFile) {
Expand All @@ -231,8 +219,7 @@ abstract class AzureBlobStorageStreamCopier(
)

companion object {
private val LOGGER: Logger =
LoggerFactory.getLogger(AzureBlobStorageStreamCopier::class.java)

fun attemptAzureBlobWriteAndDelete(config: AzureBlobStorageConfig) {
var appendBlobClient: AppendBlobClient? = null
try {
Expand All @@ -249,7 +236,7 @@ abstract class AzureBlobStorageStreamCopier(
listCreatedBlob(containerClient)
} finally {
if (appendBlobClient != null && appendBlobClient.exists()) {
LOGGER.info("Deleting blob: " + appendBlobClient.blobName)
LOGGER.info { "Deleting blob: ${appendBlobClient.blobName}" }
appendBlobClient.delete()
}
}
Expand All @@ -260,16 +247,14 @@ abstract class AzureBlobStorageStreamCopier(
.listBlobs()
.forEach(
Consumer { blobItem: BlobItem ->
LOGGER.info(
"Blob name: " + blobItem.name + "Snapshot: " + blobItem.snapshot
)
LOGGER.info { "Blob name: ${blobItem.name} Snapshot: ${blobItem.snapshot}" }
}
)
}

private fun writeTestDataIntoBlob(appendBlobClient: AppendBlobClient?) {
val test = "test_data"
LOGGER.info("Writing test data to Azure Blob storage: $test")
LOGGER.info { "Writing test data to Azure Blob storage: $test" }
val dataStream: InputStream =
ByteArrayInputStream(test.toByteArray(StandardCharsets.UTF_8))

Expand All @@ -278,7 +263,7 @@ abstract class AzureBlobStorageStreamCopier(
.appendBlock(dataStream, test.length.toLong())
.blobCommittedBlockCount

LOGGER.info("blobCommittedBlockCount: $blobCommittedBlockCount")
LOGGER.info { "blobCommittedBlockCount: $blobCommittedBlockCount" }
}

private fun getBlobContainerClient(
Expand All @@ -291,9 +276,9 @@ abstract class AzureBlobStorageStreamCopier(

if (!appendBlobClient.exists()) {
appendBlobClient.create()
LOGGER.info("blobContainerClient created")
LOGGER.info { "blobContainerClient created" }
} else {
LOGGER.info("blobContainerClient already exists")
LOGGER.info { "blobContainerClient already exists" }
}
return containerClient
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import io.github.oshai.kotlinlogging.KotlinLogging
import java.math.BigDecimal
import java.sql.*
import java.sql.Date
Expand All @@ -21,16 +22,12 @@ import java.time.*
import java.time.chrono.IsoEra
import java.time.format.DateTimeParseException
import java.util.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory

private val LOGGER = KotlinLogging.logger {}
/** Source operation skeleton for JDBC compatible databases. */
abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
JdbcCompatibleSourceOperations<Datatype> {

private val LOGGER: Logger =
LoggerFactory.getLogger(AbstractJdbcCompatibleSourceOperations::class.java)

@Throws(SQLException::class)
override fun convertDatabaseRowToAirbyteRecordData(queryContext: ResultSet): AirbyteRecordData {
// the first call communicates with the database. after that the result is cached.
Expand All @@ -47,12 +44,9 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
copyToJsonField(queryContext, i, jsonNode)
} catch (e: java.lang.Exception) {
jsonNode.putNull(columnName)
LOGGER.info(
"Failed to serialize column: {}, of type {}, with error {}",
columnName,
columnTypeName,
e.message
)
LOGGER.info {
"Failed to serialize column: $columnName, of type $columnTypeName, with error ${e.message}"
}
AirbyteTraceMessageUtility.emitAnalyticsTrace(dataTypesSerializationErrorMessage())
metaChanges.add(
AirbyteRecordMessageMetaChange()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ package io.airbyte.cdk.db.jdbc

import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.db.DataTypeUtils
import io.github.oshai.kotlinlogging.KotlinLogging
import java.sql.*
import java.time.*
import java.time.format.DateTimeFormatter
import java.util.concurrent.*
import kotlin.math.abs
import kotlin.math.min
import org.slf4j.Logger
import org.slf4j.LoggerFactory

private val LOGGER = KotlinLogging.logger {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious: is it preferable to declare the logger outside the class?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is a question i have too. I'm inconsistently doing it inside the class and only outside when I need it in companion object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's either outside the class or inside the companion object. Definitely not inside the class, as that would create a new instance for each class instance


object DateTimeConverter {
private val LOGGER: Logger = LoggerFactory.getLogger(DateTimeConverter::class.java)

val TIME_WITH_TIMEZONE_FORMATTER: DateTimeFormatter =
DateTimeFormatter.ofPattern(
"HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]"
Expand All @@ -34,7 +35,7 @@ object DateTimeConverter {
else time.toString()
} else {
if (!loggedUnknownTimeWithTimeZoneClass) {
LOGGER.info("Unknown class for Time with timezone data type" + time.javaClass)
LOGGER.info { "Unknown class for Time with timezone data type ${time.javaClass}" }
loggedUnknownTimeWithTimeZoneClass = true
}
val timetz = OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER)
Expand Down Expand Up @@ -78,9 +79,9 @@ object DateTimeConverter {
return AbstractJdbcCompatibleSourceOperations.Companion.resolveEra(localDate, value)
} else {
if (!loggedUnknownTimestampWithTimeZoneClass) {
LOGGER.info(
"Unknown class for Timestamp with time zone data type" + timestamp.javaClass
)
LOGGER.info {
"Unknown class for Timestamp with time zone data type ${timestamp.javaClass}"
}
loggedUnknownTimestampWithTimeZoneClass = true
}
val instant = Instant.parse(timestamp.toString())
Expand Down Expand Up @@ -123,7 +124,7 @@ object DateTimeConverter {
)
} else {
if (!loggedUnknownTimestampClass) {
LOGGER.info("Unknown class for Timestamp data type" + timestamp.javaClass)
LOGGER.info { "Unknown class for Timestamp data type ${timestamp.javaClass}" }
loggedUnknownTimestampClass = true
}
val localDateTime = LocalDateTime.parse(timestamp.toString())
Expand Down Expand Up @@ -158,7 +159,7 @@ object DateTimeConverter {
return LocalDate.ofEpochDay(date.toLong()).format(DataTypeUtils.DATE_FORMATTER)
} else {
if (!loggedUnknownDateClass) {
LOGGER.info("Unknown class for Date data type" + date.javaClass)
LOGGER.info { "Unknown class for Date data type${date.javaClass}" }
loggedUnknownDateClass = true
}
val localDate = LocalDate.parse(date.toString())
Expand All @@ -182,22 +183,22 @@ object DateTimeConverter {
} else {
val updatedValue =
min(abs(value.toDouble()), LocalTime.MAX.toNanoOfDay().toDouble()).toLong()
LOGGER.debug(
"Time values must use number of nanoseconds greater than 0 and less than 86400000000000 but its {}, converting to {} ",
value,
updatedValue
)
LOGGER.debug {
"Time values must use number of nanoseconds greater than 0 and less than 86400000000000 but its $value, converting to $updatedValue "
}
return formatTime(LocalTime.ofNanoOfDay(updatedValue))
}
} else {
if (!loggedUnknownTimeClass) {
LOGGER.info("Unknown class for Time data type" + time.javaClass)
LOGGER.info { "Unknown class for Time data type ${time.javaClass}" }
loggedUnknownTimeClass = true
}

val valueAsString = time.toString()
if (valueAsString.startsWith("24")) {
LOGGER.debug("Time value {} is above range, converting to 23:59:59", valueAsString)
LOGGER.debug {
"Time value ${valueAsString} is above range, converting to 23:59:59"
}
return LocalTime.MAX.toString()
}
return formatTime(LocalTime.parse(valueAsString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import io.airbyte.cdk.db.JdbcCompatibleSourceOperations
import io.airbyte.commons.exceptions.ConnectionErrorException
import io.airbyte.commons.functional.CheckedConsumer
import io.airbyte.commons.functional.CheckedFunction
import io.github.oshai.kotlinlogging.KotlinLogging
import java.sql.*
import java.util.*
import java.util.function.Function
import java.util.stream.Stream
import javax.sql.DataSource
import org.slf4j.Logger
import org.slf4j.LoggerFactory

private val LOGGER = KotlinLogging.logger {}
/**
* Database object for interacting with a JDBC connection. Can be used for any JDBC compliant db.
*/
Expand Down Expand Up @@ -50,15 +50,13 @@ constructor(
): Stream<T> {
val connection = dataSource.connection
return JdbcDatabase.Companion.toUnsafeStream<T>(query.apply(connection), recordTransform)
.onClose(
Runnable {
try {
connection.close()
} catch (e: SQLException) {
throw RuntimeException(e)
}
.onClose {
try {
connection.close()
} catch (e: SQLException) {
throw RuntimeException(e)
}
)
}
}

@get:Throws(SQLException::class)
Expand Down Expand Up @@ -125,16 +123,12 @@ constructor(
.onClose(
Runnable {
try {
LOGGER.info("closing connection")
LOGGER.info { "closing connection" }
connection.close()
} catch (e: SQLException) {
throw RuntimeException(e)
}
}
)
}

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(DefaultJdbcDatabase::class.java)
}
}
Loading
Loading