Move remaining code out of legacy core/logical_plan module (#2701)
* move DefaultTableSource to datasource module

* add public re-exports

* clippy
andygrove authored Jun 6, 2022
1 parent bbb674a commit 352f8b2
Showing 11 changed files with 151 additions and 127 deletions.
26 changes: 13 additions & 13 deletions datafusion/core/src/dataframe.rs
@@ -17,30 +17,30 @@

//! DataFrame API for building and executing query plans.
use crate::arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::logical_plan::{
col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, LogicalPlanBuilder,
Partitioning,
};
use parquet::file::properties::WriterProperties;
use std::sync::Arc;

use crate::physical_plan::SendableRecordBatchStream;
use async_trait::async_trait;

use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
use crate::execution::context::{SessionState, TaskContext};
use crate::error::Result;
use crate::execution::{
context::{SessionState, TaskContext},
FunctionRegistry,
};
use crate::logical_expr::{utils::find_window_exprs, TableType};
use crate::logical_plan::{
col, DFSchema, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
use crate::scalar::ScalarValue;
use async_trait::async_trait;
use parking_lot::RwLock;
use parquet::file::properties::WriterProperties;
use std::any::Any;
use std::sync::Arc;

/// DataFrame represents a logical set of rows with the same named columns.
/// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or
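
This hunk only reorders the dataframe.rs imports (FunctionRegistry now comes from crate::execution, the table-source helpers from crate::datasource); the public DataFrame API itself is untouched. For context, a minimal hedged sketch of that API as of this era of DataFusion — the CSV path is a placeholder, not a file in this repository:

use datafusion::error::Result;
use datafusion::prelude::{col, lit, CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // "data.csv" is a placeholder path for illustration only.
    let df = ctx.read_csv("data.csv", CsvReadOptions::new()).await?;
    // Filter and project through the DataFrame API, then print the result.
    let df = df.filter(col("a").gt(lit(5)))?.select(vec![col("b")])?;
    df.show().await?;
    Ok(())
}
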
87 changes: 87 additions & 0 deletions datafusion/core/src/datasource/default_table_source.rs
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Default TableSource implementation used in DataFusion physical plans
use crate::datasource::TableProvider;
use arrow::datatypes::SchemaRef;
use datafusion_common::DataFusionError;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource};
use std::any::Any;
use std::sync::Arc;

/// DataFusion default table source, wrapping TableProvider
///
/// This structure adapts a `TableProvider` (physical plan trait) to the `TableSource`
/// (logical plan trait)
pub struct DefaultTableSource {
/// table provider
pub table_provider: Arc<dyn TableProvider>,
}

impl DefaultTableSource {
/// Create a new DefaultTableSource to wrap a TableProvider
pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
Self { table_provider }
}
}

impl TableSource for DefaultTableSource {
/// Returns the table source as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any {
self
}

/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef {
self.table_provider.schema()
}

/// Tests whether the table provider can make use of a filter expression
/// to optimise data retrieval.
fn supports_filter_pushdown(
&self,
filter: &Expr,
) -> datafusion_common::Result<TableProviderFilterPushDown> {
self.table_provider.supports_filter_pushdown(filter)
}
}

/// Wrap TableProvider in TableSource
pub fn provider_as_source(
table_provider: Arc<dyn TableProvider>,
) -> Arc<dyn TableSource> {
Arc::new(DefaultTableSource::new(table_provider))
}

/// Attempt to downcast a TableSource to DefaultTableSource and access the
/// TableProvider. This will only work with a TableSource created by DataFusion.
pub fn source_as_provider(
source: &Arc<dyn TableSource>,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
match source
.as_ref()
.as_any()
.downcast_ref::<DefaultTableSource>()
{
Some(source) => Ok(source.table_provider.clone()),
_ => Err(DataFusionError::Internal(
"TableSource was not DefaultTableSource".to_string(),
)),
}
}
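
A hedged usage sketch (not part of this commit) of the relocated adapter: wrap a physical TableProvider as a logical TableSource with provider_as_source, then recover the provider with source_as_provider, as the physical planner does. A MemTable stands in for the provider.

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::{
    provider_as_source, source_as_provider, MemTable, TableProvider,
};
use datafusion::error::Result;

fn round_trip() -> Result<()> {
    // A one-column in-memory table plays the role of the TableProvider.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let provider: Arc<dyn TableProvider> =
        Arc::new(MemTable::try_new(schema, vec![vec![batch]])?);

    // Wrap the provider so a logical plan can reference it ...
    let source = provider_as_source(provider);
    // ... and downcast back to the provider when building the physical plan.
    let provider = source_as_provider(&source)?;
    assert_eq!(provider.schema().fields().len(), 1);
    Ok(())
}
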
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/mod.rs
@@ -19,6 +19,7 @@
#![allow(clippy::module_inception)]
pub mod datasource;
pub mod default_table_source;
pub mod empty;
pub mod file_format;
pub mod listing;
@@ -29,6 +30,9 @@ pub mod view;
use futures::Stream;

pub use self::datasource::TableProvider;
pub use self::default_table_source::{
provider_as_source, source_as_provider, DefaultTableSource,
};
use self::listing::PartitionedFile;
pub use self::memory::MemTable;
pub use self::view::ViewTable;
2 changes: 2 additions & 0 deletions datafusion/core/src/execution/mod.rs
@@ -44,9 +44,11 @@ pub mod context
pub mod disk_manager;
pub mod memory_manager;
pub mod options;
pub mod registry;
pub mod runtime_env;

pub use disk_manager::DiskManager;
pub use memory_manager::{
human_readable_size, MemoryConsumer, MemoryConsumerId, MemoryManager,
};
pub use registry::FunctionRegistry;
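
With this re-export, FunctionRegistry is reachable as datafusion::execution::FunctionRegistry (and still via the legacy logical_plan path). A hedged sketch of consuming the trait generically; the helper name is made up, and the udf/udfs method signatures are assumed from the trait rather than shown in this diff:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::FunctionRegistry;
use datafusion_expr::ScalarUDF;

/// Look up a scalar UDF by name from any registry implementation
/// (for example a SessionState), listing what is registered on failure.
fn find_udf<R: FunctionRegistry>(registry: &R, name: &str) -> Result<Arc<ScalarUDF>> {
    registry.udf(name).map_err(|e| {
        let mut known: Vec<String> = registry.udfs().into_iter().collect();
        known.sort();
        eprintln!("registered UDFs: {:?}", known);
        e
    })
}
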
2 changes: 2 additions & 0 deletions datafusion/core/src/{logical_plan → execution}/registry.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! FunctionRegistry trait
use crate::error::Result;
use datafusion_expr::{AggregateUDF, ScalarUDF};
use std::{collections::HashSet, sync::Arc};
3 changes: 1 addition & 2 deletions datafusion/core/src/logical_plan/expr.rs
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! This module provides an `Expr` enum for representing expressions
//! such as `col = 5` or `SUM(col)`. See examples on the [`Expr`] struct.
//! This is a legacy module that only contains re-exports of other modules
pub use datafusion_common::{Column, ExprSchema};
pub use datafusion_expr::{expr_fn::*, lit, lit_timestamp_nano, Expr, Literal, Operator};
69 changes: 32 additions & 37 deletions datafusion/core/src/logical_plan/mod.rs
@@ -15,51 +15,46 @@
// specific language governing permissions and limitations
// under the License.

//! This module provides a logical query plan enum that can describe queries. Logical query
//! plans can be created from a SQL statement or built programmatically via the Table API.
//!
//! Logical query plans can then be optimized and executed directly, or translated into
//! physical query plans and executed.
//! This is a legacy module that only contains re-exports of other modules
mod expr;
pub mod plan;
mod registry;
pub mod window_frames;
pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
pub use datafusion_expr::{
expr_fn::binary_expr,
expr_rewriter,
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
logical_plan::builder::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
},
ExprSchemable, Operator,

pub use crate::datasource::{provider_as_source, source_as_provider};
pub use crate::execution::FunctionRegistry;
pub use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema,
};
pub use datafusion_optimizer::expr_simplifier::{ExprSimplifiable, SimplifyInfo};
pub use expr::{
pub use datafusion_expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col,
combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos, count,
count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exists, exp,
expr_rewriter,
expr_rewriter::{
normalize_col, normalize_col_with_schemas, normalize_cols, replace_col,
rewrite_sort_cols_by_aggs, unnormalize_col, unnormalize_cols, ExprRewritable,
ExprRewriter, RewriteRecursion,
},
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
floor, in_list, in_subquery, initcap, left, length, lit, lit_timestamp_nano, ln,
log10, log2, lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now,
now_expr, nullif, octet_length, or, power, random, regexp_match, regexp_replace,
repeat, replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256,
sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan,
to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
trim, trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal,
};
pub use expr_rewriter::{
normalize_col, normalize_col_with_schemas, normalize_cols, replace_col,
rewrite_sort_cols_by_aggs, unnormalize_col, unnormalize_cols, ExprRewritable,
ExprRewriter, RewriteRecursion,
};
pub use plan::{provider_as_source, source_as_provider};
pub use plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType,
Limit, LogicalPlan, Offset, Partitioning, PlanType, PlanVisitor, Repartition,
StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
log10, log2,
logical_plan::{
builder::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
},
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint,
JoinType, Limit, LogicalPlan, Offset, Partitioning, PlanType, PlanVisitor,
Repartition, StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
},
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, now_expr,
nullif, octet_length, or, power, random, regexp_match, regexp_replace, repeat,
replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384,
sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator,
};
pub use registry::FunctionRegistry;
pub use datafusion_optimizer::expr_simplifier::{ExprSimplifiable, SimplifyInfo};
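
The legacy datafusion::logical_plan path therefore keeps compiling, now backed purely by re-exports from datafusion_expr, datafusion_common, and datafusion_optimizer. A small hedged sketch through the old path, using only names from the re-export list above:

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::logical_plan::{col, lit, LogicalPlan, LogicalPlanBuilder};

fn build_plan_via_legacy_path() -> Result<LogicalPlan> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]);
    // scan_empty builds a plan over an empty relation with the given schema.
    LogicalPlanBuilder::scan_empty(Some("t"), &schema, None)?
        .filter(col("a").gt(lit(5)))?
        .project(vec![col("b")])?
        .build()
}
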
75 changes: 4 additions & 71 deletions datafusion/core/src/logical_plan/plan.rs
@@ -14,14 +14,11 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! This module contains the `LogicalPlan` enum that describes queries
//! via a logical query plan.

use super::expr::Expr;
use crate::arrow::datatypes::SchemaRef;
use crate::datasource::TableProvider;
use crate::error::DataFusionError;
pub use crate::logical_expr::{
//! This is a legacy module that only contains re-exports of other modules
pub use crate::datasource::{provider_as_source, source_as_provider, DefaultTableSource};
pub use datafusion_expr::{
logical_plan::{
display::{GraphvizVisitor, IndentVisitor},
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
@@ -33,67 +30,3 @@ pub use crate::logical_expr::{
},
TableProviderFilterPushDown, TableSource,
};
use std::any::Any;
use std::sync::Arc;

/// DataFusion default table source, wrapping TableProvider
///
/// This structure adapts a `TableProvider` (physical plan trait) to the `TableSource`
/// (logical plan trait)
pub struct DefaultTableSource {
/// table provider
pub table_provider: Arc<dyn TableProvider>,
}

impl DefaultTableSource {
/// Create a new DefaultTableSource to wrap a TableProvider
pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
Self { table_provider }
}
}

impl TableSource for DefaultTableSource {
/// Returns the table source as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any {
self
}

/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef {
self.table_provider.schema()
}

/// Tests whether the table provider can make use of a filter expression
/// to optimise data retrieval.
fn supports_filter_pushdown(
&self,
filter: &Expr,
) -> datafusion_common::Result<TableProviderFilterPushDown> {
self.table_provider.supports_filter_pushdown(filter)
}
}

/// Wrap TableProvider in TableSource
pub fn provider_as_source(
table_provider: Arc<dyn TableProvider>,
) -> Arc<dyn TableSource> {
Arc::new(DefaultTableSource::new(table_provider))
}

/// Attempt to downcast a TableSource to DefaultTableSource and access the
/// TableProvider. This will only work with a TableSource created by DataFusion.
pub fn source_as_provider(
source: &Arc<dyn TableSource>,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
match source
.as_ref()
.as_any()
.downcast_ref::<DefaultTableSource>()
{
Some(source) => Ok(source.table_provider.clone()),
_ => Err(DataFusionError::Internal(
"TableSource was not DefaultTableSource".to_string(),
)),
}
}
2 changes: 1 addition & 1 deletion datafusion/core/src/logical_plan/window_frames.rs
@@ -15,6 +15,6 @@
// specific language governing permissions and limitations
// under the License.

//! Window frame types, reimported from datafusion_expr
//! This is a legacy module that only contains re-exports of other modules
pub use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_optimizer/pruning.rs
@@ -44,6 +44,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_expr::binary_expr;
use datafusion_expr::utils::expr_to_columns;
use datafusion_physical_expr::create_physical_expr;

@@ -710,7 +711,7 @@ fn build_predicate_expression(
if op == Operator::And || op == Operator::Or {
let left_expr = build_predicate_expression(left, schema, required_columns)?;
let right_expr = build_predicate_expression(right, schema, required_columns)?;
return Ok(logical_plan::binary_expr(left_expr, op, right_expr));
return Ok(binary_expr(left_expr, op, right_expr));
}

let expr_builder =
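
For reference, binary_expr is the free function from datafusion_expr that builds a binary expression node; a tiny hedged illustration of the call the pruning code now makes directly, with hypothetical helper names:

use datafusion_expr::{binary_expr, col, lit, Expr, Operator};

/// Recombine two rewritten sub-predicates with AND, mirroring the
/// build_predicate_expression call site above.
fn and_predicates(left: Expr, right: Expr) -> Expr {
    binary_expr(left, Operator::And, right)
}

fn example() -> Expr {
    and_predicates(col("x").gt(lit(1)), col("y").lt(lit(10)))
}
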
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
@@ -22,11 +22,12 @@ use super::{
aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec,
values::ValuesExec, windows,
};
use crate::datasource::source_as_provider;
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_plan::plan::{
source_as_provider, Aggregate, EmptyRelation, Filter, Join, Projection, Sort,
SubqueryAlias, TableScan, Window,
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan,
Window,
};
use crate::logical_plan::{
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
