diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index e55547ab4834..0b5c423cca46 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | | :--------- | :--------- | :----------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 0.44.21 | 2024-09-04 | [\#45143](https://github.com/airbytehq/airbyte/pull/45143) | S3-destination: don't overwrite existing files, skip those file indexes instead | | 0.44.20 | 2024-08-30 | [\#44933](https://github.com/airbytehq/airbyte/pull/44933) | Avro/Parquet destinations: handle `{}` schemas inside objects/arrays | | 0.44.19 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase Jackson message length limit to 100mb | | 0.44.18 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Improve handling of incoming debezium change events | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 940c3205c2ba..5590734237c5 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.44.20 +version=0.44.21 diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt index b92f79722bb9..894e53789973 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt @@ -45,6 +45,7 @@ open class S3StorageOperations( private val s3FilenameTemplateManager: S3FilenameTemplateManager = S3FilenameTemplateManager() private val partCounts: ConcurrentMap = ConcurrentHashMap() + private val objectNameByPrefix: ConcurrentMap> = ConcurrentHashMap() override fun getBucketObjectPath( namespace: String?, @@ -167,6 +168,32 @@ open class S3StorageOperations( * @return the uploaded filename, which is different from the serialized buffer filename * */ + @VisibleForTesting + fun getFileName( + objectPath: String, + recordsData: SerializableBuffer, + ): String { + var fullObjectKey: String + do { + val partId: String = getPartId(objectPath) + val fileExtension: String = getExtension(recordsData.filename) + fullObjectKey = + if (!s3Config.fileNamePattern.isNullOrBlank()) { + s3FilenameTemplateManager.applyPatternToFilename( + S3FilenameTemplateParameterObject.builder() + .partId(partId) + .recordsData(recordsData) + .objectPath(objectPath) + .fileExtension(fileExtension) + .fileNamePattern(s3Config.fileNamePattern) + .build(), + ) + } else { + objectPath + partId + fileExtension + } + } while (objectNameByPrefix.getValue(objectPath).contains(fullObjectKey)) + return fullObjectKey + } @Throws(IOException::class) private fun loadDataIntoBucket( objectPath: String, @@ -175,22 +202,7 @@ open class S3StorageOperations( ): String { val partSize: Long = DEFAULT_PART_SIZE.toLong() val bucket: String? = s3Config.bucketName - val partId: String = getPartId(objectPath) - val fileExtension: String = getExtension(recordsData.filename) - val fullObjectKey: String = - if (!s3Config.fileNamePattern.isNullOrBlank()) { - s3FilenameTemplateManager.applyPatternToFilename( - S3FilenameTemplateParameterObject.builder() - .partId(partId) - .recordsData(recordsData) - .objectPath(objectPath) - .fileExtension(fileExtension) - .fileNamePattern(s3Config.fileNamePattern) - .build(), - ) - } else { - objectPath + partId + fileExtension - } + val fullObjectKey: String = getFileName(objectPath, recordsData) val metadata: MutableMap = HashMap() for (blobDecorator: BlobDecorator in blobDecorators) { blobDecorator.updateMetadata(metadata, getMetadataMapping()) @@ -263,31 +275,14 @@ open class S3StorageOperations( ) { AtomicInteger(0) } - - if (partCount.get() == 0) { - var objects: ObjectListing? - var objectCount = 0 - - val bucket: String? = s3Config.bucketName - objects = s3Client.listObjects(bucket, objectPath) - - if (objects != null) { - objectCount += objects.objectSummaries.size - while (objects != null && objects.nextMarker != null) { - objects = - s3Client.listObjects( - ListObjectsRequest() - .withBucketName(bucket) - .withPrefix(objectPath) - .withMarker(objects.nextMarker), - ) - if (objects != null) { - objectCount += objects.objectSummaries.size - } - } + objectNameByPrefix.computeIfAbsent( + objectPath, + ) { + var objectList: Set = setOf() + forObjectsByPage(objectPath) { objectSummaries -> + objectList = objectList + objectSummaries.map { it.key } } - - partCount.set(objectCount) + objectList } return partCount.getAndIncrement().toString() diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/JsonRecordParquetPreprocessor.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/JsonRecordParquetPreprocessor.kt index b5c27a64ddc4..63b89f92cb4a 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/JsonRecordParquetPreprocessor.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/JsonRecordParquetPreprocessor.kt @@ -12,7 +12,7 @@ import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonRecordIdentityM import io.airbyte.commons.jackson.MoreMappers class JsonRecordParquetPreprocessor : JsonRecordIdentityMapper() { - private fun mapCommon(record: JsonNode?, matchingOption: ObjectNode): JsonNode? { + private fun mapCommon(record: JsonNode?, matchingOption: ObjectNode): ObjectNode { val newObj = MoreMappers.initMapper().createObjectNode() val propertyName = JsonSchemaParquetPreprocessor.typeFieldName(matchingOption) @@ -24,7 +24,7 @@ class JsonRecordParquetPreprocessor : JsonRecordIdentityMapper() { return newObj } - override fun mapUnion(record: JsonNode?, schema: ObjectNode): JsonNode? { + override fun mapUnion(record: JsonNode?, schema: ObjectNode): ObjectNode? { if (record == null || record.isNull) { return null } @@ -35,7 +35,7 @@ class JsonRecordParquetPreprocessor : JsonRecordIdentityMapper() { return mapCommon(record, matchingOption) } - override fun mapCombined(record: JsonNode?, schema: ObjectNode): JsonNode? { + override fun mapCombined(record: JsonNode?, schema: ObjectNode): ObjectNode? { if (record == null || record.isNull) { return null } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperationsTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperationsTest.kt index 750a312f1380..947f96533964 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperationsTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperationsTest.kt @@ -10,6 +10,7 @@ import com.amazonaws.services.s3.model.ListObjectsRequest import com.amazonaws.services.s3.model.ObjectListing import com.amazonaws.services.s3.model.S3ObjectSummary import io.airbyte.cdk.integrations.destination.NamingConventionTransformer +import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer import io.airbyte.cdk.integrations.destination.s3.util.S3NameTransformer import java.util.concurrent.Executors import java.util.concurrent.TimeUnit @@ -23,6 +24,7 @@ import org.junit.jupiter.api.Test import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchers import org.mockito.Mockito +import org.mockito.kotlin.eq class S3StorageOperationsTest { @@ -31,7 +33,9 @@ class S3StorageOperationsTest { private const val FAKE_BUCKET_PATH = "fake-bucketPath" private const val NAMESPACE = "namespace" private const val STREAM_NAME = "stream_name1" - private const val OBJECT_TO_DELETE = "$NAMESPACE/$STREAM_NAME/2022_04_04_123456789_0.csv.gz" + private const val OBJECT_PREFIX = "$NAMESPACE/$STREAM_NAME/2022_04_04_123456789_" + private const val OBJECT_EXTENSION = ".csv.gz" + private const val OBJECT_TO_DELETE = "${OBJECT_PREFIX}1$OBJECT_EXTENSION" } private lateinit var s3Client: AmazonS3 @@ -74,6 +78,15 @@ class S3StorageOperationsTest { ), ) .thenReturn(results) + Mockito.`when`( + s3Client.listObjects( + eq(BUCKET_NAME), + ArgumentMatchers.any( + String::class.java, + ), + ), + ) + .thenReturn(results) val s3Config = S3DestinationConfig.create(BUCKET_NAME, FAKE_BUCKET_PATH, "fake-region") @@ -210,4 +223,22 @@ class S3StorageOperationsTest { assertEquals("1", s3StorageOperations.getPartId(FAKE_BUCKET_PATH)) assertEquals("0", s3StorageOperations.getPartId("other_path")) } + + @Test + fun testGetFileName() { + val recordsData = + Mockito.mock( + SerializableBuffer::class.java, + ) + Mockito.`when`(recordsData.filename).thenReturn(".csv.gz") + assertEquals( + OBJECT_PREFIX + 0 + OBJECT_EXTENSION, + s3StorageOperations.getFileName(OBJECT_PREFIX, recordsData) + ) + // 1 is skipped because it's already existing + assertEquals( + OBJECT_PREFIX + 2 + OBJECT_EXTENSION, + s3StorageOperations.getFileName(OBJECT_PREFIX, recordsData) + ) + } } diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index 67bfe4e1dd63..93b032e93b04 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -4,7 +4,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.44.20' + cdkVersionRequired = '0.44.21' features = ['db-destinations', 's3-destinations'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index fb887686ce4e..333b8d13f9ba 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 1.0.4 + dockerImageTag: 1.0.5 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 37b044fb9de2..e10e13ec427b 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -535,7 +535,8 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou Expand to review | Version | Date | Pull Request | Subject | -| :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------- | +|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.5 | 2024-09-05 | [45143](https://github.com/airbytehq/airbyte/pull/45143) | don't overwrite (and delete) existing files, skip indexes instead | | 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists | | 1.0.3 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb | | 1.0.2 | 2024-08-19 | [44401](https://github.com/airbytehq/airbyte/pull/44401) | Fix: S3 Avro/Parquet: handle nullable top-level schema |