This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed reading arrays from parquet with required children #1140

Merged · 1 commit · Jul 6, 2022
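Context for the fix: in parquet, a child field declared as required (non-nullable) under a nullable parent stores values only for rows where the parent is present; rows nulled by the parent contribute nothing to the child's data page, so the reader must synthesize child entries to keep parent and child lengths aligned. A minimal sketch of such a schema in arrow2 types; the field names mirror the fixture below, but the snippet is illustrative, not code from this PR:

```rust
use arrow2::datatypes::{DataType, Field};

fn main() {
    // A nullable struct whose children are required (non-nullable):
    // all nulls come from the struct itself, so the child columns in
    // parquet hold no slot for rows where the struct is null.
    let children = vec![
        Field::new("f1", DataType::Utf8, false),
        Field::new("f2", DataType::Boolean, false),
    ];
    let _parent = Field::new("struct_nullable", DataType::Struct(children), true);
}
```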
16 changes: 8 additions & 8 deletions parquet_integration/bench_compute.py
@@ -26,7 +26,7 @@ def get_f32(size, null_density):
 
 
 def bench_add_f32_pyarrow(log2_size, null_density):
-    size = 2 ** log2_size
+    size = 2**log2_size
 
     values, validity = get_f32(size, null_density)
     array1 = pa.array(values, pa.float32(), mask=validity)
@@ -50,7 +50,7 @@ def f():
 
 
 def bench_add_f32_numpy(log2_size):
-    size = 2 ** log2_size
+    size = 2**log2_size
 
     array1, _ = get_f32(size, 0)
     array2, _ = get_f32(size, 0)
@@ -71,7 +71,7 @@ def f():
 
 
 def _bench_unary_f32_pyarrow(log2_size, null_density, name, op):
-    size = 2 ** log2_size
+    size = 2**log2_size
 
     values, validity = get_f32(size, null_density)
     array = pa.array(values, pa.float32(), mask=validity)
@@ -100,7 +100,7 @@ def bench_min_f32_pyarrow(log2_size, null_density):
 
 
 def _bench_unary_f32_numpy(log2_size, name, op):
-    size = 2 ** log2_size
+    size = 2**log2_size
 
     values, _ = get_f32(size, 0)
 
@@ -128,7 +128,7 @@ def bench_min_f32_numpy(log2_size):
 
 
 def bench_sort_f32_pyarrow(log2_size, null_density):
-    size = 2 ** log2_size
+    size = 2**log2_size
 
     values, validity = get_f32(size, null_density)
     array = pa.array(values, pa.float32(), mask=validity)
@@ -151,7 +151,7 @@ def f():
 
 def bench_sort_f32_numpy(log2_size):
     null_density = 0
-    size = 2 ** log2_size
+    size = 2**log2_size
 
     array, _ = get_f32(size, null_density)
 
@@ -171,7 +171,7 @@ def f():
 
 
 def bench_filter_f32_pyarrow(log2_size, null_density):
-    size = 2 ** log2_size
+    size = 2**log2_size
 
     values, validity = get_f32(size, null_density)
     _, mask = get_f32(size, 0.9)
@@ -198,7 +198,7 @@ def f():
 
 def bench_filter_f32_numpy(log2_size):
     null_density = 0
-    size = 2 ** log2_size
+    size = 2**log2_size
 
     array, _ = get_f32(size, null_density)
     _, mask = get_f32(size, 0.1)
38 changes: 28 additions & 10 deletions parquet_integration/write_parquet.py
@@ -223,20 +223,31 @@ def case_struct() -> Tuple[dict, pa.Schema, str]:
                     ]
                 ),
             ),
+            pa.field(
+                "struct_nullable",
+                pa.struct(struct_fields),
+            ),
         ]
     )
 
     struct = pa.StructArray.from_arrays(
         [pa.array(string), pa.array(boolean)],
         fields=struct_fields,
     )
+    struct_nullable = pa.StructArray.from_arrays(
+        [pa.array(string), pa.array(boolean)],
+        fields=struct_fields,
+        mask=pa.array([False, False, True, False, False, False, False, False, False, False]),
+    )
 
     return (
        {
            "struct": struct,
            "struct_struct": pa.StructArray.from_arrays(
                [struct, pa.array(boolean)],
                names=["f1", "f2"],
            ),
+           "struct_nullable": struct_nullable,
        },
        schema,
        f"struct_nullable_10.parquet",
@@ -307,16 +318,16 @@ def write_pyarrow(
         base_path = f"{base_path}/{compression}"
 
     if multiple_pages:
-        data_page_size = 2 ** 10  # i.e. a small number to ensure multiple pages
+        data_page_size = 2**10  # i.e. a small number to ensure multiple pages
     else:
-        data_page_size = 2 ** 40  # i.e. a large number to ensure a single page
+        data_page_size = 2**40  # i.e. a large number to ensure a single page
 
     t = pa.table(data, schema=schema)
     os.makedirs(base_path, exist_ok=True)
     pa.parquet.write_table(
         t,
         f"{base_path}/{path}",
-        row_group_size=2 ** 40,
+        row_group_size=2**40,
         use_dictionary=use_dictionary,
         compression=compression,
         write_statistics=True,
@@ -325,7 +336,14 @@
     )
 
 
-for case in [case_basic_nullable, case_basic_required, case_nested, case_struct, case_nested_edge, case_map]:
+for case in [
+    case_basic_nullable,
+    case_basic_required,
+    case_nested,
+    case_struct,
+    case_nested_edge,
+    case_map,
+]:
     for version in [1, 2]:
         for use_dict in [True, False]:
             for compression in ["lz4", None, "snappy"]:
@@ -351,14 +369,14 @@ def case_benches_required(size):
 # for read benchmarks
 for i in range(10, 22, 2):
     # two pages (dict)
-    write_pyarrow(case_benches(2 ** i), 1, True, False, None)
+    write_pyarrow(case_benches(2**i), 1, True, False, None)
     # single page
-    write_pyarrow(case_benches(2 ** i), 1, False, False, None)
+    write_pyarrow(case_benches(2**i), 1, False, False, None)
     # single page required
-    write_pyarrow(case_benches_required(2 ** i), 1, False, False, None)
+    write_pyarrow(case_benches_required(2**i), 1, False, False, None)
     # multiple pages
-    write_pyarrow(case_benches(2 ** i), 1, False, True, None)
+    write_pyarrow(case_benches(2**i), 1, False, True, None)
     # multiple compressed pages
-    write_pyarrow(case_benches(2 ** i), 1, False, True, "snappy")
+    write_pyarrow(case_benches(2**i), 1, False, True, "snappy")
     # single compressed page
-    write_pyarrow(case_benches(2 ** i), 1, False, False, "snappy")
+    write_pyarrow(case_benches(2**i), 1, False, False, "snappy")
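The new `struct_nullable` column in `case_struct` is the regression fixture: pyarrow's `mask` argument marks a row as null where it is `True`, so row 2 of the struct is null even though the underlying child arrays have a value slot there. A tiny dependency-free check of that convention, spelling out the assumption rather than quoting fixture code:

```rust
fn main() {
    // pyarrow's `mask=True` marks a row as null, so the struct's validity
    // bitmap is the element-wise negation of the mask used in the fixture.
    let mask = [false, false, true, false, false, false, false, false, false, false];
    let validity: Vec<bool> = mask.iter().map(|&m| !m).collect();
    assert!(!validity[2]); // row 2 of `struct_nullable` is null
    assert_eq!(validity.iter().filter(|&&v| v).count(), 9);
}
```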
87 changes: 40 additions & 47 deletions src/io/parquet/read/deserialize/binary/nested.rs
@@ -17,25 +17,24 @@ use super::basic::ValuesDictionary;
 use super::utils::*;
 use super::{
     super::utils,
-    basic::{finish, Required, TraitBinaryArray},
+    basic::{finish, TraitBinaryArray},
 };
 
 #[allow(clippy::large_enum_variant)]
 #[derive(Debug)]
 enum State<'a> {
-    Optional(Optional<'a>, BinaryIter<'a>),
-    Required(Required<'a>),
+    Optional(BinaryIter<'a>),
+    Required(BinaryIter<'a>),
     RequiredDictionary(ValuesDictionary<'a>),
-    OptionalDictionary(Optional<'a>, ValuesDictionary<'a>),
+    OptionalDictionary(ValuesDictionary<'a>),
 }
 
 impl<'a> utils::PageState<'a> for State<'a> {
     fn len(&self) -> usize {
         match self {
-            State::Optional(validity, _) => validity.len(),
-            State::Required(state) => state.len(),
+            State::Optional(validity) => validity.size_hint().0,
+            State::Required(state) => state.size_hint().0,
             State::RequiredDictionary(required) => required.len(),
-            State::OptionalDictionary(optional, _) => optional.len(),
+            State::OptionalDictionary(optional) => optional.len(),
         }
     }
 }
@@ -45,7 +44,7 @@ struct BinaryDecoder<O: Offset> {
     phantom_o: std::marker::PhantomData<O>,
 }
 
-impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
+impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
     type State = State<'a>;
     type DecodedState = (Binary<O>, MutableBitmap);
 
@@ -62,25 +61,26 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
     ) {
         (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
             let dict = dict.as_any().downcast_ref().unwrap();
-            Ok(State::RequiredDictionary(ValuesDictionary::try_new(
-                page, dict,
-            )?))
+            ValuesDictionary::try_new(page, dict).map(State::RequiredDictionary)
         }
         (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
             let dict = dict.as_any().downcast_ref().unwrap();
-            Ok(State::OptionalDictionary(
-                Optional::try_new(page)?,
-                ValuesDictionary::try_new(page, dict)?,
-            ))
+            ValuesDictionary::try_new(page, dict).map(State::OptionalDictionary)
         }
         (Encoding::Plain, _, true, false) => {
             let (_, _, values) = split_buffer(page)?;
 
             let values = BinaryIter::new(values);
 
-            Ok(State::Optional(Optional::try_new(page)?, values))
+            Ok(State::Optional(values))
+        }
+        (Encoding::Plain, _, false, false) => {
+            let (_, _, values) = split_buffer(page)?;
+
+            let values = BinaryIter::new(values);
+
+            Ok(State::Required(values))
         }
-        (Encoding::Plain, _, false, false) => Ok(State::Required(Required::try_new(page)?)),
         _ => Err(utils::not_implemented(page)),
     }
 }
@@ -92,24 +92,17 @@
         )
     }
 
-    fn extend_from_state(
-        &self,
-        state: &mut Self::State,
-        decoded: &mut Self::DecodedState,
-        additional: usize,
-    ) {
+    fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) {
         let (values, validity) = decoded;
         match state {
-            State::Optional(page_validity, page_values) => {
-                let items = page_validity.by_ref().take(additional);
-                let items = Zip::new(items, page_values.by_ref());
-
-                read_optional_values(items, values, validity)
+            State::Optional(page) => {
+                let value = page.next().unwrap_or_default();
+                values.push(value);
+                validity.push(true);
             }
             State::Required(page) => {
-                for x in page.values.by_ref().take(additional) {
-                    values.push(x)
-                }
+                let value = page.next().unwrap_or_default();
+                values.push(value);
             }
             State::RequiredDictionary(page) => {
                 let dict_values = page.dict.values();
@@ -121,36 +114,38 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
                     let dict_offset_ip1 = dict_offsets[index + 1] as usize;
                     &dict_values[dict_offset_i..dict_offset_ip1]
                 };
-                for x in page.values.by_ref().map(op).take(additional) {
-                    values.push(x)
-                }
+                let item = page.values.next().map(op).unwrap_or_default();
+                values.push(item);
             }
-            State::OptionalDictionary(page_validity, page_values) => {
-                let dict_values = page_values.dict.values();
-                let dict_offsets = page_values.dict.offsets();
+            State::OptionalDictionary(page) => {
+                let dict_values = page.dict.values();
+                let dict_offsets = page.dict.offsets();
 
                 let op = move |index: u32| {
                     let index = index as usize;
                     let dict_offset_i = dict_offsets[index] as usize;
                     let dict_offset_ip1 = dict_offsets[index + 1] as usize;
                     &dict_values[dict_offset_i..dict_offset_ip1]
                 };
 
-                let items = page_validity.by_ref().take(additional);
-                let items = Zip::new(items, page_values.values.by_ref().map(op));
-
-                read_optional_values(items, values, validity)
+                let item = page.values.next().map(op).unwrap_or_default();
+                values.push(item);
+                validity.push(true);
             }
         }
     }
 
+    fn push_null(&self, decoded: &mut Self::DecodedState) {
+        let (values, validity) = decoded;
+        values.push(&[]);
+        validity.push(false);
+    }
 }
 
 pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
     iter: I,
     data_type: DataType,
     init: Vec<InitNested>,
-    items: VecDeque<(Binary<O>, MutableBitmap)>,
-    nested: VecDeque<NestedState>,
+    items: VecDeque<(NestedState, (Binary<O>, MutableBitmap))>,
     chunk_size: Option<usize>,
     phantom_a: std::marker::PhantomData<A>,
 }
@@ -167,7 +162,6 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
             data_type,
             init,
             items: VecDeque::new(),
-            nested: VecDeque::new(),
             chunk_size,
             phantom_a: Default::default(),
         }
@@ -181,7 +175,6 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for ArrayIterator
         let maybe_state = next(
             &mut self.iter,
             &mut self.items,
-            &mut self.nested,
             &self.init,
             self.chunk_size,
             &BinaryDecoder::<O>::default(),
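The substantive change in this file: the batch-oriented `extend_from_state(…, additional)` is replaced by a per-row `NestedDecoder` contract, where the nested-level driver calls `push_valid` once for each row whose innermost value is present and `push_null` for rows nulled by an ancestor, and the nested state now travels with the decoded buffers inside `items`. Below is a dependency-free model of that contract under assumed names; arrow2's real trait is generic over page and decoded state, so this is a sketch of the mechanism, not the library API:

```rust
struct Decoded {
    values: Vec<Vec<u8>>,
    validity: Vec<bool>,
}

// Consume one real value from the data page (cf. `push_valid`).
fn push_valid<I: Iterator<Item = Vec<u8>>>(page: &mut I, decoded: &mut Decoded) {
    decoded.values.push(page.next().unwrap_or_default());
    decoded.validity.push(true);
}

// Synthesize an empty slot: a required child of a null parent has no
// entry in the data page (cf. `values.push(&[])` in `push_null`).
fn push_null(decoded: &mut Decoded) {
    decoded.values.push(Vec::new());
    decoded.validity.push(false);
}

fn main() {
    // def level == max means "value present"; a lower level means an
    // ancestor (e.g. the nullable struct) is null at this row.
    let max_def = 2u32;
    let def_levels = [2u32, 2, 1, 2]; // row 2's parent is null
    let mut page = vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()].into_iter();
    let mut decoded = Decoded { values: Vec::new(), validity: Vec::new() };
    for &d in &def_levels {
        if d == max_def {
            push_valid(&mut page, &mut decoded)
        } else {
            push_null(&mut decoded)
        }
    }
    assert_eq!(decoded.validity, [true, true, false, true]);
    assert_eq!(decoded.values[3], b"c".to_vec());
}
```

Pushing a placeholder for every null slot is what keeps required children length-aligned with their nullable parent, which is exactly the failure mode the PR title describes.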