Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added non-ordered projections in IPC
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 22, 2022
1 parent 04605a2 commit ce45bde
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 44 deletions.
43 changes: 42 additions & 1 deletion src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::collections::{HashMap, HashSet, VecDeque};
use std::io::{Read, Seek};
use std::sync::Arc;

Expand Down Expand Up @@ -277,3 +277,44 @@ mod tests {
)
}
}

/// Prepares a projection for reading an IPC file.
///
/// Returns, in order:
/// * the projection sorted in ascending order (the order in which columns are read),
/// * a map from `column index -> position in the requested projection`, used to
///   restore the caller's requested order after reading,
/// * the projected [`Field`]s, in the requested (unsorted) order.
///
/// # Panics
/// Panics iff `projection` contains duplicated indices.
pub fn prepare_projection(
    fields: &[Field],
    mut projection: Vec<usize>,
) -> (Vec<usize>, HashMap<usize, usize>, Vec<Field>) {
    let distinct = projection.iter().collect::<HashSet<_>>();
    assert_eq!(
        distinct.len(),
        projection.len(),
        "The projection on IPC must not contain duplicates"
    );

    let fields: Vec<Field> = projection.iter().map(|i| fields[*i].clone()).collect();

    // position of every selected column within the requested projection,
    // e.g. [2, 1] -> {2: 0, 1: 1}
    let mut requested_position = HashMap::with_capacity(projection.len());
    for (position, column) in projection.iter().copied().enumerate() {
        requested_position.insert(column, position);
    }

    // columns are read from the file in ascending order, e.g. [2, 1] -> [1, 2]
    projection.sort_unstable();

    (projection, requested_position, fields)
}

/// Re-orders the columns of `chunk` — which were read in ascending column order —
/// into the order originally requested by the projection.
///
/// `projection` must be the *sorted* projection and `map` the
/// `column index -> requested position` map, both as returned by [`prepare_projection`].
pub fn apply_projection(
    chunk: Chunk<Arc<dyn Array>>,
    projection: &[usize],
    map: &HashMap<usize, usize>,
) -> Chunk<Arc<dyn Array>> {
    // re-order according to projection
    let arrays = chunk.into_arrays();

    // `arrays[i]` holds the column `projection[i]` (columns are read in ascending
    // order); move each array to the position the caller requested it at.
    // NOTE: the previous implementation computed `arrays[map[column]]` for each
    // sorted column, which is only correct for projections of length <= 2 — for
    // e.g. `[3, 1, 2]` it misorders the columns relative to the projected schema.
    let mut new_arrays = arrays.clone();
    for (array, column) in arrays.iter().zip(projection.iter()) {
        let position = *map.get(column).unwrap();
        new_arrays[position] = array.clone();
    }
    Chunk::new(new_arrays)
}
39 changes: 20 additions & 19 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Async reader for Arrow IPC files
use std::collections::HashMap;
use std::io::SeekFrom;
use std::sync::Arc;

Expand All @@ -16,7 +17,7 @@ use crate::datatypes::{Field, Schema};
use crate::error::{ArrowError, Result};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};

use super::common::{read_dictionary, read_record_batch};
use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch};
use super::reader::get_serialized_batch;
use super::schema::fb_to_schema;
use super::Dictionaries;
Expand All @@ -25,8 +26,8 @@ use super::FileMetadata;
/// Async reader for Arrow IPC files
pub struct FileStream<'a> {
stream: BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>,
schema: Option<Schema>,
metadata: FileMetadata,
schema: Schema,
}

impl<'a> FileStream<'a> {
Expand All @@ -38,23 +39,15 @@ impl<'a> FileStream<'a> {
where
R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
{
let schema = if let Some(projection) = projection.as_ref() {
projection.windows(2).for_each(|x| {
assert!(
x[0] < x[1],
"IPC projection must be ordered and non-overlapping",
)
});
let fields = projection
.iter()
.map(|&x| metadata.schema.fields[x].clone())
.collect::<Vec<_>>();
Schema {
let (projection, schema) = if let Some(projection) = projection {
let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection);
let schema = Schema {
fields,
metadata: metadata.schema.metadata.clone(),
}
};
(Some((p, h)), Some(schema))
} else {
metadata.schema.clone()
(None, None)
};

let stream = Self::stream(reader, metadata.clone(), projection);
Expand All @@ -72,13 +65,13 @@ impl<'a> FileStream<'a> {

/// Get the projected schema from the IPC file.
///
/// Falls back to the file's full schema when no projection was requested.
pub fn schema(&self) -> &Schema {
    self.schema.as_ref().unwrap_or(&self.metadata.schema)
}

fn stream<R>(
mut reader: R,
metadata: FileMetadata,
projection: Option<Vec<usize>>,
projection: Option<(Vec<usize>, HashMap<usize, usize>)>,
) -> BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>
where
R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
Expand All @@ -90,11 +83,19 @@ impl<'a> FileStream<'a> {
let chunk = read_batch(
&mut reader,
&metadata,
projection.as_deref(),
projection.as_ref().map(|x| x.0.as_ref()),
block,
&mut meta_buffer,
&mut block_buffer,
).await?;

let chunk = if let Some((projection, map)) = &projection {
// re-order according to projection
apply_projection(chunk, projection, map)
} else {
chunk
};

yield chunk;
}
}
Expand Down
32 changes: 15 additions & 17 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::convert::TryInto;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
Expand Down Expand Up @@ -37,7 +38,7 @@ pub struct FileReader<R: Read + Seek> {
reader: R,
metadata: FileMetadata,
current_block: usize,
projection: Option<(Vec<usize>, Schema)>,
projection: Option<(Vec<usize>, HashMap<usize, usize>, Schema)>,
buffer: Vec<u8>,
}

Expand Down Expand Up @@ -230,24 +231,13 @@ impl<R: Read + Seek> FileReader<R> {
/// # Panic
/// Panics iff the projection contains duplicated columns (e.g. `[0, 1, 1]` is invalid; non-ordered projections such as `[1, 0]` are valid)
pub fn new(reader: R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> Self {
if let Some(projection) = projection.as_ref() {
projection.windows(2).for_each(|x| {
assert!(
x[0] < x[1],
"The projection on IPC must be ordered and non-overlapping"
);
});
}
let projection = projection.map(|projection| {
let fields = projection
.iter()
.map(|x| metadata.schema.fields[*x].clone())
.collect();
let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection);
let schema = Schema {
fields,
metadata: metadata.schema.metadata.clone(),
};
(projection, schema)
(p, h, schema)
});
Self {
reader,
Expand All @@ -262,7 +252,7 @@ impl<R: Read + Seek> FileReader<R> {
/// Get the projected schema from the IPC file.
///
/// Falls back to the file's full schema when no projection was requested.
pub fn schema(&self) -> &Schema {
    self.projection
        .as_ref()
        // the third element of the projection triple is the projected schema
        .map(|x| &x.2)
        .unwrap_or(&self.metadata.schema)
}

Expand All @@ -285,13 +275,21 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
if self.current_block < self.metadata.blocks.len() {
let block = self.current_block;
self.current_block += 1;
Some(read_batch(
let chunk = read_batch(
&mut self.reader,
&self.metadata,
self.projection.as_ref().map(|x| x.0.as_ref()),
block,
&mut self.buffer,
))
);

let chunk = if let Some((projection, map, _)) = &self.projection {
// re-order according to projection
chunk.map(|chunk| apply_projection(chunk, projection, map))
} else {
chunk
};
Some(chunk)
} else {
None
}
Expand Down
23 changes: 16 additions & 7 deletions tests/it/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,37 @@ fn read_generated_200_compression_zstd() -> Result<()> {
test_file("2.0.0-compression", "generated_zstd")
}

fn test_projection(version: &str, file_name: &str, column: usize) -> Result<()> {
fn test_projection(version: &str, file_name: &str, columns: Vec<usize>) -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let mut file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
testdata, version, file_name
))?;

let metadata = read_file_metadata(&mut file)?;
let mut reader = FileReader::new(&mut file, metadata, Some(vec![column]));

assert_eq!(reader.schema().fields.len(), 1);
let expected = columns
.iter()
.copied()
.map(|x| metadata.schema.fields[x].clone())
.collect::<Vec<_>>();

let mut reader = FileReader::new(&mut file, metadata, Some(columns));

assert_eq!(reader.schema().fields, expected);

reader.try_for_each(|rhs| {
assert_eq!(rhs?.arrays().len(), 1);
assert_eq!(rhs?.arrays().len(), expected.len());
Result::Ok(())
})?;
Ok(())
}

#[test]
fn read_projected() -> Result<()> {
    // ordered, single-column projections
    test_projection("1.0.0-littleendian", "generated_primitive", vec![1])?;
    test_projection("1.0.0-littleendian", "generated_dictionary", vec![2])?;
    test_projection("1.0.0-littleendian", "generated_nested", vec![0])?;

    // non-ordered projection — columns must come back in the requested order
    test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1])
}

0 comments on commit ce45bde

Please sign in to comment.