Commit 527328f

Merge branch 'main' into fix_stats_not_always_present

yjshen authored May 6, 2024
2 parents 4b1e831 + e25aed7 commit 527328f

Showing 15 changed files with 319 additions and 28 deletions.
crates/aws/Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.1.1"
version = "0.1.2"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
crates/aws/src/errors.rs (31 changes: 29 additions & 2 deletions)
@@ -6,8 +6,8 @@ use aws_credential_types::provider::error::CredentialsError;
use aws_sdk_dynamodb::{
    error::SdkError,
    operation::{
-        create_table::CreateTableError, get_item::GetItemError, put_item::PutItemError,
-        query::QueryError, update_item::UpdateItemError,
+        create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError,
+        put_item::PutItemError, query::QueryError, update_item::UpdateItemError,
    },
};
use aws_smithy_runtime_api::client::result::ServiceError;
@@ -89,6 +89,9 @@ pub enum LockClientError {
        to opt out of support for concurrent writers."
    )]
    LockClientRequired,
+
+    #[error("Log entry for table '{table_path}' and version '{version}' is already complete")]
+    VersionAlreadyCompleted { table_path: String, version: i64 },
}

impl From<GetItemError> for LockClientError {
@@ -164,7 +167,31 @@ impl From<UpdateItemError> for LockClientError {
    }
}

+impl From<DeleteItemError> for LockClientError {
+    fn from(err: DeleteItemError) -> Self {
+        match err {
+            DeleteItemError::ConditionalCheckFailedException(_) => {
+                unreachable!("error must be handled explicitly")
+            }
+            DeleteItemError::InternalServerError(_) => err.into(),
+            DeleteItemError::ProvisionedThroughputExceededException(_) => {
+                LockClientError::ProvisionedThroughputExceeded
+            }
+            DeleteItemError::RequestLimitExceeded(_) => {
+                LockClientError::ProvisionedThroughputExceeded
+            }
+            DeleteItemError::ResourceNotFoundException(_) => LockClientError::LockTableNotFound,
+            DeleteItemError::ItemCollectionSizeLimitExceededException(_) => err.into(),
+            DeleteItemError::TransactionConflictException(_) => err.into(),
+            _ => LockClientError::GenericDynamoDb {
+                source: Box::new(err),
+            },
+        }
+    }
+}
+
impl_from_service_error!(GetItemError);
impl_from_service_error!(PutItemError);
impl_from_service_error!(QueryError);
impl_from_service_error!(UpdateItemError);
+impl_from_service_error!(DeleteItemError);
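
The body of `impl_from_service_error!` is not shown in this diff. Judging from the `SdkError` and `ServiceError` imports at the top of the file, it plausibly expands along these lines for each listed operation error type; this is an assumption for illustration, not the actual macro (the `HttpResponse` response type parameter is also part of the assumption):

// Hypothetical expansion of impl_from_service_error!(DeleteItemError):
// unwrap the raw SDK error and delegate to the From<DeleteItemError> impl above.
impl From<SdkError<DeleteItemError, HttpResponse>> for LockClientError {
    fn from(err: SdkError<DeleteItemError, HttpResponse>) -> Self {
        match err {
            SdkError::ServiceError(e) => e.into_err().into(),
            other => LockClientError::GenericDynamoDb {
                source: Box::new(other),
            },
        }
    }
}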
crates/aws/src/lib.rs (45 changes: 43 additions & 2 deletions)
@@ -9,8 +9,8 @@ pub mod storage;
use aws_config::SdkConfig;
use aws_sdk_dynamodb::{
    operation::{
-        create_table::CreateTableError, get_item::GetItemError, put_item::PutItemError,
-        query::QueryError, update_item::UpdateItemError,
+        create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError,
+        put_item::PutItemError, query::QueryError, update_item::UpdateItemError,
    },
    types::{
        AttributeDefinition, AttributeValue, BillingMode, KeySchemaElement, KeyType,
@@ -416,6 +416,43 @@ impl DynamoDbLockClient {
        .await
    }

+    /// Delete existing log entry if it is not already complete
+    pub async fn delete_commit_entry(
+        &self,
+        version: i64,
+        table_path: &str,
+    ) -> Result<(), LockClientError> {
+        self.retry(|| async {
+            match self
+                .dynamodb_client
+                .delete_item()
+                .table_name(self.get_lock_table_name())
+                .set_key(Some(self.get_primary_key(version, table_path)))
+                .set_expression_attribute_values(Some(maplit::hashmap! {
+                    ":f".into() => string_attr("false"),
+                }))
+                .condition_expression(constants::CONDITION_DELETE_INCOMPLETE.as_str())
+                .send()
+                .await
+            {
+                Ok(_) => Ok(()),
+                Err(err) => match err.as_service_error() {
+                    Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => Err(
+                        backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
+                    ),
+                    Some(DeleteItemError::ConditionalCheckFailedException(_)) => Err(
+                        backoff::Error::permanent(LockClientError::VersionAlreadyCompleted {
+                            table_path: table_path.to_owned(),
+                            version,
+                        }),
+                    ),
+                    _ => Err(backoff::Error::permanent(err.into())),
+                },
+            }
+        })
+        .await
+    }
+
    async fn retry<I, E, Fn, Fut>(&self, operation: Fn) -> Result<I, E>
    where
        Fn: FnMut() -> Fut,
@@ -553,6 +590,10 @@ pub mod constants {
        pub static ref CONDITION_EXPR_CREATE: String = format!(
            "attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME})"
        );
+
+        pub static ref CONDITION_DELETE_INCOMPLETE: String = format!(
+            "(complete = :f) or (attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME}))"
+        );
    }

    pub const CONDITION_UPDATE_INCOMPLETE: &str = "complete = :f";
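The `retry` helper's body is truncated above. Based on its signature and the `backoff::Error::transient` / `backoff::Error::permanent` values produced in `delete_commit_entry`, a minimal sketch using the `backoff` crate might look like this (the max elapsed time is an assumed parameter, not taken from this diff):

// Sketch only: retry transient failures with exponential backoff.
async fn retry<I, E, Fn, Fut>(&self, operation: Fn) -> Result<I, E>
where
    Fn: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<I, backoff::Error<E>>>,
{
    let backoff = backoff::ExponentialBackoffBuilder::new()
        .with_max_elapsed_time(Some(std::time::Duration::from_secs(15)))
        .build();
    // backoff::future::retry re-invokes `operation` on Error::transient
    // and returns immediately on Error::permanent.
    backoff::future::retry(backoff, operation).await
}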
crates/aws/src/logstore.rs (30 changes: 30 additions & 0 deletions)
@@ -239,6 +239,36 @@ impl LogStore for S3DynamoDbLogStore {
        Ok(())
    }

+    /// Tries to abort an entry by first deleting the commit log entry, then deleting the temp commit file
+    async fn abort_commit_entry(
+        &self,
+        version: i64,
+        tmp_commit: &Path,
+    ) -> Result<(), TransactionError> {
+        self.lock_client
+            .delete_commit_entry(version, &self.table_path)
+            .await
+            .map_err(|err| match err {
+                LockClientError::ProvisionedThroughputExceeded => todo!(
+                    "deltalake-aws does not yet handle DynamoDB provisioned throughput errors"
+                ),
+                LockClientError::VersionAlreadyCompleted { version, .. } => {
+                    error!("Trying to abort a completed commit");
+                    TransactionError::LogStoreError {
+                        msg: format!("trying to abort a completed log entry: {}", version),
+                        source: Box::new(err),
+                    }
+                }
+                err => TransactionError::LogStoreError {
+                    msg: "dynamodb client failed to delete log entry".to_owned(),
+                    source: Box::new(err),
+                },
+            })?;
+
+        abort_commit_entry(&self.storage, version, tmp_commit).await?;
+        Ok(())
+    }
+
    async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
        debug!("Retrieving latest version of {self:?} at v{current_version}");
        let entry = self
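For context, `abort_commit_entry` is the cleanup path when a commit attempt fails partway. A minimal sketch of a hypothetical caller, using only the `LogStore` methods that appear in this diff:

// Sketch: best-effort cleanup after a failed commit attempt.
if let Err(err) = log_store.write_commit_entry(version, &tmp_commit).await {
    // Delete the DynamoDB log entry first, then the temp commit file.
    // Aborting an entry that no longer exists is a no-op (see
    // CONDITION_DELETE_INCOMPLETE above), so repeated aborts are safe.
    log_store.abort_commit_entry(version, &tmp_commit).await?;
    return Err(err);
}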
crates/aws/src/storage.rs (2 changes: 1 addition & 1 deletion)
@@ -85,7 +85,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
            .0
            .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
        {
-            Ok((Arc::from(store), prefix))
+            Ok((store, prefix))
        } else {
            let s3_options = S3StorageOptions::from_map(&options.0)?;

crates/aws/tests/integration_s3_dynamodb.rs (76 changes: 75 additions & 1 deletion)
@@ -18,7 +18,7 @@ use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::storage::commit_uri_from_version;
use deltalake_core::storage::StorageOptions;
use deltalake_core::table::builder::ensure_table_uri;
-use deltalake_core::{DeltaOps, DeltaTable, DeltaTableBuilder};
+use deltalake_core::{DeltaOps, DeltaTable, DeltaTableBuilder, ObjectStoreError};
use deltalake_test::utils::*;
use lazy_static::lazy_static;
use object_store::path::Path;
@@ -182,6 +182,80 @@ async fn test_repair_on_load() -> TestResult<()> {
    Ok(())
}

+#[tokio::test]
+#[serial]
+async fn test_abort_commit_entry() -> TestResult<()> {
+    let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
+    let client = make_client()?;
+    let table = prepare_table(&context, "abort_entry").await?;
+    let options: StorageOptions = OPTIONS.clone().into();
+    let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new(
+        ensure_table_uri(table.table_uri())?,
+        options.clone(),
+        &S3_OPTIONS,
+        std::sync::Arc::new(table.object_store()),
+    )?;
+
+    let entry = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?;
+
+    log_store
+        .abort_commit_entry(entry.version, &entry.temp_path)
+        .await?;
+
+    // The entry should have been aborted - the latest entry should be one version lower
+    if let Some(new_entry) = client.get_latest_entry(&table.table_uri()).await? {
+        assert_eq!(entry.version - 1, new_entry.version);
+    }
+    // Temp commit file should have been deleted
+    assert!(matches!(
+        log_store.object_store().get(&entry.temp_path).await,
+        Err(ObjectStoreError::NotFound { .. })
+    ));
+
+    // Test abort commit is idempotent - still works if already aborted
+    log_store
+        .abort_commit_entry(entry.version, &entry.temp_path)
+        .await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+#[serial]
+async fn test_abort_commit_entry_fail_to_delete_entry() -> TestResult<()> {
+    // Test abort commit does not delete the temp commit if the DynamoDB entry is not deleted
+    let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
+    let client = make_client()?;
+    let table = prepare_table(&context, "abort_entry_fail").await?;
+    let options: StorageOptions = OPTIONS.clone().into();
+    let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new(
+        ensure_table_uri(table.table_uri())?,
+        options.clone(),
+        &S3_OPTIONS,
+        std::sync::Arc::new(table.object_store()),
+    )?;
+
+    let entry = create_incomplete_commit_entry(&table, 1, "finished_commit").await?;
+
+    // Mark entry as complete
+    client
+        .update_commit_entry(entry.version, &table.table_uri())
+        .await?;
+
+    // Abort will fail since we marked the entry as complete
+    assert!(matches!(
+        log_store
+            .abort_commit_entry(entry.version, &entry.temp_path)
+            .await,
+        Err(_),
+    ));
+
+    // Check temp commit file still exists
+    assert!(log_store.object_store().get(&entry.temp_path).await.is_ok());
+
+    Ok(())
+}

const WORKERS: i64 = 3;
const COMMITS: i64 = 15;

crates/core/src/kernel/snapshot/mod.rs (20 changes: 14 additions & 6 deletions)
@@ -287,11 +287,13 @@ impl Snapshot {
    }

    /// Get the statistics schema of the snapshot
-    pub fn stats_schema(&self) -> DeltaResult<StructType> {
+    pub fn stats_schema(&self, table_schema: Option<&StructType>) -> DeltaResult<StructType> {
+        let schema = table_schema.unwrap_or_else(|| self.schema());
+
        let stats_fields = if let Some(stats_cols) = self.table_config().stats_columns() {
            stats_cols
                .iter()
-                .map(|col| match self.schema().field_with_name(col) {
+                .map(|col| match schema.field_with_name(col) {
                    Ok(field) => match field.data_type() {
                        DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => {
                            Err(DeltaTableError::Generic(format!(
@@ -314,7 +316,7 @@
                .collect::<Result<Vec<_>, _>>()?
        } else {
            let num_indexed_cols = self.table_config().num_indexed_cols();
-            self.schema()
+            schema
                .fields
                .iter()
                .enumerate()
@@ -362,7 +364,7 @@ impl EagerSnapshot {
        let mut files = Vec::new();
        let mut scanner = LogReplayScanner::new();
        files.push(scanner.process_files_batch(&batch, true)?);
-        let mapper = LogMapper::try_new(&snapshot)?;
+        let mapper = LogMapper::try_new(&snapshot, None)?;
        files = files
            .into_iter()
            .map(|b| mapper.map_batch(b))
@@ -401,7 +403,7 @@
            )
            .boxed()
        };
-        let mapper = LogMapper::try_new(&self.snapshot)?;
+        let mapper = LogMapper::try_new(&self.snapshot, None)?;
        let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot)?
            .map(|batch| batch.and_then(|b| mapper.map_batch(b)))
            .try_collect()
@@ -517,7 +519,13 @@ impl EagerSnapshot {
            files.push(scanner.process_files_batch(&batch?, true)?);
        }

-        let mapper = LogMapper::try_new(&self.snapshot)?;
+        let mapper = if let Some(metadata) = &metadata {
+            let new_schema: StructType = serde_json::from_str(&metadata.schema_string)?;
+            LogMapper::try_new(&self.snapshot, Some(&new_schema))?
+        } else {
+            LogMapper::try_new(&self.snapshot, None)?
+        };
+
        self.files = files
            .into_iter()
            .chain(
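These changes are the core of the fix_stats_not_always_present branch: when log replay encounters a metadata action that changes the table schema, file statistics must be parsed against the new schema rather than the snapshot's existing one. Restated as a standalone sketch (illustrative only; names mirror the diff above):

// Sketch: derive the stats schema from the schema carried by a metadata action.
let new_schema: StructType = serde_json::from_str(&metadata.schema_string)?;
let stats_schema = snapshot.stats_schema(Some(&new_schema))?;

// Passing None keeps the previous behavior and uses the snapshot's own schema.
let default_stats_schema = snapshot.stats_schema(None)?;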
crates/core/src/kernel/snapshot/replay.rs (10 changes: 7 additions & 3 deletions)
@@ -21,6 +21,7 @@ use tracing::debug;

use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName};
use crate::kernel::arrow::json;
+use crate::kernel::StructType;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

use super::Snapshot;
@@ -41,7 +42,7 @@

impl<S> ReplayStream<S> {
    pub(super) fn try_new(commits: S, checkpoint: S, snapshot: &Snapshot) -> DeltaResult<Self> {
-        let stats_schema = Arc::new((&snapshot.stats_schema()?).try_into()?);
+        let stats_schema = Arc::new((&snapshot.stats_schema(None)?).try_into()?);
        let mapper = Arc::new(LogMapper {
            stats_schema,
            config: snapshot.config.clone(),
@@ -61,9 +62,12 @@ pub(super) struct LogMapper {
}

impl LogMapper {
-    pub(super) fn try_new(snapshot: &Snapshot) -> DeltaResult<Self> {
+    pub(super) fn try_new(
+        snapshot: &Snapshot,
+        table_schema: Option<&StructType>,
+    ) -> DeltaResult<Self> {
        Ok(Self {
-            stats_schema: Arc::new((&snapshot.stats_schema()?).try_into()?),
+            stats_schema: Arc::new((&snapshot.stats_schema(table_schema)?).try_into()?),
            config: snapshot.config.clone(),
        })
    }
crates/core/src/logstore/default_logstore.rs (8 changes: 8 additions & 0 deletions)
@@ -50,6 +50,14 @@ impl LogStore for DefaultLogStore {
        super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await
    }

+    async fn abort_commit_entry(
+        &self,
+        version: i64,
+        tmp_commit: &Path,
+    ) -> Result<(), TransactionError> {
+        super::abort_commit_entry(self.storage.as_ref(), version, tmp_commit).await
+    }
+
    async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
        super::get_latest_version(self, current_version).await
    }
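The shared `super::abort_commit_entry` helper is defined outside this diff; by symmetry with `write_commit_entry`, it presumably just removes the temporary commit file. A hypothetical shape, assuming a From conversion exists for the object store error:

// Hypothetical sketch of the shared helper in crates/core/src/logstore:
pub async fn abort_commit_entry(
    storage: &dyn ObjectStore,
    _version: i64,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    // The commit was never published, so deleting the temp file is enough.
    storage.delete(tmp_commit).await?;
    Ok(())
}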