Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The upsert method now also calls the store method when necessary #108

Merged
merged 1 commit into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name='TensorDB',
version='0.25',
version='0.26',
description='Database based in a file system storage combined with Xarray and Zarr',
author='Joseph Nowak',
author_email='josephgonowak97@gmail.com',
Expand Down
10 changes: 9 additions & 1 deletion tensordb/storages/zarr_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def update(
new_data: Union[xr.DataArray, xr.Dataset],
compute: bool = True,
complete_update_dims: Union[List[str], str] = None,
) -> xr.backends.ZarrStore:
) -> Union[xr.backends.ZarrStore, None]:
"""
Replace data on an existing Zarr files based on the new_data, internally calls the method
`to_zarr <https://xr.pydata.org/en/stable/generated/xr.Dataset.to_zarr.html>`_ using the
Expand Down Expand Up @@ -339,6 +339,8 @@ def update(
k: new_data.coords[k].isin(v)
for k, v in act_coords.items()
})
if any(size == 0 for size in new_data.sizes.values()):
return None

if complete_update_dims is not None:
if isinstance(complete_update_dims, str):
Expand Down Expand Up @@ -383,12 +385,18 @@ def upsert(
A list of xr.backends.ZarrStore produced by the append and update methods

"""
if not self.exist():
return [
self.store(new_data, compute=compute)
]

delayed_writes = [
self.update(new_data, compute=compute, complete_update_dims=complete_update_dims)
]
delayed_writes.extend(
self.append(new_data, compute=compute)
)
delayed_writes = [write for write in delayed_writes if write is not None]
return delayed_writes

def drop(
Expand Down
43 changes: 22 additions & 21 deletions tensordb/tests/test_algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,34 +139,35 @@ def test_bitmask_topk_tie_breaker(top_size, dim):
coords={'c': list(range(2)), 'a': list(range(6)), 'b': list(range(3))}
).chunk((1, 3, 1))

df = pd.DataFrame({
"data1": arr.isel(c=0).to_series(),
"data2": arr.isel(c=1).to_series()
}).reset_index()

group_dim = "b" if dim == "a" else "a"

ranked = df.groupby(group_dim, group_keys=False).apply(
lambda x: x[["data1", "data2"]].fillna(-np.inf).apply(
tuple, axis=1
).rank(
ascending=False, method="min"
).astype(int)
)
df["rank"] = ranked
df.loc[df["data1"].isna(), "rank"] = np.nan

expected = df.pivot_table(
index="a", columns="b", values="rank", dropna=False
) <= top_size

result = Algorithms.bitmask_topk(
arr,
dim=dim,
tie_breaker_dim="c",
top_size=top_size
).isel(c=0, drop=True)

total_dfs = [
arr.sel(c=index, drop=True).to_pandas()
for index in arr.c.values
]
df = (
pd.concat(total_dfs)
.fillna(-np.inf)
.stack(dropna=False)
.groupby(level=[0, 1])
.apply(tuple)
.unstack()
)

df = df.where(~total_dfs[0].isna())

expected = df.rank(
# Subtract 1 because the C dim is at the first position
arr.dims.index(dim) - 1,
method="min",
ascending=False
) <= top_size

assert result.equals(
xr.DataArray(
expected.values,
Expand Down
17 changes: 17 additions & 0 deletions tensordb/tests/test_tensor_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,23 @@ def test_update(self, use_local):
tensor_client.update(new_data=self.arr2, path='data_one')
assert tensor_client.read(path='data_one').equals(self.arr2)

assert tensor_client.update(
new_data=self.arr2.reindex(index=[100]),
path='data_one'
) is None

@pytest.mark.parametrize('use_local', [True, False])
def test_upsert(self, use_local):
    """Exercise both code paths of upsert: the initial store and a later update/append."""
    client = self.local_tensor_client if use_local else self.tensor_client
    client.create_tensor({"path": "data_upsert"})

    # The tensor does not exist yet, so this first upsert must fall back
    # to the store method to create it from scratch.
    client.upsert(new_data=self.arr2, path="data_upsert")
    assert client.read(path='data_upsert').equals(self.arr2)

    # The tensor now exists, so this upsert goes through the
    # update + append code path instead of store.
    client.upsert(new_data=self.arr3, path="data_upsert")
    assert client.read(path='data_upsert').equals(self.arr3)

@pytest.mark.parametrize('use_local', [True, False])
def test_drop(self, use_local):
tensor_client = self.local_tensor_client if use_local else self.tensor_client
Expand Down