Skip to content

Commit

Permalink
Implement physical plan serialization for csv COPY plans , add `as_an…
Browse files Browse the repository at this point in the history
…y`, `Debug` to `FileFormatFactory` (#11588)

* Implement physical plan serialization for COPY plans CsvLogicalExtensionCodec

* fix check

* optimize code

* optimize code
  • Loading branch information
Lordworms authored Jul 23, 2024
1 parent 6d35217 commit 6d8bd2c
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 30 deletions.
6 changes: 5 additions & 1 deletion datafusion-examples/examples/custom_file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl FileFormat for TSVFileFormat {
}
}

#[derive(Default)]
#[derive(Default, Debug)]
/// Factory for creating TSV file formats
///
/// This factory is a wrapper around the CSV file format factory
Expand Down Expand Up @@ -166,6 +166,10 @@ impl FileFormatFactory for TSVFileFactory {
fn default(&self) -> std::sync::Arc<dyn FileFormat> {
todo!()
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for TSVFileFactory {
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const INITIAL_BUFFER_BYTES: usize = 1048576;
/// If the buffered Arrow data exceeds this size, it is flushed to object store
const BUFFER_FLUSH_BYTES: usize = 1024000;

#[derive(Default)]
#[derive(Default, Debug)]
/// Factory struct used to create [ArrowFormat]
pub struct ArrowFormatFactory;

Expand All @@ -89,6 +89,10 @@ impl FileFormatFactory for ArrowFormatFactory {
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(ArrowFormat)
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for ArrowFormatFactory {
Expand Down
11 changes: 11 additions & 0 deletions datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use arrow::datatypes::Schema;
Expand Down Expand Up @@ -64,6 +65,16 @@ impl FileFormatFactory for AvroFormatFactory {
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(AvroFormat)
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl fmt::Debug for AvroFormatFactory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AvroFormatFactory").finish()
}
}

impl GetExt for AvroFormatFactory {
Expand Down
15 changes: 14 additions & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}
#[derive(Default)]
/// Factory struct used to create [CsvFormatFactory]
pub struct CsvFormatFactory {
options: Option<CsvOptions>,
/// the options for csv file read
pub options: Option<CsvOptions>,
}

impl CsvFormatFactory {
Expand All @@ -75,6 +76,14 @@ impl CsvFormatFactory {
}
}

impl fmt::Debug for CsvFormatFactory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CsvFormatFactory")
.field("options", &self.options)
.finish()
}
}

impl FileFormatFactory for CsvFormatFactory {
fn create(
&self,
Expand Down Expand Up @@ -103,6 +112,10 @@ impl FileFormatFactory for CsvFormatFactory {
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(CsvFormat::default())
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for CsvFormatFactory {
Expand Down
12 changes: 12 additions & 0 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ impl FileFormatFactory for JsonFormatFactory {
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(JsonFormat::default())
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for JsonFormatFactory {
Expand All @@ -111,6 +115,14 @@ impl GetExt for JsonFormatFactory {
}
}

impl fmt::Debug for JsonFormatFactory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JsonFormatFactory")
.field("options", &self.options)
.finish()
}
}

/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug, Default)]
pub struct JsonFormat {
Expand Down
16 changes: 13 additions & 3 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use async_trait::async_trait;
use file_compression_type::FileCompressionType;
use object_store::{ObjectMeta, ObjectStore};

use std::fmt::Debug;
/// Factory for creating [`FileFormat`] instances based on session and command level options
///
/// Users can provide their own `FileFormatFactory` to support arbitrary file formats
pub trait FileFormatFactory: Sync + Send + GetExt {
pub trait FileFormatFactory: Sync + Send + GetExt + Debug {
/// Initialize a [FileFormat] and configure based on session and command level options
fn create(
&self,
Expand All @@ -63,6 +63,10 @@ pub trait FileFormatFactory: Sync + Send + GetExt {

/// Initialize a [FileFormat] with all options set to default values
fn default(&self) -> Arc<dyn FileFormat>;

/// Returns the table source as [`Any`] so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
}

/// This trait abstracts all the file format specific implementations
Expand Down Expand Up @@ -138,6 +142,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
/// The former trait is a superset of the latter trait, which includes execution time
/// relevant methods. [FileType] is only used in logical planning and only implements
/// the subset of methods required during logical planning.
#[derive(Debug)]
pub struct DefaultFileType {
file_format_factory: Arc<dyn FileFormatFactory>,
}
Expand All @@ -149,6 +154,11 @@ impl DefaultFileType {
file_format_factory,
}
}

/// get a reference to the inner [FileFormatFactory] struct
pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
&self.file_format_factory
}
}

impl FileType for DefaultFileType {
Expand All @@ -159,7 +169,7 @@ impl FileType for DefaultFileType {

impl Display for DefaultFileType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.file_format_factory.default().fmt(f)
write!(f, "{:?}", self.file_format_factory)
}
}

Expand Down
11 changes: 11 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ impl FileFormatFactory for ParquetFormatFactory {
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(ParquetFormat::default())
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for ParquetFormatFactory {
Expand All @@ -149,6 +153,13 @@ impl GetExt for ParquetFormatFactory {
}
}

impl fmt::Debug for ParquetFormatFactory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ParquetFormatFactory")
.field("ParquetFormatFactory", &self.options)
.finish()
}
}
/// The Apache Parquet `FileFormat` implementation
#[derive(Debug, Default)]
pub struct ParquetFormat {
Expand Down
Loading

0 comments on commit 6d8bd2c

Please sign in to comment.