Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: merge operation #1522

Merged
merged 24 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,18 @@ impl Action {
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Used to record the operations performed to the Delta Log
pub struct MergePredicate {
/// The type of merge operation performed
pub action_type: String,
/// The predicate used for the merge operation
#[serde(skip_serializing_if = "Option::is_none")]
pub predicate: Option<String>,
}

/// Operation performed when creating a new log entry with one or more actions.
/// This is a key element of the `CommitInfo` action.
#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -701,12 +713,29 @@ pub enum DeltaOperation {
/// The condition the to be deleted data must match
predicate: Option<String>,
},

/// Update data matching predicate from delta table
Update {
/// The update predicate
predicate: Option<String>,
},

/// Merge data with a source data with the following predicate
#[serde(rename_all = "camelCase")]
Merge {
/// The merge predicate
predicate: Option<String>,

/// Match operations performed
matched_predicates: Vec<MergePredicate>,

/// Not Match operations performed
not_matched_predicates: Vec<MergePredicate>,

/// Not Match by Source operations performed
not_matched_by_source_predicates: Vec<MergePredicate>,
},

/// Represents a Delta `StreamingUpdate` operation.
#[serde(rename_all = "camelCase")]
StreamingUpdate {
Expand Down Expand Up @@ -752,6 +781,7 @@ impl DeltaOperation {
DeltaOperation::Write { .. } => "WRITE",
DeltaOperation::Delete { .. } => "DELETE",
DeltaOperation::Update { .. } => "UPDATE",
DeltaOperation::Merge { .. } => "MERGE",
DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE",
DeltaOperation::Optimize { .. } => "OPTIMIZE",
DeltaOperation::FileSystemCheck { .. } => "FSCK",
Expand Down Expand Up @@ -797,6 +827,7 @@ impl DeltaOperation {
| Self::StreamingUpdate { .. }
| Self::Write { .. }
| Self::Delete { .. }
| Self::Merge { .. }
| Self::Update { .. }
| Self::Restore { .. } => true,
}
Expand All @@ -819,6 +850,7 @@ impl DeltaOperation {
Self::Write { predicate, .. } => predicate.clone(),
Self::Delete { predicate, .. } => predicate.clone(),
Self::Update { predicate, .. } => predicate.clone(),
Self::Merge { predicate, .. } => predicate.clone(),
_ => None,
}
}
Expand Down
Loading
Loading