Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement physical plan serialization for csv COPY plans , add as_any, Debug to FileFormatFactory #11588

Merged
merged 4 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion datafusion-examples/examples/custom_file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl FileFormat for TSVFileFormat {
}
}

#[derive(Default)]
#[derive(Default, Debug)]
/// Factory for creating TSV file formats
///
/// This factory is a wrapper around the CSV file format factory
Expand Down Expand Up @@ -166,6 +166,10 @@ impl FileFormatFactory for TSVFileFactory {
fn default(&self) -> std::sync::Arc<dyn FileFormat> {
todo!()
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for TSVFileFactory {
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const INITIAL_BUFFER_BYTES: usize = 1048576;
/// If the buffered Arrow data exceeds this size, it is flushed to object store
const BUFFER_FLUSH_BYTES: usize = 1024000;

#[derive(Default)]
#[derive(Default, Debug)]
/// Factory struct used to create [ArrowFormat]
pub struct ArrowFormatFactory;

Expand All @@ -89,6 +89,10 @@ impl FileFormatFactory for ArrowFormatFactory {
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(ArrowFormat)
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for ArrowFormatFactory {
Expand Down
11 changes: 11 additions & 0 deletions datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use arrow::datatypes::Schema;
Expand Down Expand Up @@ -64,6 +65,16 @@ impl FileFormatFactory for AvroFormatFactory {
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(AvroFormat)
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl fmt::Debug for AvroFormatFactory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AvroFormatFactory").finish()
}
}

impl GetExt for AvroFormatFactory {
Expand Down
15 changes: 14 additions & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}
#[derive(Default)]
/// Factory struct used to create [CsvFormatFactory]
pub struct CsvFormatFactory {
options: Option<CsvOptions>,
/// the options for csv file read
pub options: Option<CsvOptions>,
}

impl CsvFormatFactory {
Expand All @@ -75,6 +76,14 @@ impl CsvFormatFactory {
}
}

impl fmt::Debug for CsvFormatFactory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CsvFormatFactory")
.field("options", &self.options)
.finish()
}
}

impl FileFormatFactory for CsvFormatFactory {
fn create(
&self,
Expand Down Expand Up @@ -103,6 +112,10 @@ impl FileFormatFactory for CsvFormatFactory {
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(CsvFormat::default())
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for CsvFormatFactory {
Expand Down
12 changes: 12 additions & 0 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ impl FileFormatFactory for JsonFormatFactory {
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(JsonFormat::default())
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for JsonFormatFactory {
Expand All @@ -111,6 +115,14 @@ impl GetExt for JsonFormatFactory {
}
}

impl fmt::Debug for JsonFormatFactory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JsonFormatFactory")
.field("options", &self.options)
.finish()
}
}

/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug, Default)]
pub struct JsonFormat {
Expand Down
16 changes: 13 additions & 3 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use async_trait::async_trait;
use file_compression_type::FileCompressionType;
use object_store::{ObjectMeta, ObjectStore};

use std::fmt::Debug;
/// Factory for creating [`FileFormat`] instances based on session and command level options
///
/// Users can provide their own `FileFormatFactory` to support arbitrary file formats
pub trait FileFormatFactory: Sync + Send + GetExt {
pub trait FileFormatFactory: Sync + Send + GetExt + Debug {
/// Initialize a [FileFormat] and configure based on session and command level options
fn create(
&self,
Expand All @@ -63,6 +63,10 @@ pub trait FileFormatFactory: Sync + Send + GetExt {

/// Initialize a [FileFormat] with all options set to default values
fn default(&self) -> Arc<dyn FileFormat>;

/// Returns the table source as [`Any`] so that it can be
Lordworms marked this conversation as resolved.
Show resolved Hide resolved
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
}

/// This trait abstracts all the file format specific implementations
Expand Down Expand Up @@ -138,6 +142,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
/// The former trait is a superset of the latter trait, which includes execution time
/// relevant methods. [FileType] is only used in logical planning and only implements
/// the subset of methods required during logical planning.
#[derive(Debug)]
pub struct DefaultFileType {
file_format_factory: Arc<dyn FileFormatFactory>,
}
Expand All @@ -149,6 +154,11 @@ impl DefaultFileType {
file_format_factory,
}
}

/// get a reference to the inner [FileFormatFactory] struct
pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
&self.file_format_factory
}
}

impl FileType for DefaultFileType {
Expand All @@ -159,7 +169,7 @@ impl FileType for DefaultFileType {

impl Display for DefaultFileType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.file_format_factory.default().fmt(f)
write!(f, "{:?}", self.file_format_factory)
}
}

Expand Down
11 changes: 11 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ impl FileFormatFactory for ParquetFormatFactory {
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(ParquetFormat::default())
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GetExt for ParquetFormatFactory {
Expand All @@ -149,6 +153,13 @@ impl GetExt for ParquetFormatFactory {
}
}

impl fmt::Debug for ParquetFormatFactory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ParquetFormatFactory")
.field("ParquetFormatFactory", &self.options)
.finish()
}
}
/// The Apache Parquet `FileFormat` implementation
#[derive(Debug, Default)]
pub struct ParquetFormat {
Expand Down
Loading