Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid serialization external to delta_log module #34

Merged
merged 5 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions xdlake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def visitor(visited_file):
path=relpath,
modificationTime=utils.timestamp(),
size=self.loc.fs.size(visited_file.path),
stats=stats.json(),
stats=stats,
partitionValues=partition_values
)
)
Expand Down Expand Up @@ -391,7 +391,7 @@ def add_actions_for_foreign_dataset(self, ds: pa.dataset.FileSystemDataset) -> l
path=fragment.path,
modificationTime=utils.timestamp(),
size=info.size,
stats=stats.json(),
stats=stats,
partitionValues=partition_values,
)
)
Expand Down
31 changes: 22 additions & 9 deletions xdlake/delta_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def default(self, o):
return repr(o.decode("raw_unicode_escape", "backslashreplace"))
return super().default(o)


@dataclass_transform()
class DeltaLogActionMeta(type):
def __new__(cls, name, bases, dct):
Expand All @@ -66,7 +65,6 @@ def __new__(cls, name, bases, dct):
res = dataclass(new_cls)
return res


class DeltaLogAction(metaclass=DeltaLogActionMeta):
def asdict(self):
return asdict(self)
Expand All @@ -78,7 +76,7 @@ def replace(self, **kwargs):
return replace(self, **kwargs)

def to_action_dict(self) -> dict:
raise NotADirectoryError()
raise NotImplementedError()

@classmethod
def with_info(cls, info: dict):
Expand Down Expand Up @@ -191,7 +189,7 @@ def __eq__(self, o):
return a_fields == b_fields

class TableMetadata(DeltaLogAction):
schemaString: str
schemaString: Schema | str
createdTime: int = field(default_factory=lambda: utils.timestamp())
id: str = field(default_factory=lambda: f"{uuid4()}")
name: str | None = None
Expand All @@ -200,11 +198,22 @@ class TableMetadata(DeltaLogAction):
partitionColumns: list[str] = field(default_factory=lambda: list())
configuration: dict = field(default_factory=lambda: dict())

def __post_init__(self):
match self.schemaString:
case Schema():
self.schemaString = self.schemaString.json()
case str():
self.schemaString = self.schemaString
case _:
raise ValueError(f"Cannot handle schemaString of type {type(self.schemaString)}")

def to_action_dict(self) -> dict:
return {Actions.table_metadata.value: self.asdict()}

@property
def schema(self) -> Schema:
if not isinstance(self.schemaString, str):
raise TypeError("schemaString is not a string")
return Schema(**json.loads(self.schemaString))

class TableOperationParm:
Expand Down Expand Up @@ -352,14 +361,18 @@ class Add(DeltaLogAction):
partitionValues: dict
size: int
modificationTime: int
stats: str
stats: Statistics | str
dataChange: bool | None = None
tags: list | None = None
deletionVector: dict | None = None
baseRowId: str | None = None
defaultRowCommitVersion: int | None = None
clusteringProvider: str | None = None

def __post_init__(self):
if isinstance(self.stats, Statistics):
self.stats = self.stats.json()

def to_action_dict(self) -> dict:
return {Actions.add.name: self.asdict()}

Expand Down Expand Up @@ -439,7 +452,7 @@ def write(self, handle):
Args:
handle (IO): A file handle
"""
handle.write("\n".join([json.dumps(a.to_action_dict()) for a in self.actions]))
handle.write("\n".join([json.dumps(a.to_action_dict(), cls=_JSONEncoder) for a in self.actions]))

def add_actions(self) -> list[Add]:
"""Get all add actions in the entry."""
Expand Down Expand Up @@ -493,7 +506,7 @@ def create_table(cls, path: str, schema: Schema, partition_by: list, add_actions
DeltaLogEntry: A new DeltaLogEntry object
"""
protocol = Protocol()
table_metadata = TableMetadata(schemaString=schema.json(), partitionColumns=partition_by)
table_metadata = TableMetadata(schemaString=schema, partitionColumns=partition_by)
commit = TableCommit.create(path, utils.timestamp(), table_metadata, protocol)
return cls.with_actions([protocol, table_metadata, *add_actions, commit])

Expand All @@ -512,7 +525,7 @@ def append_table(cls, partition_by: list, add_actions: list[Add], schema: Schema
commit = TableCommit.write(utils.timestamp(), mode=WriteMode.append.value, partition_by=partition_by)
actions = add_actions + [commit]
if schema is not None:
table_metadata = TableMetadata(schemaString=schema.json(), partitionColumns=partition_by)
table_metadata = TableMetadata(schemaString=schema, partitionColumns=partition_by)
actions = [table_metadata] + actions
return cls.with_actions(actions)

Expand Down Expand Up @@ -588,7 +601,7 @@ def restore_table(
}
commit = TableCommit.restore(utils.timestamp(), read_version, restore_version, operation_metrics)
remove_actions = generate_remove_acctions(add_actions_to_remove)
table_metadata = TableMetadata(schemaString=restore_schema.json(), partitionColumns=restore_partition_by)
table_metadata = TableMetadata(schemaString=restore_schema, partitionColumns=restore_partition_by)
return cls.with_actions([table_metadata, *remove_actions, *add_actions, commit])

class DeltaLog:
Expand Down
Loading