Skip to content

Commit

Permalink
change alembic migrations to create new tables with bigint cols (dags…
Browse files Browse the repository at this point in the history
…ter-io#16045)

## Summary & Motivation
For existing users who are behind on their migrations, make sure they
create bigint id cols.

## How I Tested These Changes
BK
  • Loading branch information
prha authored Sep 7, 2023
1 parent b453aa3 commit 839d487
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from alembic import op
from dagster._core.storage.migration.utils import has_column, has_table
from sqlalchemy import inspect
from sqlalchemy.dialects import sqlite

# alembic magic breaks pylint

Expand All @@ -29,7 +30,13 @@ def upgrade():
if not has_table("snapshots"):
op.create_table(
"snapshots",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True, nullable=False),
sa.Column(
"id",
sa.BigInteger().with_variant(sqlite.INTEGER(), "sqlite"),
primary_key=True,
autoincrement=True,
nullable=False,
),
sa.Column("snapshot_id", sa.String(255), unique=True, nullable=False),
sa.Column("snapshot_body", sa.LargeBinary, nullable=False),
sa.Column("snapshot_type", sa.String(63), nullable=False),
Expand Down
57 changes: 49 additions & 8 deletions python_modules/dagster/dagster/_core/storage/migration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sqlalchemy as db
from alembic import op
from sqlalchemy import inspect
from sqlalchemy.dialects import sqlite

import dagster._check as check
from dagster._core.instance import DagsterInstance
Expand Down Expand Up @@ -74,7 +75,12 @@ def create_0_10_0_run_tables() -> None:
if not has_table("secondary_indexes"):
op.create_table(
"secondary_indexes",
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column(
"id",
db.BigInteger().with_variant(sqlite.INTEGER(), "sqlite"),
primary_key=True,
autoincrement=True,
),
db.Column("name", db.String, unique=True),
db.Column("create_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")),
db.Column("migration_completed", db.DateTime),
Expand All @@ -97,7 +103,12 @@ def create_0_10_0_event_log_tables() -> None:
if not has_table("secondary_indexes"):
op.create_table(
"secondary_indexes",
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column(
"id",
db.BigInteger().with_variant(sqlite.INTEGER(), "sqlite"),
primary_key=True,
autoincrement=True,
),
db.Column("name", db.String, unique=True),
db.Column("create_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")),
db.Column("migration_completed", db.DateTime),
Expand All @@ -106,7 +117,12 @@ def create_0_10_0_event_log_tables() -> None:
if not has_table("asset_keys"):
op.create_table(
"asset_keys",
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column(
"id",
db.BigInteger().with_variant(sqlite.INTEGER(), "sqlite"),
primary_key=True,
autoincrement=True,
),
db.Column("asset_key", db.String, unique=True),
db.Column("create_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")),
)
Expand All @@ -119,7 +135,12 @@ def create_0_10_0_schedule_tables() -> None:
if not has_table("jobs"):
op.create_table(
"jobs",
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column(
"id",
db.BigInteger().with_variant(sqlite.INTEGER(), "sqlite"),
primary_key=True,
autoincrement=True,
),
db.Column("job_origin_id", db.String(255), unique=True),
db.Column("repository_origin_id", db.String(255)),
db.Column("status", db.String(63)),
Expand All @@ -132,7 +153,12 @@ def create_0_10_0_schedule_tables() -> None:
if not has_table("job_ticks"):
op.create_table(
"job_ticks",
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column(
"id",
db.BigInteger().with_variant(sqlite.INTEGER(), "sqlite"),
primary_key=True,
autoincrement=True,
),
db.Column("job_origin_id", db.String(255), index=True),
db.Column("status", db.String(63)),
db.Column("type", db.String(63)),
Expand Down Expand Up @@ -162,7 +188,12 @@ def create_bulk_actions_table() -> None:
if not has_table("bulk_actions"):
op.create_table(
"bulk_actions",
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column(
"id",
db.BigInteger().with_variant(sqlite.INTEGER(), "sqlite"),
primary_key=True,
autoincrement=True,
),
db.Column("key", db.String(32), unique=True, nullable=False),
db.Column("status", db.String(255), nullable=False),
db.Column("timestamp", db.types.TIMESTAMP, nullable=False),
Expand Down Expand Up @@ -271,7 +302,12 @@ def create_schedule_secondary_index_table() -> None:
if not has_table("secondary_indexes"):
op.create_table(
"secondary_indexes",
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column(
"id",
db.BigInteger().with_variant(sqlite.INTEGER(), "sqlite"),
primary_key=True,
autoincrement=True,
),
db.Column("name", db.String, unique=True),
db.Column("create_timestamp", db.DateTime, server_default=db.text("CURRENT_TIMESTAMP")),
db.Column("migration_completed", db.DateTime),
Expand All @@ -286,7 +322,12 @@ def create_instigators_table() -> None:
if not has_table("instigators"):
op.create_table(
"instigators",
db.Column("id", db.Integer, primary_key=True, autoincrement=True),
db.Column(
"id",
db.BigInteger().with_variant(sqlite.INTEGER(), "sqlite"),
primary_key=True,
autoincrement=True,
),
db.Column("selector_id", db.String(255), unique=True),
db.Column("repository_selector_id", db.String(255)),
db.Column("status", db.String(63)),
Expand Down

0 comments on commit 839d487

Please sign in to comment.