From 1ca7b32ea68e6473b0d225f73a3e536eafa91c10 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Wed, 26 Feb 2025 01:08:42 +0530
Subject: [PATCH 1/3] feat: drop use of `MergedReverseRecordReader`

---
 src/parseable/staging/reader.rs | 379 +-------------------------------
 src/parseable/streams.rs        |  12 +-
 2 files changed, 14 insertions(+), 377 deletions(-)

diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs
index 6df0dc324..93189f6af 100644
--- a/src/parseable/staging/reader.rs
+++ b/src/parseable/staging/reader.rs
@@ -19,23 +19,17 @@
 use std::{
     fs::{remove_file, File},
-    io::{self, BufReader, Read, Seek, SeekFrom},
+    io::BufReader,
     path::PathBuf,
     sync::Arc,
-    vec::IntoIter,
 };
 
-use arrow_array::{RecordBatch, TimestampMillisecondArray};
-use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
+use arrow_array::RecordBatch;
+use arrow_ipc::reader::StreamReader;
 use arrow_schema::Schema;
-use byteorder::{LittleEndian, ReadBytesExt};
-use itertools::kmerge_by;
 use tracing::error;
 
-use crate::{
-    event::DEFAULT_TIMESTAMP_KEY,
-    utils::arrow::{adapt_batch, reverse},
-};
+use crate::utils::arrow::adapt_batch;
 
 #[derive(Debug)]
 pub struct MergedRecordReader {
@@ -43,7 +37,7 @@ pub struct MergedRecordReader {
 }
 
 impl MergedRecordReader {
-    pub fn try_new(files: &[PathBuf]) -> Result<Self, ()> {
+    pub fn try_new(files: &[PathBuf]) -> Self {
         let mut readers = Vec::with_capacity(files.len());
 
         for file in files {
@@ -63,55 +57,9 @@ impl MergedRecordReader {
             }
         }
 
-        Ok(Self { readers })
-    }
-
-    pub fn merged_schema(&self) -> Schema {
-        Schema::try_merge(
-            self.readers
-                .iter()
-                .map(|reader| reader.schema().as_ref().clone()),
-        )
-        .unwrap()
-    }
-}
-
-#[derive(Debug)]
-pub struct MergedReverseRecordReader {
-    pub readers: Vec<StreamReader<BufReader<OffsetReader<File>>>>,
-}
-
-impl MergedReverseRecordReader {
-    pub fn try_new(files: &[PathBuf]) -> Self {
-        let mut readers = Vec::with_capacity(files.len());
-        for file in files {
-            let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else {
-                error!("Invalid file detected, ignoring it: {:?}", file);
-                continue;
-            };
-
-            readers.push(reader);
-        }
-
+        Self { readers }
     }
 
-    pub fn merged_iter(
-        self,
-        schema: Arc<Schema>,
-        time_partition: Option<String>,
-    ) -> impl Iterator<Item = RecordBatch> {
-        let adapted_readers = self.readers.into_iter().map(|reader| reader.flatten());
-        kmerge_by(adapted_readers, move |a: &RecordBatch, b: &RecordBatch| {
-            // Capture time_partition by value
-            let a_time = get_timestamp_millis(a, time_partition.clone());
-            let b_time = get_timestamp_millis(b, time_partition.clone());
-            a_time > b_time
-        })
-        .map(|batch| reverse(&batch))
-        .map(move |batch| adapt_batch(&schema, &batch))
-    }
-
     pub fn merged_schema(&self) -> Schema {
         Schema::try_merge(
             self.readers
@@ -120,318 +68,11 @@ impl MergedReverseRecordReader {
         )
         .unwrap()
     }
-}
-
-fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option<String>) -> i64 {
-    match time_partition {
-        Some(time_partition) => {
-            let time_partition = time_partition.as_str();
-            match batch.column_by_name(time_partition) {
-                Some(column) => column
-                    .as_any()
-                    .downcast_ref::<TimestampMillisecondArray>()
-                    .unwrap()
-                    .value(0),
-                None => get_default_timestamp_millis(batch),
-            }
-        }
-        None => get_default_timestamp_millis(batch),
-    }
-}
-fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 {
-    match batch
-        .column(0)
-        .as_any()
-        .downcast_ref::<TimestampMillisecondArray>()
-    {
-        // Ideally we expect the first column to be a timestamp (because we add the timestamp column first in the writer)
-        Some(array) => array.value(0),
-        // In case the first column is not a timestamp, we fallback to look for default timestamp column across all columns
-        None => batch
-            .column_by_name(DEFAULT_TIMESTAMP_KEY)
-            .unwrap()
-            .as_any()
-            .downcast_ref::<TimestampMillisecondArray>()
-            .unwrap()
-            .value(0),
-    }
-}
-
-/// OffsetReader takes in a reader and list of offset and sizes and
-/// provides a reader over the file by reading only the offsets
-/// from start of the list to end.
-///
-/// Safety Invariant: Reader is already validated and all offset and limit are valid to read.
-///
-/// On empty list the reader returns no bytes read.
-pub struct OffsetReader<R: Read + Seek> {
-    reader: R,
-    offset_list: IntoIter<(u64, usize)>,
-    current_offset: u64,
-    current_size: usize,
-    buffer: Vec<u8>,
-    buffer_position: usize,
-    finished: bool,
-}
-
-impl<R: Read + Seek> OffsetReader<R> {
-    fn new(reader: R, offset_list: Vec<(u64, usize)>) -> Self {
-        let mut offset_list = offset_list.into_iter();
-        let mut finished = false;
-
-        let (current_offset, current_size) = offset_list.next().unwrap_or_default();
-        if current_offset == 0 && current_size == 0 {
-            finished = true
-        }
-
-        OffsetReader {
-            reader,
-            offset_list,
-            current_offset,
-            current_size,
-            buffer: vec![0; 4096],
-            buffer_position: 0,
-            finished,
-        }
-    }
-}
-
-impl<R: Read + Seek> Read for OffsetReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let offset = self.current_offset;
-        let size = self.current_size;
-
-        if self.finished {
-            return Ok(0);
-        }
-        // on empty buffer load current data represented by
-        // current_offset and current_size into self buffer
-        if self.buffer_position == 0 {
-            self.reader.seek(SeekFrom::Start(offset))?;
-            // resize for current message
-            if self.buffer.len() < size {
-                self.buffer.resize(size, 0)
-            }
-            self.reader.read_exact(&mut self.buffer[0..size])?;
-        }
-
-        let remaining_bytes = size - self.buffer_position;
-        let max_read = usize::min(remaining_bytes, buf.len());
-
-        // Copy data from the buffer to the provided buffer
-        let read_data = &self.buffer[self.buffer_position..self.buffer_position + max_read];
-        buf[..max_read].copy_from_slice(read_data);
-
-        self.buffer_position += max_read;
-
-        if self.buffer_position >= size {
-            // If we've read the entire section, move to the next offset
-            match self.offset_list.next() {
-                Some((offset, size)) => {
-                    self.current_offset = offset;
-                    self.current_size = size;
-                    self.buffer_position = 0;
-                }
-                None => {
-                    // iter is exhausted, no more read can be done
-                    self.finished = true
-                }
-            }
-        }
-
-        Ok(max_read)
-    }
-}
-
-pub fn get_reverse_reader<T: Read + Seek>(
-    mut reader: T,
-) -> Result<StreamReader<BufReader<OffsetReader<T>>>, io::Error> {
-    let mut offset = 0;
-    let mut messages = Vec::new();
-
-    while let Some(res) = find_limit_and_type(&mut reader).transpose() {
-        match res {
-            Ok((header, size)) => {
-                messages.push((header, offset, size));
-                offset += size;
-            }
-            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
-            Err(err) => return Err(err),
-        }
-    }
-
-    // reverse everything leaving the first because it has schema message.
-    messages[1..].reverse();
-    let messages = messages
-        .into_iter()
-        .map(|(_, offset, size)| (offset as u64, size))
-        .collect();
-
-    // reset reader
-    reader.rewind()?;
-
-    Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap())
-}
-
-// return limit for
-fn find_limit_and_type(
-    reader: &mut (impl Read + Seek),
-) -> Result<Option<(MessageHeader, usize)>, io::Error> {
-    let mut size = 0;
-    let marker = reader.read_u32::<LittleEndian>()?;
-    size += 4;
-
-    if marker != 0xFFFFFFFF {
-        return Err(io::Error::new(
-            io::ErrorKind::InvalidData,
-            "Invalid Continuation Marker",
-        ));
-    }
-
-    let metadata_size = reader.read_u32::<LittleEndian>()? as usize;
-    size += 4;
-
-    if metadata_size == 0x00000000 {
-        return Ok(None);
-    }
-
-    let mut message = vec![0u8; metadata_size];
-    reader.read_exact(&mut message)?;
-    size += metadata_size;
-
-    let message = unsafe { root_as_message_unchecked(&message) };
-    let header = message.header_type();
-    let message_size = message.bodyLength();
-    size += message_size as usize;
-
-    let padding = (8 - (size % 8)) % 8;
-    reader.seek(SeekFrom::Current(padding as i64 + message_size))?;
-    size += padding;
-
-    Ok(Some((header, size)))
-}
-
-#[cfg(test)]
-mod tests {
-    use std::{io::Cursor, sync::Arc};
-
-    use arrow_array::{
-        cast::AsArray, types::Int64Type, Array, Float64Array, Int64Array, RecordBatch, StringArray,
-    };
-    use arrow_ipc::writer::{
-        write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
-    };
-
-    use super::get_reverse_reader;
-
-    fn rb(rows: usize) -> RecordBatch {
-        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..(rows as i64)));
-        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..rows).map(|x| x as f64)));
-        let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter(
-            (0..rows).map(|x| Some(format!("str {}", x))),
-        ));
-
-        RecordBatch::try_from_iter_with_nullable([
-            ("a", array1, true),
-            ("b", array2, true),
-            ("c", array3, true),
-        ])
-        .unwrap()
-    }
-
-    fn write_mem(rbs: &[RecordBatch]) -> Vec<u8> {
-        let buf = Vec::new();
-        let mut writer = StreamWriter::try_new(buf, &rbs[0].schema()).unwrap();
-
-        for rb in rbs {
-            writer.write(rb).unwrap()
-        }
-
-        writer.into_inner().unwrap()
-    }
-
-    #[test]
-    fn test_empty_row() {
-        let rb = rb(0);
-        let buf = write_mem(&[rb]);
-        let reader = Cursor::new(buf);
-        let mut reader = get_reverse_reader(reader).unwrap();
-        let rb = reader.next().unwrap().unwrap();
-        assert_eq!(rb.num_rows(), 0);
-    }
-
-    #[test]
-    fn test_one_row() {
-        let rb = rb(1);
-        let buf = write_mem(&[rb]);
-        let reader = Cursor::new(buf);
-        let mut reader = get_reverse_reader(reader).unwrap();
-        let rb = reader.next().unwrap().unwrap();
-        assert_eq!(rb.num_rows(), 1);
-    }
-
-    #[test]
-    fn test_multiple_row_multiple_rbs() {
-        let buf = write_mem(&[rb(1), rb(2), rb(3)]);
-        let reader = Cursor::new(buf);
-        let mut reader = get_reverse_reader(reader).unwrap();
-        let rb = reader.next().unwrap().unwrap();
-        assert_eq!(rb.num_rows(), 3);
-        let col1_val: Vec<i64> = rb
-            .column(0)
-            .as_primitive::<Int64Type>()
-            .iter()
-            .flatten()
-            .collect();
-        assert_eq!(col1_val, vec![0, 1, 2]);
-
-        let rb = reader.next().unwrap().unwrap();
-        assert_eq!(rb.num_rows(), 2);
-
-        let rb = reader.next().unwrap().unwrap();
-        assert_eq!(rb.num_rows(), 1);
-    }
-
-    #[test]
-    fn manual_write() {
-        let error_on_replacement = true;
-        let options = IpcWriteOptions::default();
-        let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
-        let data_gen = IpcDataGenerator {};
-
-        let mut buf = Vec::new();
-        let rb1 = rb(1);
-
-        let schema = data_gen.schema_to_bytes_with_dictionary_tracker(
-            &rb1.schema(),
-            &mut dictionary_tracker,
-            &options,
-        );
-        write_message(&mut buf, schema, &options).unwrap();
-
-        for i in (1..=3).cycle().skip(1).take(10000) {
-            let (_, encoded_message) = data_gen
-                .encoded_batch(&rb(i), &mut dictionary_tracker, &options)
-                .unwrap();
-            write_message(&mut buf, encoded_message, &options).unwrap();
-        }
-
-        let schema = data_gen.schema_to_bytes_with_dictionary_tracker(
-            &rb1.schema(),
-            &mut dictionary_tracker,
-            &options,
-        );
-        write_message(&mut buf, schema, &options).unwrap();
-
-        let buf = Cursor::new(buf);
-        let reader = get_reverse_reader(buf).unwrap().flatten();
-
-        let mut sum = 0;
-        for rb in reader {
-            sum += 1;
-            assert!(rb.num_rows() > 0);
-        }
-        assert_eq!(sum, 10000);
+
+    pub fn merged_iter(self, schema: Arc<Schema>) -> impl Iterator<Item = RecordBatch> {
+        self.readers
+            .into_iter()
+            .flat_map(|reader| reader.flatten())
+            .map(move |batch| adapt_batch(&schema, &batch))
     }
 }
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 57d24a3fb..5491de835 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -57,11 +57,7 @@ use crate::{
 };
 
 use super::{
-    staging::{
-        reader::{MergedRecordReader, MergedReverseRecordReader},
-        writer::Writer,
-        StagingError,
-    },
+    staging::{reader::MergedRecordReader, writer::Writer, StagingError},
     LogStream,
 };
 
@@ -473,7 +469,7 @@ impl Stream {
         // warn!("staging files-\n{staging_files:?}\n");
 
         for (parquet_path, arrow_files) in staging_files {
-            let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
+            let record_reader = MergedRecordReader::try_new(&arrow_files);
 
             if record_reader.readers.is_empty() {
                 continue;
@@ -490,7 +486,7 @@ impl Stream {
                 .open(&part_path)
                 .map_err(|_| StagingError::Create)?;
             let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
-            for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
+            for ref record in record_reader.merged_iter(schema) {
                 writer.write(record)?;
             }
             writer.close()?;
@@ -533,7 +529,7 @@ impl Stream {
 
     pub fn updated_schema(&self, current_schema: Schema) -> Schema {
         let staging_files = self.arrow_files();
-        let record_reader = MergedRecordReader::try_new(&staging_files).unwrap();
+        let record_reader = MergedRecordReader::try_new(&staging_files);
         if record_reader.readers.is_empty() {
             return current_schema;
         }
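
Note: with the reverse reader gone, `merged_iter` simply drains each staging
file front to back and yields batches in write order; nothing re-sorts rows
across files by timestamp anymore. A minimal, hypothetical sketch of that
behaviour, with in-memory buffers standing in for staged .arrows files (the
`stream_of` helper below is illustrative, not part of the codebase):

use std::{io::Cursor, sync::Arc};

use arrow_array::{Int64Array, RecordBatch};
use arrow_ipc::{reader::StreamReader, writer::StreamWriter};

// Hypothetical helper: a single-batch Arrow IPC stream in memory,
// standing in for one staged .arrows file on disk.
fn stream_of(values: &[i64]) -> Vec<u8> {
    let batch =
        RecordBatch::try_from_iter([("a", Arc::new(Int64Array::from(values.to_vec())) as _)])
            .unwrap();
    let mut writer = StreamWriter::try_new(Vec::new(), &batch.schema()).unwrap();
    writer.write(&batch).unwrap();
    writer.into_inner().unwrap()
}

fn main() {
    let files = [stream_of(&[1, 2]), stream_of(&[3])];
    let merged: Vec<RecordBatch> = files
        .into_iter()
        .map(|buf| StreamReader::try_new(Cursor::new(buf), None).unwrap())
        // `flatten` drops any Err batches, mirroring the lossy handling in
        // `try_new`; `flat_map` chains the per-file streams end to end
        // instead of k-merging them by timestamp.
        .flat_map(|reader| reader.flatten())
        .collect();

    // File order is preserved: the batch from the first "file" comes out
    // before the batch from the second.
    assert_eq!(merged[0].num_rows(), 2);
    assert_eq!(merged[1].num_rows(), 1);
}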
From 0817e308e370bcdb5ec591f52f8eb89a0462fd4b Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Wed, 26 Feb 2025 01:44:19 +0530
Subject: [PATCH 2/3] fix: reverse per batch

---
 src/parseable/staging/reader.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs
index 93189f6af..ed0497c0f 100644
--- a/src/parseable/staging/reader.rs
+++ b/src/parseable/staging/reader.rs
@@ -29,7 +29,7 @@ use arrow_ipc::reader::StreamReader;
 use arrow_schema::Schema;
 use tracing::error;
 
-use crate::utils::arrow::adapt_batch;
+use crate::utils::arrow::{adapt_batch, reverse};
 
 #[derive(Debug)]
 pub struct MergedRecordReader {
@@ -73,6 +73,7 @@ impl MergedRecordReader {
         self.readers
             .into_iter()
            .flat_map(|reader| reader.flatten())
+            .map(|batch| reverse(&batch))
             .map(move |batch| adapt_batch(&schema, &batch))
     }
 }
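
Note: `reverse` is the existing helper from src/utils/arrow; this reinstates
the per-batch row reversal that the previous patch dropped together with
`MergedReverseRecordReader`. As a rough mental model only (an assumed
equivalent, not the actual implementation), it behaves like a `take` kernel
driven by descending indices:

use arrow_array::{RecordBatch, UInt64Array};
use arrow_select::take::take;

// Assumed stand-in for utils::arrow::reverse: emit the rows of `batch` in
// descending index order, so the last row written comes out first.
fn reverse(batch: &RecordBatch) -> RecordBatch {
    let indices = UInt64Array::from_iter_values((0..batch.num_rows() as u64).rev());
    let columns = batch
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices, None).unwrap())
        .collect();
    RecordBatch::try_new(batch.schema(), columns).unwrap()
}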
From 306e440dc5729324dfe08b2e86258680626e125c Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Wed, 26 Feb 2025 16:44:19 +0530
Subject: [PATCH 3/3] test: restore deleted tests, adapted to the new reader

---
 src/parseable/staging/reader.rs | 147 ++++++++++++++++++++++++++++++++
 1 file changed, 147 insertions(+)

diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs
index ed0497c0f..4b9ddc5f5 100644
--- a/src/parseable/staging/reader.rs
+++ b/src/parseable/staging/reader.rs
@@ -77,3 +77,150 @@ impl MergedRecordReader {
         .map(move |batch| adapt_batch(&schema, &batch))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::{
+        io::{BufReader, Cursor, Read, Seek},
+        sync::Arc,
+    };
+
+    use arrow_array::{
+        cast::AsArray, types::Int64Type, Array, Float64Array, Int64Array, RecordBatch, StringArray,
+    };
+    use arrow_ipc::{
+        reader::StreamReader,
+        writer::{
+            write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
+        },
+    };
+
+    fn rb(rows: usize) -> RecordBatch {
+        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..(rows as i64)));
+        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..rows).map(|x| x as f64)));
+        let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter(
+            (0..rows).map(|x| Some(format!("str {}", x))),
+        ));
+
+        RecordBatch::try_from_iter_with_nullable([
+            ("a", array1, true),
+            ("b", array2, true),
+            ("c", array3, true),
+        ])
+        .unwrap()
+    }
+
+    fn write_mem(rbs: &[RecordBatch]) -> Vec<u8> {
+        let buf = Vec::new();
+        let mut writer = StreamWriter::try_new(buf, &rbs[0].schema()).unwrap();
+
+        for rb in rbs {
+            writer.write(rb).unwrap()
+        }
+
+        writer.into_inner().unwrap()
+    }
+
+    fn get_reverse_reader<T: Read + Seek>(reader: T) -> StreamReader<BufReader<T>> {
+        StreamReader::try_new(BufReader::new(reader), None).unwrap()
+    }
+
+    #[test]
+    fn test_empty_row() {
+        let rb = rb(0);
+        let buf = write_mem(&[rb]);
+        let reader = Cursor::new(buf);
+        let mut reader = get_reverse_reader(reader);
+        let rb = reader.next().unwrap().unwrap();
+        assert_eq!(rb.num_rows(), 0);
+    }
+
+    #[test]
+    fn test_one_row() {
+        let rb = rb(1);
+        let buf = write_mem(&[rb]);
+        let reader = Cursor::new(buf);
+        let mut reader = get_reverse_reader(reader);
+        let rb = reader.next().unwrap().unwrap();
+        assert_eq!(rb.num_rows(), 1);
+    }
+
+    #[test]
+    fn test_multiple_row_multiple_rbs() {
+        let buf = write_mem(&[rb(1), rb(2), rb(3)]);
+        let reader = Cursor::new(buf);
+        let mut reader = get_reverse_reader(reader);
+        let rb = reader.next().unwrap().unwrap();
+        assert_eq!(rb.num_rows(), 1);
+        let col1_val: Vec<i64> = rb
+            .column(0)
+            .as_primitive::<Int64Type>()
+            .iter()
+            .flatten()
+            .collect();
+        assert_eq!(col1_val, vec![0]);
+
+        let rb = reader.next().unwrap().unwrap();
+        assert_eq!(rb.num_rows(), 2);
+        let col1_val: Vec<i64> = rb
+            .column(0)
+            .as_primitive::<Int64Type>()
+            .iter()
+            .flatten()
+            .collect();
+        assert_eq!(col1_val, vec![0, 1]);
+
+        let rb = reader.next().unwrap().unwrap();
+        assert_eq!(rb.num_rows(), 3);
+        let col1_val: Vec<i64> = rb
+            .column(0)
+            .as_primitive::<Int64Type>()
+            .iter()
+            .flatten()
+            .collect();
+        assert_eq!(col1_val, vec![0, 1, 2]);
+    }
+
+    #[test]
+    fn manual_write() {
+        let error_on_replacement = true;
+        let options = IpcWriteOptions::default();
+        let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
+        let data_gen = IpcDataGenerator {};
+
+        let mut buf = Vec::new();
+        let rb1 = rb(1);
+
+        let schema = data_gen.schema_to_bytes_with_dictionary_tracker(
+            &rb1.schema(),
+            &mut dictionary_tracker,
+            &options,
+        );
+        write_message(&mut buf, schema, &options).unwrap();
+
+        for i in (1..=3).cycle().skip(1).take(10000) {
+            let (_, encoded_message) = data_gen
+                .encoded_batch(&rb(i), &mut dictionary_tracker, &options)
+                .unwrap();
+            write_message(&mut buf, encoded_message, &options).unwrap();
+        }
+
+        let schema = data_gen.schema_to_bytes_with_dictionary_tracker(
+            &rb1.schema(),
+            &mut dictionary_tracker,
+            &options,
+        );
+        write_message(&mut buf, schema, &options).unwrap();
+
+        let buf = Cursor::new(buf);
+        let reader = get_reverse_reader(buf).flatten();
+
+        let mut sum = 0;
+        for rb in reader {
+            sum += 1;
+            assert!(rb.num_rows() > 0);
+        }
+
+        assert_eq!(sum, 10000);
+    }
+}
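
Note: the restored assertions flip the expected order; batches now come back
exactly as written (1, 2, then 3 rows) instead of newest-first. A compact,
hypothetical round-trip showing the property these tests pin down:

use std::{io::Cursor, sync::Arc};

use arrow_array::{Int64Array, RecordBatch};
use arrow_ipc::{reader::StreamReader, writer::StreamWriter};

fn main() {
    // Three batches of 1, 2 and 3 rows, written in that order.
    let batches: Vec<RecordBatch> = (1..=3i64)
        .map(|n| {
            RecordBatch::try_from_iter([("a", Arc::new(Int64Array::from_iter(0..n)) as _)])
                .unwrap()
        })
        .collect();

    let mut writer = StreamWriter::try_new(Vec::new(), &batches[0].schema()).unwrap();
    for batch in &batches {
        writer.write(batch).unwrap();
    }
    let buf = writer.into_inner().unwrap();

    // Forward reading preserves write order: 1, 2, 3 rows, not the
    // 3, 2, 1 the old reverse reader produced.
    let row_counts: Vec<usize> = StreamReader::try_new(Cursor::new(buf), None)
        .unwrap()
        .flatten()
        .map(|rb| rb.num_rows())
        .collect();
    assert_eq!(row_counts, vec![1, 2, 3]);
}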