Merge branch 'main' into fix/use_require_files
ion-elgreco authored Aug 21, 2024
2 parents bae0af1 + 1f45881 commit 89a4c5a
Showing 7 changed files with 76 additions and 160 deletions.
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
@@ -66,8 +66,10 @@ futures = { workspace = true }
num_cpus = { workspace = true }
tokio = { workspace = true, features = [
"macros",
"process",
"rt",
"rt-multi-thread",
"signal",
"sync",
"fs",
"parking_lot",
4 changes: 4 additions & 0 deletions docs/api/delta_writer.md
@@ -8,6 +8,10 @@ search:

::: deltalake.write_deltalake

::: deltalake.BloomFilterProperties

::: deltalake.ColumnProperties

::: deltalake.WriterProperties

## Convert to Delta Tables
2 changes: 1 addition & 1 deletion docs/usage/optimize/small-file-compaction-with-optimize.md
@@ -104,7 +104,7 @@ Let’s run the optimize command to compact the existing small files into larger files
```python
dt = DeltaTable("observation_data")

dt.optimize()
dt.optimize.compact()
```

Here’s the output of the command:
35 changes: 34 additions & 1 deletion docs/usage/writing/index.md
@@ -69,4 +69,37 @@ In this case, you can use a `predicate` to overwrite only the relevant records or partitions
The data written must conform to the same predicate, i.e. it must not contain any records that don't match the `predicate` condition;
otherwise the operation will fail.

{{ code_example('operations', 'replace_where', ['replaceWhere'])}}
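
For a concrete starting point, here is a minimal sketch of a predicate-based overwrite. The table path, column names, and partition value are hypothetical, and it assumes the `predicate` argument of `write_deltalake` described above:

```python
import pyarrow as pa
from deltalake import write_deltalake

# Hypothetical table partitioned by `date`; only the 2024-08-01 partition is replaced.
new_rows = pa.table(
    {
        "date": pa.array(["2024-08-01", "2024-08-01"], pa.string()),
        "value": pa.array([1, 2], pa.int64()),
    }
)

write_deltalake(
    "/tmp/observation_data",  # hypothetical path
    new_rows,
    mode="overwrite",
    predicate="date = '2024-08-01'",  # every written row must satisfy this predicate
)
```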

## Using Writer Properties

You can customize the Rust Parquet writer by passing a [WriterProperties](../../api/delta_writer.md#deltalake.WriterProperties) to the writer. Additionally, you can apply extra configuration through the [BloomFilterProperties](../../api/delta_writer.md#deltalake.BloomFilterProperties) and [ColumnProperties](../../api/delta_writer.md#deltalake.ColumnProperties) data classes.


Here's how you can do it:
``` python
from deltalake import BloomFilterProperties, ColumnProperties, WriterProperties, write_deltalake
import pyarrow as pa

wp = WriterProperties(
    statistics_truncate_length=200,
    default_column_properties=ColumnProperties(
        bloom_filter_properties=BloomFilterProperties(True, 0.2, 30)
    ),
    column_properties={
        "value_non_bloom": ColumnProperties(bloom_filter_properties=None),
    },
)

table_path = "/tmp/my_table"

data = pa.table(
    {
        "id": pa.array(["1", "1"], pa.string()),
        "value": pa.array([11, 12], pa.int64()),
        "value_non_bloom": pa.array([11, 12], pa.int64()),
    }
)

write_deltalake(table_path, data, writer_properties=wp)
```
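
`WriterProperties` also accepts Parquet sizing options (the same names the deprecated `TableMerger.with_writer_properties` helper forwarded to it). Here is a minimal sketch; the values are illustrative, not recommendations, and the path is hypothetical:

```python
from deltalake import WriterProperties, write_deltalake
import pyarrow as pa

# Sizing-related knobs for the Parquet writer; values are illustrative only.
wp = WriterProperties(
    data_page_size_limit=1024 * 1024,        # cap each data page at ~1 MiB
    dictionary_page_size_limit=1024 * 1024,  # cap dictionary pages at ~1 MiB
    data_page_row_count_limit=20_000,        # cap rows per data page
    write_batch_size=1024,                   # internal write batch size
    max_row_group_size=128_000,              # cap rows per row group
)

data = pa.table({"id": pa.array([1, 2, 3], pa.int64())})
write_deltalake("/tmp/my_table_tuned", data, writer_properties=wp)  # hypothetical path
```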
14 changes: 7 additions & 7 deletions python/deltalake/_internal.pyi
@@ -49,11 +49,10 @@ class RawDeltaTable:
def protocol_versions(self) -> List[Any]: ...
def load_version(self, version: int) -> None: ...
def load_with_datetime(self, ds: str) -> None: ...
def files_by_partitions(
self, partitions_filters: Optional[FilterType]
def files(self, partition_filters: Optional[PartitionFilterType]) -> List[str]: ...
def file_uris(
self, partition_filters: Optional[PartitionFilterType]
) -> List[str]: ...
def files(self, partition_filters: Optional[FilterType]) -> List[str]: ...
def file_uris(self, partition_filters: Optional[FilterType]) -> List[str]: ...
def vacuum(
self,
dry_run: bool,
@@ -64,7 +63,7 @@
) -> List[str]: ...
def compact_optimize(
self,
partition_filters: Optional[FilterType],
partition_filters: Optional[PartitionFilterType],
target_size: Optional[int],
max_concurrent_tasks: Optional[int],
min_commit_interval: Optional[int],
@@ -75,7 +74,7 @@
def z_order_optimize(
self,
z_order_columns: List[str],
partition_filters: Optional[FilterType],
partition_filters: Optional[PartitionFilterType],
target_size: Optional[int],
max_concurrent_tasks: Optional[int],
max_spill_size: Optional[int],
@@ -119,7 +118,7 @@ class RawDeltaTable:
def history(self, limit: Optional[int]) -> List[str]: ...
def update_incremental(self) -> None: ...
def dataset_partitions(
self, schema: pyarrow.Schema, partition_filters: Optional[FilterType]
self, schema: pyarrow.Schema, partition_filters: Optional[FilterConjunctionType]
) -> List[Any]: ...
def create_checkpoint(self) -> None: ...
def get_add_actions(self, flatten: bool) -> pyarrow.RecordBatch: ...
@@ -852,3 +851,4 @@ FilterLiteralType = Tuple[str, str, Any]
FilterConjunctionType = List[FilterLiteralType]
FilterDNFType = List[FilterConjunctionType]
FilterType = Union[FilterConjunctionType, FilterDNFType]
PartitionFilterType = List[Tuple[str, str, Union[str, List[str]]]]
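
As a rough illustration of the aliases declared in this stub (the column names and values below are made up), conforming values look like:

```python
# Illustrative values only, matching the aliases above.
filter_literal = ("year", "=", 2024)                                              # FilterLiteralType
filter_conjunction = [("year", "=", 2024), ("month", ">", 6)]                     # FilterConjunctionType
filter_dnf = [[("year", "=", 2024)], [("year", "=", 2023), ("month", "=", 12)]]   # FilterDNFType
partition_filter = [("month", "in", ["7", "8"])]                                  # PartitionFilterType: values are strings
```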
157 changes: 27 additions & 130 deletions python/deltalake/table.py
@@ -63,6 +63,12 @@
NOT_SUPPORTED_READER_VERSION = 2
SUPPORTED_READER_FEATURES = {"timestampNtz"}

FilterLiteralType = Tuple[str, str, Any]
FilterConjunctionType = List[FilterLiteralType]
FilterDNFType = List[FilterConjunctionType]
FilterType = Union[FilterConjunctionType, FilterDNFType]
PartitionFilterType = List[Tuple[str, str, Union[str, List[str]]]]


class Compression(Enum):
UNCOMPRESSED = "UNCOMPRESSED"
@@ -336,15 +342,6 @@ class ProtocolVersions(NamedTuple):
reader_features: Optional[List[str]]


FilterLiteralType = Tuple[str, str, Any]

FilterConjunctionType = List[FilterLiteralType]

FilterDNFType = List[FilterConjunctionType]

FilterType = Union[FilterConjunctionType, FilterDNFType]


@dataclass(init=False)
class DeltaTable:
"""Represents a Delta Table"""
@@ -410,14 +407,8 @@ def from_data_catalog(
but will also increase memory usage. Possible rate limits of the storage backend should
also be considered for optimal performance. Defaults to 4 * number of cpus.
"""
table_uri = RawDeltaTable.get_table_uri_from_data_catalog(
data_catalog=data_catalog.value,
data_catalog_id=data_catalog_id,
database_name=database_name,
table_name=table_name,
)
return cls(
table_uri=table_uri, version=version, log_buffer_size=log_buffer_size
raise NotImplementedError(
"Reading from data catalog is not supported at this point in time."
)

@staticmethod
@@ -550,10 +541,10 @@ def files(
("z", "not in", ["a","b"])
```
"""
return self._table.files(self.__stringify_partition_values(partition_filters))
return self._table.files(self._stringify_partition_values(partition_filters))

def file_uris(
self, partition_filters: Optional[List[Tuple[str, str, Any]]] = None
self, partition_filters: Optional[FilterConjunctionType] = None
) -> List[str]:
"""
Get the list of files as absolute URIs, including the scheme (e.g. "s3://").
@@ -588,7 +579,7 @@ def file_uris(
```
"""
return self._table.file_uris(
self.__stringify_partition_values(partition_filters)
self._stringify_partition_values(partition_filters)
)

file_uris.__doc__ = ""
@@ -635,48 +626,6 @@ def load_as_version(self, version: Union[int, str, datetime]) -> None:
"Invalid datatype provided for version, only int, str or datetime are accepted."
)

def load_version(self, version: int) -> None:
"""
Load a DeltaTable with a specified version.
!!! warning "Deprecated"
Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`.
Args:
version: the identifier of the version of the DeltaTable to load
"""
warnings.warn(
"Call to deprecated method DeltaTable.load_version. Use DeltaTable.load_as_version() instead.",
category=DeprecationWarning,
stacklevel=2,
)
self._table.load_version(version)

def load_with_datetime(self, datetime_string: str) -> None:
"""
Time travel Delta table to the latest version that's created at or before provided `datetime_string` argument.
The `datetime_string` argument should be an RFC 3339 and ISO 8601 date and time string.
!!! warning "Deprecated"
Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`.
Args:
datetime_string: the identifier of the datetime point of the DeltaTable to load
Example:
```
"2018-01-26T18:30:09Z"
"2018-12-19T16:39:57-08:00"
"2018-01-26T18:30:09.453+00:00"
```
"""
warnings.warn(
"Call to deprecated method DeltaTable.load_with_datetime. Use DeltaTable.load_as_version() instead.",
category=DeprecationWarning,
stacklevel=2,
)
self._table.load_with_datetime(datetime_string)

def load_cdf(
self,
starting_version: int = 0,
@@ -706,12 +655,18 @@ def schema(self) -> DeltaSchema:
"""
return self._table.schema

def files_by_partitions(self, partition_filters: Optional[FilterType]) -> List[str]:
def files_by_partitions(self, partition_filters: PartitionFilterType) -> List[str]:
"""
Get the files for each partition
"""
return self._table.files_by_partitions(partition_filters)
warnings.warn(
"files_by_partitions is deprecated, please use DeltaTable.files() instead.",
category=DeprecationWarning,
stacklevel=2,
)

return self.files(partition_filters)

def metadata(self) -> Metadata:
"""
@@ -1045,7 +1000,7 @@ def restore(

def to_pyarrow_dataset(
self,
partitions: Optional[List[Tuple[str, str, Any]]] = None,
partitions: Optional[FilterConjunctionType] = None,
filesystem: Optional[Union[str, pa_fs.FileSystem]] = None,
parquet_read_options: Optional[ParquetReadOptions] = None,
schema: Optional[pyarrow.Schema] = None,
@@ -1208,9 +1163,9 @@ def cleanup_metadata(self) -> None:
"""
self._table.cleanup_metadata()

def __stringify_partition_values(
self, partition_filters: Optional[List[Tuple[str, str, Any]]]
) -> Optional[List[Tuple[str, str, Union[str, List[str]]]]]:
def _stringify_partition_values(
self, partition_filters: Optional[FilterConjunctionType]
) -> Optional[PartitionFilterType]:
if partition_filters is None:
return partition_filters
out = []
@@ -1372,45 +1327,6 @@ def __init__(
self.not_matched_by_source_delete_predicate: Optional[List[str]] = None
self.not_matched_by_source_delete_all: Optional[bool] = None

def with_writer_properties(
self,
data_page_size_limit: Optional[int] = None,
dictionary_page_size_limit: Optional[int] = None,
data_page_row_count_limit: Optional[int] = None,
write_batch_size: Optional[int] = None,
max_row_group_size: Optional[int] = None,
) -> "TableMerger":
"""
!!! warning "Deprecated"
Use `.merge(writer_properties = WriterProperties())` instead
Pass writer properties to the Rust parquet writer, see options https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html:
Args:
data_page_size_limit: Limit DataPage size to this in bytes.
dictionary_page_size_limit: Limit the size of each DataPage to store dicts to this amount in bytes.
data_page_row_count_limit: Limit the number of rows in each DataPage.
write_batch_size: Splits internally to smaller batch size.
max_row_group_size: Max number of rows in row group.
Returns:
TableMerger: TableMerger Object
"""
warnings.warn(
"Call to deprecated method TableMerger.with_writer_properties. Use DeltaTable.merge(writer_properties=WriterProperties()) instead.",
category=DeprecationWarning,
stacklevel=2,
)

writer_properties: Dict[str, Any] = {
"data_page_size_limit": data_page_size_limit,
"dictionary_page_size_limit": dictionary_page_size_limit,
"data_page_row_count_limit": data_page_row_count_limit,
"write_batch_size": write_batch_size,
"max_row_group_size": max_row_group_size,
}
self.writer_properties = WriterProperties(**writer_properties)
return self

def when_matched_update(
self, updates: Dict[str, str], predicate: Optional[str] = None
) -> "TableMerger":
@@ -2005,28 +1921,9 @@ class TableOptimizer:
def __init__(self, table: DeltaTable):
self.table = table

def __call__(
self,
partition_filters: Optional[FilterType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
) -> Dict[str, Any]:
"""
!!! warning "DEPRECATED 0.10.0"
Use [compact][deltalake.table.DeltaTable.compact] instead, which has the same signature.
"""

warnings.warn(
"Call to deprecated method DeltaTable.optimize. Use DeltaTable.optimize.compact() instead.",
category=DeprecationWarning,
stacklevel=2,
)

return self.compact(partition_filters, target_size, max_concurrent_tasks)

def compact(
self,
partition_filters: Optional[FilterType] = None,
partition_filters: Optional[FilterConjunctionType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
min_commit_interval: Optional[Union[int, timedelta]] = None,
@@ -2081,7 +1978,7 @@ def compact(
min_commit_interval = int(min_commit_interval.total_seconds())

metrics = self.table._table.compact_optimize(
partition_filters,
self.table._stringify_partition_values(partition_filters),
target_size,
max_concurrent_tasks,
min_commit_interval,
@@ -2095,7 +1992,7 @@ def z_order(
def z_order(
self,
columns: Iterable[str],
partition_filters: Optional[FilterType] = None,
partition_filters: Optional[FilterConjunctionType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
max_spill_size: int = 20 * 1024 * 1024 * 1024,
@@ -2150,7 +2047,7 @@ def z_order(

metrics = self.table._table.z_order_optimize(
list(columns),
partition_filters,
self.table._stringify_partition_values(partition_filters),
target_size,
max_concurrent_tasks,
max_spill_size,
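
Because `compact` and `z_order` now pass partition filters through `_stringify_partition_values`, callers can keep supplying the familiar tuple filters. A minimal sketch against a hypothetical table partitioned by `date`, with a hypothetical `obs_id` column:

```python
from deltalake import DeltaTable

dt = DeltaTable("/tmp/observation_data")  # hypothetical table partitioned by `date`

# Compact only the matching partition; the filter shape is FilterConjunctionType.
metrics = dt.optimize.compact(partition_filters=[("date", "=", "2021-01-01")])
print(metrics)

# Z-order within the same partition on the hypothetical `obs_id` column.
metrics = dt.optimize.z_order(["obs_id"], partition_filters=[("date", "=", "2021-01-01")])
```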