Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to write Arrow IPC streams asynchronously (#577)
Browse files Browse the repository at this point in the history
* Added `async` writer of the Arrow stream.

* Added test.
  • Loading branch information
jorgecarleitao authored Nov 6, 2021
1 parent 83d828c commit c7a802a
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 68 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ full = [
"io_json",
"io_ipc",
"io_flight",
"io_ipc_write_async",
"io_ipc_compression",
"io_json_integration",
"io_print",
Expand All @@ -121,6 +122,7 @@ io_csv_read_async = ["csv-async", "lexical-core", "futures"]
io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "indexmap"]
io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]
io_parquet_compression = [
Expand Down Expand Up @@ -163,6 +165,7 @@ skip_feature_sets = [
["io_json"],
["io_flight"],
["io_ipc"],
["io_ipc_write_async"],
["io_parquet"],
["io_json_integration"],
# this does not change the public API
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ we also use the `0.x.y` versioning, since we are iterating over the API.

* Read and write of delta-encoded utf8 to and from parquet
* parquet roundtrip of all supported arrow types.
* Async writer of the Arrow stream format

## Features in pyarrow not in this crate

Expand Down
2 changes: 1 addition & 1 deletion src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub fn serialize_schema_to_info(schema: &Schema) -> Result<Vec<u8>> {
let encoded_data = schema_as_encoded_data(schema);

let mut schema = vec![];
write::common::write_message(&mut schema, encoded_data)?;
write::common_sync::write_message(&mut schema, encoded_data)?;
Ok(schema)
}

Expand Down
59 changes: 0 additions & 59 deletions src/io/ipc/write/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.
//! Common utilities used to write to Arrow's IPC format.
use std::io::Write;
use std::{collections::HashMap, sync::Arc};

use arrow_format::ipc;
Expand All @@ -29,7 +28,6 @@ use crate::io::ipc::endianess::is_native_little_endian;
use crate::record_batch::RecordBatch;
use crate::{array::DictionaryArray, datatypes::*};

use super::super::CONTINUATION_MARKER;
use super::{write, write_dictionary};

/// Compression codec
Expand Down Expand Up @@ -292,63 +290,6 @@ pub struct EncodedData {
pub arrow_data: Vec<u8>,
}

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub fn write_message<W: Write>(writer: &mut W, encoded: EncodedData) -> Result<(usize, usize)> {
let arrow_data_len = encoded.arrow_data.len();
if arrow_data_len % 8 != 0 {
return Err(ArrowError::Ipc("Arrow data not aligned".to_string()));
}

let a = 8 - 1;
let buffer = encoded.ipc_message;
let flatbuf_size = buffer.len();
let prefix_size = 8;
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
let padding_bytes = aligned_size - flatbuf_size - prefix_size;

write_continuation(writer, (aligned_size - prefix_size) as i32)?;

// write the flatbuf
if flatbuf_size > 0 {
writer.write_all(&buffer)?;
}
// write padding
writer.write_all(&vec![0; padding_bytes])?;

// write arrow data
let body_len = if arrow_data_len > 0 {
write_body_buffers(writer, &encoded.arrow_data)?
} else {
0
};

Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
let len = data.len();
let pad_len = pad_to_8(data.len());
let total_len = len + pad_len;

// write body buffer
writer.write_all(data)?;
if pad_len > 0 {
writer.write_all(&vec![0u8; pad_len][..])?;
}

writer.flush()?;
Ok(total_len)
}

/// Write a record batch to the writer, writing the message size before the message
/// if the record batch is being written to a stream
pub fn write_continuation<W: Write>(writer: &mut W, total_len: i32) -> Result<usize> {
writer.write_all(&CONTINUATION_MARKER)?;
writer.write_all(&total_len.to_le_bytes()[..])?;
writer.flush()?;
Ok(8)
}

/// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
#[inline]
pub(crate) fn pad_to_8(len: usize) -> usize {
Expand Down
74 changes: 74 additions & 0 deletions src/io/ipc/write/common_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use futures::AsyncWrite;
use futures::AsyncWriteExt;

use crate::error::{ArrowError, Result};

use super::super::CONTINUATION_MARKER;
use super::common::pad_to_8;
use super::common::EncodedData;

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub async fn write_message<W: AsyncWrite + Unpin + Send>(
writer: &mut W,
encoded: EncodedData,
) -> Result<(usize, usize)> {
let arrow_data_len = encoded.arrow_data.len();
if arrow_data_len % 8 != 0 {
return Err(ArrowError::Ipc("Arrow data not aligned".to_string()));
}

let a = 8 - 1;
let buffer = encoded.ipc_message;
let flatbuf_size = buffer.len();
let prefix_size = 8;
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
let padding_bytes = aligned_size - flatbuf_size - prefix_size;

write_continuation(writer, (aligned_size - prefix_size) as i32).await?;

// write the flatbuf
if flatbuf_size > 0 {
writer.write_all(&buffer).await?;
}
// write padding
writer.write_all(&vec![0; padding_bytes]).await?;

// write arrow data
let body_len = if arrow_data_len > 0 {
write_body_buffers(writer, &encoded.arrow_data).await?
} else {
0
};

Ok((aligned_size, body_len))
}

/// Write a record batch to the writer, writing the message size before the message
/// if the record batch is being written to a stream
pub async fn write_continuation<W: AsyncWrite + Unpin + Send>(
writer: &mut W,
total_len: i32,
) -> Result<usize> {
writer.write_all(&CONTINUATION_MARKER).await?;
writer.write_all(&total_len.to_le_bytes()[..]).await?;
writer.flush().await?;
Ok(8)
}

async fn write_body_buffers<W: AsyncWrite + Unpin + Send>(
mut writer: W,
data: &[u8],
) -> Result<usize> {
let len = data.len();
let pad_len = pad_to_8(data.len());
let total_len = len + pad_len;

// write body buffer
writer.write_all(data).await?;
if pad_len > 0 {
writer.write_all(&vec![0u8; pad_len][..]).await?;
}

writer.flush().await?;
Ok(total_len)
}
64 changes: 64 additions & 0 deletions src/io/ipc/write/common_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::io::Write;

use crate::error::{ArrowError, Result};

use super::super::CONTINUATION_MARKER;
use super::common::pad_to_8;
use super::common::EncodedData;

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub fn write_message<W: Write>(writer: &mut W, encoded: EncodedData) -> Result<(usize, usize)> {
let arrow_data_len = encoded.arrow_data.len();
if arrow_data_len % 8 != 0 {
return Err(ArrowError::Ipc("Arrow data not aligned".to_string()));
}

let a = 8 - 1;
let buffer = encoded.ipc_message;
let flatbuf_size = buffer.len();
let prefix_size = 8;
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
let padding_bytes = aligned_size - flatbuf_size - prefix_size;

write_continuation(writer, (aligned_size - prefix_size) as i32)?;

// write the flatbuf
if flatbuf_size > 0 {
writer.write_all(&buffer)?;
}
// write padding
writer.write_all(&vec![0; padding_bytes])?;

// write arrow data
let body_len = if arrow_data_len > 0 {
write_body_buffers(writer, &encoded.arrow_data)?
} else {
0
};

Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
let len = data.len();
let pad_len = pad_to_8(data.len());
let total_len = len + pad_len;

// write body buffer
writer.write_all(data)?;
if pad_len > 0 {
writer.write_all(&vec![0u8; pad_len][..])?;
}

writer.flush()?;
Ok(total_len)
}

/// Write a record batch to the writer, writing the message size before the message
/// if the record batch is being written to a stream
pub fn write_continuation<W: Write>(writer: &mut W, total_len: i32) -> Result<usize> {
writer.write_all(&CONTINUATION_MARKER)?;
writer.write_all(&total_len.to_le_bytes()[..])?;
writer.flush()?;
Ok(8)
}
10 changes: 9 additions & 1 deletion src/io/ipc/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! APIs to write to Arrow's IPC format.
pub mod common;
pub(crate) mod common;
mod schema;
mod serialize;
mod stream;
Expand All @@ -10,3 +10,11 @@ pub use schema::schema_to_bytes;
pub use serialize::{write, write_dictionary};
pub use stream::StreamWriter;
pub use writer::FileWriter;

pub(crate) mod common_sync;

#[cfg(feature = "io_ipc_write_async")]
mod common_async;
#[cfg(feature = "io_ipc_write_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_write_async")))]
pub mod stream_async;
5 changes: 2 additions & 3 deletions src/io/ipc/write/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
use std::io::Write;

use super::common::{
encoded_batch, write_continuation, write_message, DictionaryTracker, EncodedData, WriteOptions,
};
use super::common::{encoded_batch, DictionaryTracker, EncodedData, WriteOptions};
use super::common_sync::{write_continuation, write_message};
use super::schema_to_bytes;

use crate::datatypes::*;
Expand Down
77 changes: 77 additions & 0 deletions src/io/ipc/write/stream_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//! `async` writing of arrow streams
use futures::AsyncWrite;

pub use super::common::WriteOptions;
use super::common::{encoded_batch, DictionaryTracker, EncodedData};
use super::common_async::{write_continuation, write_message};
use super::schema_to_bytes;

use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

/// An `async` writer to the Apache Arrow stream format.
pub struct StreamWriter<W: AsyncWrite + Unpin + Send> {
/// The object to write to
writer: W,
/// IPC write options
write_options: WriteOptions,
/// Whether the writer footer has been written, and the writer is finished
finished: bool,
/// Keeps track of dictionaries that have been written
dictionary_tracker: DictionaryTracker,
}

impl<W: AsyncWrite + Unpin + Send> StreamWriter<W> {
/// Creates a new [`StreamWriter`]
pub fn new(writer: W, write_options: WriteOptions) -> Self {
Self {
writer,
write_options,
finished: false,
dictionary_tracker: DictionaryTracker::new(false),
}
}

/// Starts the stream
pub async fn start(&mut self, schema: &Schema) -> Result<()> {
let encoded_message = EncodedData {
ipc_message: schema_to_bytes(schema),
arrow_data: vec![],
};
write_message(&mut self.writer, encoded_message).await?;
Ok(())
}

/// Writes a [`RecordBatch`] to the stream
pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
if self.finished {
return Err(ArrowError::Ipc(
"Cannot write record batch to stream writer as it is closed".to_string(),
));
}

// todo: move this out of the `async` since this is blocking.
let (encoded_dictionaries, encoded_message) =
encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)?;

for encoded_dictionary in encoded_dictionaries {
write_message(&mut self.writer, encoded_dictionary).await?;
}

write_message(&mut self.writer, encoded_message).await?;
Ok(())
}

/// Finishes the stream
pub async fn finish(&mut self) -> Result<()> {
write_continuation(&mut self.writer, 0).await?;
self.finished = true;
Ok(())
}

/// Consumes itself, returning the inner writer.
pub fn into_inner(self) -> W {
self.writer
}
}
6 changes: 2 additions & 4 deletions src/io/ipc/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ use arrow_format::ipc::flatbuffers::FlatBufferBuilder;
use super::super::ARROW_MAGIC;
use super::{
super::convert,
common::{
encoded_batch, write_continuation, write_message, DictionaryTracker, EncodedData,
WriteOptions,
},
common::{encoded_batch, DictionaryTracker, EncodedData, WriteOptions},
common_sync::{write_continuation, write_message},
schema_to_bytes,
};

Expand Down
3 changes: 3 additions & 0 deletions tests/it/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ mod read;
mod write;

pub use common::read_gzip_json;

#[cfg(feature = "io_ipc_write_async")]
mod write_async;
Loading

0 comments on commit c7a802a

Please sign in to comment.