Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor into modules #32

Merged
merged 1 commit into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 0 additions & 84 deletions src/arrow1.rs

This file was deleted.

4 changes: 4 additions & 0 deletions src/arrow1/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod reader;
pub mod wasm;
pub mod writer;
pub mod writer_properties;
35 changes: 35 additions & 0 deletions src/arrow1/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use arrow::ipc::writer::StreamWriter;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::errors::ParquetError;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::SliceableCursor;
use std::sync::Arc;

/// Reads an in-memory Parquet file and re-encodes its contents as an Arrow IPC stream.
///
/// The batch size requested from the Arrow reader is the row count of the first
/// row group. If the file has no row groups, or the first row group reports a
/// row count that is zero, negative, or does not fit in `usize`, a fixed fallback
/// batch size is used instead of unconditionally indexing row group 0.
///
/// # Errors
///
/// Returns a `ParquetError` when the Parquet bytes cannot be opened, a record
/// batch cannot be read, or the IPC stream cannot be written.
pub fn read_parquet(parquet_file: &[u8]) -> Result<Vec<u8>, ParquetError> {
    // Batch size used when the first row group cannot supply a usable one.
    const FALLBACK_BATCH_SIZE: usize = 1024;

    // Create Parquet reader
    let sliceable_cursor = SliceableCursor::new(Arc::new(parquet_file.to_vec()));
    let parquet_reader = SerializedFileReader::new(sliceable_cursor)?;

    // Derive the batch size from the first row group, if there is one.
    let batch_size = parquet_reader
        .metadata()
        .row_groups()
        .first()
        .and_then(|row_group| usize::try_from(row_group.num_rows()).ok())
        .filter(|&num_rows| num_rows > 0)
        .unwrap_or(FALLBACK_BATCH_SIZE);

    // Create Arrow reader from Parquet reader
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
    let record_batch_reader = arrow_reader.get_record_reader(batch_size)?;
    let arrow_schema = arrow_reader.get_schema()?;

    // Create IPC Writer
    let mut output_file = Vec::new();
    let mut writer = StreamWriter::try_new(&mut output_file, &arrow_schema)?;

    // Iterate over record batches, writing them to IPC stream
    for maybe_record_batch in record_batch_reader {
        writer.write(&maybe_record_batch?)?;
    }
    writer.finish()?;

    // Consuming the writer releases its borrow of `output_file`, so the owned
    // buffer can be returned directly without copying it a second time.
    writer.into_inner()?;
    Ok(output_file)
}
40 changes: 40 additions & 0 deletions src/arrow1/wasm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use js_sys::Uint8Array;
use wasm_bindgen::prelude::*;

/// JavaScript-facing `readParquet`: converts Parquet bytes into Arrow IPC stream
/// bytes and returns them as a freshly allocated `Uint8Array`.
///
/// Any failure (conversion error, or a length that does not fit the
/// `Uint8Array` length type) is returned as a JS string holding the error's
/// display text.
#[wasm_bindgen(js_name = readParquet)]
pub fn read_parquet(parquet_file: &[u8]) -> Result<Uint8Array, JsValue> {
    // Run the pure-Rust conversion; the resulting Vec is copied into JS memory below.
    let ipc_bytes = crate::arrow1::reader::read_parquet(parquet_file)
        .map_err(|err| JsValue::from_str(format!("{}", err).as_str()))?;

    let byte_len = u32::try_from(ipc_bytes.len())
        .map_err(|err| JsValue::from_str(format!("{}", err).as_str()))?;

    let js_bytes = Uint8Array::new_with_length(byte_len);
    js_bytes.copy_from(&ipc_bytes);
    Ok(js_bytes)
}

/// JavaScript-facing `writeParquet`: converts Arrow IPC stream bytes into Parquet
/// bytes using the supplied writer properties and returns them as a freshly
/// allocated `Uint8Array`.
///
/// Any failure (conversion error, or a length that does not fit the
/// `Uint8Array` length type) is returned as a JS string holding the error's
/// display text.
#[wasm_bindgen(js_name = writeParquet)]
pub fn write_parquet(
    arrow_file: &[u8],
    // TODO: make this param optional?
    writer_properties: crate::arrow1::writer_properties::WriterProperties,
) -> Result<Uint8Array, JsValue> {
    // Run the pure-Rust conversion; the resulting Vec is copied into JS memory below.
    let parquet_bytes = crate::arrow1::writer::write_parquet(arrow_file, writer_properties)
        .map_err(|err| JsValue::from_str(format!("{}", err).as_str()))?;

    let byte_len = u32::try_from(parquet_bytes.len())
        .map_err(|err| JsValue::from_str(format!("{}", err).as_str()))?;

    let js_bytes = Uint8Array::new_with_length(byte_len);
    js_bytes.copy_from(&parquet_bytes);
    Ok(js_bytes)
}
30 changes: 30 additions & 0 deletions src/arrow1/writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use arrow::ipc::reader::StreamReader;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::errors::ParquetError;
use parquet::file::writer::InMemoryWriteableCursor;
use std::io::Cursor;

/// Converts an Arrow IPC stream held in memory into Parquet file bytes.
///
/// `writer_properties` is translated with `to_upstream()` and handed to the
/// Parquet `ArrowWriter`.
///
/// # Errors
///
/// Returns a `ParquetError` when the IPC stream cannot be opened, a record batch
/// cannot be read, or the Parquet writer fails to write or close.
pub fn write_parquet(
    arrow_file: &[u8],
    writer_properties: crate::arrow1::writer_properties::WriterProperties,
) -> Result<Vec<u8>, ParquetError> {
    // Open the input bytes as an IPC stream and grab its schema up front.
    let ipc_reader = StreamReader::try_new(Cursor::new(arrow_file))?;
    let schema = ipc_reader.schema();

    // A clone of the cursor goes to the writer; this handle is kept so the
    // written bytes can be fetched afterwards (presumably both share one buffer).
    let sink = InMemoryWriteableCursor::default();
    let upstream_props = writer_properties.to_upstream();
    let mut parquet_writer = ArrowWriter::try_new(sink.clone(), schema, Some(upstream_props))?;

    // Forward every IPC record batch to the Parquet writer.
    for batch in ipc_reader {
        parquet_writer.write(&batch?)?;
    }

    parquet_writer.close()?;

    Ok(sink.data())
}
18 changes: 0 additions & 18 deletions src/writer_properties1.rs → src/arrow1/writer_properties.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,5 @@
extern crate web_sys;
// #[cfg(feature = "arrow1")]

use wasm_bindgen::prelude::*;

// A macro to provide `println!(..)`-style syntax for `console.log` logging.
// wasm32 variant of `log!`: formats its arguments with `format!` and passes the
// resulting string to `web_sys::console::log_1`.
#[cfg(target_arch = "wasm32")]
macro_rules! log {
// Accepts any token sequence and forwards it unchanged to `format!`.
( $( $t:tt )* ) => {
web_sys::console::log_1(&format!( $( $t )* ).into());
}
}

// Non-wasm32 variant of `log!`: formats its arguments with `format!` and prints
// the result to stdout with a "LOG - " prefix.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! log {
// Accepts any token sequence and forwards it unchanged to `format!`.
( $( $t:tt )* ) => {
println!("LOG - {}", format!( $( $t )* ));
}
}

/// Encodings supported by Parquet.
/// Not all encodings are valid for all types. These enums are also used to specify the
/// encoding of definition and repetition levels.
Expand Down
3 changes: 3 additions & 0 deletions src/arrow2/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod reader;
pub mod wasm;
pub mod writer;
27 changes: 27 additions & 0 deletions src/arrow2/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use arrow2::error::ArrowError;
use arrow2::io::ipc::write::{StreamWriter as IPCStreamWriter, WriteOptions as IPCWriteOptions};
// NOTE: It's FileReader on latest main but RecordReader in 0.9.2
use arrow2::io::parquet::read::FileReader as ParquetFileReader;
use std::io::Cursor;

/// Reads an in-memory Parquet file with arrow2 and re-encodes it as an Arrow IPC
/// stream (written without compression).
///
/// # Errors
///
/// Returns an `ArrowError` when the Parquet reader cannot be created, a chunk
/// cannot be read, or the IPC writer fails.
pub fn read_parquet(parquet_file: &[u8]) -> Result<Vec<u8>, ArrowError> {
    // Open the Parquet bytes; the four `None`s leave every optional reader argument unset.
    let parquet_reader =
        ParquetFileReader::try_new(Cursor::new(parquet_file), None, None, None, None)?;
    let schema = parquet_reader.schema().clone();

    // Set up the IPC stream writer over an in-memory buffer.
    let mut ipc_bytes = Vec::new();
    let mut ipc_writer =
        IPCStreamWriter::new(&mut ipc_bytes, IPCWriteOptions { compression: None });
    ipc_writer.start(&schema, None)?;

    // Copy each chunk produced by the Parquet reader into the IPC stream.
    for chunk in parquet_reader {
        ipc_writer.write(&chunk?, None)?;
    }

    ipc_writer.finish()?;
    Ok(ipc_bytes)
}
36 changes: 36 additions & 0 deletions src/arrow2/wasm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use js_sys::Uint8Array;
use wasm_bindgen::prelude::*;

/// JavaScript-facing `readParquet2`: converts Parquet bytes into Arrow IPC stream
/// bytes via the arrow2 reader and returns them as a freshly allocated
/// `Uint8Array`.
///
/// Any failure (conversion error, or a length that does not fit the
/// `Uint8Array` length type) is returned as a JS string holding the error's
/// display text.
#[wasm_bindgen(js_name = readParquet2)]
pub fn read_parquet2(parquet_file: &[u8]) -> Result<Uint8Array, JsValue> {
    // Run the pure-Rust conversion; the resulting Vec is copied into JS memory below.
    let ipc_bytes = crate::arrow2::reader::read_parquet(parquet_file)
        .map_err(|err| JsValue::from_str(format!("{}", err).as_str()))?;

    let byte_len = u32::try_from(ipc_bytes.len())
        .map_err(|err| JsValue::from_str(format!("{}", err).as_str()))?;

    let js_bytes = Uint8Array::new_with_length(byte_len);
    js_bytes.copy_from(&ipc_bytes);
    Ok(js_bytes)
}

/// JavaScript-facing `writeParquet2`: converts Arrow IPC bytes into Parquet bytes
/// via the arrow2 writer and returns them as a freshly allocated `Uint8Array`.
///
/// Any failure (conversion error, or a length that does not fit the
/// `Uint8Array` length type) is returned as a JS string holding the error's
/// display text.
#[wasm_bindgen(js_name = writeParquet2)]
pub fn write_parquet2(arrow_file: &[u8]) -> Result<Uint8Array, JsValue> {
    // Run the pure-Rust conversion; the resulting Vec is copied into JS memory below.
    let parquet_bytes = crate::arrow2::writer::write_parquet(arrow_file)
        .map_err(|err| JsValue::from_str(format!("{}", err).as_str()))?;

    let byte_len = u32::try_from(parquet_bytes.len())
        .map_err(|err| JsValue::from_str(format!("{}", err).as_str()))?;

    let js_bytes = Uint8Array::new_with_length(byte_len);
    js_bytes.copy_from(&parquet_bytes);
    Ok(js_bytes)
}
42 changes: 6 additions & 36 deletions src/arrow2.rs → src/arrow2/writer.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,11 @@
#[cfg(feature = "arrow2")]
use {
arrow2::error::ArrowError,
arrow2::io::ipc::read::{read_file_metadata, FileReader as IPCFileReader},
arrow2::io::ipc::write::{StreamWriter as IPCStreamWriter, WriteOptions as IPCWriteOptions},
arrow2::io::parquet::read::FileReader as ParquetFileReader,
// NOTE: It's FileReader on latest main but RecordReader in 0.9.2
arrow2::io::parquet::write::{
Compression, Encoding, FileWriter as ParquetFileWriter, RowGroupIterator, Version,
WriteOptions as ParquetWriteOptions,
},
std::io::Cursor,
use arrow2::error::ArrowError;
use arrow2::io::ipc::read::{read_file_metadata, FileReader as IPCFileReader};
use arrow2::io::parquet::write::{
Compression, Encoding, FileWriter as ParquetFileWriter, RowGroupIterator, Version,
WriteOptions as ParquetWriteOptions,
};
use std::io::Cursor;

/// Reads an in-memory Parquet file with arrow2 and re-encodes it as an Arrow IPC
/// stream (written without compression). Only compiled when the `arrow2` feature
/// is enabled.
///
/// # Errors
///
/// Returns an `ArrowError` when the Parquet reader cannot be created, a chunk
/// cannot be read, or the IPC writer fails.
#[cfg(feature = "arrow2")]
pub fn read_parquet(parquet_file: &[u8]) -> Result<Vec<u8>, ArrowError> {
    // Open the Parquet bytes; the four `None`s leave every optional reader argument unset.
    let source = Cursor::new(parquet_file);
    let parquet_reader = ParquetFileReader::try_new(source, None, None, None, None)?;
    let schema = parquet_reader.schema().clone();

    // Set up the IPC stream writer over an in-memory buffer.
    let mut ipc_bytes = Vec::new();
    let write_options = IPCWriteOptions { compression: None };
    let mut ipc_writer = IPCStreamWriter::new(&mut ipc_bytes, write_options);
    ipc_writer.start(&schema, None)?;

    // Copy each chunk produced by the Parquet reader into the IPC stream.
    for chunk in parquet_reader {
        ipc_writer.write(&chunk?, None)?;
    }

    ipc_writer.finish()?;
    Ok(ipc_bytes)
}

#[cfg(feature = "arrow2")]
pub fn write_parquet(arrow_file: &[u8]) -> Result<Vec<u8>, ArrowError> {
// Create IPC reader
let mut input_file = Cursor::new(arrow_file);
Expand Down
Loading