Skip to content

Commit

Permalink
Async reader (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron authored Jul 10, 2022
1 parent 926e322 commit b49105a
Show file tree
Hide file tree
Showing 12 changed files with 326 additions and 11 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"rust-analyzer.cargo.target": "wasm32-unknown-unknown"
"rust-analyzer.cargo.target": "wasm32-unknown-unknown",
"rust-analyzer.cargo.features": ["all"]
}
60 changes: 60 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 25 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ crate-type = ["cdylib", "rlib"]
[features]
default = ["arrow1", "all_compressions", "reader", "writer"]
arrow1 = ["dep:arrow", "dep:parquet", "dep:bytes"]
arrow2 = ["dep:arrow2", "dep:parquet2"]
arrow2 = ["dep:arrow2", "dep:parquet2", "dep:serde-wasm-bindgen"]
reader = []
writer = []
async = ["dep:wasm-bindgen-futures", "dep:futures", "dep:range-reader"]
debug = ["console_error_panic_hook", "clap"]

brotli = ["parquet?/brotli", "parquet2?/brotli"]
Expand All @@ -32,10 +33,10 @@ lz4 = ["parquet2?/lz4_flex"]
all_compressions = ["brotli", "gzip", "snappy", "zstd", "lz4"]

# Full list of available features
full = ["arrow1", "arrow2", "debug", "all_compressions", "reader", "writer"]
full = ["arrow1", "arrow2", "async", "debug", "all_compressions", "reader", "writer"]

[dependencies]
wasm-bindgen = "0.2.80"
wasm-bindgen = { version = "0.2.80", features = ["serde-serialize"] }

# The `console_error_panic_hook` crate provides better debugging of panics by
# logging them with `console.error`. This is great for development, but requires
Expand All @@ -51,28 +52,44 @@ console_error_panic_hook = { version = "0.1.6", optional = true }

js-sys = "0.3.58"
getrandom = { version = "0.2.6", features = ["js"] }
web-sys = { version = "0.3", features = ["console"] }

arrow2 = { version = "0.12", optional = true, features = [
"io_ipc",
"io_parquet",
] }
# parquet2 is only used indirectly through arrow2, but we define it here so that we can turn on
# parquet2 features as needed, e.g. with --features parquet2/gzip
parquet2 = { version = "0.13", default_features = false, optional = true }

arrow = { version = "17.0", default-features = false, optional = true }
parquet = { version = "17.0", default-features = false, optional = true, features = [
"arrow",
"base64",
"async",
] }
async-compat = "0.2.1"
bytes = { version = "1", optional = true }

serde-wasm-bindgen = "0.4.3"
# serde = "1.0.137"
serde-wasm-bindgen = {version = "0.4.3", optional = true}

# 3.2 added deprecations that don't currently pass clippy
clap = { version = ">=3.1.15, <3.2", optional = true, features = ["derive"] }

wasm-bindgen-futures = {version = "0.4.30", optional = true}
futures = {version = "0.3", optional = true}
range-reader = {version = "0.1", optional = true}

[dependencies.web-sys]
version = "0.3.4"
features = [
'console',
'Headers',
'Request',
'RequestInit',
'RequestMode',
'Response',
'Window',
"Document", "Element"
]

[dev-dependencies]
wasm-bindgen-test = "0.3.31"

Expand Down
3 changes: 3 additions & 0 deletions scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ wasm-pack build \
--features reader \
--features writer \
--features all_compressions \
--features async \
$FLAGS

# Build web version into tmp_build/esm2
Expand All @@ -67,6 +68,7 @@ wasm-pack build \
--features reader \
--features writer \
--features all_compressions \
--features async \
$FLAGS

# Build bundler version into tmp_build/bundler2
Expand All @@ -81,6 +83,7 @@ wasm-pack build \
--features reader \
--features writer \
--features all_compressions \
--features async \
$FLAGS

# Copy files into pkg/
Expand Down
6 changes: 6 additions & 0 deletions src/arrow2/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ pub struct FileMetaData(arrow2::io::parquet::read::FileMetaData);

#[wasm_bindgen]
impl FileMetaData {
/// Clone this struct in wasm memory.
#[wasm_bindgen]
pub fn copy(&self) -> Self {
FileMetaData(self.0.clone())
}

/// Version of this file.
#[wasm_bindgen]
pub fn version(&self) -> i32 {
Expand Down
3 changes: 3 additions & 0 deletions src/arrow2/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(feature = "reader")]
pub mod reader;

#[cfg(all(feature = "reader", feature = "async"))]
pub mod reader_async;

#[cfg(feature = "reader")]
pub mod metadata;

Expand Down
91 changes: 91 additions & 0 deletions src/arrow2/reader_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use crate::common::fetch::make_range_request;
use crate::log;
use arrow2::error::Error as ArrowError;
use arrow2::io::ipc::write::{StreamWriter as IPCStreamWriter, WriteOptions as IPCWriteOptions};
use arrow2::io::parquet::read::{infer_schema, FileMetaData};
use arrow2::io::parquet::read::{read_columns_many_async, RowGroupDeserializer};
use futures::channel::oneshot;
use futures::future::BoxFuture;
use parquet2::read::read_metadata_async as _read_metadata_async;
use range_reader::{RangeOutput, RangedAsyncReader};
use wasm_bindgen_futures::spawn_local;

/// Build a `RangedAsyncReader` that serves byte ranges of the file at `url`
/// via HTTP range requests.
///
/// `content_length` is the total size of the remote file in bytes.
/// `min_request_size` is the smallest range to fetch per request; defaults to
/// 4 KiB when `None`.
fn create_reader(
    url: String,
    content_length: usize,
    min_request_size: Option<usize>,
) -> RangedAsyncReader {
    // At least 4 KiB per request by default. Adjust to your liking.
    let request_floor = min_request_size.unwrap_or(4 * 1024);

    // Closure fetching bytes `[start, start + length)` from the remote file.
    // The HTTP fetch has to run on the JS event loop, so it is spawned
    // locally and the result is handed back through a oneshot channel.
    let range_get = Box::new(move |start: u64, length: usize| {
        let request_url = url.clone();

        Box::pin(async move {
            let (tx, rx) = oneshot::channel::<Vec<u8>>();
            spawn_local(async move {
                log!("Making range request");
                let bytes = make_range_request(&request_url, start, length)
                    .await
                    .unwrap();
                tx.send(bytes).unwrap();
            });
            let data = rx.await.unwrap();

            Ok(RangeOutput { start, data })
        }) as BoxFuture<'static, std::io::Result<RangeOutput>>
    });

    RangedAsyncReader::new(content_length, request_floor, range_get)
}

/// Fetch and parse the Parquet footer metadata of the file at `url`.
///
/// `content_length` must be the total byte size of the remote file so the
/// footer can be located with range requests.
pub async fn read_metadata_async(
    url: String,
    content_length: usize,
) -> Result<FileMetaData, ArrowError> {
    let mut range_reader = create_reader(url, content_length, None);
    let file_meta = _read_metadata_async(&mut range_reader).await?;
    Ok(file_meta)
}

/// Download row group `i` of the Parquet file at `url` and re-encode it as an
/// Arrow IPC stream, returned as raw bytes.
///
/// `metadata` must describe the same remote file (e.g. obtained via
/// `read_metadata_async`). NOTE(review): `i` must be a valid row-group index;
/// the `row_groups[i]` access below panics otherwise.
pub async fn read_row_group(
    url: String,
    content_length: usize,
    metadata: &FileMetaData,
    i: usize,
) -> Result<Vec<u8>, ArrowError> {
    // `read_columns_many_async` takes a factory rather than a single reader
    // so it can open readers as needed (see the IO-bound comment below).
    let reader_factory = || {
        Box::pin(futures::future::ready(Ok(create_reader(
            url.clone(),
            content_length,
            None,
        )))) as BoxFuture<'static, std::result::Result<RangedAsyncReader, std::io::Error>>
    };

    // Row group selected by the caller-provided index.
    let group = &metadata.row_groups[i];

    // no chunk size in deserializing
    let chunk_size = None;

    let schema = infer_schema(metadata)?;
    let fields = schema.fields.clone();

    // this is IO-bounded (and issues a join, thus the reader_factory)
    let column_chunks = read_columns_many_async(reader_factory, group, fields, chunk_size).await?;

    // Create IPC writer backed by an in-memory buffer.
    let mut output_file = Vec::new();
    let options = IPCWriteOptions { compression: None };
    let mut writer = IPCStreamWriter::new(&mut output_file, options);
    writer.start(&schema, None)?;

    // Decode the downloaded column chunks and stream each Arrow chunk into
    // the IPC writer.
    let deserializer = RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None);
    for maybe_chunk in deserializer {
        let chunk = maybe_chunk?;
        writer.write(&chunk, None)?;
    }

    writer.finish()?;
    Ok(output_file)
}
28 changes: 28 additions & 0 deletions src/arrow2/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,34 @@ pub fn read_row_group2(
}
}

/// JS-facing wrapper: fetch Parquet footer metadata from `url` and return it
/// as a wasm `FileMetaData`, mapping any error to a JS string.
#[wasm_bindgen(js_name = readMetadataAsync2)]
#[cfg(all(feature = "reader", feature = "async"))]
pub async fn read_metadata_async2(
    url: String,
    content_length: usize,
) -> Result<crate::arrow2::metadata::FileMetaData, JsValue> {
    crate::arrow2::reader_async::read_metadata_async(url, content_length)
        .await
        .map(|metadata| metadata.into())
        .map_err(|error| JsValue::from_str(&error.to_string()))
}

/// JS-facing wrapper: download row group `i` of the Parquet file at `url`
/// and return it as Arrow IPC bytes in a `Uint8Array`, mapping any error to
/// a JS string.
#[wasm_bindgen(js_name = readRowGroupAsync2)]
#[cfg(all(feature = "reader", feature = "async"))]
pub async fn read_row_group_async2(
    url: String,
    content_length: usize,
    meta: crate::arrow2::metadata::FileMetaData,
    i: usize,
) -> Result<Uint8Array, JsValue> {
    // `meta` is owned here and never used again, so convert it directly:
    // the previous `meta.clone().into()` deep-copied the metadata for no
    // benefit.
    match crate::arrow2::reader_async::read_row_group(url, content_length, &meta.into(), i).await {
        Ok(buffer) => copy_vec_to_uint8_array(buffer),
        Err(error) => Err(JsValue::from_str(format!("{}", error).as_str())),
    }
}

/// Write Arrow data to a Parquet file using the [`arrow2`](https://crates.io/crates/arrow2) and
/// [`parquet2`](https://crates.io/crates/parquet2) Rust crates.
///
Expand Down
Loading

0 comments on commit b49105a

Please sign in to comment.