From 6f98eaaed7e16125c0d7bdfd3c64aa8143ae7dac Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 22 Oct 2024 22:59:08 +0000 Subject: [PATCH 1/2] fix: avoid collecting all actions into memory when producing a checkpoint For excessively large tables which do not have sufficient checkpoints or excessive `action` volume between checkpoints, the checkpoint code can consume an unreasonable amount of memory. In the case evaluated there were over a thousand transactions between checkpoints, but that resulted in over 2M actions which needed to be persisted to the checkpoint. This scenario led to 19.9GB of memory utilized when producing checkpoints for a table which used 3.8GB to open. By iteratively processing the buffers which need to be serialized, the memory is dramatically reduced: **Before**: Maximum resident set size (kbytes): 19964728 **After:** Maximum resident set size (kbytes): 4621648 Sponsored-by: Scribd Inc Signed-off-by: R. Tyler Croy --- crates/core/src/protocol/checkpoints.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index fc2238d03b..0d3836cefe 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -349,9 +349,15 @@ fn parquet_bytes_from_state( let mut decoder = ReaderBuilder::new(arrow_schema) .with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE) .build_decoder()?; - let jsons = jsons.collect::, _>>()?; - decoder.serialize(&jsons)?; + // Count of actions + let mut total_actions = 0; + + for j in jsons { + let buf = serde_json::to_string(&j?).unwrap(); + let _ = decoder.decode(buf.as_bytes())?; + total_actions += 1; + } while let Some(batch) = decoder.flush()? 
{ writer.write(&batch)?; } @@ -359,7 +365,7 @@ fn parquet_bytes_from_state( let _ = writer.close()?; debug!("Finished writing checkpoint parquet buffer."); - let checkpoint = CheckPointBuilder::new(state.version(), jsons.len() as i64) + let checkpoint = CheckPointBuilder::new(state.version(), total_actions) .with_size_in_bytes(bytes.len() as i64) .build(); Ok((checkpoint, bytes::Bytes::from(bytes))) From ad83562a36e03662bc64f65f37bc702138c037ee Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 22 Oct 2024 23:04:06 +0000 Subject: [PATCH 2/2] fix: avoid collecting file actions only to re-iterate upon them Similar to the prior commit, this improves the iterative nature of checkpoint creation. **Before:** Maximum resident set size (kbytes): 4621648 **After:** Maximum resident set size (kbytes): 4017132 Sponsored-by: Scribd Inc Signed-off-by: R. Tyler Croy --- crates/core/src/protocol/checkpoints.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 0d3836cefe..f8956c6a75 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -284,7 +284,7 @@ fn parquet_bytes_from_state( remove.extended_file_metadata = Some(false); } } - let files = state.file_actions().unwrap(); + let files = state.file_actions_iter().unwrap(); // protocol let jsons = std::iter::once(Action::Protocol(Protocol { min_reader_version: state.protocol().min_reader_version, @@ -323,8 +323,8 @@ fn parquet_bytes_from_state( })) .map(|a| serde_json::to_value(a).map_err(ProtocolError::from)) // adds - .chain(files.iter().map(|f| { - checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions) + .chain(files.map(|f| { + checkpoint_add_from_state(&f, partition_col_data_types.as_slice(), &stats_conversions) })); // Create the arrow schema that represents the Checkpoint parquet file.