Skip to content

Commit

Permalink
feat: add delta planner
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 4, 2024
1 parent 3ca0de0 commit 6488b82
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 0 deletions.
1 change: 1 addition & 0 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub mod cdf;
pub mod expr;
pub mod logical;
pub mod physical;
pub mod planner;

mod find_files;
mod schema_adapter;
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/delta_datafusion/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,7 @@ pub(crate) fn find_metric_node(

None
}

pub(crate) fn get_metric(metrics: &MetricsSet, name: &str) -> usize {
metrics.sum_by_name(name).map(|m| m.as_usize()).unwrap_or(0)
}
59 changes: 59 additions & 0 deletions crates/core/src/delta_datafusion/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! Custom planners for datafusion so that you can convert custom nodes, can be used
//! to trace custom metrics in an operation
//!
//! # Example
//!
//! #[derive(Clone)]
//! struct MergeMetricExtensionPlanner {}
//!
//! #[async_trait]
//! impl ExtensionPlanner for MergeMetricExtensionPlanner {
//! async fn plan_extension(
//! &self,
//! planner: &dyn PhysicalPlanner,
//! node: &dyn UserDefinedLogicalNode,
//! _logical_inputs: &[&LogicalPlan],
//! physical_inputs: &[Arc<dyn ExecutionPlan>],
//! session_state: &SessionState,
//! ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {}
//!
//! let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
//! extension_planner: MergeMetricExtensionPlanner {}
//! };
//!
//! let state = state.with_query_planner(Arc::new(merge_planner));
use std::sync::Arc;

use crate::delta_datafusion::DataFusionResult;
use async_trait::async_trait;
use datafusion::physical_planner::PhysicalPlanner;
use datafusion::{
execution::{context::QueryPlanner, session_state::SessionState},
physical_plan::ExecutionPlan,
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner},
};
use datafusion_expr::LogicalPlan;

/// Deltaplanner
pub struct DeltaPlanner<T: ExtensionPlanner> {
/// custom extension planner
pub extension_planner: T,
}

#[async_trait]
impl<T: ExtensionPlanner + std::marker::Send + Sync + 'static + Clone> QueryPlanner
for DeltaPlanner<T>
{
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
vec![Arc::new(self.extension_planner.clone())],
)));
planner
.create_physical_plan(logical_plan, session_state)
.await
}
}

0 comments on commit 6488b82

Please sign in to comment.