From 5aef21a54edd9791330f114142d25f7dbbef4212 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Thu, 24 Aug 2023 22:49:55 -0700 Subject: [PATCH 1/8] Beginning to fix clippy errors on the latest Rust 1.72 --- rust/src/action/checkpoints.rs | 2 +- rust/src/action/mod.rs | 4 ++-- rust/src/delta.rs | 2 +- rust/src/storage/file.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/src/action/checkpoints.rs b/rust/src/action/checkpoints.rs index f3a280ad3b..61b4941ffc 100644 --- a/rust/src/action/checkpoints.rs +++ b/rust/src/action/checkpoints.rs @@ -180,7 +180,7 @@ pub async fn cleanup_expired_logs_for( ) -> Result { lazy_static! { static ref DELTA_LOG_REGEX: Regex = - Regex::new(r#"_delta_log/(\d{20})\.(json|checkpoint).*$"#).unwrap(); + Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap(); } let mut deleted_log_num = 0; diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index 38d338a622..60d0208c4b 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -881,9 +881,9 @@ pub(crate) async fn find_latest_check_point_for_version( ) -> Result, ProtocolError> { lazy_static! { static ref CHECKPOINT_REGEX: Regex = - Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.parquet$"#).unwrap(); + Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.parquet$").unwrap(); static ref CHECKPOINT_PARTS_REGEX: Regex = - Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$"#).unwrap(); + Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$").unwrap(); } let mut cp: Option = None; diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 7355becf6e..2a2d50f9b2 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -387,7 +387,7 @@ impl DeltaTable { // TODO check if regex matches against path lazy_static! { static ref DELTA_LOG_REGEX: Regex = - Regex::new(r#"^_delta_log/(\d{20})\.(json|checkpoint)*$"#).unwrap(); + Regex::new(r"^_delta_log/(\d{20})\.(json|checkpoint)*$").unwrap(); } let mut current_delta_log_ver = i64::MAX; diff --git a/rust/src/storage/file.rs b/rust/src/storage/file.rs index d9188b758c..69504b55fc 100644 --- a/rust/src/storage/file.rs +++ b/rust/src/storage/file.rs @@ -19,7 +19,7 @@ const STORE_NAME: &str = "DeltaLocalObjectStore"; /// Error raised by storage lock client #[derive(thiserror::Error, Debug)] #[allow(dead_code)] -pub(self) enum LocalFileSystemError { +pub enum LocalFileSystemError { #[error("Object exists already at path: {} ({:?})", path, source)] AlreadyExists { path: String, From 0b966ddc5aca59f9d53f8446821c89a972991df6 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 26 Aug 2023 09:59:41 -0700 Subject: [PATCH 2/8] Add missing doc strings to the LocalFileSystemError Rust 1.72 was particularly annoyed by these missing docs :shrug: --- rust/src/storage/file.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/rust/src/storage/file.rs b/rust/src/storage/file.rs index 69504b55fc..6e64e52be9 100644 --- a/rust/src/storage/file.rs +++ b/rust/src/storage/file.rs @@ -20,36 +20,57 @@ const STORE_NAME: &str = "DeltaLocalObjectStore"; #[derive(thiserror::Error, Debug)] #[allow(dead_code)] pub enum LocalFileSystemError { + /// Object exists already at path #[error("Object exists already at path: {} ({:?})", path, source)] AlreadyExists { + /// Path of the already existing file path: String, + /// Originating error source: Box, }, + /// Object not found at the given path #[error("Object not found at path: {} ({:?})", path, source)] NotFound { + /// Provided path which does not exist path: String, + /// Originating error source: Box, }, + /// Invalid argument sent to OS call #[error("Invalid argument in OS call for path: {} ({:?})", path, source)] - InvalidArgument { path: String, source: errno::Errno }, + InvalidArgument { + /// Provided path + path: String, + /// Originating error + source: errno::Errno, + }, + /// Null error for path for FFI #[error("Null error in FFI for path: {} ({:?})", path, source)] NullError { + /// Given path path: String, + /// Originating error source: std::ffi::NulError, }, + /// Generic catch-all error for this store #[error("Generic error in store: {} ({:?})", store, source)] Generic { + /// String name of the object store store: &'static str, + /// Originating error source: Box, }, + /// Errors from the Tokio runtime #[error("Error executing async task for path: {} ({:?})", path, source)] Tokio { + /// Path path: String, + /// Originating error source: tokio::task::JoinError, }, } From d1cc3cf26f97db12ae062be12b5e005a95011ed4 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 26 Aug 2023 10:29:27 -0700 Subject: [PATCH 3/8] Removing some now unnecessary according to clippy code --- rust/src/delta_arrow.rs | 4 ++-- rust/src/delta_datafusion.rs | 9 ++++----- rust/src/operations/transaction/state.rs | 4 ++-- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs index 82ae2fb4d1..72e2168741 100644 --- a/rust/src/delta_arrow.rs +++ b/rust/src/delta_arrow.rs @@ -658,7 +658,7 @@ mod tests { delta_log_schema_for_table(table_schema.clone(), partition_columns.as_slice(), false); // verify top-level schema contains all expected fields and they are named correctly. - let expected_fields = vec!["metaData", "protocol", "txn", "remove", "add"]; + let expected_fields = ["metaData", "protocol", "txn", "remove", "add"]; for f in log_schema.fields().iter() { assert!(expected_fields.contains(&f.name().as_str())); } @@ -771,7 +771,7 @@ mod tests { }) .collect(); assert_eq!(7, remove_fields.len()); - let expected_fields = vec![ + let expected_fields = [ "path", "deletionTimestamp", "dataChange", diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 136e69389f..a8be8ecfac 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -106,8 +106,8 @@ impl DeltaTableState { let stats = self .files() .iter() - .fold( - Some(Statistics { + .try_fold( + Statistics { num_rows: Some(0), total_byte_size: Some(0), column_statistics: Some(vec![ @@ -120,9 +120,8 @@ impl DeltaTableState { self.schema().unwrap().get_fields().len() ]), is_exact: true, - }), + }, |acc, action| { - let acc = acc?; let new_stats = action .get_stats() .unwrap_or_else(|_| Some(action::Stats::default()))?; @@ -468,7 +467,7 @@ impl TableProvider for DeltaTable { self.get_state() .files() .iter() - .zip(files_to_prune.into_iter()) + .zip(files_to_prune) .filter_map( |(action, keep)| { if keep { diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs index 7962ae6e4d..e1114b8eee 100644 --- a/rust/src/operations/transaction/state.rs +++ b/rust/src/operations/transaction/state.rs @@ -82,7 +82,7 @@ impl DeltaTableState { Ok(Either::Left( self.files() .iter() - .zip(pruning_predicate.prune(self)?.into_iter()) + .zip(pruning_predicate.prune(self)?) .filter_map( |(action, keep_file)| { if keep_file { @@ -234,7 +234,7 @@ impl<'a> AddContainer<'a> { Ok(self .inner .iter() - .zip(pruning_predicate.prune(self)?.into_iter()) + .zip(pruning_predicate.prune(self)?) .filter_map( |(action, keep_file)| { if keep_file { From 3acacf4612ae15559c9462ee86148b7c5de21d13 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 26 Aug 2023 10:39:48 -0700 Subject: [PATCH 4/8] Remove into_iter() clippy warning too --- python/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/lib.rs b/python/src/lib.rs index d8f2f92b83..3bf2625d6a 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -464,7 +464,7 @@ impl RawDeltaTable { .into_iter() .map(|part| PyFrozenSet::new(py, part.iter())) .collect::>()?; - PyFrozenSet::new(py, active_partitions.into_iter()) + PyFrozenSet::new(py, active_partitions) } fn create_write_transaction( From a2a279cbc01096c126dcf64f5a327d9f3e251582 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 26 Aug 2023 11:56:20 -0700 Subject: [PATCH 5/8] Handle the possibility that a filesystem is None when writing in Python I don't understand how this wasn't failing before since filesystem is Optional in the type, error in Actions: mypy deltalake/writer.py:223: error: Item "None" of "Optional[Any]" has no attribute "get_file_info" [union-attr] Found 1 error in 1 file (checked 7 source files) --- python/deltalake/writer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 466010ed7f..91ecd3de81 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -219,8 +219,10 @@ def visitor(written_file: Any) -> None: # PyArrow added support for written_file.size in 9.0.0 if PYARROW_MAJOR_VERSION >= 9: size = written_file.size - else: + else if filesystem is not None: size = filesystem.get_file_info([path])[0].size + else: + size = 0 add_actions.append( AddAction( From 0246f06372f1a9b8437225c162e5051deee948f1 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 27 Aug 2023 17:05:59 -0700 Subject: [PATCH 6/8] temporary fix: just max 12 the max --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 7751407f49..e163544fc1 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -14,7 +14,7 @@ classifiers = [ "Programming Language :: Python :: 3 :: Only" ] dependencies = [ - "pyarrow>=8", + "pyarrow>=8,<=12", 'typing-extensions;python_version<"3.8"', ] From cefe533cfd85620e046416ef79334543b4934ca1 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 27 Aug 2023 17:36:47 -0700 Subject: [PATCH 7/8] ci fixes --- python/deltalake/writer.py | 11 +++++------ python/tests/conftest.py | 9 +++++++-- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 466010ed7f..648cfe2c97 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -162,14 +162,13 @@ def write_deltalake( if filesystem is not None: raise NotImplementedError("Filesystem support is not yet implemented. #570") - __enforce_append_only(table=table, configuration=configuration, mode=mode) + if table is not None: + storage_options = table._storage_options or {} + storage_options.update(storage_options or {}) - if filesystem is None: - if table is not None: - storage_options = table._storage_options or {} - storage_options.update(storage_options or {}) + filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options)) - filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options)) + __enforce_append_only(table=table, configuration=configuration, mode=mode) if isinstance(partition_by, str): partition_by = [partition_by] diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 2b9e349b36..64465ce156 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -1,6 +1,7 @@ import os import pathlib import subprocess +import time from datetime import date, datetime, timedelta from decimal import Decimal from time import sleep @@ -11,11 +12,15 @@ from deltalake import DeltaTable, write_deltalake -def wait_till_host_is_available(host: str, timeout_sec: int = 30): +def wait_till_host_is_available(host: str, timeout_sec: int = 0.5): spacing = 2 + start = time.monotonic() while True: + if time.monotonic() - start > timeout_sec: + raise TimeoutError(f"Host {host} is not available after {timeout_sec} sec") + try: - subprocess.run(["curl", host], timeout=500, check=True) + subprocess.run(["curl", host], timeout=timeout_sec * 1000, check=True) except Exception: pass else: From 2afdf1c9d458960d54e3a7b178a6fb5f86817e91 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 27 Aug 2023 17:41:53 -0700 Subject: [PATCH 8/8] add dumb fix --- python/deltalake/writer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 648cfe2c97..f2754a760d 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -218,8 +218,10 @@ def visitor(written_file: Any) -> None: # PyArrow added support for written_file.size in 9.0.0 if PYARROW_MAJOR_VERSION >= 9: size = written_file.size - else: + elif filesystem is not None: size = filesystem.get_file_info([path])[0].size + else: + size = 0 add_actions.append( AddAction(