diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 466010ed7f..f2754a760d 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -162,14 +162,13 @@ def write_deltalake( if filesystem is not None: raise NotImplementedError("Filesystem support is not yet implemented. #570") - __enforce_append_only(table=table, configuration=configuration, mode=mode) + if table is not None: + storage_options = table._storage_options or {} + storage_options.update(storage_options or {}) - if filesystem is None: - if table is not None: - storage_options = table._storage_options or {} - storage_options.update(storage_options or {}) + filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options)) - filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options)) + __enforce_append_only(table=table, configuration=configuration, mode=mode) if isinstance(partition_by, str): partition_by = [partition_by] @@ -219,8 +218,10 @@ def visitor(written_file: Any) -> None: # PyArrow added support for written_file.size in 9.0.0 if PYARROW_MAJOR_VERSION >= 9: size = written_file.size - else: + elif filesystem is not None: size = filesystem.get_file_info([path])[0].size + else: + size = 0 add_actions.append( AddAction( diff --git a/python/pyproject.toml b/python/pyproject.toml index 7751407f49..e163544fc1 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -14,7 +14,7 @@ classifiers = [ "Programming Language :: Python :: 3 :: Only" ] dependencies = [ - "pyarrow>=8", + "pyarrow>=8,<=12", 'typing-extensions;python_version<"3.8"', ] diff --git a/python/src/lib.rs b/python/src/lib.rs index d8f2f92b83..3bf2625d6a 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -464,7 +464,7 @@ impl RawDeltaTable { .into_iter() .map(|part| PyFrozenSet::new(py, part.iter())) .collect::>()?; - PyFrozenSet::new(py, active_partitions.into_iter()) + PyFrozenSet::new(py, active_partitions) } fn create_write_transaction( diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 2b9e349b36..64465ce156 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -1,6 +1,7 @@ import os import pathlib import subprocess +import time from datetime import date, datetime, timedelta from decimal import Decimal from time import sleep @@ -11,11 +12,15 @@ from deltalake import DeltaTable, write_deltalake -def wait_till_host_is_available(host: str, timeout_sec: int = 30): +def wait_till_host_is_available(host: str, timeout_sec: int = 0.5): spacing = 2 + start = time.monotonic() while True: + if time.monotonic() - start > timeout_sec: + raise TimeoutError(f"Host {host} is not available after {timeout_sec} sec") + try: - subprocess.run(["curl", host], timeout=500, check=True) + subprocess.run(["curl", host], timeout=timeout_sec * 1000, check=True) except Exception: pass else: diff --git a/rust/src/action/checkpoints.rs b/rust/src/action/checkpoints.rs index f3a280ad3b..61b4941ffc 100644 --- a/rust/src/action/checkpoints.rs +++ b/rust/src/action/checkpoints.rs @@ -180,7 +180,7 @@ pub async fn cleanup_expired_logs_for( ) -> Result { lazy_static! { static ref DELTA_LOG_REGEX: Regex = - Regex::new(r#"_delta_log/(\d{20})\.(json|checkpoint).*$"#).unwrap(); + Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap(); } let mut deleted_log_num = 0; diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index 38d338a622..60d0208c4b 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -881,9 +881,9 @@ pub(crate) async fn find_latest_check_point_for_version( ) -> Result, ProtocolError> { lazy_static! { static ref CHECKPOINT_REGEX: Regex = - Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.parquet$"#).unwrap(); + Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.parquet$").unwrap(); static ref CHECKPOINT_PARTS_REGEX: Regex = - Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$"#).unwrap(); + Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$").unwrap(); } let mut cp: Option = None; diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 7355becf6e..2a2d50f9b2 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -387,7 +387,7 @@ impl DeltaTable { // TODO check if regex matches against path lazy_static! { static ref DELTA_LOG_REGEX: Regex = - Regex::new(r#"^_delta_log/(\d{20})\.(json|checkpoint)*$"#).unwrap(); + Regex::new(r"^_delta_log/(\d{20})\.(json|checkpoint)*$").unwrap(); } let mut current_delta_log_ver = i64::MAX; diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs index 82ae2fb4d1..72e2168741 100644 --- a/rust/src/delta_arrow.rs +++ b/rust/src/delta_arrow.rs @@ -658,7 +658,7 @@ mod tests { delta_log_schema_for_table(table_schema.clone(), partition_columns.as_slice(), false); // verify top-level schema contains all expected fields and they are named correctly. - let expected_fields = vec!["metaData", "protocol", "txn", "remove", "add"]; + let expected_fields = ["metaData", "protocol", "txn", "remove", "add"]; for f in log_schema.fields().iter() { assert!(expected_fields.contains(&f.name().as_str())); } @@ -771,7 +771,7 @@ mod tests { }) .collect(); assert_eq!(7, remove_fields.len()); - let expected_fields = vec![ + let expected_fields = [ "path", "deletionTimestamp", "dataChange", diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 136e69389f..a8be8ecfac 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -106,8 +106,8 @@ impl DeltaTableState { let stats = self .files() .iter() - .fold( - Some(Statistics { + .try_fold( + Statistics { num_rows: Some(0), total_byte_size: Some(0), column_statistics: Some(vec![ @@ -120,9 +120,8 @@ impl DeltaTableState { self.schema().unwrap().get_fields().len() ]), is_exact: true, - }), + }, |acc, action| { - let acc = acc?; let new_stats = action .get_stats() .unwrap_or_else(|_| Some(action::Stats::default()))?; @@ -468,7 +467,7 @@ impl TableProvider for DeltaTable { self.get_state() .files() .iter() - .zip(files_to_prune.into_iter()) + .zip(files_to_prune) .filter_map( |(action, keep)| { if keep { diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs index 7962ae6e4d..e1114b8eee 100644 --- a/rust/src/operations/transaction/state.rs +++ b/rust/src/operations/transaction/state.rs @@ -82,7 +82,7 @@ impl DeltaTableState { Ok(Either::Left( self.files() .iter() - .zip(pruning_predicate.prune(self)?.into_iter()) + .zip(pruning_predicate.prune(self)?) .filter_map( |(action, keep_file)| { if keep_file { @@ -234,7 +234,7 @@ impl<'a> AddContainer<'a> { Ok(self .inner .iter() - .zip(pruning_predicate.prune(self)?.into_iter()) + .zip(pruning_predicate.prune(self)?) .filter_map( |(action, keep_file)| { if keep_file { diff --git a/rust/src/storage/file.rs b/rust/src/storage/file.rs index d9188b758c..6e64e52be9 100644 --- a/rust/src/storage/file.rs +++ b/rust/src/storage/file.rs @@ -19,37 +19,58 @@ const STORE_NAME: &str = "DeltaLocalObjectStore"; /// Error raised by storage lock client #[derive(thiserror::Error, Debug)] #[allow(dead_code)] -pub(self) enum LocalFileSystemError { +pub enum LocalFileSystemError { + /// Object exists already at path #[error("Object exists already at path: {} ({:?})", path, source)] AlreadyExists { + /// Path of the already existing file path: String, + /// Originating error source: Box, }, + /// Object not found at the given path #[error("Object not found at path: {} ({:?})", path, source)] NotFound { + /// Provided path which does not exist path: String, + /// Originating error source: Box, }, + /// Invalid argument sent to OS call #[error("Invalid argument in OS call for path: {} ({:?})", path, source)] - InvalidArgument { path: String, source: errno::Errno }, + InvalidArgument { + /// Provided path + path: String, + /// Originating error + source: errno::Errno, + }, + /// Null error for path for FFI #[error("Null error in FFI for path: {} ({:?})", path, source)] NullError { + /// Given path path: String, + /// Originating error source: std::ffi::NulError, }, + /// Generic catch-all error for this store #[error("Generic error in store: {} ({:?})", store, source)] Generic { + /// String name of the object store store: &'static str, + /// Originating error source: Box, }, + /// Errors from the Tokio runtime #[error("Error executing async task for path: {} ({:?})", path, source)] Tokio { + /// Path path: String, + /// Originating error source: tokio::task::JoinError, }, }