Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added validation of number of encodings
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jun 7, 2022
1 parent 6608071 commit 69be888
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
22 changes: 22 additions & 0 deletions src/io/parquet/write/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ use super::{

/// Maps a [`Chunk`] and parquet-specific options to an [`RowGroupIter`] used to
/// write to parquet
/// # Panics
/// Iff
/// * `encodings.len() != fields.len()` or
/// * `encodings.len() != chunk.arrays().len()`
pub fn row_group_iter<A: AsRef<dyn Array> + 'static + Send + Sync>(
chunk: Chunk<A>,
encodings: Vec<Vec<Encoding>>,
fields: Vec<ParquetType>,
options: WriteOptions,
) -> RowGroupIter<'static, Error> {
assert_eq!(encodings.len(), fields.len());
assert_eq!(encodings.len(), chunk.arrays().len());
DynIter::new(
chunk
.into_arrays()
Expand Down Expand Up @@ -63,12 +69,22 @@ pub struct RowGroupIterator<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Re

impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Result<Chunk<A>>>> RowGroupIterator<A, I> {
/// Creates a new [`RowGroupIterator`] from an iterator over [`Chunk`].
///
/// # Errors
/// Iff
/// * the Arrow schema can't be converted to a valid Parquet schema.
/// * the length of the encodings is different from the number of fields in the schema
pub fn try_new(
iter: I,
schema: &Schema,
options: WriteOptions,
encodings: Vec<Vec<Encoding>>,
) -> Result<Self> {
if encodings.len() != schema.fields.len() {
return Err(Error::InvalidArgumentError(
"The number of encodings must equal the number of fields".to_string(),
));
}
let parquet_schema = to_parquet_schema(schema)?;

Ok(Self {
Expand All @@ -95,6 +111,12 @@ impl<A: AsRef<dyn Array> + 'static + Send + Sync, I: Iterator<Item = Result<Chun

self.iter.next().map(|maybe_chunk| {
let chunk = maybe_chunk?;
if self.encodings.len() != chunk.arrays().len() {
return Err(Error::InvalidArgumentError(
"The number of arrays in the chunk must equal the number of fields in the schema"
.to_string(),
));
};
let encodings = self.encodings.clone();
Ok(row_group_iter(
chunk,
Expand Down
24 changes: 19 additions & 5 deletions src/io/parquet/write/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct FileSink<'a, W: AsyncWrite + Send + Unpin> {
writer: Option<FileStreamer<W>>,
task: Option<BoxFuture<'a, Result<Option<FileStreamer<W>>, Error>>>,
options: WriteOptions,
encoding: Vec<Vec<Encoding>>,
encodings: Vec<Vec<Encoding>>,
schema: Schema,
parquet_schema: SchemaDescriptor,
/// Key-value metadata that will be written to the file on close.
Expand All @@ -73,13 +73,21 @@ where
/// Create a new sink that writes arrays to the provided `writer`.
///
/// # Errors
/// If the Arrow schema can't be converted to a valid Parquet schema.
/// Iff
/// * the Arrow schema can't be converted to a valid Parquet schema.
/// * the length of the encodings is different from the number of fields in the schema
pub fn try_new(
writer: W,
schema: Schema,
encoding: Vec<Vec<Encoding>>,
encodings: Vec<Vec<Encoding>>,
options: WriteOptions,
) -> Result<Self, Error> {
if encodings.len() != schema.fields.len() {
return Err(Error::InvalidArgumentError(
"The number of encodings must equal the number of fields".to_string(),
));
}

let parquet_schema = crate::io::parquet::write::to_parquet_schema(&schema)?;
let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
let writer = FileStreamer::new(
Expand All @@ -96,7 +104,7 @@ where
task: None,
options,
schema,
encoding,
encodings,
parquet_schema,
metadata: HashMap::default(),
})
Expand Down Expand Up @@ -146,11 +154,17 @@ where
type Error = Error;

fn start_send(self: Pin<&mut Self>, item: Chunk<Arc<dyn Array>>) -> Result<(), Self::Error> {
if self.schema.fields.len() != item.arrays().len() {
return Err(Error::InvalidArgumentError(
"The number of arrays in the chunk must equal the number of fields in the schema"
.to_string(),
));
}
let this = self.get_mut();
if let Some(mut writer) = this.writer.take() {
let rows = crate::io::parquet::write::row_group_iter(
item,
this.encoding.clone(),
this.encodings.clone(),
this.parquet_schema.fields().to_vec(),
this.options,
);
Expand Down

0 comments on commit 69be888

Please sign in to comment.