Destination S3 Data Lake: handle stream with no records (#52564)
edgao authored Jan 27, 2025
1 parent a610ad8 commit 0a0a33f
Showing 5 changed files with 43 additions and 3 deletions.
@@ -2445,6 +2445,27 @@ abstract class BasicFunctionalityIntegrationTest(
        )
    }

    @Test
    open fun testNoData() {
        val stream =
            DestinationStream(
                DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
                Append,
                ObjectType(linkedMapOf("id" to intType)),
                generationId = 0,
                minimumGenerationId = 0,
                syncId = 42,
            )
        assertDoesNotThrow { runSync(configContents, stream, messages = emptyList()) }
        dumpAndDiffRecords(
            parsedConfig,
            canonicalExpectedRecords = emptyList(),
            stream,
            primaryKey = listOf(listOf("id")),
            cursor = null,
        )
    }

    private fun schematizedObject(
        fullObject: LinkedHashMap<String, Any?>,
        coercedObject: LinkedHashMap<String, Any?> = fullObject
@@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.20
dockerImageTag: 0.2.21
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
@@ -10,8 +10,11 @@ import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
import io.github.oshai.kotlinlogging.KotlinLogging
import javax.inject.Singleton

private val logger = KotlinLogging.logger {}

@Singleton
class S3DataLakeWriter(
    private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory,
@@ -35,6 +38,17 @@ class S3DataLakeWriter(

        s3DataLakeTableSynchronizer.applySchemaChanges(table, incomingSchema)

        try {
            logger.info {
                "maybe creating branch $DEFAULT_STAGING_BRANCH for stream ${stream.descriptor}"
            }
            table.manageSnapshots().createBranch(DEFAULT_STAGING_BRANCH).commit()
        } catch (e: IllegalArgumentException) {
            logger.info {
                "branch $DEFAULT_STAGING_BRANCH already exists for stream ${stream.descriptor}"
            }
        }

        return S3DataLakeStreamLoader(
            stream = stream,
            table = table,
Expand Down
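The try/catch added above makes branch creation effectively idempotent: `createBranch` throws `IllegalArgumentException` when the ref already exists, and the writer treats that as a no-op, so even a stream that never produces a record ends up with a staging branch. A minimal standalone sketch of the same pattern, assuming the Iceberg `Table`/`ManageSnapshots` API already used by this connector (the `ensureBranch` helper name is illustrative, not part of this commit):

```kotlin
import io.github.oshai.kotlinlogging.KotlinLogging
import org.apache.iceberg.Table

private val logger = KotlinLogging.logger {}

/** Sketch only: create [branch] on [table] unless it already exists. */
fun ensureBranch(table: Table, branch: String) {
    try {
        table.manageSnapshots().createBranch(branch).commit()
        logger.info { "created branch $branch on table ${table.name()}" }
    } catch (e: IllegalArgumentException) {
        // Iceberg rejects duplicate refs; an already-existing branch is fine here.
        logger.info { "branch $branch already exists on table ${table.name()}" }
    }
}
```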
@@ -122,6 +122,7 @@ internal class S3DataLakeWriterTest {
}
val catalog: Catalog = mockk()
val table: Table = mockk { every { schema() } returns icebergSchema }
every { table.manageSnapshots().createBranch(any()).commit() } just runs
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createTable(any(), any(), any(), any()) } returns table
@@ -148,7 +149,7 @@
}

@Test
fun testCreateStreamLoaderWithMismatchedSchemas() {
fun testCreateStreamLoaderWithMismatchedSchemasAndAlreadyExistingStagingBranch() {
val streamDescriptor = DestinationStream.Descriptor(namespace = "namespace", name = "name")
val stream =
DestinationStream(
@@ -210,6 +211,8 @@
every { updateSchema.setIdentifierFields(any<Collection<String>>()) } returns updateSchema
every { updateSchema.commit() } just runs
every { table.refresh() } just runs
every { table.manageSnapshots().createBranch(any()).commit() } throws
IllegalArgumentException("branch already exists")
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createTable(any(), any(), any(), any()) } returns table
@@ -348,6 +351,7 @@
every { updateSchema.setIdentifierFields(primaryKeys) } returns updateSchema
every { updateSchema.commit() } just runs
every { table.refresh() } just runs
every { table.manageSnapshots().createBranch(any()).commit() } just runs
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createTable(any(), any(), any(), any()) } returns table
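The new stubs above rely on MockK's support for chained calls inside an `every` block (intermediate return values are mocked automatically), so the whole `manageSnapshots().createBranch().commit()` chain can be stubbed in one line. A minimal sketch of the two variants used in the tests, the success path and the "branch already exists" path, assuming MockK and Iceberg on the test classpath (not part of this commit):

```kotlin
import io.mockk.*
import org.apache.iceberg.Table

fun main() {
    // Success path: the chained Iceberg call is stubbed to do nothing.
    val table: Table = mockk()
    every { table.manageSnapshots().createBranch(any()).commit() } just runs

    // Failure path: the same chain throws, simulating a pre-existing branch,
    // which S3DataLakeWriter is expected to swallow.
    val tableWithExistingBranch: Table = mockk()
    every { tableWithExistingBranch.manageSnapshots().createBranch(any()).commit() } throws
        IllegalArgumentException("branch already exists")
}
```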
3 changes: 2 additions & 1 deletion docs/integrations/destinations/s3-data-lake.md
@@ -17,7 +17,8 @@ for more information.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------|
| 0.2.20 | 2025-01-32 | [\#52068](https://github.com/airbytehq/airbyte/pull/52068) | Add support for default namespace (/database name) |
| 0.2.21 | 2025-01-27 | [\#52564](https://github.com/airbytehq/airbyte/pull/52564) | Fix crash on stream with 0 records |
| 0.2.20 | 2025-01-23 | [\#52068](https://github.com/airbytehq/airbyte/pull/52068) | Add support for default namespace (/database name) |
| 0.2.19 | 2025-01-16 | [\#51595](https://github.com/airbytehq/airbyte/pull/51595) | Clarifications in connector config options |
| 0.2.18 | 2025-01-15 | [\#51042](https://github.com/airbytehq/airbyte/pull/51042) | Write structs as JSON strings instead of Iceberg structs. |
| 0.2.17 | 2025-01-14 | [\#51542](https://github.com/airbytehq/airbyte/pull/51542) | New identifier fields should be marked as required. |
