Add bindings for arrow2 metadata (without serde support) (#153)
kylebarron authored Jul 10, 2022
1 parent 451c9bf commit 926e322
Showing 9 changed files with 537 additions and 45 deletions.
82 changes: 47 additions & 35 deletions Cargo.lock


2 changes: 2 additions & 0 deletions Cargo.toml
@@ -68,6 +68,8 @@ parquet = { version = "17.0", default-features = false, optional = true, features = [
] }
bytes = { version = "1", optional = true }

serde-wasm-bindgen = "0.4.3"
# serde = "1.0.137"
# 3.2 added deprecations that don't currently pass clippy
clap = { version = ">=3.1.15, <3.2", optional = true, features = ["derive"] }

20 changes: 14 additions & 6 deletions scripts/build.sh
@@ -4,8 +4,10 @@ mkdir -p tmp_build

if [ "$ENV" == "DEV" ]; then
BUILD="--dev"
FLAGS="--features debug"
else
BUILD="--release"
FLAGS=""
fi
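# Usage sketch (inferred from the branch above): `ENV=DEV scripts/build.sh`
# produces a --dev build with the `debug` feature enabled via $FLAGS; any
# other ENV value produces a --release build with no extra flags.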

######################################
@@ -16,23 +18,26 @@ wasm-pack build \
$BUILD \
--out-dir tmp_build/node \
--out-name arrow1 \
--target nodejs
--target nodejs \
$FLAGS

# Build web version into tmp_build/esm
echo "Building arrow-rs esm"
wasm-pack build \
$BUILD \
--out-dir tmp_build/esm \
--out-name arrow1 \
--target web
--target web \
$FLAGS

# Build bundler version into tmp_build/bundler
echo "Building arrow-rs bundler"
wasm-pack build \
$BUILD \
--out-dir tmp_build/bundler \
--out-name arrow1 \
--target bundler
--target bundler \
$FLAGS

######################################
# ARROW 2 turn on the feature manually
@@ -47,7 +52,8 @@ wasm-pack build \
--features arrow2 \
--features reader \
--features writer \
--features all_compressions
--features all_compressions \
$FLAGS

# Build web version into tmp_build/esm2
echo "Building arrow2 esm"
@@ -60,7 +66,8 @@ wasm-pack build \
--features arrow2 \
--features reader \
--features writer \
--features all_compressions
--features all_compressions \
$FLAGS

# Build bundler version into tmp_build/bundler2
echo "Building arrow2 bundler"
@@ -73,7 +80,8 @@ wasm-pack build \
--features arrow2 \
--features reader \
--features writer \
--features all_compressions
--features all_compressions \
$FLAGS

# Copy files into pkg/
mkdir -p pkg/{node,esm,bundler}
327 changes: 327 additions & 0 deletions src/arrow2/metadata.rs
@@ -0,0 +1,327 @@
use std::collections::HashMap;
use wasm_bindgen::prelude::*;

/// Metadata for a Parquet file.
#[derive(Debug, Clone)]
#[wasm_bindgen]
pub struct FileMetaData(arrow2::io::parquet::read::FileMetaData);

#[wasm_bindgen]
impl FileMetaData {
/// Version of this file.
#[wasm_bindgen]
pub fn version(&self) -> i32 {
self.0.version
}

/// Number of rows in the file.
#[wasm_bindgen(js_name = numRows)]
pub fn num_rows(&self) -> usize {
self.0.num_rows
}

/// String describing the application that wrote this file.
#[wasm_bindgen(js_name = createdBy)]
pub fn created_by(&self) -> Option<String> {
self.0.created_by.clone()
}

/// Number of row groups in the file
#[wasm_bindgen(js_name = numRowGroups)]
pub fn num_row_groups(&self) -> usize {
self.0.row_groups.len()
}

/// Returns a single RowGroupMetaData by index
#[wasm_bindgen(js_name = rowGroup)]
pub fn row_group(&self, i: usize) -> RowGroupMetaData {
self.0.row_groups[i].clone().into()
}

#[wasm_bindgen]
pub fn schema(&self) -> SchemaDescriptor {
SchemaDescriptor::new(self.0.schema().clone())
}

#[wasm_bindgen(js_name = keyValueMetadata)]
pub fn key_value_metadata(&self) -> Result<JsValue, JsValue> {
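// Flatten the optional key-value metadata into a HashMap, then serialize it
// with serde-wasm-bindgen, which by default represents a HashMap as a JS Map.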
let mut map: HashMap<String, Option<String>> = HashMap::new();
let metadata = &self.0.key_value_metadata;
if let Some(metadata) = metadata {
for item in metadata {
map.insert(item.key.clone(), item.value.clone());
}
}

match serde_wasm_bindgen::to_value(&map) {
Ok(value) => Ok(value),
Err(error) => Err(JsValue::from_str(format!("{}", error).as_str())),
}
}

// /// Column (sort) order used for `min` and `max` values of each column in this file.
// ///
// /// Each column order corresponds to one column, determined by its position in the
// /// list, matching the position of the column in the schema.
// ///
// /// When `None` is returned, there are no column orders available, and each column
// /// should be assumed to have undefined (legacy) column order.
// pub fn column_order(&self, i: usize) -> ColumnOrder {
// let col_order = self.0.column_order(i);
// col_order.
// }
}

impl From<arrow2::io::parquet::read::FileMetaData> for FileMetaData {
fn from(meta: arrow2::io::parquet::read::FileMetaData) -> Self {
FileMetaData(meta)
}
}

impl From<FileMetaData> for arrow2::io::parquet::read::FileMetaData {
fn from(meta: FileMetaData) -> arrow2::io::parquet::read::FileMetaData {
meta.0
}
}
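
// Example JS usage of the generated `FileMetaData` bindings (a sketch; assumes
// the node build and a `parquetUint8Array` as in the `readMetadata2` docs):
//
// ```js
// const meta = readMetadata2(parquetUint8Array);
// console.log(meta.version(), meta.numRows(), meta.createdBy());
// for (let i = 0; i < meta.numRowGroups(); i++) {
//   console.log(meta.rowGroup(i).numRows());
// }
// ```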

/// Metadata for a row group.
#[derive(Debug, Clone)]
#[wasm_bindgen]
pub struct RowGroupMetaData(arrow2::io::parquet::read::RowGroupMetaData);

#[wasm_bindgen]
impl RowGroupMetaData {
/// Number of rows in this row group.
#[wasm_bindgen(js_name = numRows)]
pub fn num_rows(&self) -> usize {
self.0.num_rows()
}

/// Number of columns in this row group.
#[wasm_bindgen(js_name = numColumns)]
pub fn num_columns(&self) -> usize {
self.0.columns().len()
}

/// Returns a single column chunk metadata by index
#[wasm_bindgen]
pub fn column(&self, i: usize) -> ColumnChunkMetaData {
self.0.columns()[i].clone().into()
}

/// Total byte size of all uncompressed column data in this row group.
#[wasm_bindgen(js_name = totalByteSize)]
pub fn total_byte_size(&self) -> usize {
self.0.total_byte_size()
}

/// Total size of all compressed column data in this row group.
#[wasm_bindgen(js_name = compressedSize)]
pub fn compressed_size(&self) -> usize {
self.0.compressed_size()
}
}

impl From<arrow2::io::parquet::read::RowGroupMetaData> for RowGroupMetaData {
fn from(meta: arrow2::io::parquet::read::RowGroupMetaData) -> Self {
RowGroupMetaData(meta)
}
}
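
// Example JS usage (sketch): compare compressed and uncompressed sizes across
// row groups; assumes `meta` is a `FileMetaData` returned by `readMetadata2`.
//
// ```js
// let compressed = 0;
// let uncompressed = 0;
// for (let i = 0; i < meta.numRowGroups(); i++) {
//   const rg = meta.rowGroup(i);
//   compressed += rg.compressedSize();
//   uncompressed += rg.totalByteSize();
// }
// console.log(`compression ratio: ${(compressed / uncompressed).toFixed(2)}`);
// ```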

/// Metadata for a column chunk.
// This contains the `ColumnDescriptor` associated with the chunk so that deserializers have
// access to the descriptor (e.g. physical, converted, logical).
#[derive(Debug, Clone)]
#[wasm_bindgen]
pub struct ColumnChunkMetaData(arrow2::io::parquet::read::ColumnChunkMetaData);

#[wasm_bindgen]
impl ColumnChunkMetaData {
/// File where the column chunk is stored.
///
/// If not set, assumed to belong to the same file as the metadata.
/// This path is relative to the current file.
#[wasm_bindgen(js_name = filePath)]
pub fn file_path(&self) -> Option<String> {
self.0.file_path().clone()
}

/// Byte offset in `file_path()`.
#[wasm_bindgen(js_name = fileOffset)]
pub fn file_offset(&self) -> i64 {
self.0.file_offset()
}

// /// Returns this column's [`ColumnChunk`]
// #[wasm_bindgen(js_name = columnChunk)]
// pub fn column_chunk(&self) -> usize {
// // let a = self.0.column_chunk();
// // let map
// // let val = serde_wasm_bindgen::to_value(a);

// // &self.column_chunk
// }

// /// The column's [`ColumnMetaData`]
// #[wasm_bindgen]
// pub fn metadata(&self) -> &ColumnMetaData {
// self.column_chunk.meta_data.as_ref().unwrap()
// }

// /// The [`ColumnDescriptor`] for this column. This descriptor contains the physical and logical type
// /// of the pages.
// #[wasm_bindgen]
// pub fn descriptor(&self) -> &ColumnDescriptor {
// &self.column_descr
// }

// /// The [`PhysicalType`] of this column.
// #[wasm_bindgen(js_name = physicalType)]
// pub fn physical_type(&self) -> PhysicalType {
// self.column_descr.descriptor.primitive_type.physical_type
// }

// #[wasm_bindgen(js_name = getStatistics)]
// pub fn get_statistics(&self) -> () {
// let maybe_statistics = self.0.statistics();
// if let Some(statistics) = maybe_statistics {
// let statistics = statistics.unwrap();
// let js_val: serde_wasm_bindgen::to_value(statistics);
// statistics.physical_type()
// }
// }

// /// Decodes the raw statistics into [`Statistics`].
// #[wasm_bindgen]
// pub fn statistics(&self) -> Option<Result<Arc<dyn Statistics>>> {
// self.metadata()
// .statistics
// .as_ref()
// .map(|x| deserialize_statistics(x, self.column_descr.descriptor.primitive_type.clone()))
// }

/// Total number of values in this column chunk. Note that this is not necessarily the number
/// of rows. E.g. the (nested) array `[[1, 2], [3]]` has 2 rows and 3 values.
#[wasm_bindgen(js_name = numValues)]
pub fn num_values(&self) -> i64 {
self.0.num_values()
}

// /// [`Compression`] for this column.
// #[wasm_bindgen(js_name = compression)]
// pub fn compression(&self) -> Compression {
// let compression = self.0.compression();
// compression.
// }

/// Returns the total compressed data size of this column chunk.
#[wasm_bindgen(js_name = compressedSize)]
pub fn compressed_size(&self) -> i64 {
self.0.compressed_size()
}

/// Returns the total uncompressed data size of this column chunk.
#[wasm_bindgen(js_name = uncompressedSize)]
pub fn uncompressed_size(&self) -> i64 {
self.0.uncompressed_size()
}

/// Returns the offset for the column data.
#[wasm_bindgen(js_name = dataPageOffset)]
pub fn data_page_offset(&self) -> i64 {
self.0.data_page_offset()
}

/// Returns `true` if this column chunk contains an index page, `false` otherwise.
#[wasm_bindgen(js_name = hasIndexPage)]
pub fn has_index_page(&self) -> bool {
self.0.has_index_page()
}

/// Returns the offset for the index page.
#[wasm_bindgen(js_name = indexPageOffset)]
pub fn index_page_offset(&self) -> Option<i64> {
self.0.index_page_offset()
}

/// Returns the offset for the dictionary page, if any.
#[wasm_bindgen(js_name = dictionaryPageOffset)]
pub fn dictionary_page_offset(&self) -> Option<i64> {
self.0.dictionary_page_offset()
}

/// Returns the number of encodings for this column
#[wasm_bindgen(js_name = numColumnEncodings)]
pub fn num_column_encodings(&self) -> usize {
self.0.column_encoding().len()
}

// /// Returns the encoding for this column
// #[wasm_bindgen(js_name = columnEncoding)]
// pub fn column_encoding(&self, i: usize) -> Encoding {
// self.0.column_encoding()[i]
// }

/// Returns the offset and length in bytes of the column chunk within the file
#[wasm_bindgen(js_name = byteRange)]
pub fn byte_range(&self) -> Vec<u64> {
let mut vec: Vec<u64> = Vec::new();
let byte_range = self.0.byte_range();
vec.push(byte_range.0);
vec.push(byte_range.1);
vec
}
}

impl From<arrow2::io::parquet::read::ColumnChunkMetaData> for ColumnChunkMetaData {
fn from(meta: arrow2::io::parquet::read::ColumnChunkMetaData) -> Self {
ColumnChunkMetaData(meta)
}
}
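
// Example JS usage (sketch): plan an HTTP Range request for a single column
// chunk via `byteRange()`. wasm-bindgen exposes `Vec<u64>` as a BigUint64Array,
// hence the `Number()` conversions; the URL is a placeholder.
//
// ```js
// const chunkMeta = meta.rowGroup(0).column(0);
// const [start, len] = chunkMeta.byteRange();
// const end = Number(start) + Number(len) - 1;
// const resp = await fetch("https://example.com/file.parquet", {
//   headers: { Range: `bytes=${start}-${end}` },
// });
// ```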

/// A schema descriptor. This encapsulates the top-level schemas for all the columns,
/// as well as all descriptors for all the primitive columns.
#[wasm_bindgen]
#[derive(Debug, Clone)]
pub struct SchemaDescriptor(parquet2::metadata::SchemaDescriptor);

impl SchemaDescriptor {
pub fn new(meta: parquet2::metadata::SchemaDescriptor) -> Self {
Self(meta)
}
}

#[wasm_bindgen]
impl SchemaDescriptor {
/// The schema's name.
#[wasm_bindgen]
pub fn name(&self) -> String {
self.0.name().to_string()
}

/// The number of columns in the schema
#[wasm_bindgen(js_name = numColumns)]
pub fn num_columns(&self) -> usize {
self.0.columns().len()
}

// /// The [`ColumnDescriptor`] (leafs) of this schema.
// ///
// /// Note that, for nested fields, this may contain more entries than the number of fields
// /// in the file - e.g. a struct field may have two columns.
// pub fn column(&self, i: usize) -> ColumnDescriptor {
// ColumnDescriptor::new(self.0.columns()[i])
// }

/// The number of fields in the schema
#[wasm_bindgen(js_name = numFields)]
pub fn num_fields(&self) -> usize {
self.0.fields().len()
}

// /// The schema's fields.
// #[wasm_bindgen]
// pub fn fields(&self, i: usize) -> ParquetType {
// ParquetType::new(self.0.fields()[i])
// }
}
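
// Example JS usage (sketch): inspect the schema descriptor of a file whose
// metadata was read with `readMetadata2`.
//
// ```js
// const schema = meta.schema();
// console.log(schema.name(), schema.numColumns(), schema.numFields());
// ```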
3 changes: 3 additions & 0 deletions src/arrow2/mod.rs
@@ -1,6 +1,9 @@
#[cfg(feature = "reader")]
pub mod reader;

#[cfg(feature = "reader")]
pub mod metadata;

pub mod wasm;

#[cfg(feature = "writer")]
46 changes: 44 additions & 2 deletions src/arrow2/reader.rs
@@ -1,7 +1,9 @@
use arrow2::error::Error as ArrowError;
use arrow2::io::ipc::write::{StreamWriter as IPCStreamWriter, WriteOptions as IPCWriteOptions};
// NOTE: It's FileReader on latest main but RecordReader in 0.9.2
use arrow2::io::parquet::read::FileReader as ParquetFileReader;
use arrow2::io::parquet::read::{
infer_schema, read_columns_many, FileReader as ParquetFileReader, RowGroupDeserializer,
};
use parquet2::metadata::FileMetaData;
use std::io::Cursor;

/// Internal function to read a buffer with Parquet data into a buffer with Arrow IPC Stream data
@@ -27,3 +29,43 @@ pub fn read_parquet(parquet_file: &[u8]) -> Result<Vec<u8>, ArrowError> {
writer.finish()?;
Ok(output_file)
}

/// Read metadata from parquet buffer
pub fn read_metadata(parquet_file: &[u8]) -> Result<FileMetaData, ArrowError> {
let input_file = Cursor::new(parquet_file);
let file_reader = ParquetFileReader::try_new(input_file, None, None, None, None)?;
Ok(file_reader.metadata().clone())
}

/// Read single row group
pub fn read_row_group(
parquet_file: &[u8],
meta: &FileMetaData,
i: usize,
) -> Result<Vec<u8>, ArrowError> {
let mut reader = Cursor::new(parquet_file);
let arrow_schema = infer_schema(meta)?;

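// Locate the requested row group and read all of its column chunks into memory.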
let row_group_meta = &meta.row_groups[i];
let column_chunks = read_columns_many(
&mut reader,
row_group_meta,
arrow_schema.fields.clone(),
None,
)?;

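// Lazily deserialize the columns into Arrow chunks as the iterator is consumed.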
let result = RowGroupDeserializer::new(column_chunks, row_group_meta.num_rows() as usize, None);

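// Write each deserialized chunk out as an Arrow IPC stream message.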
let mut output_file = Vec::new();
let options = IPCWriteOptions { compression: None };
let mut writer = IPCStreamWriter::new(&mut output_file, options);
writer.start(&arrow_schema, None)?;

for maybe_chunk in result {
let chunk = maybe_chunk?;
writer.write(&chunk, None)?;
}

writer.finish()?;
Ok(output_file)
}
78 changes: 78 additions & 0 deletions src/arrow2/wasm.rs
@@ -35,6 +35,84 @@ pub fn read_parquet2(parquet_file: &[u8]) -> Result<Uint8Array, JsValue> {
}
}

/// Read metadata from a Parquet file using the [`arrow2`](https://crates.io/crates/arrow2) and
/// [`parquet2`](https://crates.io/crates/parquet2) Rust crates.
///
/// Example:
///
/// ```js
/// // Edit the `parquet-wasm` import as necessary
/// import { readMetadata2 } from "parquet-wasm/node2";
///
/// const resp = await fetch("https://example.com/file.parquet");
/// const parquetUint8Array = new Uint8Array(await resp.arrayBuffer());
/// const parquetFileMetaData = readMetadata2(parquetUint8Array);
/// ```
///
/// @param parquet_file Uint8Array containing Parquet data
/// @returns a {@linkcode FileMetaData} object containing metadata of the Parquet file.
#[wasm_bindgen(js_name = readMetadata2)]
#[cfg(feature = "reader")]
pub fn read_metadata2(
parquet_file: &[u8],
) -> Result<crate::arrow2::metadata::FileMetaData, JsValue> {
if parquet_file.is_empty() {
return Err(JsValue::from_str(
"Empty input provided or not a Uint8Array.",
));
}

match crate::arrow2::reader::read_metadata(parquet_file) {
Ok(metadata) => Ok(metadata.into()),
Err(error) => Err(JsValue::from_str(format!("{}", error).as_str())),
}
}

/// Read a single row group from a Parquet file into Arrow data using the
/// [`arrow2`](https://crates.io/crates/arrow2) and [`parquet2`](https://crates.io/crates/parquet2)
/// Rust crates.
///
/// Example:
///
/// ```js
/// import { tableFromIPC } from "apache-arrow";
/// // Edit the `parquet-wasm` import as necessary
/// import { readRowGroup2, readMetadata2 } from "parquet-wasm/node2";
///
/// const resp = await fetch("https://example.com/file.parquet");
/// const parquetUint8Array = new Uint8Array(await resp.arrayBuffer());
/// const parquetFileMetaData = readMetadata2(parquetUint8Array);
///
/// // Read only the first row group
/// const arrowIpcBuffer = readRowGroup2(parquetUint8Array, parquetFileMetaData, 0);
/// const arrowTable = tableFromIPC(arrowIpcBuffer);
/// ```
///
/// Note that you can get the number of row groups in a Parquet file using {@linkcode FileMetaData.numRowGroups}
///
/// @param parquet_file Uint8Array containing Parquet data
/// @param meta {@linkcode FileMetaData} from a call to {@linkcode readMetadata2}
/// @param i Number index of the row group to parse
/// @returns Uint8Array containing Arrow data in [IPC Stream format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format). To parse this into an Arrow table, pass to `tableFromIPC` in the Arrow JS bindings.
#[wasm_bindgen(js_name = readRowGroup2)]
#[cfg(feature = "reader")]
pub fn read_row_group2(
parquet_file: &[u8],
meta: &crate::arrow2::metadata::FileMetaData,
i: usize,
) -> Result<Uint8Array, JsValue> {
if parquet_file.is_empty() {
return Err(JsValue::from_str(
"Empty input provided or not a Uint8Array.",
));
}

match crate::arrow2::reader::read_row_group(parquet_file, &meta.clone().into(), i) {
Ok(buffer) => copy_vec_to_uint8_array(buffer),
Err(error) => Err(JsValue::from_str(format!("{}", error).as_str())),
}
}

/// Write Arrow data to a Parquet file using the [`arrow2`](https://crates.io/crates/arrow2) and
/// [`parquet2`](https://crates.io/crates/parquet2) Rust crates.
///
21 changes: 20 additions & 1 deletion tests/js/arrow2.ts
@@ -1,7 +1,7 @@
import * as test from "tape";
import * as wasm from "../../pkg/node/arrow2";
import { readFileSync } from "fs";
import { tableFromIPC, tableToIPC } from "apache-arrow";
import { RecordBatch, Table, tableFromIPC, tableToIPC } from "apache-arrow";
import { testArrowTablesEqual, readExpectedArrowData } from "./utils";

// Path from repo root
@@ -76,3 +76,22 @@ test("error produced trying to read file with arrayBuffer", (t) => {

t.end();
});

test("iterate over row groups", (t) => {
const dataPath = `${dataDir}/2-partition-brotli.parquet`;
const buffer = readFileSync(dataPath);
const arr = new Uint8Array(buffer);
const fileMetaData = wasm.readMetadata2(arr);

const chunks: RecordBatch[] = [];
for (let i = 0; i < fileMetaData.numRowGroups(); i++) {
let arrowIpcBuffer = wasm.readRowGroup2(arr, fileMetaData, i);
chunks.push(...tableFromIPC(arrowIpcBuffer).batches);
}

const table = new Table(chunks);
const expectedTable = readExpectedArrowData();
testArrowTablesEqual(t, expectedTable, table);

t.end();
});
3 changes: 2 additions & 1 deletion www/index.js
@@ -8,9 +8,10 @@ setPanicHook();

// const filePath = "./water-stress_rcp26and85_2020-2040-10.parquet";

const filePath = "./data/2-partition-none.parquet";
// const filePath = "./data/2-partition-brotli.parquet";
// const filePath = "./data/1-partition-gzip.parquet";
const filePath = "./data/1-partition-none.parquet";
// const filePath = "./data/1-partition-none.parquet";
// const filePath = "./data/1-partition-snappy.parquet";
// const filePath = "./data/1-partition-none.parquet";
// const filePath = "./data/2-partition-brotli.parquet";
