Skip to content

Commit

Permalink
feat(rust, python): add drop constraint operation (#2071)
Browse files Browse the repository at this point in the history
# Description
Adds `drop constraint` as an operation, and also exposed it to python
under the alter namespace 😄

# Related Issue(s)
- closes #2070

---------

Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com>
Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
  • Loading branch information
3 people authored Feb 27, 2024
1 parent 8152101 commit 7f0454e
Show file tree
Hide file tree
Showing 8 changed files with 346 additions and 6 deletions.
5 changes: 1 addition & 4 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
expr: expr_str.clone(),
};

let app_metadata = match this.app_metadata {
Some(metadata) => metadata,
None => HashMap::default(),
};
let app_metadata = this.app_metadata.unwrap_or_default();

let commit_info = CommitInfo {
timestamp: Some(Utc::now().timestamp_millis()),
Expand Down
205 changes: 205 additions & 0 deletions crates/core/src/operations/drop_constraints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
//! Drop a constraint from a table

use std::collections::HashMap;

use chrono::Utc;
use futures::future::BoxFuture;
use serde_json::json;

use crate::kernel::{Action, CommitInfo, IsolationLevel};
use crate::logstore::LogStoreRef;
use crate::operations::transaction::commit;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;
use crate::{DeltaResult, DeltaTableError};

/// Remove constraints from the table
pub struct DropConstraintBuilder {
/// A snapshot of the table's state
snapshot: DeltaTableState,
/// Name of the constraint
name: Option<String>,
/// Raise if constraint doesn't exist
raise_if_not_exists: bool,
/// Delta object store for handling data files
log_store: LogStoreRef,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
}

impl DropConstraintBuilder {
/// Create a new builder
pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self {
Self {
name: None,
raise_if_not_exists: true,
snapshot,
log_store,
app_metadata: None,
}
}

/// Specify the constraint to be removed
pub fn with_constraint<S: Into<String>>(mut self, name: S) -> Self {
self.name = Some(name.into());
self
}

/// Specify if you want to raise if the constraint does not exist
pub fn with_raise_if_not_exists(mut self, raise: bool) -> Self {
self.raise_if_not_exists = raise;
self
}

/// Additional metadata to be added to commit info
pub fn with_metadata(
mut self,
metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
self.app_metadata = Some(HashMap::from_iter(metadata));
self
}
}

impl std::future::IntoFuture for DropConstraintBuilder {
type Output = DeltaResult<DeltaTable>;

type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let mut this = self;

Box::pin(async move {
let name = this
.name
.ok_or(DeltaTableError::Generic("No name provided".to_string()))?;

let mut metadata = this.snapshot.metadata().clone();
let configuration_key = format!("delta.constraints.{}", name);

if metadata.configuration.remove(&configuration_key).is_none() {
if this.raise_if_not_exists {
return Err(DeltaTableError::Generic(format!(
"Constraint with name: {} doesn't exists",
name
)));
}
return Ok(DeltaTable::new_with_state(this.log_store, this.snapshot));
}
let operational_parameters = HashMap::from_iter([("name".to_string(), json!(&name))]);

let operations = DeltaOperation::DropConstraint { name: name.clone() };

let app_metadata = this.app_metadata.unwrap_or_default();

let commit_info = CommitInfo {
timestamp: Some(Utc::now().timestamp_millis()),
operation: Some(operations.name().to_string()),
operation_parameters: Some(operational_parameters),
read_version: Some(this.snapshot.version()),
isolation_level: Some(IsolationLevel::Serializable),
is_blind_append: Some(false),
info: app_metadata,
..Default::default()
};

let actions = vec![Action::CommitInfo(commit_info), Action::Metadata(metadata)];

let version = commit(
this.log_store.as_ref(),
&actions,
operations.clone(),
Some(&this.snapshot),
None,
)
.await?;

this.snapshot.merge(actions, &operations, version)?;
Ok(DeltaTable::new_with_state(this.log_store, this.snapshot))
})
}
}

#[cfg(feature = "datafusion")]
#[cfg(test)]
mod tests {
use crate::writer::test_utils::{create_bare_table, get_record_batch};
use crate::{DeltaOps, DeltaResult, DeltaTable};

async fn get_constraint_op_params(table: &mut DeltaTable) -> String {
let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];

last_commit
.operation_parameters
.as_ref()
.unwrap()
.get("name")
.unwrap()
.as_str()
.unwrap()
.to_owned()
}

#[tokio::test]
async fn drop_valid_constraint() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.await?;
let table = DeltaOps(write);

let table = table
.add_constraint()
.with_constraint("id", "value < 1000")
.await?;

let mut table = DeltaOps(table)
.drop_constraints()
.with_constraint("id")
.await?;

let expected_name = "id";
assert_eq!(get_constraint_op_params(&mut table).await, expected_name);
assert_eq!(table.metadata().unwrap().configuration.get("id"), None);
Ok(())
}

#[tokio::test]
async fn drop_invalid_constraint_not_existing() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.await?;

let table = DeltaOps(write)
.drop_constraints()
.with_constraint("not_existing")
.await;
assert!(table.is_err());

Ok(())
}

#[tokio::test]
async fn drop_invalid_constraint_ignore() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let write = DeltaOps(create_bare_table())
.write(vec![batch.clone()])
.await?;

let version = write.version();

let table = DeltaOps(write)
.drop_constraints()
.with_constraint("not_existing")
.with_raise_if_not_exists(false)
.await?;

let version_after = table.version();

assert_eq!(version, version_after);
Ok(())
}
}
11 changes: 10 additions & 1 deletion crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::collections::HashMap;
pub mod cast;
pub mod convert_to_delta;
pub mod create;
pub mod drop_constraints;
pub mod filesystem_check;
pub mod optimize;
pub mod restore;
Expand All @@ -27,7 +28,8 @@ pub mod vacuum;
#[cfg(feature = "datafusion")]
use self::{
constraints::ConstraintBuilder, datafusion_utils::Expression, delete::DeleteBuilder,
load::LoadBuilder, merge::MergeBuilder, update::UpdateBuilder, write::WriteBuilder,
drop_constraints::DropConstraintBuilder, load::LoadBuilder, merge::MergeBuilder,
update::UpdateBuilder, write::WriteBuilder,
};
#[cfg(feature = "datafusion")]
pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream;
Expand Down Expand Up @@ -199,6 +201,13 @@ impl DeltaOps {
pub fn add_constraint(self) -> ConstraintBuilder {
ConstraintBuilder::new(self.0.log_store, self.0.state.unwrap())
}

/// Drops constraints from a table
#[cfg(feature = "datafusion")]
#[must_use]
pub fn drop_constraints(self) -> DropConstraintBuilder {
DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap())
}
}

impl From<DeltaTable> for DeltaOps {
Expand Down
10 changes: 9 additions & 1 deletion crates/core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,12 @@ pub enum DeltaOperation {
expr: String,
},

/// Drops constraints from a table
DropConstraint {
/// Constraints name
name: String,
},

/// Merge data with a source data with the following predicate
#[serde(rename_all = "camelCase")]
Merge {
Expand Down Expand Up @@ -458,6 +464,7 @@ impl DeltaOperation {
DeltaOperation::VacuumStart { .. } => "VACUUM START",
DeltaOperation::VacuumEnd { .. } => "VACUUM END",
DeltaOperation::AddConstraint { .. } => "ADD CONSTRAINT",
DeltaOperation::DropConstraint { .. } => "DROP CONSTRAINT",
}
}

Expand Down Expand Up @@ -496,7 +503,8 @@ impl DeltaOperation {
Self::Optimize { .. }
| Self::VacuumStart { .. }
| Self::VacuumEnd { .. }
| Self::AddConstraint { .. } => false,
| Self::AddConstraint { .. }
| Self::DropConstraint { .. } => false,
Self::Create { .. }
| Self::FileSystemCheck {}
| Self::StreamingUpdate { .. }
Expand Down
6 changes: 6 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ class RawDeltaTable:
constraints: Dict[str, str],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def drop_constraints(
self,
name: str,
raise_if_not_exists: bool,
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def restore(
self,
target: Optional[Any],
Expand Down
34 changes: 34 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1775,6 +1775,40 @@ def add_constraint(

self.table._table.add_constraints(constraints, custom_metadata)

def drop_constraint(
self,
name: str,
raise_if_not_exists: bool = True,
custom_metadata: Optional[Dict[str, str]] = None,
) -> None:
"""
Drop constraints from a table. Limited to `single constraint` at once.
Args:
name: constraint name which to drop.
raise_if_not_exists: set if should raise if not exists.
custom_metadata: custom metadata that will be added to the transaction commit.
Example:
```python
from deltalake import DeltaTable
dt = DeltaTable("test_table_constraints")
dt.metadata().configuration
{'delta.constraints.value_gt_5': 'value > 5'}
```
**Drop the constraint**
```python
dt.alter.drop_constraint(name = "value_gt_5")
```
**Configuration after dropping**
```python
dt.metadata().configuration
{}
```
"""
self.table._table.drop_constraints(name, raise_if_not_exists, custom_metadata)


class TableOptimizer:
"""API for various table optimization commands."""
Expand Down
28 changes: 28 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use deltalake::kernel::{Action, Add, Invariant, LogicalFile, Remove, Scalar, Str
use deltalake::operations::constraints::ConstraintBuilder;
use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy};
use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::drop_constraints::DropConstraintBuilder;
use deltalake::operations::filesystem_check::FileSystemCheckBuilder;
use deltalake::operations::merge::MergeBuilder;
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
Expand Down Expand Up @@ -470,6 +471,33 @@ impl RawDeltaTable {
Ok(())
}

#[pyo3(signature = (name, raise_if_not_exists, custom_metadata=None))]
pub fn drop_constraints(
&mut self,
name: String,
raise_if_not_exists: bool,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let mut cmd = DropConstraintBuilder::new(
self._table.log_store(),
self._table.snapshot().map_err(PythonError::from)?.clone(),
)
.with_constraint(name)
.with_raise_if_not_exists(raise_if_not_exists);

if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
cmd = cmd.with_metadata(json_metadata);
};

let table = rt()?
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Ok(())
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (source,
predicate,
Expand Down
Loading

0 comments on commit 7f0454e

Please sign in to comment.