Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed reading arrays from parquet with required children (#1140)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jul 6, 2022
1 parent 78a2a63 commit 98e4913
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 341 deletions.
16 changes: 8 additions & 8 deletions parquet_integration/bench_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def get_f32(size, null_density):


def bench_add_f32_pyarrow(log2_size, null_density):
size = 2 ** log2_size
size = 2**log2_size

values, validity = get_f32(size, null_density)
array1 = pa.array(values, pa.float32(), mask=validity)
Expand All @@ -50,7 +50,7 @@ def f():


def bench_add_f32_numpy(log2_size):
size = 2 ** log2_size
size = 2**log2_size

array1, _ = get_f32(size, 0)
array2, _ = get_f32(size, 0)
Expand All @@ -71,7 +71,7 @@ def f():


def _bench_unary_f32_pyarrow(log2_size, null_density, name, op):
size = 2 ** log2_size
size = 2**log2_size

values, validity = get_f32(size, null_density)
array = pa.array(values, pa.float32(), mask=validity)
Expand Down Expand Up @@ -100,7 +100,7 @@ def bench_min_f32_pyarrow(log2_size, null_density):


def _bench_unary_f32_numpy(log2_size, name, op):
size = 2 ** log2_size
size = 2**log2_size

values, _ = get_f32(size, 0)

Expand Down Expand Up @@ -128,7 +128,7 @@ def bench_min_f32_numpy(log2_size):


def bench_sort_f32_pyarrow(log2_size, null_density):
size = 2 ** log2_size
size = 2**log2_size

values, validity = get_f32(size, null_density)
array = pa.array(values, pa.float32(), mask=validity)
Expand All @@ -151,7 +151,7 @@ def f():

def bench_sort_f32_numpy(log2_size):
null_density = 0
size = 2 ** log2_size
size = 2**log2_size

array, _ = get_f32(size, null_density)

Expand All @@ -171,7 +171,7 @@ def f():


def bench_filter_f32_pyarrow(log2_size, null_density):
size = 2 ** log2_size
size = 2**log2_size

values, validity = get_f32(size, null_density)
_, mask = get_f32(size, 0.9)
Expand All @@ -198,7 +198,7 @@ def f():

def bench_filter_f32_numpy(log2_size):
null_density = 0
size = 2 ** log2_size
size = 2**log2_size

array, _ = get_f32(size, null_density)
_, mask = get_f32(size, 0.1)
Expand Down
38 changes: 28 additions & 10 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,20 +223,31 @@ def case_struct() -> Tuple[dict, pa.Schema, str]:
]
),
),
pa.field(
"struct_nullable",
pa.struct(struct_fields),
),
]
)

struct = pa.StructArray.from_arrays(
[pa.array(string), pa.array(boolean)],
fields=struct_fields,
)
struct_nullable = pa.StructArray.from_arrays(
[pa.array(string), pa.array(boolean)],
fields=struct_fields,
mask=pa.array([False, False, True, False, False, False, False, False, False, False]),
)

return (
{
"struct": struct,
"struct_struct": pa.StructArray.from_arrays(
[struct, pa.array(boolean)],
names=["f1", "f2"],
),
"struct_nullable": struct_nullable,
},
schema,
f"struct_nullable_10.parquet",
Expand Down Expand Up @@ -307,16 +318,16 @@ def write_pyarrow(
base_path = f"{base_path}/{compression}"

if multiple_pages:
data_page_size = 2 ** 10 # i.e. a small number to ensure multiple pages
data_page_size = 2**10 # i.e. a small number to ensure multiple pages
else:
data_page_size = 2 ** 40 # i.e. a large number to ensure a single page
data_page_size = 2**40 # i.e. a large number to ensure a single page

t = pa.table(data, schema=schema)
os.makedirs(base_path, exist_ok=True)
pa.parquet.write_table(
t,
f"{base_path}/{path}",
row_group_size=2 ** 40,
row_group_size=2**40,
use_dictionary=use_dictionary,
compression=compression,
write_statistics=True,
Expand All @@ -325,7 +336,14 @@ def write_pyarrow(
)


for case in [case_basic_nullable, case_basic_required, case_nested, case_struct, case_nested_edge, case_map]:
for case in [
case_basic_nullable,
case_basic_required,
case_nested,
case_struct,
case_nested_edge,
case_map,
]:
for version in [1, 2]:
for use_dict in [True, False]:
for compression in ["lz4", None, "snappy"]:
Expand All @@ -351,14 +369,14 @@ def case_benches_required(size):
# for read benchmarks
for i in range(10, 22, 2):
# two pages (dict)
write_pyarrow(case_benches(2 ** i), 1, True, False, None)
write_pyarrow(case_benches(2**i), 1, True, False, None)
# single page
write_pyarrow(case_benches(2 ** i), 1, False, False, None)
write_pyarrow(case_benches(2**i), 1, False, False, None)
# single page required
write_pyarrow(case_benches_required(2 ** i), 1, False, False, None)
write_pyarrow(case_benches_required(2**i), 1, False, False, None)
# multiple pages
write_pyarrow(case_benches(2 ** i), 1, False, True, None)
write_pyarrow(case_benches(2**i), 1, False, True, None)
# multiple compressed pages
write_pyarrow(case_benches(2 ** i), 1, False, True, "snappy")
write_pyarrow(case_benches(2**i), 1, False, True, "snappy")
# single compressed page
write_pyarrow(case_benches(2 ** i), 1, False, False, "snappy")
write_pyarrow(case_benches(2**i), 1, False, False, "snappy")
87 changes: 40 additions & 47 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,24 @@ use super::basic::ValuesDictionary;
use super::utils::*;
use super::{
super::utils,
basic::{finish, Required, TraitBinaryArray},
basic::{finish, TraitBinaryArray},
};

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum State<'a> {
Optional(Optional<'a>, BinaryIter<'a>),
Required(Required<'a>),
Optional(BinaryIter<'a>),
Required(BinaryIter<'a>),
RequiredDictionary(ValuesDictionary<'a>),
OptionalDictionary(Optional<'a>, ValuesDictionary<'a>),
OptionalDictionary(ValuesDictionary<'a>),
}

impl<'a> utils::PageState<'a> for State<'a> {
fn len(&self) -> usize {
match self {
State::Optional(validity, _) => validity.len(),
State::Required(state) => state.len(),
State::Optional(validity) => validity.size_hint().0,
State::Required(state) => state.size_hint().0,
State::RequiredDictionary(required) => required.len(),
State::OptionalDictionary(optional, _) => optional.len(),
State::OptionalDictionary(optional) => optional.len(),
}
}
}
Expand All @@ -45,7 +44,7 @@ struct BinaryDecoder<O: Offset> {
phantom_o: std::marker::PhantomData<O>,
}

impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
type State = State<'a>;
type DecodedState = (Binary<O>, MutableBitmap);

Expand All @@ -62,25 +61,26 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::RequiredDictionary(ValuesDictionary::try_new(
page, dict,
)?))
ValuesDictionary::try_new(page, dict).map(State::RequiredDictionary)
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::OptionalDictionary(
Optional::try_new(page)?,
ValuesDictionary::try_new(page, dict)?,
))
ValuesDictionary::try_new(page, dict).map(State::OptionalDictionary)
}
(Encoding::Plain, _, true, false) => {
let (_, _, values) = split_buffer(page)?;

let values = BinaryIter::new(values);

Ok(State::Optional(Optional::try_new(page)?, values))
Ok(State::Optional(values))
}
(Encoding::Plain, _, false, false) => {
let (_, _, values) = split_buffer(page)?;

let values = BinaryIter::new(values);

Ok(State::Required(values))
}
(Encoding::Plain, _, false, false) => Ok(State::Required(Required::try_new(page)?)),
_ => Err(utils::not_implemented(page)),
}
}
Expand All @@ -92,24 +92,17 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
)
}

fn extend_from_state(
&self,
state: &mut Self::State,
decoded: &mut Self::DecodedState,
additional: usize,
) {
fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) {
let (values, validity) = decoded;
match state {
State::Optional(page_validity, page_values) => {
let items = page_validity.by_ref().take(additional);
let items = Zip::new(items, page_values.by_ref());

read_optional_values(items, values, validity)
State::Optional(page) => {
let value = page.next().unwrap_or_default();
values.push(value);
validity.push(true);
}
State::Required(page) => {
for x in page.values.by_ref().take(additional) {
values.push(x)
}
let value = page.next().unwrap_or_default();
values.push(value);
}
State::RequiredDictionary(page) => {
let dict_values = page.dict.values();
Expand All @@ -121,36 +114,38 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
}
let item = page.values.next().map(op).unwrap_or_default();
values.push(item);
}
State::OptionalDictionary(page_validity, page_values) => {
let dict_values = page_values.dict.values();
let dict_offsets = page_values.dict.offsets();
State::OptionalDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};

let items = page_validity.by_ref().take(additional);
let items = Zip::new(items, page_values.values.by_ref().map(op));

read_optional_values(items, values, validity)
let item = page.values.next().map(op).unwrap_or_default();
values.push(item);
validity.push(true);
}
}
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
let (values, validity) = decoded;
values.push(&[]);
validity.push(false);
}
}

pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
iter: I,
data_type: DataType,
init: Vec<InitNested>,
items: VecDeque<(Binary<O>, MutableBitmap)>,
nested: VecDeque<NestedState>,
items: VecDeque<(NestedState, (Binary<O>, MutableBitmap))>,
chunk_size: Option<usize>,
phantom_a: std::marker::PhantomData<A>,
}
Expand All @@ -167,7 +162,6 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
data_type,
init,
items: VecDeque::new(),
nested: VecDeque::new(),
chunk_size,
phantom_a: Default::default(),
}
Expand All @@ -181,7 +175,6 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for ArrayIterator
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.nested,
&self.init,
self.chunk_size,
&BinaryDecoder::<O>::default(),
Expand Down
Loading

0 comments on commit 98e4913

Please sign in to comment.