Fixed limit pushdown in parquet
jorgecarleitao committed Jul 24, 2022
1 parent dc77578 commit c13c9c5
Showing 4 changed files with 14 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/nested_utils.rs
@@ -455,7 +455,7 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
 
         let next_rep = page.iter.peek().map(|x| x.0).unwrap_or(0);
 
-        if next_rep == 0 && rows == additional.saturating_add(1) {
+        if next_rep == 0 && rows == additional {
             break;
         }
     }
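The change above removes an off-by-one: the loop previously kept decoding until `rows` reached `additional + 1`, i.e. one top-level row past the requested count. A minimal, self-contained sketch of the stop condition (the function `records_for_rows` and its inputs are hypothetical simplifications; the real decoder peeks repetition levels from a parquet page, where a level of 0 starts a new top-level row):

// Hypothetical, simplified model of the decoder's stop condition. A
// repetition level of 0 marks the start of a new top-level row; the loop
// must stop once exactly `additional` rows have been consumed.
fn records_for_rows(rep_levels: &[u32], additional: usize) -> usize {
    let mut iter = rep_levels.iter().peekable();
    let mut rows = 0;
    let mut consumed = 0;
    while let Some(&rep) = iter.next() {
        consumed += 1;
        if rep == 0 {
            rows += 1;
        }
        let next_rep = iter.peek().map(|x| **x).unwrap_or(0);
        // The old condition `rows == additional.saturating_add(1)` let the
        // loop run one full row past the requested count before breaking.
        if next_rep == 0 && rows == additional {
            break;
        }
    }
    consumed
}

For example, `records_for_rows(&[0, 1, 1, 0, 0], 2)` returns 4: the first row spans three records, the second row one record, and the fifth record (which would start a third row) is never consumed.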
1 change: 1 addition & 0 deletions src/io/parquet/read/file.rs
@@ -240,6 +240,7 @@ impl<R: Read + Seek> RowGroupReader<R> {
             row_group,
             self.schema.fields.clone(),
             self.chunk_size,
+            Some(self.remaining_rows),
         )?;
 
         let result = RowGroupDeserializer::new(
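This one-line addition is where the pushdown happens: the file-level reader hands its remaining row budget to the column readers, so the limit is enforced while decoding each row group rather than by slicing afterwards. A sketch of the budgeting pattern (the `FileReaderBudget` type and method name here are illustrative, not the crate's API):

// Illustrative: the reader keeps a shrinking `remaining_rows` counter and
// hands it to each row group as its limit, so no row group decodes more
// rows than the overall query still needs.
struct FileReaderBudget {
    remaining_rows: usize,
}

impl FileReaderBudget {
    // Limit to push down into the next row group containing `group_rows` rows.
    fn limit_for_next_group(&mut self, group_rows: usize) -> usize {
        let take = self.remaining_rows.min(group_rows);
        self.remaining_rows -= take;
        take
    }
}

fn main() {
    let mut budget = FileReaderBudget { remaining_rows: 150 };
    assert_eq!(budget.limit_for_next_group(100), 100); // first group: all its rows
    assert_eq!(budget.limit_for_next_group(100), 50);  // second group: only 50 needed
    assert_eq!(budget.limit_for_next_group(100), 0);   // later groups: nothing
}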
17 changes: 6 additions & 11 deletions src/io/parquet/read/row_group.rs
@@ -65,14 +65,7 @@ impl Iterator for RowGroupDeserializer {
         let chunk = self
             .column_chunks
             .iter_mut()
-            .map(|iter| {
-                let array = iter.next().unwrap()?;
-                Ok(if array.len() > self.remaining_rows {
-                    array.slice(0, array.len() - self.remaining_rows)
-                } else {
-                    array
-                })
-            })
+            .map(|iter| iter.next().unwrap())
             .collect::<Result<Vec<_>>>()
             .and_then(Chunk::try_new);
         self.remaining_rows = self.remaining_rows.saturating_sub(
@@ -218,7 +211,11 @@ pub fn read_columns_many<'a, R: Read + Seek>(
     row_group: &RowGroupMetaData,
     fields: Vec<Field>,
     chunk_size: Option<usize>,
+    limit: Option<usize>,
 ) -> Result<Vec<ArrayIter<'a>>> {
+    let num_rows = row_group.num_rows();
+    let num_rows = limit.map(|limit| limit.min(num_rows)).unwrap_or(num_rows);
+
     // reads all the necessary columns for all fields from the row group
     // This operation is IO-bounded `O(C)` where C is the number of columns in the row group
     let field_columns = fields
@@ -229,9 +226,7 @@
     field_columns
         .into_iter()
         .zip(fields.into_iter())
-        .map(|(columns, field)| {
-            to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
-        })
+        .map(|(columns, field)| to_deserializer(columns, field, num_rows, chunk_size))
         .collect()
 }
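The removed post-hoc slice in the iterator appears to have truncated from the wrong end anyway (it kept `array.len() - remaining_rows` rows rather than `remaining_rows`), and becomes redundant once the limit is pushed into decoding. The two added lines in `read_columns_many` clamp the pushed-down limit to the row group's own size, with `None` meaning "no limit". The clamping expression in isolation (a standalone demonstration, not library code):

fn effective_rows(num_rows: usize, limit: Option<usize>) -> usize {
    // Cap the request at the row group's size; `None` reads everything.
    limit.map(|limit| limit.min(num_rows)).unwrap_or(num_rows)
}

fn main() {
    assert_eq!(effective_rows(1000, Some(10)), 10);     // limit below group size
    assert_eq!(effective_rows(1000, Some(5000)), 1000); // limit above group size
    assert_eq!(effective_rows(1000, None), 1000);       // no limit pushed down
}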
12 changes: 6 additions & 6 deletions src/io/print.rs
@@ -8,27 +8,27 @@ use crate::{
 use comfy_table::{Cell, Table};
 
 /// Returns a visual representation of [`Chunk`]
-pub fn write<A: AsRef<dyn Array>, N: AsRef<str>>(batches: &[Chunk<A>], names: &[N]) -> String {
+pub fn write<A: AsRef<dyn Array>, N: AsRef<str>>(chunks: &[Chunk<A>], names: &[N]) -> String {
     let mut table = Table::new();
     table.load_preset("||--+-++| ++++++");
 
-    if batches.is_empty() {
+    if chunks.is_empty() {
         return table.to_string();
     }
 
     let header = names.iter().map(|name| Cell::new(name.as_ref()));
     table.set_header(header);
 
-    for batch in batches {
-        let displayes = batch
+    for chunk in chunks {
+        let displayes = chunk
             .arrays()
             .iter()
             .map(|array| get_display(array.as_ref(), ""))
             .collect::<Vec<_>>();
 
-        for row in 0..batch.len() {
+        for row in 0..chunk.len() {
             let mut cells = Vec::new();
-            (0..batch.arrays().len()).for_each(|col| {
+            (0..chunk.arrays().len()).for_each(|col| {
                 let mut string = String::new();
                 displayes[col](&mut string, row).unwrap();
                 cells.push(Cell::new(string));
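The printer change is a pure rename (`batches`/`batch` to `chunks`/`chunk`) aligning the function with the crate's `Chunk` terminology; behavior is unchanged. A usage sketch of the function (assuming arrow2-style import paths, which may differ between versions):

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::io::print;

fn main() {
    let array = Int32Array::from_slice([1, 2, 3]);
    let chunk = Chunk::new(vec![Box::new(array) as Box<dyn Array>]);
    println!("{}", print::write(&[chunk], &["numbers"]));
}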
