From 9633f14f8d9e5b9b8fdf3514b7acb77217b30584 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Sat, 12 Oct 2024 07:24:48 -0700 Subject: [PATCH] add finish to AsyncArrowWriter with test (#6543) --- parquet/src/arrow/async_writer/mod.rs | 35 ++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 50bb5c0463e6..8155b57d9ac6 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -239,7 +239,11 @@ impl AsyncArrowWriter { /// Close and finalize the writer. /// /// All the data in the inner buffer will be force flushed. - pub async fn close(mut self) -> Result { + /// + /// Unlike [`Self::close`] this does not consume self + /// + /// Attempting to write after calling finish will result in an error + pub async fn finish(&mut self) -> Result { let metadata = self.sync_writer.finish()?; // Force to flush the remaining data. @@ -249,6 +253,13 @@ impl AsyncArrowWriter { Ok(metadata) } + /// Close and finalize the writer. + /// + /// All the data in the inner buffer will be force flushed. + pub async fn close(mut self) -> Result { + self.finish().await + } + /// Flush the data written by `sync_writer` into the `async_writer` /// /// # Notes @@ -385,6 +396,28 @@ mod tests { } } + #[tokio::test] + async fn test_async_writer_bytes_written() { + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap(); + + let temp = tempfile::tempfile().unwrap(); + + let file = tokio::fs::File::from_std(temp.try_clone().unwrap()); + let mut writer = + AsyncArrowWriter::try_new(file.try_clone().await.unwrap(), to_write.schema(), None) + .unwrap(); + writer.write(&to_write).await.unwrap(); + let _metadata = writer.finish().await.unwrap(); + // After `finish` this should include the metadata and footer + let reported = writer.bytes_written(); + + // Get actual size from file metadata + let actual = file.metadata().await.unwrap().len() as usize; + + assert_eq!(reported, actual); + } + #[tokio::test] async fn test_async_writer_file() { let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;