Skip to content

Commit

Permalink
feat(python): expose delete operation (#1687)
Browse files Browse the repository at this point in the history
# Description
Naively expose the delete operation, with the option to provide a
predicate.

I first tried to expose a richer API with the Python `FilterType` and
DNF expressions, but from what I understand delta-rs doesn't implement
generic filters but only `PartitionFilter`. The `DeleteBuilder` also
only accepts datafusion expressions. So instead of hacking my way around
or proposing a refactor I went for the simpler approach of sending a
string predicate to the rust lib.

If this implementation is OK I will add tests.

# Related Issue(s)
- closes #1417

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
  • Loading branch information
guilhem-dvr and wjones127 authored Oct 5, 2023
1 parent 4da7d66 commit 3ba3426
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 8 deletions.
26 changes: 20 additions & 6 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ def _filters_to_expression(filters: FilterType) -> Expression:
Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
DNF allows arbitrary boolean logical combinations of single partition predicates.
The innermost tuples each describe a single partition predicate. The list of inner
predicates is interpreted as a conjunction (AND), forming a more selective and
multiple partition predicates. Each tuple has format: (key, op, value) and compares
the key with the value. The supported op are: `=`, `!=`, `in`, and `not in`. If
predicates is interpreted as a conjunction (AND), forming a more selective and
multiple partition predicates. Each tuple has format: (key, op, value) and compares
the key with the value. The supported op are: `=`, `!=`, `in`, and `not in`. If
the op is in or not in, the value must be a collection such as a list, a set or a tuple.
The supported type for value is str. Use empty string `''` for Null partition value.
Expand Down Expand Up @@ -302,13 +302,13 @@ def files(

files.__doc__ = f"""
Get the .parquet files of the DeltaTable.
The paths are as they are saved in the delta log, which may either be
relative to the table root or absolute URIs.
:param partition_filters: the partition filters that will be used for
:param partition_filters: the partition filters that will be used for
getting the matched files
:return: list of the .parquet files referenced for the current version
:return: list of the .parquet files referenced for the current version
of the DeltaTable
{_DNF_filter_doc}
"""
Expand Down Expand Up @@ -666,6 +666,20 @@ def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch:
"""
return self._table.get_add_actions(flatten)

def delete(self, predicate: Optional[str] = None) -> Dict[str, Any]:
    """Delete records from a Delta Table that satisfy a predicate.

    When a predicate is not provided then all records are deleted from the Delta
    Table. Otherwise a scan of the Delta table is performed to mark any files
    that contain records that satisfy the predicate. Once files are determined
    they are rewritten without the records.

    :param predicate: a SQL where clause. If not passed, will delete all rows.
    :return: the metrics from delete.
    """
    # The Rust binding returns the delete metrics serialized as a JSON string;
    # decode it into a plain dict for the caller.
    metrics = self._table.delete(predicate)
    return json.loads(metrics)


class TableOptimizer:
"""API for various table optimization commands."""
Expand Down
15 changes: 15 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
use deltalake::operations::restore::RestoreBuilder;
use deltalake::operations::transaction::commit;
Expand Down Expand Up @@ -594,6 +595,20 @@ impl RawDeltaTable {
.map_err(PythonError::from)?,
))
}

/// Run the delete command on the delta table: delete records following a predicate and return the delete metrics.
#[pyo3(signature = (predicate = None))]
pub fn delete(&mut self, predicate: Option<String>) -> PyResult<String> {
    // Build the delete operation against the table's current state.
    let mut builder = DeleteBuilder::new(self._table.object_store(), self._table.state.clone());
    if let Some(pred) = predicate {
        builder = builder.with_predicate(pred);
    }
    // Drive the async operation on the shared runtime, converting any
    // delta-rs failure into a Python exception.
    let (updated_table, metrics) = rt()?
        .block_on(builder.into_future())
        .map_err(PythonError::from)?;
    // Adopt the post-delete state so later calls observe the new version.
    self._table.state = updated_table.state;
    Ok(serde_json::to_string(&metrics).unwrap())
}
}

fn convert_partition_filters<'a>(
Expand Down
58 changes: 58 additions & 0 deletions python/tests/test_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import pathlib

import pyarrow as pa
import pyarrow.compute as pc

from deltalake.table import DeltaTable
from deltalake.writer import write_deltalake


def test_delete_no_predicates(existing_table: DeltaTable):
    """Deleting without a predicate removes every row and commits a new version."""
    version_before = existing_table.version()

    existing_table.delete()

    # The commit must be recorded as a DELETE and bump the version by one.
    history_entry = existing_table.history(1)[0]
    assert history_entry["operation"] == "DELETE"
    assert existing_table.version() == version_before + 1

    # No rows remain and no data files are referenced any more.
    dataset = existing_table.to_pyarrow_dataset()
    assert dataset.count_rows() == 0
    assert len(existing_table.files()) == 0


def test_delete_a_partition(tmp_path: pathlib.Path, sample_data: pa.Table):
    """Deleting with a predicate on the partition column drops exactly that partition."""
    write_deltalake(tmp_path, sample_data, partition_by=["bool"])

    dt = DeltaTable(tmp_path)
    version_before = dt.version()

    # Rows partitioned under bool=false are the ones expected to survive.
    keep_mask = pc.equal(sample_data["bool"], False)
    expected = sample_data.filter(keep_mask)

    dt.delete(predicate="bool = true")

    history_entry = dt.history(1)[0]
    assert history_entry["operation"] == "DELETE"
    assert dt.version() == version_before + 1

    # Only the surviving partition's single file should remain referenced.
    assert dt.to_pyarrow_table().equals(expected)
    assert len(dt.files()) == 1


def test_delete_some_rows(existing_table: DeltaTable):
    """Deleting with a row-level predicate removes only the matching rows."""
    version_before = existing_table.version()

    # Rows whose utf8 value is '0' or '1' are targeted by the predicate;
    # everything else must be preserved verbatim.
    before = existing_table.to_pyarrow_table()
    keep_mask = pc.invert(pc.is_in(before["utf8"], pa.array(["0", "1"])))
    expected = before.filter(keep_mask)

    existing_table.delete(predicate="utf8 in ('0', '1')")

    history_entry = existing_table.history(1)[0]
    assert history_entry["operation"] == "DELETE"
    assert existing_table.version() == version_before + 1

    assert existing_table.to_pyarrow_table().equals(expected)
5 changes: 3 additions & 2 deletions rust/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion_common::scalar::ScalarValue;
use datafusion_common::DFSchema;
use futures::future::BoxFuture;
use parquet::file::properties::WriterProperties;
use serde::Serialize;
use serde_json::Map;
use serde_json::Value;

Expand Down Expand Up @@ -63,7 +64,7 @@ pub struct DeleteBuilder {
app_metadata: Option<Map<String, serde_json::Value>>,
}

#[derive(Default, Debug)]
#[derive(Default, Debug, Serialize)]
/// Metrics for the Delete Operation
pub struct DeleteMetrics {
/// Number of files added
Expand Down Expand Up @@ -116,7 +117,7 @@ impl DeleteBuilder {
self
}

/// Writer properties passed to parquet writer for when fiiles are rewritten
/// Writer properties passed to parquet writer for when files are rewritten
pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
self.writer_properties = Some(writer_properties);
self
Expand Down

0 comments on commit 3ba3426

Please sign in to comment.