From 97e0c6052630421043840fa11eec33bd4bbc61ff Mon Sep 17 00:00:00 2001 From: framlog Date: Tue, 28 Mar 2023 17:41:45 +0800 Subject: [PATCH] Fix reading ipc files with unordered projections --- arrow-ipc/src/reader.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index bb367f9447d5..bd7e33185a40 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -650,15 +650,15 @@ pub fn read_record_batch( // keep track of buffer and node index, the functions that create arrays mutate these let mut buffer_index = 0; let mut node_index = 0; - let mut arrays = vec![]; let options = RecordBatchOptions::new().with_row_count(Some(batch.length() as usize)); if let Some(projection) = projection { + let mut arrays = vec![]; // project fields for (idx, field) in schema.fields().iter().enumerate() { // Create array for projected field - if projection.contains(&idx) { + if let Some(proj_idx) = projection.iter().position(|p| p == &idx) { let triple = create_array( field_nodes, field, @@ -672,7 +672,7 @@ pub fn read_record_batch( )?; node_index = triple.1; buffer_index = triple.2; - arrays.push(triple.0); + arrays.push((proj_idx, triple.0)); } else { // Skip field. // This must be called to advance `node_index` and `buffer_index`. @@ -681,13 +681,14 @@ pub fn read_record_batch( buffer_index = tuple.1; } } - + arrays.sort_by_key(|t| t.0); RecordBatch::try_new_with_options( Arc::new(schema.project(projection)?), - arrays, + arrays.into_iter().map(|t| t.1).collect(), &options, ) } else { + let mut arrays = vec![]; // keep track of index as lists require more than one node for field in schema.fields() { let triple = create_array( @@ -1423,6 +1424,17 @@ mod tests { // check the projected column equals the expected column assert_eq!(projected_column.as_ref(), expected_column.as_ref()); } + + { + // read record batch with reversed projection + let reader = FileReader::try_new( + std::io::Cursor::new(buf.clone()), + Some(vec![3, 2, 1]), + ); + let read_batch = reader.unwrap().next().unwrap().unwrap(); + let expected_batch = batch.project(&[3, 2, 1]).unwrap(); + assert_eq!(read_batch, expected_batch); + } } #[test]