From d00b620b9dbc565ec16598b7e32e2fe5cdeaf4ba Mon Sep 17 00:00:00 2001 From: Taylor Beever Date: Wed, 28 Jun 2023 15:33:01 -0600 Subject: [PATCH] feat(rust): expose WriterProperties method on RecordBatchWriter and DeltaWriter (#1497) # Description Adds the capability to pass a configured WriterProperties to the `RecordBatchWriter` and `DeltaWriter` similar to how the `OptimizeBuilder` can be updated. # Related Issue(s) - closes #1469 - closes #1235 # Documentation --- rust/examples/basic_operations.rs | 19 ++++++++++++++++++- rust/examples/recordbatch-writer.rs | 15 ++++++++++++--- rust/src/operations/write.rs | 11 ++++++++++- rust/src/operations/writer.rs | 6 ++++++ rust/src/writer/record_batch.rs | 6 ++++++ 5 files changed, 52 insertions(+), 5 deletions(-) diff --git a/rust/examples/basic_operations.rs b/rust/examples/basic_operations.rs index 8aa91a6cb3..5c1fb46e86 100644 --- a/rust/examples/basic_operations.rs +++ b/rust/examples/basic_operations.rs @@ -5,6 +5,11 @@ use arrow::{ }; use deltalake::operations::collect_sendable_stream; use deltalake::{action::SaveMode, DeltaOps, SchemaDataType, SchemaField}; +use parquet::{ + basic::{Compression, ZstdLevel}, + file::properties::WriterProperties, +}; + use std::sync::Arc; fn get_table_columns() -> Vec<SchemaField> { @@ -55,15 +60,27 @@ async fn main() -> Result<(), deltalake::errors::DeltaTableError> { assert_eq!(table.version(), 0); + let writer_properties = WriterProperties::builder() + .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap())) + .build(); + let batch = get_table_batches(); - let table = DeltaOps(table).write(vec![batch.clone()]).await?; + let table = DeltaOps(table) + .write(vec![batch.clone()]) + .with_writer_properties(writer_properties) + .await?; assert_eq!(table.version(), 1); + let writer_properties = WriterProperties::builder() + .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap())) + .build(); + // To overwrite instead of append (which is the default), use `.with_save_mode`: 
let table = DeltaOps(table) .write(vec![batch.clone()]) .with_save_mode(SaveMode::Overwrite) + .with_writer_properties(writer_properties) .await?; assert_eq!(table.version(), 2); diff --git a/rust/examples/recordbatch-writer.rs b/rust/examples/recordbatch-writer.rs index ab61e93b1d..85f055b705 100644 --- a/rust/examples/recordbatch-writer.rs +++ b/rust/examples/recordbatch-writer.rs @@ -14,8 +14,12 @@ use deltalake::errors::DeltaTableError; use deltalake::writer::{DeltaWriter, RecordBatchWriter}; use deltalake::*; use log::*; - use object_store::path::Path; +use parquet::{ + basic::{Compression, ZstdLevel}, + file::properties::WriterProperties, +}; + use std::collections::HashMap; use std::sync::Arc; @@ -42,8 +46,13 @@ async fn main() -> Result<(), anyhow::Error> { Err(err) => Err(err).unwrap(), }; - let mut writer = - RecordBatchWriter::for_table(&table).expect("Failed to make RecordBatchWriter"); + let writer_properties = WriterProperties::builder() + .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap())) + .build(); + + let mut writer = RecordBatchWriter::for_table(&table) + .expect("Failed to make RecordBatchWriter") + .with_writer_properties(writer_properties); let records = fetch_readings(); let batch = convert_to_batch(&table, &records); diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index 9c7b7ca3d5..d2285e7446 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -99,6 +99,8 @@ pub struct WriteBuilder { batches: Option<Vec<RecordBatch>>, /// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false) safe_cast: bool, + /// Parquet writer properties + writer_properties: Option<WriterProperties>, } impl WriteBuilder { @@ -116,6 +118,7 @@ impl WriteBuilder { write_batch_size: None, batches: None, safe_cast: false, + writer_properties: None, } } @@ -178,6 +181,12 @@ impl WriteBuilder { self } + /// Specify the writer properties to use when writing a parquet file + pub fn with_writer_properties(mut self, 
writer_properties: WriterProperties) -> Self { + self.writer_properties = Some(writer_properties); + self + } + async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> { match self.store.is_delta_table_location().await? { true => { @@ -390,7 +399,7 @@ impl std::future::IntoFuture for WriteBuilder { this.store.clone(), this.target_file_size, this.write_batch_size, - None, + this.writer_properties, this.safe_cast, ) .await?; diff --git a/rust/src/operations/writer.rs b/rust/src/operations/writer.rs index 64c5ba1d12..4fef892bf8 100644 --- a/rust/src/operations/writer.rs +++ b/rust/src/operations/writer.rs @@ -134,6 +134,12 @@ impl DeltaWriter { } } + /// Apply custom writer_properties to the underlying parquet writer + pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self { + self.config.writer_properties = writer_properties; + self + } + fn divide_by_partition_values( &mut self, values: &RecordBatch, diff --git a/rust/src/writer/record_batch.rs b/rust/src/writer/record_batch.rs index 652f62f1e0..2b62fe686c 100644 --- a/rust/src/writer/record_batch.rs +++ b/rust/src/writer/record_batch.rs @@ -196,6 +196,12 @@ impl RecordBatchWriter { Ok(()) } + /// Sets the writer properties for the underlying arrow writer. + pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self { + self.writer_properties = writer_properties; + self + } + fn divide_by_partition_values( &mut self, values: &RecordBatch,