Skip to content

Commit

Permalink
feat: use delta planner in delete and merge
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 4, 2024
1 parent 6488b82 commit 31a1aa6
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 58 deletions.
115 changes: 82 additions & 33 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@
//! .await?;
//! ````

use crate::delta_datafusion::logical::MetricObserver;
use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec};
use crate::delta_datafusion::planner::DeltaPlanner;
use crate::logstore::LogStoreRef;
use core::panic;
use async_trait::async_trait;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::provider_as_source;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::metrics::MetricBuilder;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion::prelude::Expr;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::DFSchema;
use datafusion_expr::{lit, LogicalPlanBuilder};
use datafusion_physical_expr::{
expressions::{self},
PhysicalExpr,
};
use datafusion_common::ScalarValue;
use datafusion_expr::{lit, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode};

use futures::future::BoxFuture;
use std::iter;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

Expand All @@ -48,15 +47,18 @@ use super::write::{write_execution_plan_cdc, WriterStatsConfig};

use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaScanConfigBuilder,
DeltaSessionContext, DeltaTableProvider,
find_files, register_store, DataFusionMixins, DeltaScanConfigBuilder, DeltaSessionContext,
DeltaTableProvider,
};
use crate::errors::DeltaResult;
use crate::kernel::{Action, Add, Remove};
use crate::operations::write::{write_execution_plan, SchemaMode};
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;
use crate::{DeltaTable, DeltaTableError};

const SOURCE_COUNT_ID: &str = "delete_source_count";
const SOURCE_COUNT_METRIC: &str = "num_source_rows";

/// Delete Records from the Delta Table.
/// See this module's documentation for more information
Expand All @@ -83,9 +85,9 @@ pub struct DeleteMetrics {
/// Number of files removed
pub num_removed_files: usize,
/// Number of rows removed
pub num_deleted_rows: Option<usize>,
pub num_deleted_rows: usize,
/// Number of rows copied in the process of deleting files
pub num_copied_rows: Option<usize>,
pub num_copied_rows: usize,
/// Time taken to execute the entire operation
pub execution_time_ms: u64,
/// Time taken to scan the file for matches
Expand Down Expand Up @@ -134,6 +136,36 @@ impl DeleteBuilder {
}
}

/// Physical planner extension for the DELETE operation: converts the
/// `MetricObserver` logical node (tagged with `SOURCE_COUNT_ID`) into a
/// physical `MetricObserverExec` that counts the rows flowing out of the
/// table scan, so the operation can report `num_source_rows`.
#[derive(Clone)]
struct DeleteMetricExtensionPlanner {}

#[async_trait]
impl ExtensionPlanner for DeleteMetricExtensionPlanner {
    /// Plan a user-defined logical extension node.
    ///
    /// Returns `Ok(Some(..))` only for the delete source-count observer;
    /// any other extension node yields `Ok(None)` so other registered
    /// planners get a chance to handle it.
    async fn plan_extension(
        &self,
        planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        session_state: &SessionState,
    ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
        match node.as_any().downcast_ref::<MetricObserver>() {
            Some(observer) if observer.id == SOURCE_COUNT_ID => {
                let observer_exec = MetricObserverExec::try_new(
                    SOURCE_COUNT_ID.into(),
                    physical_inputs,
                    |batch, metrics| {
                        // Tally rows across all batches into a single
                        // global counter read back after execution.
                        MetricBuilder::new(metrics)
                            .global_counter(SOURCE_COUNT_METRIC)
                            .add(batch.num_rows());
                    },
                )?;
                Ok(Some(observer_exec))
            }
            _ => Ok(None),
        }
    }
}

#[allow(clippy::too_many_arguments)]
async fn excute_non_empty_expr(
snapshot: &DeltaTableState,
Expand All @@ -150,6 +182,12 @@ async fn excute_non_empty_expr(
let mut actions: Vec<Action> = Vec::new();
let table_partition_cols = snapshot.metadata().partition_columns.clone();

let delete_planner = DeltaPlanner::<DeleteMetricExtensionPlanner> {
extension_planner: DeleteMetricExtensionPlanner {},
};

let state = state.clone().with_query_planner(Arc::new(delete_planner));

let scan_config = DeltaScanConfigBuilder::default()
.with_file_column(false)
.with_schema(snapshot.input_schema()?)
Expand All @@ -160,9 +198,17 @@ async fn excute_non_empty_expr(
.with_files(rewrite.to_vec()),
);
let target_provider = provider_as_source(target_provider);
let plan = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
let source = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;

let source = LogicalPlan::Extension(Extension {
node: Arc::new(MetricObserver {
id: "delete_source_count".into(),
input: source,
enable_pushdown: false,
}),
});

let df = DataFrame::new(state.clone(), plan);
let df = DataFrame::new(state.clone(), source);

let writer_stats_config = WriterStatsConfig::new(
snapshot.table_config().num_indexed_cols(),
Expand Down Expand Up @@ -200,12 +246,15 @@ async fn excute_non_empty_expr(

actions.extend(add_actions);

let read_records = Some(df.clone().count().await?);
let filter_records = filter.metrics().and_then(|m| m.output_rows());
let source_count = find_metric_node(SOURCE_COUNT_ID, &filter).ok_or_else(|| {
DeltaTableError::Generic("Unable to locate expected metric node".into())
})?;
let source_count_metrics = source_count.metrics().unwrap();
let read_records = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
let filter_records = filter.metrics().and_then(|m| m.output_rows()).unwrap_or(0);

metrics.num_copied_rows = filter_records;
metrics.num_deleted_rows = read_records
.zip(filter_records)
.map(|(read, filter)| read - filter);
metrics.num_deleted_rows = read_records - filter_records;
}

// CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
Expand Down Expand Up @@ -459,8 +508,8 @@ mod tests {
assert_eq!(table.get_files_count(), 0);
assert_eq!(metrics.num_added_files, 0);
assert_eq!(metrics.num_removed_files, 1);
assert_eq!(metrics.num_deleted_rows, None);
assert_eq!(metrics.num_copied_rows, None);
assert_eq!(metrics.num_deleted_rows, 0);
assert_eq!(metrics.num_copied_rows, 0);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
Expand All @@ -475,8 +524,8 @@ mod tests {
assert_eq!(table.version(), 2);
assert_eq!(metrics.num_added_files, 0);
assert_eq!(metrics.num_removed_files, 0);
assert_eq!(metrics.num_deleted_rows, None);
assert_eq!(metrics.num_copied_rows, None);
assert_eq!(metrics.num_deleted_rows, 0);
assert_eq!(metrics.num_copied_rows, 0);
}

#[tokio::test]
Expand Down Expand Up @@ -547,8 +596,8 @@ mod tests {
assert_eq!(metrics.num_added_files, 1);
assert_eq!(metrics.num_removed_files, 1);
assert!(metrics.scan_time_ms > 0);
assert_eq!(metrics.num_deleted_rows, Some(1));
assert_eq!(metrics.num_copied_rows, Some(3));
assert_eq!(metrics.num_deleted_rows, 1);
assert_eq!(metrics.num_copied_rows, 3);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
Expand Down Expand Up @@ -702,8 +751,8 @@ mod tests {

assert_eq!(metrics.num_added_files, 0);
assert_eq!(metrics.num_removed_files, 1);
assert_eq!(metrics.num_deleted_rows, None);
assert_eq!(metrics.num_copied_rows, None);
assert_eq!(metrics.num_deleted_rows, 0);
assert_eq!(metrics.num_copied_rows, 0);
assert!(metrics.scan_time_ms > 0);

let expected = vec![
Expand Down Expand Up @@ -763,8 +812,8 @@ mod tests {

assert_eq!(metrics.num_added_files, 0);
assert_eq!(metrics.num_removed_files, 1);
assert_eq!(metrics.num_deleted_rows, Some(1));
assert_eq!(metrics.num_copied_rows, Some(0));
assert_eq!(metrics.num_deleted_rows, 1);
assert_eq!(metrics.num_copied_rows, 0);
assert!(metrics.scan_time_ms > 0);

let expected = [
Expand Down
33 changes: 8 additions & 25 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ use super::datafusion_utils::{into_expr, maybe_into_expr, Expression};
use super::transaction::{CommitProperties, PROTOCOL};
use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression};
use crate::delta_datafusion::logical::MetricObserver;
use crate::delta_datafusion::physical::{find_metric_node, MetricObserverExec};
use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec};
use crate::delta_datafusion::planner::DeltaPlanner;
use crate::delta_datafusion::{
execute_plan_to_batch, register_store, DeltaColumn, DeltaScanConfigBuilder, DeltaSessionConfig,
DeltaTableProvider,
Expand Down Expand Up @@ -576,7 +577,7 @@ pub struct MergeMetrics {
/// Time taken to rewrite the matched files
pub rewrite_time_ms: u64,
}

#[derive(Clone)]
struct MergeMetricExtensionPlanner {}

#[async_trait]
Expand Down Expand Up @@ -1017,7 +1018,11 @@ async fn execute(
let exec_start = Instant::now();

let current_metadata = snapshot.metadata();
let state = state.with_query_planner(Arc::new(MergePlanner {}));
let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
extension_planner: MergeMetricExtensionPlanner {},
};

let state = state.with_query_planner(Arc::new(merge_planner));

// TODO: Given the join predicate, remove any expression that involve the
// source table and keep expressions that only involve the target table.
Expand Down Expand Up @@ -1486,9 +1491,6 @@ async fn execute(

let source_count_metrics = source_count.metrics().unwrap();
let target_count_metrics = op_count.metrics().unwrap();
fn get_metric(metrics: &MetricsSet, name: &str) -> usize {
metrics.sum_by_name(name).map(|m| m.as_usize()).unwrap_or(0)
}

metrics.num_source_rows = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
metrics.num_target_rows_inserted = get_metric(&target_count_metrics, TARGET_INSERTED_METRIC);
Expand Down Expand Up @@ -1555,25 +1557,6 @@ fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr {
.data
}

// TODO: Abstract MergePlanner into DeltaPlanner to support other delta operations in the future.
struct MergePlanner {}

#[async_trait]
impl QueryPlanner for MergePlanner {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
vec![Arc::new(MergeMetricExtensionPlanner {})],
)));
planner
.create_physical_plan(logical_plan, session_state)
.await
}
}

impl std::future::IntoFuture for MergeBuilder {
type Output = DeltaResult<(DeltaTable, MergeMetrics)>;
type IntoFuture = BoxFuture<'static, Self::Output>;
Expand Down

0 comments on commit 31a1aa6

Please sign in to comment.