Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for non-ordered projections to IPC reading #961

Merged
merged 1 commit into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::collections::{HashMap, HashSet, VecDeque};
use std::io::{Read, Seek};
use std::sync::Arc;

Expand Down Expand Up @@ -277,3 +277,44 @@ mod tests {
)
}
}

/// Prepares a projection for IPC reading.
///
/// Returns, in order:
/// * the projection sorted in increasing order — the order in which the IPC
///   reader must fetch the columns,
/// * a map consumed by `apply_projection` to restore the caller's original
///   (possibly non-ordered) column order, and
/// * the projected [`Field`]s, in the caller's original order.
///
/// # Panics
/// Panics iff `projection` contains duplicates.
pub fn prepare_projection(
    fields: &[Field],
    projection: Vec<usize>,
) -> (Vec<usize>, HashMap<usize, usize>, Vec<Field>) {
    assert_eq!(
        projection.iter().collect::<HashSet<_>>().len(),
        projection.len(),
        "The projection on IPC must not contain duplicates"
    );

    // fields in the order the caller requested them
    let fields = projection.iter().map(|x| fields[*x].clone()).collect();

    let mut sorted = projection.clone();
    sorted.sort_unstable(); // e.g. [2, 0, 1] -> [0, 1, 2]

    // position of every selected field index within the *sorted* projection,
    // i.e. the position its array occupies in the chunk as read from IPC
    let position_in_sorted = sorted
        .iter()
        .copied()
        .enumerate()
        .map(|(position, index)| (index, position))
        .collect::<HashMap<_, _>>(); // e.g. [2, 0, 1] -> {0: 0, 1: 1, 2: 2}

    // map consumed by `apply_projection`: for output slot `i` (the caller's
    // order), `map[sorted[i]]` is the position, within the arrays read in
    // sorted order, of the array that belongs at slot `i`.
    // NOTE(review): the previous map ({field -> position in the original
    // projection}) was the *inverse* permutation and only re-ordered
    // correctly when the projection was an involution (e.g. `[2, 1]`);
    // e.g. projection [2, 0, 1] -> {0: 2, 1: 0, 2: 1}
    let map = sorted
        .iter()
        .copied()
        .zip(projection.iter().map(|index| position_in_sorted[index]))
        .collect::<HashMap<_, _>>();

    (sorted, map, fields)
}

/// Re-orders the arrays of `chunk`: output slot `i` receives the input array
/// at position `map[&projection[i]]`.
///
/// # Panics
/// Panics iff some entry of `projection` is not a key of `map`, or the mapped
/// position is out of bounds of the chunk.
pub fn apply_projection(
    chunk: Chunk<Arc<dyn Array>>,
    projection: &[usize],
    map: &HashMap<usize, usize>,
) -> Chunk<Arc<dyn Array>> {
    let source = chunk.into_arrays();

    // pick the array for one output slot at a time
    let mut reordered = Vec::with_capacity(projection.len());
    for key in projection {
        // `map` tells us where the array for this slot currently lives
        let position = *map.get(key).unwrap();
        reordered.push(source[position].clone());
    }
    Chunk::new(reordered)
}
39 changes: 20 additions & 19 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Async reader for Arrow IPC files
use std::collections::HashMap;
use std::io::SeekFrom;
use std::sync::Arc;

Expand All @@ -16,7 +17,7 @@ use crate::datatypes::{Field, Schema};
use crate::error::{ArrowError, Result};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};

use super::common::{read_dictionary, read_record_batch};
use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch};
use super::reader::get_serialized_batch;
use super::schema::fb_to_schema;
use super::Dictionaries;
Expand All @@ -25,8 +26,8 @@ use super::FileMetadata;
/// Async reader for Arrow IPC files
pub struct FileStream<'a> {
stream: BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>,
schema: Option<Schema>,
metadata: FileMetadata,
schema: Schema,
}

impl<'a> FileStream<'a> {
Expand All @@ -38,23 +39,15 @@ impl<'a> FileStream<'a> {
where
R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
{
let schema = if let Some(projection) = projection.as_ref() {
projection.windows(2).for_each(|x| {
assert!(
x[0] < x[1],
"IPC projection must be ordered and non-overlapping",
)
});
let fields = projection
.iter()
.map(|&x| metadata.schema.fields[x].clone())
.collect::<Vec<_>>();
Schema {
let (projection, schema) = if let Some(projection) = projection {
let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection);
let schema = Schema {
fields,
metadata: metadata.schema.metadata.clone(),
}
};
(Some((p, h)), Some(schema))
} else {
metadata.schema.clone()
(None, None)
};

let stream = Self::stream(reader, metadata.clone(), projection);
Expand All @@ -72,13 +65,13 @@ impl<'a> FileStream<'a> {

/// Get the projected schema from the IPC file.
pub fn schema(&self) -> &Schema {
&self.schema
self.schema.as_ref().unwrap_or(&self.metadata.schema)
}

fn stream<R>(
mut reader: R,
metadata: FileMetadata,
projection: Option<Vec<usize>>,
projection: Option<(Vec<usize>, HashMap<usize, usize>)>,
) -> BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>
where
R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
Expand All @@ -90,11 +83,19 @@ impl<'a> FileStream<'a> {
let chunk = read_batch(
&mut reader,
&metadata,
projection.as_deref(),
projection.as_ref().map(|x| x.0.as_ref()),
block,
&mut meta_buffer,
&mut block_buffer,
).await?;

let chunk = if let Some((projection, map)) = &projection {
// re-order according to projection
apply_projection(chunk, projection, map)
} else {
chunk
};

yield chunk;
}
}
Expand Down
32 changes: 15 additions & 17 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::convert::TryInto;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
Expand Down Expand Up @@ -37,7 +38,7 @@ pub struct FileReader<R: Read + Seek> {
reader: R,
metadata: FileMetadata,
current_block: usize,
projection: Option<(Vec<usize>, Schema)>,
projection: Option<(Vec<usize>, HashMap<usize, usize>, Schema)>,
buffer: Vec<u8>,
}

Expand Down Expand Up @@ -230,24 +231,13 @@ impl<R: Read + Seek> FileReader<R> {
/// # Panics
/// Panics iff the projection contains duplicates (e.g. `[0, 1, 1]` is invalid);
/// non-ordered projections such as `[1, 0]` are supported.
pub fn new(reader: R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> Self {
if let Some(projection) = projection.as_ref() {
projection.windows(2).for_each(|x| {
assert!(
x[0] < x[1],
"The projection on IPC must be ordered and non-overlapping"
);
});
}
let projection = projection.map(|projection| {
let fields = projection
.iter()
.map(|x| metadata.schema.fields[*x].clone())
.collect();
let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection);
let schema = Schema {
fields,
metadata: metadata.schema.metadata.clone(),
};
(projection, schema)
(p, h, schema)
});
Self {
reader,
Expand All @@ -262,7 +252,7 @@ impl<R: Read + Seek> FileReader<R> {
pub fn schema(&self) -> &Schema {
self.projection
.as_ref()
.map(|x| &x.1)
.map(|x| &x.2)
.unwrap_or(&self.metadata.schema)
}

Expand All @@ -285,13 +275,21 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
if self.current_block < self.metadata.blocks.len() {
let block = self.current_block;
self.current_block += 1;
Some(read_batch(
let chunk = read_batch(
&mut self.reader,
&self.metadata,
self.projection.as_ref().map(|x| x.0.as_ref()),
block,
&mut self.buffer,
))
);

let chunk = if let Some((projection, map, _)) = &self.projection {
// re-order according to projection
chunk.map(|chunk| apply_projection(chunk, projection, map))
} else {
chunk
};
Some(chunk)
} else {
None
}
Expand Down
23 changes: 16 additions & 7 deletions tests/it/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,37 @@ fn read_generated_200_compression_zstd() -> Result<()> {
test_file("2.0.0-compression", "generated_zstd")
}

fn test_projection(version: &str, file_name: &str, column: usize) -> Result<()> {
fn test_projection(version: &str, file_name: &str, columns: Vec<usize>) -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let mut file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
testdata, version, file_name
))?;

let metadata = read_file_metadata(&mut file)?;
let mut reader = FileReader::new(&mut file, metadata, Some(vec![column]));

assert_eq!(reader.schema().fields.len(), 1);
let expected = columns
.iter()
.copied()
.map(|x| metadata.schema.fields[x].clone())
.collect::<Vec<_>>();

let mut reader = FileReader::new(&mut file, metadata, Some(columns));

assert_eq!(reader.schema().fields, expected);

reader.try_for_each(|rhs| {
assert_eq!(rhs?.arrays().len(), 1);
assert_eq!(rhs?.arrays().len(), expected.len());
Result::Ok(())
})?;
Ok(())
}

/// Integration test: single-column projections plus a non-ordered
/// multi-column projection.
#[test]
fn read_projected() -> Result<()> {
    test_projection("1.0.0-littleendian", "generated_primitive", vec![1])?;
    test_projection("1.0.0-littleendian", "generated_dictionary", vec![2])?;
    test_projection("1.0.0-littleendian", "generated_nested", vec![0])?;

    // non-ordered projection
    test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1])
}