Skip to content

Commit cf1f993

Browse files
committed
Implement ParquetDecoder push API
1 parent c881d34 commit cf1f993

File tree

12 files changed

+2514
-15
lines changed

12 files changed

+2514
-15
lines changed
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::errors::ParquetError;
19+
use crate::file::reader::{ChunkReader, Length};
20+
use bytes::Bytes;
21+
use std::fmt::Display;
22+
use std::ops::Range;
23+
24+
/// Holds multiple buffers of data that have been requested by the ParquetDecoder
///
/// This is the in-memory buffer for the ParquetDecoder
///
/// Features it has:
/// 1. Zero copy as much as possible
/// 2. Keeps non contiguous ranges of bytes
#[derive(Debug, Clone)]
pub(crate) struct Buffers {
    /// the virtual "offset" of this buffers (added to any request)
    offset: u64,
    /// The total length of the file being decoded
    file_len: u64,
    /// The ranges of data that are available for decoding (not adjusted for offset)
    ///
    /// Invariant: `ranges[i]` describes the file span stored in `buffers[i]`,
    /// so the two vectors always have the same length (enforced by `push_range`).
    ranges: Vec<Range<u64>>,
    /// The buffers of data that can be used to decode the Parquet file
    buffers: Vec<Bytes>,
}
42+
43+
impl Display for Buffers {
    /// Renders a human-readable summary: the header line followed by one
    /// line per buffered range, showing both raw and offset-adjusted spans.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(
            f,
            "Buffers (offset: {}, file_len: {})",
            self.offset, self.file_len
        )?;
        writeln!(f, "Available Ranges (w/ offset):")?;
        self.ranges.iter().try_for_each(|range| {
            let byte_count = range.end - range.start;
            writeln!(
                f,
                " {}..{} ({}..{}): {} bytes",
                range.start,
                range.end,
                self.offset + range.start,
                self.offset + range.end,
                byte_count
            )
        })
    }
}
66+
67+
impl Buffers {
68+
/// Create a new Buffers instance with the given file length
69+
pub fn new(file_len: u64) -> Self {
70+
Self {
71+
offset: 0,
72+
file_len,
73+
ranges: Vec::new(),
74+
buffers: Vec::new(),
75+
}
76+
}
77+
78+
/// Push all the ranges and buffers
79+
pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
80+
assert_eq!(
81+
ranges.len(),
82+
buffers.len(),
83+
"Number of ranges must match number of buffers"
84+
);
85+
for (range, buffer) in ranges.into_iter().zip(buffers.into_iter()) {
86+
self.push_range(range, buffer);
87+
}
88+
}
89+
90+
/// Push a new range and its associated buffer
91+
pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) {
92+
assert_eq!(
93+
(range.end - range.start) as usize,
94+
buffer.len(),
95+
"Range length must match buffer length"
96+
);
97+
self.ranges.push(range);
98+
self.buffers.push(buffer);
99+
}
100+
101+
/// Returns true if the Buffers contains data for the given range
102+
pub fn has_range(&self, range: &Range<u64>) -> bool {
103+
self.ranges
104+
.iter()
105+
.any(|r| r.start <= range.start && r.end >= range.end)
106+
}
107+
108+
fn iter(&self) -> impl Iterator<Item = (&Range<u64>, &Bytes)> {
109+
self.ranges.iter().zip(self.buffers.iter())
110+
}
111+
112+
/// return the file length of the Parquet file being read
113+
pub fn file_len(&self) -> u64 {
114+
self.file_len
115+
}
116+
117+
/// Specify a new offset
118+
pub fn with_offset(mut self, offset: u64) -> Self {
119+
self.offset = offset;
120+
self
121+
}
122+
123+
/// Return the total of all buffered ranges
124+
pub fn buffered_bytes(&self) -> u64 {
125+
self.ranges.iter().map(|r| r.end - r.start).sum()
126+
}
127+
128+
/// Clear any range and corresponding buffer that is exactly in the ranges_to_clear
129+
pub fn clear_ranges(&mut self, ranges_to_clear: &[Range<u64>]) {
130+
let mut new_ranges = Vec::new();
131+
let mut new_buffers = Vec::new();
132+
133+
for (range, buffer) in self.iter() {
134+
if !ranges_to_clear
135+
.iter()
136+
.any(|r| r.start == range.start && r.end == range.end)
137+
{
138+
new_ranges.push(range.clone());
139+
new_buffers.push(buffer.clone());
140+
}
141+
}
142+
self.ranges = new_ranges;
143+
self.buffers = new_buffers;
144+
}
145+
}
146+
147+
impl Length for Buffers {
    /// Reports the length of the underlying Parquet file (not the amount of
    /// data currently buffered).
    fn len(&self) -> u64 {
        // Delegate to the inherent accessor so there is a single source of truth
        self.file_len()
    }
}
152+
153+
/// less efficinet implementation of Read for Buffers
154+
impl std::io::Read for Buffers {
155+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
156+
// Find the range that contains the start offset
157+
let mut found = false;
158+
for (range, data) in self.iter() {
159+
if range.start <= self.offset && range.end >= self.offset + buf.len() as u64 {
160+
// Found the range, figure out the starting offset in the buffer
161+
let start_offset = (self.offset - range.start) as usize;
162+
let end_offset = start_offset + buf.len();
163+
let slice = data.slice(start_offset..end_offset);
164+
buf.copy_from_slice(slice.as_ref());
165+
found = true;
166+
}
167+
}
168+
if found {
169+
// If we found the range, we can return the number of bytes read
170+
// advance our offset
171+
self.offset += buf.len() as u64;
172+
Ok(buf.len())
173+
} else {
174+
Err(std::io::Error::new(
175+
std::io::ErrorKind::UnexpectedEof,
176+
"No data available in Buffers",
177+
))
178+
}
179+
}
180+
}
181+
182+
impl ChunkReader for Buffers {
183+
type T = Self;
184+
185+
fn get_read(&self, start: u64) -> Result<Self::T, ParquetError> {
186+
Ok(self.clone().with_offset(self.offset + start))
187+
}
188+
189+
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes, ParquetError> {
190+
if start > self.file_len {
191+
return Err(ParquetError::General(format!(
192+
"Requested start {start} is beyond the end of the file (file length: {})",
193+
self.file_len
194+
)));
195+
}
196+
197+
// find the range that contains the start offset
198+
for (range, data) in self.iter() {
199+
if range.start <= start && range.end >= start + length as u64 {
200+
// Found the range, figure out the starting offset in the buffer
201+
let start_offset = (start - range.start) as usize;
202+
return Ok(data.slice(start_offset..start_offset + length));
203+
}
204+
}
205+
// Signal that we need more data
206+
let requested_end = start + length as u64;
207+
Err(ParquetError::NeedMoreDataRange(start..requested_end))
208+
}
209+
}

0 commit comments

Comments
 (0)