Skip to content

Commit

Permalink
Move parquet async functionality behind feature flag (jorgecarleitao#…
Browse files Browse the repository at this point in the history
  • Loading branch information
MostlyAmiable authored Oct 28, 2023
1 parent b073454 commit 3ddc6a1
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 9 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ odbc-api = { version = "0.36", optional = true }
# Faster hashing
ahash = "0.8"

# For `LIKE` matching "contains" fast-path
# For `LIKE` matching "contains" fast-path
memchr = { version = "2.6", optional = true }

# Support conversion to/from arrow-rs
Expand All @@ -117,7 +117,6 @@ getrandom = { version = "0.2", features = ["js"] }
version = "0.17"
optional = true
default_features = false
features = ["async"]

[dev-dependencies]
criterion = "0.4"
Expand Down Expand Up @@ -160,7 +159,7 @@ full = [
"io_ipc_compression",
"io_json_integration",
"io_print",
"io_parquet",
"io_parquet_async",
"io_parquet_compression",
"io_avro",
"io_orc",
Expand Down Expand Up @@ -189,7 +188,8 @@ io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]

# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"]
io_parquet = ["parquet2", "io_ipc", "base64", "streaming-iterator", "fallible-streaming-iterator"]
io_parquet_async = ["futures", "io_parquet", "parquet2/async"]

io_parquet_compression = [
"io_parquet_zstd",
Expand All @@ -200,7 +200,7 @@ io_parquet_compression = [
]

# sample testing of generated arrow data
io_parquet_sample_test = ["io_parquet"]
io_parquet_sample_test = ["io_parquet_async"]

# compression backends
io_parquet_zstd = ["parquet2/zstd"]
Expand Down
13 changes: 9 additions & 4 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@ pub mod statistics;

use std::io::{Read, Seek};

#[cfg(feature = "io_parquet_async")]
use futures::{AsyncRead, AsyncSeek};

// re-exports of parquet2's relevant APIs
#[cfg(feature = "io_parquet_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))]
pub use parquet2::read::{get_page_stream, read_metadata_async as _read_metadata_async};
pub use parquet2::{
error::Error as ParquetError,
fallible_streaming_iterator,
metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData},
page::{CompressedDataPage, DataPageHeader, Page},
read::{
decompress, get_column_iterator, get_page_stream,
read_columns_indexes as _read_columns_indexes, read_metadata as _read_metadata,
read_metadata_async as _read_metadata_async, read_pages_locations, BasicDecompressor,
Decompressor, MutStreamingIterator, PageFilter, PageReader, ReadColumnIterator, State,
decompress, get_column_iterator, read_columns_indexes as _read_columns_indexes,
read_metadata as _read_metadata, read_pages_locations, BasicDecompressor, Decompressor,
MutStreamingIterator, PageFilter, PageReader, ReadColumnIterator, State,
},
schema::types::{
GroupLogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType,
Expand Down Expand Up @@ -60,6 +63,8 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
}

/// Reads a parquet file's metadata asynchronously.
#[cfg(feature = "io_parquet_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))]
pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
reader: &mut R,
) -> Result<FileMetaData> {
Expand Down
6 changes: 6 additions & 0 deletions src/io/parquet/read/row_group.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::io::{Read, Seek};

#[cfg(feature = "io_parquet_async")]
use futures::{
future::{try_join_all, BoxFuture},
AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt,
Expand Down Expand Up @@ -138,6 +139,7 @@ where
Ok((meta, chunk))
}

#[cfg(feature = "io_parquet_async")]
async fn _read_single_column_async<'b, R, F>(
reader_factory: F,
meta: &ColumnChunkMetaData,
Expand All @@ -163,6 +165,8 @@ where
///
/// It does so asynchronously via a single `join_all` over all the necessary columns for
/// `field_name`.
#[cfg(feature = "io_parquet_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))]
pub async fn read_columns_async<
'a,
'b,
Expand Down Expand Up @@ -303,6 +307,8 @@ pub fn read_columns_many<'a, R: Read + Seek>(
/// This operation is IO-bound, `O(C)` where C is the number of columns in the row group:
/// it reads all the columns of the row group associated with the requested fields into memory.
/// It does so asynchronously via `join_all`.
#[cfg(feature = "io_parquet_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))]
pub async fn read_columns_many_async<
'a,
'b,
Expand Down
3 changes: 3 additions & 0 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod pages;
mod primitive;
mod row_group;
mod schema;
#[cfg(feature = "io_parquet_async")]
mod sink;
mod utf8;
mod utils;
Expand Down Expand Up @@ -68,6 +69,8 @@ use crate::compute::aggregate::estimated_bytes_size;
pub use file::FileWriter;
pub use row_group::{row_group_iter, RowGroupIterator};
pub use schema::to_parquet_type;
#[cfg(feature = "io_parquet_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))]
pub use sink::FileSink;

pub use pages::array_to_columns;
Expand Down

0 comments on commit 3ddc6a1

Please sign in to comment.