Commit da77d13: Temp
jorgecarleitao committed Oct 17, 2021
1 parent ce37fda
Showing 8 changed files with 42 additions and 35 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -61,8 +61,8 @@ futures = { version = "0.3", optional = true }
 ahash = { version = "0.7", optional = true }

 #parquet2 = { version = "0.5.2", optional = true, default_features = false, features = ["stream"] }
-#parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "reuse_compress", optional = true, default_features = false, features = ["stream"] }
-parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "reuse_compress", optional = true, default_features = false, features = ["stream"] }
+#parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] }

 avro-rs = { version = "0.13", optional = true, default_features = false }

@@ -100,7 +100,7 @@ full = [
     "io_avro",
     "regex",
     "merge_sort",
-    #"compute",
+    "compute",
     # parses timezones used in timestamp conversions
     "chrono-tz"
 ]
3 changes: 2 additions & 1 deletion examples/parquet_read.rs
@@ -1,11 +1,12 @@
 use std::fs::File;
+use std::io::BufReader;

 use arrow2::io::parquet::read;
 use arrow2::{array::Array, error::Result};

 fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<dyn Array>> {
     // Open a file, a common operation in Rust
-    let mut file = File::open(path)?;
+    let mut file = BufReader::new(File::open(path)?);

     // Read the files' metadata. This has a small IO cost because it requires seeking to the end
     // of the file to read its footer.
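Why the `BufReader` above helps: parquet reading issues many small reads and seeks (footer, page headers, page bodies), and an unbuffered `File` turns each one into a syscall. A minimal sketch of the pattern using only the standard library (the 1 MiB capacity is an illustrative assumption, not from this commit):

```rust
use std::fs::File;
use std::io::{BufReader, Result};

// Wrap the file once; subsequent small reads are served from the in-memory
// buffer instead of each becoming its own syscall.
fn open_buffered(path: &str) -> Result<BufReader<File>> {
    let file = File::open(path)?;
    // `BufReader::new` uses an 8 KiB buffer; a larger one can pay off when
    // scanning many pages sequentially.
    Ok(BufReader::with_capacity(1 << 20, file))
}
```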
8 changes: 3 additions & 5 deletions examples/parquet_read_parallel.rs
@@ -47,16 +47,14 @@ fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
     let metadata_consumer = file_metadata.clone();
     let arrow_schema_consumer = arrow_schema.clone();
     let child = thread::spawn(move || {
-        let (column, row_group, iter) = rx_consumer.recv().unwrap();
+        let (column, row_group, pages) = rx_consumer.recv().unwrap();
         let start = SystemTime::now();
         println!("consumer start - {} {}", column, row_group);
         let metadata = metadata_consumer.row_groups[row_group].column(column);
         let data_type = arrow_schema_consumer.fields()[column].data_type().clone();

-        let pages = iter
-            .into_iter()
-            .map(|x| x.and_then(|x| read::decompress(x, &mut vec![])));
-        let mut pages = read::streaming_iterator::convert(pages);
+        let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);

         let array = read::page_iter_to_array(&mut pages, metadata, data_type);
         println!(
             "consumer end - {:?}: {} {}",
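The consumer change above swaps a per-page `&mut vec![]` allocation for `read::BasicDecompressor`, which threads a single scratch buffer through the whole page stream. A self-contained sketch of that buffer-reuse pattern (not the parquet2 type itself; `decompress_into` is a stand-in codec):

```rust
/// Sketch of the pattern behind `BasicDecompressor`: one scratch buffer is
/// cleared and refilled for each page instead of being reallocated per page.
struct ReusingDecompressor<I: Iterator<Item = Vec<u8>>> {
    input: I,
    scratch: Vec<u8>,
}

impl<I: Iterator<Item = Vec<u8>>> ReusingDecompressor<I> {
    fn new(input: I, scratch: Vec<u8>) -> Self {
        Self { input, scratch }
    }

    /// Streaming-iterator style: the result borrows `scratch`, so the
    /// allocation survives across calls instead of being dropped per page.
    fn next_page(&mut self) -> Option<&[u8]> {
        let compressed = self.input.next()?;
        self.scratch.clear();
        decompress_into(&compressed, &mut self.scratch);
        Some(self.scratch.as_slice())
    }
}

// Stand-in for a real codec; here it just copies the bytes through.
fn decompress_into(src: &[u8], dst: &mut Vec<u8>) {
    dst.extend_from_slice(src);
}
```

This also fits the thread layout in the example: each consumer receives compressed pages over the channel and decompresses locally, so the scratch buffer can stay thread-local.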
31 changes: 19 additions & 12 deletions examples/parquet_write.rs
@@ -1,13 +1,15 @@
 use std::fs::File;
 use std::iter::once;

+use arrow2::error::ArrowError;
 use arrow2::io::parquet::write::to_parquet_schema;
 use arrow2::{
     array::{Array, Int32Array},
     datatypes::{Field, Schema},
     error::Result,
     io::parquet::write::{
-        array_to_page, write_file, Compression, DynIter, Encoding, Version, WriteOptions,
+        array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator,
+        Encoding, FallibleStreamingIterator, Version, WriteOptions,
     },
 };

@@ -24,17 +26,22 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()>
     // map arrow fields to parquet fields
     let parquet_schema = to_parquet_schema(&schema)?;

-    // Declare the row group iterator. This must be an iterator of iterators of iterators:
-    // * first iterator of row groups
-    // * second iterator of column chunks
-    // * third iterator of pages
-    // an array can be divided in multiple pages via `.slice(offset, length)` (`O(1)`).
-    // All column chunks within a row group MUST have the same length.
-    let row_groups = once(Result::Ok(DynIter::new(once(Ok(DynIter::new(
-        once(array)
-            .zip(parquet_schema.columns().to_vec().into_iter())
-            .map(|(array, descriptor)| array_to_page(array, descriptor, options, encoding)),
-    ))))));
+    let descriptor = parquet_schema.columns()[0].clone();
+
+    // Declare the row group iterator. This must be an iterator of iterators of streaming iterators
+    // * first iterator over row groups
+    let row_groups = once(Result::Ok(DynIter::new(
+        // * second iterator over column chunks (we assume no struct arrays -> `once` column)
+        once(
+            // * third iterator over (compressed) pages; dictionary encoding may lead to multiple pages per array.
+            array_to_pages(array, descriptor, options, encoding).map(move |pages| {
+                let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
+                let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![])
+                    .map_err(ArrowError::from);
+                DynStreamingIterator::new(compressed_pages)
+            }),
+        ),
+    )));

     // Create a new empty file
     let mut file = File::create(path)?;
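The comment trail above describes the shape the writer consumes: row groups, then column chunks, then pages. A plain-iterator sketch of that nesting, with `Vec<u8>` standing in for an encoded page and two hypothetical columns (the real code wraps each level in `DynIter`/`DynStreamingIterator` and makes every item fallible):

```rust
type Page = Vec<u8>;

// Innermost level: the pages of one column chunk. `array_to_pages` may yield
// several pages per array, e.g. a dictionary page followed by data pages.
fn pages_for_column(column: usize) -> impl Iterator<Item = Page> {
    std::iter::once(vec![column as u8])
}

// Middle level: one chunk per column of the schema (two assumed here).
fn column_chunks() -> impl Iterator<Item = impl Iterator<Item = Page>> {
    (0..2).map(pages_for_column)
}

// Outermost level: the row groups. All chunks within one row group must
// encode the same number of rows.
fn row_groups() -> impl Iterator<Item = impl Iterator<Item = impl Iterator<Item = Page>>> {
    std::iter::once(column_chunks())
}
```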
4 changes: 2 additions & 2 deletions src/io/parquet/read/mod.rs
@@ -13,8 +13,8 @@ pub use parquet2::{
     page::{CompressedDataPage, DataPage, DataPageHeader},
     read::{
         decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream,
-        read_metadata as _read_metadata, read_metadata_async as _read_metadata_async, Decompressor,
-        PageFilter, PageIterator,
+        read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
+        BasicDecompressor, Decompressor, PageFilter, PageIterator,
     },
     schema::types::{
         LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
7 changes: 3 additions & 4 deletions src/io/parquet/write/mod.rs
@@ -12,8 +12,6 @@ mod utils;

 pub mod stream;

-use std::sync::Arc;
-
 use crate::array::*;
 use crate::bitmap::Bitmap;
 use crate::buffer::{Buffer, MutableBuffer};
@@ -35,6 +33,7 @@ pub use parquet2::{
         write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
         Version, WriteOptions,
     },
+    FallibleStreamingIterator,
 };
 pub use record_batch::RowGroupIterator;
 use schema::schema_to_metadata_key;
@@ -110,7 +109,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {

 /// Returns an iterator of [`EncodedPage`].
 pub fn array_to_pages(
-    array: Arc<dyn Array>,
+    array: &dyn Array,
     descriptor: ColumnDescriptor,
     options: WriteOptions,
     encoding: Encoding,
@@ -126,7 +125,7 @@ pub fn array_to_pages(
                 )
             })
         }
-        _ => array_to_page(array.as_ref(), descriptor, options, encoding)
+        _ => array_to_page(array, descriptor, options, encoding)
             .map(|page| DynIter::new(std::iter::once(Ok(page)))),
     }
 }
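The signature change from `Arc<dyn Array>` to `&dyn Array` is what drives the `array.clone()` to `array.as_ref()` edits in `record_batch.rs` and the test below: the function only reads the array, so borrowing avoids a refcount bump and also serves callers that never had an `Arc`. A small sketch of the general pattern (the `Array` trait here is a hypothetical stand-in, not arrow2's):

```rust
use std::sync::Arc;

// Hypothetical stand-in trait, just for illustration.
trait Array {
    fn len(&self) -> usize;
}

// Taking `&dyn Array` leaves the ownership decision to the caller.
fn array_to_pages_sketch(array: &dyn Array) -> usize {
    array.len()
}

fn caller(shared: Arc<dyn Array>) -> usize {
    // `as_ref` borrows out of the `Arc`: no clone, no refcount traffic.
    array_to_pages_sketch(shared.as_ref())
}
```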
16 changes: 9 additions & 7 deletions src/io/parquet/write/record_batch.rs
@@ -63,13 +63,15 @@ impl<I: Iterator<Item = Result<RecordBatch>>> Iterator for RowGroupIterator<I> {
                 .zip(self.parquet_schema.columns().to_vec().into_iter())
                 .zip(encodings.into_iter())
                 .map(move |((array, descriptor), encoding)| {
-                    array_to_pages(array, descriptor, options, encoding).map(move |pages| {
-                        let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
-                        let compressed_pages =
-                            Compressor::new(encoded_pages, options.compression, vec![])
-                                .map_err(ArrowError::from);
-                        DynStreamingIterator::new(compressed_pages)
-                    })
+                    array_to_pages(array.as_ref(), descriptor, options, encoding).map(
+                        move |pages| {
+                            let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
+                            let compressed_pages =
+                                Compressor::new(encoded_pages, options.compression, vec![])
+                                    .map_err(ArrowError::from);
+                            DynStreamingIterator::new(compressed_pages)
+                        },
+                    )
                 }),
             ))
         })
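`Compressor` above is driven as a `FallibleStreamingIterator` (the trait this commit re-exports through `write/mod.rs`): each step compresses the next page into an internal, reused buffer and lends it out by reference. A sketch of draining such an iterator, assuming that re-export and the trait's provided `next` method, which returns `Result<Option<&Item>, Error>`:

```rust
use arrow2::io::parquet::write::FallibleStreamingIterator;

// Hypothetical helper: total the compressed bytes without copying any page.
fn total_compressed_size<I>(mut pages: I) -> Result<usize, I::Error>
where
    I: FallibleStreamingIterator<Item = [u8]>,
{
    let mut total = 0;
    // Each `next` advances the iterator and lends the freshly filled buffer;
    // the borrow ends every iteration, so the buffer can be reused.
    while let Some(page) = pages.next()? {
        total += page.len();
    }
    Ok(total)
}
```

The real `Compressor` yields compressed pages rather than raw byte slices; the drain pattern is the same.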
2 changes: 1 addition & 1 deletion tests/it/io/parquet/mod.rs
@@ -499,7 +499,7 @@ fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result<Vec<u8>> {
     } else {
         Encoding::Plain
     };
-    array_to_pages(array.clone(), descriptor, options, encoding).map(|pages| {
+    array_to_pages(array.as_ref(), descriptor, options, encoding).map(|pages| {
         let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
         let compressed_pages =
             Compressor::new(encoded_pages, options.compression, vec![])
