
Commit

Merge 98515a0 into 9126bd7
tustvold authored Jun 7, 2022
2 parents 9126bd7 + 98515a0 commit c6d8230
Showing 2 changed files with 163 additions and 43 deletions.
199 changes: 157 additions & 42 deletions parquet/src/arrow/async_reader.rs
@@ -78,11 +78,13 @@
use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::{Cursor, SeekFrom};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
@@ -101,8 +103,65 @@ use crate::file::footer::parse_metadata_buffer;
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::SerializedPageReader;
use crate::file::PARQUET_MAGIC;
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
-use crate::util::memory::ByteBufferPtr;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};

/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
pub trait AsyncFileReader {
/// Retrieve the bytes in `range`
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

/// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
/// allowing fine-grained control over how metadata is sourced, for example
/// from a cache, a pre-fetch, or an external catalog
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
}
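
// Illustrative sketch, not part of this commit: the doc comment above mentions
// caching and catalog-sourced metadata. A hypothetical wrapper could answer
// `get_metadata` from an already-parsed `Arc<ParquetMetaData>` while delegating
// byte-range reads to an inner `AsyncFileReader`. All names below are made up
// for illustration.
struct MetadataCachingReader<R> {
    inner: R,
    metadata: Arc<ParquetMetaData>,
}

impl<R: AsyncFileReader> AsyncFileReader for MetadataCachingReader<R> {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        // Byte ranges still come from the underlying source
        self.inner.get_bytes(range)
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        // Answered from the cache without touching the source
        futures::future::ready(Ok(self.metadata.clone())).boxed()
    }
}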

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
async move {
self.seek(SeekFrom::Start(range.start as u64)).await?;

let to_read = range.end - range.start;
let mut buffer = Vec::with_capacity(to_read);
let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
if read != to_read {
return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
}

Ok(buffer.into())
}
.boxed()
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
async move {
self.seek(SeekFrom::End(-8)).await?;

let mut buf = [0_u8; 8];
self.read_exact(&mut buf).await?;

if buf[4..] != PARQUET_MAGIC {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}

let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64;
if metadata_len < 0 {
return Err(general_err!(
"Invalid Parquet file. Metadata length is less than zero ({})",
metadata_len
));
}

self.seek(SeekFrom::End(-8 - metadata_len)).await?;

let mut buf = Vec::with_capacity(metadata_len as usize + 8);
self.read_to_end(&mut buf).await?;

Ok(Arc::new(parse_metadata_buffer(&mut Cursor::new(buf))?))
}
.boxed()
}
}
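
// Illustrative usage sketch, not part of this commit: because of the blanket
// impl above, any `AsyncRead + AsyncSeek + Unpin + Send` input such as a
// `tokio::fs::File` (assuming tokio's "fs" feature is enabled) can drive the
// stream directly. The path is a placeholder and errors are unwrapped for brevity.
async fn example_read_local_file() -> Vec<RecordBatch> {
    use futures::TryStreamExt;

    let file = tokio::fs::File::open("/tmp/example.parquet").await.unwrap();
    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await
        .unwrap()
        .with_batch_size(1024)
        .build()
        .unwrap();
    stream.try_collect().await.unwrap()
}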

/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
///
Expand All @@ -124,10 +183,10 @@ pub struct ParquetRecordBatchStreamBuilder<T> {
projection: ProjectionMask,
}

-impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStreamBuilder<T> {
+impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
/// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
pub async fn new(mut input: T) -> Result<Self> {
-let metadata = Arc::new(read_footer(&mut input).await?);
+let metadata = input.get_metadata().await?;

let schema = Arc::new(parquet_to_arrow_schema(
metadata.file_metadata().schema_descr(),
@@ -149,6 +208,11 @@ impl<T: AsyncRead + AsyncSeek + Unpin> ParquetRecordBatchStreamBuilder<T> {
&self.metadata
}

/// Returns the parquet [`SchemaDescriptor`] for this parquet file
pub fn parquet_schema(&self) -> &SchemaDescriptor {
self.metadata.file_metadata().schema_descr()
}

/// Returns the arrow [`SchemaRef`] for this parquet file
pub fn schema(&self) -> &SchemaRef {
&self.schema
@@ -264,8 +328,9 @@ impl<T> ParquetRecordBatchStream<T> {
}
}

-impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
-for ParquetRecordBatchStream<T>
+impl<T> Stream for ParquetRecordBatchStream<T>
+where
+T: AsyncFileReader + Unpin + Send + 'static,
{
type Item = Result<RecordBatch>;

@@ -309,25 +374,24 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
let mut column_chunks =
vec![None; row_group_metadata.columns().len()];

// TODO: Combine consecutive ranges
for (idx, chunk) in column_chunks.iter_mut().enumerate() {
if !projection.leaf_included(idx) {
continue;
}

let column = row_group_metadata.column(idx);
let (start, length) = column.byte_range();
-let end = start + length;

-input.seek(SeekFrom::Start(start)).await?;

-let mut buffer = vec![0_u8; (end - start) as usize];
-input.read_exact(buffer.as_mut_slice()).await?;
+let data = input
+.get_bytes(start as usize..(start + length) as usize)
+.await?;

*chunk = Some(InMemoryColumnChunk {
num_values: column.num_values(),
compression: column.compression(),
physical_type: column.column_type(),
-data: ByteBufferPtr::new(buffer),
+data,
});
}

@@ -379,34 +443,6 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
}
}
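
// Illustrative sketch, not part of this commit: the column chunk loop above
// issues one `get_bytes` call per projected column and carries a TODO to
// combine consecutive ranges. One possible coalescing helper (hypothetical
// name) could merge sorted, overlapping or touching ranges before fetching:
fn coalesce_ranges(mut ranges: Vec<Range<usize>>) -> Vec<Range<usize>> {
    ranges.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<usize>> = Vec::with_capacity(ranges.len());
    for range in ranges {
        if let Some(last) = merged.last_mut() {
            // Extend the previous range when this one overlaps or touches it
            if range.start <= last.end {
                last.end = last.end.max(range.end);
                continue;
            }
        }
        merged.push(range);
    }
    merged
}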

-async fn read_footer<T: AsyncRead + AsyncSeek + Unpin>(
-input: &mut T,
-) -> Result<ParquetMetaData> {
-input.seek(SeekFrom::End(-8)).await?;

-let mut buf = [0_u8; 8];
-input.read_exact(&mut buf).await?;

-if buf[4..] != PARQUET_MAGIC {
-return Err(general_err!("Invalid Parquet file. Corrupt footer"));
-}

-let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64;
-if metadata_len < 0 {
-return Err(general_err!(
-"Invalid Parquet file. Metadata length is less than zero ({})",
-metadata_len
-));
-}

-input.seek(SeekFrom::End(-8 - metadata_len)).await?;

-let mut buf = Vec::with_capacity(metadata_len as usize + 8);
-input.read_to_end(&mut buf).await?;

-parse_metadata_buffer(&mut Cursor::new(buf))
-}

struct InMemoryRowGroup {
schema: SchemaDescPtr,
column_chunks: Vec<Option<InMemoryColumnChunk>>,
@@ -438,13 +474,13 @@ struct InMemoryColumnChunk {
num_values: i64,
compression: Compression,
physical_type: crate::basic::Type,
-data: ByteBufferPtr,
+data: Bytes,
}

impl InMemoryColumnChunk {
fn pages(&self) -> Result<Box<dyn PageReader>> {
let page_reader = SerializedPageReader::new(
-Cursor::new(self.data.clone()),
+self.data.clone().reader(),
self.num_values,
self.compression,
self.physical_type,
@@ -477,3 +513,82 @@ impl PageIterator for ColumnChunkIterator {
Ok(self.column_schema.clone())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::{ArrowReader, ParquetFileArrowReader};
use arrow::error::Result as ArrowResult;
use futures::TryStreamExt;
use std::sync::Mutex;

struct TestReader {
data: Bytes,
metadata: Arc<ParquetMetaData>,
requests: Arc<Mutex<Vec<Range<usize>>>>,
}

impl AsyncFileReader for TestReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
self.requests.lock().unwrap().push(range.clone());
futures::future::ready(Ok(self.data.slice(range))).boxed()
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
futures::future::ready(Ok(self.metadata.clone())).boxed()
}
}

#[tokio::test]
async fn test_async_reader() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_plain.parquet", testdata);
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = crate::file::footer::parse_metadata(&data).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);

let async_reader = TestReader {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
};

let requests = async_reader.requests.clone();
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
.unwrap();

let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let stream = builder
.with_projection(mask.clone())
.with_batch_size(1024)
.build()
.unwrap();

let async_batches: Vec<_> = stream.try_collect().await.unwrap();

let mut sync_reader = ParquetFileArrowReader::try_new(data).unwrap();
let sync_batches = sync_reader
.get_record_reader_by_columns(mask, 1024)
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();

assert_eq!(async_batches, sync_batches);

let requests = requests.lock().unwrap();
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range();
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range();

assert_eq!(
&requests[..],
&[
offset_1 as usize..(offset_1 + length_1) as usize,
offset_2 as usize..(offset_2 + length_2) as usize
]
);
}
}
7 changes: 6 additions & 1 deletion parquet/src/util/memory.rs
@@ -31,7 +31,6 @@ use std::{
/// when all slices are dropped.
///
/// TODO: Remove and replace with [`bytes::Bytes`]
-#[allow(clippy::rc_buffer)]
#[derive(Clone, Debug)]
pub struct ByteBufferPtr {
data: Bytes,
@@ -109,6 +108,12 @@ impl From<Vec<u8>> for ByteBufferPtr {
}
}

impl From<Bytes> for ByteBufferPtr {
fn from(data: Bytes) -> Self {
Self { data }
}
}
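
// Illustrative only, not part of this commit: the new conversion lets column
// chunk data fetched as `Bytes` by the async reader above be handed to existing
// `ByteBufferPtr`-based APIs without copying. Hypothetical helper:
fn bytes_to_buffer_ptr(data: Bytes) -> ByteBufferPtr {
    ByteBufferPtr::from(data)
}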

#[cfg(test)]
mod tests {
use super::*;
