Skip to content

Commit

Permalink
feat: merge operation (#1522)
Browse files Browse the repository at this point in the history
# Description
Implement the Merge operation using Datafusion.

Currently the implementation rewrites the entire DeltaTable; limiting the
files that are rewritten will be performed in future work.

# Related Issue(s)
- progresses #850


# Documentation

<!---
Share links to useful documentation
--->

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 11, 2023
1 parent eb36ddf commit 6e00c75
Show file tree
Hide file tree
Showing 4 changed files with 1,835 additions and 124 deletions.
32 changes: 32 additions & 0 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,18 @@ impl Action {
}
}

/// Records one operation performed as part of a merge, for the Delta Log.
///
/// Values of this type fill the `matched_predicates`,
/// `not_matched_predicates` and `not_matched_by_source_predicates` lists of
/// `DeltaOperation::Merge`. Field names are serialized in camelCase
/// (for example `action_type` becomes `actionType`).
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct MergePredicate {
    /// The type of merge operation performed
    pub action_type: String,
    /// The predicate used for the merge operation; omitted from the
    /// serialized output when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub predicate: Option<String>,
}

/// Operation performed when creating a new log entry with one or more actions.
/// This is a key element of the `CommitInfo` action.
#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -750,12 +762,29 @@ pub enum DeltaOperation {
/// The condition that the data to be deleted must match
predicate: Option<String>,
},

/// Update data matching predicate from delta table
Update {
/// The update predicate
predicate: Option<String>,
},

/// Merge data with source data using the following predicate
#[serde(rename_all = "camelCase")]
Merge {
/// The merge predicate
predicate: Option<String>,

/// Match operations performed
matched_predicates: Vec<MergePredicate>,

/// Not Match operations performed
not_matched_predicates: Vec<MergePredicate>,

/// Not Match by Source operations performed
not_matched_by_source_predicates: Vec<MergePredicate>,
},

/// Represents a Delta `StreamingUpdate` operation.
#[serde(rename_all = "camelCase")]
StreamingUpdate {
Expand Down Expand Up @@ -801,6 +830,7 @@ impl DeltaOperation {
DeltaOperation::Write { .. } => "WRITE",
DeltaOperation::Delete { .. } => "DELETE",
DeltaOperation::Update { .. } => "UPDATE",
DeltaOperation::Merge { .. } => "MERGE",
DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE",
DeltaOperation::Optimize { .. } => "OPTIMIZE",
DeltaOperation::FileSystemCheck { .. } => "FSCK",
Expand Down Expand Up @@ -846,6 +876,7 @@ impl DeltaOperation {
| Self::StreamingUpdate { .. }
| Self::Write { .. }
| Self::Delete { .. }
| Self::Merge { .. }
| Self::Update { .. }
| Self::Restore { .. } => true,
}
Expand All @@ -868,6 +899,7 @@ impl DeltaOperation {
Self::Write { predicate, .. } => predicate.clone(),
Self::Delete { predicate, .. } => predicate.clone(),
Self::Update { predicate, .. } => predicate.clone(),
Self::Merge { predicate, .. } => predicate.clone(),
_ => None,
}
}
Expand Down
Loading

0 comments on commit 6e00c75

Please sign in to comment.