Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 51 additions & 32 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,25 @@ use itertools::Itertools;
use object_store::ObjectStore;

/// Configuration for creating a [`ListingTable`]
///
///
#[derive(Debug, Clone)]
pub struct ListingTableConfig {
/// Paths on the `ObjectStore` for creating `ListingTable`.
/// They should share the same schema and object store.
pub table_paths: Vec<ListingTableUrl>,
/// Optional `SchemaRef` for the to be created `ListingTable`.
///
/// See details on [`ListingTableConfig::with_schema`]
pub file_schema: Option<SchemaRef>,
/// Optional `ListingOptions` for the to be created `ListingTable`.
/// Optional [`ListingOptions`] for the to be created [`ListingTable`].
///
/// See details on [`ListingTableConfig::with_listing_options`]
pub options: Option<ListingOptions>,
}

impl ListingTableConfig {
/// Creates new [`ListingTableConfig`].
///
/// The [`SchemaRef`] and [`ListingOptions`] are inferred based on
/// the suffix of the provided `table_paths` first element.
/// Creates new [`ListingTableConfig`] for reading the specified URL
pub fn new(table_path: ListingTableUrl) -> Self {
let table_paths = vec![table_path];
Self {
Expand All @@ -89,16 +92,24 @@ impl ListingTableConfig {

/// Creates new [`ListingTableConfig`] with multiple table paths.
///
/// The [`SchemaRef`] and [`ListingOptions`] are inferred based on
/// the suffix of the provided `table_paths` first element.
/// See [`Self::infer_options`] for details on what happens with multiple paths
pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
Self {
table_paths,
file_schema: None,
options: None,
}
}
/// Add `schema` to [`ListingTableConfig`]
/// Set the `schema` for the overall [`ListingTable`]
///
/// [`ListingTable`] will automatically coerce, when possible, the schema
/// for individual files to match this schema.
///
/// If a schema is not provided, it is inferred using
/// [`Self::infer_schema`].
///
/// If the schema is provided, it must contain only the fields in the file
/// without the table partitioning columns.
pub fn with_schema(self, schema: SchemaRef) -> Self {
Self {
table_paths: self.table_paths,
Expand All @@ -108,6 +119,9 @@ impl ListingTableConfig {
}

/// Add `listing_options` to [`ListingTableConfig`]
///
/// If not provided, format and other options are inferred via
/// [`Self::infer_options`].
pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
Self {
table_paths: self.table_paths,
Expand All @@ -116,7 +130,7 @@ impl ListingTableConfig {
}
}

///Returns a tupe of (file_extension, optional compression_extension)
/// Returns a tuple of `(file_extension, optional compression_extension)`
///
/// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
/// For example a path ending with blah.test.csv returns `("csv", None)`
Expand All @@ -138,7 +152,9 @@ impl ListingTableConfig {
}
}

/// Infer `ListingOptions` based on `table_path` suffix.
/// Infer `ListingOptions` based on `table_path` and file suffix.
///
/// The format is inferred based on the first `table_path`.
pub async fn infer_options(self, state: &dyn Session) -> Result<Self> {
let store = if let Some(url) = self.table_paths.first() {
state.runtime_env().object_store(url)?
Expand Down Expand Up @@ -192,7 +208,13 @@ impl ListingTableConfig {
})
}

/// Infer the [`SchemaRef`] based on `table_path` suffix. Requires `self.options` to be set prior to using.
/// Infer the [`SchemaRef`] based on `table_path`s.
///
/// This method infers the table schema using the first `table_path`.
/// See [`ListingOptions::infer_schema`] for more details
///
/// # Errors
/// * if `self.options` is not set. See [`Self::with_listing_options`]
pub async fn infer_schema(self, state: &dyn Session) -> Result<Self> {
match self.options {
Some(options) => {
Expand All @@ -212,12 +234,15 @@ impl ListingTableConfig {
}
}

/// Convenience wrapper for calling `infer_options` and `infer_schema`
/// Convenience method to call both [`Self::infer_options`] and [`Self::infer_schema`]
pub async fn infer(self, state: &dyn Session) -> Result<Self> {
self.infer_options(state).await?.infer_schema(state).await
}

/// Infer the partition columns from the path. Requires `self.options` to be set prior to using.
/// Infer the partition columns from `table_paths`.
///
/// # Errors
/// * if `self.options` is not set. See [`Self::with_listing_options`]
pub async fn infer_partitions_from_path(self, state: &dyn Session) -> Result<Self> {
match self.options {
Some(options) => {
Expand Down Expand Up @@ -277,6 +302,7 @@ pub struct ListingOptions {
/// parquet metadata.
///
/// See <https://github.com/apache/datafusion/issues/4177>
///
/// NOTE: This attribute stores all equivalent orderings (the outer `Vec`)
/// where each ordering consists of an individual lexicographic
/// ordering (encapsulated by a `Vec<Expr>`). If there aren't
Expand Down Expand Up @@ -479,11 +505,13 @@ impl ListingOptions {
}

/// Infer the schema of the files at the given path on the provided object store.
/// The inferred schema does not include the partitioning columns.
///
/// This method will not be called by the table itself but before creating it.
/// This way when creating the logical plan we can decide to resolve the schema
/// locally or ask a remote service to do it (e.g a scheduler).
/// If the table_path contains one or more files (i.e. it is a directory /
/// prefix of files) their schema is merged by calling [`FileFormat::infer_schema`]
///
/// Note: The inferred schema does not include any partitioning columns.
///
/// This method is called as part of creating a [`ListingTable`].
pub async fn infer_schema<'a>(
&'a self,
state: &dyn Session,
Expand Down Expand Up @@ -656,16 +684,14 @@ impl ListingOptions {
/// `ListingTable` also supports limit, filter and projection pushdown for formats that
/// support it as such as Parquet.
///
/// # Implementation
/// # See Also
///
/// `ListingTable` Uses [`DataSourceExec`] to execute the data. See that struct
/// for more details.
/// 1. [`ListingTableConfig`]: Configuration options
/// 1. [`DataSourceExec`]: `ExecutionPlan` used by `ListingTable`
///
/// [`DataSourceExec`]: crate::datasource::source::DataSourceExec
///
/// # Example
///
/// To read a directory of parquet files using a [`ListingTable`]:
/// # Example: Read a directory of parquet files using a [`ListingTable`]
///
/// ```no_run
/// # use datafusion::prelude::SessionContext;
Expand Down Expand Up @@ -731,16 +757,9 @@ pub struct ListingTable {
}

impl ListingTable {
/// Create new [`ListingTable`] that lists the FS to get the files
/// to scan. See [`ListingTable`] for and example.
///
/// Takes a `ListingTableConfig` as input which requires an `ObjectStore` and `table_path`.
/// `ListingOptions` and `SchemaRef` are optional. If they are not
/// provided the file type is inferred based on the file suffix.
/// If the schema is provided then it must be resolved before creating the table
/// and should contain the fields of the file without the table
/// partitioning columns.
/// Create new [`ListingTable`]
///
/// See documentation and example on [`ListingTable`] and [`ListingTableConfig`]
pub fn try_new(config: ListingTableConfig) -> Result<Self> {
let file_schema = config
.file_schema
Expand Down
19 changes: 11 additions & 8 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,17 @@
//! ```
//!
//! A [`TableProvider`] provides information for planning and
//! an [`ExecutionPlan`]s for execution. DataFusion includes [`ListingTable`]
//! which supports reading several common file formats, and you can support any
//! new file format by implementing the [`TableProvider`] trait. See also:
//!
//! 1. [`ListingTable`]: Reads data from Parquet, JSON, CSV, or AVRO
//! files. Supports single files or multiple files with HIVE style
//! partitioning, optional compression, directly reading from remote
//! object store and more.
//! an [`ExecutionPlan`]s for execution. DataFusion includes [`ListingTable`],
//! a [`TableProvider`] which reads individual files or directories of files
//! ("partitioned datasets") of several common file formats. Uses can add
//! support for new file formats by implementing the [`TableProvider`]
//! trait.
//!
//! See also:
//!
//! 1. [`ListingTable`]: Reads data from one or more Parquet, JSON, CSV, or AVRO
//! files supporting HIVE style partitioning, optional compression, directly
//! reading from remote object store and more.
//!
//! 2. [`MemTable`]: Reads data from in memory [`RecordBatch`]es.
//!
Expand Down