Destination S3 Data Lake: handle stream with no records #52564

Merged 3 commits on Jan 27, 2025
@@ -2445,6 +2445,27 @@ abstract class BasicFunctionalityIntegrationTest(
)
}

Contributor: Nice! Thanks

@Test

open fun testNoData() {
val stream =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(linkedMapOf("id" to intType)),
generationId = 0,
minimumGenerationId = 0,
syncId = 42,
)
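// A sync that delivers zero records should complete without throwing and produce no rows.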
assertDoesNotThrow { runSync(configContents, stream, messages = emptyList()) }
dumpAndDiffRecords(
parsedConfig,
canonicalExpectedRecords = emptyList(),
stream,
primaryKey = listOf(listOf("id")),
cursor = null,
)
}

private fun schematizedObject(
fullObject: LinkedHashMap<String, Any?>,
coercedObject: LinkedHashMap<String, Any?> = fullObject
@@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
- dockerImageTag: 0.2.20
+ dockerImageTag: 0.2.21
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
@@ -10,8 +10,11 @@ import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
import io.github.oshai.kotlinlogging.KotlinLogging
import javax.inject.Singleton

private val logger = KotlinLogging.logger {}

@Singleton
class S3DataLakeWriter(
private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory,
@@ -35,6 +38,17 @@ class S3DataLakeWriter(

s3DataLakeTableSynchronizer.applySchemaChanges(table, incomingSchema)

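// createBranch throws IllegalArgumentException when the branch already exists; the catch below logs and ignores that case.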
try {
logger.info {
"maybe creating branch $DEFAULT_STAGING_BRANCH for stream ${stream.descriptor}"
}
table.manageSnapshots().createBranch(DEFAULT_STAGING_BRANCH).commit()
} catch (e: IllegalArgumentException) {
logger.info {
"branch $DEFAULT_STAGING_BRANCH already exists for stream ${stream.descriptor}"
}
}

return S3DataLakeStreamLoader(
stream = stream,
table = table,
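The catch-based approach above makes branch creation idempotent: if a staging branch survives from an earlier sync, the duplicate-branch error is swallowed and the sync proceeds. A minimal alternative sketch that probes for the ref first, assuming Iceberg's Table.refs() lookup is available (the ensureStagingBranch helper is hypothetical, not part of this PR):

import org.apache.iceberg.Table

// Create the staging branch only when no ref with that name exists yet.
// Assumes Table.refs() (Iceberg 1.2+) returns the table's branch/tag refs.
fun ensureStagingBranch(table: Table, branchName: String) {
    if (!table.refs().containsKey(branchName)) {
        table.manageSnapshots().createBranch(branchName).commit()
    }
}

Even with the lookup, a concurrent writer could create the branch between the check and the commit, so catching the IllegalArgumentException, as this PR does, remains the more robust guard.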
@@ -122,6 +122,7 @@ internal class S3DataLakeWriterTest {
}
val catalog: Catalog = mockk()
val table: Table = mockk { every { schema() } returns icebergSchema }
every { table.manageSnapshots().createBranch(any()).commit() } just runs
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createTable(any(), any(), any(), any()) } returns table
@@ -148,7 +149,7 @@
}

@Test
- fun testCreateStreamLoaderWithMismatchedSchemas() {
+ fun testCreateStreamLoaderWithMismatchedSchemasAndAlreadyExistingStagingBranch() {
val streamDescriptor = DestinationStream.Descriptor(namespace = "namespace", name = "name")
val stream =
DestinationStream(
@@ -210,6 +211,8 @@
every { updateSchema.setIdentifierFields(any<Collection<String>>()) } returns updateSchema
every { updateSchema.commit() } just runs
every { table.refresh() } just runs
every { table.manageSnapshots().createBranch(any()).commit() } throws
IllegalArgumentException("branch already exists")
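// Simulates a staging branch left behind by an earlier sync attempt.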
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createTable(any(), any(), any(), any()) } returns table
@@ -348,6 +351,7 @@
every { updateSchema.setIdentifierFields(primaryKeys) } returns updateSchema
every { updateSchema.commit() } just runs
every { table.refresh() } just runs
every { table.manageSnapshots().createBranch(any()).commit() } just runs
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createTable(any(), any(), any(), any()) } returns table
3 changes: 2 additions & 1 deletion docs/integrations/destinations/s3-data-lake.md
@@ -17,7 +17,8 @@ for more information.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------|
- | 0.2.20 | 2025-01-32 | [\#52068](https://github.com/airbytehq/airbyte/pull/52068) | Add support for default namespace (/database name) |
Contributor: woops

+ | 0.2.21 | 2025-01-27 | [\#52564](https://github.com/airbytehq/airbyte/pull/52564) | Fix crash on stream with 0 records |
+ | 0.2.20 | 2025-01-23 | [\#52068](https://github.com/airbytehq/airbyte/pull/52068) | Add support for default namespace (/database name) |
| 0.2.19 | 2025-01-16 | [\#51595](https://github.com/airbytehq/airbyte/pull/51595) | Clarifications in connector config options |
| 0.2.18 | 2025-01-15 | [\#51042](https://github.com/airbytehq/airbyte/pull/51042) | Write structs as JSON strings instead of Iceberg structs. |
| 0.2.17 | 2025-01-14 | [\#51542](https://github.com/airbytehq/airbyte/pull/51542) | New identifier fields should be marked as required. |