Commits and parquet file sink #197
Conversation
Force-pushed from 5941084 to cdb52e6
arroyo-connectors/src/parquet.rs (outdated)
connection_type: ConnectionType::Sink,
schema: schema
    .map(|s| s.to_owned())
    .ok_or_else(|| anyhow!("No schema defined for SSE source"))?,
SSE => Parquet
    data_recovery: Vec<Self::DataRecovery>,
) -> Result<()>;
async fn insert_record(&mut self, record: &Record<K, T>) -> Result<()>;
// TODO: figure out how to have the relevant vectors be of pointers across async boundaries.
They can be pointers across async boundaries so long as they are &mut (because &T is Send only if T is Sync, but &mut T is Send if T is Send).
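For illustration, a tiny self-contained example of that rule (not code from this PR): a future can hold a &mut to a value that is Send but not Sync across an .await and still be Send, while a shared reference to the same value would make it non-Send.

use std::cell::Cell;

// Cell<u64> is Send but not Sync.
async fn tick(counter: &mut Cell<u64>) {
    counter.set(counter.get() + 1);
    std::future::ready(()).await; // &mut Cell<u64> is held across this await
    counter.set(counter.get() + 1);
}

fn assert_send<T: Send>(_: T) {}

fn main() {
    let mut counter = Cell::new(0);
    // Compiles: the future only holds &mut Cell<u64>, which is Send because
    // Cell<u64> is Send. Holding &Cell<u64> instead would not be Send,
    // because Cell<u64> is not Sync.
    assert_send(tick(&mut counter));
}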
async fn on_close(&mut self, ctx: &mut crate::engine::Context<(), ()>) {
    info!("waiting for commit message");
    if let Some(ControlMessage::Commit { epoch }) = ctx.control_rx.recv().await {
This logic assumes that the next control message on the queue is a commit message -- is that guaranteed?
It is, except when the source data ends, which right now only happens for test sources.
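For illustration only (not the code in this PR), a more defensive version of on_close could drain the control channel until a commit arrives, and warn instead of silently doing nothing if the channel closes first, e.g. when a bounded test source ends; handle_commit here is a hypothetical helper.

async fn on_close(&mut self, ctx: &mut crate::engine::Context<(), ()>) {
    info!("waiting for commit message");
    while let Some(msg) = ctx.control_rx.recv().await {
        match msg {
            ControlMessage::Commit { epoch } => {
                // Finalize everything staged for this epoch (hypothetical helper).
                self.handle_commit(epoch).await;
                return;
            }
            other => warn!("ignoring non-commit control message: {:?}", other),
        }
    }
    warn!("control channel closed before a commit message arrived");
}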
Force-pushed from 85184df to 8c147ce
…rquet, refactor writing code.
Force-pushed from 57a5eef to eefcbe4
@@ -283,6 +283,26 @@ export function FormInner({
      </Stack>
    </fieldset>
  );
} else if (values[key] > 0) {
I think this should be (values[key].properties?.length || 0) > 0 -- values[key] is an object and can't be (meaningfully) compared to a number.
This adds a parquet sink and introduces a committing phase during checkpointing, which makes exactly-once sinks possible; the first such sink writes Parquet to S3. The sink produces typed parquet files in an S3 directory. Files are created using S3's multipart upload API, and the creation of a single parquet file can span multiple checkpoints.
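Roughly, the sink lifecycle looks like the sketch below (simplified, with illustrative names rather than the exact trait in this PR): records are buffered into the current file, checkpointing flushes data and returns the state needed to resume or finish files after a restart, and commit finalizes pre-committed work once the checkpoint is durable.

use anyhow::Result;

// Illustrative two-phase sink; Record<K, T> is the engine's record type.
#[async_trait::async_trait]
pub trait TwoPhaseSink<K, T>: Send {
    /// State that must survive a restart: in-flight multipart uploads and
    /// files that are finished but not yet committed.
    type DataRecovery: Send;

    /// Called on startup with any state recovered from the last checkpoint.
    async fn init(&mut self, data_recovery: Vec<Self::DataRecovery>) -> Result<()>;

    /// Buffer a record into the current parquet file.
    async fn insert_record(&mut self, record: &Record<K, T>) -> Result<()>;

    /// Pre-commit: flush buffered rows (e.g. upload parts) and return the
    /// recovery state to store in the checkpoint.
    async fn checkpoint(&mut self) -> Result<Vec<Self::DataRecovery>>;

    /// Commit: runs after the checkpoint is durable; finish anything that
    /// was pre-committed (e.g. complete multipart uploads) for this epoch.
    async fn commit(&mut self, epoch: u32) -> Result<()>;
}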
When restoring from checkpoints, the subtask with id 0 is responsible for finishing any files that were previously in-flight, as well as committing any files that were ready to be committed.
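A sketch of that recovery step (hypothetical names and helpers; the real code differs): only subtask 0 acts on the recovered state, so each file is finished and committed exactly once.

// Illustrative recovery on restore; FileRecovery and the helpers are hypothetical.
async fn init(&mut self, data_recovery: Vec<FileRecovery>) -> Result<()> {
    // Only subtask 0 cleans up old files, so nothing is handled twice.
    if self.task_index != 0 {
        return Ok(());
    }
    for file in data_recovery {
        match file {
            // All parts uploaded, but the multipart upload was never
            // completed: commit it now.
            FileRecovery::ReadyToCommit { key, upload_id, parts } => {
                self.complete_upload(&key, &upload_id, parts).await?;
            }
            // Still being written when the job stopped: close it out with
            // the parts we know about, then commit it.
            FileRecovery::InFlight { key, upload_id, parts } => {
                self.finish_and_commit(&key, &upload_id, parts).await?;
            }
        }
    }
    Ok(())
}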
Right now we assume idempotency for the completion of multipart uploads, which seems to work. According to the S3 Glacier documentation, "Complete Multipart Upload is an idempotent operation. After your first successful complete multipart upload, if you call the operation again within a short period, the operation will succeed and return the same archive ID." However, the normal S3 docs say that trying to complete an already finished upload might return a 404: "Description: The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."
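One way to reconcile those two behaviors is sketched below (an illustration using aws-sdk-s3, not necessarily what this PR does; module paths and error-inspection APIs vary between SDK versions): when we know the upload was previously finished, treat a NoSuchUpload error from CompleteMultipartUpload as success.

use aws_sdk_s3::{types::{CompletedMultipartUpload, CompletedPart}, Client};

// Sketch: complete a multipart upload, tolerating the case where it was
// already completed on a previous attempt (S3 may answer 404 / NoSuchUpload).
async fn complete_idempotent(
    client: &Client,
    bucket: &str,
    key: &str,
    upload_id: &str,
    parts: Vec<CompletedPart>,
) -> anyhow::Result<()> {
    let result = client
        .complete_multipart_upload()
        .bucket(bucket)
        .key(key)
        .upload_id(upload_id)
        .multipart_upload(
            CompletedMultipartUpload::builder().set_parts(Some(parts)).build(),
        )
        .send()
        .await;

    match result {
        Ok(_) => Ok(()),
        // Assume a missing upload means an earlier attempt already completed it.
        // A stricter version would inspect the typed error code and HEAD the
        // object to confirm it exists with the expected size.
        Err(e) if format!("{e:?}").contains("NoSuchUpload") => Ok(()),
        Err(e) => Err(anyhow::anyhow!("completing multipart upload failed: {e:?}")),
    }
}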
I've manually tested the behavior under a variety of cases, including stopping it through the UI, uncontrolled stopping, and failing to log commits. In all cases it was able to resume writes without losing or duplicating data.