diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index fc2238d03b..f8956c6a75 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -284,7 +284,7 @@ fn parquet_bytes_from_state( remove.extended_file_metadata = Some(false); } } - let files = state.file_actions().unwrap(); + let files = state.file_actions_iter().unwrap(); // protocol let jsons = std::iter::once(Action::Protocol(Protocol { min_reader_version: state.protocol().min_reader_version, @@ -323,8 +323,8 @@ fn parquet_bytes_from_state( })) .map(|a| serde_json::to_value(a).map_err(ProtocolError::from)) // adds - .chain(files.iter().map(|f| { - checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions) + .chain(files.map(|f| { + checkpoint_add_from_state(&f, partition_col_data_types.as_slice(), &stats_conversions) })); // Create the arrow schema that represents the Checkpoint parquet file. @@ -349,9 +349,15 @@ fn parquet_bytes_from_state( let mut decoder = ReaderBuilder::new(arrow_schema) .with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE) .build_decoder()?; - let jsons = jsons.collect::, _>>()?; - decoder.serialize(&jsons)?; + // Count of actions + let mut total_actions = 0; + + for j in jsons { + let buf = serde_json::to_string(&j?).unwrap(); + let _ = decoder.decode(buf.as_bytes())?; + total_actions += 1; + } while let Some(batch) = decoder.flush()? { writer.write(&batch)?; } @@ -359,7 +365,7 @@ fn parquet_bytes_from_state( let _ = writer.close()?; debug!("Finished writing checkpoint parquet buffer."); - let checkpoint = CheckPointBuilder::new(state.version(), jsons.len() as i64) + let checkpoint = CheckPointBuilder::new(state.version(), total_actions) .with_size_in_bytes(bytes.len() as i64) .build(); Ok((checkpoint, bytes::Bytes::from(bytes)))