Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect invalid (unsupported) compression types when parsing #4637

Merged
merged 5 commits into from
Dec 17, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions datafusion/common/src/parsers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,61 @@
use sqlparser::parser::ParserError;

use crate::{DataFusionError, Result, ScalarValue};
use std::result;
use std::str::FromStr;

const SECONDS_PER_HOUR: f64 = 3_600_f64;
const NANOS_PER_SECOND: f64 = 1_000_000_000_f64;

/// Readable file compression type
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionTypeVariant {
/// Gzip-ed file
GZIP,
/// Bzip2-ed file
BZIP2,
/// Xz-ed file (liblzma)
XZ,
/// Uncompressed file
UNCOMPRESSED,
}

impl FromStr for CompressionTypeVariant {
type Err = ParserError;

fn from_str(s: &str) -> result::Result<Self, ParserError> {
let s = s.to_uppercase();
match s.as_str() {
"GZIP" | "GZ" => Ok(Self::GZIP),
"BZIP2" | "BZ2" => Ok(Self::BZIP2),
"XZ" => Ok(Self::XZ),
"" => Ok(Self::UNCOMPRESSED),
_ => Err(ParserError::ParserError(format!(
"Unsupported file compression type {}",
s
))),
}
}
}

impl ToString for CompressionTypeVariant {
fn to_string(&self) -> String {
match self {
Self::GZIP => "GZIP",
Self::BZIP2 => "BZIP2",
Self::XZ => "XZ",
Self::UNCOMPRESSED => "",
}
.to_string()
}
}

impl CompressionTypeVariant {
pub const fn is_compressed(&self) -> bool {
!matches!(self, &Self::UNCOMPRESSED)
}
}

#[derive(Clone, Copy)]
#[repr(u16)]
enum IntervalType {
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::catalog::schema::SchemaProvider;
use crate::datasource::datasource::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{DFSchema, DataFusionError, OwnedTableReference};
use datafusion_expr::CreateExternalTable;
use futures::TryStreamExt;
Expand Down Expand Up @@ -136,7 +137,7 @@ impl ListingSchemaProvider {
table_partition_cols: vec![],
if_not_exists: false,
definition: None,
file_compression_type: "".to_string(),
file_compression_type: CompressionTypeVariant::UNCOMPRESSED,
options: Default::default(),
},
)
Expand Down
95 changes: 52 additions & 43 deletions datafusion/core/src/datasource/file_format/file_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use async_compression::tokio::bufread::{
use bytes::Bytes;
#[cfg(feature = "compression")]
use bzip2::read::BzDecoder;
use datafusion_common::parsers::CompressionTypeVariant;
#[cfg(feature = "compression")]
use flate2::read::GzDecoder;
use futures::Stream;
Expand All @@ -41,6 +42,7 @@ use std::str::FromStr;
use tokio_util::io::{ReaderStream, StreamReader};
#[cfg(feature = "compression")]
use xz2::read::XzDecoder;
use CompressionTypeVariant::*;

/// Define each `FileType`/`FileCompressionType`'s extension
pub trait GetExt {
Expand All @@ -50,48 +52,59 @@ pub trait GetExt {

/// Readable file compression type
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileCompressionType {
/// Gzip-ed file
GZIP,
/// Bzip2-ed file
BZIP2,
/// Xz-ed file (liblzma)
XZ,
/// Uncompressed file
UNCOMPRESSED,
pub struct FileCompressionType {
variant: CompressionTypeVariant,
}

impl GetExt for FileCompressionType {
fn get_ext(&self) -> String {
match self {
FileCompressionType::GZIP => ".gz".to_owned(),
FileCompressionType::BZIP2 => ".bz2".to_owned(),
FileCompressionType::XZ => ".xz".to_owned(),
FileCompressionType::UNCOMPRESSED => "".to_owned(),
match self.variant {
GZIP => ".gz".to_owned(),
BZIP2 => ".bz2".to_owned(),
XZ => ".xz".to_owned(),
UNCOMPRESSED => "".to_owned(),
}
}
}

impl From<CompressionTypeVariant> for FileCompressionType {
fn from(t: CompressionTypeVariant) -> Self {
Self { variant: t }
}
}

impl FromStr for FileCompressionType {
type Err = DataFusionError;

fn from_str(s: &str) -> Result<Self> {
let s = s.to_uppercase();
match s.as_str() {
"GZIP" | "GZ" => Ok(FileCompressionType::GZIP),
"BZIP2" | "BZ2" => Ok(FileCompressionType::BZIP2),
"XZ" => Ok(FileCompressionType::XZ),
"" => Ok(FileCompressionType::UNCOMPRESSED),
_ => Err(DataFusionError::NotImplemented(format!(
"Unknown FileCompressionType: {}",
s
))),
}
let variant = CompressionTypeVariant::from_str(s).map_err(|_| {
DataFusionError::NotImplemented(format!("Unknown FileCompressionType: {}", s))
})?;
Ok(Self { variant })
}
}

/// `FileCompressionType` implementation
impl FileCompressionType {
/// Gzip-ed file
pub const GZIP: Self = Self { variant: GZIP };

/// Bzip2-ed file
pub const BZIP2: Self = Self { variant: BZIP2 };

/// Xz-ed file (liblzma)
pub const XZ: Self = Self { variant: XZ };

/// Uncompressed file
pub const UNCOMPRESSED: Self = Self {
variant: UNCOMPRESSED,
};
Comment on lines +89 to +101
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are a little tricky, but can minimize the code change.


/// The file is compressed or not
pub const fn is_compressed(&self) -> bool {
self.variant.is_compressed()
}

/// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
&self,
Expand All @@ -111,31 +124,29 @@ impl FileCompressionType {
None => Into::<DataFusionError>::into(e),
};

Ok(match self {
Ok(match self.variant {
#[cfg(feature = "compression")]
FileCompressionType::GZIP => Box::new(
GZIP => Box::new(
ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
FileCompressionType::BZIP2 => Box::new(
BZIP2 => Box::new(
ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
FileCompressionType::XZ => Box::new(
XZ => Box::new(
ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(not(feature = "compression"))]
FileCompressionType::GZIP
| FileCompressionType::BZIP2
| FileCompressionType::XZ => {
GZIP | BZIP2 | XZ => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
}
FileCompressionType::UNCOMPRESSED => Box::new(s),
UNCOMPRESSED => Box::new(s),
})
}

Expand All @@ -144,22 +155,20 @@ impl FileCompressionType {
&self,
r: T,
) -> Result<Box<dyn std::io::Read + Send>> {
Ok(match self {
Ok(match self.variant {
#[cfg(feature = "compression")]
FileCompressionType::GZIP => Box::new(GzDecoder::new(r)),
GZIP => Box::new(GzDecoder::new(r)),
#[cfg(feature = "compression")]
FileCompressionType::BZIP2 => Box::new(BzDecoder::new(r)),
BZIP2 => Box::new(BzDecoder::new(r)),
#[cfg(feature = "compression")]
FileCompressionType::XZ => Box::new(XzDecoder::new(r)),
XZ => Box::new(XzDecoder::new(r)),
#[cfg(not(feature = "compression"))]
FileCompressionType::GZIP
| FileCompressionType::BZIP2
| FileCompressionType::XZ => {
GZIP | BZIP2 | XZ => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
}
FileCompressionType::UNCOMPRESSED => Box::new(r),
UNCOMPRESSED => Box::new(r),
})
}
}
Expand Down Expand Up @@ -213,8 +222,8 @@ impl FileType {

match self {
FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
FileType::PARQUET | FileType::AVRO => match c {
FileCompressionType::UNCOMPRESSED => Ok(ext),
FileType::PARQUET | FileType::AVRO => match c.variant {
UNCOMPRESSED => Ok(ext),
_ => Err(DataFusionError::Internal(
"FileCompressionType can be specified for CSV/JSON FileType.".into(),
)),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl ListingTableConfig {
let file_compression_type = FileCompressionType::from_str(splitted)
.unwrap_or(FileCompressionType::UNCOMPRESSED);

if file_compression_type != FileCompressionType::UNCOMPRESSED {
if file_compression_type.is_compressed() {
splitted = exts.next().unwrap_or("");
}

Expand Down
10 changes: 1 addition & 9 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,7 @@ impl TableProviderFactory for ListingTableFactory {
state: &SessionState,
cmd: &CreateExternalTable,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
let file_compression_type = FileCompressionType::from_str(
cmd.file_compression_type.as_str(),
)
.map_err(|_| {
DataFusionError::Execution(format!(
"Unknown FileCompressionType {}",
cmd.file_compression_type.as_str()
))
})?;
let file_compression_type = FileCompressionType::from(cmd.file_compression_type);
let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| {
DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type))
})?;
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::utils::{
};
use crate::{Expr, ExprSchemable, TableProviderFilterPushDown, TableSource};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
ScalarValue,
Expand Down Expand Up @@ -1487,7 +1488,7 @@ pub struct CreateExternalTable {
/// SQL used to create the table, if available
pub definition: Option<String>,
/// File compression type (GZIP, BZIP2, XZ)
pub file_compression_type: String,
pub file_compression_type: CompressionTypeVariant,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// Table(provider) specific options
pub options: HashMap<String, String>,
}
Expand Down
4 changes: 3 additions & 1 deletion datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use datafusion::{
datasource::{provider_as_source, source_as_provider},
prelude::SessionContext,
};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{context, Column, DataFusionError, OwnedTableReference};
use datafusion_expr::logical_plan::{builder::project, Prepare};
use datafusion_expr::{
Expand All @@ -51,6 +52,7 @@ use datafusion_expr::{
use prost::bytes::BufMut;
use prost::Message;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;

fn byte_to_string(b: u8) -> Result<String, DataFusionError> {
Expand Down Expand Up @@ -607,7 +609,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.table_partition_cols
.clone(),
if_not_exists: create_extern_table.if_not_exists,
file_compression_type: create_extern_table.file_compression_type.to_string(),
file_compression_type: CompressionTypeVariant::from_str(&create_extern_table.file_compression_type).map_err(|_| DataFusionError::NotImplemented(format!("Unsupported file compression type {}", create_extern_table.file_compression_type)))?,
definition,
options: create_extern_table.options.clone(),
}))
Expand Down
Loading