diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 5cd091184bfa..8f8268f4a5cf 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -78,11 +78,13 @@
 use std::collections::VecDeque;
 use std::fmt::Formatter;
 use std::io::{Cursor, SeekFrom};
+use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use byteorder::{ByteOrder, LittleEndian};
+use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::stream::Stream;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
@@ -101,8 +103,65 @@ use crate::file::footer::parse_metadata_buffer;
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::SerializedPageReader;
 use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
+
+/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
+pub trait AsyncFileReader {
+    /// Retrieve the bytes in `range`
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+
+    /// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
+    /// allowing fine-grained control over how metadata is sourced, in particular allowing
+    /// for caching, pre-fetching, catalog metadata, etc...
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
+}
+
+impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        async move {
+            self.seek(SeekFrom::Start(range.start as u64)).await?;
+
+            let to_read = range.end - range.start;
+            let mut buffer = Vec::with_capacity(to_read);
+            let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
+            if read != to_read {
+                return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
+            }
+
+            Ok(buffer.into())
+        }
+        .boxed()
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+        async move {
+            self.seek(SeekFrom::End(-8)).await?;
+
+            let mut buf = [0_u8; 8];
+            self.read_exact(&mut buf).await?;
+
+            if buf[4..] != PARQUET_MAGIC {
+                return Err(general_err!("Invalid Parquet file. Corrupt footer"));
+            }
+
+            let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64;
+            if metadata_len < 0 {
+                return Err(general_err!(
+                    "Invalid Parquet file. Metadata length is less than zero ({})",
+                    metadata_len
+                ));
+            }
+
+            self.seek(SeekFrom::End(-8 - metadata_len)).await?;
+
+            let mut buf = Vec::with_capacity(metadata_len as usize + 8);
+            self.read_to_end(&mut buf).await?;
+
+            Ok(Arc::new(parse_metadata_buffer(&mut Cursor::new(buf))?))
+        }
+        .boxed()
+    }
+}
 
 /// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
 ///
@@ -124,10 +183,10 @@ pub struct ParquetRecordBatchStreamBuilder<T> {
     projection: ProjectionMask,
 }
 
-impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStreamBuilder<T> {
+impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
     /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
     pub async fn new(mut input: T) -> Result<Self> {
-        let metadata = Arc::new(read_footer(&mut input).await?);
+        let metadata = input.get_metadata().await?;
 
         let schema = Arc::new(parquet_to_arrow_schema(
             metadata.file_metadata().schema_descr(),
@@ -149,6 +208,11 @@ impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStreamBuilder<T> {
         &self.metadata
     }
 
+    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
+    pub fn parquet_schema(&self) -> &SchemaDescriptor {
+        self.metadata.file_metadata().schema_descr()
+    }
+
     /// Returns the arrow [`SchemaRef`] for this parquet file
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
@@ -264,8 +328,9 @@ impl<T> ParquetRecordBatchStream<T> {
     }
 }
 
-impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
-    for ParquetRecordBatchStream<T>
+impl<T> Stream for ParquetRecordBatchStream<T>
+where
+    T: AsyncFileReader + Unpin + Send + 'static,
 {
     type Item = Result<RecordBatch>;
 
@@ -309,6 +374,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
                     let mut column_chunks = vec![None; row_group_metadata.columns().len()];
 
+                    // TODO: Combine consecutive ranges
                     for (idx, chunk) in column_chunks.iter_mut().enumerate() {
                         if !projection.leaf_included(idx) {
                             continue;
                         }
@@ -316,18 +382,16 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
                         let column = row_group_metadata.column(idx);
                         let (start, length) = column.byte_range();
-                        let end = start + length;
-
-                        input.seek(SeekFrom::Start(start)).await?;
-                        let mut buffer = vec![0_u8; (end - start) as usize];
-                        input.read_exact(buffer.as_mut_slice()).await?;
+                        let data = input
+                            .get_bytes(start as usize..(start + length) as usize)
+                            .await?;
 
                         *chunk = Some(InMemoryColumnChunk {
                             num_values: column.num_values(),
                             compression: column.compression(),
                             physical_type: column.column_type(),
-                            data: ByteBufferPtr::new(buffer),
+                            data,
                         });
                     }
 
@@ -379,34 +443,6 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
     }
 }
 
-async fn read_footer<T: AsyncRead + AsyncSeek + Unpin>(
-    input: &mut T,
-) -> Result<ParquetMetaData> {
-    input.seek(SeekFrom::End(-8)).await?;
-
-    let mut buf = [0_u8; 8];
-    input.read_exact(&mut buf).await?;
-
-    if buf[4..] != PARQUET_MAGIC {
-        return Err(general_err!("Invalid Parquet file. Corrupt footer"));
-    }
-
-    let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64;
-    if metadata_len < 0 {
-        return Err(general_err!(
-            "Invalid Parquet file. Metadata length is less than zero ({})",
-            metadata_len
-        ));
-    }
-
-    input.seek(SeekFrom::End(-8 - metadata_len)).await?;
-
-    let mut buf = Vec::with_capacity(metadata_len as usize + 8);
-    input.read_to_end(&mut buf).await?;
-
-    parse_metadata_buffer(&mut Cursor::new(buf))
-}
-
 struct InMemoryRowGroup {
     schema: SchemaDescPtr,
     column_chunks: Vec<Option<InMemoryColumnChunk>>,
@@ -438,13 +474,13 @@ struct InMemoryColumnChunk {
     num_values: i64,
     compression: Compression,
     physical_type: crate::basic::Type,
-    data: ByteBufferPtr,
+    data: Bytes,
 }
 
 impl InMemoryColumnChunk {
     fn pages(&self) -> Result<Box<dyn PageReader>> {
         let page_reader = SerializedPageReader::new(
-            Cursor::new(self.data.clone()),
+            self.data.clone().reader(),
             self.num_values,
             self.compression,
             self.physical_type,
@@ -477,3 +513,82 @@ impl PageIterator for ColumnChunkIterator {
         Ok(self.column_schema.clone())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+    use arrow::error::Result as ArrowResult;
+    use futures::TryStreamExt;
+    use std::sync::Mutex;
+
+    struct TestReader {
+        data: Bytes,
+        metadata: Arc<ParquetMetaData>,
+        requests: Arc<Mutex<Vec<Range<usize>>>>,
+    }
+
+    impl AsyncFileReader for TestReader {
+        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+            self.requests.lock().unwrap().push(range.clone());
+            futures::future::ready(Ok(self.data.slice(range))).boxed()
+        }
+
+        fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+            futures::future::ready(Ok(self.metadata.clone())).boxed()
+        }
+    }
+
+    #[tokio::test]
+    async fn test_async_reader() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
+        let metadata = Arc::new(metadata);
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let requests = async_reader.requests.clone();
+        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
+            .await
+            .unwrap();
+
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
+        let stream = builder
+            .with_projection(mask.clone())
+            .with_batch_size(1024)
+            .build()
+            .unwrap();
+
+        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
+
+        let mut sync_reader = ParquetFileArrowReader::try_new(data).unwrap();
+        let sync_batches = sync_reader
+            .get_record_reader_by_columns(mask, 1024)
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+
+        assert_eq!(async_batches, sync_batches);
+
+        let requests = requests.lock().unwrap();
+        let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
+        let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();
+
+        assert_eq!(
+            &requests[..],
+            &[
+                offset_1 as usize..(offset_1 + length_1) as usize,
+                offset_2 as usize..(offset_2 + length_2) as usize
+            ]
+        );
+    }
+}
diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs
index 0b0c707ff34f..909878a6d538 100644
--- a/parquet/src/util/memory.rs
+++ b/parquet/src/util/memory.rs
@@ -31,7 +31,6 @@ use std::{
 /// when all slices are dropped.
 ///
 /// TODO: Remove and replace with [`bytes::Bytes`]
-#[allow(clippy::rc_buffer)]
 #[derive(Clone, Debug)]
 pub struct ByteBufferPtr {
     data: Bytes,
@@ -109,6 +108,12 @@ impl From<Vec<u8>> for ByteBufferPtr {
     }
 }
 
+impl From<Bytes> for ByteBufferPtr {
+    fn from(data: Bytes) -> Self {
+        Self { data }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
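For context, a minimal end-to-end sketch of driving the new API (not part of the diff; the file path and batch size are placeholders, and it assumes the crate's `async` feature plus tokio's `fs`, `macros`, and `rt-multi-thread` features). `tokio::fs::File` satisfies `AsyncRead + AsyncSeek + Unpin + Send`, so it picks up the blanket `AsyncFileReader` impl above; an object-store client could instead implement `AsyncFileReader` directly, serving `get_bytes` as ranged requests and `get_metadata` from a catalog or cache.

```rust
use futures::TryStreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Any AsyncRead + AsyncSeek + Unpin + Send input works via the blanket impl
    let file = tokio::fs::File::open("data.parquet").await?;

    // new() sources the footer through AsyncFileReader::get_metadata()
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    let stream = builder.with_batch_size(1024).build()?;

    // Column chunks are fetched through AsyncFileReader::get_bytes() as each
    // row group is decoded
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```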