
Commit

fix compiler errors
stephane-airbyte committed Mar 27, 2024
1 parent 30e5b72 commit a37358d
Showing 37 changed files with 866 additions and 517 deletions.
@@ -1 +1 @@
version=0.28.9
version=0.28.10
12 changes: 12 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle
@@ -8,6 +8,18 @@ java {
}
}

compileKotlin {
compilerOptions {
allWarningsAsErrors = false
}
}

compileTestFixturesKotlin {
compilerOptions {
allWarningsAsErrors = false
}
}

dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
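The hunk above turns off warnings-as-errors for the module's main and testFixtures Kotlin compilations, presumably so the converted code can keep compiling while remaining warnings are cleaned up. For comparison, a minimal sketch of the same relaxation applied to every Kotlin compile task at once, written against the Kotlin Gradle plugin's compilerOptions DSL in a build.gradle.kts file (an assumption; this module uses the Groovy DSL shown above):

    // build.gradle.kts sketch (assumed setup): relax warnings-as-errors for all
    // Kotlin compilations in one place instead of per task.
    import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

    tasks.withType<KotlinCompile>().configureEach {
        compilerOptions {
            allWarningsAsErrors.set(false)
        }
    }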
Empty file.
Empty file.
Empty file.
@@ -12,66 +12,89 @@ import io.airbyte.cdk.integrations.base.Destination
import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.record_buffer.BufferStorage
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testMultipartUpload
import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.testSingleUpload
import io.airbyte.cdk.integrations.destination.s3.S3ConsumerFactory
import io.airbyte.cdk.integrations.destination.s3.SerializedBufferFactory.Companion.getCreateFunction
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.function.Consumer
import java.util.function.Function
import org.slf4j.Logger
import org.slf4j.LoggerFactory

abstract class BaseGcsDestination : BaseConnector(), Destination {
private val nameTransformer: NamingConventionTransformer = GcsNameTransformer()

override fun check(config: JsonNode): AirbyteConnectionStatus? {
try {
val destinationConfig: GcsDestinationConfig = GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
val s3Client = destinationConfig.s3Client
val destinationConfig: GcsDestinationConfig =
GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
val s3Client = destinationConfig.getS3Client()

// Test single upload (for small files) permissions
testSingleUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath)

// Test multipart upload with stream transfer manager
testMultipartUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath)
testMultipartUpload(
s3Client,
destinationConfig.bucketName,
destinationConfig.bucketPath
)

return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED)
} catch (e: AmazonS3Exception) {
LOGGER.error("Exception attempting to access the Gcs bucket", e)
val message = getErrorMessage(e.errorCode, 0, e.message, e)
emitConfigErrorTrace(e, message)
return AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage(message)
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage(message)
} catch (e: Exception) {
LOGGER.error("Exception attempting to access the Gcs bucket: {}. Please make sure you account has all of these roles: " + EXPECTED_ROLES, e)
LOGGER.error(
"Exception attempting to access the Gcs bucket: {}. Please make sure you account has all of these roles: " +
EXPECTED_ROLES,
e
)
emitConfigErrorTrace(e, e.message)
return AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect to the Gcs bucket with the provided configuration. \n" + e
.message)
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage(
"Could not connect to the Gcs bucket with the provided configuration. \n" +
e.message
)
}
}

override fun getConsumer(config: JsonNode,
configuredCatalog: ConfiguredAirbyteCatalog?,
outputRecordCollector: Consumer<AirbyteMessage?>?): AirbyteMessageConsumer? {
val gcsConfig: GcsDestinationConfig = GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
return S3ConsumerFactory().create(
override fun getConsumer(
config: JsonNode,
configuredCatalog: ConfiguredAirbyteCatalog,
outputRecordCollector: Consumer<AirbyteMessage?>?
): AirbyteMessageConsumer? {
val gcsConfig: GcsDestinationConfig =
GcsDestinationConfig.Companion.getGcsDestinationConfig(config)
return S3ConsumerFactory()
.create(
outputRecordCollector,
GcsStorageOperations(nameTransformer, gcsConfig.s3Client, gcsConfig),
GcsStorageOperations(nameTransformer, gcsConfig.getS3Client(), gcsConfig),
nameTransformer,
getCreateFunction(gcsConfig, Function<String, BufferStorage> { fileExtension: String? -> FileBuffer(fileExtension) }),
getCreateFunction(
gcsConfig,
Function<String, BufferStorage> { fileExtension: String ->
FileBuffer(fileExtension)
}
),
gcsConfig,
configuredCatalog)
configuredCatalog
)
}

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(BaseGcsDestination::class.java)
const val EXPECTED_ROLES: String = ("storage.multipartUploads.abort, storage.multipartUploads.create, "
+ "storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list")
const val EXPECTED_ROLES: String =
("storage.multipartUploads.abort, storage.multipartUploads.create, " +
"storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list")
}
}
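The check() path above builds a GcsDestinationConfig from the connector's JSON config, then probes both single-object and multipart upload permissions before reporting SUCCEEDED or FAILED. A hedged sketch of a config document it could consume; the top-level keys match getGcsDestinationConfig() later in this commit, while the nested credential and format keys are assumptions based on the HMAC-key credential classes:

    // Sketch only: values are placeholders, and the nested key names are assumed
    // from GcsCredentialConfigs / GcsHmacKeyCredentialConfig.
    import com.fasterxml.jackson.databind.JsonNode
    import com.fasterxml.jackson.databind.ObjectMapper

    val sampleGcsConfig: JsonNode = ObjectMapper().readTree(
        """
        {
          "gcs_bucket_name": "my-bucket",
          "gcs_bucket_path": "airbyte/raw",
          "gcs_bucket_region": "us-central1",
          "credential": {
            "credential_type": "HMAC_KEY",
            "hmac_key_access_id": "<access-id>",
            "hmac_key_secret": "<secret>"
          },
          "format": { "format_type": "Avro" }
        }
        """.trimIndent()
    )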
@@ -23,35 +23,47 @@ import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
* Currently we always reuse the S3 client for GCS. So the GCS config extends from the S3 config.
* This may change in the future.
*/
class GcsDestinationConfig(bucketName: String?,
bucketPath: String?,
bucketRegion: String?,
val gcsCredentialConfig: GcsCredentialConfig?,
formatConfig: S3FormatConfig?) : S3DestinationConfig(GCS_ENDPOINT,
class GcsDestinationConfig(
bucketName: String,
bucketPath: String,
bucketRegion: String?,
val gcsCredentialConfig: GcsCredentialConfig,
formatConfig: S3FormatConfig
) :
S3DestinationConfig(
GCS_ENDPOINT,
bucketName!!,
bucketPath!!,
bucketRegion,
S3DestinationConstants.DEFAULT_PATH_FORMAT,
gcsCredentialConfig.getS3CredentialConfig().orElseThrow(),
gcsCredentialConfig.s3CredentialConfig.orElseThrow(),
formatConfig!!,
null,
null,
false,
S3StorageOperations.DEFAULT_UPLOAD_THREADS) {
S3StorageOperations.DEFAULT_UPLOAD_THREADS
) {
override fun createS3Client(): AmazonS3 {
when (gcsCredentialConfig!!.credentialType) {
when (gcsCredentialConfig.credentialType) {
GcsCredentialType.HMAC_KEY -> {
val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig?
val awsCreds = BasicAWSCredentials(hmacKeyCredential.getHmacKeyAccessId(), hmacKeyCredential.getHmacKeySecret())
val hmacKeyCredential = gcsCredentialConfig as GcsHmacKeyCredentialConfig
val awsCreds =
BasicAWSCredentials(
hmacKeyCredential.hmacKeyAccessId,
hmacKeyCredential.hmacKeySecret
)

return AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(
AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, bucketRegion))
.withCredentials(AWSStaticCredentialsProvider(awsCreds))
.build()
.withEndpointConfiguration(
AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, bucketRegion)
)
.withCredentials(AWSStaticCredentialsProvider(awsCreds))
.build()
}

else -> throw IllegalArgumentException("Unsupported credential type: " + gcsCredentialConfig.credentialType!!.name)
else ->
throw IllegalArgumentException(
"Unsupported credential type: " + gcsCredentialConfig.credentialType!!.name
)
}
}

@@ -60,11 +72,12 @@ class GcsDestinationConfig(bucketName: String?,

fun getGcsDestinationConfig(config: JsonNode): GcsDestinationConfig {
return GcsDestinationConfig(
config["gcs_bucket_name"].asText(),
config["gcs_bucket_path"].asText(),
config["gcs_bucket_region"].asText(),
GcsCredentialConfigs.getCredentialConfig(config),
getS3FormatConfig(config))
config["gcs_bucket_name"].asText(),
config["gcs_bucket_path"].asText(),
config["gcs_bucket_region"].asText(),
GcsCredentialConfigs.getCredentialConfig(config),
getS3FormatConfig(config)
)
}
}
}
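Taken together, the companion factory and createS3Client() above mean a caller only needs the JSON config to obtain an AmazonS3 client that is actually backed by the GCS interoperability endpoint and the configured HMAC key pair. A minimal usage sketch, assuming it lives next to these classes; the listing call is illustrative, not CDK code:

    // Usage sketch (assumption: same package as GcsDestinationConfig).
    import com.fasterxml.jackson.databind.JsonNode

    fun listConfiguredPrefix(config: JsonNode) {
        val gcsConfig = GcsDestinationConfig.getGcsDestinationConfig(config)
        val s3Client = gcsConfig.getS3Client() // talks to the GCS XML/interop endpoint
        s3Client.listObjects(gcsConfig.bucketName, gcsConfig.bucketPath)
            .objectSummaries
            .forEach { summary -> println(summary.key) }
    }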
@@ -11,21 +11,25 @@ import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class GcsStorageOperations(nameTransformer: NamingConventionTransformer?,
s3Client: AmazonS3?,
s3Config: S3DestinationConfig?) : S3StorageOperations(nameTransformer!!, s3Client!!, s3Config!!) {
/**
* GCS only supports the legacy AmazonS3#doesBucketExist method.
*/
class GcsStorageOperations(
nameTransformer: NamingConventionTransformer,
s3Client: AmazonS3,
s3Config: S3DestinationConfig
) : S3StorageOperations(nameTransformer!!, s3Client!!, s3Config!!) {
/** GCS only supports the legacy AmazonS3#doesBucketExist method. */
override fun doesBucketExist(bucket: String?): Boolean {
return s3Client.doesBucketExist(bucket)
}

/**
* This method is overridden because GCS doesn't accept requests to delete multiple objects. The only
* difference is that the AmazonS3#deleteObjects method is replaced with AmazonS3#deleteObject.
* This method is overridden because GCS doesn't accept requests to delete multiple objects. The
* only difference is that the AmazonS3#deleteObjects method is replaced with
* AmazonS3#deleteObject.
*/
override fun cleanUpObjects(bucket: String?, keysToDelete: List<DeleteObjectsRequest.KeyVersion>) {
override fun cleanUpObjects(
bucket: String?,
keysToDelete: List<DeleteObjectsRequest.KeyVersion>
) {
for (keyToDelete in keysToDelete) {
LOGGER.info("Deleting object {}", keyToDelete.key)
s3Client.deleteObject(bucket, keyToDelete.key)
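The comment above is the whole reason this class exists: the S3 path can batch deletes, while GCS's S3-compatible layer cannot. A standalone contrast sketch using only stock AWS SDK calls (illustrative helpers, not CDK code):

    // Contrast sketch: bulk delete works against S3 but not against the GCS
    // interop endpoint, hence the per-object loop in cleanUpObjects above.
    import com.amazonaws.services.s3.AmazonS3
    import com.amazonaws.services.s3.model.DeleteObjectsRequest

    fun bulkDelete(s3Client: AmazonS3, bucket: String, keys: List<String>) {
        // Single request, many keys - fine on S3, rejected by GCS.
        s3Client.deleteObjects(DeleteObjectsRequest(bucket).withKeys(*keys.toTypedArray()))
    }

    fun perObjectDelete(s3Client: AmazonS3, bucket: String, keys: List<String>) {
        // One request per key - what GcsStorageOperations.cleanUpObjects does.
        keys.forEach { key -> s3Client.deleteObject(bucket, key) }
    }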
@@ -14,26 +14,31 @@ import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory
import io.airbyte.cdk.integrations.destination.s3.avro.JsonToAvroSchemaConverter
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import java.io.IOException
import java.sql.Timestamp
import java.util.*
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumWriter
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import tech.allegro.schema.json2avro.converter.JsonAvroConverter
import java.io.IOException
import java.sql.Timestamp
import java.util.*

class GcsAvroWriter @JvmOverloads constructor(config: GcsDestinationConfig,
s3Client: AmazonS3,
configuredStream: ConfiguredAirbyteStream,
uploadTimestamp: Timestamp,
converter: JsonAvroConverter?,
jsonSchema: JsonNode? = null) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter {
class GcsAvroWriter
@JvmOverloads
constructor(
config: GcsDestinationConfig,
s3Client: AmazonS3,
configuredStream: ConfiguredAirbyteStream,
uploadTimestamp: Timestamp,
converter: JsonAvroConverter?,
jsonSchema: JsonNode? = null
) : BaseGcsWriter(config, s3Client, configuredStream), DestinationFileWriter {
private val avroRecordFactory: AvroRecordFactory
private val uploadManager: StreamTransferManager
private val outputStream: MultiPartOutputStream
@@ -42,30 +47,48 @@ class GcsAvroWriter @JvmOverloads constructor(config: GcsDestinationConfig,
override val outputPath: String

init {
val schema = if (jsonSchema == null
) GcsUtils.getDefaultAvroSchema(stream.name, stream.namespace, true, false)
else JsonToAvroSchemaConverter().getAvroSchema(jsonSchema, stream.name,
stream.namespace, true, false, false, true)
val schema =
if (jsonSchema == null)
GcsUtils.getDefaultAvroSchema(stream.name, stream.namespace, true, false)
else
JsonToAvroSchemaConverter()
.getAvroSchema(
jsonSchema,
stream.name,
stream.namespace,
true,
false,
false,
true
)
LOGGER.info("Avro schema for stream {}: {}", stream.name, schema!!.toString(false))

val outputFilename: String = BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.AVRO)
val outputFilename: String =
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.AVRO)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath)

LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.name, config.bucketName,
outputPath)
LOGGER.info(
"Full GCS path for stream '{}': {}/{}",
stream.name,
config.bucketName,
outputPath
)

this.avroRecordFactory = AvroRecordFactory(schema, converter)
this.uploadManager = create(config.bucketName, outputPath, s3Client)
.setPartSize(DEFAULT_PART_SIZE_MB.toLong())
this.uploadManager =
create(config.bucketName, outputPath, s3Client)
.setPartSize(StreamTransferManagerFactory.DEFAULT_PART_SIZE_MB.toLong())
.get()
// We only need one output stream as we only have one input stream. This is reasonably performant.
// We only need one output stream as we only have one input stream. This is reasonably
// performant.
this.outputStream = uploadManager.multiPartOutputStreams[0]

val formatConfig = config.formatConfig as S3AvroFormatConfig
// The DataFileWriter always uses binary encoding.
// If json encoding is needed in the future, use the GenericDatumWriter directly.
this.dataFileWriter = DataFileWriter(GenericDatumWriter<GenericData.Record>())
this.dataFileWriter =
DataFileWriter(GenericDatumWriter<GenericData.Record>())
.setCodec(formatConfig.codecFactory)
.create(schema, outputStream)
}
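The init block above does three things: derives or converts the Avro schema, opens a single multipart output stream through the StreamTransferManager, and wires a DataFileWriter with the codec from the format config. A self-contained sketch of that last pattern using the plain Avro API (not CDK code), to make the binary-encoding comment concrete:

    // Plain Avro sketch: GenericDatumWriter wrapped in a DataFileWriter, a codec
    // set before create(), and records appended one at a time.
    import java.io.ByteArrayOutputStream
    import org.apache.avro.Schema
    import org.apache.avro.SchemaBuilder
    import org.apache.avro.file.CodecFactory
    import org.apache.avro.file.DataFileWriter
    import org.apache.avro.generic.GenericData
    import org.apache.avro.generic.GenericDatumWriter

    fun writeSampleAvro(): ByteArray {
        val schema: Schema = SchemaBuilder.record("sample").fields()
            .requiredString("id")
            .endRecord()
        val out = ByteArrayOutputStream()
        DataFileWriter(GenericDatumWriter<GenericData.Record>())
            .setCodec(CodecFactory.deflateCodec(5))
            .create(schema, out)
            .use { writer ->
                val record = GenericData.Record(schema)
                record.put("id", "1")
                writer.append(record)
            }
        return out.toByteArray()
    }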
@@ -8,5 +8,5 @@ import io.airbyte.cdk.integrations.destination.s3.credential.S3CredentialConfig
import java.util.*

interface GcsCredentialConfig : BlobStorageCredentialConfig<GcsCredentialType?> {
val s3CredentialConfig: Optional<S3CredentialConfig?>
val s3CredentialConfig: Optional<S3CredentialConfig>
}
@@ -10,7 +10,10 @@ import java.util.*
object GcsCredentialConfigs {
fun getCredentialConfig(config: JsonNode): GcsCredentialConfig {
val credentialConfig = config["credential"]
val credentialType = GcsCredentialType.valueOf(credentialConfig["credential_type"].asText().uppercase(Locale.getDefault()))
val credentialType =
GcsCredentialType.valueOf(
credentialConfig["credential_type"].asText().uppercase(Locale.getDefault())
)

if (credentialType == GcsCredentialType.HMAC_KEY) {
return GcsHmacKeyCredentialConfig(credentialConfig)
@@ -25,6 +25,6 @@ class GcsHmacKeyCredentialConfig : GcsCredentialConfig {
override val credentialType: GcsCredentialType
get() = GcsCredentialType.HMAC_KEY

override val s3CredentialConfig: Optional<S3CredentialConfig?>
override val s3CredentialConfig: Optional<S3CredentialConfig>
get() = Optional.of(S3AccessKeyCredentialConfig(hmacKeyAccessId, hmacKeySecret))
}
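Dropping the inner nullability here (Optional&lt;S3CredentialConfig?&gt; to Optional&lt;S3CredentialConfig&gt;) is what lets GcsDestinationConfig call orElseThrow() on the result without a further null check. A small consumer sketch, assuming it sits in the same package as these credential classes:

    // Sketch: mapping a GCS credential config onto its S3 equivalent, or failing
    // loudly if the credential type has none.
    fun toS3Credentials(credentialConfig: GcsCredentialConfig) =
        credentialConfig.s3CredentialConfig.orElseThrow {
            IllegalArgumentException("No S3-compatible credentials for ${credentialConfig.credentialType}")
        }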