Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python, rust): add column operation #2562

Merged
merged 1 commit into from
Aug 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 221 additions & 6 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::str::FromStr;
// use std::sync::Arc;

// use roaring::RoaringTreemap;
use crate::DeltaConfigKey;
use maplit::hashset;
use serde::{Deserialize, Serialize};
use tracing::warn;
use url::Url;
Expand Down Expand Up @@ -137,30 +139,243 @@ pub struct Protocol {

impl Protocol {
/// Create a new protocol action
pub fn new(min_reader_version: i32, min_wrriter_version: i32) -> Self {
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(min_reader_version: i32, min_writer_version: i32) -> Self {
Self {
min_reader_version,
min_writer_version: min_wrriter_version,
min_writer_version,
reader_features: None,
writer_features: None,
}
}

/// set the reader features in the protocol action
/// set the reader features in the protocol action, automatically bumps min_reader_version
pub fn with_reader_features(
mut self,
reader_features: impl IntoIterator<Item = impl Into<ReaderFeatures>>,
) -> Self {
self.reader_features = Some(reader_features.into_iter().map(|c| c.into()).collect());
let all_reader_features = reader_features
.into_iter()
.map(Into::into)
.collect::<HashSet<_>>();
if !all_reader_features.is_empty() {
self.min_reader_version = 3
}
self.reader_features = Some(all_reader_features);
self
}

/// set the writer features in the protocol action
/// set the writer features in the protocol action, automatically bumps min_writer_version
pub fn with_writer_features(
mut self,
writer_features: impl IntoIterator<Item = impl Into<WriterFeatures>>,
) -> Self {
self.writer_features = Some(writer_features.into_iter().map(|c| c.into()).collect());
let all_writer_feautures = writer_features
.into_iter()
.map(|c| c.into())
.collect::<HashSet<_>>();
if !all_writer_feautures.is_empty() {
self.min_writer_version = 7
}
self.writer_features = Some(all_writer_feautures);
self
}

/// Converts existing properties into features if the reader_version is >=3 or writer_version >=3
/// only converts features that are "true"
pub fn move_table_properties_into_features(
mut self,
configuration: &HashMap<String, Option<String>>,
) -> Protocol {
if self.min_writer_version >= 7 {
let mut converted_writer_features = configuration
.iter()
.filter(|(_, value)| {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just understanding here, so this unwraps the value of the key, if it's already none, return false. If it's Some, parse it into a bool which you check is both ok and actually true, but then collect the hashmap back into a string?

Why not at this point just keep it bools because you just drop the values below anyways. Or better yet collect::<HashMap<_, _>>>() ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I need to first filter the hashmap for enabled configurations, to then try parse each key as a feature

value.as_ref().map_or(false, |v| {
v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v)
})
})
.collect::<HashMap<&String, &Option<String>>>()
.keys()
.map(|key| (*key).clone().into())
.filter(|v| !matches!(v, WriterFeatures::Other(_)))
.collect::<HashSet<WriterFeatures>>();

if configuration
.keys()
.any(|v| v.starts_with("delta.constraints."))
{
converted_writer_features.insert(WriterFeatures::CheckConstraints);
}

match self.writer_features {
Some(mut features) => {
features.extend(converted_writer_features);
self.writer_features = Some(features);
}
None => self.writer_features = Some(converted_writer_features),
}
}
if self.min_reader_version > 3 {
let converted_reader_features = configuration
.iter()
.filter(|(_, value)| {
value.as_ref().map_or(false, |v| {
v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v)
})
})
.map(|(key, _)| (*key).clone().into())
.filter(|v| !matches!(v, ReaderFeatures::Other(_)))
.collect::<HashSet<ReaderFeatures>>();
match self.reader_features {
Some(mut features) => {
features.extend(converted_reader_features);
self.reader_features = Some(features);
}
None => self.reader_features = Some(converted_reader_features),
}
}
self
}
/// Will apply the properties to the protocol by either bumping the version or setting
/// features
pub fn apply_properties_to_protocol(
mut self,
new_properties: &HashMap<String, String>,
raise_if_not_exists: bool,
) -> DeltaResult<Protocol> {
let mut parsed_properties: HashMap<DeltaConfigKey, String> = HashMap::new();

for (key, value) in new_properties {
if let Ok(parsed_key) = key.parse::<DeltaConfigKey>() {
parsed_properties.insert(parsed_key, value.to_string());
} else if raise_if_not_exists {
return Err(Error::Generic(format!(
"Error parsing property '{}':'{}'",
key, value
)));
}
}

// Check and update delta.minReaderVersion
if let Some(min_reader_version) = parsed_properties.get(&DeltaConfigKey::MinReaderVersion) {
let new_min_reader_version = min_reader_version.parse::<i32>();
match new_min_reader_version {
Ok(version) => match version {
1..=3 => {
if version > self.min_reader_version {
self.min_reader_version = version
}
}
_ => {
return Err(Error::Generic(format!(
"delta.minReaderVersion = '{}' is invalid, valid values are ['1','2','3']",
min_reader_version
)))
}
},
Err(_) => {
return Err(Error::Generic(format!(
"delta.minReaderVersion = '{}' is invalid, valid values are ['1','2','3']",
min_reader_version
)))
}
}
}

// Check and update delta.minWriterVersion
if let Some(min_writer_version) = parsed_properties.get(&DeltaConfigKey::MinWriterVersion) {
let new_min_writer_version = min_writer_version.parse::<i32>();
match new_min_writer_version {
Ok(version) => match version {
2..=7 => {
if version > self.min_writer_version {
self.min_writer_version = version
}
}
_ => {
return Err(Error::Generic(format!(
"delta.minWriterVersion = '{}' is invalid, valid values are ['2','3','4','5','6','7']",
min_writer_version
)))
}
},
Err(_) => {
return Err(Error::Generic(format!(
"delta.minWriterVersion = '{}' is invalid, valid values are ['2','3','4','5','6','7']",
min_writer_version
)))
}
}
}

// Check enableChangeDataFeed and bump protocol or add writerFeature if writer versions is >=7
if let Some(enable_cdf) = parsed_properties.get(&DeltaConfigKey::EnableChangeDataFeed) {
let if_enable_cdf = enable_cdf.to_ascii_lowercase().parse::<bool>();
match if_enable_cdf {
Ok(true) => {
if self.min_writer_version >= 7 {
match self.writer_features {
Some(mut features) => {
features.insert(WriterFeatures::ChangeDataFeed);
self.writer_features = Some(features);
}
None => {
self.writer_features =
Some(hashset! {WriterFeatures::ChangeDataFeed})
}
}
} else if self.min_writer_version <= 3 {
self.min_writer_version = 4
}
}
Ok(false) => {}
_ => {
return Err(Error::Generic(format!(
"delta.enableChangeDataFeed = '{}' is invalid, valid values are ['true']",
enable_cdf
)))
}
}
}

if let Some(enable_dv) = parsed_properties.get(&DeltaConfigKey::EnableDeletionVectors) {
let if_enable_dv = enable_dv.to_ascii_lowercase().parse::<bool>();
match if_enable_dv {
Ok(true) => {
let writer_features = match self.writer_features {
Some(mut features) => {
features.insert(WriterFeatures::DeletionVectors);
features
}
None => hashset! {WriterFeatures::DeletionVectors},
};
let reader_features = match self.reader_features {
Some(mut features) => {
features.insert(ReaderFeatures::DeletionVectors);
features
}
None => hashset! {ReaderFeatures::DeletionVectors},
};
self.min_reader_version = 3;
self.min_writer_version = 7;
self.writer_features = Some(writer_features);
self.reader_features = Some(reader_features);
}
Ok(false) => {}
_ => {
return Err(Error::Generic(format!(
"delta.enableDeletionVectors = '{}' is invalid, valid values are ['true']",
enable_dv
)))
}
}
}
Ok(self)
}
/// Enable timestamp_ntz in the protocol
pub fn enable_timestamp_ntz(mut self) -> Protocol {
self = self.with_reader_features(vec![ReaderFeatures::TimestampWithoutTimezone]);
self = self.with_writer_features(vec![WriterFeatures::TimestampWithoutTimezone]);
self
}
}
Expand Down
114 changes: 114 additions & 0 deletions crates/core/src/operations/add_column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//! Add a new column to a table

use delta_kernel::schema::StructType;
use futures::future::BoxFuture;
use itertools::Itertools;

use super::cast::merge_struct;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};

use crate::kernel::StructField;
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

/// Add new columns and/or nested fields to a table
pub struct AddColumnBuilder {
/// A snapshot of the table's state
snapshot: DeltaTableState,
/// Fields to add/merge into schema
fields: Option<Vec<StructField>>,
/// Delta object store for handling data files
log_store: LogStoreRef,
/// Additional information to add to the commit
commit_properties: CommitProperties,
}

impl super::Operation<()> for AddColumnBuilder {}
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved

impl AddColumnBuilder {
/// Create a new builder
pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self {
Self {
snapshot,
log_store,
fields: None,
commit_properties: CommitProperties::default(),
}
}

/// Specify the fields to be added
pub fn with_fields(mut self, fields: impl IntoIterator<Item = StructField> + Clone) -> Self {
self.fields = Some(fields.into_iter().collect());
self
}
/// Additional metadata to be added to commit info
pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
self.commit_properties = commit_properties;
self
}
}

impl std::future::IntoFuture for AddColumnBuilder {
type Output = DeltaResult<DeltaTable>;

type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
let mut metadata = this.snapshot.metadata().clone();
let fields = match this.fields {
Some(v) => v,
None => return Err(DeltaTableError::Generic("No fields provided".to_string())),
};

let fields_right = &StructType::new(fields.clone());
let table_schema = this.snapshot.schema();
let new_table_schema = merge_struct(table_schema, fields_right)?;

// TODO(ion): Think of a way how we can simply this checking through the API or centralize some checks.
let contains_timestampntz = PROTOCOL.contains_timestampntz(fields.iter());
let protocol = this.snapshot.protocol();

let maybe_new_protocol = if contains_timestampntz {
let updated_protocol = protocol.clone().enable_timestamp_ntz();
if !(protocol.min_reader_version == 3 && protocol.min_writer_version == 7) {
// Convert existing properties to features since we advanced the protocol to v3,7
Some(
updated_protocol
.move_table_properties_into_features(&metadata.configuration),
)
} else {
Some(updated_protocol)
}
} else {
None
};

let operation = DeltaOperation::AddColumn {
fields: fields.into_iter().collect_vec(),
};

metadata.schema_string = serde_json::to_string(&new_table_schema)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud here, not feedback on this pull request, but implementing TryInto for StructType would probably be handle for us


let mut actions = vec![metadata.into()];

if let Some(new_protocol) = maybe_new_protocol {
actions.push(new_protocol.into())
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
}

let commit = CommitBuilder::from(this.commit_properties)
.with_actions(actions)
.build(Some(&this.snapshot), this.log_store.clone(), operation)
.await?;

Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}
Loading
Loading