Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add table pipeline #22

Merged
merged 2 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 40 additions & 25 deletions lnschema_core/_core.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from datetime import datetime as datetime
from typing import Optional, Union
from typing import Optional

from sqlmodel import Field, ForeignKeyConstraint, SQLModel, UniqueConstraint

from .id import id_dobject, id_dtransform, id_storage, id_usage
from . import id as idg
from .type import usage as usage_type


Expand Down Expand Up @@ -49,10 +49,10 @@ class storage(SQLModel, table=True): # type: ignore
along with metadata.
"""

id: Optional[str] = Field(default_factory=id_storage, primary_key=True)
id: Optional[str] = Field(default_factory=idg.storage, primary_key=True)
root: str = Field(index=True)
region: Optional[str]
type: Optional[str]
region: Optional[str] = None
type: Optional[str] = None
time_created: datetime = Field(default_factory=utcnow, nullable=False)
time_updated: datetime = Field(default_factory=utcnow, nullable=False)

Expand Down Expand Up @@ -94,8 +94,8 @@ class dobject(SQLModel, table=True): # type: ignore
- QC: `.html` ⟷ /
"""

id: Optional[str] = Field(default_factory=id_dobject, primary_key=True)
v: str = Field(default=None, primary_key=True)
id: Optional[str] = Field(default_factory=idg.dobject, primary_key=True)
v: Optional[str] = Field(default="1", primary_key=True)
name: Optional[str] = Field(index=True)
file_suffix: str = Field(index=True)
dtransform_id: str = Field(foreign_key="dtransform.id", index=True)
Expand Down Expand Up @@ -135,10 +135,10 @@ class dtransform(SQLModel, table=True): # type: ignore
name="dtransform_jupynb",
),
)
id: str = Field(default_factory=id_dtransform, primary_key=True)
jupynb_id: Union[str, None] = Field(default=None, index=True)
jupynb_v: Union[str, None] = Field(default=None, index=True)
pipeline_run_id: Union[str, None] = Field(
id: str = Field(default_factory=idg.dtransform, primary_key=True)
jupynb_id: Optional[str] = Field(default=None, index=True)
jupynb_v: Optional[str] = Field(default=None, index=True)
pipeline_run_id: Optional[str] = Field(
default=None, foreign_key="pipeline_run.id", index=True
)

Expand Down Expand Up @@ -176,35 +176,50 @@ class jupynb(SQLModel, table=True): # type: ignore
IDs for Jupyter notebooks are generated through nbproject.
"""

id: str = Field(default=None, primary_key=True)
v: str = Field(default=None, primary_key=True)
id: Optional[str] = Field(default=None, primary_key=True)
v: Optional[str] = Field(default="1", primary_key=True)
name: Optional[str] = Field(index=True)
user_id: str = Field(foreign_key="user.id", index=True)
time_created: datetime = Field(default_factory=utcnow, nullable=False, index=True)
time_updated: datetime = Field(default_factory=utcnow, nullable=False, index=True)


class pipeline(SQLModel, table=True):  # type: ignore
    """Pipelines.

    A pipeline is typically versioned software that can perform a data
    transformation/processing workflow. This can be anything from typical
    workflow tools (Nextflow, Snakemake, Prefect, Apache Airflow, etc.) to
    simple (versioned) scripts.
    """

    # Composite primary key (id, v): the same pipeline may exist in several
    # versions, each a separate row.
    id: Optional[str] = Field(default_factory=idg.pipeline, primary_key=True)
    # Version string; defaults to "1" for the first version of a pipeline.
    v: Optional[str] = Field(default="1", primary_key=True)
    name: Optional[str] = Field(default=None, index=True)
    # Free-form reference — presumably an external URL/identifier; TODO confirm.
    reference: Optional[str] = Field(default=None, index=True)


class pipeline_run(SQLModel, table=True):  # type: ignore
    """Pipeline runs.

    Pipeline runs represent one type of data transformation (`dtransform`) and
    have a unique correspondence in `dtransform`.

    For instance, `lnbfx` stores references to bioinformatics workflow runs by
    linking to entries in this table.
    """

    # Composite foreign key onto pipeline's (id, v) primary key, so every run
    # points at one specific pipeline version.
    __table_args__ = (
        ForeignKeyConstraint(
            ["pipeline_id", "pipeline_v"],
            ["pipeline.id", "pipeline.v"],
            name="pipeline",
        ),
    )
    id: Optional[str] = Field(default_factory=idg.pipeline_run, primary_key=True)
    pipeline_id: str = Field(index=True)
    pipeline_v: str = Field(index=True)
    name: Optional[str] = Field(default=None, index=True)


class usage(SQLModel, table=True): # type: ignore
Expand All @@ -221,7 +236,7 @@ class usage(SQLModel, table=True): # type: ignore
),
)

id: Optional[str] = Field(default_factory=id_usage, primary_key=True)
id: Optional[str] = Field(default_factory=idg.usage, primary_key=True)
type: usage_type = Field(nullable=False, index=True)
user_id: str = Field(foreign_key="user.id", nullable=False, index=True)
time: datetime = Field(default_factory=utcnow, nullable=False, index=True)
Expand Down
34 changes: 25 additions & 9 deletions lnschema_core/_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ def base26(n_char: int):


def schema() -> str:
    """Generate a schema-module ID: 4-character base26 string."""
    return base26(4)


def dobject() -> str:
    """Generate a data-object ID: 21-character base62 string.

    21 characters (62**21 = 4e+37 possibilities) outperform UUID (2**122 = 5e+36).
    """
    return base62(n_char=21)


def dtransform() -> str:
    """Generate a data-transformation ID: 21-character base62 string.

    Same length as dobject IDs (62**21 = 4e+37 possibilities).
    """
    # Previously returned 22 characters while the docstring claimed 21;
    # resolved to 21 to match dobject and the documented length.
    return base62(n_char=21)


def user() -> str:
Expand All @@ -67,34 +67,50 @@ def user() -> str:


def usage() -> str:
    """Generate a usage-event ID: 24-character base62 string."""
    return base62(n_char=24)


def secret() -> str:
    """Generate a password or secret: 40-character base62 string."""
    return base62(n_char=40)


def instance() -> str:
    """Generate a LaminDB instance ID: 10-character base62 string.

    Collision probability is 6e-03 for 100M instances: 1M users with 100 instances/user.
    """
    return base62(n_char=10)


def storage() -> str:
    """Generate a storage-root ID: 10-character base62 string.

    Collision probability is 6e-03 for 100M storage roots: 1M users with 100
    storage roots/user.
    """
    return base62(n_char=10)


def pipeline() -> str:
    """Generate a pipeline ID: 9-character base62 string.

    Collision probability is roughly 4e-03 for 10M pipelines (1M users with 10
    pipelines/user), by the birthday bound n**2 / (2 * 62**9).
    """
    return base62(n_char=9)


def pipeline_run() -> str:
    """Generate a pipeline-run ID: 20-character base62 string.

    One character fewer than the 21-character dtransform and dobject IDs.
    """
    return base62(n_char=20)


def jupynb():
"""Jupyter notebook: 12-char base62.
"""Jupyter notebook: 12 base62.

Collision probability is 2e-04 for 1B notebooks: 1M users with 1k notebooks/user.

Expand Down
4 changes: 4 additions & 0 deletions lnschema_core/id.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
dtransform
usage
jupynb
pipeline
pipeline_run
secret
"""
from ._id import ( # noqa
Expand All @@ -34,6 +36,8 @@
dtransform,
instance,
jupynb,
pipeline,
pipeline_run,
schema,
secret,
storage,
Expand Down
95 changes: 95 additions & 0 deletions migrations/versions/2022-08-26-3badf20f18c8-v0_5_0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""v0.5.0.

Revision ID: 3badf20f18c8
Revises: 01fcb82dafd4
Create Date: 2022-08-26 14:17:51.167740
"""
import sqlalchemy as sa
import sqlmodel
from alembic import op

# revision identifiers, used by Alembic.
revision = "3badf20f18c8"
down_revision = "01fcb82dafd4"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Apply the v0.5.0 schema: add pipeline tables, rework dobject indexes."""
    # New `pipeline` table with a composite (id, v) primary key so several
    # versions of the same pipeline can coexist.
    op.create_table(
        "pipeline",
        sa.Column("id", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
        sa.Column("v", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
        sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
        sa.Column("reference", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
        sa.PrimaryKeyConstraint("id", "v"),
    )
    op.create_index(op.f("ix_pipeline_name"), "pipeline", ["name"], unique=False)
    op.create_index(
        op.f("ix_pipeline_reference"), "pipeline", ["reference"], unique=False
    )
    # `dobject` gains a `storage_id` column; the old `ix_dobject_tmp_*`
    # indexes are dropped and recreated under conventional `op.f` names.
    op.add_column(
        "dobject", sa.Column("storage_id", sqlmodel.sql.sqltypes.AutoString())
    )
    op.alter_column("dobject", "dtransform_id", existing_type=sa.VARCHAR())
    op.drop_index("ix_dobject_tmp_dtransform_id", table_name="dobject")
    op.drop_index("ix_dobject_tmp_file_suffix", table_name="dobject")
    op.drop_index("ix_dobject_tmp_name", table_name="dobject")
    op.drop_index("ix_dobject_tmp_storage_root", table_name="dobject")
    op.drop_index("ix_dobject_tmp_time_created", table_name="dobject")
    op.drop_index("ix_dobject_tmp_time_updated", table_name="dobject")
    op.create_index(
        op.f("ix_dobject_dtransform_id"), "dobject", ["dtransform_id"], unique=False
    )
    op.create_index(
        op.f("ix_dobject_file_suffix"), "dobject", ["file_suffix"], unique=False
    )
    op.create_index(op.f("ix_dobject_name"), "dobject", ["name"], unique=False)
    op.create_index(
        op.f("ix_dobject_storage_id"), "dobject", ["storage_id"], unique=False
    )
    op.create_index(
        op.f("ix_dobject_time_created"), "dobject", ["time_created"], unique=False
    )
    op.create_index(
        op.f("ix_dobject_time_updated"), "dobject", ["time_updated"], unique=False
    )
    # `pipeline_run` gains the (pipeline_id, pipeline_v) reference columns and
    # an indexed, optional name.
    op.add_column(
        "pipeline_run",
        sa.Column("pipeline_id", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
    )
    op.add_column(
        "pipeline_run",
        sa.Column("pipeline_v", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
    )
    op.add_column(
        "pipeline_run",
        sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
    )
    op.create_index(
        op.f("ix_pipeline_run_name"), "pipeline_run", ["name"], unique=False
    )
    op.create_index(
        op.f("ix_pipeline_run_pipeline_id"),
        "pipeline_run",
        ["pipeline_id"],
        unique=False,
    )
    op.create_index(
        op.f("ix_pipeline_run_pipeline_v"), "pipeline_run", ["pipeline_v"], unique=False
    )
    # NOTE(review): this batch targets "pipeline" but creates a foreign key
    # over pipeline_run's (pipeline_id, pipeline_v) columns — it looks like it
    # should be batch_alter_table("pipeline_run"); confirm against the schema
    # this migration actually produced before reusing it.
    with op.batch_alter_table("pipeline") as batch_op:
        batch_op.create_foreign_key(
            "pipeline_run", "pipeline", ["pipeline_id", "pipeline_v"], ["id", "v"]
        )
    # `storage` gains an `id` column and an index on `root`.
    op.add_column(
        "storage", sa.Column("id", sqlmodel.sql.sqltypes.AutoString(), nullable=True)
    )
    op.create_index(op.f("ix_storage_root"), "storage", ["root"], unique=False)
    # User handles become unique (batch mode for SQLite compatibility).
    op.alter_column("user", "handle", existing_type=sa.VARCHAR())
    with op.batch_alter_table("user") as batch_op:
        batch_op.create_unique_constraint("user", ["handle"])


def downgrade() -> None:
    """No-op: this revision is irreversible (no downgrade path implemented)."""
    pass
Binary file modified testdb/mydata.lndb
Binary file not shown.