This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to write Arrow IPC streams asynchronously #577

Merged 2 commits on Nov 6, 2021
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -101,6 +101,7 @@ full = [
"io_json",
"io_ipc",
"io_flight",
"io_ipc_write_async",
"io_ipc_compression",
"io_json_integration",
"io_print",
@@ -121,6 +122,7 @@ io_csv_read_async = ["csv-async", "lexical-core", "futures"]
io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "indexmap"]
io_ipc = ["arrow-format"]
+io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]
io_parquet_compression = [
@@ -163,6 +165,7 @@ skip_feature_sets = [
["io_json"],
["io_flight"],
["io_ipc"],
["io_ipc_write_async"],
["io_parquet"],
["io_json_integration"],
# this does not change the public API
1 change: 1 addition & 0 deletions README.md
@@ -82,6 +82,7 @@ we also use the `0.x.y` versioning, since we are iterating over the API.

* Read and write of delta-encoded utf8 to and from parquet
* parquet roundtrip of all supported arrow types.
+* Async writer of the Arrow stream format

## Features in pyarrow not in this crate

2 changes: 1 addition & 1 deletion src/io/flight/mod.rs
@@ -64,7 +64,7 @@ pub fn serialize_schema_to_info(schema: &Schema) -> Result<Vec<u8>> {
    let encoded_data = schema_as_encoded_data(schema);

    let mut schema = vec![];
-    write::common::write_message(&mut schema, encoded_data)?;
+    write::common_sync::write_message(&mut schema, encoded_data)?;
    Ok(schema)
}

59 changes: 0 additions & 59 deletions src/io/ipc/write/common.rs
@@ -16,7 +16,6 @@
// under the License.
//! Common utilities used to write to Arrow's IPC format.

-use std::io::Write;
use std::{collections::HashMap, sync::Arc};

use arrow_format::ipc;
@@ -29,7 +28,6 @@ use crate::io::ipc::endianess::is_native_little_endian;
use crate::record_batch::RecordBatch;
use crate::{array::DictionaryArray, datatypes::*};

-use super::super::CONTINUATION_MARKER;
use super::{write, write_dictionary};

/// Compression codec
@@ -292,63 +290,6 @@ pub struct EncodedData {
    pub arrow_data: Vec<u8>,
}

-/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
-pub fn write_message<W: Write>(writer: &mut W, encoded: EncodedData) -> Result<(usize, usize)> {
-    let arrow_data_len = encoded.arrow_data.len();
-    if arrow_data_len % 8 != 0 {
-        return Err(ArrowError::Ipc("Arrow data not aligned".to_string()));
-    }
-
-    let a = 8 - 1;
-    let buffer = encoded.ipc_message;
-    let flatbuf_size = buffer.len();
-    let prefix_size = 8;
-    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
-    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
-
-    write_continuation(writer, (aligned_size - prefix_size) as i32)?;
-
-    // write the flatbuf
-    if flatbuf_size > 0 {
-        writer.write_all(&buffer)?;
-    }
-    // write padding
-    writer.write_all(&vec![0; padding_bytes])?;
-
-    // write arrow data
-    let body_len = if arrow_data_len > 0 {
-        write_body_buffers(writer, &encoded.arrow_data)?
-    } else {
-        0
-    };
-
-    Ok((aligned_size, body_len))
-}
-
-fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
-    let len = data.len();
-    let pad_len = pad_to_8(data.len());
-    let total_len = len + pad_len;
-
-    // write body buffer
-    writer.write_all(data)?;
-    if pad_len > 0 {
-        writer.write_all(&vec![0u8; pad_len][..])?;
-    }
-
-    writer.flush()?;
-    Ok(total_len)
-}
-
-/// Write a record batch to the writer, writing the message size before the message
-/// if the record batch is being written to a stream
-pub fn write_continuation<W: Write>(writer: &mut W, total_len: i32) -> Result<usize> {
-    writer.write_all(&CONTINUATION_MARKER)?;
-    writer.write_all(&total_len.to_le_bytes()[..])?;
-    writer.flush()?;
-    Ok(8)
-}

/// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
#[inline]
pub(crate) fn pad_to_8(len: usize) -> usize {
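The deleted helpers reappear unchanged in `common_sync.rs` below; only `pad_to_8` stays in `common.rs`, since both the sync and async paths call it. Its body is truncated in this diff, but its contract follows from the call sites; a minimal sketch under that assumption:

```rust
/// Sketch of `pad_to_8`'s contract (the actual body is truncated above):
/// the number of zero bytes needed to reach the next 8-byte boundary.
fn pad_to_8_sketch(len: usize) -> usize {
    (8 - len % 8) % 8
}

fn main() {
    assert_eq!(pad_to_8_sketch(8), 0); // already aligned
    assert_eq!(pad_to_8_sketch(5), 3); // 5 + 3 = 8
    assert_eq!(pad_to_8_sketch(0), 0); // empty bodies need no padding
}
```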
74 changes: 74 additions & 0 deletions src/io/ipc/write/common_async.rs
@@ -0,0 +1,74 @@
use futures::AsyncWrite;
use futures::AsyncWriteExt;

use crate::error::{ArrowError, Result};

use super::super::CONTINUATION_MARKER;
use super::common::pad_to_8;
use super::common::EncodedData;

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub async fn write_message<W: AsyncWrite + Unpin + Send>(
    writer: &mut W,
    encoded: EncodedData,
) -> Result<(usize, usize)> {
    let arrow_data_len = encoded.arrow_data.len();
    if arrow_data_len % 8 != 0 {
        return Err(ArrowError::Ipc("Arrow data not aligned".to_string()));
    }

    let a = 8 - 1;
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    let prefix_size = 8;
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;

    write_continuation(writer, (aligned_size - prefix_size) as i32).await?;

    // write the flatbuf
    if flatbuf_size > 0 {
        writer.write_all(&buffer).await?;
    }
    // write padding
    writer.write_all(&vec![0; padding_bytes]).await?;

    // write arrow data
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(writer, &encoded.arrow_data).await?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}

/// Write a record batch to the writer, writing the message size before the message
/// if the record batch is being written to a stream
pub async fn write_continuation<W: AsyncWrite + Unpin + Send>(
    writer: &mut W,
    total_len: i32,
) -> Result<usize> {
    writer.write_all(&CONTINUATION_MARKER).await?;
    writer.write_all(&total_len.to_le_bytes()[..]).await?;
    writer.flush().await?;
    Ok(8)
}

async fn write_body_buffers<W: AsyncWrite + Unpin + Send>(
    mut writer: W,
    data: &[u8],
) -> Result<usize> {
    let len = data.len();
    let pad_len = pad_to_8(data.len());
    let total_len = len + pad_len;

    // write body buffer
    writer.write_all(data).await?;
    if pad_len > 0 {
        writer.write_all(&vec![0u8; pad_len][..]).await?;
    }

    writer.flush().await?;
    Ok(total_len)
}
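The alignment arithmetic is shared by the sync and async paths: the flatbuffer length plus the 8-byte prefix (4-byte continuation marker + 4-byte little-endian length) is rounded up to the next multiple of 8 with a mask, and the difference is emitted as zero padding. A standalone worked example of just that computation:

```rust
/// Mirrors the arithmetic in `write_message` above: round the prefix +
/// flatbuffer length up to the next multiple of 8 and derive the padding.
fn aligned(flatbuf_size: usize) -> (usize, usize) {
    let a = 8 - 1;
    let prefix_size = 8; // 4-byte continuation marker + 4-byte length
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
    (aligned_size, padding_bytes)
}

fn main() {
    assert_eq!(aligned(16), (24, 0)); // 16 + 8 is already a multiple of 8
    assert_eq!(aligned(13), (24, 3)); // 13 + 8 = 21, rounded up to 24
}
```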
64 changes: 64 additions & 0 deletions src/io/ipc/write/common_sync.rs
@@ -0,0 +1,64 @@
use std::io::Write;

use crate::error::{ArrowError, Result};

use super::super::CONTINUATION_MARKER;
use super::common::pad_to_8;
use super::common::EncodedData;

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub fn write_message<W: Write>(writer: &mut W, encoded: EncodedData) -> Result<(usize, usize)> {
    let arrow_data_len = encoded.arrow_data.len();
    if arrow_data_len % 8 != 0 {
        return Err(ArrowError::Ipc("Arrow data not aligned".to_string()));
    }

    let a = 8 - 1;
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    let prefix_size = 8;
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;

    write_continuation(writer, (aligned_size - prefix_size) as i32)?;

    // write the flatbuf
    if flatbuf_size > 0 {
        writer.write_all(&buffer)?;
    }
    // write padding
    writer.write_all(&vec![0; padding_bytes])?;

    // write arrow data
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(writer, &encoded.arrow_data)?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
    let len = data.len();
    let pad_len = pad_to_8(data.len());
    let total_len = len + pad_len;

    // write body buffer
    writer.write_all(data)?;
    if pad_len > 0 {
        writer.write_all(&vec![0u8; pad_len][..])?;
    }

    writer.flush()?;
    Ok(total_len)
}

/// Write a record batch to the writer, writing the message size before the message
/// if the record batch is being written to a stream
pub fn write_continuation<W: Write>(writer: &mut W, total_len: i32) -> Result<usize> {
    writer.write_all(&CONTINUATION_MARKER)?;
    writer.write_all(&total_len.to_le_bytes()[..])?;
    writer.flush()?;
    Ok(8)
}
10 changes: 9 additions & 1 deletion src/io/ipc/write/mod.rs
@@ -1,5 +1,5 @@
//! APIs to write to Arrow's IPC format.
-pub mod common;
+pub(crate) mod common;
mod schema;
mod serialize;
mod stream;
@@ -10,3 +10,11 @@ pub use schema::schema_to_bytes;
pub use serialize::{write, write_dictionary};
pub use stream::StreamWriter;
pub use writer::FileWriter;

+pub(crate) mod common_sync;
+
+#[cfg(feature = "io_ipc_write_async")]
+mod common_async;
+#[cfg(feature = "io_ipc_write_async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_write_async")))]
+pub mod stream_async;
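With this layout, `common` becomes crate-private and the async writer is only compiled when the `io_ipc_write_async` feature is enabled, with a `docsrs` attribute so the feature gate shows up in rendered documentation. A consumer that wants the same optionality can mirror the pattern behind its own feature flag (crate and feature names here are hypothetical):

```rust
// Hypothetical consumer crate: `async-ipc` is the consumer's own feature,
// declared in its Cargo.toml as `async-ipc = ["arrow2/io_ipc_write_async"]`.
#[cfg(feature = "async-ipc")]
pub use arrow2::io::ipc::write::stream_async::StreamWriter;
```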
5 changes: 2 additions & 3 deletions src/io/ipc/write/stream.rs
@@ -22,9 +22,8 @@

use std::io::Write;

-use super::common::{
-    encoded_batch, write_continuation, write_message, DictionaryTracker, EncodedData, WriteOptions,
-};
+use super::common::{encoded_batch, DictionaryTracker, EncodedData, WriteOptions};
+use super::common_sync::{write_continuation, write_message};
use super::schema_to_bytes;

use crate::datatypes::*;
77 changes: 77 additions & 0 deletions src/io/ipc/write/stream_async.rs
@@ -0,0 +1,77 @@
//! `async` writing of arrow streams
use futures::AsyncWrite;

pub use super::common::WriteOptions;
use super::common::{encoded_batch, DictionaryTracker, EncodedData};
use super::common_async::{write_continuation, write_message};
use super::schema_to_bytes;

use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

/// An `async` writer to the Apache Arrow stream format.
pub struct StreamWriter<W: AsyncWrite + Unpin + Send> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: WriteOptions,
    /// Whether the stream has been finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
}

impl<W: AsyncWrite + Unpin + Send> StreamWriter<W> {
    /// Creates a new [`StreamWriter`]
    pub fn new(writer: W, write_options: WriteOptions) -> Self {
        Self {
            writer,
            write_options,
            finished: false,
            dictionary_tracker: DictionaryTracker::new(false),
        }
    }

    /// Starts the stream
    pub async fn start(&mut self, schema: &Schema) -> Result<()> {
        let encoded_message = EncodedData {
            ipc_message: schema_to_bytes(schema),
            arrow_data: vec![],
        };
        write_message(&mut self.writer, encoded_message).await?;
        Ok(())
    }

    /// Writes a [`RecordBatch`] to the stream
    pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if self.finished {
            return Err(ArrowError::Ipc(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }

        // todo: move this out of the `async` since this is blocking.
        let (encoded_dictionaries, encoded_message) =
            encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)?;

        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary).await?;
        }

        write_message(&mut self.writer, encoded_message).await?;
        Ok(())
    }

    /// Finishes the stream
    pub async fn finish(&mut self) -> Result<()> {
        write_continuation(&mut self.writer, 0).await?;
        self.finished = true;
        Ok(())
    }

    /// Consumes itself, returning the inner writer.
    pub fn into_inner(self) -> W {
        self.writer
    }
}
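A usage sketch of the new writer, driven by an in-memory `futures::io::Cursor` (any `AsyncWrite + Unpin + Send` sink works). The schema and array constructors and the `compression` field on `WriteOptions` are assumed from the rest of the crate; only the `StreamWriter` calls and the `WriteOptions` re-export are from this diff:

```rust
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::ipc::write::stream_async::{StreamWriter, WriteOptions};
use arrow2::record_batch::RecordBatch;

async fn write_example() -> Result<Vec<u8>> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let array = Int32Array::from_slice(&[1, 2, 3]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(array) as Arc<dyn Array>],
    )?;

    // schema first, then batches (and any dictionaries), then the EOS marker
    let mut writer = StreamWriter::new(
        futures::io::Cursor::new(vec![]),
        WriteOptions { compression: None },
    );
    writer.start(&schema).await?;
    writer.write(&batch).await?;
    writer.finish().await?;

    // recover the encoded bytes from the cursor
    Ok(writer.into_inner().into_inner())
}
```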
6 changes: 2 additions & 4 deletions src/io/ipc/write/writer.rs
@@ -28,10 +28,8 @@ use arrow_format::ipc::flatbuffers::FlatBufferBuilder;
use super::super::ARROW_MAGIC;
use super::{
    super::convert,
-    common::{
-        encoded_batch, write_continuation, write_message, DictionaryTracker, EncodedData,
-        WriteOptions,
-    },
+    common::{encoded_batch, DictionaryTracker, EncodedData, WriteOptions},
+    common_sync::{write_continuation, write_message},
    schema_to_bytes,
};

3 changes: 3 additions & 0 deletions tests/it/io/ipc/mod.rs
@@ -3,3 +3,6 @@ mod read;
mod write;

pub use common::read_gzip_json;

+#[cfg(feature = "io_ipc_write_async")]
+mod write_async;
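The contents of `write_async.rs` are not shown in this diff; a plausible minimal test (hypothetical, with the same `WriteOptions` assumption as above) drives the writer with `futures::executor::block_on` and checks the end-of-stream marker that `finish` emits (the 4 continuation bytes followed by a zero length):

```rust
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::io::ipc::write::stream_async::{StreamWriter, WriteOptions};

#[test]
fn finish_writes_eos_marker() {
    futures::executor::block_on(async {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let mut writer = StreamWriter::new(
            futures::io::Cursor::new(vec![]),
            WriteOptions { compression: None },
        );
        writer.start(&schema).await.unwrap();
        writer.finish().await.unwrap();

        let bytes = writer.into_inner().into_inner();
        // the stream must end with the continuation marker and a 0 length
        assert_eq!(&bytes[bytes.len() - 8..], &[0xff, 0xff, 0xff, 0xff, 0, 0, 0, 0][..]);
    });
}
```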