Skip to content

Commit

Permalink
Restrict RecordReader and friends to POD types (#1132)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 11, 2022
1 parent 5302b92 commit ca6d938
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 11 deletions.
17 changes: 9 additions & 8 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Repetition, Type as PhysicalType};
use crate::column::page::PageIterator;
use crate::column::reader::ColumnReaderImpl;
use crate::data_type::private::ScalarDataType;
use crate::data_type::{
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
Int32Type, Int64Type, Int96Type,
Expand Down Expand Up @@ -104,7 +105,7 @@ pub trait ArrayReader {
///
/// Returns the number of records read, which can be less than batch_size if
/// pages is exhausted.
fn read_records<T: DataType>(
fn read_records<T: ScalarDataType>(
record_reader: &mut RecordReader<T>,
pages: &mut dyn PageIterator,
batch_size: usize,
Expand Down Expand Up @@ -132,7 +133,7 @@ fn read_records<T: DataType>(

/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow
/// NullArray type.
pub struct NullArrayReader<T: DataType> {
pub struct NullArrayReader<T: ScalarDataType> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
Expand All @@ -142,7 +143,7 @@ pub struct NullArrayReader<T: DataType> {
_type_marker: PhantomData<T>,
}

impl<T: DataType> NullArrayReader<T> {
impl<T: ScalarDataType> NullArrayReader<T> {
/// Construct null array reader.
pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
let record_reader = RecordReader::<T>::new(column_desc.clone());
Expand All @@ -160,7 +161,7 @@ impl<T: DataType> NullArrayReader<T> {
}

/// Implementation of primitive array reader.
impl<T: DataType> ArrayReader for NullArrayReader<T> {
impl<T: ScalarDataType> ArrayReader for NullArrayReader<T> {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -200,7 +201,7 @@ impl<T: DataType> ArrayReader for NullArrayReader<T> {

/// Primitive array readers are leaves of array reader tree. They accept page iterator
/// and read them into primitive arrays.
pub struct PrimitiveArrayReader<T: DataType> {
pub struct PrimitiveArrayReader<T: ScalarDataType> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
Expand All @@ -210,7 +211,7 @@ pub struct PrimitiveArrayReader<T: DataType> {
_type_marker: PhantomData<T>,
}

impl<T: DataType> PrimitiveArrayReader<T> {
impl<T: ScalarDataType> PrimitiveArrayReader<T> {
/// Construct primitive array reader.
pub fn new(
pages: Box<dyn PageIterator>,
Expand Down Expand Up @@ -240,7 +241,7 @@ impl<T: DataType> PrimitiveArrayReader<T> {
}

/// Implementation of primitive array reader.
impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
impl<T: ScalarDataType> ArrayReader for PrimitiveArrayReader<T> {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -288,7 +289,7 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
}
};

// Convert to arrays by using the Parquet phyisical type.
// Convert to arrays by using the Parquet physical type.
// The physical types are then cast to Arrow types if necessary

let mut record_data = self.record_reader.consume_record_data()?;
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::cmp::{max, min};
use std::mem::{replace, size_of};

use crate::column::{page::PageReader, reader::ColumnReaderImpl};
use crate::data_type::DataType;
use crate::data_type::private::ScalarDataType;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow::array::BooleanBufferBuilder;
Expand All @@ -29,7 +29,7 @@ use arrow::buffer::{Buffer, MutableBuffer};
const MIN_BATCH_SIZE: usize = 1024;

/// A `RecordReader` is a stateful column reader that delimits semantic records.
pub struct RecordReader<T: DataType> {
pub struct RecordReader<T: ScalarDataType> {
column_desc: ColumnDescPtr,

records: MutableBuffer,
Expand All @@ -47,7 +47,7 @@ pub struct RecordReader<T: DataType> {
values_written: usize,
}

impl<T: DataType> RecordReader<T> {
impl<T: ScalarDataType> RecordReader<T> {
pub fn new(column_schema: ColumnDescPtr) -> Self {
let (def_levels, null_map) = if column_schema.max_def_level() > 0 {
(
Expand Down
16 changes: 16 additions & 0 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ impl AsBytes for str {
}

pub(crate) mod private {
use super::*;
use crate::encodings::decoding::PlainDecoderDetails;
use crate::util::bit_util::{round_upto_power_of_2, BitReader, BitWriter};
use crate::util::memory::ByteBufferPtr;
Expand Down Expand Up @@ -1032,6 +1033,21 @@ pub(crate) mod private {
self
}
}

/// A marker trait for [`DataType`] implementations whose physical type is a
/// fixed-size [scalar] (plain-old-data) value
///
/// This means that a `[Self::T]` slice of length `len`, with every element equal
/// to `Self::T::default()`, can be safely created from a zero-initialized `[u8]`
/// of length `len * Self::get_type_size()` and alignment `Self::get_type_size()`
///
/// [scalar]: https://doc.rust-lang.org/book/ch03-02-data-types.html#scalar-types
pub trait ScalarDataType: DataType {}
impl ScalarDataType for BoolType {}
impl ScalarDataType for Int32Type {}
impl ScalarDataType for Int64Type {}
impl ScalarDataType for FloatType {}
impl ScalarDataType for DoubleType {}
}

/// Contains the Parquet physical type information as well as the Rust primitive type
Expand Down

0 comments on commit ca6d938

Please sign in to comment.