Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update writers to include compression method in file name #1431

Merged
merged 6 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions rust/src/operations/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

use std::collections::HashMap;

use crate::action::Add;
use crate::storage::ObjectStoreRef;
use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
use crate::writer::stats::create_add;
use crate::writer::utils::{
self, arrow_schema_without_partitions, record_batch_without_partitions, PartitionPath,
ShareableBuffer,
};
use crate::{crate_version, DeltaResult, DeltaTableError};

use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
Expand All @@ -11,17 +21,6 @@ use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

use crate::action::Add;
use crate::crate_version;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::storage::ObjectStoreRef;
use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
use crate::writer::stats::create_add;
use crate::writer::utils::{
arrow_schema_without_partitions, record_batch_without_partitions, PartitionPath,
ShareableBuffer,
};

// TODO databricks often suggests a file size of 100mb, should we set this default?
const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600;
const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
Expand Down Expand Up @@ -297,12 +296,14 @@ impl PartitionWriter {
}

fn next_data_path(&mut self) -> Path {
let part = format!("{:0>5}", self.part_counter);
self.part_counter += 1;
// TODO: what does c000 mean?
// TODO handle file name for different compressions
let file_name = format!("part-{}-{}-c000.snappy.parquet", part, self.writer_id);
self.config.prefix.child(file_name)

utils::next_data_path(
&self.config.prefix,
self.part_counter,
&self.writer_id,
&self.config.writer_properties,
)
}

fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
Expand Down
30 changes: 18 additions & 12 deletions rust/src/writer/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,28 @@ use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

use super::stats::create_add;
use super::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_from_message,
record_batch_without_partitions, stringified_partition_value, PartitionPath,
};
use super::{DeltaWriter, DeltaWriterError};
use crate::builder::DeltaTableBuilder;
use crate::{action::Add, DeltaTable, DeltaTableError, DeltaTableMetaData, Schema};
use crate::{storage::DeltaObjectStore, writer::utils::ShareableBuffer};

use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow::record_batch::*;
use bytes::Bytes;
use log::{info, warn};
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::{
arrow::ArrowWriter, basic::Compression, errors::ParquetError,
file::properties::WriterProperties,
};
use serde_json::Value;

use super::stats::create_add;
use super::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_from_message,
record_batch_without_partitions, stringified_partition_value,
};
use super::{DeltaWriter, DeltaWriterError};
use crate::builder::DeltaTableBuilder;
use crate::errors::DeltaTableError;
use crate::{action::Add, DeltaTable, DeltaTableMetaData, Schema};
use crate::{storage::DeltaObjectStore, writer::utils::ShareableBuffer};
use uuid::Uuid;

type BadValue = (Value, ParquetError);

Expand Down Expand Up @@ -362,7 +363,12 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {

for (_, writer) in writers {
let metadata = writer.arrow_writer.close()?;
let path = next_data_path(&self.partition_columns, &writer.partition_values, None)?;
let prefix =
PartitionPath::from_hashmap(&self.partition_columns, &writer.partition_values)?;
let prefix = Path::parse(prefix)?;
let uuid = Uuid::new_v4();

let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
let obj_bytes = Bytes::from(writer.buffer.to_vec());
let file_size = obj_bytes.len() as i64;
self.storage.put(&path, obj_bytes).await?;
Expand Down
9 changes: 7 additions & 2 deletions rust/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ use arrow::record_batch::RecordBatch;
use arrow_array::ArrayRef;
use arrow_row::{RowConverter, SortField};
use bytes::Bytes;
use object_store::ObjectStore;
use object_store::{path::Path, ObjectStore};
use parquet::{arrow::ArrowWriter, errors::ParquetError};
use parquet::{basic::Compression, file::properties::WriterProperties};
use uuid::Uuid;

use super::stats::create_add;
use super::utils::{
Expand Down Expand Up @@ -226,7 +227,11 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {

for (_, writer) in writers {
let metadata = writer.arrow_writer.close()?;
let path = next_data_path(&self.partition_columns, &writer.partition_values, None)?;
let prefix =
PartitionPath::from_hashmap(&self.partition_columns, &writer.partition_values)?;
let prefix = Path::parse(prefix)?;
let uuid = Uuid::new_v4();
let path = next_data_path(&prefix, 0, &uuid, &writer.writer_properties);
let obj_bytes = Bytes::from(writer.buffer.to_vec());
let file_size = obj_bytes.len() as i64;
self.storage.put(&path, obj_bytes).await?;
Expand Down
127 changes: 104 additions & 23 deletions rust/src/writer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use arrow::json::ReaderBuilder;
use arrow::record_batch::*;
use object_store::path::Path;
use parking_lot::RwLock;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;
use serde_json::Value;
use uuid::Uuid;

Expand Down Expand Up @@ -75,33 +78,44 @@ impl Display for PartitionPath {
}
}

// TODO: parquet files have a 5 digit zero-padded prefix and a "c\d{3}" suffix that
// I have not been able to find documentation for yet.
/// Generate the name of the file to be written
/// prefix: The location of the file to be written
/// part_count: Used the indicate that single logical partition was split into multiple physical files
/// starts at 0. Is typically used when writer splits that data due to file size constraints
pub(crate) fn next_data_path(
partition_columns: &[String],
partition_values: &HashMap<String, Option<String>>,
part: Option<i32>,
) -> Result<Path, DeltaWriterError> {
// TODO: what does 00000 mean?
// TODO (roeap): my understanding is, that the values are used as a counter - i.e. if a single batch of
// data written to one partition needs to be split due to desired file size constraints.
let first_part = match part {
Some(count) => format!("{count:0>5}"),
_ => "00000".to_string(),
};
let uuid_part = Uuid::new_v4();
// TODO: what does c000 mean?
let last_part = "c000";
prefix: &Path,
part_count: usize,
writer_id: &Uuid,
writer_properties: &WriterProperties,
) -> Path {
fn compression_to_str(compression: &Compression) -> &str {
match compression {
Compression::UNCOMPRESSED => "",
Compression::SNAPPY => ".snappy",
Compression::GZIP(_) => ".gz",
Compression::LZO => ".lzo",
Compression::BROTLI(_) => ".brotli",
Compression::LZ4 => ".lz4",
Compression::ZSTD(_) => ".zstd",
Compression::LZ4_RAW => ".lz4_raw",
Blajda marked this conversation as resolved.
Show resolved Hide resolved
}
}

// NOTE: If we add a non-snappy option, file name must change
let file_name = format!("part-{first_part}-{uuid_part}-{last_part}.snappy.parquet");
// We can not access the default column properties but the current implementation will return
// the default compression when the column is not found
let column_path = ColumnPath::new(Vec::new());
let compression = writer_properties.compression(&column_path);

if partition_columns.is_empty() {
return Ok(Path::from(file_name));
}
let part = format!("{:0>5}", part_count);

let partition_key = PartitionPath::from_hashmap(partition_columns, partition_values)?;
Ok(Path::from(format!("{partition_key}/{file_name}")))
// TODO: what does c000 mean?
let file_name = format!(
"part-{}-{}-c000{}.parquet",
part,
writer_id,
compression_to_str(&compression)
);
Comment on lines +113 to +119
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to be trying to copy Spark. This information is mostly added to make debugging easier. For example, they have a writer id because they might want to see all the files written by a particular node in a Spark cluster. But we don't have nodes or retries like Spark does. So I think we can have our own convention.

At a minimum, I think all we need is {uuid}.parquet. This is just to make sure we don't have collisions between files. Adding the compression codec can be nice for debugging. If we find more things that's are useful to delta-rs, we can consider adding them too.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for locating the Spark source. I tried searching myself but only found the mapping for orc files.
I left some of the comments / TODO that were scattered in the implementation just in case if we wanted to completely copy the Spark approach. Since all writers now use this function it should be easier in the future to make our own convention as you mentioned.

prefix.child(file_name)
}

/// Convert a vector of json values to a RecordBatch
Expand Down Expand Up @@ -291,6 +305,7 @@ mod tests {
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};

#[test]
fn test_stringified_partition_value() {
Expand Down Expand Up @@ -347,4 +362,70 @@ mod tests {
)
}
}

#[test]
fn test_data_path() {
let prefix = Path::parse("x=0/y=0").unwrap();
let uuid = Uuid::parse_str("02f09a3f-1624-3b1d-8409-44eff7708208").unwrap();

// Validated against Spark

let props = WriterProperties::builder()
.set_compression(Compression::UNCOMPRESSED)
.build();

assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.parquet"
);

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.snappy.parquet"
);

let props = WriterProperties::builder()
.set_compression(Compression::GZIP(GzipLevel::default()))
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.gz.parquet"
);

let props = WriterProperties::builder()
.set_compression(Compression::LZ4)
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.lz4.parquet"
);

let props = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.zstd.parquet"
);

// Unable to validate against spark
let props = WriterProperties::builder()
.set_compression(Compression::LZ4_RAW)
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.lz4_raw.parquet"
);

let props = WriterProperties::builder()
.set_compression(Compression::BROTLI(BrotliLevel::default()))
.build();
assert_eq!(
next_data_path(&prefix, 1, &uuid, &props).as_ref(),
"x=0/y=0/part-00001-02f09a3f-1624-3b1d-8409-44eff7708208-c000.brotli.parquet"
);
}
}