
Bulk Load CDK: Nop Refactor: Formatted object writes to toolkit #47382

Merged · 1 commit into master on Oct 28, 2024

Conversation

johnny-schmidt (Contributor) commented Oct 25, 2024:

What

Move the formatting writers into the toolkit, so that s3v2 can simply inject the factory.
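For orientation before the detailed review below, here is a minimal, hedged sketch of the shape this refactor produces. The stand-in types are hypothetical simplifications: the real interface and factory live in the load-object-storage toolkit, and the real create() also receives the DestinationStream.

```kotlin
import java.io.Closeable
import java.io.OutputStream

// Hypothetical stand-ins for the CDK types touched by this PR.
data class DestinationRecord(val payload: String)

interface ObjectStorageFormattingWriter : Closeable {
    fun accept(record: DestinationRecord)
}

fun interface ObjectStorageFormattingWriterFactory {
    // Simplified: the real create() also takes the DestinationStream.
    fun create(outputStream: OutputStream): ObjectStorageFormattingWriter
}

// The connector-side pattern from the new S3V2Writer.processRecords:
// ask the factory for a writer, stream records through it, auto-close.
fun writeAll(
    writerFactory: ObjectStorageFormattingWriterFactory,
    outputStream: OutputStream,
    records: List<DestinationRecord>,
) {
    writerFactory.create(outputStream).use { writer ->
        records.forEach { writer.accept(it) }
    }
}
```

The factory is the only thing the connector needs injected; all format-specific dispatch happens behind create().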

Summary by CodeRabbit

  • New Features
    • Introduced a new interface and classes for writing formatted data to object storage in JSON, CSV, Avro, and Parquet formats.
    • Enhanced CSV writing functionality to accept OutputStream, improving flexibility.
  • Improvements
    • Simplified data formatting and writing processes in the S3 V2 connector by centralizing logic through a factory approach.
  • Version Updates
    • Updated the Docker image tag for the S3 V2 Destination connector from 0.1.12 to 0.1.13.

johnny-schmidt requested a review from a team as a code owner, October 25, 2024 20:11

johnny-schmidt requested a review from edgao, October 25, 2024 20:11
octavia-squidington-iii added labels area/connectors (Connector related issues) and CDK (Connector Development Kit), Oct 25, 2024
johnny-schmidt (Contributor, Author) commented:

@coderabbitai full review

coderabbitai bot commented Oct 25, 2024

Walkthrough

The changes involve modifications to the CSV writing functionality, the introduction of a new object storage formatting writer interface and its implementations, and updates to the S3 V2 connector's metadata and writing logic. The CSVWriter.kt file's toCsvPrinterWithHeader function now accepts an OutputStream instead of a Writer. A new file, ObjectStorageFormattingWriter.kt, defines an interface and various classes for writing records in different formats. The S3V2Writer.kt has been refactored to utilize the new formatting writer factory, simplifying its record processing logic.

Changes

  • airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/.../CSVWriter.kt: Updated toCsvPrinterWithHeader to accept an OutputStream instead of a Writer, wrapping it in a UTF-8 Writer for the CSVPrinter (see the sketch after this list).
  • airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/.../ObjectStorageFormattingWriter.kt: Introduced the ObjectStorageFormattingWriter interface and implementations for JSON, CSV, Avro, and Parquet, plus an ObjectStorageFormattingWriterFactory for instantiation.
  • airbyte-integrations/connectors/destination-s3-v2/metadata.yaml: Updated dockerImageTag from 0.1.12 to 0.1.13.
  • airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/.../S3V2Writer.kt: Refactored to use ObjectStorageFormattingWriterFactory, removing format-specific configuration (including avroSchema handling) and simplifying processRecords.
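To make the CSVWriter.kt change concrete, here is a hedged reconstruction pieced together from the fragments quoted later in this review; the CsvSchema receiver is a stand-in, since the real extension lives on the toolkit's schema type.

```kotlin
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter
import java.io.OutputStream

// Stand-in for whatever toolkit type actually exposes toCsvHeader().
interface CsvSchema {
    fun toCsvHeader(): Array<String>
}

fun CsvSchema.toCsvPrinterWithHeader(outputStream: OutputStream): CSVPrinter =
    CSVFormat.Builder.create()
        .setHeader(*toCsvHeader())
        .setAutoFlush(true)
        .build()
        // Wrapping the stream here makes the UTF-8 encoding explicit and lets
        // callers hand over any OutputStream rather than constructing a Writer.
        .print(outputStream.writer(charset = Charsets.UTF_8))
```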

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant S3V2Writer
    participant ObjectStorageFormattingWriterFactory
    participant CSVFormattingWriter

    Client->>S3V2Writer: processRecords(records)
    S3V2Writer->>ObjectStorageFormattingWriterFactory: create(stream, outputStream)
    ObjectStorageFormattingWriterFactory->>CSVFormattingWriter: instantiate writer
    CSVFormattingWriter->>S3V2Writer: write records
    S3V2Writer->>Client: response
```


coderabbitai bot left a comment

Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files changed between commits c2368fb and cc0dffe.

📒 Files selected for processing (4)
  • airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/file/csv/CSVWriter.kt (1 hunks)
  • airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt (1 hunks)
  • airbyte-integrations/connectors/destination-s3-v2/metadata.yaml (1 hunks)
  • airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt (3 hunks)
🔇 Additional comments (5)
airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/file/csv/CSVWriter.kt (1)

13-18: LGTM! Good improvement to the API design.

The change from Writer to OutputStream is a good choice as it:

  • Provides more flexibility in handling different types of outputs
  • Explicitly handles character encoding using UTF-8
  • Aligns well with the toolkit's architecture

Let's verify the impact of auto-flush on performance with high-volume writes:

✅ Verification successful


Auto-flush setting is appropriate for this use case

The auto-flush setting is correctly enabled as it aligns with the toolkit's purpose of writing CSV data in a streaming fashion. While auto-flush can impact performance in high-volume scenarios, in this context:

  1. The code is part of a bulk loading toolkit where data reliability is prioritized
  2. No performance issues or complaints were found in the test files
  3. The implementation is used by multiple destination connectors (S3, GCS, Snowflake, etc.) without reported performance problems

The auto-flush setting helps ensure data is written reliably, which is especially important when dealing with potentially large CSV files in a data pipeline context.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any performance-related tests or benchmarks
rg -g "**/*Test*.kt" -g "**/*Benchmark*.kt" "CSVWriter" -A 5



Script:

#!/bin/bash
# Let's try a broader search for CSV-related tests and benchmarks
rg -g "**/*Test*.kt" -g "**/*Benchmark*.kt" "CSV" -A 5

# Also search for any usage of CSVFormat with setAutoFlush
rg "setAutoFlush" -A 2

# Look for any CSV-related performance tests
rg -g "**/*Test*.kt" -g "**/*Benchmark*.kt" "performance.*CSV|CSV.*performance" -i


airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt (2)

9-9: Approved: Correct import of ObjectStorageFormattingWriterFactory

The import statement correctly includes the necessary dependency for the ObjectStorageFormattingWriterFactory.


24-24: Approved: Addition of writerFactory to constructor

Including writerFactory in the constructor aligns with the refactoring goals and promotes better abstraction of formatting logic.

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt (2)

29-31: Well-defined interface for formatting writers.

The ObjectStorageFormattingWriter interface clearly defines the contract for formatting writers and extends Closeable for proper resource management.


79-91: Verify that the CSV printer handles outputStream closure appropriately.

In CSVFormattingWriter, the close() method calls printer.close(). Ensure that this action also closes the underlying outputStream. If not, consider explicitly closing the outputStream to prevent resource leaks.

Run the following script to check if printer.close() closes the outputStream:

✅ Verification successful

The CSVPrinter is created via Apache Commons CSV with:

```kotlin
.print(outputStream.writer(charset = Charsets.UTF_8))
```

This means the CSVPrinter wraps the writer created from the outputStream, and when printer.close() is called it closes both the writer and the underlying outputStream, per Apache Commons CSV's implementation. This is standard library behavior: closing a wrapper also closes the wrapped resource.

The current implementation correctly handles resource cleanup.

The close() method in CSVFormattingWriter properly closes both the CSVPrinter and its underlying OutputStream through a single printer.close() call.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify if CsvPrinter's close method closes the underlying OutputStream.

# Test: Search the CsvPrinter implementation for close behavior. Expect: OutputStream is closed.
rg -A 5 'class.*CsvPrinter' | rg 'close\(\)' -A 3



Script:

#!/bin/bash
# Let's try to find the CsvPrinter implementation and its close method using different approaches

# Search for files that might contain CsvPrinter implementation
fd -e kt -e java "CsvPrinter|CSVPrinter"

# Search for toCsvPrinterWithHeader implementation to understand the printer creation
rg -A 10 "toCsvPrinterWithHeader"

# Search for any CSV-related printer implementations
rg -A 10 "class.*(?i)csvprinter"

# Look for schema extension that creates the CSV printer
ast-grep --pattern 'fun $_.toCsvPrinterWithHeader($_) {
  $$$
}'



Script:

#!/bin/bash
# Let's look at the CSVFormat and CSVPrinter implementation from Apache Commons CSV
# which seems to be used based on the CSVFormat.Builder usage

# Search for the CSV format and printer imports
rg "import.*commons\.csv"

# Look for any CSV-related printer creation and closing
rg -A 5 "CSVPrinter|CSVFormat" 

# Look specifically at the print method implementation
rg -A 5 "print\(.*writer\("

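As a quick local sanity check of that close-propagation claim, here is a hedged, self-contained snippet (an editor's addition, not part of the PR) exercising Commons CSV directly:

```kotlin
import org.apache.commons.csv.CSVFormat
import java.io.ByteArrayOutputStream

// Verifies that CSVPrinter.close() closes the Writer it wraps,
// which in turn closes the underlying OutputStream.
fun main() {
    val sink = object : ByteArrayOutputStream() {
        var closed = false
        override fun close() {
            closed = true
            super.close()
        }
    }
    CSVFormat.DEFAULT.print(sink.writer(charset = Charsets.UTF_8)).use { printer ->
        printer.printRecord("a", "b")
    }
    println("underlying stream closed: ${sink.closed}") // expected: true
}
```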

```diff
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
-  dockerImageTag: 0.1.12
+  dockerImageTag: 0.1.13
```

💡 Codebase verification

Missing changelog for version bump

The version has been bumped from 0.1.12 to 0.1.13, but there is no CHANGELOG.md file in the destination-s3-v2 connector directory to document these changes. A changelog entry should be added to track:

  • The internal refactoring of formatting writers
  • Moving formatting writers to toolkit
  • Changes to S3V2Writer.kt
🔗 Analysis chain

LGTM! Version bump is appropriate.

The increment from 0.1.12 to 0.1.13 aligns with the internal refactoring changes to move formatting writers into the toolkit.

Let's verify the changelog entry exists for this version:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if a changelog entry exists for version 0.1.13
# Expected: Find an entry documenting the formatting writers refactoring

rg -A 5 "0\.1\.13" airbyte-integrations/connectors/destination-s3-v2/CHANGELOG.md || echo "No changelog entry found"



Script:

#!/bin/bash
# Let's verify if the changelog exists in a different location or with a different name
fd -g "*CHANGELOG*" -g "!node_modules" airbyte-integrations/connectors/destination-s3-v2/



Script:

#!/bin/bash
# Let's try a different approach to find any changelog files
fd CHANGELOG --type f


Comment on lines +53 to +54
```kotlin
writerFactory.create(stream, outputStream).use { writer ->
    records.forEach { writer.accept(it) }
```

⚠️ Potential issue

Handle exceptions during record processing to enhance robustness

While the use block ensures that the writer resource is properly closed, exceptions during writer.accept(it) could lead to incomplete data processing or unexpected behavior. Consider adding exception handling to manage potential errors during record processing.

Apply this diff to wrap the record processing in a try-catch block:

```diff
 writerFactory.create(stream, outputStream).use { writer ->
+    try {
         records.forEach { writer.accept(it) }
+    } catch (e: Exception) {
+        // Handle the exception appropriately, e.g., log and rethrow
+        throw e
+    }
 }
```

This addition ensures that any exceptions are caught and can be handled or logged, preventing silent failures and improving the reliability of the data writing process.
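For what it's worth, Kotlin's use already guarantees the writer is closed when an exception escapes the block, so a catch clause mainly earns its keep by adding context before rethrowing. A hedged variant (assuming stream.descriptor is in scope, as it is in the writer code below):

```kotlin
writerFactory.create(stream, outputStream).use { writer ->
    try {
        records.forEach { writer.accept(it) }
    } catch (e: Exception) {
        // `use` still closes the writer; wrap only to record which stream failed.
        throw RuntimeException("Failed writing formatted records for ${stream.descriptor}", e)
    }
}
```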


Comment on lines +93 to +127
```kotlin
class AvroFormattingWriter(
    stream: DestinationStream,
    outputStream: OutputStream,
    formatConfig: AvroFormatConfiguration,
    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
) : ObjectStorageFormattingWriter {
    private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
    private val writer =
        outputStream.toAvroWriter(avroSchema, formatConfig.avroCompressionConfiguration)

    override fun accept(record: DestinationRecord) {
        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
    }

    override fun close() {
        writer.close()
    }
}

class ParquetFormattingWriter(
    stream: DestinationStream,
    outputStream: OutputStream,
    formatConfig: ParquetFormatConfiguration,
    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
) : ObjectStorageFormattingWriter {
    private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
    private val writer =
        outputStream.toParquetWriter(avroSchema, formatConfig.parquetWriterConfiguration)

    override fun accept(record: DestinationRecord) {
        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
    }

    override fun close() {
        writer.close()
    }
}
```
🛠️ Refactor suggestion

Refactor to reduce duplication between AvroFormattingWriter and ParquetFormattingWriter.

Both AvroFormattingWriter and ParquetFormattingWriter share similar code for schema creation and record writing. To enhance maintainability and reduce duplication, consider creating a common abstract base class or utilizing composition to share the common functionality.

Here's an example of how you might abstract the common code:

```kotlin
abstract class AvroBasedFormattingWriter(
    stream: DestinationStream,
    outputStream: OutputStream,
    formatConfig: Any,
    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
) : ObjectStorageFormattingWriter {
    protected val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
    protected val writer: SpecificDataWriter = createWriter(outputStream, formatConfig)

    abstract fun createWriter(outputStream: OutputStream, formatConfig: Any): SpecificDataWriter

    override fun accept(record: DestinationRecord) {
        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
    }

    override fun close() {
        writer.close()
    }
}
```

Then, AvroFormattingWriter and ParquetFormattingWriter can extend AvroBasedFormattingWriter and implement the createWriter method accordingly.
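One Kotlin caveat if someone picks this up: the suggested base class calls the overridable createWriter() during base-class construction, before subclass fields are initialized, which is the classic leaking-this pitfall. Passing the already-built writer up through the constructor sidesteps it. A hedged sketch reusing the types from the diff above (AvroRecordWriter is a hypothetical common abstraction; the real Avro and Parquet writer types differ):

```kotlin
import java.io.Closeable
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

// Hypothetical shared writer abstraction for illustration only.
interface AvroRecordWriter : Closeable {
    fun write(record: GenericRecord)
}

abstract class AvroBasedFormattingWriter(
    private val avroSchema: Schema,
    // Subclasses build their concrete writer first, so no overridable
    // member is invoked during construction.
    private val writer: AvroRecordWriter,
    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta,
) : ObjectStorageFormattingWriter {
    override fun accept(record: DestinationRecord) {
        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
    }

    override fun close() {
        writer.close()
    }
}
```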

Comment on lines 74 to 76
```kotlin
override fun close() {
    // Do nothing
}
```

coderabbitai bot commented Oct 25, 2024:


⚠️ Potential issue

Consider closing or flushing the outputStream in JsonFormattingWriter.

The close() method in JsonFormattingWriter is currently empty. If the outputStream is not closed or flushed elsewhere, data may not be properly written to the destination. Consider closing or flushing the outputStream in the close() method to ensure all data is correctly persisted.

Apply this diff to close the outputStream in the close() method:

```diff
     override fun close() {
-        // Do nothing
+        outputStream.close()
     }
```


edgao (Contributor) commented:

coderabbit is wrong (I think), but it's probably best to add a comment explaining that these writers explicitly should (must?) not close the outputstream (b/c S3MultipartUpload does anUploadStream().use {})

coderabbitai bot replied:

@edgao, thank you for the clarification. You are correct; the JsonFormattingWriter (and other formatting writers) should not close the outputStream because the S3MultipartUpload manages the stream with UploadStream().use { }. Closing the stream within the writer could interfere with the multipart upload process. Adding a comment to explain this behavior is a good idea to prevent confusion in the future.

(_/)
(•_•)
( )> Noted and learned!


✏️ Learnings added
Learnt from: edgao
PR: airbytehq/airbyte#47382
File: airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt:74-76
Timestamp: 2024-10-25T21:16:10.443Z
Learning: In the `ObjectStorageFormattingWriter` implementations, the `outputStream` should not be closed in the `close()` method because the `S3MultipartUpload` manages the stream using `UploadStream().use { }`. Closing it within the writer could interfere with the upload process. A comment should be added to explain this behavior.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

johnny-schmidt (Contributor, Author) replied:

The contract on Closeable::close is that it's idempotent: multiple calls should be ignored. Technically something like this

```kotlin
inputStream.use { input ->
    myWriter(input).use { mw ->
        mw.stuff()
    }
    input.stuff()
}
```

will break for writers like avro/parquet that close the underlying stream, but there's no risk of that given the way we use it here. (streamingUpload behaves as if delegation is possible, but not required. I'll add comments to that effect.)

avro/parquet close because they are file writers: they're writing headers and closing out data. The JSON writer is just writing lines, so maybe technically it shouldn't close? Though maybe it should close anyway, to match the behavior of the other writers?

coderabbitai bot replied:

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

johnny-schmidt (Contributor, Author) replied:

I went ahead and made it close for parity. I also added comments to the streamingUpload interface.
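Presumably the applied change is just the parity version of the earlier diff (hedged; the final code isn't shown in this thread):

```kotlin
override fun close() {
    // Close for parity with the Avro/Parquet writers; per the discussion above,
    // streamingUpload tolerates, but does not require, writers closing the stream.
    outputStream.close()
}
```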

Comment on lines +43 to +62
```kotlin
return when (formatConfigProvider.objectStorageFormatConfiguration) {
    is JsonFormatConfiguration -> JsonFormattingWriter(outputStream, recordDecorator)
    is AvroFormatConfiguration ->
        AvroFormattingWriter(
            stream,
            outputStream,
            formatConfigProvider.objectStorageFormatConfiguration
                as AvroFormatConfiguration,
            recordDecorator
        )
    is ParquetFormatConfiguration ->
        ParquetFormattingWriter(
            stream,
            outputStream,
            formatConfigProvider.objectStorageFormatConfiguration
                as ParquetFormatConfiguration,
            recordDecorator
        )
    is CSVFormatConfiguration -> CSVFormattingWriter(stream, outputStream, recordDecorator)
}
```

⚠️ Potential issue

Ensure exhaustive when expression to handle all format configurations.

The when expression in the create function does not include an else branch. If objectStorageFormatConfiguration is of an unexpected type, the code may throw an exception at runtime. Consider adding an else branch to handle unforeseen formats and provide a meaningful error message.

Apply this diff to handle unexpected formats:

```diff
         return when (formatConfigProvider.objectStorageFormatConfiguration) {
             is JsonFormatConfiguration -> JsonFormattingWriter(outputStream, recordDecorator)
             is AvroFormatConfiguration ->
                 AvroFormattingWriter(
                     stream,
                     outputStream,
                     formatConfigProvider.objectStorageFormatConfiguration
                         as AvroFormatConfiguration,
                     recordDecorator
                 )
             is ParquetFormatConfiguration ->
                 ParquetFormattingWriter(
                     stream,
                     outputStream,
                     formatConfigProvider.objectStorageFormatConfiguration
                         as ParquetFormatConfiguration,
                     recordDecorator
                 )
             is CSVFormatConfiguration -> CSVFormattingWriter(stream, outputStream, recordDecorator)
+            else -> throw IllegalArgumentException("Unsupported format configuration: ${formatConfigProvider.objectStorageFormatConfiguration::class.simpleName}")
         }
```
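A side note on that suggestion: the original when compiles without an else, which suggests the configuration type is a sealed hierarchy; if so, the compiler already enforces exhaustiveness, and a new format variant becomes a compile-time error rather than a runtime gap. A hedged illustration with hypothetical names:

```kotlin
// If the configuration type is sealed (an assumption here), a `when` used as
// an expression must cover every subtype, so no `else` branch is needed.
sealed interface FormatConfiguration
object JsonFormat : FormatConfiguration
object CsvFormat : FormatConfiguration

fun extensionFor(config: FormatConfiguration): String =
    when (config) {
        is JsonFormat -> "jsonl"
        is CsvFormat -> "csv"
        // Adding a new subtype makes this `when` fail to compile until handled.
    }
```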

edgao (Contributor) left a comment:

🎉

```kotlin
    .setHeader(*toCsvHeader())
    .setAutoFlush(true)
    .build()
    .print(outputStream.writer(charset = Charsets.UTF_8))
```
edgao (Contributor) commented:

curious: was the default charset breaking something?

johnny-schmidt (Contributor, Author) replied:

Not that I know of, but have we even migrated the tests with special strings yet? Regardless, I'm erring on the side of being explicit and also following what we do in the old CDK.

edgao (Contributor) replied:

https://github.com/airbytehq/airbyte/pull/46954/files#diff-d2107d506de1477d1d0df842ecf42a54dda1290922cb348991816aa14598222aR420 went in last week

(though I think we're almost at the point where you can copy the old DATs to s3-v2? which would give us equivalent coverage with the old connector, plus the extra stuff in the new test suite)

johnny-schmidt force-pushed the jschmidt/s3v2/issue-9971/union-to-disjoint-record branch from c2368fb to 23adde0, October 28, 2024 21:53
Base automatically changed from jschmidt/s3v2/issue-9971/union-to-disjoint-record to master, October 28, 2024 22:03
johnny-schmidt force-pushed the jschmidt/s3v2/issue-9975/prefactor branch from cc0dffe to 6e9a054, October 28, 2024 22:14
johnny-schmidt enabled auto-merge (squash), October 28, 2024 22:15
johnny-schmidt merged commit 9106d24 into master on Oct 28, 2024 (39 checks passed)
johnny-schmidt deleted the jschmidt/s3v2/issue-9975/prefactor branch, October 28, 2024 22:26