Skip to content

Commit

Permalink
add finish to AsyncArrowWriter with test (#6543)
Browse files Browse the repository at this point in the history
  • Loading branch information
etseidl authored Oct 12, 2024
1 parent 04fa369 commit 9633f14
Showing 1 changed file with 34 additions and 1 deletion.
35 changes: 34 additions & 1 deletion parquet/src/arrow/async_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,11 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
/// Close and finalize the writer.
///
/// All the data in the inner buffer will be force flushed.
pub async fn close(mut self) -> Result<FileMetaData> {
///
/// Unlike [`Self::close`] this does not consume self
///
/// Attempting to write after calling finish will result in an error
pub async fn finish(&mut self) -> Result<FileMetaData> {
let metadata = self.sync_writer.finish()?;

// Force to flush the remaining data.
Expand All @@ -249,6 +253,13 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
Ok(metadata)
}

/// Close and finalize the writer.
///
/// All the data in the inner buffer will be force flushed.
pub async fn close(mut self) -> Result<FileMetaData> {
self.finish().await
}

/// Flush the data written by `sync_writer` into the `async_writer`
///
/// # Notes
Expand Down Expand Up @@ -385,6 +396,28 @@ mod tests {
}
}

#[tokio::test]
async fn test_async_writer_bytes_written() {
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

let temp = tempfile::tempfile().unwrap();

let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
let mut writer =
AsyncArrowWriter::try_new(file.try_clone().await.unwrap(), to_write.schema(), None)
.unwrap();
writer.write(&to_write).await.unwrap();
let _metadata = writer.finish().await.unwrap();
// After `finish` this should include the metadata and footer
let reported = writer.bytes_written();

// Get actual size from file metadata
let actual = file.metadata().await.unwrap().len() as usize;

assert_eq!(reported, actual);
}

#[tokio::test]
async fn test_async_writer_file() {
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
Expand Down

0 comments on commit 9633f14

Please sign in to comment.