Skip to content

Commit

Permalink
Further doc tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 1, 2022
1 parent 0fe966a commit 6a21ad2
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 7 deletions.
2 changes: 1 addition & 1 deletion parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ where
)
})?;

let iter = def_levels.valid_position_iter(
let iter = def_levels.rev_valid_positions_iter(
self.values_written..self.values_written + levels_read,
);

Expand Down
22 changes: 17 additions & 5 deletions parquet/src/arrow/record_reader/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ pub trait BufferQueue: Sized {

type Slice: ?Sized;

/// Split out the first `len` committed items
/// Split out the first `len` items
///
/// # Panics
///
/// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
///
fn split_off(&mut self, len: usize) -> Self::Output;

/// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
Expand Down Expand Up @@ -59,7 +64,7 @@ pub trait BufferQueue: Sized {
fn set_len(&mut self, len: usize);
}

/// A typed buffer similar to [`Vec<T>`] but making use of [`MutableBuffer`]
/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for storage
pub struct TypedBuffer<T> {
buffer: MutableBuffer,

Expand Down Expand Up @@ -152,23 +157,30 @@ impl<T> BufferQueue for TypedBuffer<T> {
}
}

/// A [`BufferQueue`] capable of storing column values
pub trait ValuesBuffer: BufferQueue {
/// Iterate through the indexes in `range` in reverse order, moving the value at each
/// index to the next index returned by `rev_valid_position_iter`
///
/// It is guaranteed that the `i`th (zero-based) index returned by `rev_valid_position_iter`
/// is greater than or equal to `range.end - i - 1`
///
fn pad_nulls(
&mut self,
range: Range<usize>,
rev_position_iter: impl Iterator<Item = usize>,
rev_valid_position_iter: impl Iterator<Item = usize>,
);
}

impl<T> ValuesBuffer for TypedBuffer<T> {
fn pad_nulls(
&mut self,
range: Range<usize>,
rev_position_iter: impl Iterator<Item = usize>,
rev_valid_position_iter: impl Iterator<Item = usize>,
) {
let slice = self.as_slice_mut();

for (value_pos, level_pos) in range.rev().zip(rev_position_iter) {
for (value_pos, level_pos) in range.rev().zip(rev_valid_position_iter) {
debug_assert!(level_pos >= value_pos);
if level_pos <= value_pos {
break;
Expand Down
3 changes: 2 additions & 1 deletion parquet/src/arrow/record_reader/definition_levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ impl DefinitionLevelBuffer {
old_bitmap
}

pub fn valid_position_iter(
/// Returns an iterator of the valid positions in `range` in descending order
pub fn rev_valid_positions_iter(
&self,
range: Range<usize>,
) -> impl Iterator<Item = usize> + '_ {
Expand Down
25 changes: 25 additions & 0 deletions parquet/src/column/reader/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ use crate::util::bit_util::BitReader;

/// A slice of levels buffer data that is written to by a [`ColumnLevelDecoder`]
pub trait LevelsBufferSlice {
/// Returns the capacity of this slice, or `usize::MAX` if there is no limit
fn capacity(&self) -> usize;

/// Counts the levels within `range` that are not equal to `max_level`
///
/// As the method name suggests, every level other than `max_level` is counted as a null
fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
}

Expand All @@ -46,6 +48,7 @@ impl LevelsBufferSlice for [i16] {

/// A slice of values buffer data that is written to by a [`ColumnValueDecoder`]
pub trait ValuesBufferSlice {
/// Returns the capacity of this slice, or `usize::MAX` if there is no limit
fn capacity(&self) -> usize;
}

Expand All @@ -59,17 +62,29 @@ impl<T> ValuesBufferSlice for [T] {
pub trait ColumnLevelDecoder {
/// The buffer slice type that [`Self::read`] writes level data into
type Slice: LevelsBufferSlice + ?Sized;

/// Create a new [`ColumnLevelDecoder`] from the given `max_level`, `encoding` and `data`
fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;

/// Read level data into `out[range]`, returning the number of levels read
///
/// `range` is provided by the caller to allow for types such as default-initialized `[T]`
/// that only track capacity and not length
///
/// # Panics
///
/// Implementations may panic if `range` overlaps with already written data
///
fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;
}

/// Decodes value data to a [`ValuesBufferSlice`]
pub trait ColumnValueDecoder {
type Slice: ValuesBufferSlice + ?Sized;

/// Create a new [`ColumnValueDecoder`]
fn new(col: &ColumnDescPtr) -> Self;

/// Set the current dictionary page
fn set_dict(
&mut self,
buf: ByteBufferPtr,
Expand All @@ -78,13 +93,23 @@ pub trait ColumnValueDecoder {
is_sorted: bool,
) -> Result<()>;

/// Set the current data page
fn set_data(
&mut self,
encoding: Encoding,
data: ByteBufferPtr,
num_values: usize,
) -> Result<()>;

/// Read values data into `out[range]` returning the number of values read
///
/// `range` is provided by the caller to allow for types such as default-initialized `[T]`
/// that only track capacity and not length
///
/// # Panics
///
/// Implementations may panic if `range` overlaps with already written data
///
fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;
}

Expand Down

0 comments on commit 6a21ad2

Please sign in to comment.