Skip to content

Commit

Permalink
feat: expose function to get table of add actions (#1033)
Browse files Browse the repository at this point in the history
# Description

Exposes a function to get a dataframe of add actions for the selected version
of the table.

TODO:

 * [x] add unit tests
 * [x] write user guide
 * [x] handle partition columns
 * [x] handle stats
 * [x] handle tags
 * [x] add a `flatten` option

# Related Issue(s)

- closes #1031

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
wjones127 authored Jan 11, 2023
1 parent 4f9dec2 commit 83260a8
Show file tree
Hide file tree
Showing 8 changed files with 1,122 additions and 1 deletion.
34 changes: 34 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,37 @@ def __stringify_partition_values(
str_value = str(value)
out.append((field, op, str_value))
return out

def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch:
"""
Return a PyArrow RecordBatch with all current add actions.
Add actions represent the files that currently make up the table. This
data is a low-level representation parsed from the transaction log.
:param flatten: whether to flatten the schema. Partition values columns are
given the prefix `partition.`, statistics (null_count, min, and max) are
given the prefix `null_count.`, `min.`, and `max.`, and tags the
prefix `tags.`. Nested field names are concatenated with `.`.
:returns: a PyArrow RecordBatch containing the add action data.
Examples:
>>> from deltalake import DeltaTable, write_deltalake
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> write_deltalake("tmp", data, partition_by=["x"])
>>> dt = DeltaTable("tmp")
>>> dt.get_add_actions().to_pandas()
path size_bytes modification_time data_change partition_values num_records null_count min max
0 x=2/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 2} 1 {'y': 0} {'y': 5} {'y': 5}
1 x=3/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 3} 1 {'y': 0} {'y': 6} {'y': 6}
2 x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 1} 1 {'y': 0} {'y': 4} {'y': 4}
>>> dt.get_add_actions(flatten=True).to_pandas()
path size_bytes modification_time data_change partition.x num_records null_count.y min.y max.y
0 x=2/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 2 1 0 5 5
1 x=3/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 3 1 0 6 6
2 x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 1 1 0 4 4
"""
return self._table.get_add_actions(flatten)
9 changes: 9 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,15 @@ impl RawDeltaTable {

Ok(())
}

/// Builds an Arrow record batch of the add actions in the table's current
/// state.
///
/// `flatten` is forwarded unchanged to `add_actions_table`. Any error that
/// call reports is converted with `PyDeltaTableError::from_raw` before it is
/// returned to the caller.
pub fn get_add_actions(&self, flatten: bool) -> PyResult<PyArrowType<RecordBatch>> {
    let actions = self
        ._table
        .get_state()
        .add_actions_table(flatten)
        .map_err(PyDeltaTableError::from_raw)?;
    // Wrapped in `PyArrowType` to match the declared return type.
    Ok(PyArrowType(actions))
}
}

fn convert_partition_filters<'a>(
Expand Down
39 changes: 39 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,45 @@ def test_history_partitioned_table_metadata():
}


@pytest.mark.parametrize("flatten", [True, False])
def test_add_actions_table(flatten: bool):
    """Check the add-actions batch of the partitioned 0.8.0 fixture table.

    Runs once with the flattened schema and once with the nested schema; only
    the way the partition columns are looked up differs between the two runs.
    """
    expected_paths = [
        "year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet",
        "year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet",
        "year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet",
        "year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet",
        "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet",
        "year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet",
    ]

    dt = DeltaTable("../rust/tests/data/delta-0.8.0-partitioned")
    batch = dt.get_add_actions(flatten)
    # RecordBatch has no sort_by of its own yet, so sort through a Table to
    # put the rows in the same order as the expected arrays below.
    sorted_table = pa.Table.from_batches([batch]).sort_by("path")
    batch = sorted_table.to_batches()[0]

    assert batch.num_rows == 6
    assert batch["path"] == pa.array(expected_paths)
    assert batch["size_bytes"] == pa.array([414, 414, 414, 407, 414, 414])
    assert batch["data_change"] == pa.array([True] * 6)
    assert batch["modification_time"] == pa.array(
        [1615555646000] * 6, type=pa.timestamp("ms")
    )

    if not flatten:
        # Nested schema: partition values are fields of one struct column.
        partition_values = batch["partition_values"]
        year = partition_values.field("year")
        month = partition_values.field("month")
        day = partition_values.field("day")
    else:
        # Flattened schema: each partition value is its own prefixed column.
        year = batch["partition.year"]
        month = batch["partition.month"]
        day = batch["partition.day"]

    assert year == pa.array(["2020"] * 3 + ["2021"] * 3)
    assert month == pa.array(["1", "2", "2", "12", "12", "4"])
    assert day == pa.array(["1", "3", "5", "20", "4", "5"])


def assert_correct_files(dt: DeltaTable, partition_filters, expected_paths):
assert dt.files(partition_filters) == expected_paths
absolute_paths = [os.path.join(dt.table_uri, path) for path in expected_paths]
Expand Down
1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
cfg-if = "1"
errno = "0.2"
futures = "0.3"
itertools = "0.10"
lazy_static = "1"
log = "0"
libc = ">=0.2.90, <1"
Expand Down
4 changes: 4 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#![deny(warnings)]
#![deny(missing_docs)]
#![allow(rustdoc::invalid_html_tags)]

#[cfg(all(feature = "parquet", feature = "parquet2"))]
compile_error!(
Expand All @@ -92,6 +93,9 @@ pub mod table_properties;
pub mod table_state;
pub mod time_utils;

// Compiled only when the `arrow` feature is enabled. A single predicate needs
// no `all(...)` wrapper, which Clippy reports as `non_minimal_cfg`.
#[cfg(feature = "arrow")]
pub mod table_state_arrow;

#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod checkpoints;
#[cfg(all(feature = "arrow", feature = "parquet"))]
Expand Down
1 change: 0 additions & 1 deletion rust/src/table_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,6 @@ impl DeltaTableState {
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use std::collections::HashMap;

#[test]
fn state_round_trip() {
Expand Down
Loading

0 comments on commit 83260a8

Please sign in to comment.