Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support compression levels #3847

Merged
merged 6 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use std::{fmt, str};

use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
use crate::format as parquet;

use crate::errors::{ParquetError, Result};
Expand Down Expand Up @@ -286,11 +287,11 @@ pub enum Encoding {
pub enum Compression {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change, since `Compression` is the type accepted by the following setter:

pub fn set_compression(mut self, value: Compression) -> Self {

One way to avoid the breaking change would be to add a new enum, `CompressionOptions`, that can also be set via the `WriterPropertiesBuilder`.

UNCOMPRESSED,
SNAPPY,
GZIP,
GZIP(GzipLevel),
LZO,
BROTLI,
BROTLI(BrotliLevel),
LZ4,
ZSTD,
ZSTD(ZstdLevel),
LZ4_RAW,
}

Expand Down Expand Up @@ -830,11 +831,11 @@ impl TryFrom<parquet::CompressionCodec> for Compression {
Ok(match value {
parquet::CompressionCodec::UNCOMPRESSED => Compression::UNCOMPRESSED,
parquet::CompressionCodec::SNAPPY => Compression::SNAPPY,
parquet::CompressionCodec::GZIP => Compression::GZIP,
parquet::CompressionCodec::GZIP => Compression::GZIP(Default::default()),
parquet::CompressionCodec::LZO => Compression::LZO,
parquet::CompressionCodec::BROTLI => Compression::BROTLI,
parquet::CompressionCodec::BROTLI => Compression::BROTLI(Default::default()),
parquet::CompressionCodec::LZ4 => Compression::LZ4,
parquet::CompressionCodec::ZSTD => Compression::ZSTD,
parquet::CompressionCodec::ZSTD => Compression::ZSTD(Default::default()),
parquet::CompressionCodec::LZ4_RAW => Compression::LZ4_RAW,
_ => {
return Err(general_err!(
Expand All @@ -851,11 +852,11 @@ impl From<Compression> for parquet::CompressionCodec {
match value {
Compression::UNCOMPRESSED => parquet::CompressionCodec::UNCOMPRESSED,
Compression::SNAPPY => parquet::CompressionCodec::SNAPPY,
Compression::GZIP => parquet::CompressionCodec::GZIP,
Compression::GZIP(_) => parquet::CompressionCodec::GZIP,
Compression::LZO => parquet::CompressionCodec::LZO,
Compression::BROTLI => parquet::CompressionCodec::BROTLI,
Compression::BROTLI(_) => parquet::CompressionCodec::BROTLI,
Compression::LZ4 => parquet::CompressionCodec::LZ4,
Compression::ZSTD => parquet::CompressionCodec::ZSTD,
Compression::ZSTD(_) => parquet::CompressionCodec::ZSTD,
Compression::LZ4_RAW => parquet::CompressionCodec::LZ4_RAW,
}
}
Expand Down Expand Up @@ -1783,11 +1784,20 @@ mod tests {
fn test_display_compression() {
assert_eq!(Compression::UNCOMPRESSED.to_string(), "UNCOMPRESSED");
assert_eq!(Compression::SNAPPY.to_string(), "SNAPPY");
assert_eq!(Compression::GZIP.to_string(), "GZIP");
assert_eq!(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure whether the level should be displayed.

Compression::GZIP(Default::default()).to_string(),
"GZIP(GzipLevel(6))"
);
assert_eq!(Compression::LZO.to_string(), "LZO");
assert_eq!(Compression::BROTLI.to_string(), "BROTLI");
assert_eq!(
Compression::BROTLI(Default::default()).to_string(),
"BROTLI(BrotliLevel(1))"
);
assert_eq!(Compression::LZ4.to_string(), "LZ4");
assert_eq!(Compression::ZSTD.to_string(), "ZSTD");
assert_eq!(
Compression::ZSTD(Default::default()).to_string(),
"ZSTD(ZstdLevel(1))"
);
}

#[test]
Expand All @@ -1802,23 +1812,23 @@ mod tests {
);
assert_eq!(
Compression::try_from(parquet::CompressionCodec::GZIP).unwrap(),
Compression::GZIP
Compression::GZIP(Default::default())
);
assert_eq!(
Compression::try_from(parquet::CompressionCodec::LZO).unwrap(),
Compression::LZO
);
assert_eq!(
Compression::try_from(parquet::CompressionCodec::BROTLI).unwrap(),
Compression::BROTLI
Compression::BROTLI(Default::default())
);
assert_eq!(
Compression::try_from(parquet::CompressionCodec::LZ4).unwrap(),
Compression::LZ4
);
assert_eq!(
Compression::try_from(parquet::CompressionCodec::ZSTD).unwrap(),
Compression::ZSTD
Compression::ZSTD(Default::default())
);
}

Expand All @@ -1832,14 +1842,20 @@ mod tests {
parquet::CompressionCodec::SNAPPY,
Compression::SNAPPY.into()
);
assert_eq!(parquet::CompressionCodec::GZIP, Compression::GZIP.into());
assert_eq!(
parquet::CompressionCodec::GZIP,
Compression::GZIP(Default::default()).into()
);
assert_eq!(parquet::CompressionCodec::LZO, Compression::LZO.into());
assert_eq!(
parquet::CompressionCodec::BROTLI,
Compression::BROTLI.into()
Compression::BROTLI(Default::default()).into()
);
assert_eq!(parquet::CompressionCodec::LZ4, Compression::LZ4.into());
assert_eq!(parquet::CompressionCodec::ZSTD, Compression::ZSTD.into());
assert_eq!(
parquet::CompressionCodec::ZSTD,
Compression::ZSTD(Default::default()).into()
);
}

#[test]
Expand Down
21 changes: 15 additions & 6 deletions parquet/src/bin/parquet-fromcsv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,11 @@ fn compression_from_str(cmp: &str) -> Result<Compression, String> {
match cmp.to_uppercase().as_str() {
"UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED),
"SNAPPY" => Ok(Compression::SNAPPY),
"GZIP" => Ok(Compression::GZIP),
"GZIP" => Ok(Compression::GZIP(Default::default())),
"LZO" => Ok(Compression::LZO),
"BROTLI" => Ok(Compression::BROTLI),
"BROTLI" => Ok(Compression::BROTLI(Default::default())),
"LZ4" => Ok(Compression::LZ4),
"ZSTD" => Ok(Compression::ZSTD),
"ZSTD" => Ok(Compression::ZSTD(Default::default())),
v => Err(
format!("Unknown compression {v} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help")
)
Expand Down Expand Up @@ -507,15 +507,24 @@ mod tests {
let args = parse_args(vec!["--parquet-compression", "snappy"]).unwrap();
assert_eq!(args.parquet_compression, Compression::SNAPPY);
let args = parse_args(vec!["--parquet-compression", "gzip"]).unwrap();
assert_eq!(args.parquet_compression, Compression::GZIP);
assert_eq!(
args.parquet_compression,
Compression::GZIP(Default::default())
);
let args = parse_args(vec!["--parquet-compression", "lzo"]).unwrap();
assert_eq!(args.parquet_compression, Compression::LZO);
let args = parse_args(vec!["--parquet-compression", "lz4"]).unwrap();
assert_eq!(args.parquet_compression, Compression::LZ4);
let args = parse_args(vec!["--parquet-compression", "brotli"]).unwrap();
assert_eq!(args.parquet_compression, Compression::BROTLI);
assert_eq!(
args.parquet_compression,
Compression::BROTLI(Default::default())
);
let args = parse_args(vec!["--parquet-compression", "zstd"]).unwrap();
assert_eq!(args.parquet_compression, Compression::ZSTD);
assert_eq!(
args.parquet_compression,
Compression::ZSTD(Default::default())
);
}

#[test]
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/bin/parquet-layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ fn compression(compression: Compression) -> Option<&'static str> {
match compression {
Compression::UNCOMPRESSED => None,
Compression::SNAPPY => Some("snappy"),
Compression::GZIP => Some("gzip"),
Compression::GZIP(_) => Some("gzip"),
Compression::LZO => Some("lzo"),
Compression::BROTLI => Some("brotli"),
Compression::BROTLI(_) => Some("brotli"),
Compression::LZ4 => Some("lz4"),
Compression::ZSTD => Some("zstd"),
Compression::ZSTD(_) => Some("zstd"),
Compression::LZ4_RAW => Some("lz4_raw"),
}
}
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/bin/parquet-rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ impl From<CompressionArgs> for Compression {
match value {
CompressionArgs::None => Self::UNCOMPRESSED,
CompressionArgs::Snappy => Self::SNAPPY,
CompressionArgs::Gzip => Self::GZIP,
CompressionArgs::Gzip => Self::GZIP(Default::default()),
CompressionArgs::Lzo => Self::LZO,
CompressionArgs::Brotli => Self::BROTLI,
CompressionArgs::Brotli => Self::BROTLI(Default::default()),
CompressionArgs::Lz4 => Self::LZ4,
CompressionArgs::Zstd => Self::ZSTD,
CompressionArgs::Zstd => Self::ZSTD(Default::default()),
CompressionArgs::Lz4Raw => Self::LZ4_RAW,
}
}
Expand Down
Loading