diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 97f1f4e03fd..fd75ee4dc96 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::collections::{HashMap, HashSet, VecDeque}; use std::io::{Read, Seek}; use std::sync::Arc; @@ -277,3 +277,44 @@ mod tests { ) } } + +pub fn prepare_projection( + fields: &[Field], + mut projection: Vec<usize>, +) -> (Vec<usize>, HashMap<usize, usize>, Vec<Field>) { + assert_eq!( + projection.iter().collect::<HashSet<_>>().len(), + projection.len(), + "The projection on IPC must not contain duplicates" + ); + + let fields = projection.iter().map(|x| fields[*x].clone()).collect(); + + // selected index; index in + let sorted_projection = projection + .iter() + .copied() + .enumerate() + .map(|x| (x.1, x.0)) + .collect::<HashMap<_, _>>(); // e.g. [2, 1] -> {2: 0, 1: 1} + projection.sort_unstable(); // e.g. [2, 1] -> [1, 2] + + (projection, sorted_projection, fields) +} + +pub fn apply_projection( + chunk: Chunk<Arc<dyn Array>>, + projection: &[usize], + map: &HashMap<usize, usize>, +) -> Chunk<Arc<dyn Array>> { + // re-order according to projection + let arrays = chunk.into_arrays(); + let arrays = projection + .iter() + .map(|x| { + let index = map.get(x).unwrap(); + arrays[*index].clone() + }) + .collect(); + Chunk::new(arrays) +} diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index e7bc04eb845..99ab8817cbb 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -1,4 +1,5 @@ //!
Async reader for Arrow IPC files +use std::collections::HashMap; use std::io::SeekFrom; use std::sync::Arc; @@ -16,7 +17,7 @@ use crate::datatypes::{Field, Schema}; use crate::error::{ArrowError, Result}; use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER}; -use super::common::{read_dictionary, read_record_batch}; +use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch}; use super::reader::get_serialized_batch; use super::schema::fb_to_schema; use super::Dictionaries; @@ -25,8 +26,8 @@ use super::FileMetadata; /// Async reader for Arrow IPC files pub struct FileStream<'a> { stream: BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>, + schema: Option<Schema>, metadata: FileMetadata, - schema: Schema, } impl<'a> FileStream<'a> { @@ -38,23 +39,15 @@ impl<'a> FileStream<'a> { where R: AsyncRead + AsyncSeek + Unpin + Send + 'a, { - let schema = if let Some(projection) = projection.as_ref() { - projection.windows(2).for_each(|x| { - assert!( - x[0] < x[1], - "IPC projection must be ordered and non-overlapping", - ) - }); - let fields = projection - .iter() - .map(|&x| metadata.schema.fields[x].clone()) - .collect::<Vec<_>>(); - Schema { + let (projection, schema) = if let Some(projection) = projection { + let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection); + let schema = Schema { fields, metadata: metadata.schema.metadata.clone(), - } + }; + (Some((p, h)), Some(schema)) } else { - metadata.schema.clone() + (None, None) }; let stream = Self::stream(reader, metadata.clone(), projection); @@ -72,13 +65,13 @@ impl<'a> FileStream<'a> { /// Get the projected schema from the IPC file.
pub fn schema(&self) -> &Schema { - &self.schema + self.schema.as_ref().unwrap_or(&self.metadata.schema) } fn stream<R>( mut reader: R, metadata: FileMetadata, - projection: Option<Vec<usize>>, + projection: Option<(Vec<usize>, HashMap<usize, usize>)>, ) -> BoxStream<'a, Result<Chunk<Arc<dyn Array>>>> where R: AsyncRead + AsyncSeek + Unpin + Send + 'a, { @@ -90,11 +83,19 @@ impl<'a> FileStream<'a> { let chunk = read_batch( &mut reader, &metadata, - projection.as_deref(), + projection.as_ref().map(|x| x.0.as_ref()), block, &mut meta_buffer, &mut block_buffer, ).await?; + + let chunk = if let Some((projection, map)) = &projection { + // re-order according to projection + apply_projection(chunk, projection, map) + } else { + chunk + }; + yield chunk; } } diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index d31e975d666..0ffab629793 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::convert::TryInto; use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; @@ -37,7 +38,7 @@ pub struct FileReader<R: Read + Seek> { reader: R, metadata: FileMetadata, current_block: usize, - projection: Option<(Vec<usize>, Schema)>, + projection: Option<(Vec<usize>, HashMap<usize, usize>, Schema)>, buffer: Vec<u8>, } @@ -230,24 +231,13 @@ impl<R: Read + Seek> FileReader<R> { /// # Panic /// Panics iff the projection is not in increasing order (e.g.
`[1, 0]` nor `[0, 1, 1]` are valid) pub fn new(reader: R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> Self { - if let Some(projection) = projection.as_ref() { - projection.windows(2).for_each(|x| { - assert!( - x[0] < x[1], - "The projection on IPC must be ordered and non-overlapping" - ); - }); - } let projection = projection.map(|projection| { - let fields = projection - .iter() - .map(|x| metadata.schema.fields[*x].clone()) - .collect(); + let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection); let schema = Schema { fields, metadata: metadata.schema.metadata.clone(), }; - (projection, schema) + (p, h, schema) }); Self { reader, @@ -262,7 +252,7 @@ pub fn schema(&self) -> &Schema { self.projection .as_ref() - .map(|x| &x.1) + .map(|x| &x.2) .unwrap_or(&self.metadata.schema) } @@ -285,13 +275,21 @@ impl<R: Read + Seek> Iterator for FileReader<R> { if self.current_block < self.metadata.blocks.len() { let block = self.current_block; self.current_block += 1; - Some(read_batch( + let chunk = read_batch( &mut self.reader, &self.metadata, self.projection.as_ref().map(|x| x.0.as_ref()), block, &mut self.buffer, - )) + ); + + let chunk = if let Some((projection, map, _)) = &self.projection { + // re-order according to projection + chunk.map(|chunk| apply_projection(chunk, projection, map)) + } else { + chunk + }; + Some(chunk) } else { None } diff --git a/tests/it/io/ipc/read/file.rs b/tests/it/io/ipc/read/file.rs index 5afbc9b1771..75dc9655d73 100644 --- a/tests/it/io/ipc/read/file.rs +++ b/tests/it/io/ipc/read/file.rs @@ -157,7 +157,7 @@ fn read_generated_200_compression_zstd() -> Result<()> { test_file("2.0.0-compression", "generated_zstd") } -fn test_projection(version: &str, file_name: &str, column: usize) -> Result<()> { +fn test_projection(version: &str, file_name: &str, columns: Vec<usize>) -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let mut file = File::open(format!( "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", @@
-165,12 +165,19 @@ fn test_projection(version: &str, file_name: &str, column: usize) -> Result<()> ))?; let metadata = read_file_metadata(&mut file)?; - let mut reader = FileReader::new(&mut file, metadata, Some(vec![column])); - assert_eq!(reader.schema().fields.len(), 1); + let expected = columns + .iter() + .copied() + .map(|x| metadata.schema.fields[x].clone()) + .collect::<Vec<_>>(); + + let mut reader = FileReader::new(&mut file, metadata, Some(columns)); + + assert_eq!(reader.schema().fields, expected); reader.try_for_each(|rhs| { - assert_eq!(rhs?.arrays().len(), 1); + assert_eq!(rhs?.arrays().len(), expected.len()); Result::Ok(()) })?; Ok(()) @@ -178,7 +185,9 @@ fn test_projection(version: &str, file_name: &str, column: usize) -> Result<()> #[test] fn read_projected() -> Result<()> { - test_projection("1.0.0-littleendian", "generated_primitive", 1)?; - test_projection("1.0.0-littleendian", "generated_dictionary", 2)?; - test_projection("1.0.0-littleendian", "generated_nested", 0) + test_projection("1.0.0-littleendian", "generated_primitive", vec![1])?; + test_projection("1.0.0-littleendian", "generated_dictionary", vec![2])?; + test_projection("1.0.0-littleendian", "generated_nested", vec![0])?; + + test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1]) }