Skip to content

Commit

Permalink
✨ Add table pipeline (#22)
Browse files Browse the repository at this point in the history
* ✨ Added table pipeline

* ✨ Add migration
  • Loading branch information
falexwolf authored Aug 26, 2022
1 parent 8d38231 commit d35999f
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 34 deletions.
65 changes: 40 additions & 25 deletions lnschema_core/_core.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from datetime import datetime as datetime
from typing import Optional, Union
from typing import Optional

from sqlmodel import Field, ForeignKeyConstraint, SQLModel, UniqueConstraint

from .id import id_dobject, id_dtransform, id_storage, id_usage
from . import id as idg
from .type import usage as usage_type


Expand Down Expand Up @@ -49,10 +49,10 @@ class storage(SQLModel, table=True): # type: ignore
along with metadata.
"""

id: Optional[str] = Field(default_factory=id_storage, primary_key=True)
id: Optional[str] = Field(default_factory=idg.storage, primary_key=True)
root: str = Field(index=True)
region: Optional[str]
type: Optional[str]
region: Optional[str] = None
type: Optional[str] = None
time_created: datetime = Field(default_factory=utcnow, nullable=False)
time_updated: datetime = Field(default_factory=utcnow, nullable=False)

Expand Down Expand Up @@ -94,8 +94,8 @@ class dobject(SQLModel, table=True): # type: ignore
- QC: `.html` ⟷ /
"""

id: Optional[str] = Field(default_factory=id_dobject, primary_key=True)
v: str = Field(default=None, primary_key=True)
id: Optional[str] = Field(default_factory=idg.dobject, primary_key=True)
v: Optional[str] = Field(default="1", primary_key=True)
name: Optional[str] = Field(index=True)
file_suffix: str = Field(index=True)
dtransform_id: str = Field(foreign_key="dtransform.id", index=True)
Expand Down Expand Up @@ -135,10 +135,10 @@ class dtransform(SQLModel, table=True): # type: ignore
name="dtransform_jupynb",
),
)
id: str = Field(default_factory=id_dtransform, primary_key=True)
jupynb_id: Union[str, None] = Field(default=None, index=True)
jupynb_v: Union[str, None] = Field(default=None, index=True)
pipeline_run_id: Union[str, None] = Field(
id: str = Field(default_factory=idg.dtransform, primary_key=True)
jupynb_id: Optional[str] = Field(default=None, index=True)
jupynb_v: Optional[str] = Field(default=None, index=True)
pipeline_run_id: Optional[str] = Field(
default=None, foreign_key="pipeline_run.id", index=True
)

Expand Down Expand Up @@ -176,35 +176,50 @@ class jupynb(SQLModel, table=True): # type: ignore
IDs for Jupyter notebooks are generated through nbproject.
"""

id: str = Field(default=None, primary_key=True)
v: str = Field(default=None, primary_key=True)
id: Optional[str] = Field(default=None, primary_key=True)
v: Optional[str] = Field(default="1", primary_key=True)
name: Optional[str] = Field(index=True)
user_id: str = Field(foreign_key="user.id", index=True)
time_created: datetime = Field(default_factory=utcnow, nullable=False, index=True)
time_updated: datetime = Field(default_factory=utcnow, nullable=False, index=True)


class pipeline(SQLModel, table=True): # type: ignore
"""Pipelines.
A pipeline is typically versioned software that can perform a data
transformation/processing workflow. This can be anything from typical
workflow tools (Nextflow, Snakemake, Prefect, Apache Airflow, etc.) to
simple (versioned) scripts.
"""

id: Optional[str] = Field(default_factory=idg.pipeline, primary_key=True)
v: Optional[str] = Field(default="1", primary_key=True)
name: Optional[str] = Field(default=None, index=True)
reference: Optional[str] = Field(default=None, index=True)


class pipeline_run(SQLModel, table=True): # type: ignore
"""Pipeline runs.
Pipeline runs are one kind of data transformation (`dtransform`) and
correspond one-to-one to entries in `dtransform`.
A pipeline is typically versioned software that can perform a data
transformation/processing workflow. This can be anything from typical
workflow tools (Nextflow, Snakemake, Prefect, Apache Airflow, etc.) to
simple (versioned) scripts.
For instance, `lnbfx` stores references to bioinformatics workflow runs by
linking to entries in this table.
"""

id: str = Field(default=None, primary_key=True)


# ----------
# Access log
# ----------
__table_args__ = (
ForeignKeyConstraint(
["pipeline_id", "pipeline_v"],
["pipeline.id", "pipeline.v"],
name="pipeline",
),
)
id: Optional[str] = Field(default_factory=idg.pipeline_run, primary_key=True)
pipeline_id: str = Field(index=True)
pipeline_v: str = Field(index=True)
name: Optional[str] = Field(default=None, index=True)


class usage(SQLModel, table=True): # type: ignore
Expand All @@ -221,7 +236,7 @@ class usage(SQLModel, table=True): # type: ignore
),
)

id: Optional[str] = Field(default_factory=id_usage, primary_key=True)
id: Optional[str] = Field(default_factory=idg.usage, primary_key=True)
type: usage_type = Field(nullable=False, index=True)
user_id: str = Field(foreign_key="user.id", nullable=False, index=True)
time: datetime = Field(default_factory=utcnow, nullable=False, index=True)
Expand Down
34 changes: 25 additions & 9 deletions lnschema_core/_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ def base26(n_char: int):


def schema():
"""Schema module: 4-char base26."""
"""Schema module: 4 base26."""
return base26(4)


def dobject() -> str:
"""Data object: 21-char base62.
"""Data object: 21 base62.
21 characters (62**21=4e+37 possibilities) outperform UUID (2**122=5e+36).
"""
return base62(n_char=21)


def dtransform() -> str:
"""Data transformation: 21-char base62."""
return base62(n_char=22)
"""Data transformation: 21 base62."""
return base62(n_char=21)


def user() -> str:
Expand All @@ -67,34 +67,50 @@ def user() -> str:


def usage() -> str:
"""Usage event: 24-char base62."""
"""Usage event: 24 base62."""
return base62(n_char=24)


def secret() -> str:
"""Password or secret: 40-char base62."""
"""Password or secret: 40 base62."""
return base62(n_char=40)


def instance() -> str:
"""LaminDB instance: 10-char base62.
"""LaminDB instance: 10 base62.
Collision probability is 6e-03 for 100M instances: 1M users with 100 instances/user.
"""
return base62(n_char=10)


def storage() -> str:
"""Storage root: 10-char base62.
"""Storage root: 10 base62.
Collision probability is 6e-03 for 100M storage roots: 1M users with 100
storage roots/user.
"""
return base62(n_char=10)


def pipeline() -> str:
"""Pipeline: 9 base62.
Collision probability is ~4e-03 for 10M pipelines: 1M users with 10 pipelines/user.
"""
return base62(n_char=9)


def pipeline_run() -> str:
"""Pipeline run: 20 base62.
One char fewer than dtransform and dobject ids!
"""
return base62(n_char=20)


def jupynb():
"""Jupyter notebook: 12-char base62.
"""Jupyter notebook: 12 base62.
Collision probability is 2e-04 for 1B notebooks: 1M users with 1k notebooks/user.
Expand Down
4 changes: 4 additions & 0 deletions lnschema_core/id.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
dtransform
usage
jupynb
pipeline
pipeline_run
secret
"""
from ._id import ( # noqa
Expand All @@ -34,6 +36,8 @@
dtransform,
instance,
jupynb,
pipeline,
pipeline_run,
schema,
secret,
storage,
Expand Down
95 changes: 95 additions & 0 deletions migrations/versions/2022-08-26-3badf20f18c8-v0_5_0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""v0.5.0.
Revision ID: 3badf20f18c8
Revises: 01fcb82dafd4
Create Date: 2022-08-26 14:17:51.167740
"""
import sqlalchemy as sa
import sqlmodel
from alembic import op

# revision identifiers, used by Alembic.
revision = "3badf20f18c8"
down_revision = "01fcb82dafd4"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.create_table(
"pipeline",
sa.Column("id", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("v", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("reference", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.PrimaryKeyConstraint("id", "v"),
)
op.create_index(op.f("ix_pipeline_name"), "pipeline", ["name"], unique=False)
op.create_index(
op.f("ix_pipeline_reference"), "pipeline", ["reference"], unique=False
)
op.add_column(
"dobject", sa.Column("storage_id", sqlmodel.sql.sqltypes.AutoString())
)
op.alter_column("dobject", "dtransform_id", existing_type=sa.VARCHAR())
op.drop_index("ix_dobject_tmp_dtransform_id", table_name="dobject")
op.drop_index("ix_dobject_tmp_file_suffix", table_name="dobject")
op.drop_index("ix_dobject_tmp_name", table_name="dobject")
op.drop_index("ix_dobject_tmp_storage_root", table_name="dobject")
op.drop_index("ix_dobject_tmp_time_created", table_name="dobject")
op.drop_index("ix_dobject_tmp_time_updated", table_name="dobject")
op.create_index(
op.f("ix_dobject_dtransform_id"), "dobject", ["dtransform_id"], unique=False
)
op.create_index(
op.f("ix_dobject_file_suffix"), "dobject", ["file_suffix"], unique=False
)
op.create_index(op.f("ix_dobject_name"), "dobject", ["name"], unique=False)
op.create_index(
op.f("ix_dobject_storage_id"), "dobject", ["storage_id"], unique=False
)
op.create_index(
op.f("ix_dobject_time_created"), "dobject", ["time_created"], unique=False
)
op.create_index(
op.f("ix_dobject_time_updated"), "dobject", ["time_updated"], unique=False
)
op.add_column(
"pipeline_run",
sa.Column("pipeline_id", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
)
op.add_column(
"pipeline_run",
sa.Column("pipeline_v", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
)
op.add_column(
"pipeline_run",
sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
)
op.create_index(
op.f("ix_pipeline_run_name"), "pipeline_run", ["name"], unique=False
)
op.create_index(
op.f("ix_pipeline_run_pipeline_id"),
"pipeline_run",
["pipeline_id"],
unique=False,
)
op.create_index(
op.f("ix_pipeline_run_pipeline_v"), "pipeline_run", ["pipeline_v"], unique=False
)
with op.batch_alter_table("pipeline") as batch_op:
batch_op.create_foreign_key(
"pipeline_run", "pipeline", ["pipeline_id", "pipeline_v"], ["id", "v"]
)
op.add_column(
"storage", sa.Column("id", sqlmodel.sql.sqltypes.AutoString(), nullable=True)
)
op.create_index(op.f("ix_storage_root"), "storage", ["root"], unique=False)
op.alter_column("user", "handle", existing_type=sa.VARCHAR())
with op.batch_alter_table("user") as batch_op:
batch_op.create_unique_constraint("user", ["handle"])


def downgrade() -> None:
pass
Binary file modified testdb/mydata.lndb
Binary file not shown.

0 comments on commit d35999f

Please sign in to comment.