Skip to content

Commit

Permalink
Fix reading ipc files with unordered projections (#3966)
Browse files: browse the repository at this point in the history
  • Loading branch information
framlog authored Mar 28, 2023
1 parent 151ce6f commit 72794a4
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,15 +650,15 @@ pub fn read_record_batch(
// keep track of buffer and node index, the functions that create arrays mutate these
let mut buffer_index = 0;
let mut node_index = 0;
let mut arrays = vec![];

let options = RecordBatchOptions::new().with_row_count(Some(batch.length() as usize));

if let Some(projection) = projection {
let mut arrays = vec![];
// project fields
for (idx, field) in schema.fields().iter().enumerate() {
// Create array for projected field
if projection.contains(&idx) {
if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
let triple = create_array(
field_nodes,
field,
Expand All @@ -672,7 +672,7 @@ pub fn read_record_batch(
)?;
node_index = triple.1;
buffer_index = triple.2;
arrays.push(triple.0);
arrays.push((proj_idx, triple.0));
} else {
// Skip field.
// This must be called to advance `node_index` and `buffer_index`.
Expand All @@ -681,13 +681,14 @@ pub fn read_record_batch(
buffer_index = tuple.1;
}
}

arrays.sort_by_key(|t| t.0);
RecordBatch::try_new_with_options(
Arc::new(schema.project(projection)?),
arrays,
arrays.into_iter().map(|t| t.1).collect(),
&options,
)
} else {
let mut arrays = vec![];
// keep track of index as lists require more than one node
for field in schema.fields() {
let triple = create_array(
Expand Down Expand Up @@ -1423,6 +1424,17 @@ mod tests {
// check the projected column equals the expected column
assert_eq!(projected_column.as_ref(), expected_column.as_ref());
}

{
// read record batch with reversed projection
let reader = FileReader::try_new(
std::io::Cursor::new(buf.clone()),
Some(vec![3, 2, 1]),
);
let read_batch = reader.unwrap().next().unwrap().unwrap();
let expected_batch = batch.project(&[3, 2, 1]).unwrap();
assert_eq!(read_batch, expected_batch);
}
}

#[test]
Expand Down

0 comments on commit 72794a4

Please sign in to comment.