fix(python): reuse state in to_pyarrow_dataset (#2485)
# Description

Reuse the table's already-loaded state so we save time, instead of reconstructing and re-verifying the state.
ion-elgreco authored May 10, 2024
1 parent 8a8702f commit c734926
Showing 7 changed files with 150 additions and 27 deletions.
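In practice, the change means a `DeltaStorageHandler` can now be built directly from an already-loaded table instead of from a URI. A minimal before/after sketch (the table path and the use of the internal `_table` handle are illustrative, not part of this diff):

```python
from deltalake import DeltaTable
from deltalake.fs import DeltaStorageHandler

dt = DeltaTable("path/to/table")  # state is loaded and verified once, here

# Before: rebuilds storage from the URI, re-verifying the table state
handler = DeltaStorageHandler(dt._table.table_uri(), options=None)

# After: reuses the object store already held by the loaded table
handler = DeltaStorageHandler.from_table(dt._table)
```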
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "deltalake-python"
-version = "0.17.3"
+version = "0.17.4"
 authors = ["Qingping Hou <dave2008713@gmail.com>", "Will Jones <willjones127@gmail.com>"]
 homepage = "https://github.com/delta-io/delta-rs"
 license = "Apache-2.0"
15 changes: 11 additions & 4 deletions python/deltalake/_internal.pyi
@@ -721,10 +721,17 @@ class DeltaFileSystemHandler:

     def __init__(
         self,
-        root: str,
-        options: dict[str, str] | None = None,
-        known_sizes: dict[str, int] | None = None,
+        table_uri: str,
+        options: Dict[str, str] | None = None,
+        known_sizes: Dict[str, int] | None = None,
     ) -> None: ...
+    @classmethod
+    def from_table(
+        cls,
+        table: RawDeltaTable,
+        options: Dict[str, str] | None = None,
+        known_sizes: Dict[str, int] | None = None,
+    ) -> "DeltaFileSystemHandler": ...
     def get_type_name(self) -> str: ...
     def copy_file(self, src: str, dst: str) -> None:
         """Copy a file.
@@ -776,7 +783,7 @@ class DeltaFileSystemHandler:
     def open_input_file(self, path: str) -> ObjectInputFile:
         """Open an input file for random access reading."""
     def open_output_stream(
-        self, path: str, metadata: dict[str, str] | None = None
+        self, path: str, metadata: Dict[str, str] | None = None
     ) -> ObjectOutputStream:
         """Open an output stream for sequential writing."""
108 changes: 97 additions & 11 deletions python/deltalake/fs.py
@@ -1,17 +1,102 @@
-from typing import Dict, List, Optional
+from typing import Any, Dict, List, Mapping, Optional

 import pyarrow as pa
 from pyarrow.fs import FileInfo, FileSelector, FileSystemHandler

-from ._internal import DeltaFileSystemHandler
+from ._internal import DeltaFileSystemHandler, RawDeltaTable


-# NOTE we need to inherit form FileSystemHandler to pass pyarrow's internal type checks.
-class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
+class DeltaStorageHandler(FileSystemHandler):
     """
     DeltaStorageHandler is a concrete implementations of a PyArrow FileSystemHandler.
     """

+    def __init__(
+        self,
+        table_uri: str,
+        options: Optional[Dict[str, str]] = None,
+        known_sizes: Optional[Dict[str, int]] = None,
+    ):
+        self._handler = DeltaFileSystemHandler(
+            table_uri=table_uri, options=options, known_sizes=known_sizes
+        )
+
+    @classmethod
+    def from_table(
+        cls,
+        table: RawDeltaTable,
+        options: Optional[Dict[str, str]] = None,
+        known_sizes: Optional[Dict[str, int]] = None,
+    ) -> "DeltaStorageHandler":
+        self = cls.__new__(cls)
+        self._handler = DeltaFileSystemHandler.from_table(table, options, known_sizes)
+        return self
+
+    def get_type_name(self) -> str:
+        return self._handler.get_type_name()
+
+    def copy_file(self, src: str, dst: str) -> None:
+        """Copy a file.
+        If the destination exists and is a directory, an error is returned. Otherwise, it is replaced.
+        """
+        return self._handler.copy_file(src=src, dst=dst)
+
+    def create_dir(self, path: str, recursive: bool = True) -> None:
+        """Create a directory and subdirectories.
+        This function succeeds if the directory already exists.
+        """
+        return self._handler.create_dir(path, recursive)
+
+    def delete_dir(self, path: str) -> None:
+        """Delete a directory and its contents, recursively."""
+        return self._handler.delete_dir(path)
+
+    def delete_file(self, path: str) -> None:
+        """Delete a file."""
+        return self._handler.delete_file(path)
+
+    def equals(self, other: Any) -> bool:
+        return self._handler.equals(other)
+
+    def delete_dir_contents(
+        self, path: str, *, accept_root_dir: bool = False, missing_dir_ok: bool = False
+    ) -> None:
+        """Delete a directory's contents, recursively.
+        Like delete_dir, but doesn't delete the directory itself.
+        """
+        return self._handler.delete_dir_contents(
+            path=path, accept_root_dir=accept_root_dir, missing_dir_ok=missing_dir_ok
+        )
+
+    def delete_root_dir_contents(self) -> None:
+        """Delete the root directory contents, recursively."""
+        return self._handler.delete_root_dir_contents()
+
+    def get_file_info(self, paths: List[str]) -> List[FileInfo]:
+        """Get info for the given files.
+        A non-existing or unreachable file returns a FileStat object and has a FileType of value NotFound.
+        An exception indicates a truly exceptional condition (low-level I/O error, etc.).
+        """
+        return self._handler.get_file_info(paths)
+
+    def move(self, src: str, dest: str) -> None:
+        """Move / rename a file or directory.
+        If the destination exists: - if it is a non-empty directory, an error is returned - otherwise,
+        if it has the same type as the source, it is replaced - otherwise, behavior is
+        unspecified (implementation-dependent).
+        """
+        self._handler.move_file(src=src, dest=dest)
+
+    def normalize_path(self, path: str) -> str:
+        """Normalize filesystem path."""
+        return self._handler.normalize_path(path)
+
     def open_input_file(self, path: str) -> pa.PythonFile:
         """
         Open an input file for random access reading.
@@ -22,7 +107,7 @@ def open_input_file(self, path: str) -> pa.PythonFile:
         Returns:
             NativeFile
         """
-        return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))
+        return pa.PythonFile(self._handler.open_input_file(path))

     def open_input_stream(self, path: str) -> pa.PythonFile:
         """
@@ -34,7 +119,7 @@ def open_input_stream(self, path: str) -> pa.PythonFile:
         Returns:
             NativeFile
         """
-        return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))
+        return pa.PythonFile(self._handler.open_input_file(path))

     def open_output_stream(
         self, path: str, metadata: Optional[Dict[str, str]] = None
@@ -51,11 +136,9 @@ def open_output_stream(
         Returns:
             NativeFile
         """
-        return pa.PythonFile(
-            DeltaFileSystemHandler.open_output_stream(self, path, metadata)
-        )
+        return pa.PythonFile(self._handler.open_output_stream(path, metadata))

-    def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]:  # type: ignore
+    def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]:
         """
         Get info for the files defined by FileSelector.
@@ -65,6 +148,9 @@ def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]:  # type: ignore
         Returns:
             list of file info objects
         """
-        return DeltaFileSystemHandler.get_file_info_selector(
-            self, selector.base_dir, selector.allow_not_found, selector.recursive
+        return self._handler.get_file_info_selector(
+            selector.base_dir, selector.allow_not_found, selector.recursive
         )
+
+    def open_append_stream(self, path: str, metadata: Mapping[str, str]) -> None:
+        raise NotImplementedError
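The net effect of this file's change is that `DeltaStorageHandler` now wraps the Rust-backed `DeltaFileSystemHandler` by composition and delegates every call, instead of inheriting from it. Since it still subclasses `FileSystemHandler`, it can be wrapped in `pyarrow.fs.PyFileSystem` exactly as before; a minimal usage sketch (the table path and file name are illustrative):

```python
import pyarrow.fs as pa_fs

from deltalake import DeltaTable
from deltalake.fs import DeltaStorageHandler

dt = DeltaTable("path/to/table")  # illustrative table path
fs = pa_fs.PyFileSystem(DeltaStorageHandler.from_table(dt._table))

# Calls on the PyFileSystem are routed through the wrapped Rust handler.
print(fs.get_file_info(["part-00000.parquet"]))  # illustrative file name
```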
6 changes: 4 additions & 2 deletions python/deltalake/table.py
@@ -1095,8 +1095,10 @@ def to_pyarrow_dataset(
             x: y for x, y in zip(file_sizes["path"], file_sizes["size_bytes"])
         }
         filesystem = pa_fs.PyFileSystem(
-            DeltaStorageHandler(
-                self._table.table_uri(), self._storage_options, file_sizes
+            DeltaStorageHandler.from_table(
+                self._table,
+                self._storage_options,
+                file_sizes,
             )
         )
         format = ParquetFileFormat(
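For callers, nothing changes in the API; `to_pyarrow_dataset` simply stops re-verifying state it already holds. A usage sketch (the path is illustrative):

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # illustrative path; state is loaded here
dataset = dt.to_pyarrow_dataset()  # now reuses dt's already-verified state

# From here it is a regular pyarrow dataset
print(dataset.to_table().num_rows)
```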
10 changes: 9 additions & 1 deletion python/deltalake/writer.py
@@ -339,13 +339,18 @@ def write_deltalake(
             "schema_mode 'merge' is not supported in pyarrow engine. Use engine=rust"
         )
     # We need to write against the latest table version
-    filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))

     def sort_arrow_schema(schema: pa.schema) -> pa.schema:
         sorted_cols = sorted(iter(schema), key=lambda x: (x.name, str(x.type)))
         return pa.schema(sorted_cols)

     if table:  # already exists
+        filesystem = pa_fs.PyFileSystem(
+            DeltaStorageHandler.from_table(
+                table=table._table, options=storage_options
+            )
+        )
+
         if sort_arrow_schema(schema) != sort_arrow_schema(
             table.schema().to_pyarrow(as_large_types=large_dtypes)
         ) and not (mode == "overwrite" and schema_mode == "overwrite"):
@@ -370,6 +375,9 @@ def sort_arrow_schema(schema: pa.schema) -> pa.schema:
         partition_by = table.metadata().partition_columns

     else:  # creating a new table
+        filesystem = pa_fs.PyFileSystem(
+            DeltaStorageHandler(table_uri, options=storage_options)
+        )
         current_version = -1

     dtype_map = {
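Here the filesystem construction moves from the top of `write_deltalake` into the two branches: writes to an existing table reuse that table's state via `from_table`, while creating a new table still builds the handler from the URI. A sketch of both paths (paths and data are illustrative; this is the pyarrow engine path touched by the diff):

```python
import pyarrow as pa

from deltalake import DeltaTable, write_deltalake

data = pa.table({"x": [1, 2, 3]})

# Existing table: the handler is built with DeltaStorageHandler.from_table,
# reusing the state already loaded into `dt`
dt = DeltaTable("path/to/existing_table")  # illustrative path
write_deltalake(dt, data, mode="append")

# New table: the handler is still built from the URI
write_deltalake("path/to/new_table", data)
```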
34 changes: 27 additions & 7 deletions python/src/filesystem.rs
@@ -1,14 +1,14 @@
-use std::collections::HashMap;
-use std::sync::Arc;
-
 use crate::error::PythonError;
 use crate::utils::{delete_dir, rt, walk_tree};
+use crate::RawDeltaTable;
 use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path};
 use deltalake::DeltaTableBuilder;
 use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
 use pyo3::prelude::*;
-use pyo3::types::{IntoPyDict, PyBytes};
+use pyo3::types::{IntoPyDict, PyBytes, PyType};
 use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::sync::Arc;
 use tokio::io::{AsyncWrite, AsyncWriteExt};

 const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024;
@@ -43,19 +43,39 @@ impl DeltaFileSystemHandler {
     #[new]
     #[pyo3(signature = (table_uri, options = None, known_sizes = None))]
     fn new(
-        table_uri: &str,
+        table_uri: String,
         options: Option<HashMap<String, String>>,
        known_sizes: Option<HashMap<String, i64>>,
     ) -> PyResult<Self> {
-        let storage = DeltaTableBuilder::from_uri(table_uri)
+        let storage = DeltaTableBuilder::from_uri(&table_uri)
             .with_storage_options(options.clone().unwrap_or_default())
             .build_storage()
             .map_err(PythonError::from)?
             .object_store();
+
+        Ok(Self {
+            inner: storage,
+            config: FsConfig {
+                root_url: table_uri,
+                options: options.unwrap_or_default(),
+            },
+            known_sizes,
+        })
+    }
+
+    #[classmethod]
+    #[pyo3(signature = (table, options = None, known_sizes = None))]
+    fn from_table(
+        _cls: &PyType,
+        table: &RawDeltaTable,
+        options: Option<HashMap<String, String>>,
+        known_sizes: Option<HashMap<String, i64>>,
+    ) -> PyResult<Self> {
+        let storage = table._table.object_store();
         Ok(Self {
             inner: storage,
             config: FsConfig {
-                root_url: table_uri.into(),
+                root_url: table._table.table_uri(),
                 options: options.unwrap_or_default(),
             },
             known_sizes,
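The pyo3 `#[classmethod]` surfaces in Python as `DeltaFileSystemHandler.from_table`, matching the stub added in `_internal.pyi` above. A low-level sketch against the internal API (internal, subject to change; the path is illustrative):

```python
from deltalake import DeltaTable
from deltalake._internal import DeltaFileSystemHandler

dt = DeltaTable("path/to/table")  # illustrative path
handler = DeltaFileSystemHandler.from_table(dt._table)
print(handler.get_type_name())  # filesystem type name reported by the Rust handler
```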
2 changes: 1 addition & 1 deletion python/tests/test_fs.py
@@ -50,7 +50,7 @@ def test_s3_authenticated_read_write(s3_localstack_creds, monkeypatch):
     # Create unauthenticated handler
     storage_handler = DeltaStorageHandler(
         "s3://deltars/",
-        {
+        options={
             "AWS_ENDPOINT_URL": s3_localstack_creds["AWS_ENDPOINT_URL"],
             # Grants anonymous access. If we don't do this, will timeout trying
             # to reading from EC2 instance provider.
