Async writer tweaks #3967

Merged 2 commits on Mar 30, 2023
73 changes: 35 additions & 38 deletions parquet/src/arrow/async_writer/mod.rs
@@ -51,10 +51,7 @@
//! # }
//! ```

-use std::{
-io::Write,
-sync::{Arc, Mutex},
-};
+use std::{io::Write, sync::Arc};

use crate::{
arrow::ArrowWriter,
@@ -80,45 +77,41 @@ pub struct AsyncArrowWriter<W> {

/// The inner buffer shared by the `sync_writer` and the `async_writer`
shared_buffer: SharedBuffer,

-/// The threshold triggering buffer flush
-buffer_flush_threshold: usize,
}

impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
/// Try to create a new Async Arrow Writer.
///
-/// `buffer_flush_threshold` will be used to trigger flush of the inner buffer.
+/// `buffer_size` determines the initial size of the intermediate buffer.
+///
+/// The intermediate buffer will automatically be resized if necessary
+///
+/// [`Self::write`] will flush this intermediate buffer if it is at least
+/// half full
pub fn try_new(
writer: W,
arrow_schema: SchemaRef,
-buffer_flush_threshold: usize,
+buffer_size: usize,
props: Option<WriterProperties>,
) -> Result<Self> {
-let shared_buffer = SharedBuffer::default();
+let shared_buffer = SharedBuffer::new(buffer_size);
@tustvold (Contributor, Author) commented on Mar 28, 2023:

This is the major motivation for this PR: being able to avoid bump allocation where the Vec is repeatedly resized is important for performance.

@ShiKaiWi (Member) commented on Mar 28, 2023:

Actually, in #3957 buffer_flush_threshold was designed to accept usize::MAX, so that the async writer would not flush until all the encoding work was done. For that reason the buffer couldn't be pre-allocated at initialization.

That said, this now looks good because of its efficiency, and letting the writer flush only when all encoded bytes are ready may be a fake feature anyway. 😆

@tustvold (Contributor, Author) replied:

> fake feature to let the writer do flush only when all encoded bytes are ready

Yeah, at that point you might as well just use the sync writer 😅
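The reallocation cost discussed above can be seen in a standalone sketch (this is not the PR's code; `count_reallocations` is a hypothetical helper): a `Vec` created with `with_capacity` keeps a single allocation, while one that starts empty reallocates repeatedly as it grows.

```rust
// Hypothetical helper: push `total` bytes one at a time and count how
// often the Vec's capacity changes (i.e. how often it reallocates).
fn count_reallocations(initial_capacity: usize, total: usize) -> usize {
    let mut buf: Vec<u8> = Vec::with_capacity(initial_capacity);
    let mut reallocations = 0;
    let mut last = buf.capacity();
    for _ in 0..total {
        buf.push(0);
        if buf.capacity() != last {
            reallocations += 1;
            last = buf.capacity();
        }
    }
    reallocations
}

fn main() {
    // Pre-sized buffer: the single up-front allocation is enough.
    assert_eq!(count_reallocations(4096, 4096), 0);
    // Empty buffer: capacity grows step by step as bytes arrive.
    assert!(count_reallocations(0, 4096) > 1);
}
```

This is the trade-off the thread settles on: pre-allocating requires knowing a size up front, but avoids the repeated grow-and-copy cycle.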

let sync_writer =
ArrowWriter::try_new(shared_buffer.clone(), arrow_schema, props)?;

Ok(Self {
sync_writer,
async_writer: writer,
shared_buffer,
-buffer_flush_threshold,
})
}

/// Enqueues the provided `RecordBatch` to be written
///
/// After every sync write by the inner [ArrowWriter], the inner buffer will be
-/// checked and flush if threshold is reached.
+/// checked and flush if at least half full
pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.sync_writer.write(batch)?;
-Self::try_flush(
-&self.shared_buffer,
-&mut self.async_writer,
-self.buffer_flush_threshold,
-)
-.await
+Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, false).await
}

/// Append [`KeyValue`] metadata in addition to those in [`WriterProperties`]
@@ -135,64 +128,68 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
let metadata = self.sync_writer.close()?;

// Force to flush the remaining data.
-Self::try_flush(&self.shared_buffer, &mut self.async_writer, 0).await?;
+Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, true).await?;

Ok(metadata)
}

/// Flush the data in the [`SharedBuffer`] into the `async_writer` if its size
/// exceeds the threshold.
async fn try_flush(
-shared_buffer: &SharedBuffer,
+shared_buffer: &mut SharedBuffer,
@tustvold (Contributor, Author) commented:

A mutable reference isn't technically required here, but it acts as a lint that shared_buffer shouldn't be shared.

A contributor replied:

Could we actually just remove the Mutex entirely? Hold an Arc<SharedBuffer> and use Arc::get_mut to grab a mutable reference.

@tustvold replied:

Arc::get_mut only works if there are no other Arc references, which in this case wouldn't be true.

The contributor replied:

Ah, right, the async writer would also need a reference. I suppose you could hold an Arc<Vec<u8>> in the async writer and then have SharedBuffer hold a Weak<Vec<u8>>. Not sure that would end up pencilling out just to remove an uncontended mutex lock, though.

@tustvold replied:

The use of try_lock here boils down to much the same thing: https://docs.rs/futures-util/0.3.27/src/futures_util/lock/mutex.rs.html#103
async_writer: &mut W,
-threshold: usize,
+force: bool,
) -> Result<()> {
-let mut buffer = {
-let mut buffer = shared_buffer.buffer.lock().unwrap();
-
-if buffer.is_empty() || buffer.len() < threshold {
-// no need to flush
-return Ok(());
-}
-std::mem::take(&mut *buffer)
-};
+let mut buffer = shared_buffer.buffer.try_lock().unwrap();
+if !force && buffer.len() < buffer.capacity() / 2 {
+// no need to flush
+return Ok(());
+}

async_writer
-.write(&buffer)
+.write(buffer.as_slice())
.await
.map_err(|e| ParquetError::External(Box::new(e)))?;

async_writer
.flush()
.await
.map_err(|e| ParquetError::External(Box::new(e)))?;

// reuse the buffer.
buffer.clear();
-*shared_buffer.buffer.lock().unwrap() = buffer;

Ok(())
}
}
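The new flush condition in `try_flush` can be summarized by a small predicate (a hypothetical standalone sketch, not the actual implementation; `should_flush` is an invented name): flush when forced, as `close` does, or once the buffer is at least half of its capacity.

```rust
// Mirrors the early-return condition
// `!force && buffer.len() < buffer.capacity() / 2`, inverted.
fn should_flush(len: usize, capacity: usize, force: bool) -> bool {
    force || len >= capacity / 2
}

fn main() {
    assert!(!should_flush(100, 1024, false)); // under half full: keep buffering
    assert!(should_flush(512, 1024, false)); // at least half full: flush
    assert!(should_flush(0, 1024, true)); // close() forces a final flush
}
```

Tying the threshold to the buffer's own capacity is what replaces the old explicit `buffer_flush_threshold` parameter.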

/// A buffer with interior mutability shared by the [`ArrowWriter`] and
/// [`AsyncArrowWriter`].
-#[derive(Clone, Default)]
+#[derive(Clone)]
struct SharedBuffer {
/// The inner buffer for reading and writing
///
/// The lock is used to obtain internal mutability, so no worry about the
/// lock contention.
-buffer: Arc<Mutex<Vec<u8>>>,
+buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
}

+impl SharedBuffer {
+pub fn new(capacity: usize) -> Self {
+Self {
+buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
+}
+}
+}

impl Write for SharedBuffer {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-let mut buffer = self.buffer.lock().unwrap();
+let mut buffer = self.buffer.try_lock().unwrap();
Write::write(&mut *buffer, buf)
}

fn flush(&mut self) -> std::io::Result<()> {
-let mut buffer = self.buffer.lock().unwrap();
+let mut buffer = self.buffer.try_lock().unwrap();
Write::flush(&mut *buffer)
}
}
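For illustration, here is a simplified, self-contained stand-in for the `SharedBuffer` pattern above, using `std::sync::Mutex` instead of `futures::lock::Mutex` (an assumption made so the sketch compiles without external crates). As in the PR, the `Write` impl grabs the lock with `try_lock`, which succeeds because the sync writer only runs while no other handle holds the lock.

```rust
use std::io::Write;
use std::sync::{Arc, Mutex};

// Simplified stand-in: a cloneable handle whose Write impl appends to a
// Vec<u8> shared between the sync and async sides.
#[derive(Clone)]
struct SharedBuffer {
    buffer: Arc<Mutex<Vec<u8>>>,
}

impl Write for SharedBuffer {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Uncontended in this design, so try_lock always succeeds here.
        let mut buffer = self.buffer.try_lock().unwrap();
        Write::write(&mut *buffer, buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        let mut buffer = self.buffer.try_lock().unwrap();
        Write::flush(&mut *buffer)
    }
}

fn main() {
    let shared = SharedBuffer {
        buffer: Arc::new(Mutex::new(Vec::with_capacity(64))),
    };
    let mut writer = shared.clone(); // the sync writer's handle
    writer.write_all(b"hello").unwrap();

    // The other handle observes the same bytes.
    assert_eq!(shared.buffer.lock().unwrap().as_slice(), b"hello".as_slice());
}
```

The real code's `futures::lock::Mutex` offers the same `try_lock` shape; as the review thread notes, the uncontended lock acquisition is cheap either way.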
@@ -342,7 +339,7 @@ mod tests {
};

let test_buffer_flush_thresholds =
-vec![0, 1024, 40 * 1024, 50 * 1024, 100 * 1024, usize::MAX];
+vec![0, 1024, 40 * 1024, 50 * 1024, 100 * 1024];

for buffer_flush_threshold in test_buffer_flush_thresholds {
let reader = get_test_reader();
@@ -354,7 +351,7 @@
let mut async_writer = AsyncArrowWriter::try_new(
&mut test_async_sink,
reader.schema(),
-buffer_flush_threshold,
+buffer_flush_threshold * 2,
Some(write_props.clone()),
)
.unwrap();