-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
1ba482b
commit 7bd182f
Showing
2 changed files
with
362 additions
and
0 deletions.
There are no files selected for viewing
181 changes: 181 additions & 0 deletions
181
...igrations/versions/postgresql/2023_01_31_110543_f98ae6d8e2cc_work_queue_data_migration.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
"""Work queue data migration | ||
Revision ID: f98ae6d8e2cc | ||
Revises: 0a1250a5aa25 | ||
Create Date: 2023-01-31 11:05:43.356002 | ||
""" | ||
import sqlalchemy as sa | ||
from alembic import op | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "f98ae6d8e2cc" | ||
down_revision = "0a1250a5aa25" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade(): | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
# Create default agent work pool and associate all existing queues with it | ||
connection = op.get_bind() | ||
|
||
connection.execute( | ||
sa.text( | ||
"INSERT INTO work_pool (name, type) VALUES ('default-agent-pool', 'prefect-agent')" | ||
) | ||
) | ||
|
||
default_pool_id = connection.execute( | ||
sa.text("SELECT id FROM work_pool WHERE name = 'default-agent-pool'") | ||
).fetchone()[0] | ||
|
||
default_queue = connection.execute( | ||
sa.text("SELECT id FROM work_queue WHERE name = 'default'") | ||
).fetchone() | ||
|
||
if not default_queue: | ||
connection.execute( | ||
sa.text( | ||
f"INSERT INTO work_queue (name, work_pool_id) VALUES ('default', :default_pool_id)" | ||
).params({"default_pool_id": default_pool_id}), | ||
) | ||
|
||
connection.execute( | ||
sa.text( | ||
"UPDATE work_queue SET work_pool_id = :default_pool_id WHERE work_pool_id IS NULL" | ||
).params({"default_pool_id": default_pool_id}), | ||
) | ||
|
||
default_queue_id = connection.execute( | ||
sa.text( | ||
"SELECT id FROM work_queue WHERE name = 'default' and work_pool_id = :default_pool_id" | ||
).params({"default_pool_id": default_pool_id}), | ||
).fetchone()[0] | ||
|
||
connection.execute( | ||
sa.text( | ||
"UPDATE work_pool SET default_queue_id = :default_queue_id WHERE id = :default_pool_id" | ||
).params( | ||
{"default_pool_id": default_pool_id, "default_queue_id": default_queue_id} | ||
), | ||
) | ||
|
||
# Set priority on all queues and update flow runs and deployments | ||
queue_rows = connection.execute( | ||
sa.text( | ||
"SELECT id, name FROM work_queue WHERE work_pool_id = :default_pool_id" | ||
).params({"default_pool_id": default_pool_id}), | ||
).fetchall() | ||
|
||
with op.get_context().autocommit_block(): | ||
for enumeration, row in enumerate(queue_rows): | ||
connection.execute( | ||
sa.text( | ||
"UPDATE work_queue SET priority = :priority WHERE id = :id" | ||
).params({"priority": enumeration + 1, "id": row[0]}), | ||
) | ||
|
||
batch_size = 250 | ||
|
||
while True: | ||
result = connection.execute( | ||
sa.text( | ||
""" | ||
UPDATE flow_run | ||
SET work_queue_id=:id | ||
WHERE flow_run.id in ( | ||
SELECT id | ||
FROM flow_run | ||
WHERE flow_run.work_queue_id IS NULL and flow_run.work_queue_name=:name | ||
LIMIT :batch_size | ||
) | ||
""" | ||
).params({"id": row[0], "name": row[1], "batch_size": batch_size}), | ||
) | ||
if result.rowcount <= batch_size: | ||
break | ||
|
||
while True: | ||
result = connection.execute( | ||
sa.text( | ||
""" | ||
UPDATE deployment | ||
SET work_queue_id=:id | ||
WHERE deployment.id in ( | ||
SELECT id | ||
FROM deployment | ||
WHERE deployment.work_queue_id IS NULL and deployment.work_queue_name=:name | ||
LIMIT :batch_size | ||
) | ||
""" | ||
).params({"id": row[0], "name": row[1], "batch_size": batch_size}), | ||
) | ||
if result.rowcount <= batch_size: | ||
break | ||
|
||
# ### end Alembic commands ### | ||
|
||
|
||
def downgrade(): | ||
connection = op.get_bind() | ||
|
||
# Delete all non-default queues and pools | ||
default_pool_id = connection.execute( | ||
sa.text("SELECT id FROM work_pool WHERE name = 'default-agent-pool'") | ||
).fetchone()[0] | ||
default_pool_id = default_pool_id_result[0] | ||
connection.execute( | ||
sa.text("DELETE FROM work_queue WHERE work_pool_id != :default_pool_id").params( | ||
{"default_pool_id": default_pool_id} | ||
) | ||
) | ||
queue_rows = connection.execute( | ||
sa.text( | ||
"SELECT id, name FROM work_queue WHERE work_pool_id = :default_pool_id" | ||
).params({"default_pool_id": default_pool_id}), | ||
).fetchall() | ||
|
||
with op.get_context().autocommit_block(): | ||
for row in queue_rows: | ||
batch_size = 250 | ||
|
||
while True: | ||
result = connection.execute( | ||
sa.text( | ||
""" | ||
UPDATE flow_run | ||
SET work_queue_id=NULL | ||
WHERE flow_run.id in ( | ||
SELECT id | ||
FROM flow_run | ||
WHERE flow_run.work_queue_id IS NOT NULL and flow_run.work_queue_id=:id | ||
LIMIT :batch_size | ||
) | ||
""" | ||
).params({"id": row[0], "batch_size": batch_size}), | ||
) | ||
if result.rowcount <= batch_size: | ||
break | ||
|
||
while True: | ||
result = connection.execute( | ||
sa.text( | ||
""" | ||
UPDATE deployment | ||
SET work_queue_id=NULL | ||
WHERE deployment.id in ( | ||
SELECT id | ||
FROM deployment | ||
WHERE deployment.work_queue_id IS NOT NULL and deployment.work_queue_id=:id | ||
LIMIT :batch_size | ||
) | ||
""" | ||
).params({"id": row[0], "batch_size": batch_size}), | ||
) | ||
if result.rowcount <= batch_size: | ||
break | ||
|
||
connection.execute(sa.text("UPDATE work_queue SET work_pool_id = NULL")) | ||
|
||
connection.execute(sa.text("DELETE FROM work_pool")) |
181 changes: 181 additions & 0 deletions
181
...se/migrations/versions/sqlite/2023_01_31_105442_1678f2fb8b33_work_queue_data_migration.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
"""Work queue data migration | ||
Revision ID: 1678f2fb8b33 | ||
Revises: b9bda9f142f1 | ||
Create Date: 2023-01-31 10:54:42.747849 | ||
""" | ||
import sqlalchemy as sa | ||
from alembic import op | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "1678f2fb8b33" | ||
down_revision = "b9bda9f142f1" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade(): | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
# Create default agent work pool and associate all existing queues with it | ||
connection = op.get_bind() | ||
|
||
connection.execute( | ||
sa.text( | ||
"INSERT INTO work_pool (name, type) VALUES ('default-agent-pool', 'prefect-agent')" | ||
) | ||
) | ||
|
||
default_pool_id = connection.execute( | ||
sa.text("SELECT id FROM work_pool WHERE name = 'default-agent-pool'") | ||
).fetchone()[0] | ||
|
||
default_queue = connection.execute( | ||
sa.text("SELECT id FROM work_queue WHERE name = 'default'") | ||
).fetchone() | ||
|
||
if not default_queue: | ||
connection.execute( | ||
sa.text( | ||
f"INSERT INTO work_queue (name, work_pool_id) VALUES ('default', :default_pool_id)" | ||
).params({"default_pool_id": default_pool_id}), | ||
) | ||
|
||
connection.execute( | ||
sa.text( | ||
"UPDATE work_queue SET work_pool_id = :default_pool_id WHERE work_pool_id IS NULL" | ||
).params({"default_pool_id": default_pool_id}), | ||
) | ||
|
||
default_queue_id = connection.execute( | ||
sa.text( | ||
"SELECT id FROM work_queue WHERE name = 'default' and work_pool_id = :default_pool_id" | ||
).params({"default_pool_id": default_pool_id}), | ||
).fetchone()[0] | ||
|
||
connection.execute( | ||
sa.text( | ||
"UPDATE work_pool SET default_queue_id = :default_queue_id WHERE id = :default_pool_id" | ||
).params( | ||
{"default_pool_id": default_pool_id, "default_queue_id": default_queue_id} | ||
), | ||
) | ||
|
||
# Set priority on all queues and update flow runs and deployments | ||
queue_rows = connection.execute( | ||
sa.text( | ||
"SELECT id, name FROM work_queue WHERE work_pool_id = :default_pool_id" | ||
).params({"default_pool_id": default_pool_id}), | ||
).fetchall() | ||
|
||
with op.get_context().autocommit_block(): | ||
for enumeration, row in enumerate(queue_rows): | ||
connection.execute( | ||
sa.text( | ||
"UPDATE work_queue SET priority = :priority WHERE id = :id" | ||
).params({"priority": enumeration + 1, "id": row[0]}), | ||
) | ||
|
||
batch_size = 250 | ||
|
||
while True: | ||
result = connection.execute( | ||
sa.text( | ||
""" | ||
UPDATE flow_run | ||
SET work_queue_id=:id | ||
WHERE flow_run.id in ( | ||
SELECT id | ||
FROM flow_run | ||
WHERE flow_run.work_queue_id IS NULL and flow_run.work_queue_name=:name | ||
LIMIT :batch_size | ||
) | ||
""" | ||
).params({"id": row[0], "name": row[1], "batch_size": batch_size}), | ||
) | ||
if result.rowcount <= batch_size: | ||
break | ||
|
||
while True: | ||
result = connection.execute( | ||
sa.text( | ||
""" | ||
UPDATE deployment | ||
SET work_queue_id=:id | ||
WHERE deployment.id in ( | ||
SELECT id | ||
FROM deployment | ||
WHERE deployment.work_queue_id IS NULL and deployment.work_queue_name=:name | ||
LIMIT :batch_size | ||
) | ||
""" | ||
).params({"id": row[0], "name": row[1], "batch_size": batch_size}), | ||
) | ||
if result.rowcount <= batch_size: | ||
break | ||
|
||
# ### end Alembic commands ### | ||
|
||
|
||
def downgrade(): | ||
connection = op.get_bind() | ||
|
||
# Delete all non-default queues and pools | ||
default_pool_id = connection.execute( | ||
sa.text("SELECT id FROM work_pool WHERE name = 'default-agent-pool'") | ||
).fetchone()[0] | ||
default_pool_id = default_pool_id_result[0] | ||
connection.execute( | ||
sa.text("DELETE FROM work_queue WHERE work_pool_id != :default_pool_id").params( | ||
{"default_pool_id": default_pool_id} | ||
) | ||
) | ||
queue_rows = connection.execute( | ||
sa.text( | ||
"SELECT id, name FROM work_queue WHERE work_pool_id = :default_pool_id" | ||
).params({"default_pool_id": default_pool_id}), | ||
).fetchall() | ||
|
||
with op.get_context().autocommit_block(): | ||
for row in queue_rows: | ||
batch_size = 250 | ||
|
||
while True: | ||
result = connection.execute( | ||
sa.text( | ||
""" | ||
UPDATE flow_run | ||
SET work_queue_id=NULL | ||
WHERE flow_run.id in ( | ||
SELECT id | ||
FROM flow_run | ||
WHERE flow_run.work_queue_id IS NOT NULL and flow_run.work_queue_id=:id | ||
LIMIT :batch_size | ||
) | ||
""" | ||
).params({"id": row[0], "batch_size": batch_size}), | ||
) | ||
if result.rowcount <= batch_size: | ||
break | ||
|
||
while True: | ||
result = connection.execute( | ||
sa.text( | ||
""" | ||
UPDATE deployment | ||
SET work_queue_id=NULL | ||
WHERE deployment.id in ( | ||
SELECT id | ||
FROM deployment | ||
WHERE deployment.work_queue_id IS NOT NULL and deployment.work_queue_id=:id | ||
LIMIT :batch_size | ||
) | ||
""" | ||
).params({"id": row[0], "batch_size": batch_size}), | ||
) | ||
if result.rowcount <= batch_size: | ||
break | ||
|
||
connection.execute(sa.text("UPDATE work_queue SET work_pool_id = NULL")) | ||
|
||
connection.execute(sa.text("DELETE FROM work_pool")) |