Skip to content

Commit

Permalink
expose transactions in python
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterKeDer authored and ion-elgreco committed Oct 16, 2024
1 parent 7a3b03f commit b0bb029
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 2 deletions.
1 change: 1 addition & 0 deletions python/deltalake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .table import DeltaTable as DeltaTable
from .table import Metadata as Metadata
from .table import PostCommitHookProperties as PostCommitHookProperties
from .table import Transaction as Transaction
from .table import (
WriterProperties as WriterProperties,
)
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ class RawDeltaTable:
starting_timestamp: Optional[str] = None,
ending_timestamp: Optional[str] = None,
) -> pyarrow.RecordBatchReader: ...
# Maps each application id to that application's transaction, serialized as a
# JSON string by the native implementation.
def transaction_versions(self) -> Dict[str, str]: ...

def rust_core_version() -> str: ...
def write_new_deltalake(
Expand Down
15 changes: 15 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ def __init__(
self.cleanup_expired_logs = cleanup_expired_logs


@dataclass
class Transaction:
    """An application transaction entry that can be attached to a commit.

    Instances are handed to `CommitProperties` through its `app_transactions`
    argument.
    """

    # Identifier of the application that owns this transaction entry.
    app_id: str
    # Version recorded for `app_id`.
    version: int
    # Optional last-update marker. Presumably a timestamp in milliseconds since
    # the Unix epoch, as in the Delta protocol `txn` action -- TODO confirm.
    last_updated: Optional[int] = None


@dataclass(init=True)
class CommitProperties:
"""The commit properties. Controls the behaviour of the commit."""
Expand All @@ -158,6 +165,7 @@ def __init__(
self,
custom_metadata: Optional[Dict[str, str]] = None,
max_commit_retries: Optional[int] = None,
app_transactions: Optional[List[Transaction]] = None,
):
"""Custom metadata to be stored in the commit. Controls the number of retries for the commit.
Expand All @@ -167,6 +175,7 @@ def __init__(
"""
self.custom_metadata = custom_metadata
self.max_commit_retries = max_commit_retries
self.app_transactions = app_transactions


def _commit_properties_from_custom_metadata(
Expand Down Expand Up @@ -1417,6 +1426,12 @@ def repair(
)
return json.loads(metrics)

def transaction_versions(self) -> Dict[str, Dict[str, Any]]:
    """Return the application transactions reported by the underlying table.

    Returns:
        A dictionary keyed by application id. Each value is the JSON object
        decoded from the string the native table handle returned for that id.
    """
    raw_versions = self._table.transaction_versions()
    decoded: Dict[str, Dict[str, Any]] = {}
    for app_id, payload in raw_versions.items():
        # The native layer hands back every transaction as a JSON string.
        decoded[app_id] = json.loads(payload)
    return decoded


class TableMerger:
"""API for various table `MERGE` commands."""
Expand Down
38 changes: 37 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::kernel::{
scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType,
scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, Transaction,
};
use deltalake::operations::add_column::AddColumnBuilder;
use deltalake::operations::add_feature::AddTableFeatureBuilder;
Expand Down Expand Up @@ -1232,6 +1232,19 @@ impl RawDeltaTable {
self._table.state = table.state;
Ok(serde_json::to_string(&metrics).unwrap())
}

/// Returns the table's application transactions keyed by application id,
/// with each transaction serialized to a JSON string.
pub fn transaction_versions(&self) -> HashMap<String, String> {
    let transactions = self._table.get_app_transaction_version();
    let mut serialized = HashMap::new();
    for (app_id, transaction) in transactions.iter() {
        // Panics if the transaction cannot be serialized to JSON.
        let json = serde_json::to_string(transaction).unwrap();
        serialized.insert(app_id.to_owned(), json);
    }
    serialized
}
}

fn set_post_commithook_properties(
Expand Down Expand Up @@ -1378,6 +1391,11 @@ fn maybe_create_commit_properties(
if let Some(max_retries) = commit_props.max_commit_retries {
commit_properties = commit_properties.with_max_retries(max_retries);
};

if let Some(app_transactions) = commit_props.app_transactions {
let app_transactions = app_transactions.iter().map(Transaction::from).collect();
commit_properties = commit_properties.with_application_transactions(app_transactions);
}
}

if let Some(post_commit_hook_props) = post_commithook_properties {
Expand Down Expand Up @@ -1656,10 +1674,28 @@ pub struct PyPostCommitHookProperties {
cleanup_expired_logs: Option<bool>,
}

/// Rust-side view of a Python `Transaction` object, extracted through the
/// derived `FromPyObject` implementation.
///
/// The field names mirror the attributes of the `deltalake.table.Transaction`
/// dataclass.
#[derive(FromPyObject)]
pub struct PyTransaction {
    /// Identifier of the application that owns the transaction.
    app_id: String,
    /// Version recorded for `app_id`.
    version: i64,
    /// Optional last-update value; `None` when the Python side left it unset.
    last_updated: Option<i64>,
}

impl From<&PyTransaction> for Transaction {
fn from(value: &PyTransaction) -> Self {
Transaction {
app_id: value.app_id.clone(),
version: value.version,
last_updated: value.last_updated,
}
}
}

/// Rust-side view of the Python `CommitProperties` object, extracted through
/// the derived `FromPyObject` implementation.
#[derive(FromPyObject)]
pub struct PyCommitProperties {
    /// Optional custom key/value metadata for the commit.
    custom_metadata: Option<HashMap<String, String>>,
    /// Optional retry limit; applied with `with_max_retries` when present.
    max_commit_retries: Option<usize>,
    /// Optional application transactions; converted to kernel `Transaction`
    /// values and applied with `with_application_transactions` when present.
    app_transactions: Option<Vec<PyTransaction>>,
}

#[pyfunction]
Expand Down
31 changes: 30 additions & 1 deletion python/tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
DeltaProtocolError,
SchemaMismatchError,
)
from deltalake.table import ProtocolVersions
from deltalake.table import CommitProperties, ProtocolVersions, Transaction
from deltalake.writer import try_get_table_and_table_uri

try:
Expand Down Expand Up @@ -1993,3 +1993,32 @@ def test_write_timestamp(tmp_path: pathlib.Path):
# Now that a datetime has been passed through the writer version needs to
# be upgraded to 7 to support timestampNtz
assert protocol.min_writer_version == 2


def test_write_transactions(tmp_path: pathlib.Path, sample_data: pa.Table):
    """Transactions passed via commit properties can be read back from the table."""
    app_transactions = [
        Transaction(app_id="app_1", version=1),
        Transaction(app_id="app_2", version=2, last_updated=123456),
    ]
    write_deltalake(
        table_or_uri=tmp_path,
        data=sample_data,
        mode="overwrite",
        schema_mode="overwrite",
        commit_properties=CommitProperties(app_transactions=app_transactions),
    )

    recorded = DeltaTable(tmp_path).transaction_versions()

    # `lastUpdated` is only expected for the transaction that set it.
    expected = {
        "app_1": {
            "appId": "app_1",
            "version": 1,
        },
        "app_2": {
            "appId": "app_2",
            "version": 2,
            "lastUpdated": 123456,
        },
    }
    assert len(recorded) == len(expected)
    for app_id, payload in expected.items():
        assert recorded[app_id] == payload

0 comments on commit b0bb029

Please sign in to comment.