Skip to content

Commit

Permalink
feat: use delta planner in delete and merge
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored and rtyler committed Aug 6, 2024
1 parent 7f28ddf commit a6aeeca
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 80 deletions.
8 changes: 4 additions & 4 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,10 +523,10 @@ impl<'a> DeltaScanBuilder<'a> {
None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
};

let schema = config
.schema
.clone()
.unwrap_or(self.snapshot.arrow_schema()?);
let schema = match config.schema.clone() {
Some(value) => Ok(value),
None => self.snapshot.arrow_schema(),
}?;

let logical_schema = df_logical_schema(self.snapshot, &config)?;

Expand Down
4 changes: 1 addition & 3 deletions crates/core/src/delta_datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ pub struct DeltaPlanner<T: ExtensionPlanner> {
}

#[async_trait]
impl<T: ExtensionPlanner + std::marker::Send + Sync + 'static + Clone> QueryPlanner
for DeltaPlanner<T>
{
impl<T: ExtensionPlanner + Send + Sync + 'static + Clone> QueryPlanner for DeltaPlanner<T> {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
Expand Down
122 changes: 86 additions & 36 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@
//! .await?;
//! ````
use crate::delta_datafusion::logical::MetricObserver;
use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec};
use crate::delta_datafusion::planner::DeltaPlanner;
use crate::logstore::LogStoreRef;
use core::panic;
use async_trait::async_trait;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::provider_as_source;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::metrics::MetricBuilder;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion::prelude::Expr;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::DFSchema;
use datafusion_expr::{lit, LogicalPlanBuilder};
use datafusion_physical_expr::{
expressions::{self},
PhysicalExpr,
};
use datafusion_common::ScalarValue;
use datafusion_expr::{lit, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode};

use futures::future::BoxFuture;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
Expand All @@ -43,19 +43,22 @@ use serde::Serialize;
use super::cdc::should_write_cdc;
use super::datafusion_utils::Expression;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
use super::write::{execute_non_empty_expr_cdc, WriterStatsConfig};
use super::write::WriterStatsConfig;

use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaScanConfigBuilder,
DeltaSessionContext, DeltaTableProvider,
find_files, register_store, DataFusionMixins, DeltaScanConfigBuilder, DeltaSessionContext,
DeltaTableProvider,
};
use crate::errors::DeltaResult;
use crate::kernel::{Action, Add, Remove};
use crate::operations::write::{write_execution_plan, SchemaMode};
use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, SchemaMode};
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;
use crate::{DeltaTable, DeltaTableError};

const SOURCE_COUNT_ID: &str = "delete_source_count";
const SOURCE_COUNT_METRIC: &str = "num_source_rows";

/// Delete Records from the Delta Table.
/// See this module's documentation for more information
Expand All @@ -82,9 +85,9 @@ pub struct DeleteMetrics {
/// Number of files removed
pub num_removed_files: usize,
/// Number of rows removed
pub num_deleted_rows: Option<usize>,
pub num_deleted_rows: usize,
/// Number of rows copied in the process of deleting files
pub num_copied_rows: Option<usize>,
pub num_copied_rows: usize,
/// Time taken to execute the entire operation
pub execution_time_ms: u64,
/// Time taken to scan the file for matches
Expand Down Expand Up @@ -133,6 +136,36 @@ impl DeleteBuilder {
}
}

#[derive(Clone)]
struct DeleteMetricExtensionPlanner {}

#[async_trait]
impl ExtensionPlanner for DeleteMetricExtensionPlanner {
async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
_logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
_session_state: &SessionState,
) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
if let Some(metric_observer) = node.as_any().downcast_ref::<MetricObserver>() {
if metric_observer.id.eq(SOURCE_COUNT_ID) {
return Ok(Some(MetricObserverExec::try_new(
SOURCE_COUNT_ID.into(),
physical_inputs,
|batch, metrics| {
MetricBuilder::new(metrics)
.global_counter(SOURCE_COUNT_METRIC)
.add(batch.num_rows());
},
)?));
}
}
Ok(None)
}
}

#[allow(clippy::too_many_arguments)]
async fn excute_non_empty_expr(
snapshot: &DeltaTableState,
Expand All @@ -149,19 +182,33 @@ async fn excute_non_empty_expr(
let mut actions: Vec<Action> = Vec::new();
let table_partition_cols = snapshot.metadata().partition_columns.clone();

let delete_planner = DeltaPlanner::<DeleteMetricExtensionPlanner> {
extension_planner: DeleteMetricExtensionPlanner {},
};

let state = state.clone().with_query_planner(Arc::new(delete_planner));

let scan_config = DeltaScanConfigBuilder::default()
.with_file_column(false)
.with_schema(snapshot.input_schema()?)
.build(&snapshot)?;
.build(snapshot)?;

let target_provider = Arc::new(
DeltaTableProvider::try_new(snapshot.clone(), log_store.clone(), scan_config.clone())?
.with_files(rewrite.to_vec()),
);
let target_provider = provider_as_source(target_provider);
let plan = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
let source = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;

let source = LogicalPlan::Extension(Extension {
node: Arc::new(MetricObserver {
id: "delete_source_count".into(),
input: source,
enable_pushdown: false,
}),
});

let df = DataFrame::new(state.clone(), plan);
let df = DataFrame::new(state.clone(), source);

let writer_stats_config = WriterStatsConfig::new(
snapshot.table_config().num_indexed_cols(),
Expand Down Expand Up @@ -199,16 +246,19 @@ async fn excute_non_empty_expr(

actions.extend(add_actions);

let read_records = Some(df.clone().count().await?);
let filter_records = filter.metrics().and_then(|m| m.output_rows());
let source_count = find_metric_node(SOURCE_COUNT_ID, &filter).ok_or_else(|| {
DeltaTableError::Generic("Unable to locate expected metric node".into())
})?;
let source_count_metrics = source_count.metrics().unwrap();
let read_records = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
let filter_records = filter.metrics().and_then(|m| m.output_rows()).unwrap_or(0);

metrics.num_copied_rows = filter_records;
metrics.num_deleted_rows = read_records
.zip(filter_records)
.map(|(read, filter)| read - filter);
metrics.num_deleted_rows = read_records - filter_records;
}

// CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
if let Ok(true) = should_write_cdc(&snapshot) {
if let Ok(true) = should_write_cdc(snapshot) {
// Create CDC scan
let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
let cdc_filter = df
Expand Down Expand Up @@ -460,8 +510,8 @@ mod tests {
assert_eq!(table.get_files_count(), 0);
assert_eq!(metrics.num_added_files, 0);
assert_eq!(metrics.num_removed_files, 1);
assert_eq!(metrics.num_deleted_rows, None);
assert_eq!(metrics.num_copied_rows, None);
assert_eq!(metrics.num_deleted_rows, 0);
assert_eq!(metrics.num_copied_rows, 0);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
Expand All @@ -476,8 +526,8 @@ mod tests {
assert_eq!(table.version(), 2);
assert_eq!(metrics.num_added_files, 0);
assert_eq!(metrics.num_removed_files, 0);
assert_eq!(metrics.num_deleted_rows, None);
assert_eq!(metrics.num_copied_rows, None);
assert_eq!(metrics.num_deleted_rows, 0);
assert_eq!(metrics.num_copied_rows, 0);
}

#[tokio::test]
Expand Down Expand Up @@ -548,8 +598,8 @@ mod tests {
assert_eq!(metrics.num_added_files, 1);
assert_eq!(metrics.num_removed_files, 1);
assert!(metrics.scan_time_ms > 0);
assert_eq!(metrics.num_deleted_rows, Some(1));
assert_eq!(metrics.num_copied_rows, Some(3));
assert_eq!(metrics.num_deleted_rows, 1);
assert_eq!(metrics.num_copied_rows, 3);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
Expand Down Expand Up @@ -703,8 +753,8 @@ mod tests {

assert_eq!(metrics.num_added_files, 0);
assert_eq!(metrics.num_removed_files, 1);
assert_eq!(metrics.num_deleted_rows, None);
assert_eq!(metrics.num_copied_rows, None);
assert_eq!(metrics.num_deleted_rows, 0);
assert_eq!(metrics.num_copied_rows, 0);
assert!(metrics.scan_time_ms > 0);

let expected = vec![
Expand Down Expand Up @@ -764,8 +814,8 @@ mod tests {

assert_eq!(metrics.num_added_files, 0);
assert_eq!(metrics.num_removed_files, 1);
assert_eq!(metrics.num_deleted_rows, Some(1));
assert_eq!(metrics.num_copied_rows, Some(0));
assert_eq!(metrics.num_deleted_rows, 1);
assert_eq!(metrics.num_copied_rows, 0);
assert!(metrics.scan_time_ms > 0);

let expected = [
Expand Down
43 changes: 12 additions & 31 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ use std::time::Instant;
use async_trait::async_trait;
use datafusion::datasource::provider_as_source;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig};
use datafusion::execution::context::SessionConfig;
use datafusion::logical_expr::build_join_schema;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion::physical_plan::metrics::MetricBuilder;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion::{
execution::context::SessionState,
physical_plan::{
metrics::{MetricBuilder, MetricsSet},
ExecutionPlan,
},
physical_plan::ExecutionPlan,
prelude::{DataFrame, SessionContext},
};
use datafusion_common::tree_node::{Transformed, TreeNode};
Expand All @@ -68,7 +66,8 @@ use super::datafusion_utils::{into_expr, maybe_into_expr, Expression};
use super::transaction::{CommitProperties, PROTOCOL};
use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression};
use crate::delta_datafusion::logical::MetricObserver;
use crate::delta_datafusion::physical::{find_metric_node, MetricObserverExec};
use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec};
use crate::delta_datafusion::planner::DeltaPlanner;
use crate::delta_datafusion::{
execute_plan_to_batch, register_store, DeltaColumn, DeltaScanConfigBuilder, DeltaSessionConfig,
DeltaTableProvider,
Expand Down Expand Up @@ -576,7 +575,7 @@ pub struct MergeMetrics {
/// Time taken to rewrite the matched files
pub rewrite_time_ms: u64,
}

#[derive(Clone)]
struct MergeMetricExtensionPlanner {}

#[async_trait]
Expand Down Expand Up @@ -1017,7 +1016,11 @@ async fn execute(
let exec_start = Instant::now();

let current_metadata = snapshot.metadata();
let state = state.with_query_planner(Arc::new(MergePlanner {}));
let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
extension_planner: MergeMetricExtensionPlanner {},
};

let state = state.with_query_planner(Arc::new(merge_planner));

// TODO: Given the join predicate, remove any expression that involve the
// source table and keep expressions that only involve the target table.
Expand Down Expand Up @@ -1486,9 +1489,6 @@ async fn execute(

let source_count_metrics = source_count.metrics().unwrap();
let target_count_metrics = op_count.metrics().unwrap();
fn get_metric(metrics: &MetricsSet, name: &str) -> usize {
metrics.sum_by_name(name).map(|m| m.as_usize()).unwrap_or(0)
}

metrics.num_source_rows = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
metrics.num_target_rows_inserted = get_metric(&target_count_metrics, TARGET_INSERTED_METRIC);
Expand Down Expand Up @@ -1555,25 +1555,6 @@ fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr {
.data
}

// TODO: Abstract MergePlanner into DeltaPlanner to support other delta operations in the future.
struct MergePlanner {}

#[async_trait]
impl QueryPlanner for MergePlanner {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
vec![Arc::new(MergeMetricExtensionPlanner {})],
)));
planner
.create_physical_plan(logical_plan, session_state)
.await
}
}

impl std::future::IntoFuture for MergeBuilder {
type Output = DeltaResult<(DeltaTable, MergeMetrics)>;
type IntoFuture = BoxFuture<'static, Self::Output>;
Expand Down
Loading

0 comments on commit a6aeeca

Please sign in to comment.