From c5e0053e07329ca224d793822a8ef5f51cefdc1f Mon Sep 17 00:00:00 2001 From: Brian Hannafious <32105697+xbrianh@users.noreply.github.com> Date: Mon, 9 Sep 2024 22:21:58 -0600 Subject: [PATCH 1/2] Create commit context for DeltaTable --- tests/test_delta_log.py | 23 ++++++++++++++++++++++- tests/test_xdlake.py | 32 +++++++++++++++++++++++++++++++- xdlake/__init__.py | 30 +++++++++++++++++++++++++++--- xdlake/delta_log.py | 13 ++++++++----- 4 files changed, 88 insertions(+), 10 deletions(-) diff --git a/tests/test_delta_log.py b/tests/test_delta_log.py index 32af47c..fef62c3 100644 --- a/tests/test_delta_log.py +++ b/tests/test_delta_log.py @@ -2,8 +2,9 @@ import unittest from uuid import uuid4 from unittest import mock +from tempfile import TemporaryDirectory +from contextlib import contextmanager -import xdlake from xdlake import delta_log, utils @@ -58,6 +59,26 @@ def test_partition_columns(self): dlog.entries[len(dlog.entries)] = tc self.assertEqual(expected_partition_columns, dlog.partition_columns()) + def test_commit(self): + obj = mock.MagicMock() + + @contextmanager + def commit_ctx(loc): + try: + obj(loc.path) + yield + finally: + pass + + with TemporaryDirectory() as scratch: + dlog = delta_log.DeltaLog.with_location(f"{scratch}/_delta_log") + write_loc = dlog.loc.append_path("00000000000000000000.json") + with write_loc.open(mode="w") as fh: + fh.write("") + obj.assert_not_called() + dlog.commit(mock.MagicMock(), commit_ctx) + obj.assert_called_once_with(write_loc.path) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_xdlake.py b/tests/test_xdlake.py index c4e2c46..4a7e6c1 100644 --- a/tests/test_xdlake.py +++ b/tests/test_xdlake.py @@ -1,7 +1,8 @@ import os import unittest -from contextlib import nullcontext +from contextlib import contextmanager, nullcontext from uuid import uuid4 +from unittest import mock import pyarrow as pa import pyarrow.dataset @@ -228,6 +229,35 @@ def test_from_pandas(self): self._test_delete(xdl) self._test_clone(xdl) + def test_commit(self): + loc = f"{self.scratch_folder}" + expected_path = f"{loc}/_delta_log/00000000000000000000.json" + + xdl = xdlake.DeltaTable(loc) + with open(expected_path, "w") as fh: + fh.write("") + with self.assertRaises(FileExistsError): + xdl = xdl.commit(mock.MagicMock()) + + def test_commit_context(self): + loc = f"{self.scratch_folder}" + expected_path = f"{loc}/_delta_log/00000000000000000000.json" + mock_obj = mock.MagicMock() + + class TestDeltaTable(xdlake.DeltaTable): + @contextmanager + def commit_context(s, loc): + try: + mock_obj(loc.path) + yield + finally: + pass + + xdl = TestDeltaTable(loc) + mock_obj.assert_not_called() + xdl.commit(mock.MagicMock()) + mock_obj.assert_called_once_with(expected_path) + if __name__ == '__main__': unittest.main() diff --git a/xdlake/__init__.py b/xdlake/__init__.py index 72ad4ff..9e3c9b8 100644 --- a/xdlake/__init__.py +++ b/xdlake/__init__.py @@ -1,6 +1,7 @@ import functools import operator from uuid import uuid4 +from contextlib import contextmanager from collections import defaultdict from typing import Iterable @@ -172,7 +173,7 @@ def write( schema = self.dlog.evaluate_schema(ds.schema, mode, schema_mode) new_add_actions = self.write_data(ds, partition_by, write_arrow_dataset_options) entry = self.dlog.entry_for_write_mode(mode, schema, new_add_actions, partition_by) - return type(self)(self.loc, self.dlog.commit(entry)) + return self.commit(entry) def import_refs( self, @@ -209,7 +210,7 @@ def import_refs( for child_ds in ds.children: new_add_actions.extend(self.add_actions_for_foreign_dataset(child_ds)) entry = self.dlog.entry_for_write_mode(mode, schema, new_add_actions, partition_by) - return type(self)(self.loc, self.dlog.commit(entry)) + return self.commit(entry) def clone(self, dst_loc: str | storage.Location, dst_log_loc: str | None = None) -> "DeltaTable": """Clone the DeltaTable @@ -288,7 +289,7 @@ def delete(self, where: pc.Expression, write_arrow_dataset_options: dict | None num_copied_rows=num_copied_rows, num_deleted_rows=num_deleted_rows, ) - return type(self)(self.loc, self.dlog.commit(new_entry)) + return self.commit(new_entry) def write_data( self, @@ -379,3 +380,26 @@ def add_actions_for_foreign_dataset(self, ds: pa.dataset.FileSystemDataset) -> l ) return add_actions + + @contextmanager + def commit_context(self, loc: storage.Location): + """Context for transaction log writes. + + Locking can be implimented by overriding this method. The default behavior is to raise if loc exists. + + Args: + loc (storage.Location): The location where the new transaction log etnry will be written. + + Returns: + A context manager. + """ + try: + if loc.exists(): + raise FileExistsError("This transaction log version already exists!") + yield + finally: + pass + + def commit(self, entry: delta_log.DeltaLogEntry) -> "DeltaTable": + new_dlog = self.dlog.commit(entry, self.commit_context) + return type(self)(self.loc, new_dlog) diff --git a/xdlake/delta_log.py b/xdlake/delta_log.py index 1dd58ae..b49d443 100644 --- a/xdlake/delta_log.py +++ b/xdlake/delta_log.py @@ -3,6 +3,7 @@ import datetime from enum import Enum from uuid import uuid4 +from contextlib import nullcontext from collections import defaultdict from dataclasses import dataclass, asdict, field, fields, replace from collections.abc import ValuesView @@ -688,12 +689,14 @@ def entry_for_write_mode( entry = DeltaLogEntry.commit_overwrite_table(partition_by, existing_add_actions, add_actions) return entry - def commit(self, entry: DeltaLogEntry) -> "DeltaLog": + def commit(self, entry: DeltaLogEntry, context = nullcontext) -> "DeltaLog": if 0 == self.version_to_write: - self.loc.mkdir() - with self.loc.append_path(utils.filename_for_version(self.version_to_write)).open(mode="w") as fh: - entry.write(fh) - return type(self).with_location(self.loc) + self.loc.mkdir(exists_ok=True) + loc = self.loc.append_path(utils.filename_for_version(self.version_to_write)) + with context(loc): + with loc.open(mode="w") as fh: + entry.write(fh) + return type(self).with_location(self.loc) def generate_remove_acctions(add_actions: Iterable[Add]) -> list[Remove]: From c36d586074c45e604a56539a97de6faac742927c Mon Sep 17 00:00:00 2001 From: Brian Hannafious <32105697+xbrianh@users.noreply.github.com> Date: Wed, 11 Sep 2024 07:00:02 -0600 Subject: [PATCH 2/2] wip --- tests/test_xdlake.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_xdlake.py b/tests/test_xdlake.py index 4a7e6c1..bbb6ff0 100644 --- a/tests/test_xdlake.py +++ b/tests/test_xdlake.py @@ -234,6 +234,7 @@ def test_commit(self): expected_path = f"{loc}/_delta_log/00000000000000000000.json" xdl = xdlake.DeltaTable(loc) + os.mkdir(os.path.dirname(expected_path)) with open(expected_path, "w") as fh: fh.write("") with self.assertRaises(FileExistsError):