destination-s3: add file transfer #46302
Conversation
```diff
@@ -36,6 +36,11 @@ object UploadFormatConfigFactory {
         FileUploadFormat.PARQUET -> {
             UploadParquetFormatConfig(formatConfig)
         }
+        FileUploadFormat.RAW_FILES ->
```
Oh nice. This sidesteps any questions w/r/t conversion.
```kotlin
}
val flushFunction =
    if (featureFlags.useFileTransfer()) {
        FileTransferDestinationFlushFunction(
```
What determines whether the feature flag is set? The fact that the source is flagged as a file source? Explicit opt-in at the sync level?
Fair question.
Basically, we need the source configuration to have a specific parameter enabled (I don't know the details of the parameter) AND the destination needs to have `supportsFileTransfer` set to `true` in its `metadata.yaml`. If those 2 conditions are true, then the 2 environment variables are set accordingly, a common volume is mounted on both containers, and it's expected that all records are file-based instead of record-based.
If the source config has the parameter set to `true` and the destination doesn't support file transfer, the platform will throw an exception.
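That decision logic can be sketched as a small pure function. Everything here is illustrative (the names `resolveTransferEnv`, `sourceWantsFileTransfer`, and the staging path are made up); the real platform wiring is more involved, but the two-condition gate and the exception case are as described above.

```kotlin
// Illustrative sketch only: the platform enables file transfer when BOTH the
// source config asks for it AND the destination declares supportsFileTransfer
// in its metadata.yaml. One side asking without the other is a hard failure.
fun resolveTransferEnv(
    sourceWantsFileTransfer: Boolean,
    destinationSupportsFileTransfer: Boolean, // from metadata.yaml
): Map<String, String> =
    when {
        sourceWantsFileTransfer && destinationSupportsFileTransfer ->
            mapOf(
                "USE_FILE_TRANSFER" to "true",
                // hypothetical mount point for the shared staging volume
                "AIRBYTE_STAGING_DIRECTORY" to "/staging/files",
            )
        sourceWantsFileTransfer ->
            throw IllegalStateException("destination does not support file transfer")
        else -> emptyMap() // plain record-based sync, no extra env vars
    }
```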
Looks good. The shim seems like it's in the best place, and the file flush function is straightforward. I didn't have enough time to go over the tests in detail, but high-level how we're adding the file option to the docker env is clear.
One question about the env variables just to help me plan for the new CDK, but that's my own curiosity.
had one question about the protocol
```kotlin
java.util.List.of(
    ConfiguredAirbyteStream()
        .withSyncMode(SyncMode.INCREMENTAL)
        .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
```
are we just ignoring this sync mode? (.... are we expected to behave differently in overwrite/append mode?)
@edgao
I spoke with Stephane about this. Plan is to add as follow-up. For now:
- Same file synced twice overwrites/updates the prior version written.
- No support for purging old files via reset.
This matches the business requirements as I understand them in this first iteration, so I think we are good for now.
nice. so in particular, that means we can do a blind "write `<file>` to `<path>`", i.e. we don't need to check whether the file already exists 🚛
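Since re-syncing the same file just overwrites the prior object, the destination key only needs to be a deterministic function of the source file's path. A sketch of that property (the helper `objectKey` is hypothetical, not the connector's actual code):

```kotlin
// Hypothetical helper: derive the S3 object key purely from the bucket prefix
// and the source file's relative path. Because the mapping is deterministic,
// syncing the same file twice targets the same key, so the second write can
// blindly overwrite the first without checking for existence.
fun objectKey(prefix: String, sourceRelativePath: String): String =
    prefix.trimEnd('/') + "/" + sourceRelativePath.trimStart('/')
```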
yeah, I think there are 2 roadblocks to being "better" about sync modes:
- SFTP doesn't let us see deletes; it only shows the current state.
- Destination state would allow us to know which files we saved without slow and expensive S3 calls (we could even store a hash in there, and add that to the file transfer protocol).
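The hash-in-state idea could look roughly like this. Purely illustrative; nothing like `shouldUpload` exists in the protocol yet, and the state shape here (a path-to-hash map) is an assumption:

```kotlin
import java.security.MessageDigest

// Illustrative only: keep a content hash per file in destination state so an
// unchanged file could be skipped without listing or HEAD-ing objects in S3.
fun sha256Hex(bytes: ByteArray): String =
    MessageDigest.getInstance("SHA-256")
        .digest(bytes)
        .joinToString("") { "%02x".format(it) }

// Returns true when the file should be (re)uploaded, updating state as it goes.
fun shouldUpload(path: String, content: ByteArray, state: MutableMap<String, String>): Boolean {
    val hash = sha256Hex(content)
    if (state[path] == hash) return false // already uploaded, content unchanged
    state[path] = hash
    return true
}
```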
For my part, this looks good to go when ready!
I'll merge this on Monday morning HI time (so probably around 9 AM PST) if everyone is OK with the timing (especially @johnny-schmidt and @edgao, as they would have to deal with potential on-call issues). Please 👍 or 👎 this post to confirm the timing of the merge (I've added a 👎 and a 👍 so it's easier for everyone; it doesn't mean I'm against the timing I'm suggesting, obviously).
This reverts commit d8a3bb0.
@edgao @johnny-schmidt I made the fix in
```diff
@@ -4,7 +4,7 @@ plugins {
 }

 airbyteJavaConnector {
-    cdkVersionRequired = '0.46.1'
+    cdkVersionRequired = '0.48.0'
     features = ['db-destinations', 's3-destinations']
     useLocalCdk = true
```
I need to change that once the CDK is published.
@johnny-schmidt in case you have thoughts - IMO (a) in general we don't care that much about the async framework, and (b) in particular I don't care enough to figure out why the existing queue size tracker stuff isn't working as expected
lgtm from my side, had a few style nitpicks
```kotlin
runningFlushWorkers,
AtomicBoolean(false),
flusher,
true
```
nit: use named parameters for primitive arguments
Suggested change:
```diff
-true
+isFileTransfer = true
```
Done
```diff
@@ -54,6 +54,7 @@ constructor(
     workerPool: ExecutorService = Executors.newFixedThreadPool(5),
     private val airbyteMessageDeserializer: AirbyteMessageDeserializer =
         AirbyteMessageDeserializer(),
+    private val isFileTransfer: Boolean = false,
```
nit: rename to `flushOnEveryMessage` (to reflect functionality rather than usage)
Done
```diff
@@ -63,7 +63,7 @@ class BufferDequeue(

     // otherwise pull records until we hit the memory limit.
     val newSize: Long = (memoryItem.size) + bytesRead.get()
-    if (newSize <= optimalBytesToRead) {
+    if (newSize <= optimalBytesToRead || output.isEmpty()) {
```
What is this accomplishing? Is this because other changes caused `optimalBytesToRead` to be zero?
It is not 0 (it's 1), and yes, it allows adding a record to the output regardless of its size.
The problem here was: if `bytesRead == 0 && memoryItem.size > optimalBytesToRead`, then we never add anything to the queue. So here, regardless of `memoryItem.size` or `optimalBytesToRead`, if there's no item in the output, we add the current one.
For file transfer we set `optimalBytesToRead` to 1 so that we force a flush for each message. But with such a small value, any message is bigger than the optimal size, which caused an infinite loop.
Note that the infinite loop could also happen if `memoryItem.size()` was big enough and `optimalBytesToRead` was small enough. With our current settings, I don't believe it's possible, but it's just a couple of setting tweaks away...
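A stripped-down model of that loop (not the actual `BufferDequeue` code; `takeBatch` and the plain queue of sizes are simplifications) shows why the `|| output.isEmpty()` guard breaks the stall:

```kotlin
// Simplified model of the dequeue loop: pull items until the byte budget is
// exceeded, but always take at least one item, so an item bigger than
// optimalBytesToRead (e.g. with optimalBytesToRead = 1 for file transfer)
// still gets emitted instead of stalling the loop forever.
fun takeBatch(queue: ArrayDeque<Long>, optimalBytesToRead: Long): List<Long> {
    val output = mutableListOf<Long>()
    var bytesRead = 0L
    while (queue.isNotEmpty()) {
        val newSize = queue.first() + bytesRead
        if (newSize <= optimalBytesToRead || output.isEmpty()) {
            output.add(queue.removeFirst())
            bytesRead = newSize
        } else {
            break // budget exceeded and we already have something to flush
        }
    }
    return output
}
```

Without the `output.isEmpty()` escape hatch, the first branch would never fire for an oversized item and the caller would spin forever on a non-empty queue.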
/publish-java-cdk
/publish-java-cdk
adding file transfer to destination-s3

File transfer and record-based sync are exclusive. The platform will set the environment variable `USE_FILE_TRANSFER` to `true` and `AIRBYTE_STAGING_DIRECTORY` to the mount point of the staging directory when the destination supports file transfer and the source enabled it in its config. destination-s3 will check `USE_FILE_TRANSFER` to decide whether to enable file transfer or record-based sync.

Record-based integration tests are all passing, and there's an extra test that makes sure file-based transfer is disabled.
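On the destination side, that check reduces to reading the env var. A minimal sketch, assuming the env is passed in as a map (the connector's actual wiring goes through its feature-flag plumbing rather than a free function like this):

```kotlin
// Minimal sketch: file transfer is on only when USE_FILE_TRANSFER is exactly
// "true" (case-insensitive, per Kotlin's String.toBoolean); absent or any
// other value falls back to record-based sync.
fun useFileTransfer(env: Map<String, String>): Boolean =
    env["USE_FILE_TRANSFER"]?.toBoolean() ?: false
```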