Skip to content

Commit

Permalink
feat: merge using partition filters (#1958)
Browse files Browse the repository at this point in the history
# Description
This upgrades merge so that it can leverage partitions where specified
in the join predicate. There are two ways we can leverage partitions:

1. static references, i.e `target.partition = 1`.
2. Inferring from the data, i.e `source.partition = target.partition`.

In the first case, this implements the logic described in [this
comment](https://github.com/delta-io/delta-rs/blob/main/crates/deltalake-core/src/operations/merge.rs#L670).
Any predicate mentioning the source that is not covered by (2) is
pruned, which will leave predicates on just the target columns (and will
be amenable to file pruning)

In the second case, we first construct a version of the predicate with
references to source replaced with placeholders:

```sql
target.partition = source.partition and foo > 42
```

becomes:

```sql
target.partition = $1 and foo > 42
```

We then stream through the source table, gathering the distinct tuples
of the mentioned partitions:

```
| partition |
-------------
|       1   |
|       5   |
|       7   |
```

and then expand out the sql to take these into account:

```sql
(target.partition = 1 and foo > 42)
or (target.partition = 5 and foo > 42)
or (target.partition = 7 and foo > 42)
```
And insert this filter into the target chain. We also use the same
filter to process the file list, meaning we only make remove actions for
files that will be targeted by the scan.

I considered whether it would be possible to do this via datafusion sql
in a generic manner, for example by first joining against the distinct
partitions. I don't think it's possible - because each of the filters on
the logical plans are static, there's no opportunity for it to push the
distinct partition tuples down into the scan. Another variant would be
to make it so the source and partition tables share the same
`output_partitioning` structure, but as far as I can tell you wouldn't
be able to make the partitions line up such that you can do the merge
effectively and not read the whole table (plus `DeltaScan` doesn't
guarantee that one datafusion partition is one DeltaTable partition).

I think the static bit is a no brainer but the eager read of the source
table may cause issues if the source table is of a similar size to the
target table. It may be prudent hide that part behind a feature flag on
the merge, but would love comments on it.

# Performance

I created a 16GB table locally with 1.25 billion rows over 1k
partitions, and when updating 1 partition a full merge takes 1000-ish
seconds:

```
merge took 985.0801 seconds
merge metrics: MergeMetrics { num_source_rows: 1250000, num_target_rows_inserted: 468790, num_target_rows_updated: 781210, num_target_rows_deleted: 0, num_target_rows_copied: 1249687667, num_output_rows: 1250937667, num_target_files_added: 1001, num_target_files_removed: 1001, execution_time_ms: 983851, scan_time_ms: 0, rewrite_time_ms: 983322 }
```

but with partitioning it takes about 3:
```
merge took 2.6337671 seconds
merge metrics: MergeMetrics { num_source_rows: 1250000, num_target_rows_inserted: 468877, num_target_rows_updated: 781123, num_target_rows_deleted: 0, num_target_rows_copied: 468877, num_output_rows: 1718877, num_target_files_added: 2, num_target_files_removed: 2, execution_time_ms: 2622, scan_time_ms: 0, rewrite_time_ms: 2316 }
```

In practice, the tables I'm wanting to use this for are terabytes in
size so using merge is currently impractical. This would be a
significant speed boost to them.


# Related Issue(s)
closes #1846

---------

Co-authored-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
  • Loading branch information
emcake and ion-elgreco authored Dec 20, 2023
1 parent c14d577 commit 11ea2a5
Show file tree
Hide file tree
Showing 2 changed files with 640 additions and 20 deletions.
26 changes: 26 additions & 0 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_sql::planner::ParserOptions;
use futures::TryStreamExt;

use itertools::Itertools;
use log::error;
Expand Down Expand Up @@ -1019,6 +1020,31 @@ pub(crate) fn logical_expr_to_physical_expr(
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
}

pub(crate) async fn execute_plan_to_batch(
state: &SessionState,
plan: Arc<dyn ExecutionPlan>,
) -> DeltaResult<arrow::record_batch::RecordBatch> {
let data =
futures::future::try_join_all((0..plan.output_partitioning().partition_count()).map(|p| {
let plan_copy = plan.clone();
let task_context = state.task_ctx().clone();
async move {
let batch_stream = plan_copy.execute(p, task_context)?;

let schema = batch_stream.schema();

let batches = batch_stream.try_collect::<Vec<_>>().await?;

DataFusionResult::<_>::Ok(arrow::compute::concat_batches(&schema, batches.iter())?)
}
}))
.await?;

let batch = arrow::compute::concat_batches(&plan.schema(), data.iter())?;

Ok(batch)
}

/// Responsible for checking batches of data conform to table's invariants.
#[derive(Clone)]
pub struct DeltaDataChecker {
Expand Down
Loading

0 comments on commit 11ea2a5

Please sign in to comment.