Skip to content

Commit

Permalink
fix compiler errors
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Mar 23, 2024
1 parent 5381ce0 commit 8078674
Show file tree
Hide file tree
Showing 22 changed files with 52 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,5 @@ interface StreamCopier {
fun prepareStagingFile(): String?

/** @return current staging file name */
val currentFile: String
val currentFile: String?
}
12 changes: 12 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ java {
}
}

compileKotlin {
compilerOptions {
allWarningsAsErrors = false
}
}

compileTestFixturesKotlin {
compilerOptions {
allWarningsAsErrors = false
}
}

dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.airbyte.cdk.integrations.base.Destination
import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.record_buffer.BufferStorage
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testMultipartUpload
import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testSingleUpload
import io.airbyte.cdk.integrations.destination.s3.S3ConsumerFactory
Expand Down Expand Up @@ -64,7 +65,7 @@ abstract class BaseGcsDestination : BaseConnector(), Destination {
outputRecordCollector,
GcsStorageOperations(nameTransformer, gcsConfig.s3Client, gcsConfig),
nameTransformer,
getCreateFunction(gcsConfig, Function<String, BufferStorage> { fileExtension: String? -> FileBuffer(fileExtension) }),
getCreateFunction(gcsConfig, Function<String, BufferStorage> { fileExtension: String -> FileBuffer(fileExtension) }),
gcsConfig,
configuredCatalog)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@ import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
class GcsDestinationConfig(bucketName: String?,
bucketPath: String?,
bucketRegion: String?,
val gcsCredentialConfig: GcsCredentialConfig?,
val gcsCredentialConfig: GcsCredentialConfig,
formatConfig: S3FormatConfig?) : S3DestinationConfig(GCS_ENDPOINT,
bucketName!!,
bucketPath!!,
bucketRegion,
S3DestinationConstants.DEFAULT_PATH_FORMAT,
gcsCredentialConfig.getS3CredentialConfig().orElseThrow(),
gcsCredentialConfig.s3CredentialConfig.orElseThrow(),
formatConfig!!,
null,
null,
false,
S3StorageOperations.DEFAULT_UPLOAD_THREADS) {
override fun createS3Client(): AmazonS3 {
when (gcsCredentialConfig!!.credentialType) {
when (gcsCredentialConfig.credentialType) {
GcsCredentialType.HMAC_KEY -> {
val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig?
val awsCreds = BasicAWSCredentials(hmacKeyCredential.getHmacKeyAccessId(), hmacKeyCredential.getHmacKeySecret())
val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig
val awsCreds = BasicAWSCredentials(hmacKeyCredential.hmacKeyAccessId, hmacKeyCredential.hmacKeySecret)

return AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory
import io.airbyte.cdk.integrations.destination.s3.avro.JsonToAvroSchemaConverter
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
Expand Down Expand Up @@ -57,7 +58,7 @@ class GcsAvroWriter @JvmOverloads constructor(config: GcsDestinationConfig,

this.avroRecordFactory = AvroRecordFactory(schema, converter)
this.uploadManager = create(config.bucketName, outputPath, s3Client)
.setPartSize(DEFAULT_PART_SIZE_MB.toLong())
.setPartSize(StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB.toLong())
.get()
// We only need one output stream as we only have one input stream. This is reasonably performant.
this.outputStream = uploadManager.multiPartOutputStreams[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import io.airbyte.cdk.integrations.destination.s3.credential.S3CredentialConfig
import java.util.*

interface GcsCredentialConfig : BlobStorageCredentialConfig<GcsCredentialType?> {
val s3CredentialConfig: Optional<S3CredentialConfig?>
val s3CredentialConfig: Optional<S3CredentialConfig>
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ class GcsHmacKeyCredentialConfig : GcsCredentialConfig {
override val credentialType: GcsCredentialType
get() = GcsCredentialType.HMAC_KEY

override val s3CredentialConfig: Optional<S3CredentialConfig?>
override val s3CredentialConfig: Optional<S3CredentialConfig>
get() = Optional.of(S3AccessKeyCredentialConfig(hmacKeyAccessId, hmacKeySecret))
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator.Factory.create
import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvFormatConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
Expand Down Expand Up @@ -51,7 +52,7 @@ class GcsCsvWriter(config: GcsDestinationConfig,
outputPath)

this.uploadManager = create(config.bucketName, outputPath, s3Client)
.setPartSize(DEFAULT_PART_SIZE_MB.toLong())
.setPartSize(StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB.toLong())
.get()
// We only need one output stream as we only have one input stream. This is reasonably performant.
this.outputStream = uploadManager.multiPartOutputStreams[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,6 @@ abstract class GcsStreamCopier(protected val stagingFolder: String,
get() =// TODO need to update this method when updating whole class for using GcsWriter
null

@VisibleForTesting
fun getGcsStagingFiles(): Set<String> {
return gcsStagingFiles
}

@Throws(SQLException::class)
abstract fun copyGcsCsvFileIntoTable(database: JdbcDatabase?,
gcsFileLocation: String?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ abstract class GcsStreamCopierFactory : StreamCopierFactory<GcsConfig?> {
/**
* Used by the copy consumer.
*/
override fun create(configuredSchema: String?,
fun create(configuredSchema: String?,
gcsConfig: GcsConfig,
stagingFolder: String?,
configuredStream: ConfiguredAirbyteStream?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.google.common.collect.Lists
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig.Companion.parseCodecConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.commons.json.Jsons
import org.apache.avro.file.DataFileConstants
Expand Down Expand Up @@ -108,7 +109,7 @@ internal class GcsAvroFormatConfigTest {
.get()

val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
}

@Test
Expand All @@ -126,6 +127,6 @@ internal class GcsAvroFormatConfigTest {
.get()

val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils
import io.airbyte.cdk.integrations.destination.s3.util.Flattening
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.Companion.fromValue
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.commons.json.Jsons
import org.apache.commons.lang3.reflect.FieldUtils
Expand Down Expand Up @@ -45,7 +46,7 @@ class GcsCsvFormatConfigTest {
.get()

val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
}

@Test
Expand All @@ -63,6 +64,6 @@ class GcsCsvFormatConfigTest {
.get()

val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.destination.gcs.jsonl
import com.amazonaws.services.s3.internal.Constants
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.commons.json.Jsons
import org.apache.commons.lang3.reflect.FieldUtils
Expand Down Expand Up @@ -33,7 +34,7 @@ class GcsJsonlFormatConfigTest {
.get()

val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
}

@Test
Expand All @@ -51,6 +52,6 @@ class GcsJsonlFormatConfigTest {
.get()

val partSizeBytes = FieldUtils.readField(streamTransferManager, "partSize", true) as Int
Assertions.assertEquals(Constants.MB * DEFAULT_PART_SIZE_MB, partSizeBytes)
Assertions.assertEquals(Constants.MB * StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB, partSizeBytes)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package io.airbyte.cdk.integrations.destination.gcs
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.avro.JsonSchemaType
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.cdk.integrations.standardtest.destination.argproviders.NumberDataTypeTestArgumentProvider
import io.airbyte.commons.json.Jsons
Expand Down Expand Up @@ -102,8 +101,7 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) : Gcs

@Throws(IOException::class)
private fun readMessagesFromFile(messagesFilename: String): List<AirbyteMessage> {
return MoreResources.readResource(messagesFilename).lines()
.map<AirbyteMessage>(Function { record: String? -> Jsons.deserialize(record, AirbyteMessage::class.java) }).collect<List<AirbyteMessage>, Any>(Collectors.toList<AirbyteMessage>())
return MoreResources.readResource(messagesFilename).lines().map { Jsons.deserialize(it, AirbyteMessage::class.java) }
}

@Throws(Exception::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.pruneAirbyteJson
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator
import io.airbyte.commons.json.Jsons
Expand Down Expand Up @@ -46,7 +45,7 @@ abstract class GcsBaseAvroDestinationAcceptanceTest : GcsAvroParquetDestinationA
DataFileReader<GenericData.Record>(
SeekableByteArrayInput(`object`.objectContent.readAllBytes()),
GenericDatumReader<GenericData.Record>()).use { dataFileReader ->
val jsonReader: ObjectReader = GcsDestinationAcceptanceTest.Companion.MAPPER.reader()
val jsonReader: ObjectReader = MAPPER.reader()
while (dataFileReader.hasNext()) {
val record = dataFileReader.next()
val jsonBytes = AvroConstants.JSON_CONVERTER.convertToJson(record)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.util.Flattening
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.commons.json.Jsons
import org.apache.commons.csv.CSVFormat
Expand Down Expand Up @@ -82,7 +81,7 @@ abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTes
}

private fun getJsonNode(input: Map<String, String>, fieldTypes: Map<String, String>): JsonNode {
val json: ObjectNode = GcsDestinationAcceptanceTest.Companion.MAPPER.createObjectNode()
val json: ObjectNode = MAPPER.createObjectNode()

if (input.containsKey(JavaBaseConstants.COLUMN_NAME_DATA)) {
return Jsons.deserialize(input[JavaBaseConstants.COLUMN_NAME_DATA])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package io.airbyte.cdk.integrations.destination.gcs
import com.amazonaws.services.s3.model.S3Object
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.util.Flattening
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.commons.json.Jsons
import java.io.IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.amazonaws.services.s3.model.S3Object
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.commons.json.Jsons
import java.io.BufferedReader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.gcs

import com.amazonaws.services.s3.model.S3Object
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.commons.json.Jsons
import java.io.BufferedReader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants
import io.airbyte.cdk.integrations.destination.s3.parquet.S3ParquetWriter.Companion.getHadoopConfig
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.pruneAirbyteJson
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator
import io.airbyte.commons.json.Jsons
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import com.google.common.collect.ImmutableMap
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
import io.airbyte.cdk.integrations.destination.s3.util.Flattening.value
import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator
Expand Down Expand Up @@ -46,9 +45,9 @@ import java.util.stream.Collectors
*/
abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format) : DestinationAcceptanceTest() {
protected var configJson: JsonNode? = null
protected var config: GcsDestinationConfig? = null
protected var s3Client: AmazonS3? = null
protected var nameTransformer: NamingConventionTransformer? = null
protected lateinit var config: GcsDestinationConfig
protected lateinit var s3Client: AmazonS3
protected lateinit var nameTransformer: NamingConventionTransformer
protected var s3StorageOperations: S3StorageOperations? = null

protected val baseConfigJson: JsonNode
Expand All @@ -62,7 +61,7 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format
return configJson!!
}

override fun getDefaultSchema(config: JsonNode): String {
override fun getDefaultSchema(config: JsonNode): String? {
if (config.has("gcs_bucket_path")) {
return config["gcs_bucket_path"].asText()
}
Expand Down Expand Up @@ -245,6 +244,7 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format

companion object {
protected val LOGGER: Logger = LoggerFactory.getLogger(GcsDestinationAcceptanceTest::class.java)
@JvmStatic
protected val MAPPER: ObjectMapper = MoreMappers.initMapper()

protected const val SECRET_FILE_PATH: String = "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ open class S3DestinationConfig {
private val lock = Any()
var s3Client: AmazonS3
get() {
synchronized(lock) {
if (s3Client == null) {
return resetS3Client()
if (s3Client == null) {
synchronized(lock) {
if (s3Client == null) {
s3Client = resetS3Client()
}
}
return s3Client
}
return s3Client
}
private set

Expand Down Expand Up @@ -84,7 +86,7 @@ open class S3DestinationConfig {
pathFormat: String,
credentialConfig: S3CredentialConfig,
formatConfig: S3FormatConfig,
s3Client: AmazonS3,
s3Client: AmazonS3?,
fileNamePattern: String?,
checkIntegrity: Boolean,
uploadThreadsCount: Int) {
Expand All @@ -95,7 +97,7 @@ open class S3DestinationConfig {
this.pathFormat = pathFormat
this.s3CredentialConfig = credentialConfig
this.formatConfig = formatConfig
this.s3Client = s3Client
this.s3Client = s3Client ?: resetS3Client()
this.fileNamePattern = fileNamePattern
this.isCheckIntegrity = checkIntegrity
this.uploadThreadsCount = uploadThreadsCount
Expand Down

0 comments on commit 8078674

Please sign in to comment.