Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): expose delete operation #1687

Merged
merged 7 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ def _filters_to_expression(filters: FilterType) -> Expression:
Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
DNF allows arbitrary boolean logical combinations of single partition predicates.
The innermost tuples each describe a single partition predicate. The list of inner
predicates is interpreted as a conjunction (AND), forming a more selective and
multiple partition predicates. Each tuple has format: (key, op, value) and compares
the key with the value. The supported op are: `=`, `!=`, `in`, and `not in`. If
predicates is interpreted as a conjunction (AND), forming a more selective and
multiple partition predicates. Each tuple has format: (key, op, value) and compares
the key with the value. The supported op are: `=`, `!=`, `in`, and `not in`. If
the op is in or not in, the value must be a collection such as a list, a set or a tuple.
The supported type for value is str. Use empty string `''` for Null partition value.

Expand Down Expand Up @@ -302,13 +302,13 @@ def files(

files.__doc__ = f"""
Get the .parquet files of the DeltaTable.

The paths are as they are saved in the delta log, which may either be
relative to the table root or absolute URIs.

:param partition_filters: the partition filters that will be used for
:param partition_filters: the partition filters that will be used for
getting the matched files
:return: list of the .parquet files referenced for the current version
:return: list of the .parquet files referenced for the current version
of the DeltaTable
{_DNF_filter_doc}
"""
Expand Down Expand Up @@ -666,6 +666,20 @@ def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch:
"""
return self._table.get_add_actions(flatten)

def delete(self, predicate: Optional[str] = None) -> Dict[str, Any]:
"""Delete records from a Delta Table that statisfy a predicate.

When a predicate is not provided then all records are deleted from the Delta
Table. Otherwise a scan of the Delta table is performed to mark any files
that contain records that satisfy the predicate. Once files are determined
they are rewritten without the records.

:param predicate: a SQL where clause. If not passed, will delete all rows.
:return: the metrics from delete.
"""
metrics = self._table.delete(predicate)
return json.loads(metrics)


class TableOptimizer:
"""API for various table optimization commands."""
Expand Down
15 changes: 15 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
use deltalake::operations::restore::RestoreBuilder;
use deltalake::operations::transaction::commit;
Expand Down Expand Up @@ -594,6 +595,20 @@ impl RawDeltaTable {
.map_err(PythonError::from)?,
))
}

/// Run the delete command on the delta table: delete records following a predicate and return the delete metrics.
#[pyo3(signature = (predicate = None))]
pub fn delete(&mut self, predicate: Option<String>) -> PyResult<String> {
let mut cmd = DeleteBuilder::new(self._table.object_store(), self._table.state.clone());
if let Some(predicate) = predicate {
cmd = cmd.with_predicate(predicate);
}
let (table, metrics) = rt()?
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Ok(serde_json::to_string(&metrics).unwrap())
}
}

fn convert_partition_filters<'a>(
Expand Down
58 changes: 58 additions & 0 deletions python/tests/test_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import pathlib

import pyarrow as pa
import pyarrow.compute as pc

from deltalake.table import DeltaTable
from deltalake.writer import write_deltalake


def test_delete_no_predicates(existing_table: DeltaTable):
old_version = existing_table.version()

existing_table.delete()

last_action = existing_table.history(1)[0]
assert last_action["operation"] == "DELETE"
assert existing_table.version() == old_version + 1

dataset = existing_table.to_pyarrow_dataset()
assert dataset.count_rows() == 0
assert len(existing_table.files()) == 0


def test_delete_a_partition(tmp_path: pathlib.Path, sample_data: pa.Table):
write_deltalake(tmp_path, sample_data, partition_by=["bool"])

dt = DeltaTable(tmp_path)
old_version = dt.version()

mask = pc.equal(sample_data["bool"], False)
expected_table = sample_data.filter(mask)

dt.delete(predicate="bool = true")

last_action = dt.history(1)[0]
assert last_action["operation"] == "DELETE"
assert dt.version() == old_version + 1

table = dt.to_pyarrow_table()
assert table.equals(expected_table)
assert len(dt.files()) == 1


def test_delete_some_rows(existing_table: DeltaTable):
old_version = existing_table.version()

existing = existing_table.to_pyarrow_table()
mask = pc.invert(pc.is_in(existing["utf8"], pa.array(["0", "1"])))
expected_table = existing.filter(mask)

existing_table.delete(predicate="utf8 in ('0', '1')")

last_action = existing_table.history(1)[0]
assert last_action["operation"] == "DELETE"
assert existing_table.version() == old_version + 1

table = existing_table.to_pyarrow_table()
assert table.equals(expected_table)
5 changes: 3 additions & 2 deletions rust/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion_common::scalar::ScalarValue;
use datafusion_common::DFSchema;
use futures::future::BoxFuture;
use parquet::file::properties::WriterProperties;
use serde::Serialize;
use serde_json::Map;
use serde_json::Value;

Expand Down Expand Up @@ -63,7 +64,7 @@ pub struct DeleteBuilder {
app_metadata: Option<Map<String, serde_json::Value>>,
}

#[derive(Default, Debug)]
#[derive(Default, Debug, Serialize)]
/// Metrics for the Delete Operation
pub struct DeleteMetrics {
/// Number of files added
Expand Down Expand Up @@ -116,7 +117,7 @@ impl DeleteBuilder {
self
}

/// Writer properties passed to parquet writer for when fiiles are rewritten
/// Writer properties passed to parquet writer for when files are rewritten
pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
self.writer_properties = Some(writer_properties);
self
Expand Down
Loading