Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: just make pyarrow 12 the max #1603

Merged
merged 9 commits on
Aug 28, 2023
15 changes: 8 additions & 7 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,13 @@ def write_deltalake(
if filesystem is not None:
raise NotImplementedError("Filesystem support is not yet implemented. #570")

__enforce_append_only(table=table, configuration=configuration, mode=mode)
if table is not None:
storage_options = table._storage_options or {}
storage_options.update(storage_options or {})

if filesystem is None:
if table is not None:
storage_options = table._storage_options or {}
storage_options.update(storage_options or {})
filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))

filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))
__enforce_append_only(table=table, configuration=configuration, mode=mode)

if isinstance(partition_by, str):
partition_by = [partition_by]
Expand Down Expand Up @@ -219,8 +218,10 @@ def visitor(written_file: Any) -> None:
# PyArrow added support for written_file.size in 9.0.0
if PYARROW_MAJOR_VERSION >= 9:
size = written_file.size
else:
elif filesystem is not None:
size = filesystem.get_file_info([path])[0].size
else:
size = 0

add_actions.append(
AddAction(
Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ classifiers = [
"Programming Language :: Python :: 3 :: Only"
]
dependencies = [
"pyarrow>=8",
"pyarrow>=8,<=12",
'typing-extensions;python_version<"3.8"',
]

Expand Down
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ impl RawDeltaTable {
.into_iter()
.map(|part| PyFrozenSet::new(py, part.iter()))
.collect::<Result<_, PyErr>>()?;
PyFrozenSet::new(py, active_partitions.into_iter())
PyFrozenSet::new(py, active_partitions)
}

fn create_write_transaction(
Expand Down
9 changes: 7 additions & 2 deletions python/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import pathlib
import subprocess
import time
from datetime import date, datetime, timedelta
from decimal import Decimal
from time import sleep
Expand All @@ -11,11 +12,15 @@
from deltalake import DeltaTable, write_deltalake


def wait_till_host_is_available(host: str, timeout_sec: int = 30):
def wait_till_host_is_available(host: str, timeout_sec: int = 0.5):
spacing = 2
start = time.monotonic()
while True:
if time.monotonic() - start > timeout_sec:
raise TimeoutError(f"Host {host} is not available after {timeout_sec} sec")

try:
subprocess.run(["curl", host], timeout=500, check=True)
subprocess.run(["curl", host], timeout=timeout_sec * 1000, check=True)
except Exception:
pass
else:
Expand Down
2 changes: 1 addition & 1 deletion rust/src/action/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ pub async fn cleanup_expired_logs_for(
) -> Result<i32, ProtocolError> {
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r#"_delta_log/(\d{20})\.(json|checkpoint).*$"#).unwrap();
Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap();
}

let mut deleted_log_num = 0;
Expand Down
4 changes: 2 additions & 2 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,9 +881,9 @@ pub(crate) async fn find_latest_check_point_for_version(
) -> Result<Option<CheckPoint>, ProtocolError> {
lazy_static! {
static ref CHECKPOINT_REGEX: Regex =
Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.parquet$"#).unwrap();
Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.parquet$").unwrap();
static ref CHECKPOINT_PARTS_REGEX: Regex =
Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$"#).unwrap();
Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$").unwrap();
}

let mut cp: Option<CheckPoint> = None;
Expand Down
2 changes: 1 addition & 1 deletion rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl DeltaTable {
// TODO check if regex matches against path
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r#"^_delta_log/(\d{20})\.(json|checkpoint)*$"#).unwrap();
Regex::new(r"^_delta_log/(\d{20})\.(json|checkpoint)*$").unwrap();
}

let mut current_delta_log_ver = i64::MAX;
Expand Down
4 changes: 2 additions & 2 deletions rust/src/delta_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ mod tests {
delta_log_schema_for_table(table_schema.clone(), partition_columns.as_slice(), false);

// verify top-level schema contains all expected fields and they are named correctly.
let expected_fields = vec!["metaData", "protocol", "txn", "remove", "add"];
let expected_fields = ["metaData", "protocol", "txn", "remove", "add"];
for f in log_schema.fields().iter() {
assert!(expected_fields.contains(&f.name().as_str()));
}
Expand Down Expand Up @@ -771,7 +771,7 @@ mod tests {
})
.collect();
assert_eq!(7, remove_fields.len());
let expected_fields = vec![
let expected_fields = [
"path",
"deletionTimestamp",
"dataChange",
Expand Down
9 changes: 4 additions & 5 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ impl DeltaTableState {
let stats = self
.files()
.iter()
.fold(
Some(Statistics {
.try_fold(
Statistics {
num_rows: Some(0),
total_byte_size: Some(0),
column_statistics: Some(vec![
Expand All @@ -120,9 +120,8 @@ impl DeltaTableState {
self.schema().unwrap().get_fields().len()
]),
is_exact: true,
}),
},
|acc, action| {
let acc = acc?;
let new_stats = action
.get_stats()
.unwrap_or_else(|_| Some(action::Stats::default()))?;
Expand Down Expand Up @@ -468,7 +467,7 @@ impl TableProvider for DeltaTable {
self.get_state()
.files()
.iter()
.zip(files_to_prune.into_iter())
.zip(files_to_prune)
.filter_map(
|(action, keep)| {
if keep {
Expand Down
4 changes: 2 additions & 2 deletions rust/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl DeltaTableState {
Ok(Either::Left(
self.files()
.iter()
.zip(pruning_predicate.prune(self)?.into_iter())
.zip(pruning_predicate.prune(self)?)
.filter_map(
|(action, keep_file)| {
if keep_file {
Expand Down Expand Up @@ -234,7 +234,7 @@ impl<'a> AddContainer<'a> {
Ok(self
.inner
.iter()
.zip(pruning_predicate.prune(self)?.into_iter())
.zip(pruning_predicate.prune(self)?)
.filter_map(
|(action, keep_file)| {
if keep_file {
Expand Down
25 changes: 23 additions & 2 deletions rust/src/storage/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,58 @@ const STORE_NAME: &str = "DeltaLocalObjectStore";
/// Error raised by storage lock client
#[derive(thiserror::Error, Debug)]
#[allow(dead_code)]
pub(self) enum LocalFileSystemError {
pub enum LocalFileSystemError {
/// Object exists already at path
#[error("Object exists already at path: {} ({:?})", path, source)]
AlreadyExists {
/// Path of the already existing file
path: String,
/// Originating error
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

/// Object not found at the given path
#[error("Object not found at path: {} ({:?})", path, source)]
NotFound {
/// Provided path which does not exist
path: String,
/// Originating error
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

/// Invalid argument sent to OS call
#[error("Invalid argument in OS call for path: {} ({:?})", path, source)]
InvalidArgument { path: String, source: errno::Errno },
InvalidArgument {
/// Provided path
path: String,
/// Originating error
source: errno::Errno,
},

/// Null error for path for FFI
#[error("Null error in FFI for path: {} ({:?})", path, source)]
NullError {
/// Given path
path: String,
/// Originating error
source: std::ffi::NulError,
},

/// Generic catch-all error for this store
#[error("Generic error in store: {} ({:?})", store, source)]
Generic {
/// String name of the object store
store: &'static str,
/// Originating error
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

/// Errors from the Tokio runtime
#[error("Error executing async task for path: {} ({:?})", path, source)]
Tokio {
/// Path
path: String,
/// Originating error
source: tokio::task::JoinError,
},
}
Expand Down
Loading