✨ Combine record into DObject #77

Merged · 5 commits · Dec 9, 2022
Changes from 1 commit
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -61,4 +61,4 @@ repos:
    hooks:
      - id: pydocstyle
        args: # google style + __init__, see http://www.pydocstyle.org/en/stable/error_codes.html
          - --ignore=D100,D101,D102,D103,D106,D107,D203,D204,D213,D215,D400,D401,D403,D404,D406,D407,D408,D409,D413
          - --ignore=D100,D101,D102,D103,D106,D107,D203,D204,D213,D215,D400,D401,D403,D404,D406,D407,D408,D409,D413,D418
306 changes: 305 additions & 1 deletion lnschema_core/_core.py
@@ -1,9 +1,14 @@
import base64
import hashlib
from datetime import datetime as datetime
from pathlib import Path
from typing import List, Optional, Union
from typing import Any, List, Optional, Set, Tuple, Union, overload # noqa

import anndata as ad
import pandas as pd
import sqlalchemy as sa
from cloudpathlib import CloudPath
from lamin_logger import logger
from pydantic.fields import PrivateAttr
from sqlmodel import Field, ForeignKeyConstraint, Relationship
from sqlmodel import SQLModel as SQLModelPublicSchema
@@ -20,6 +25,245 @@
SQLModel, prefix, schema_arg = schema_sqlmodel(schema_name)


def serialize(
    data: Union[Path, str, pd.DataFrame, ad.AnnData], name, format
) -> Tuple[Any, Path, str, str]:
    """Serialize a data object provided as a file or in memory."""
    from lamindb.dev._core import get_name_suffix_from_filepath
    from lamindb.dev.object import infer_suffix, write_to_file

    memory_rep = None
    if isinstance(data, (Path, str)):
        local_filepath = Path(data)
        name, suffix = get_name_suffix_from_filepath(local_filepath)
    elif isinstance(data, (pd.DataFrame, ad.AnnData)):
        if name is None:
            raise RuntimeError("Provide name if recording in-memory data.")
        memory_rep = data
        suffix = infer_suffix(data, format)
        local_filepath = Path(f"{name}{suffix}")
        if suffix != ".zarr":
            write_to_file(data, local_filepath)
    else:
        raise NotImplementedError("Recording not yet implemented for this type.")
    return memory_rep, local_filepath, name, suffix
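To illustrate the two branches, a usage sketch (not part of the diff; it assumes a lamindb environment where the helper imports above resolve, and the inferred DataFrame suffix is an assumption):

import pandas as pd

df = pd.DataFrame({"x": [1, 2]})

# in-memory branch: a name is required; the suffix is inferred from the
# object and format (e.g. ".parquet" -- assumed here) and the data is
# written to a local file of that name
memory_rep, local_filepath, name, suffix = serialize(df, "mydata", None)

# filepath branch: name and suffix are parsed from the path, nothing is written
memory_rep, local_filepath, name, suffix = serialize("data/mydata.csv", None, None)
assert memory_rep is None  # file inputs carry no in-memory representation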


def get_features_records(
    parsing_id: str,
    features_ref: Any,
    df_curated: pd.DataFrame,
) -> List[Any]:
    # insert species entry if not exists
    import lnschema_bionty as bionty
    from lamindb import add, select
    from lamindb.schema._table import table_meta

    species = select(bionty.Species, common_name=features_ref.species).one_or_none()
    if species is None:
        species = add(bionty.Species(common_name=features_ref.species))

    model = table_meta.get_model(f"bionty.{features_ref.entity}")

    # all existing feature records of the species in the db
    stmt = (
        select(model)
        .where(getattr(model, parsing_id).in_(df_curated.index))
        .where(getattr(model, "species_id") == species.id)
    )
    records = stmt.all()
    records_df = df_curated.index.intersection(stmt.df()[parsing_id])

    # new records to be appended
    new_ids = df_curated.index.difference(records_df)
    if len(new_ids) > 0:
        # mapped new_ids
        mapped = features_ref.df.loc[features_ref.df.index.intersection(new_ids)].copy()
        mapped.index.name = parsing_id
        if mapped.shape[0] > 0:
            for kwargs in mapped.reset_index().to_dict(orient="records"):
                kwargs["species_id"] = species.id
                record = model(**kwargs)
                records.append(record)
        # unmapped new_ids
        unmapped = set(new_ids).difference(mapped.index)
        if len(unmapped) > 0:
            for i in unmapped:
                record = model(**{parsing_id: i, "species_id": species.id})
                records.append(record)

    return records
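The index arithmetic above is the heart of the function: curated ids split into records already in the db, new ids that map against the reference (full records), and unmapped ids (id-only records). A self-contained sketch of that set logic with hypothetical gene ids, pandas only:

import pandas as pd

curated = pd.Index(["g1", "g2", "g3", "g4"])  # plays df_curated.index
in_db = pd.Index(["g1"])                      # ids the select statement returns
ref = pd.Index(["g1", "g2", "g3"])            # plays features_ref.df.index

new_ids = curated.difference(curated.intersection(in_db))  # ['g2', 'g3', 'g4']
mapped = ref.intersection(new_ids)                         # ['g2', 'g3']: full records
unmapped = set(new_ids).difference(mapped)                 # {'g4'}: id-only records
print(list(mapped), unmapped)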


def create_dobject_from_data(
    data: Union[Path, str, pd.DataFrame, ad.AnnData],
    *,
    name: Optional[str] = None,
    features_ref: Any = None,
    source: Optional["Run"] = None,
    id: Optional[str] = None,
    format: Optional[str] = None,
):
    """Record a data object.

    Guide: :doc:`/db/guide/ingest`.

    Args:
        data: Filepath or in-memory data.
        name: Name of the data object, required if an in-memory object is passed.
        features_ref: Reference against which to link features.
        source: The run (data transform) that generated the data object.
        id: The id of the dobject.
        format: Whether to use `h5ad` or `zarr` to store an `AnnData` object.
    """
    from lamindb import select, settings
    from lamindb.dev.object import size_adata

    run = get_run(source)
    memory_rep, local_filepath, name, suffix = serialize(data, name, format)
    if suffix != ".zarr":
        size = Path(local_filepath).stat().st_size
    else:
        size = size_adata(memory_rep)
    hash = get_hash(local_filepath, suffix)
    storage = select(Storage, root=str(settings.instance.storage_root)).one()
    dobject = DObject(  # type: ignore
        name=name,
        suffix=suffix,
        hash=hash,
        run_id=run.id,
        size=size,
        storage_id=storage.id,
        source=run,
    )
    if id is not None:  # cannot pass it into constructor due to default factory
        dobject.id = id
    dobject._local_filepath = local_filepath
    dobject._memory_rep = memory_rep
    if features_ref is not None:
        dobject.features.append(get_features(dobject, features_ref))
    return dobject
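A usage sketch (assumes a configured lamindb instance with a storage root and, when source is omitted, an active notebook run to fall back on):

import pandas as pd

# in-memory data: name is required; id, features_ref, and format stay optional
dobject = create_dobject_from_data(pd.DataFrame({"a": [1, 2]}), name="mydata")

# from a filepath: name and suffix are derived from the path
dobject = create_dobject_from_data("data/mydata.csv")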


def get_run(run: Optional["Run"]) -> "Run":
    if run is None:
        from lamindb._nb import _run

        run = _run
        if run is None:
            raise ValueError("Pass a Run record.")
    return run


def parse_features(df: pd.DataFrame, features_ref: Any) -> "Features":
    """Link features to a knowledge table.

    Args:
        df: A DataFrame.
        features_ref: Features reference class.
    """
    from lamindb import select
    from lamindb.knowledge import CellMarker, Gene, Protein

    parsing_id = features_ref._id_field

    # add and curate features against a knowledge table
    column = None
    if parsing_id in df.columns:
        column = parsing_id
    else:
        logger.warning(f"{parsing_id} column not found, using index as features.")
    df_curated = features_ref.curate(df=df, column=column)

    # logging of curation
    n = df_curated["__curated__"].count()
    n_mapped = df_curated["__curated__"].sum()
    log = {  # noqa TODO: store this somewhere in the db
        "feature": parsing_id,
        "n_mapped": n_mapped,
        "percent_mapped": round(n_mapped / n * 100, 1),
        "unmapped": df_curated.index[~df_curated["__curated__"]],
    }

    features_hash = hash_set(set(df_curated.index))

    features = select(
        Features,
        id=features_hash,
        type=features_ref.entity,
    ).one_or_none()
    if features is not None:
        return features  # features record already exists!

    features = Features(id=features_hash, type=features_ref.entity)
    records = get_features_records(parsing_id, features_ref, df_curated)

    if isinstance(features_ref, Gene):
        for record in records:
            features.genes.append(record)
    elif isinstance(features_ref, Protein):
        for record in records:
            features.proteins.append(record)
    elif isinstance(features_ref, CellMarker):
        for record in records:
            features.cell_markers.append(record)

    return features
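For concreteness: curate is expected to return the input indexed by feature id with a boolean __curated__ column, and the log dict is then simple column arithmetic. A toy example with hypothetical cell markers:

import pandas as pd

df_curated = pd.DataFrame(
    {"__curated__": [True, True, False]},
    index=["CD4", "CD8", "NOT_A_MARKER"],  # hypothetical ids
)
n = df_curated["__curated__"].count()       # 3 features considered
n_mapped = df_curated["__curated__"].sum()  # 2 mapped
print(round(n_mapped / n * 100, 1))         # 66.7 (percent_mapped)
print(list(df_curated.index[~df_curated["__curated__"]]))  # ['NOT_A_MARKER'] (unmapped)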


def get_features(dobject, features_ref):
    """Parse features of a dobject and return a Features record."""
    from lamindb.dev.file import load_to_memory

    memory_rep = dobject._memory_rep
    if memory_rep is None:
        memory_rep = load_to_memory(dobject._local_filepath)
    try:
        df = getattr(memory_rep, "var")  # for AnnData objects
        if callable(df):
            df = memory_rep
    except AttributeError:
        df = memory_rep
    return parse_features(df, features_ref)
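The callable check resolves a naming collision: AnnData.var is the variable-annotations DataFrame, whereas pandas DataFrame.var is the variance method, so a callable attribute means the object is a plain DataFrame and should be parsed directly. A quick check:

import anndata as ad
import pandas as pd

df = pd.DataFrame({"a": [1.0, 2.0]})
adata = ad.AnnData(df)

print(callable(getattr(df, "var")))     # True  -> parse df itself
print(callable(getattr(adata, "var")))  # False -> parse adata.var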


def get_hash(local_filepath, suffix):
    from lamindb import select

    if suffix != ".zarr":  # if not streamed
        hash = hash_file(local_filepath)
        result = select(DObject, hash=hash).one_or_none()
        if result is not None:
            logger.warning(
                "Based on the MD5 hash, the same data object is already"
                f" in the DB: {result}"
            )
    else:
        hash = None
    return hash


def hash_file(path: Path) -> str:
    # based on https://stackoverflow.com/questions/3431825/generating-an-md5-hash-of-a-file  # noqa
    hash_md5 = hashlib.md5()
    with open(path, "rb") as file:
        for chunk in iter(lambda: file.read(4096), b""):
            hash_md5.update(chunk)
    return to_b64_str(hash_md5.digest())


def to_b64_str(bstr: bytes):
    b64 = base64.urlsafe_b64encode(bstr).decode().strip("=")
    return b64


# a lot to read about this: lamin-notes/2022/hashing
def hash_set(s: Set[str]) -> str:
    bstr = ":".join(sorted(s)).encode("utf-8")
    # since we truncate to 20 b64 characters, md5 suffices over sha512
    return to_b64_str(hashlib.md5(bstr).digest())[:20]
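These helpers are pure stdlib and easy to check in isolation; sorting makes the set hash order-insensitive, and an md5 digest base64-encodes to 22 url-safe characters before the cut to 20:

import base64
import hashlib


def to_b64_str(bstr: bytes) -> str:
    return base64.urlsafe_b64encode(bstr).decode().strip("=")


def hash_set(s) -> str:
    bstr = ":".join(sorted(s)).encode("utf-8")
    return to_b64_str(hashlib.md5(bstr).digest())[:20]


assert hash_set({"a", "b"}) == hash_set({"b", "a"})  # order does not matter
print(len(hash_set({"a", "b"})))  # 20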


class User(SQLModel, table=True):  # type: ignore
    """User accounts.

@@ -182,6 +426,66 @@ def path(self) -> Union[Path, CloudPath]:
        """Path on storage."""
        return filepath_from_dobject(self)

    @overload
    def __init__(
        self,
        data: Union[Path, str, pd.DataFrame, ad.AnnData] = None,
        *,
        name: Optional[str] = None,
        features_ref: Any = None,
        source: Optional["Run"] = None,
        id: Optional[str] = None,
        format: Optional[str] = None,
    ):
        """Create a DObject record from data."""
        ...

    @overload
    def __init__(
        self,
        id: Optional[str] = None,
        name: Optional[str] = None,
        source: Optional["Run"] = None,
        suffix: Optional[str] = None,
        hash: Optional[str] = None,
        run_id: Optional[str] = None,
        storage_id: Optional[str] = None,
        features: List["Features"] = [],
        targets: List["Run"] = [],
    ):
        """Create a DObject record from fields."""
        ...

    def __init__(  # type: ignore
        self,
        data: Union[Path, str, pd.DataFrame, ad.AnnData] = None,
        *,
        features_ref: Any = None,
        source: Optional["Run"] = None,
        format: Optional[str] = None,
        id: Optional[str] = None,
        name: Optional[str] = None,
        suffix: Optional[str] = None,
        hash: Optional[str] = None,
        run_id: Optional[str] = None,
        storage_id: Optional[str] = None,
        features: List["Features"] = [],
        targets: List["Run"] = [],
    ):
        kwargs = locals()
        if data is not None:
            from lamindb import create_dobject_from_data

            record = create_dobject_from_data(
                data=data,
                name=name,
                features_ref=features_ref,
                source=source,
                id=id,
                format=format,
            )
            kwargs = record.dict()

        super().__init__(**kwargs)
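The overloads give DObject two call styles, sketched below (assumes a configured lamindb instance; the field values are made up):

import pandas as pd

# style 1: from data -- delegates to create_dobject_from_data
dobject = DObject(pd.DataFrame({"a": [1, 2]}), name="mydata")

# style 2: from fields -- plain record construction
dobject = DObject(name="mydata", suffix=".csv")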


class Run(SQLModel, table=True):  # type: ignore
    """Code runs that transform data.