Skip to content

Commit

Permalink
feat: add column
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 4, 2024
1 parent dada188 commit acf1e35
Show file tree
Hide file tree
Showing 13 changed files with 631 additions and 356 deletions.
227 changes: 221 additions & 6 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::str::FromStr;
// use std::sync::Arc;

// use roaring::RoaringTreemap;
use crate::DeltaConfigKey;
use maplit::hashset;
use serde::{Deserialize, Serialize};
use tracing::warn;
use url::Url;
Expand Down Expand Up @@ -137,30 +139,243 @@ pub struct Protocol {

impl Protocol {
/// Create a new protocol action
pub fn new(min_reader_version: i32, min_wrriter_version: i32) -> Self {
pub fn new(min_reader_version: i32, min_writer_version: i32) -> Self {
Self {
min_reader_version,
min_writer_version: min_wrriter_version,
min_writer_version,
reader_features: None,
writer_features: None,
}
}

/// set the reader features in the protocol action
/// set the reader features in the protocol action, automatically bumps min_reader_version
pub fn with_reader_features(
mut self,
reader_features: impl IntoIterator<Item = impl Into<ReaderFeatures>>,
) -> Self {
self.reader_features = Some(reader_features.into_iter().map(|c| c.into()).collect());
let all_reader_features = reader_features
.into_iter()
.map(Into::into)
.collect::<HashSet<_>>();
if !all_reader_features.is_empty() {
self.min_reader_version = 3
}
self.reader_features = Some(all_reader_features);
self
}

/// set the writer features in the protocol action
/// set the writer features in the protocol action, automatically bumps min_writer_version
pub fn with_writer_features(
mut self,
writer_features: impl IntoIterator<Item = impl Into<WriterFeatures>>,
) -> Self {
self.writer_features = Some(writer_features.into_iter().map(|c| c.into()).collect());
let all_writer_feautures = writer_features
.into_iter()
.map(|c| c.into())
.collect::<HashSet<_>>();
if !all_writer_feautures.is_empty() {
self.min_writer_version = 7
}
self.writer_features = Some(all_writer_feautures);
self
}

/// Converts existing properties into features if the reader_version is >=3 or writer_version >=3
/// only converts features that are "true"
pub fn move_table_properties_into_features(
mut self,
configuration: &HashMap<String, Option<String>>,
) -> Protocol {
if self.min_writer_version >= 7 {
let mut converted_writer_features = configuration
.iter()
.filter(|(_, value)| {
value.as_ref().map_or(false, |v| {
v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v)
})
})
.collect::<HashMap<&String, &Option<String>>>()
.keys()
.map(|key| (*key).clone().into())
.filter(|v| !matches!(v, WriterFeatures::Other(_)))
.collect::<HashSet<WriterFeatures>>();

if configuration
.keys()
.any(|v| v.starts_with("delta.constraints."))
{
converted_writer_features.insert(WriterFeatures::CheckConstraints);
}

match self.writer_features {
Some(mut features) => {
features.extend(converted_writer_features);
self.writer_features = Some(features);
}
None => self.writer_features = Some(converted_writer_features),
}
}
if self.min_reader_version > 3 {
let converted_reader_features = configuration
.iter()
.filter(|(_, value)| {
value.as_ref().map_or(false, |v| {
v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v)
})
})
.map(|(key, _)| (*key).clone().into())
.filter(|v| !matches!(v, ReaderFeatures::Other(_)))
.collect::<HashSet<ReaderFeatures>>();
match self.reader_features {
Some(mut features) => {
features.extend(converted_reader_features);
self.reader_features = Some(features);
}
None => self.reader_features = Some(converted_reader_features),
}
}
self
}
/// Will apply the properties to the protocol by either bumping the version or setting
/// features
pub fn apply_properties_to_protocol(
mut self,
new_properties: &HashMap<String, String>,
raise_if_not_exists: bool,
) -> DeltaResult<Protocol> {
let mut parsed_properties: HashMap<DeltaConfigKey, String> = HashMap::new();

for (key, value) in new_properties {
if let Ok(parsed_key) = key.parse::<DeltaConfigKey>() {
parsed_properties.insert(parsed_key, value.to_string());
} else if raise_if_not_exists {
return Err(Error::Generic(format!(
"Error parsing property '{}':'{}'",
key, value
)));
}
}

// Check and update delta.minReaderVersion
if let Some(min_reader_version) = parsed_properties.get(&DeltaConfigKey::MinReaderVersion) {
let new_min_reader_version = min_reader_version.parse::<i32>();
match new_min_reader_version {
Ok(version) => match version {
1..=3 => {
if version > self.min_reader_version {
self.min_reader_version = version
}
}
_ => {
return Err(Error::Generic(format!(
"delta.minReaderVersion = '{}' is invalid, valid values are ['1','2','3']",
min_reader_version
)))
}
},
Err(_) => {
return Err(Error::Generic(format!(
"delta.minReaderVersion = '{}' is invalid, valid values are ['1','2','3']",
min_reader_version
)))
}
}
}

// Check and update delta.minWriterVersion
if let Some(min_writer_version) = parsed_properties.get(&DeltaConfigKey::MinWriterVersion) {
let new_min_writer_version = min_writer_version.parse::<i32>();
match new_min_writer_version {
Ok(version) => match version {
2..=7 => {
if version > self.min_writer_version {
self.min_writer_version = version
}
}
_ => {
return Err(Error::Generic(format!(
"delta.minWriterVersion = '{}' is invalid, valid values are ['2','3','4','5','6','7']",
min_writer_version
)))
}
},
Err(_) => {
return Err(Error::Generic(format!(
"delta.minWriterVersion = '{}' is invalid, valid values are ['2','3','4','5','6','7']",
min_writer_version
)))
}
}
}

// Check enableChangeDataFeed and bump protocol or add writerFeature if writer versions is >=7
if let Some(enable_cdf) = parsed_properties.get(&DeltaConfigKey::EnableChangeDataFeed) {
let if_enable_cdf = enable_cdf.to_ascii_lowercase().parse::<bool>();
match if_enable_cdf {
Ok(true) => {
if self.min_writer_version >= 7 {
match self.writer_features {
Some(mut features) => {
features.insert(WriterFeatures::ChangeDataFeed);
self.writer_features = Some(features);
}
None => {
self.writer_features =
Some(hashset! {WriterFeatures::ChangeDataFeed})
}
}
} else if self.min_writer_version <= 3 {
self.min_writer_version = 4
}
}
Ok(false) => {}
_ => {
return Err(Error::Generic(format!(
"delta.enableChangeDataFeed = '{}' is invalid, valid values are ['true']",
enable_cdf
)))
}
}
}

if let Some(enable_dv) = parsed_properties.get(&DeltaConfigKey::EnableDeletionVectors) {
let if_enable_dv = enable_dv.to_ascii_lowercase().parse::<bool>();
match if_enable_dv {
Ok(true) => {
let writer_features = match self.writer_features {
Some(mut features) => {
features.insert(WriterFeatures::DeletionVectors);
features
}
None => hashset! {WriterFeatures::DeletionVectors},
};
let reader_features = match self.reader_features {
Some(mut features) => {
features.insert(ReaderFeatures::DeletionVectors);
features
}
None => hashset! {ReaderFeatures::DeletionVectors},
};
self.min_reader_version = 3;
self.min_writer_version = 7;
self.writer_features = Some(writer_features);
self.reader_features = Some(reader_features);
}
Ok(false) => {}
_ => {
return Err(Error::Generic(format!(
"delta.enableDeletionVectors = '{}' is invalid, valid values are ['true']",
enable_dv
)))
}
}
}
Ok(self)
}
/// Enable timestamp_ntz in the protocol
pub fn enable_timestamp_ntz(mut self) -> Protocol {
self = self.with_reader_features(vec![ReaderFeatures::TimestampWithoutTimezone]);
self = self.with_writer_features(vec![WriterFeatures::TimestampWithoutTimezone]);
self
}
}
Expand Down
114 changes: 114 additions & 0 deletions crates/core/src/operations/add_column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//! Add a new column to a table

use delta_kernel::schema::StructType;
use futures::future::BoxFuture;
use itertools::Itertools;

use super::cast::merge_struct;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};

use crate::kernel::StructField;
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

/// Add new columns and/or nested fields to a table
pub struct AddColumnBuilder {
/// A snapshot of the table's state
snapshot: DeltaTableState,
/// Fields to add/merge into schema
fields: Option<Vec<StructField>>,
/// Delta object store for handling data files
log_store: LogStoreRef,
/// Additional information to add to the commit
commit_properties: CommitProperties,
}

impl super::Operation<()> for AddColumnBuilder {}

impl AddColumnBuilder {
/// Create a new builder
pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self {
Self {
snapshot,
log_store,
fields: None,
commit_properties: CommitProperties::default(),
}
}

/// Specify the fields to be added
pub fn with_fields(mut self, fields: impl IntoIterator<Item = StructField> + Clone) -> Self {
self.fields = Some(fields.into_iter().collect());
self
}
/// Additional metadata to be added to commit info
pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
self.commit_properties = commit_properties;
self
}
}

impl std::future::IntoFuture for AddColumnBuilder {
type Output = DeltaResult<DeltaTable>;

type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
let mut metadata = this.snapshot.metadata().clone();
let fields = match this.fields {
Some(v) => v,
None => return Err(DeltaTableError::Generic("No fields provided".to_string())),
};

let fields_right = &StructType::new(fields.clone());
let table_schema = this.snapshot.schema();
let new_table_schema = merge_struct(table_schema, fields_right)?;

// TODO(ion): Think of a way how we can simply this checking through the API or centralize some checks.
let contains_timestampntz = PROTOCOL.contains_timestampntz(fields.iter());
let protocol = this.snapshot.protocol();

let maybe_new_protocol = if contains_timestampntz {
let updated_protocol = protocol.clone().enable_timestamp_ntz();
if !(protocol.min_reader_version == 3 && protocol.min_writer_version == 7) {
// Convert existing properties to features since we advanced the protocol to v3,7
Some(
updated_protocol
.move_table_properties_into_features(&metadata.configuration),
)
} else {
Some(updated_protocol)
}
} else {
None
};

let operation = DeltaOperation::AddColumn {
fields: fields.into_iter().collect_vec(),
};

metadata.schema_string = serde_json::to_string(&new_table_schema)?;

let mut actions = vec![metadata.into()];

if let Some(new_protocol) = maybe_new_protocol {
actions.push(new_protocol.into())
}

let commit = CommitBuilder::from(this.commit_properties)
.with_actions(actions)
.build(Some(&this.snapshot), this.log_store.clone(), operation)
.await?;

Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}
Loading

0 comments on commit acf1e35

Please sign in to comment.