
Commit 913979a

limit record_batch row count
1 parent 2bb93ec commit 913979a

File tree

3 files changed: +20 -12 lines


src/parseable/staging/mod.rs (+2 -2)

@@ -30,6 +30,6 @@ pub enum StagingError {
     ObjectStorage(#[from] std::io::Error),
     #[error("Could not generate parquet file")]
     Create,
-    // #[error("Metadata Error: {0}")]
-    // Metadata(#[from] MetadataError),
+    #[error("Too many rows: {0}")]
+    RowLimit(usize),
 }
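For context, a minimal sketch of how a call site might surface the new variant to a client instead of treating it as an internal failure; the helper function and its wording are assumptions for illustration, not part of this commit:

use crate::parseable::staging::StagingError;

// Hypothetical call-site sketch: give RowLimit its own message,
// fall back to the thiserror-derived Display for everything else.
fn describe_staging_error(err: &StagingError) -> String {
    match err {
        StagingError::RowLimit(rows) => {
            format!("record batch has {rows} rows, which exceeds the staging limit")
        }
        other => format!("staging failed: {other}"),
    }
}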

src/parseable/staging/writer.rs (+8 -8)

@@ -37,13 +37,13 @@ use crate::utils::arrow::adapt_batch;
 use super::StagingError;
 
 /// Context regarding `.arrows` file being persisted onto disk
-pub struct DiskWriter<const N: usize> {
+pub struct DiskWriter {
     inner: FileWriter<BufWriter<File>>,
     /// Used to ensure un"finish"ed arrow files are renamed on "finish"
     path_prefix: String,
 }
 
-impl<const N: usize> DiskWriter<N> {
+impl DiskWriter {
     pub fn new(path_prefix: String, schema: &Schema) -> Result<Self, StagingError> {
         // Live writes happen into partfile
         let partfile_path = format!("{path_prefix}.{ARROW_PART_FILE_EXTENSION}");
@@ -81,12 +81,6 @@ impl<const N: usize> DiskWriter<N> {
     }
 }
 
-#[derive(Default)]
-pub struct Writer<const N: usize> {
-    pub mem: MemWriter<N>,
-    pub disk: HashMap<String, DiskWriter<N>>,
-}
-
 /// Structure to keep recordbatches in memory.
 ///
 /// Any new schema is updated in the schema map.
@@ -178,3 +172,9 @@ impl<const N: usize> MutableBuffer<N> {
         }
     }
 }
+
+#[derive(Default)]
+pub struct Writer<const N: usize> {
+    pub mem: MemWriter<N>,
+    pub disk: HashMap<String, DiskWriter>,
+}

src/parseable/streams.rs (+10 -2)

@@ -66,6 +66,9 @@ use super::{
     LogStream, ARROW_FILE_EXTENSION,
 };
 
+// ~16K rows is default in-memory limit for each recordbatch
+const MAX_RECORD_BATCH_SIZE: usize = 16384;
+
 /// Regex pattern for parsing arrow file names.
 ///
 /// # Format
@@ -113,8 +116,8 @@ pub struct Stream {
     pub metadata: RwLock<LogStreamMetadata>,
     pub data_path: PathBuf,
     pub options: Arc<Options>,
-    /// Writer with a 16KB buffer size for optimal I/O performance.
-    pub writer: Mutex<Writer<16384>>,
+    /// Writer with a ~16K rows limit for optimal I/O performance.
+    pub writer: Mutex<Writer<MAX_RECORD_BATCH_SIZE>>,
     pub ingestor_id: Option<String>,
 }
 
@@ -147,6 +150,11 @@ impl Stream {
         custom_partition_values: &HashMap<String, String>,
         stream_type: StreamType,
     ) -> Result<(), StagingError> {
+        let row_count = record.num_rows();
+        if row_count > MAX_RECORD_BATCH_SIZE {
+            return Err(StagingError::RowLimit(row_count));
+        }
+
         let mut guard = self.writer.lock().unwrap();
         if self.options.mode != Mode::Query || stream_type == StreamType::Internal {
             match guard.disk.get_mut(schema_key) {