Skip to content

Commit

Permalink
refactor: move vacuum command to operations module (#1045)
Browse files Browse the repository at this point in the history
# Description

Moving the `vacuum` operation into the operations module and adopting
`IntoFuture` for the command builder. This breaks the APIs for the
builder (now with consistent setter names), but we are able to keep the
APIs for `DeltaTable` in Rust and Python.

In a follow-up I would like to move the optimize command as well. This,
however, may require refactoring `PartitionValue`, since we can only
deal with `'static` lifetimes when using `IntoFuture`. A while back we
talked about pulling in `ScalarValue` from datafusion to optimize that
implementation, and maybe that's a good opportunity to look into that as
well.

# Related Issue(s)
<!---
For example:

- closes #106
--->

# Documentation

<!---
Share links to useful documentation
--->

Co-authored-by: Will Jones <willjones127@gmail.com>
  • Loading branch information
roeap and wjones127 authored Jan 5, 2023
1 parent f59c21f commit 50ea9f5
Show file tree
Hide file tree
Showing 16 changed files with 642 additions and 610 deletions.
35 changes: 18 additions & 17 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ mod schema;
mod utils;

use arrow::pyarrow::PyArrowType;
use chrono::{DateTime, FixedOffset, Utc};
use chrono::{DateTime, Duration, FixedOffset, Utc};
use deltalake::action::{
self, Action, ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats,
};
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
use deltalake::builder::DeltaTableBuilder;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
use deltalake::DeltaDataTypeLong;
use deltalake::DeltaDataTypeTimestamp;
use deltalake::DeltaTableMetaData;
use deltalake::DeltaTransactionOptions;
use deltalake::{Invariant, Schema};
use deltalake::{
DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaTableMetaData, DeltaTransactionOptions,
Invariant, Schema,
};
use pyo3::create_exception;
use pyo3::exceptions::PyException;
use pyo3::exceptions::PyValueError;
Expand Down Expand Up @@ -49,10 +49,6 @@ impl PyDeltaTableError {
PyDeltaTableError::new_err(err.to_string())
}

fn from_vacuum_error(err: deltalake::vacuum::VacuumError) -> pyo3::PyErr {
PyDeltaTableError::new_err(err.to_string())
}

fn from_tokio(err: tokio::io::Error) -> pyo3::PyErr {
PyDeltaTableError::new_err(err.to_string())
}
Expand Down Expand Up @@ -288,12 +284,17 @@ impl RawDeltaTable {
retention_hours: Option<u64>,
enforce_retention_duration: bool,
) -> PyResult<Vec<String>> {
rt()?
.block_on(
self._table
.vacuum(retention_hours, dry_run, enforce_retention_duration),
)
.map_err(PyDeltaTableError::from_vacuum_error)
let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone())
.with_enforce_retention_duration(enforce_retention_duration)
.with_dry_run(dry_run);
if let Some(retention_period) = retention_hours {
cmd = cmd.with_retention_period(Duration::hours(retention_period as i64));
}
let (table, metrics) = rt()?
.block_on(async { cmd.await })
.map_err(PyDeltaTableError::from_raw)?;
self._table.state = table.state;
Ok(metrics.files_deleted)
}

// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
Expand Down Expand Up @@ -321,7 +322,7 @@ impl RawDeltaTable {

pub fn update_incremental(&mut self) -> PyResult<()> {
rt()?
.block_on(self._table.update_incremental())
.block_on(self._table.update_incremental(None))
.map_err(PyDeltaTableError::from_raw)
}

Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_vacuum.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_vacuum_dry_run_simple_table():
dt.vacuum(retention_periods)
assert (
str(exception.value)
== "Invalid retention period, minimum retention for vacuum is configured to be greater than 168 hours, got 167 hours"
== "Generic error: Invalid retention period, minimum retention for vacuum is configured to be greater than 168 hours, got 167 hours"
)


Expand Down
6 changes: 6 additions & 0 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ pub struct Add {
pub tags: Option<HashMap<String, Option<String>>>,
}

impl Hash for Add {
fn hash<H: Hasher>(&self, state: &mut H) {
self.path.hash(state);
}
}

impl Add {
/// Returns the Add action with path decoded.
pub fn path_decoded(self) -> Result<Self, ActionError> {
Expand Down
4 changes: 2 additions & 2 deletions rust/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl DeltaTableBuilder {
Ok(object_store)
}

/// Build the delta Table from specified options.
/// Build the [`DeltaTable`] from specified options.
///
/// This will not load the log, i.e. the table is not initialized. To get an initialized
/// table use the `load` function
Expand Down Expand Up @@ -258,7 +258,7 @@ impl DeltaTableBuilder {
Ok(DeltaTable::new(object_store, config))
}

/// finally load the table
/// Build the [`DeltaTable`] and load its state
pub async fn load(self) -> Result<DeltaTable, DeltaTableError> {
let version = self.options.version.clone();
let mut table = self.build()?;
Expand Down
Loading

0 comments on commit 50ea9f5

Please sign in to comment.