diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs
index 6df0dc324..4b9ddc5f5 100644
--- a/src/parseable/staging/reader.rs
+++ b/src/parseable/staging/reader.rs
@@ -19,23 +19,17 @@
 
 use std::{
     fs::{remove_file, File},
-    io::{self, BufReader, Read, Seek, SeekFrom},
+    io::BufReader,
     path::PathBuf,
     sync::Arc,
-    vec::IntoIter,
 };
 
-use arrow_array::{RecordBatch, TimestampMillisecondArray};
-use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
+use arrow_array::RecordBatch;
+use arrow_ipc::reader::StreamReader;
 use arrow_schema::Schema;
-use byteorder::{LittleEndian, ReadBytesExt};
-use itertools::kmerge_by;
 use tracing::error;
 
-use crate::{
-    event::DEFAULT_TIMESTAMP_KEY,
-    utils::arrow::{adapt_batch, reverse},
-};
+use crate::utils::arrow::{adapt_batch, reverse};
 
 #[derive(Debug)]
 pub struct MergedRecordReader {
@@ -43,7 +37,7 @@ pub struct MergedRecordReader {
 }
 
 impl MergedRecordReader {
-    pub fn try_new(files: &[PathBuf]) -> Result<Self, ()> {
+    pub fn try_new(files: &[PathBuf]) -> Self {
         let mut readers = Vec::with_capacity(files.len());
 
         for file in files {
@@ -63,55 +57,9 @@ impl MergedRecordReader {
             }
         }
 
-        Ok(Self { readers })
-    }
-
-    pub fn merged_schema(&self) -> Schema {
-        Schema::try_merge(
-            self.readers
-                .iter()
-                .map(|reader| reader.schema().as_ref().clone()),
-        )
-        .unwrap()
-    }
-}
-
-#[derive(Debug)]
-pub struct MergedReverseRecordReader {
-    pub readers: Vec<StreamReader<BufReader<OffsetReader<File>>>>,
-}
-
-impl MergedReverseRecordReader {
-    pub fn try_new(files: &[PathBuf]) -> Self {
-        let mut readers = Vec::with_capacity(files.len());
-        for file in files {
-            let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else {
-                error!("Invalid file detected, ignoring it: {:?}", file);
-                continue;
-            };
-
-            readers.push(reader);
-        }
         Self { readers }
     }
 
-    pub fn merged_iter(
-        self,
-        schema: Arc<Schema>,
-        time_partition: Option<String>,
-    ) -> impl Iterator<Item = RecordBatch> {
-        let adapted_readers = self.readers.into_iter().map(|reader| reader.flatten());
-        kmerge_by(adapted_readers, move |a: &RecordBatch, b: &RecordBatch| {
-            // Capture time_partition by value
-            let a_time = get_timestamp_millis(a, time_partition.clone());
-            let b_time = get_timestamp_millis(b, time_partition.clone());
-            a_time > b_time
-        })
-        .map(|batch| reverse(&batch))
-        .map(move |batch| adapt_batch(&schema, &batch))
-    }
-
     pub fn merged_schema(&self) -> Schema {
         Schema::try_merge(
             self.readers
@@ -120,210 +68,33 @@ impl MergedReverseRecordReader {
         )
         .unwrap()
     }
-}
-
-fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option<String>) -> i64 {
-    match time_partition {
-        Some(time_partition) => {
-            let time_partition = time_partition.as_str();
-            match batch.column_by_name(time_partition) {
-                Some(column) => column
-                    .as_any()
-                    .downcast_ref::<TimestampMillisecondArray>()
-                    .unwrap()
-                    .value(0),
-                None => get_default_timestamp_millis(batch),
-            }
-        }
-        None => get_default_timestamp_millis(batch),
-    }
-}
-fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 {
-    match batch
-        .column(0)
-        .as_any()
-        .downcast_ref::<TimestampMillisecondArray>()
-    {
-        // Ideally we expect the first column to be a timestamp (because we add the timestamp column first in the writer)
-        Some(array) => array.value(0),
-        // In case the first column is not a timestamp, we fallback to look for default timestamp column across all columns
-        None => batch
-            .column_by_name(DEFAULT_TIMESTAMP_KEY)
-            .unwrap()
-            .as_any()
-            .downcast_ref::<TimestampMillisecondArray>()
-            .unwrap()
-            .value(0),
-    }
-}
-
-/// OffsetReader takes in a reader and list of offset and sizes and
-/// provides a reader over the file by reading only the offsets
-/// from start of the list to end.
-///
-/// Safety Invariant: Reader is already validated and all offset and limit are valid to read.
-///
-/// On empty list the reader returns no bytes read.
-pub struct OffsetReader<R: Read + Seek> {
-    reader: R,
-    offset_list: IntoIter<(u64, usize)>,
-    current_offset: u64,
-    current_size: usize,
-    buffer: Vec<u8>,
-    buffer_position: usize,
-    finished: bool,
-}
-
-impl<R: Read + Seek> OffsetReader<R> {
-    fn new(reader: R, offset_list: Vec<(u64, usize)>) -> Self {
-        let mut offset_list = offset_list.into_iter();
-        let mut finished = false;
-
-        let (current_offset, current_size) = offset_list.next().unwrap_or_default();
-        if current_offset == 0 && current_size == 0 {
-            finished = true
-        }
-
-        OffsetReader {
-            reader,
-            offset_list,
-            current_offset,
-            current_size,
-            buffer: vec![0; 4096],
-            buffer_position: 0,
-            finished,
-        }
-    }
-}
-
-impl<R: Read + Seek> Read for OffsetReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let offset = self.current_offset;
-        let size = self.current_size;
-
-        if self.finished {
-            return Ok(0);
-        }
-        // on empty buffer load current data represented by
-        // current_offset and current_size into self buffer
-        if self.buffer_position == 0 {
-            self.reader.seek(SeekFrom::Start(offset))?;
-            // resize for current message
-            if self.buffer.len() < size {
-                self.buffer.resize(size, 0)
-            }
-            self.reader.read_exact(&mut self.buffer[0..size])?;
-        }
-
-        let remaining_bytes = size - self.buffer_position;
-        let max_read = usize::min(remaining_bytes, buf.len());
-
-        // Copy data from the buffer to the provided buffer
-        let read_data = &self.buffer[self.buffer_position..self.buffer_position + max_read];
-        buf[..max_read].copy_from_slice(read_data);
-
-        self.buffer_position += max_read;
-
-        if self.buffer_position >= size {
-            // If we've read the entire section, move to the next offset
-            match self.offset_list.next() {
-                Some((offset, size)) => {
-                    self.current_offset = offset;
-                    self.current_size = size;
-                    self.buffer_position = 0;
-                }
-                None => {
-                    // iter is exhausted, no more read can be done
-                    self.finished = true
-                }
-            }
-        }
-
-        Ok(max_read)
-    }
-}
-
-pub fn get_reverse_reader<T: Read + Seek>(
-    mut reader: T,
-) -> Result<StreamReader<BufReader<OffsetReader<T>>>, io::Error> {
-    let mut offset = 0;
-    let mut messages = Vec::new();
-
-    while let Some(res) = find_limit_and_type(&mut reader).transpose() {
-        match res {
-            Ok((header, size)) => {
-                messages.push((header, offset, size));
-                offset += size;
-            }
-            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
-            Err(err) => return Err(err),
-        }
-    }
-
-    // reverse everything leaving the first because it has schema message.
-    messages[1..].reverse();
-    let messages = messages
-        .into_iter()
-        .map(|(_, offset, size)| (offset as u64, size))
-        .collect();
-
-    // reset reader
-    reader.rewind()?;
-
-    Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap())
-}
-
-// return limit for
-fn find_limit_and_type(
-    reader: &mut (impl Read + Seek),
-) -> Result<Option<(MessageHeader, usize)>, io::Error> {
-    let mut size = 0;
-    let marker = reader.read_u32::<LittleEndian>()?;
-    size += 4;
-
-    if marker != 0xFFFFFFFF {
-        return Err(io::Error::new(
-            io::ErrorKind::InvalidData,
-            "Invalid Continuation Marker",
-        ));
-    }
-
-    let metadata_size = reader.read_u32::<LittleEndian>()? as usize;
-    size += 4;
-
-    if metadata_size == 0x00000000 {
-        return Ok(None);
+    pub fn merged_iter(self, schema: Arc<Schema>) -> impl Iterator<Item = RecordBatch> {
+        self.readers
+            .into_iter()
+            .flat_map(|reader| reader.flatten())
+            .map(|batch| reverse(&batch))
+            .map(move |batch| adapt_batch(&schema, &batch))
     }
-
-    let mut message = vec![0u8; metadata_size];
-    reader.read_exact(&mut message)?;
-    size += metadata_size;
-
-    let message = unsafe { root_as_message_unchecked(&message) };
-    let header = message.header_type();
-    let message_size = message.bodyLength();
-    size += message_size as usize;
-
-    let padding = (8 - (size % 8)) % 8;
-    reader.seek(SeekFrom::Current(padding as i64 + message_size))?;
-    size += padding;
-
-    Ok(Some((header, size)))
 }
 
 #[cfg(test)]
 mod tests {
-    use std::{io::Cursor, sync::Arc};
+    use std::{
+        io::{BufReader, Cursor, Read, Seek},
+        sync::Arc,
+    };
 
     use arrow_array::{
         cast::AsArray, types::Int64Type, Array, Float64Array, Int64Array, RecordBatch,
         StringArray,
     };
-    use arrow_ipc::writer::{
-        write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
+    use arrow_ipc::{
+        reader::StreamReader,
+        writer::{
+            write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
+        },
     };
 
-    use super::get_reverse_reader;
-
     fn rb(rows: usize) -> RecordBatch {
         let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..(rows as i64)));
         let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..rows).map(|x| x as f64)));
@@ -350,12 +121,16 @@ mod tests {
         writer.into_inner().unwrap()
     }
 
+    fn get_reverse_reader<T: Read + Seek>(reader: T) -> StreamReader<BufReader<T>> {
+        StreamReader::try_new(BufReader::new(reader), None).unwrap()
+    }
+
     #[test]
     fn test_empty_row() {
         let rb = rb(0);
         let buf = write_mem(&[rb]);
         let reader = Cursor::new(buf);
-        let mut reader = get_reverse_reader(reader).unwrap();
+        let mut reader = get_reverse_reader(reader);
         let rb = reader.next().unwrap().unwrap();
         assert_eq!(rb.num_rows(), 0);
     }
@@ -365,7 +140,7 @@
         let rb = rb(1);
         let buf = write_mem(&[rb]);
         let reader = Cursor::new(buf);
-        let mut reader = get_reverse_reader(reader).unwrap();
+        let mut reader = get_reverse_reader(reader);
         let rb = reader.next().unwrap().unwrap();
         assert_eq!(rb.num_rows(), 1);
     }
@@ -374,22 +149,36 @@ fn test_multiple_row_multiple_rbs() {
         let buf = write_mem(&[rb(1), rb(2), rb(3)]);
         let reader = Cursor::new(buf);
-        let mut reader = get_reverse_reader(reader).unwrap();
+        let mut reader = get_reverse_reader(reader);
         let rb = reader.next().unwrap().unwrap();
-        assert_eq!(rb.num_rows(), 3);
+        assert_eq!(rb.num_rows(), 1);
         let col1_val: Vec<i64> = rb
             .column(0)
             .as_primitive::<Int64Type>()
             .iter()
            .flatten()
            .collect();
-        assert_eq!(col1_val, vec![0, 1, 2]);
+        assert_eq!(col1_val, vec![0]);
 
         let rb = reader.next().unwrap().unwrap();
         assert_eq!(rb.num_rows(), 2);
+        let col1_val: Vec<i64> = rb
+            .column(0)
+            .as_primitive::<Int64Type>()
+            .iter()
+            .flatten()
+            .collect();
+        assert_eq!(col1_val, vec![0, 1]);
 
         let rb = reader.next().unwrap().unwrap();
-        assert_eq!(rb.num_rows(), 1);
+        assert_eq!(rb.num_rows(), 3);
+        let col1_val: Vec<i64> = rb
+            .column(0)
+            .as_primitive::<Int64Type>()
+            .iter()
+            .flatten()
+            .collect();
+        assert_eq!(col1_val, vec![0, 1, 2]);
     }
 
     #[test]
@@ -424,7 +213,7 @@
         write_message(&mut buf, schema, &options).unwrap();
 
         let buf = Cursor::new(buf);
-        let reader = get_reverse_reader(buf).unwrap().flatten();
+        let reader = get_reverse_reader(buf).flatten();
 
         let mut sum = 0;
         for rb in reader {
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 57d24a3fb..5491de835 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -57,11 +57,7 @@ use crate::{
 };
 
 use super::{
-    staging::{
-        reader::{MergedRecordReader, MergedReverseRecordReader},
-        writer::Writer,
-        StagingError,
-    },
+    staging::{reader::MergedRecordReader, writer::Writer, StagingError},
     LogStream,
 };
 
@@ -473,7 +469,7 @@ impl Stream {
        // warn!("staging files-\n{staging_files:?}\n");
 
         for (parquet_path, arrow_files) in staging_files {
-            let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
+            let record_reader = MergedRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
                 continue;
             }
@@ -490,7 +486,7 @@ impl Stream {
                 .open(&part_path)
                 .map_err(|_| StagingError::Create)?;
             let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
-            for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
+            for ref record in record_reader.merged_iter(schema) {
                 writer.write(record)?;
             }
             writer.close()?;
@@ -533,7 +529,7 @@ impl Stream {
     pub fn updated_schema(&self, current_schema: Schema) -> Schema {
         let staging_files = self.arrow_files();
-        let record_reader = MergedRecordReader::try_new(&staging_files).unwrap();
+        let record_reader = MergedRecordReader::try_new(&staging_files);
 
         if record_reader.readers.is_empty() {
             return current_schema;
         }
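
Taken together, the two files reduce the staging read path to a plain forward scan: try_new no longer returns a Result, and merged_iter no longer takes a time_partition. What follows is a minimal sketch (not part of the diff) of how the simplified MergedRecordReader is driven, mirroring the flush path in streams.rs above; the helper name flush_to_parquet, the "out.parquet" output path, and the boxed error type are illustrative assumptions.

use std::{error::Error, fs::File, path::PathBuf, sync::Arc};

use parquet::arrow::ArrowWriter;

use crate::parseable::staging::reader::MergedRecordReader;

// Illustrative helper, not from the diff.
fn flush_to_parquet(arrow_files: &[PathBuf]) -> Result<(), Box<dyn Error>> {
    // try_new is now infallible: unreadable arrow files are logged and skipped,
    // so callers check `readers.is_empty()` instead of unwrapping a Result.
    let record_reader = MergedRecordReader::try_new(arrow_files);
    if record_reader.readers.is_empty() {
        return Ok(());
    }

    let schema = Arc::new(record_reader.merged_schema());
    let file = File::create("out.parquet")?; // placeholder path
    let mut writer = ArrowWriter::try_new(file, schema.clone(), None)?;

    // merged_iter takes only the schema now; batches stream out in file order
    // rather than through the removed reverse-timestamp k-merge.
    for batch in record_reader.merged_iter(schema) {
        writer.write(&batch)?;
    }
    writer.close()?;
    Ok(())
}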