Skip to content

feat: drop use of MergeReverseRecordReader #1213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 45 additions & 256 deletions src/parseable/staging/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,25 @@

use std::{
fs::{remove_file, File},
io::{self, BufReader, Read, Seek, SeekFrom},
io::BufReader,
path::PathBuf,
sync::Arc,
vec::IntoIter,
};

use arrow_array::{RecordBatch, TimestampMillisecondArray};
use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
use arrow_array::RecordBatch;
use arrow_ipc::reader::StreamReader;
use arrow_schema::Schema;
use byteorder::{LittleEndian, ReadBytesExt};
use itertools::kmerge_by;
use tracing::error;

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
utils::arrow::{adapt_batch, reverse},
};
use crate::utils::arrow::{adapt_batch, reverse};

/// Reads record batches from a set of staged Arrow IPC stream files,
/// presenting them as one logical reader.
#[derive(Debug)]
pub struct MergedRecordReader {
    // One IPC stream reader per staged arrow file, in input order.
    // Files that failed validation during construction are absent.
    pub readers: Vec<StreamReader<BufReader<File>>>,
}

impl MergedRecordReader {
pub fn try_new(files: &[PathBuf]) -> Result<Self, ()> {
pub fn try_new(files: &[PathBuf]) -> Self {
let mut readers = Vec::with_capacity(files.len());

for file in files {
Expand All @@ -63,55 +57,9 @@ impl MergedRecordReader {
}
}

Ok(Self { readers })
}

/// Produces a single schema that unifies the schemas of every
/// underlying file reader.
pub fn merged_schema(&self) -> Schema {
    let file_schemas = self
        .readers
        .iter()
        .map(|reader| reader.schema().as_ref().clone());
    // Staged files for one stream share compatible schemas, so the
    // merge is expected to succeed.
    Schema::try_merge(file_schemas).unwrap()
}
}

/// Reads record batches from a set of staged Arrow IPC stream files
/// in reverse message order (via `OffsetReader`-backed readers).
#[derive(Debug)]
pub struct MergedReverseRecordReader {
    // One reversed IPC stream reader per valid staged file; invalid
    // files are dropped at construction time.
    pub readers: Vec<StreamReader<BufReader<OffsetReader<File>>>>,
}

impl MergedReverseRecordReader {
/// Builds a reader set from the given staged file paths.
///
/// Files that cannot be opened or that fail IPC validation are logged
/// and skipped rather than aborting the whole read — staging may hold
/// partially written or corrupt files.
pub fn try_new(files: &[PathBuf]) -> Self {
    let mut readers = Vec::with_capacity(files.len());
    for file in files {
        // Fix: the original `File::open(file).unwrap()` panicked on an
        // unopenable file (deleted/permission-denied), defeating the
        // skip-invalid-files intent. Treat open failure like parse failure.
        let Ok(handle) = File::open(file) else {
            error!("Invalid file detected, ignoring it: {:?}", file);
            continue;
        };
        let Ok(reader) = get_reverse_reader(handle) else {
            error!("Invalid file detected, ignoring it: {:?}", file);
            continue;
        };

        readers.push(reader);
    }

    Self { readers }
}

/// Consumes the reader set and yields all batches merged newest-first,
/// each batch row-reversed and adapted to `schema`.
///
/// `time_partition` optionally names the column used to order batches;
/// when absent (or missing from a batch) the default timestamp is used.
pub fn merged_iter(
    self,
    schema: Arc<Schema>,
    time_partition: Option<String>,
) -> impl Iterator<Item = RecordBatch> {
    let batch_streams = self.readers.into_iter().map(|reader| reader.flatten());
    // k-way merge of the per-file streams; the comparator keeps the
    // batch with the larger timestamp first. `time_partition` is moved
    // into the closure and cloned per comparison because the helper
    // takes it by value.
    let merged = kmerge_by(batch_streams, move |left: &RecordBatch, right: &RecordBatch| {
        get_timestamp_millis(left, time_partition.clone())
            > get_timestamp_millis(right, time_partition.clone())
    });
    merged
        .map(|batch| reverse(&batch))
        .map(move |batch| adapt_batch(&schema, &batch))
}

pub fn merged_schema(&self) -> Schema {
Schema::try_merge(
self.readers
Expand All @@ -120,210 +68,33 @@ impl MergedReverseRecordReader {
)
.unwrap()
}
}

/// Extracts the ordering timestamp (epoch milliseconds) from the first
/// row of a batch.
///
/// When `time_partition` names a column present in the batch, that
/// column supplies the value; otherwise the default timestamp lookup
/// is used.
fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option<String>) -> i64 {
    let partition_column = time_partition
        .as_deref()
        .and_then(|name| batch.column_by_name(name));
    match partition_column {
        Some(column) => column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .value(0),
        // No partition configured, or the named column is absent.
        None => get_default_timestamp_millis(batch),
    }
}
/// Reads the default timestamp (epoch milliseconds) from the first row
/// of a batch.
///
/// The writer emits the timestamp as the leading column, so column 0 is
/// tried first; if it is not a millisecond timestamp, the column named
/// `DEFAULT_TIMESTAMP_KEY` is located by name instead.
fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 {
    if let Some(leading) = batch
        .column(0)
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
    {
        // Fast path: first column is the timestamp, as written.
        return leading.value(0);
    }
    // Fallback: search all columns for the default timestamp key.
    batch
        .column_by_name(DEFAULT_TIMESTAMP_KEY)
        .unwrap()
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap()
        .value(0)
}

/// OffsetReader takes in a reader and list of offset and sizes and
/// provides a reader over the file by reading only the offsets
/// from start of the list to end.
///
/// Safety Invariant: Reader is already validated and all offset and limit are valid to read.
///
/// On empty list the reader returns no bytes read.
pub struct OffsetReader<R: Read + Seek> {
    // Underlying seekable source the sections are read from.
    reader: R,
    // Remaining (offset, size) sections still to be served, in order.
    offset_list: IntoIter<(u64, usize)>,
    // File offset of the section currently being served.
    current_offset: u64,
    // Byte length of the section currently being served.
    current_size: usize,
    // Scratch buffer holding the bytes of the current section.
    buffer: Vec<u8>,
    // Read position within `buffer`; 0 means the section is not yet loaded.
    buffer_position: usize,
    // True once every section has been fully consumed.
    finished: bool,
}

impl<R: Read + Seek> OffsetReader<R> {
    /// Builds an `OffsetReader` over the given `(offset, size)` sections.
    ///
    /// An empty `offset_list` (or a degenerate leading `(0, 0)` entry)
    /// yields a reader that is immediately exhausted.
    fn new(reader: R, offset_list: Vec<(u64, usize)>) -> Self {
        let mut sections = offset_list.into_iter();
        let (current_offset, current_size) = sections.next().unwrap_or_default();
        // `(0, 0)` is what `unwrap_or_default` produces for an empty
        // list — there is nothing to read in that case.
        let finished = current_offset == 0 && current_size == 0;

        OffsetReader {
            reader,
            offset_list: sections,
            current_offset,
            current_size,
            buffer: vec![0; 4096],
            buffer_position: 0,
            finished,
        }
    }
}

impl<R: Read + Seek> Read for OffsetReader<R> {
    /// Serves the configured sections one after another: each section is
    /// loaded into the internal buffer on first touch, then copied out to
    /// `buf` across as many calls as needed. Returns `Ok(0)` once all
    /// sections are consumed.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let offset = self.current_offset;
        let size = self.current_size;

        if self.finished {
            return Ok(0);
        }
        // on empty buffer load current data represented by
        // current_offset and current_size into self buffer
        if self.buffer_position == 0 {
            self.reader.seek(SeekFrom::Start(offset))?;
            // resize for current message
            if self.buffer.len() < size {
                self.buffer.resize(size, 0)
            }
            self.reader.read_exact(&mut self.buffer[0..size])?;
        }

        // Serve at most what remains of this section and what fits in `buf`.
        let remaining_bytes = size - self.buffer_position;
        let max_read = usize::min(remaining_bytes, buf.len());

        // Copy data from the buffer to the provided buffer
        let read_data = &self.buffer[self.buffer_position..self.buffer_position + max_read];
        buf[..max_read].copy_from_slice(read_data);

        self.buffer_position += max_read;

        if self.buffer_position >= size {
            // If we've read the entire section, move to the next offset
            match self.offset_list.next() {
                Some((offset, size)) => {
                    self.current_offset = offset;
                    self.current_size = size;
                    // Position 0 makes the next `read` call load the new
                    // section into the buffer.
                    self.buffer_position = 0;
                }
                None => {
                    // iter is exhausted, no more read can be done
                    self.finished = true
                }
            }
        }

        Ok(max_read)
    }
}

/// Scans an Arrow IPC stream and builds a `StreamReader` that replays
/// its messages in reverse order, keeping the leading schema message
/// first so the stream remains valid.
///
/// # Errors
/// Returns any I/O error hit while scanning message headers, except a
/// clean end-of-stream (`UnexpectedEof`), which terminates the scan.
pub fn get_reverse_reader<T: Read + Seek>(
    mut reader: T,
) -> Result<StreamReader<BufReader<OffsetReader<T>>>, io::Error> {
    let mut offset = 0;
    let mut messages = Vec::new();

    while let Some(res) = find_limit_and_type(&mut reader).transpose() {
        match res {
            Ok((header, size)) => {
                messages.push((header, offset, size));
                offset += size;
            }
            // Clean EOF: the whole stream has been scanned.
            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(err) => return Err(err),
        }
    }

    // reverse everything leaving the first because it has schema message.
    // Fix: guard the slice — `messages[1..]` panics when `messages` is
    // empty (e.g. a zero-length file hits EOF on the very first header).
    if !messages.is_empty() {
        messages[1..].reverse();
    }
    let messages = messages
        .into_iter()
        .map(|(_, offset, size)| (offset as u64, size))
        .collect();

    // reset reader so OffsetReader seeks from the start of the file
    reader.rewind()?;

    Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap())
}

// return limit for
fn find_limit_and_type(
reader: &mut (impl Read + Seek),
) -> Result<Option<(MessageHeader, usize)>, io::Error> {
let mut size = 0;
let marker = reader.read_u32::<LittleEndian>()?;
size += 4;

if marker != 0xFFFFFFFF {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid Continuation Marker",
));
}

let metadata_size = reader.read_u32::<LittleEndian>()? as usize;
size += 4;

if metadata_size == 0x00000000 {
return Ok(None);
/// Consumes the reader set and yields every decodable batch from every
/// file in order, each batch row-reversed and adapted to `schema`.
pub fn merged_iter(self, schema: Arc<Schema>) -> impl Iterator<Item = RecordBatch> {
    self.readers
        .into_iter()
        // First flatten: iterate each reader's `Result<RecordBatch, _>` items.
        .flatten()
        // Second flatten: drop errored batches (Result as IntoIterator),
        // matching the per-reader `flatten` this replaces.
        .flatten()
        .map(move |batch| adapt_batch(&schema, &reverse(&batch)))
}

let mut message = vec![0u8; metadata_size];
reader.read_exact(&mut message)?;
size += metadata_size;

let message = unsafe { root_as_message_unchecked(&message) };
let header = message.header_type();
let message_size = message.bodyLength();
size += message_size as usize;

let padding = (8 - (size % 8)) % 8;
reader.seek(SeekFrom::Current(padding as i64 + message_size))?;
size += padding;

Ok(Some((header, size)))
}

#[cfg(test)]
mod tests {
use std::{io::Cursor, sync::Arc};
use std::{
io::{BufReader, Cursor, Read, Seek},
sync::Arc,
};

use arrow_array::{
cast::AsArray, types::Int64Type, Array, Float64Array, Int64Array, RecordBatch, StringArray,
};
use arrow_ipc::writer::{
write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
use arrow_ipc::{
reader::StreamReader,
writer::{
write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
},
};

use super::get_reverse_reader;

fn rb(rows: usize) -> RecordBatch {
let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..(rows as i64)));
let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..rows).map(|x| x as f64)));
Expand All @@ -350,12 +121,16 @@ mod tests {
writer.into_inner().unwrap()
}

/// Test helper: wraps a seekable source in a buffered Arrow IPC stream reader.
fn get_reverse_reader<T: Read + Seek>(reader: T) -> StreamReader<BufReader<T>> {
    let buffered = BufReader::new(reader);
    StreamReader::try_new(buffered, None).unwrap()
}

#[test]
fn test_empty_row() {
let rb = rb(0);
let buf = write_mem(&[rb]);
let reader = Cursor::new(buf);
let mut reader = get_reverse_reader(reader).unwrap();
let mut reader = get_reverse_reader(reader);
let rb = reader.next().unwrap().unwrap();
assert_eq!(rb.num_rows(), 0);
}
Expand All @@ -365,7 +140,7 @@ mod tests {
let rb = rb(1);
let buf = write_mem(&[rb]);
let reader = Cursor::new(buf);
let mut reader = get_reverse_reader(reader).unwrap();
let mut reader = get_reverse_reader(reader);
let rb = reader.next().unwrap().unwrap();
assert_eq!(rb.num_rows(), 1);
}
Expand All @@ -374,22 +149,36 @@ mod tests {
fn test_multiple_row_multiple_rbs() {
let buf = write_mem(&[rb(1), rb(2), rb(3)]);
let reader = Cursor::new(buf);
let mut reader = get_reverse_reader(reader).unwrap();
let mut reader = get_reverse_reader(reader);
let rb = reader.next().unwrap().unwrap();
assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_rows(), 1);
let col1_val: Vec<i64> = rb
.column(0)
.as_primitive::<Int64Type>()
.iter()
.flatten()
.collect();
assert_eq!(col1_val, vec![0, 1, 2]);
assert_eq!(col1_val, vec![0]);

let rb = reader.next().unwrap().unwrap();
assert_eq!(rb.num_rows(), 2);
let col1_val: Vec<i64> = rb
.column(0)
.as_primitive::<Int64Type>()
.iter()
.flatten()
.collect();
assert_eq!(col1_val, vec![0, 1]);

let rb = reader.next().unwrap().unwrap();
assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_rows(), 3);
let col1_val: Vec<i64> = rb
.column(0)
.as_primitive::<Int64Type>()
.iter()
.flatten()
.collect();
assert_eq!(col1_val, vec![0, 1, 2]);
}

#[test]
Expand Down Expand Up @@ -424,7 +213,7 @@ mod tests {
write_message(&mut buf, schema, &options).unwrap();

let buf = Cursor::new(buf);
let reader = get_reverse_reader(buf).unwrap().flatten();
let reader = get_reverse_reader(buf).flatten();

let mut sum = 0;
for rb in reader {
Expand Down
Loading
Loading