From 55d8eed830cfc183ca15637b1cb8534b5695bad9 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 18 Jun 2022 10:23:50 +0000 Subject: [PATCH] Fixed IPC projection --- src/io/ipc/read/common.rs | 36 +++++++++++++++++------------------ src/io/ipc/read/file_async.rs | 4 ++-- src/io/ipc/read/reader.rs | 4 ++-- tests/it/io/ipc/read/file.rs | 26 +++++++++++++++++++------ 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 1190b158879..3e218e7ccaf 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -285,31 +285,31 @@ pub fn prepare_projection( let fields = projection.iter().map(|x| fields[*x].clone()).collect(); - // selected index; index in - let sorted_projection = projection - .iter() - .copied() - .enumerate() - .map(|x| (x.1, x.0)) - .collect::>(); // e.g. [2, 1] -> {2: 0, 1: 1} - projection.sort_unstable(); // e.g. [2, 1] -> [1, 2] + // todo: find way to do this more efficiently + let mut indices = (0..projection.len()).collect::>(); + indices.sort_unstable_by_key(|&i| &projection[i]); + let map = indices.iter().copied().enumerate().fold( + HashMap::default(), + |mut acc, (index, new_index)| { + if !acc.contains_key(&new_index) { + acc.insert(index, new_index); + }; + acc + }, + ); + projection.sort_unstable(); - (projection, sorted_projection, fields) + (projection, map, fields) } pub fn apply_projection( chunk: Chunk>, - projection: &[usize], map: &HashMap, ) -> Chunk> { // re-order according to projection - let arrays = chunk.into_arrays(); - let arrays = projection - .iter() - .map(|x| { - let index = map.get(x).unwrap(); - arrays[*index].clone() - }) - .collect(); + let mut arrays = chunk.into_arrays(); + map.iter().for_each(|(old, new)| { + arrays.swap(*old, *new); + }); Chunk::new(arrays) } diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 90f90cadc6b..79432785ce1 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -89,9 +89,9 @@ impl<'a> FileStream<'a> { &mut block_buffer, ).await?; - let chunk = if let Some((projection, map)) = &projection { + let chunk = if let Some((_, map)) = &projection { // re-order according to projection - apply_projection(chunk, projection, map) + apply_projection(chunk, map) } else { chunk }; diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 3bd82591d7f..5c3f3578351 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -342,9 +342,9 @@ impl Iterator for FileReader { &mut self.buffer, ); - let chunk = if let Some((projection, map, _)) = &self.projection { + let chunk = if let Some((_, map, _)) = &self.projection { // re-order according to projection - chunk.map(|chunk| apply_projection(chunk, projection, map)) + chunk.map(|chunk| apply_projection(chunk, map)) } else { chunk }; diff --git a/tests/it/io/ipc/read/file.rs b/tests/it/io/ipc/read/file.rs index 75dc9655d73..c54804fd183 100644 --- a/tests/it/io/ipc/read/file.rs +++ b/tests/it/io/ipc/read/file.rs @@ -1,5 +1,6 @@ use std::fs::File; +use arrow2::chunk::Chunk; use arrow2::error::Result; use arrow2::io::ipc::read::*; @@ -166,18 +167,29 @@ fn test_projection(version: &str, file_name: &str, columns: Vec) -> Resul let metadata = read_file_metadata(&mut file)?; - let expected = columns + let (_, _, chunks) = read_gzip_json(version, file_name)?; + + let expected_fields = columns .iter() .copied() .map(|x| metadata.schema.fields[x].clone()) .collect::>(); - let mut reader = FileReader::new(&mut file, metadata, Some(columns)); + let expected_chunks = chunks.into_iter().map(|chunk| { + let columns = columns + .iter() + .copied() + .map(|x| chunk.arrays()[x].clone()) + .collect::>(); + Chunk::new(columns) + }); + + let reader = FileReader::new(&mut file, metadata, Some(columns.clone())); - assert_eq!(reader.schema().fields, expected); + assert_eq!(reader.schema().fields, expected_fields); - reader.try_for_each(|rhs| { - assert_eq!(rhs?.arrays().len(), expected.len()); + reader.zip(expected_chunks).try_for_each(|(lhs, rhs)| { + assert_eq!(&lhs?.arrays()[0], &rhs.arrays()[0]); Result::Ok(()) })?; Ok(()) @@ -189,5 +201,7 @@ fn read_projected() -> Result<()> { test_projection("1.0.0-littleendian", "generated_dictionary", vec![2])?; test_projection("1.0.0-littleendian", "generated_nested", vec![0])?; - test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1]) + test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1])?; + + test_projection("1.0.0-littleendian", "generated_primitive", vec![0, 2, 1]) }