Skip to content

Commit

Permalink
Detect invalid (unsupported) compression types when parsing (#4637)
Browse files Browse the repository at this point in the history
* check compression type when parsing

Signed-off-by: remzi <13716567376yh@gmail.com>

* refactor

Signed-off-by: remzi <13716567376yh@gmail.com>

* test

Signed-off-by: remzi <13716567376yh@gmail.com>

Signed-off-by: remzi <13716567376yh@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
HaoYang670 and alamb authored Dec 17, 2022
1 parent 067d044 commit c2f199a
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 78 deletions.
50 changes: 50 additions & 0 deletions datafusion/common/src/parsers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,61 @@
use sqlparser::parser::ParserError;

use crate::{DataFusionError, Result, ScalarValue};
use std::result;
use std::str::FromStr;

const SECONDS_PER_HOUR: f64 = 3_600_f64;
const NANOS_PER_SECOND: f64 = 1_000_000_000_f64;

/// Readable file compression type.
///
/// A plain, field-less enum: values are `Copy`, comparable with `==`, and
/// hashable, so they can be used directly as keys in hash-based collections
/// and embedded in structs that derive `Hash`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompressionTypeVariant {
    /// Gzip-ed file
    GZIP,
    /// Bzip2-ed file
    BZIP2,
    /// Xz-ed file (liblzma)
    XZ,
    /// Uncompressed file
    UNCOMPRESSED,
}

impl FromStr for CompressionTypeVariant {
    type Err = ParserError;

    /// Parses a compression type name; the input is upper-cased before matching.
    ///
    /// `GZIP`/`GZ`, `BZIP2`/`BZ2` and `XZ` select the matching variant, and the
    /// empty string selects `UNCOMPRESSED`.
    ///
    /// # Errors
    ///
    /// Any other input yields `ParserError::ParserError` whose message contains
    /// the upper-cased input.
    fn from_str(s: &str) -> result::Result<Self, ParserError> {
        let upper = s.to_uppercase();
        let variant = match upper.as_str() {
            "GZIP" | "GZ" => Self::GZIP,
            "BZIP2" | "BZ2" => Self::BZIP2,
            "XZ" => Self::XZ,
            "" => Self::UNCOMPRESSED,
            _ => {
                return Err(ParserError::ParserError(format!(
                    "Unsupported file compression type {}",
                    upper
                )))
            }
        };
        Ok(variant)
    }
}

/// Writes the variant's canonical name: `GZIP`, `BZIP2`, `XZ`, or the empty
/// string for `UNCOMPRESSED`.
///
/// `Display` is implemented instead of `ToString` directly: the standard
/// library's blanket `impl<T: Display> ToString for T` keeps `.to_string()`
/// working with identical output, and the type additionally becomes usable
/// with `{}` in `format!`/`write!`.
impl std::fmt::Display for CompressionTypeVariant {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::GZIP => "GZIP",
            Self::BZIP2 => "BZIP2",
            Self::XZ => "XZ",
            Self::UNCOMPRESSED => "",
        };
        f.write_str(name)
    }
}

impl CompressionTypeVariant {
    /// Returns `false` for `UNCOMPRESSED` and `true` for every other variant.
    pub const fn is_compressed(&self) -> bool {
        match self {
            Self::UNCOMPRESSED => false,
            Self::GZIP | Self::BZIP2 | Self::XZ => true,
        }
    }
}

#[derive(Clone, Copy)]
#[repr(u16)]
enum IntervalType {
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::catalog::schema::SchemaProvider;
use crate::datasource::datasource::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{DFSchema, DataFusionError, OwnedTableReference};
use datafusion_expr::CreateExternalTable;
use futures::TryStreamExt;
Expand Down Expand Up @@ -136,7 +137,7 @@ impl ListingSchemaProvider {
table_partition_cols: vec![],
if_not_exists: false,
definition: None,
file_compression_type: "".to_string(),
file_compression_type: CompressionTypeVariant::UNCOMPRESSED,
options: Default::default(),
},
)
Expand Down
95 changes: 52 additions & 43 deletions datafusion/core/src/datasource/file_format/file_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use async_compression::tokio::bufread::{
use bytes::Bytes;
#[cfg(feature = "compression")]
use bzip2::read::BzDecoder;
use datafusion_common::parsers::CompressionTypeVariant;
#[cfg(feature = "compression")]
use flate2::read::GzDecoder;
use futures::Stream;
Expand All @@ -41,6 +42,7 @@ use std::str::FromStr;
use tokio_util::io::{ReaderStream, StreamReader};
#[cfg(feature = "compression")]
use xz2::read::XzDecoder;
use CompressionTypeVariant::*;

/// Define each `FileType`/`FileCompressionType`'s extension
pub trait GetExt {
Expand All @@ -50,48 +52,59 @@ pub trait GetExt {

/// Readable file compression type
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileCompressionType {
/// Gzip-ed file
GZIP,
/// Bzip2-ed file
BZIP2,
/// Xz-ed file (liblzma)
XZ,
/// Uncompressed file
UNCOMPRESSED,
pub struct FileCompressionType {
variant: CompressionTypeVariant,
}

impl GetExt for FileCompressionType {
fn get_ext(&self) -> String {
match self {
FileCompressionType::GZIP => ".gz".to_owned(),
FileCompressionType::BZIP2 => ".bz2".to_owned(),
FileCompressionType::XZ => ".xz".to_owned(),
FileCompressionType::UNCOMPRESSED => "".to_owned(),
match self.variant {
GZIP => ".gz".to_owned(),
BZIP2 => ".bz2".to_owned(),
XZ => ".xz".to_owned(),
UNCOMPRESSED => "".to_owned(),
}
}
}

impl From<CompressionTypeVariant> for FileCompressionType {
fn from(t: CompressionTypeVariant) -> Self {
Self { variant: t }
}
}

impl FromStr for FileCompressionType {
type Err = DataFusionError;

fn from_str(s: &str) -> Result<Self> {
let s = s.to_uppercase();
match s.as_str() {
"GZIP" | "GZ" => Ok(FileCompressionType::GZIP),
"BZIP2" | "BZ2" => Ok(FileCompressionType::BZIP2),
"XZ" => Ok(FileCompressionType::XZ),
"" => Ok(FileCompressionType::UNCOMPRESSED),
_ => Err(DataFusionError::NotImplemented(format!(
"Unknown FileCompressionType: {}",
s
))),
}
let variant = CompressionTypeVariant::from_str(s).map_err(|_| {
DataFusionError::NotImplemented(format!("Unknown FileCompressionType: {}", s))
})?;
Ok(Self { variant })
}
}

/// `FileCompressionType` implementation
impl FileCompressionType {
/// Gzip-ed file
pub const GZIP: Self = Self { variant: GZIP };

/// Bzip2-ed file
pub const BZIP2: Self = Self { variant: BZIP2 };

/// Xz-ed file (liblzma)
pub const XZ: Self = Self { variant: XZ };

/// Uncompressed file
pub const UNCOMPRESSED: Self = Self {
variant: UNCOMPRESSED,
};

/// Returns `true` if the file is compressed
pub const fn is_compressed(&self) -> bool {
self.variant.is_compressed()
}

/// Given a `Stream`, create a `Stream` whose data is decompressed according to this `FileCompressionType`.
pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
&self,
Expand All @@ -111,31 +124,29 @@ impl FileCompressionType {
None => Into::<DataFusionError>::into(e),
};

Ok(match self {
Ok(match self.variant {
#[cfg(feature = "compression")]
FileCompressionType::GZIP => Box::new(
GZIP => Box::new(
ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
FileCompressionType::BZIP2 => Box::new(
BZIP2 => Box::new(
ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
FileCompressionType::XZ => Box::new(
XZ => Box::new(
ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(not(feature = "compression"))]
FileCompressionType::GZIP
| FileCompressionType::BZIP2
| FileCompressionType::XZ => {
GZIP | BZIP2 | XZ => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
}
FileCompressionType::UNCOMPRESSED => Box::new(s),
UNCOMPRESSED => Box::new(s),
})
}

Expand All @@ -144,22 +155,20 @@ impl FileCompressionType {
&self,
r: T,
) -> Result<Box<dyn std::io::Read + Send>> {
Ok(match self {
Ok(match self.variant {
#[cfg(feature = "compression")]
FileCompressionType::GZIP => Box::new(GzDecoder::new(r)),
GZIP => Box::new(GzDecoder::new(r)),
#[cfg(feature = "compression")]
FileCompressionType::BZIP2 => Box::new(BzDecoder::new(r)),
BZIP2 => Box::new(BzDecoder::new(r)),
#[cfg(feature = "compression")]
FileCompressionType::XZ => Box::new(XzDecoder::new(r)),
XZ => Box::new(XzDecoder::new(r)),
#[cfg(not(feature = "compression"))]
FileCompressionType::GZIP
| FileCompressionType::BZIP2
| FileCompressionType::XZ => {
GZIP | BZIP2 | XZ => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
}
FileCompressionType::UNCOMPRESSED => Box::new(r),
UNCOMPRESSED => Box::new(r),
})
}
}
Expand Down Expand Up @@ -213,8 +222,8 @@ impl FileType {

match self {
FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
FileType::PARQUET | FileType::AVRO => match c {
FileCompressionType::UNCOMPRESSED => Ok(ext),
FileType::PARQUET | FileType::AVRO => match c.variant {
UNCOMPRESSED => Ok(ext),
_ => Err(DataFusionError::Internal(
"FileCompressionType can be specified for CSV/JSON FileType.".into(),
)),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl ListingTableConfig {
let file_compression_type = FileCompressionType::from_str(splitted)
.unwrap_or(FileCompressionType::UNCOMPRESSED);

if file_compression_type != FileCompressionType::UNCOMPRESSED {
if file_compression_type.is_compressed() {
splitted = exts.next().unwrap_or("");
}

Expand Down
10 changes: 1 addition & 9 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,7 @@ impl TableProviderFactory for ListingTableFactory {
state: &SessionState,
cmd: &CreateExternalTable,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
let file_compression_type = FileCompressionType::from_str(
cmd.file_compression_type.as_str(),
)
.map_err(|_| {
DataFusionError::Execution(format!(
"Unknown FileCompressionType {}",
cmd.file_compression_type.as_str()
))
})?;
let file_compression_type = FileCompressionType::from(cmd.file_compression_type);
let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| {
DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type))
})?;
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::{
build_join_schema, Expr, ExprSchemable, TableProviderFilterPushDown, TableSource,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
ScalarValue,
Expand Down Expand Up @@ -1491,7 +1492,7 @@ pub struct CreateExternalTable {
/// SQL used to create the table, if available
pub definition: Option<String>,
/// File compression type (GZIP, BZIP2, XZ)
pub file_compression_type: String,
pub file_compression_type: CompressionTypeVariant,
/// Table(provider) specific options
pub options: HashMap<String, String>,
}
Expand Down
4 changes: 3 additions & 1 deletion datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use datafusion::{
datasource::{provider_as_source, source_as_provider},
prelude::SessionContext,
};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{context, DataFusionError, OwnedTableReference};
use datafusion_expr::logical_plan::{builder::project, Prepare};
use datafusion_expr::{
Expand All @@ -51,6 +52,7 @@ use datafusion_expr::{
use prost::bytes::BufMut;
use prost::Message;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;

fn byte_to_string(b: u8) -> Result<String, DataFusionError> {
Expand Down Expand Up @@ -607,7 +609,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.table_partition_cols
.clone(),
if_not_exists: create_extern_table.if_not_exists,
file_compression_type: create_extern_table.file_compression_type.to_string(),
file_compression_type: CompressionTypeVariant::from_str(&create_extern_table.file_compression_type).map_err(|_| DataFusionError::NotImplemented(format!("Unsupported file compression type {}", create_extern_table.file_compression_type)))?,
definition,
options: create_extern_table.options.clone(),
}))
Expand Down
Loading

0 comments on commit c2f199a

Please sign in to comment.