Add AsyncFileReader trait #1803

Merged 3 commits · Jun 7, 2022
199 changes: 157 additions & 42 deletions parquet/src/arrow/async_reader.rs
@@ -78,11 +78,13 @@
use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::{Cursor, SeekFrom};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
@@ -101,8 +103,65 @@ use crate::file::footer::parse_metadata_buffer;
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::SerializedPageReader;
use crate::file::PARQUET_MAGIC;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
use crate::util::memory::ByteBufferPtr;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};

/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
pub trait AsyncFileReader {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

    /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
    /// allowing fine-grained control over how metadata is sourced, in particular allowing
    /// for caching, pre-fetching, catalog metadata, etc...
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
Contributor:

I find it a little strange that `get_metadata` is part of `AsyncChunkReader`, as I would have expected "read bytes" and "logically read and decode parquet data" to be more separated.

Would it make sense to consider two separate traits? Something like the following perhaps 🤔

/// A reader that can asynchronously read a range of bytes
pub trait AsyncChunkReader: Send + Unpin + 'static {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
}

/// Returns parquet metadata, possibly asynchronously
pub trait AsyncParquetReader: Send + Unpin + 'static {
    /// Retrieve the [`ParquetMetaData`] for this file
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
}

Or maybe call it AsyncChunkedParquetReader? (though I admit that is getting to be a mouthful)

Contributor Author:

AsyncParquetFile?

I agree it is a little odd, but I want to give flexibility to how this metadata is sourced to allow for caching, pre-fetching, etc...

Contributor:

ParquetReaderAccess?

The use case of cached / cataloged metadata is a good one -- perhaps we can just add a comment explaining the rationale.

Contributor Author:

I've split the traits up, but it feels very odd to me to have two traits implemented on the same type that is then passed to ParquetRecordBatchStreamBuilder::new...

I'll revisit in the morning; I feel this has just made it more confusing, tbh...

Contributor:

Yes, I agree it is now more confusing -- hoping a good night's sleep will make it clearer. I am happy with whatever you decide.

Contributor Author:

I went with AsyncFileReader and added a load of doc comments

}

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            self.seek(SeekFrom::Start(range.start as u64)).await?;
Comment on lines +121 to +122
Member:

Hmm, `get_bytes` and `get_metadata` are both async, and they both call `seek`. Is there any chance of a race condition between them? For example, calling `get_bytes` first, but the file position is changed by a call to `get_metadata` next?

Contributor Author (@tustvold, Jun 6, 2022):

The method takes a mutable reference and returns a future with the same lifetime, so they can't race.
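A minimal std-only sketch of the property described above, using a hypothetical `SeekReader` type (not part of this PR): because the returned value borrows the reader mutably for its whole lifetime, a second cursor-moving call cannot even compile while the first result is still alive, which is the same guarantee the real trait gets from `&mut self` plus the `'_` lifetime on `BoxFuture`.

```rust
// Hypothetical stand-in for a seekable reader; not part of this PR.
struct SeekReader {
    pos: u64,
}

impl SeekReader {
    // Mirrors `fn get_bytes(&mut self, ..) -> BoxFuture<'_, ..>`: the
    // returned reference holds the mutable borrow of `self`, so no other
    // method can move the cursor while it is alive.
    fn get_bytes(&mut self, start: u64) -> &mut u64 {
        self.pos = start; // "seek" then hand back exclusive access
        &mut self.pos
    }

    fn get_metadata(&mut self) -> &mut u64 {
        self.pos = u64::MAX; // moves the cursor somewhere else entirely
        &mut self.pos
    }
}
```

Calling `r.get_metadata()` while the value returned by `r.get_bytes(..)` is still live is rejected by the borrow checker, so the interleaving the comment worries about cannot occur.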


            let to_read = range.end - range.start;
            let mut buffer = Vec::with_capacity(to_read);
            let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
            if read != to_read {
                eof_err!("expected to read {} bytes, got {}", to_read, read);
            }

            Ok(buffer.into())
        }
        .boxed()
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        async move {
            self.seek(SeekFrom::End(-8)).await?;

            let mut buf = [0_u8; 8];
            self.read_exact(&mut buf).await?;

            if buf[4..] != PARQUET_MAGIC {
                return Err(general_err!("Invalid Parquet file. Corrupt footer"));
            }

            let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64;
            if metadata_len < 0 {
                return Err(general_err!(
                    "Invalid Parquet file. Metadata length is less than zero ({})",
                    metadata_len
                ));
            }

            self.seek(SeekFrom::End(-8 - metadata_len)).await?;

            let mut buf = Vec::with_capacity(metadata_len as usize + 8);
            self.read_to_end(&mut buf).await?;

            Ok(Arc::new(parse_metadata_buffer(&mut Cursor::new(buf))?))
        }
        .boxed()
    }
}
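The `get_metadata` implementation above relies on the parquet footer layout: the last 8 bytes of a file are a 4-byte little-endian metadata length followed by the 4-byte magic `PAR1`. A std-only sketch of just that tail parse (a hypothetical helper mirroring, not reusing, the logic above):

```rust
/// Parse the final 8 bytes of a parquet file:
/// [metadata_len: i32, little-endian][b"PAR1"].
/// Hypothetical helper sketching the footer logic above; not part of this PR.
fn parse_footer_tail(tail: [u8; 8]) -> Result<i32, String> {
    const PARQUET_MAGIC: [u8; 4] = *b"PAR1";
    if tail[4..] != PARQUET_MAGIC {
        return Err("Invalid Parquet file. Corrupt footer".to_owned());
    }
    let metadata_len = i32::from_le_bytes([tail[0], tail[1], tail[2], tail[3]]);
    if metadata_len < 0 {
        return Err(format!(
            "Invalid Parquet file. Metadata length is less than zero ({})",
            metadata_len
        ));
    }
    Ok(metadata_len)
}
```

Given a valid length, the real code then seeks to `End(-8 - metadata_len)` and decodes the thrift-encoded metadata from there.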

/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
///
Expand All @@ -124,10 +183,10 @@ pub struct ParquetRecordBatchStreamBuilder<T> {
projection: ProjectionMask,
}

impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStreamBuilder<T> {
impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
pub async fn new(mut input: T) -> Result<Self> {
let metadata = Arc::new(read_footer(&mut input).await?);
let metadata = input.get_metadata().await?;

let schema = Arc::new(parquet_to_arrow_schema(
metadata.file_metadata().schema_descr(),
@@ -149,6 +208,11 @@ impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStreamBuilder<T> {
&self.metadata
}

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

/// Returns the arrow [`SchemaRef`] for this parquet file
pub fn schema(&self) -> &SchemaRef {
&self.schema
@@ -264,8 +328,9 @@ impl<T> ParquetRecordBatchStream<T> {
}
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
for ParquetRecordBatchStream<T>
impl<T> Stream for ParquetRecordBatchStream<T>
where
    T: AsyncFileReader + Unpin + Send + 'static,
{
type Item = Result<RecordBatch>;

@@ -309,25 +374,24 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
let mut column_chunks =
    vec![None; row_group_metadata.columns().len()];

// TODO: Combine consecutive ranges
for (idx, chunk) in column_chunks.iter_mut().enumerate() {
    if !projection.leaf_included(idx) {
        continue;
    }

    let column = row_group_metadata.column(idx);
    let (start, length) = column.byte_range();
    let end = start + length;

    input.seek(SeekFrom::Start(start)).await?;

    let mut buffer = vec![0_u8; (end - start) as usize];
    input.read_exact(buffer.as_mut_slice()).await?;
    let data = input
Contributor:

This code is just factored into `AsyncChunkReader::get_bytes`, correct?

        .get_bytes(start as usize..(start + length) as usize)
        .await?;

    *chunk = Some(InMemoryColumnChunk {
        num_values: column.num_values(),
        compression: column.compression(),
        physical_type: column.column_type(),
        data: ByteBufferPtr::new(buffer),
        data,
    });
}

@@ -379,34 +443,6 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
}
}

async fn read_footer<T: AsyncRead + AsyncSeek + Unpin>(
    input: &mut T,
) -> Result<ParquetMetaData> {
    input.seek(SeekFrom::End(-8)).await?;

    let mut buf = [0_u8; 8];
    input.read_exact(&mut buf).await?;

    if buf[4..] != PARQUET_MAGIC {
        return Err(general_err!("Invalid Parquet file. Corrupt footer"));
    }

    let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64;
    if metadata_len < 0 {
        return Err(general_err!(
            "Invalid Parquet file. Metadata length is less than zero ({})",
            metadata_len
        ));
    }

    input.seek(SeekFrom::End(-8 - metadata_len)).await?;

    let mut buf = Vec::with_capacity(metadata_len as usize + 8);
    input.read_to_end(&mut buf).await?;

    parse_metadata_buffer(&mut Cursor::new(buf))
}

struct InMemoryRowGroup {
schema: SchemaDescPtr,
column_chunks: Vec<Option<InMemoryColumnChunk>>,
@@ -438,13 +474,13 @@ struct InMemoryColumnChunk {
    num_values: i64,
    compression: Compression,
    physical_type: crate::basic::Type,
    data: ByteBufferPtr,
    data: Bytes,
}

impl InMemoryColumnChunk {
    fn pages(&self) -> Result<Box<dyn PageReader>> {
        let page_reader = SerializedPageReader::new(
            Cursor::new(self.data.clone()),
            self.data.clone().reader(),
            self.num_values,
            self.compression,
            self.physical_type,
@@ -477,3 +513,82 @@ impl PageIterator for ColumnChunkIterator {
        Ok(self.column_schema.clone())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::{ArrowReader, ParquetFileArrowReader};
    use arrow::error::Result as ArrowResult;
    use futures::TryStreamExt;
    use std::sync::Mutex;

    struct TestReader {
        data: Bytes,
        metadata: Arc<ParquetMetaData>,
        requests: Arc<Mutex<Vec<Range<usize>>>>,
    }

    impl AsyncFileReader for TestReader {
        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            self.requests.lock().unwrap().push(range.clone());
            futures::future::ready(Ok(self.data.slice(range))).boxed()
        }

        fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
            futures::future::ready(Ok(self.metadata.clone())).boxed()
        }
    }

    #[tokio::test]
    async fn test_async_reader() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{}/alltypes_plain.parquet", testdata);
        let data = Bytes::from(std::fs::read(path).unwrap());

        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let requests = async_reader.requests.clone();
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
        let stream = builder
            .with_projection(mask.clone())
            .with_batch_size(1024)
            .build()
            .unwrap();

        let async_batches: Vec<_> = stream.try_collect().await.unwrap();

        let mut sync_reader = ParquetFileArrowReader::try_new(data).unwrap();
        let sync_batches = sync_reader
            .get_record_reader_by_columns(mask, 1024)
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(async_batches, sync_batches);

        let requests = requests.lock().unwrap();
        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

        assert_eq!(
Contributor: 👍

            &requests[..],
            &[
                offset_1 as usize..(offset_1 + length_1) as usize,
                offset_2 as usize..(offset_2 + length_2) as usize
            ]
        );
    }
}
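The `// TODO: Combine consecutive ranges` left in the poll loop above could plausibly be addressed by a helper along these lines (a std-only sketch with a hypothetical name, not part of this PR): sort the projected byte ranges, then merge any that touch or overlap, so adjacent column chunks collapse into a single `get_bytes` request.

```rust
use std::ops::Range;

/// Merge byte ranges that touch or overlap so adjacent column chunks
/// can be fetched with one request. Hypothetical helper; not in this PR.
fn coalesce_ranges(mut ranges: Vec<Range<usize>>) -> Vec<Range<usize>> {
    ranges.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<usize>> = Vec::new();
    for range in ranges {
        if let Some(last) = merged.last_mut() {
            // `start <= end` catches both overlapping and back-to-back ranges
            if range.start <= last.end {
                last.end = last.end.max(range.end);
                continue;
            }
        }
        merged.push(range);
    }
    merged
}
```

Fewer, larger requests matter most for object-store backends, where each `get_bytes` call is a network round trip.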
7 changes: 6 additions & 1 deletion parquet/src/util/memory.rs
@@ -31,7 +31,6 @@ use std::{
/// when all slices are dropped.
///
/// TODO: Remove and replace with [`bytes::Bytes`]
#[allow(clippy::rc_buffer)]
#[derive(Clone, Debug)]
pub struct ByteBufferPtr {
data: Bytes,
@@ -109,6 +108,12 @@ impl From<Vec<u8>> for ByteBufferPtr {
}
}

impl From<Bytes> for ByteBufferPtr {
    fn from(data: Bytes) -> Self {
        Self { data }
    }
}

#[cfg(test)]
mod tests {
use super::*;