-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
destination-s3: add file transfer #46302
Merged
benmoriceau
merged 12 commits into
master
from
stephane/10-01-destination-s3_add_file_transfer
Oct 30, 2024
Merged
Changes from 3 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
367e2f0
destination-s3: add file transfer
stephane-airbyte d8a3bb0
Log file copy info
benmoriceau 53a8182
Revert "Log file copy info"
benmoriceau 7ee9838
Force the stream to flush
benmoriceau 75486ef
Use local CDK
benmoriceau 415b532
Use FF and test
benmoriceau ca957df
Format
benmoriceau 2000516
Fix build
benmoriceau 07f7b66
PR comments
benmoriceau bac819b
Remove local CDK
benmoriceau 1df388d
Bump CDK version
benmoriceau 4d742a8
Bump s3 cdk version
benmoriceau File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
56 changes: 56 additions & 0 deletions
56
...in/kotlin/io/airbyte/cdk/integrations/destination/async/model/AirbyteRecordMessageFile.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.integrations.destination.async.model | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty | ||
|
||
class AirbyteRecordMessageFile { | ||
constructor( | ||
fileUrl: String? = null, | ||
bytes: Long? = null, | ||
fileRelativePath: String? = null, | ||
modified: Long? = null, | ||
sourceFileUrl: String? = null | ||
) { | ||
this.fileUrl = fileUrl | ||
this.bytes = bytes | ||
this.fileRelativePath = fileRelativePath | ||
this.modified = modified | ||
this.sourceFileUrl = sourceFileUrl | ||
} | ||
constructor() : | ||
this( | ||
fileUrl = null, | ||
bytes = null, | ||
fileRelativePath = null, | ||
modified = null, | ||
sourceFileUrl = null | ||
) | ||
|
||
@get:JsonProperty("file_url") | ||
@set:JsonProperty("file_url") | ||
@JsonProperty("file_url") | ||
var fileUrl: String? = null | ||
|
||
@get:JsonProperty("bytes") | ||
@set:JsonProperty("bytes") | ||
@JsonProperty("bytes") | ||
var bytes: Long? = null | ||
|
||
@get:JsonProperty("file_relative_path") | ||
@set:JsonProperty("file_relative_path") | ||
@JsonProperty("file_relative_path") | ||
var fileRelativePath: String? = null | ||
|
||
@get:JsonProperty("modified") | ||
@set:JsonProperty("modified") | ||
@JsonProperty("modified") | ||
var modified: Long? = null | ||
|
||
@get:JsonProperty("source_file_url") | ||
@set:JsonProperty("source_file_url") | ||
@JsonProperty("source_file_url") | ||
var sourceFileUrl: String? = null | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 1 addition & 1 deletion
2
airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
version=0.47.3 | ||
version=0.48.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,7 +79,8 @@ protected constructor( | |
s3Config, | ||
catalog, | ||
memoryRatio, | ||
nThreads | ||
nThreads, | ||
featureFlags | ||
) | ||
} | ||
|
||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this accomplishing? Is this because other changes caused
optimalBytesToRead
to be zero?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is not 0 (it's 1) but one and yes it is allowing to add a record to the output disregard of the size.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the problem here was if
bytesRead == 0 && memoryItem.size > optimalBytesToRead
, then we never add anything to the queue. So here, regardless ofmemoryItem.size
oroptimalBytesToRead
, if there's no item in the queue, we add the current one.For
fileTransfer
we setoptimalBytesToRead
to1
so that we force a flush for each message. but with such a small value, any message is bigger than the optimal size, which causes an infinite loop.Note that the infinite loop could also happen if
memoryItem.size()
was big enough andoptimalBytesToRead
was small enough. With our current settings, I don't believe it's possible, but it's just a couple of setting tweaks away...