Skip to content

Commit

Permalink
Add datetime checksum functionality to icechunk writer
Browse files Browse the repository at this point in the history
  • Loading branch information
mpiannucci committed Jan 3, 2025
1 parent 6f32ef2 commit 8aebec0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
13 changes: 12 additions & 1 deletion virtualizarr/accessor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Literal, Optional, overload

Expand Down Expand Up @@ -39,7 +40,10 @@ def to_zarr(self, storepath: str) -> None:
dataset_to_zarr(self.ds, storepath)

def to_icechunk(
self, store: "IcechunkStore", append_dim: Optional[str] = None
self,
store: "IcechunkStore",
append_dim: Optional[str] = None,
last_updated_at: Optional[datetime] = None,
) -> None:
"""
Write an xarray dataset to an Icechunk store.
Expand All @@ -48,10 +52,17 @@ def to_icechunk(
If `append_dim` is provided, the virtual dataset will be appended to the existing IcechunkStore along the `append_dim` dimension.
If `last_updated_at` is provided, it will be used as a checksum for any virtual chunks written to the store with this operation. At read time, if any of the virtual chunks have been updated since this provided datetime, an error will be raised.
This protects against reading outdated virtual chunks that have been updated since the last read.
Parameters
----------
store: IcechunkStore
append_dim: str, optional
When provided, specifies the dimension along which to append the virtual dataset.
last_updated_at: datetime, optional
When provided, uses provided datetime as a checksum for any virtual chunks written to the store with this operation.
"""
from virtualizarr.writers.icechunk import dataset_to_icechunk

Expand Down
19 changes: 18 additions & 1 deletion virtualizarr/writers/icechunk.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional, Union, cast

import numpy as np
Expand All @@ -23,7 +24,10 @@


def dataset_to_icechunk(
ds: Dataset, store: "IcechunkStore", append_dim: Optional[str] = None
ds: Dataset,
store: "IcechunkStore",
append_dim: Optional[str] = None,
last_updated_at: Optional[datetime] = None,
) -> None:
"""
Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store.
Expand All @@ -34,6 +38,12 @@ def dataset_to_icechunk(
----------
ds: xr.Dataset
store: IcechunkStore
append_dim: Optional[str]
Name of the dimension along which to append data. If provided, the dataset must have a dimension with this name.
last_updated_at: Optional[datetime]
The time at which the virtual dataset was last updated. When specified, if any of the virtual chunks written in this
session are modified in storage after this time, icechunk will raise an error at runtime when trying to read the
virtual chunk
"""
try:
from icechunk import IcechunkStore # type: ignore[import-not-found]
Expand Down Expand Up @@ -71,6 +81,7 @@ def dataset_to_icechunk(
store=store,
group=root_group,
append_dim=append_dim,
last_updated_at=last_updated_at,
)


Expand All @@ -80,6 +91,7 @@ def write_variables_to_icechunk_group(
store,
group,
append_dim: Optional[str] = None,
last_updated_at: Optional[datetime] = None,
):
virtual_variables = {
name: var
Expand Down Expand Up @@ -108,6 +120,7 @@ def write_variables_to_icechunk_group(
name=name,
var=var,
append_dim=append_dim,
last_updated_at=last_updated_at,
)


Expand Down Expand Up @@ -155,6 +168,7 @@ def write_virtual_variable_to_icechunk(
name: str,
var: Variable,
append_dim: Optional[str] = None,
last_updated_at: Optional[datetime] = None,
) -> None:
"""Write a single virtual variable into an icechunk store"""
from zarr import Array
Expand Down Expand Up @@ -217,6 +231,7 @@ def write_virtual_variable_to_icechunk(
manifest=ma.manifest,
append_axis=append_axis,
existing_num_chunks=existing_num_chunks,
last_updated_at=last_updated_at,
)


Expand Down Expand Up @@ -244,6 +259,7 @@ def write_manifest_virtual_refs(
manifest: ChunkManifest,
append_axis: Optional[int] = None,
existing_num_chunks: Optional[int] = None,
last_updated_at: Optional[datetime] = None,
) -> None:
"""Write all the virtual references for one array manifest at once."""

Expand Down Expand Up @@ -275,4 +291,5 @@ def write_manifest_virtual_refs(
location=path.item(),
offset=offset.item(),
length=length.item(),
checksum=last_updated_at,
)

0 comments on commit 8aebec0

Please sign in to comment.