-
Notifications
You must be signed in to change notification settings - Fork 443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement filesystem check #1103
Changes from all commits
5d32c56
0747602
e944633
18e7974
e90ba53
d42eb78
48a460a
85886a5
3dc4b67
7cbe4fa
558e890
46841cc
9d19ef3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
//! Audit the Delta Table for active files that do not exist in the underlying filesystem and remove them. | ||
//! | ||
//! Active files are ones that have an add action in the log, but no corresponding remove action. | ||
//! This operation creates a new transaction containing a remove action for each of the missing files. | ||
//! | ||
//! This can be used to repair tables where a data file has been deleted accidentally or | ||
//! purposefully, if the file was corrupted. | ||
//! | ||
//! # Example | ||
//! ```rust ignore | ||
//! let mut table = open_table("../path/to/table")?; | ||
//! let (table, metrics) = FileSystemCheckBuilder::new(table.object_store(), table.state).await?; | ||
//! ```` | ||
use crate::action::{Action, Add, DeltaOperation, Remove}; | ||
use crate::operations::transaction::commit; | ||
use crate::storage::DeltaObjectStore; | ||
use crate::table_state::DeltaTableState; | ||
use crate::DeltaDataTypeVersion; | ||
use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError}; | ||
use futures::future::BoxFuture; | ||
use futures::StreamExt; | ||
pub use object_store::path::Path; | ||
use object_store::ObjectStore; | ||
use std::collections::HashMap; | ||
use std::fmt::Debug; | ||
use std::sync::Arc; | ||
use std::time::SystemTime; | ||
use std::time::UNIX_EPOCH; | ||
use url::{ParseError, Url}; | ||
|
||
/// Audit the Delta Table's active files with the underlying file system. | ||
/// See this module's documentaiton for more information | ||
#[derive(Debug)] | ||
pub struct FileSystemCheckBuilder { | ||
/// A snapshot of the to-be-checked table's state | ||
state: DeltaTableState, | ||
/// Delta object store for handling data files | ||
store: Arc<DeltaObjectStore>, | ||
/// Don't remove actions to the table log. Just determine which files can be removed | ||
dry_run: bool, | ||
} | ||
|
||
/// Details of the FSCK operation including which files were removed from the log | ||
#[derive(Debug)] | ||
pub struct FileSystemCheckMetrics { | ||
/// Was this a dry run | ||
pub dry_run: bool, | ||
/// Files that wrere removed successfully | ||
pub files_removed: Vec<String>, | ||
} | ||
|
||
struct FileSystemCheckPlan { | ||
/// Version of the snapshot provided | ||
version: DeltaDataTypeVersion, | ||
/// Delta object store for handling data files | ||
store: Arc<DeltaObjectStore>, | ||
/// Files that no longer exists in undlying ObjectStore but have active add actions | ||
pub files_to_remove: Vec<Add>, | ||
} | ||
|
||
fn is_absolute_path(path: &str) -> DeltaResult<bool> { | ||
match Url::parse(path) { | ||
Ok(_) => Ok(true), | ||
Err(ParseError::RelativeUrlWithoutBase) => Ok(false), | ||
Err(_) => Err(DeltaTableError::Generic(format!( | ||
"Unable to parse path: {}", | ||
&path | ||
))), | ||
} | ||
} | ||
|
||
impl FileSystemCheckBuilder { | ||
/// Create a new [`FileSystemCheckBuilder`] | ||
pub fn new(store: Arc<DeltaObjectStore>, state: DeltaTableState) -> Self { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we validate that the provided state is the latest version of the table? Since I don't think this is a valid operation for earlier states? or maybe that is an overall issue with the operations module? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think dry run of this operation would be helpful for diagnosing issues that arise from a combination of vacuum + time travel. If executed on a older version I do expect it to fail during the commit operation. |
||
FileSystemCheckBuilder { | ||
state, | ||
store, | ||
dry_run: false, | ||
} | ||
} | ||
|
||
/// Only determine which add actions should be removed. A dry run will not commit actions to the Delta log | ||
pub fn with_dry_run(mut self, dry_run: bool) -> Self { | ||
self.dry_run = dry_run; | ||
self | ||
} | ||
|
||
async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> { | ||
let mut files_relative: HashMap<&str, &Add> = | ||
HashMap::with_capacity(self.state.files().len()); | ||
let version = self.state.version(); | ||
let store = self.store.clone(); | ||
|
||
for active in self.state.files() { | ||
if is_absolute_path(&active.path)? { | ||
return Err(DeltaTableError::Generic( | ||
"Filesystem check does not support absolute paths".to_string(), | ||
)); | ||
} else { | ||
files_relative.insert(&active.path, active); | ||
} | ||
} | ||
|
||
let mut files = self.store.list(None).await?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Delta Tables are allowed to have data files outside the table root. Currently, if that is the case, this operation will remove them all from the log. I see two options:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. Do any examples of tables that use absolute paths? It states the following
So I can segment paths based of it the scheme is set on the path and then use the correct api call for each. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh that's a good point! Checking for the scheme at the beginning makes sense. We don't have any example tables right now. We don't generally support them in most operations, but we should try to keep them in mind. Hard to have example tables without setting up a public S3 bucket or something. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not comfortable with supporting absolute paths without having some integration tests. Currently the operation will fail when an absolute path is encountered. |
||
while let Some(result) = files.next().await { | ||
let file = result?; | ||
files_relative.remove(file.location.as_ref()); | ||
|
||
if files_relative.is_empty() { | ||
break; | ||
} | ||
} | ||
|
||
let files_to_remove: Vec<Add> = files_relative | ||
.into_values() | ||
.map(|file| file.to_owned()) | ||
.collect(); | ||
|
||
Ok(FileSystemCheckPlan { | ||
files_to_remove, | ||
version, | ||
store, | ||
}) | ||
} | ||
} | ||
|
||
impl FileSystemCheckPlan { | ||
pub async fn execute(self) -> DeltaResult<FileSystemCheckMetrics> { | ||
if self.files_to_remove.is_empty() { | ||
return Ok(FileSystemCheckMetrics { | ||
dry_run: false, | ||
files_removed: Vec::new(), | ||
}); | ||
} | ||
|
||
let mut actions = Vec::with_capacity(self.files_to_remove.len()); | ||
let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len()); | ||
let version = self.version; | ||
let store = &self.store; | ||
|
||
for file in self.files_to_remove { | ||
let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); | ||
let deletion_time = deletion_time.as_millis() as DeltaDataTypeLong; | ||
removed_file_paths.push(file.path.clone()); | ||
actions.push(Action::remove(Remove { | ||
path: file.path, | ||
deletion_timestamp: Some(deletion_time), | ||
data_change: true, | ||
extended_file_metadata: None, | ||
partition_values: Some(file.partition_values), | ||
size: Some(file.size), | ||
tags: file.tags, | ||
})); | ||
} | ||
|
||
commit( | ||
store, | ||
version + 1, | ||
actions, | ||
DeltaOperation::FileSystemCheck {}, | ||
None, | ||
) | ||
.await?; | ||
|
||
Ok(FileSystemCheckMetrics { | ||
dry_run: false, | ||
files_removed: removed_file_paths, | ||
}) | ||
} | ||
} | ||
|
||
impl std::future::IntoFuture for FileSystemCheckBuilder { | ||
type Output = DeltaResult<(DeltaTable, FileSystemCheckMetrics)>; | ||
type IntoFuture = BoxFuture<'static, Self::Output>; | ||
|
||
fn into_future(self) -> Self::IntoFuture { | ||
let this = self; | ||
|
||
Box::pin(async move { | ||
let plan = this.create_fsck_plan().await?; | ||
if this.dry_run { | ||
return Ok(( | ||
DeltaTable::new_with_state(this.store, this.state), | ||
FileSystemCheckMetrics { | ||
files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(), | ||
dry_run: true, | ||
}, | ||
)); | ||
} | ||
|
||
let metrics = plan.execute().await?; | ||
let mut table = DeltaTable::new_with_state(this.store, this.state); | ||
table.update().await?; | ||
Ok((table, metrics)) | ||
}) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
|
||
#[test] | ||
fn absolute_path() { | ||
assert!(!is_absolute_path( | ||
"part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet" | ||
) | ||
.unwrap()); | ||
assert!(!is_absolute_path( | ||
"x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet" | ||
) | ||
.unwrap()); | ||
|
||
assert!(is_absolute_path("abfss://container@account_name.blob.core.windows.net/full/part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet").unwrap()); | ||
assert!(is_absolute_path("file:///C:/my_table/windows.parquet").unwrap()); | ||
assert!(is_absolute_path("file:///home/my_table/unix.parquet").unwrap()); | ||
assert!(is_absolute_path("s3://container/path/file.parquet").unwrap()); | ||
assert!(is_absolute_path("gs://container/path/file.parquet").unwrap()); | ||
assert!(is_absolute_path("scheme://table/file.parquet").unwrap()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will not work for local paths. One pattern we apply in
object_store
and here is to try and cannonicalize the path and if it fails proceed with url parsing. e.g.delta-rs/rust/src/builder.rs
Lines 355 to 358 in a60129b
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't follow on why it would not work on local paths. Paths in the Delta log must follow rfc2396 which states absolute paths must have a scheme.
Do you have a example that would cause the unit test for this function to fail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well .. in that case I am wrong! If that paths are required to have a scheme, then url parsing should always work for valid paths ... The windows case is also covered in your tests, so all is well :).