From 0423a25e350c67b2813676ee4d349f013309d583 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Tue, 17 Dec 2024 13:02:49 -0800 Subject: [PATCH] WIP fix tiny parts --- .../base/src/main/resources/application.yaml | 2 + .../DockerizedDestination.kt | 2 +- .../ObjectStorageDestinationStateManager.kt | 21 +++++----- .../ObjectStorageStreamLoaderFactory.kt | 9 +++-- .../object_storage/RecordToPartAccumulator.kt | 28 +++++++++---- .../ObjectStorageStreamLoaderTest.kt | 4 +- .../RecordToPartAccumulatorTest.kt | 39 +++++++++++++------ .../dev_null/DevNullConfiguration.kt | 9 +++-- .../src/main/resources/application.yaml | 2 + 9 files changed, 79 insertions(+), 37 deletions(-) diff --git a/airbyte-cdk/bulk/core/base/src/main/resources/application.yaml b/airbyte-cdk/bulk/core/base/src/main/resources/application.yaml index e6160c5260e0..7bad9b3fce29 100644 --- a/airbyte-cdk/bulk/core/base/src/main/resources/application.yaml +++ b/airbyte-cdk/bulk/core/base/src/main/resources/application.yaml @@ -8,3 +8,5 @@ airbyte: flush: rate-ms: 900000 # 15 minutes window-ms: 900000 # 15 minutes + destination: + record-batch-size-override: ${AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE:null} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt index 1d8744afd915..1da2fc8d7892 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt @@ -116,7 +116,7 @@ class DockerizedDestination( "-v", "$fileTransferMountSource:/tmp", "-e", - "AIRBYTE_DESTINATION_RECORD_BATCH_SIZE=1", + "AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE=1", "-e", "USE_FILE_TRANSFER=$useFileTransfer", ) + diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt index 0488d55c4433..9ee96112c0e6 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt @@ -84,9 +84,8 @@ class ObjectStorageDestinationState( val partNumber: Long, ) - @get:JsonIgnore - val generations: Sequence - get() = + private suspend fun getGenerations(): Sequence = + accessLock.withLock { generationMap.entries .asSequence() .map { (state, gens) -> @@ -100,14 +99,16 @@ class ObjectStorageDestinationState( } } .flatten() + } - @get:JsonIgnore - val nextPartNumber: Long - get() = generations.flatMap { it.objects }.map { it.partNumber }.maxOrNull()?.plus(1) ?: 0L + suspend fun getNextPartNumber(): Long = + getGenerations().flatMap { it.objects }.map { it.partNumber }.maxOrNull()?.plus(1) ?: 0L /** Returns generationId -> objectAndPart for all staged objects that should be kept. */ - fun getStagedObjectsToFinalize(minimumGenerationId: Long): Sequence> = - generations + suspend fun getStagedObjectsToFinalize( + minimumGenerationId: Long + ): Sequence> = + getGenerations() .filter { it.isStaging && it.generationId >= minimumGenerationId } .flatMap { it.objects.map { obj -> it.generationId to obj } } @@ -115,8 +116,8 @@ class ObjectStorageDestinationState( * Returns generationId -> objectAndPart for all objects (staged and unstaged) that should be * cleaned up. */ - fun getObjectsToDelete(minimumGenerationId: Long): Sequence> { - val (toKeep, toDrop) = generations.partition { it.generationId >= minimumGenerationId } + suspend fun getObjectsToDelete(minimumGenerationId: Long): Sequence> { + val (toKeep, toDrop) = getGenerations().partition { it.generationId >= minimumGenerationId } val keepKeys = toKeep.flatMap { it.objects.map { obj -> obj.key } }.toSet() return toDrop.asSequence().flatMap { it.objects.filter { obj -> obj.key !in keepKeys }.map { obj -> it.generationId to obj } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index c3b2db03ba77..5f362fcffa49 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -51,6 +51,7 @@ class ObjectStorageStreamLoaderFactory, U : OutputStream>( pathFactory, bufferedWriterFactory, destinationStateManager, + uploadConfigurationProvider.objectStorageUploadConfiguration.uploadPartSizeBytes, uploadConfigurationProvider.objectStorageUploadConfiguration.fileSizeBytes ) } @@ -67,7 +68,8 @@ class ObjectStorageStreamLoader, U : OutputStream>( private val pathFactory: ObjectStoragePathFactory, private val bufferedWriterFactory: BufferedFormattingWriterFactory, private val destinationStateManager: DestinationStateManager, - private val recordBatchSizeBytes: Long, + private val partSizeBytes: Long, + private val fileSizeBytes: Long, ) : StreamLoader { private val log = KotlinLogging.logger {} @@ -79,7 +81,7 @@ class ObjectStorageStreamLoader, U : OutputStream>( val state = destinationStateManager.getState(stream) // This is the number used to populate {part_number} on the object path. // We'll call it file number here to avoid confusion with the part index used for uploads. - val fileNumber = state.nextPartNumber + val fileNumber = state.getNextPartNumber() log.info { "Got next file number from destination state: $fileNumber" } this.fileNumber.set(fileNumber) } @@ -88,7 +90,8 @@ class ObjectStorageStreamLoader, U : OutputStream>( return RecordToPartAccumulator( pathFactory, bufferedWriterFactory, - recordBatchSizeBytes, + partSizeBytes = partSizeBytes, + fileSizeBytes = fileSizeBytes, stream, fileNumber ) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt index 4ebfe5a79634..5434e79002d0 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt @@ -26,7 +26,8 @@ data class ObjectInProgress( class RecordToPartAccumulator( private val pathFactory: ObjectStoragePathFactory, private val bufferedWriterFactory: BufferedFormattingWriterFactory, - private val recordBatchSizeBytes: Long, + private val partSizeBytes: Long, + private val fileSizeBytes: Long, private val stream: DestinationStream, private val fileNumber: AtomicLong, ) : BatchAccumulator { @@ -66,26 +67,37 @@ class RecordToPartAccumulator( partialUpload.writer.flush() // Check if we have reached the target size. - val newSize = partialUpload.partFactory.totalSize + partialUpload.writer.bufferSize - if (newSize >= recordBatchSizeBytes || endOfStream) { + val bufferSize = partialUpload.writer.bufferSize + val newSize = partialUpload.partFactory.totalSize + bufferSize + if (newSize >= fileSizeBytes || endOfStream) { - // If we have reached target size, clear the object and yield a final part. + // If we have reached target file size, clear the object and yield a final part. val bytes = partialUpload.writer.finish() partialUpload.writer.close() val part = partialUpload.partFactory.nextPart(bytes, isFinal = true) log.info { - "Size $newSize/${recordBatchSizeBytes}b reached (endOfStream=$endOfStream), yielding final part ${part.partIndex} (empty=${part.isEmpty})" + val reason = if (endOfStream) "end of stream" else "file size ${fileSizeBytes}b" + "Buffered: ${bufferSize}b; total: ${newSize}b; $reason reached, yielding final part ${part.partIndex}" } currentObject.remove(key) return LoadablePart(part) - } else { - // If we have not reached target size, just yield the next part. + } else if (bufferSize >= partSizeBytes) { + // If we have not reached file size, but have reached part size, yield a non-final part. val bytes = partialUpload.writer.takeBytes() val part = partialUpload.partFactory.nextPart(bytes) log.info { - "Size $newSize/${recordBatchSizeBytes}b not reached, yielding part ${part.partIndex} (empty=${part.isEmpty})" + "Buffered: ${bufferSize}b; total ${newSize}b; part size ${partSizeBytes}b reached, yielding part ${part.partIndex}" + } + + return LoadablePart(part) + } else { + // If we have not reached either the file or part size, yield a null part. + // TODO: Change this to a generator interface so we never have to do this. + val part = partialUpload.partFactory.nextPart(null) + log.info { + "Buffered: ${bufferSize}b; total ${newSize}b; part size ${partSizeBytes}b not reached, yielding null part ${part.partIndex}" } return LoadablePart(part) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt index c7dd8040f147..47631dd552c2 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt @@ -37,6 +37,7 @@ class ObjectStorageStreamLoaderTest { mockk(relaxed = true) private val destinationStateManager: DestinationStateManager = mockk(relaxed = true) + private val fileSize: Long = 2 private val partSize: Long = 1 private val objectStorageStreamLoader = @@ -48,7 +49,8 @@ class ObjectStorageStreamLoaderTest { pathFactory, writerFactory, destinationStateManager, - partSize + partSizeBytes = partSize, + fileSizeBytes = fileSize ) ) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt index 511b2b956928..527651bbc476 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt @@ -21,7 +21,8 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test class RecordToPartAccumulatorTest { - private val recordBatchSizeBytes: Long = 10L + private val partSizeBytes: Long = 2L + private val fileSizeBytes: Long = 4L private lateinit var pathFactory: ObjectStoragePathFactory private lateinit var bufferedWriterFactory: BufferedFormattingWriterFactory @@ -63,7 +64,8 @@ class RecordToPartAccumulatorTest { RecordToPartAccumulator( pathFactory = pathFactory, bufferedWriterFactory = bufferedWriterFactory, - recordBatchSizeBytes = recordBatchSizeBytes, + partSizeBytes = partSizeBytes, + fileSizeBytes = fileSizeBytes, stream = stream, fileNumber = fileNumber ) @@ -93,11 +95,11 @@ class RecordToPartAccumulatorTest { // Object 1 - // part 6/10b => not data sufficient, should be first and nonfinal - when (val batch = acc.processRecords(makeRecords(6), 0L, false) as ObjectStorageBatch) { + // part 0->1/2b of 4b total => not data sufficient, should be first and empty + when (val batch = acc.processRecords(makeRecords(1), 0L, false) as ObjectStorageBatch) { is LoadablePart -> { - assert(batch.part.bytes.contentEquals(makeBytes(6))) - assert(batch.part.partIndex == 1) + assert(batch.part.isEmpty) + assert(batch.part.partIndex == 0) assert(batch.part.fileNumber == 111L) assert(!batch.isPersisted()) assert(!batch.part.isFinal) @@ -110,6 +112,19 @@ class RecordToPartAccumulatorTest { when (val batch = acc.processRecords(makeRecords(0), 0L, false) as ObjectStorageBatch) { is LoadablePart -> { assert(batch.part.isEmpty) + assert(batch.part.partIndex == 0) + assert(batch.part.fileNumber == 111L) + assert(!batch.isPersisted()) + assert(!batch.part.isFinal) + assert(batch.part.key == "path.111") + } + else -> assert(false) + } + + // part 1->3/2b of 4b total => data sufficient for part, should be first part and nonfinal + when (val batch = acc.processRecords(makeRecords(2), 0L, false) as ObjectStorageBatch) { + is LoadablePart -> { + assert(batch.part.bytes.contentEquals(makeBytes(3))) assert(batch.part.partIndex == 1) assert(batch.part.fileNumber == 111L) assert(!batch.isPersisted()) @@ -119,10 +134,12 @@ class RecordToPartAccumulatorTest { else -> assert(false) } - // part 11/10b => data sufficient, should be second now and final - when (val batch = acc.processRecords(makeRecords(5), 0L, false) as ObjectStorageBatch) { + // part 3->4/2b of 4b total => data sufficient for file (but not part! this is expected!), + // should be second part and final (and not empty) + when (val batch = acc.processRecords(makeRecords(1), 0L, false) as ObjectStorageBatch) { is LoadablePart -> { - assert(batch.part.bytes.contentEquals(makeBytes(5))) + println(batch.part.bytes.contentToString()) + assert(batch.part.bytes.contentEquals(makeBytes(1))) assert(batch.part.partIndex == 2) assert(batch.part.fileNumber == 111L) assert(!batch.isPersisted()) @@ -134,7 +151,7 @@ class RecordToPartAccumulatorTest { // Object 2 - // Next part 10/10b => data sufficient, should be first and final + // Next part 10/4b => data sufficient, should be first and final when (val batch = acc.processRecords(makeRecords(10), 0L, false) as ObjectStorageBatch) { is LoadablePart -> { assert(batch.part.bytes.contentEquals(makeBytes(10))) @@ -163,7 +180,7 @@ class RecordToPartAccumulatorTest { // One flush per call, one create/close per finished object coVerify(exactly = 3) { bufferedWriterFactory.create(any()) } - coVerify(exactly = 5) { bufferedWriter.flush() } + coVerify(exactly = 6) { bufferedWriter.flush() } coVerify(exactly = 3) { bufferedWriter.close() } } } diff --git a/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullConfiguration.kt b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullConfiguration.kt index 7cd777ee8187..58a5249c4562 100644 --- a/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullConfiguration.kt +++ b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullConfiguration.kt @@ -42,12 +42,12 @@ data class DevNullConfiguration( @Singleton class DevNullConfigurationFactory( @Value("\${airbyte.destination.record-batch-size-override}") - private val recordBatchSizeBytes: Long + private val recordBatchSizeBytesOverride: Long? ) : DestinationConfigurationFactory { private val log = KotlinLogging.logger {} override fun makeWithoutExceptionHandling(pojo: DevNullSpecification): DevNullConfiguration { - log.info { "Record batch size from environment: $recordBatchSizeBytes" } + log.info { "Record batch size from environment: $recordBatchSizeBytesOverride" } return when (pojo) { is DevNullSpecificationOss -> { when (pojo.testDestination) { @@ -108,7 +108,10 @@ class DevNullConfigurationFactory( } } } - }.copy(recordBatchSizeBytes = recordBatchSizeBytes) + }.copy( + recordBatchSizeBytes = recordBatchSizeBytesOverride + ?: DestinationConfiguration.DEFAULT_RECORD_BATCH_SIZE_BYTES + ) } } diff --git a/airbyte-integrations/connectors/destination-dev-null/src/main/resources/application.yaml b/airbyte-integrations/connectors/destination-dev-null/src/main/resources/application.yaml index a723a0dae96e..3d7d25d71e0e 100644 --- a/airbyte-integrations/connectors/destination-dev-null/src/main/resources/application.yaml +++ b/airbyte-integrations/connectors/destination-dev-null/src/main/resources/application.yaml @@ -11,3 +11,5 @@ airbyte: flush: rate-ms: 900000 # 15 minutes window-ms: 900000 # 15 minutes + destination: + record-batch-size-override: ${AIRBYTE_DESTINATION_RECORD_BATCH_SIZE_OVERRIDE:null}