Skip to content

Commit c881d34

Browse files
committed
[Parquet] Refactor InMemoryRowGroup to separate CPU and IO
1 parent 876585c commit c881d34

File tree

1 file changed

+68
-18
lines changed
  • parquet/src/arrow/async_reader

1 file changed

+68
-18
lines changed

parquet/src/arrow/async_reader/mod.rs

Lines changed: 68 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -886,6 +886,15 @@ struct InMemoryRowGroup<'a> {
886886
metadata: &'a ParquetMetaData,
887887
}
888888

889+
/// What ranges to fetch for the columns in this row group
890+
#[derive(Debug)]
891+
struct FetchRanges {
892+
/// The byte ranges to fetch
893+
ranges: Vec<Range<u64>>,
894+
/// If `Some`, the start offsets of each page for each column chunk
895+
page_start_offsets: Option<Vec<Vec<u64>>>,
896+
}
897+
889898
impl InMemoryRowGroup<'_> {
890899
/// Fetches any additional column data specified in `projection` that is not already
891900
/// present in `self.column_chunks`.
@@ -898,13 +907,32 @@ impl InMemoryRowGroup<'_> {
898907
projection: &ProjectionMask,
899908
selection: Option<&RowSelection>,
900909
) -> Result<()> {
910+
// Figure out what ranges to fetch
911+
let FetchRanges {
912+
ranges,
913+
page_start_offsets,
914+
} = self.fetch_ranges(projection, selection);
915+
// do the actual fetch
916+
let chunk_data = input.get_byte_ranges(ranges).await?.into_iter();
917+
// update our in memory buffers (self.column_chunks) with the fetched data
918+
self.fill_column_chunks(projection, page_start_offsets, chunk_data);
919+
Ok(())
920+
}
921+
922+
/// Returns the byte ranges to fetch for the columns specified in
923+
/// `projection` and `selection`.
924+
fn fetch_ranges(
925+
&self,
926+
projection: &ProjectionMask,
927+
selection: Option<&RowSelection>,
928+
) -> FetchRanges {
901929
let metadata = self.metadata.row_group(self.row_group_idx);
902930
if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
903931
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
904932
// `RowSelection`
905933
let mut page_start_offsets: Vec<Vec<u64>> = vec![];
906934

907-
let fetch_ranges = self
935+
let ranges = self
908936
.column_chunks
909937
.iter()
910938
.zip(metadata.columns())
@@ -930,8 +958,46 @@ impl InMemoryRowGroup<'_> {
930958
ranges
931959
})
932960
.collect();
961+
FetchRanges {
962+
ranges,
963+
page_start_offsets: Some(page_start_offsets),
964+
}
965+
} else {
966+
let ranges = self
967+
.column_chunks
968+
.iter()
969+
.enumerate()
970+
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
971+
.map(|(idx, _chunk)| {
972+
let column = metadata.column(idx);
973+
let (start, length) = column.byte_range();
974+
start..(start + length)
975+
})
976+
.collect();
977+
FetchRanges {
978+
ranges,
979+
page_start_offsets: None,
980+
}
981+
}
982+
}
933983

934-
let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
984+
/// Fills in `self.column_chunks` with the data fetched from `chunk_data`.
985+
///
986+
/// This function **must** be called with the data from the ranges returned by
987+
/// `fetch_ranges` and the corresponding `page_start_offsets`, and with the exact same `projection` that was passed to `fetch_ranges`.
988+
fn fill_column_chunks<I>(
989+
&mut self,
990+
projection: &ProjectionMask,
991+
page_start_offsets: Option<Vec<Vec<u64>>>,
992+
chunk_data: I,
993+
) where
994+
I: IntoIterator<Item = Bytes>,
995+
{
996+
let mut chunk_data = chunk_data.into_iter();
997+
let metadata = self.metadata.row_group(self.row_group_idx);
998+
if let Some(page_start_offsets) = page_start_offsets {
999+
// `page_start_offsets` is `Some` only when `fetch_ranges` used a `RowSelection` and an
1000+
// `OffsetIndex` to choose the ranges
9351001
let mut page_start_offsets = page_start_offsets.into_iter();
9361002

9371003
for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
@@ -956,20 +1022,6 @@ impl InMemoryRowGroup<'_> {
9561022
}
9571023
}
9581024
} else {
959-
let fetch_ranges = self
960-
.column_chunks
961-
.iter()
962-
.enumerate()
963-
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
964-
.map(|(idx, _chunk)| {
965-
let column = metadata.column(idx);
966-
let (start, length) = column.byte_range();
967-
start..(start + length)
968-
})
969-
.collect();
970-
971-
let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
972-
9731025
for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
9741026
if chunk.is_some() || !projection.leaf_included(idx) {
9751027
continue;
@@ -983,8 +1035,6 @@ impl InMemoryRowGroup<'_> {
9831035
}
9841036
}
9851037
}
986-
987-
Ok(())
9881038
}
9891039
}
9901040

0 commit comments

Comments
 (0)