Skip to content

Commit

Permalink
feat(rust): expose WriterProperties method on RecordBatchWriter and D…
Browse files Browse the repository at this point in the history
…eltaWriter (#1497)

# Description
Adds the ability to pass a pre-configured `WriterProperties` to
`RecordBatchWriter` and `DeltaWriter` (and, via `WriteBuilder`, to the write
operation), similar to how `OptimizeBuilder` can already be configured.

# Related Issue(s)
- closes #1469 
- closes #1235 

# Documentation
  • Loading branch information
theelderbeever authored Jun 28, 2023
1 parent 87c68e3 commit d00b620
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 5 deletions.
19 changes: 18 additions & 1 deletion rust/examples/basic_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ use arrow::{
};
use deltalake::operations::collect_sendable_stream;
use deltalake::{action::SaveMode, DeltaOps, SchemaDataType, SchemaField};
use parquet::{
basic::{Compression, ZstdLevel},
file::properties::WriterProperties,
};

use std::sync::Arc;

fn get_table_columns() -> Vec<SchemaField> {
Expand Down Expand Up @@ -55,15 +60,27 @@ async fn main() -> Result<(), deltalake::errors::DeltaTableError> {

assert_eq!(table.version(), 0);

let writer_properties = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
.build();

let batch = get_table_batches();
let table = DeltaOps(table).write(vec![batch.clone()]).await?;
let table = DeltaOps(table)
.write(vec![batch.clone()])
.with_writer_properties(writer_properties)
.await?;

assert_eq!(table.version(), 1);

let writer_properties = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
.build();

// To overwrite instead of append (which is the default), use `.with_save_mode`:
let table = DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Overwrite)
.with_writer_properties(writer_properties)
.await?;

assert_eq!(table.version(), 2);
Expand Down
15 changes: 12 additions & 3 deletions rust/examples/recordbatch-writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ use deltalake::errors::DeltaTableError;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::*;
use log::*;

use object_store::path::Path;
use parquet::{
basic::{Compression, ZstdLevel},
file::properties::WriterProperties,
};

use std::collections::HashMap;
use std::sync::Arc;

Expand All @@ -42,8 +46,13 @@ async fn main() -> Result<(), anyhow::Error> {
Err(err) => Err(err).unwrap(),
};

let mut writer =
RecordBatchWriter::for_table(&table).expect("Failed to make RecordBatchWriter");
let writer_properties = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
.build();

let mut writer = RecordBatchWriter::for_table(&table)
.expect("Failed to make RecordBatchWriter")
.with_writer_properties(writer_properties);

let records = fetch_readings();
let batch = convert_to_batch(&table, &records);
Expand Down
11 changes: 10 additions & 1 deletion rust/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ pub struct WriteBuilder {
batches: Option<Vec<RecordBatch>>,
/// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
safe_cast: bool,
/// Parquet writer properties
writer_properties: Option<WriterProperties>,
}

impl WriteBuilder {
Expand All @@ -116,6 +118,7 @@ impl WriteBuilder {
write_batch_size: None,
batches: None,
safe_cast: false,
writer_properties: None,
}
}

Expand Down Expand Up @@ -178,6 +181,12 @@ impl WriteBuilder {
self
}

/// Sets the parquet [`WriterProperties`] to use when writing files.
///
/// The builder is consumed and returned so the call can be chained. The
/// supplied value is stored as `Some(writer_properties)`, replacing whatever
/// an earlier call may have set.
pub fn with_writer_properties(self, writer_properties: WriterProperties) -> Self {
    let mut builder = self;
    builder.writer_properties = Some(writer_properties);
    builder
}

async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
match self.store.is_delta_table_location().await? {
true => {
Expand Down Expand Up @@ -390,7 +399,7 @@ impl std::future::IntoFuture for WriteBuilder {
this.store.clone(),
this.target_file_size,
this.write_batch_size,
None,
this.writer_properties,
this.safe_cast,
)
.await?;
Expand Down
6 changes: 6 additions & 0 deletions rust/src/operations/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ impl DeltaWriter {
}
}

/// Overrides the parquet [`WriterProperties`] held in this writer's config.
///
/// Consumes the writer, replaces `config.writer_properties` with the given
/// value, and returns the writer so the call can be chained.
// NOTE(review): presumably this should be called before any data is written;
// confirm whether already-created parquet writers pick up the change.
pub fn with_writer_properties(self, writer_properties: WriterProperties) -> Self {
    let mut writer = self;
    writer.config.writer_properties = writer_properties;
    writer
}

fn divide_by_partition_values(
&mut self,
values: &RecordBatch,
Expand Down
6 changes: 6 additions & 0 deletions rust/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ impl RecordBatchWriter {
Ok(())
}

/// Replaces the [`WriterProperties`] used for the underlying arrow writer.
///
/// Consumes the writer, overwrites its `writer_properties` field with the
/// given value, and returns the writer so the call can be chained onto a
/// constructor.
// NOTE(review): presumably only writers created after this call use the new
// properties; confirm against where `writer_properties` is read.
pub fn with_writer_properties(self, writer_properties: WriterProperties) -> Self {
    let mut writer = self;
    writer.writer_properties = writer_properties;
    writer
}

fn divide_by_partition_values(
&mut self,
values: &RecordBatch,
Expand Down

0 comments on commit d00b620

Please sign in to comment.