Experiment: try adding recursive CTEs #4

Draft: wants to merge 4 commits into base branch gh-462
38 changes: 38 additions & 0 deletions datafusion/core/src/execution/context.rs
@@ -57,6 +57,7 @@ use std::{
};

use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;

use crate::catalog::{
@@ -118,7 +119,9 @@ use datafusion_sql::{
parser::DFParser,
planner::{ContextProvider, SqlToRel},
};
use parking_lot::Mutex;
use parquet::file::properties::WriterProperties;
use tokio::sync::mpsc::Receiver as SingleChannelReceiver;
use uuid::Uuid;

use super::options::{
@@ -1736,6 +1739,8 @@ pub enum TaskProperties {
KVPairs(HashMap<String, String>),
}

type RelationHandler = SingleChannelReceiver<ArrowResult<RecordBatch>>;

/// Task Execution Context
pub struct TaskContext {
/// Session Id
@@ -1750,6 +1755,8 @@ pub struct TaskContext {
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
/// Runtime environment associated with this task context
runtime: Arc<RuntimeEnv>,
/// Registered relation handlers
relation_handlers: Mutex<HashMap<String, RelationHandler>>,
}

impl TaskContext {
@@ -1769,6 +1776,7 @@ impl TaskContext {
scalar_functions,
aggregate_functions,
runtime,
relation_handlers: Mutex::new(HashMap::new()),
}
}

@@ -1824,6 +1832,34 @@ impl TaskContext {
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
self.runtime.clone()
}

/// Register a new relation handler. If a handler with the same name already exists,
/// this function will return an error.
pub fn push_relation_handler(
&self,
name: String,
handler: RelationHandler,
) -> Result<()> {
let mut handlers = self.relation_handlers.lock();
if handlers.contains_key(&name) {
return Err(DataFusionError::Internal(format!(
"Relation handler {} already registered",
name
)));
}
handlers.insert(name, handler);
Ok(())
}

/// Retrieve the relation handler for the given name. It will remove the handler from
/// the storage if it exists, and return it as is.
pub fn pop_relation_handler(&self, name: String) -> Result<RelationHandler> {
let mut handlers = self.relation_handlers.lock();

handlers.remove(name.as_str()).ok_or_else(|| {
DataFusionError::Internal(format!("Relation handler {} not registered", name))
})
}
}
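
For orientation, here is a minimal usage sketch that is not part of this diff: a parent operator keeps the sender half of a tokio channel and parks the receiver half in the task context under an agreed name, which a downstream ContinuanceExec can later pop. The name "working_table", the channel capacity, and the datafusion:: paths are illustrative assumptions.

use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use tokio::sync::mpsc;

fn register_working_table(
    ctx: &TaskContext,
) -> Result<mpsc::Sender<ArrowResult<RecordBatch>>> {
    let (tx, rx) = mpsc::channel(2);
    // The receiver half is the RelationHandler that a ContinuanceExec will pop
    // under the same name; registering the same name twice returns an error.
    ctx.push_relation_handler("working_table".to_string(), rx)?;
    Ok(tx)
}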

/// Create a new task context instance from SessionContext
@@ -1846,6 +1882,7 @@ impl From<&SessionContext> for TaskContext {
scalar_functions,
aggregate_functions,
runtime,
relation_handlers: Mutex::new(HashMap::new()),
}
}
}
@@ -1865,6 +1902,7 @@ impl From<&SessionState> for TaskContext {
scalar_functions,
aggregate_functions,
runtime,
relation_handlers: Mutex::new(HashMap::new()),
}
}
}
6 changes: 3 additions & 3 deletions datafusion/core/src/logical_plan/plan.rs
@@ -24,9 +24,9 @@ pub use datafusion_expr::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, EmptyRelation,
Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values, Window,
NamedRelation, Partitioning, PlanType, PlanVisitor, Projection, RecursiveQuery,
Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, Window,
},
TableProviderFilterPushDown, TableSource,
};
162 changes: 162 additions & 0 deletions datafusion/core/src/physical_plan/continuance.rs
@@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the continuance query plan

use std::any::Any;
use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
};
use arrow::datatypes::SchemaRef;

use super::expressions::PhysicalSortExpr;
use super::stream::RecordBatchReceiverStream;
use super::{
metrics::{ExecutionPlanMetricsSet, MetricsSet},
SendableRecordBatchStream, Statistics,
};

use crate::execution::context::TaskContext;

/// A temporary "working table" operation where the input data will be
/// taken from the named handle during the execution and will be re-published
/// as is (kind of like a mirror).
///
/// Most notably used in the implementation of recursive queries where the
/// underlying relation does not exist yet but the data will come as the previous
/// term is evaluated.
#[derive(Debug)]
pub struct ContinuanceExec {
/// Name of the relation handler
name: String,
/// The schema of the stream
schema: SchemaRef,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl ContinuanceExec {
/// Create a new execution plan for a continuance stream. The given relation
/// handler must exist in the task context before calling [`execute`] on this
/// plan.
pub fn new(name: String, schema: SchemaRef) -> Self {
Self {
name,
schema,
metrics: ExecutionPlanMetricsSet::new(),
}
}
}

impl ExecutionPlan for ContinuanceExec {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}

fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}

fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
}

fn relies_on_input_order(&self) -> bool {
false
}

fn maintains_input_order(&self) -> bool {
false
}

fn benefits_from_input_partitioning(&self) -> bool {
false
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(ContinuanceExec::new(
self.name.clone(),
self.schema.clone(),
)))
}

/// This plan does not come with any special streams, but rather we use
/// the existing [`RecordBatchReceiverStream`] to receive the data from
/// the registered handle.
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
// Continuance streams sit at the base of the plan and only have partition 0.
if partition != 0 {
return Err(DataFusionError::Internal(format!(
"ContinuanceExec got an invalid partition {} (expected 0)",
partition
)));
}

// The relation handler must be already registered by the
// parent op.
let receiver = context.pop_relation_handler(self.name.clone())?;
Ok(RecordBatchReceiverStream::create_without_handle(
&self.schema,
receiver,
))
}

fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(f, "ContinuanceExec: name={}", self.name)
}
}
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Statistics {
Statistics::default()
}
}

#[cfg(test)]
mod tests {}
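
Putting the registry and the new operator together, the mirroring behaviour can be exercised roughly as below. This is a hedged sketch rather than a test taken from this PR: the datafusion:: paths assume the usual crate-root re-exports, and SessionContext::new() is only used as a convenient way to obtain a TaskContext.

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::continuance::ContinuanceExec;
use datafusion::physical_plan::{common, ExecutionPlan};
use datafusion::prelude::SessionContext;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, false)]));
    let session_ctx = SessionContext::new();
    let task_ctx = Arc::new(TaskContext::from(&session_ctx));

    // Register the working-table channel under the name the plan will pop.
    let (tx, rx) = mpsc::channel(2);
    task_ctx.push_relation_handler("working_table".to_string(), rx)?;

    // Feed one batch, then drop the sender so the mirrored stream terminates.
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    tx.send(Ok(batch)).await.ok();
    drop(tx);

    // ContinuanceExec republishes whatever arrives on the registered channel.
    let plan = ContinuanceExec::new("working_table".to_string(), schema);
    let stream = plan.execute(0, task_ctx)?;
    let batches = common::collect(stream).await?;
    println!("mirrored {} batch(es)", batches.len());
    Ok(())
}

Dropping the sender is what lets the mirrored stream finish; otherwise the ContinuanceExec stream would keep waiting for more batches.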
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_plan/mod.rs
@@ -549,6 +549,7 @@ pub mod analyze;
pub mod coalesce_batches;
pub mod coalesce_partitions;
pub mod common;
pub mod continuance;
pub mod cross_join;
pub mod display;
pub mod empty;
@@ -563,6 +564,7 @@ pub mod memory;
pub mod metrics;
pub mod planner;
pub mod projection;
pub mod recursive_query;
pub mod repartition;
pub mod sort_merge_join;
pub mod sorts;
29 changes: 27 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
@@ -28,8 +28,8 @@ use crate::datasource::source_as_provider;
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_plan::plan::{
Aggregate, Distinct, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias,
TableScan, Window,
Aggregate, Distinct, EmptyRelation, Filter, Join, NamedRelation, Projection, Sort,
SubqueryAlias, TableScan, Window,
};
use crate::logical_plan::{
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
@@ -40,13 +40,15 @@ use crate::logical_plan::{Limit, Values};
use crate::physical_expr::create_physical_expr;
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::continuance::ContinuanceExec;
use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::hash_join::HashJoinExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::recursive_query::RecursiveQueryExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::WindowAggExec;
@@ -61,6 +63,7 @@ use arrow::datatypes::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::ScalarValue;
use datafusion_expr::expr::GroupingSet;
use datafusion_expr::logical_plan::RecursiveQuery;
use datafusion_expr::utils::{expand_wildcard, expr_to_columns};
use datafusion_expr::WindowFrameUnits;
use datafusion_physical_expr::expressions::Literal;
@@ -1027,6 +1030,28 @@ impl DefaultPhysicalPlanner {

Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, recursive_term, is_distinct }) => {
let static_term = self.create_initial_plan(static_term, session_state).await?;
let recursive_term = self.create_initial_plan(recursive_term, session_state).await?;

Ok(Arc::new(RecursiveQueryExec::new(name.clone(), static_term, recursive_term, *is_distinct)))
}
LogicalPlan::NamedRelation(NamedRelation {name, schema}) => {
// Named relations are how we represent access to any sort of dynamic data provider. They
// differ from tables in the sense that they can start existing dynamically during the
// execution of a query and then disappear before it even finishes.
//
// This system allows us to replicate the tricky behavior of classical databases where a
// temporary "working table" (as it is called in Postgres) can be used when dealing with
// complex operations (such as recursive CTEs) and then can be dropped. Since DataFusion
// at its core is heavily stream-based and vectorized, we try to avoid using 'real' tables
// and let the streams take care of the data flow in this as well.

// Since the actual inputs will only be available to us at runtime (through the task context),
// we can't really do any sort of meaningful validation here.
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
Ok(Arc::new(ContinuanceExec::new(name.clone(), schema)))
}
LogicalPlan::CreateExternalTable(_) => {
// There is no default plan for "CREATE EXTERNAL
// TABLE" -- it must be handled at a higher level (so
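
Zooming out, the user-facing shape of the feature is a standard recursive CTE. Whether this draft already routes WITH RECURSIVE through DFParser/SqlToRel into LogicalPlan::RecursiveQuery end to end is not visible in this diff, so treat the snippet below as the intended end state rather than a guaranteed working example.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // The static term seeds the working table; the recursive term reads it back
    // (NamedRelation -> ContinuanceExec) until an iteration produces no new rows.
    let df = ctx
        .sql(
            "WITH RECURSIVE nodes AS ( \
                SELECT 1 AS n \
                UNION ALL \
                SELECT n + 1 FROM nodes WHERE n < 10 \
            ) \
            SELECT * FROM nodes",
        )
        .await?;
    df.show().await?;
    Ok(())
}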