Skip to content

Commit

Permalink
Adds migration logic for work pools (#8214)
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle authored and zanieb committed Feb 6, 2023
1 parent 600dffd commit 81a610b
Show file tree
Hide file tree
Showing 12 changed files with 421 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/prefect/orion/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from prefect.orion.models.workers import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.orion.utilities.schemas import DateTimeTZ
from prefect.orion.utilities.server import OrionRouter
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS

router = OrionRouter(prefix="/deployments", tags=["Deployments"])

Expand Down
36 changes: 36 additions & 0 deletions src/prefect/orion/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from prefect.orion.utilities.server import method_paths_from_routes
from prefect.settings import (
PREFECT_DEBUG_MODE,
PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS,
PREFECT_MEMO_STORE_PATH,
PREFECT_MEMOIZE_BLOCK_AUTO_REGISTRATION,
PREFECT_ORION_DATABASE_CONNECTION_URL,
Expand Down Expand Up @@ -365,6 +366,40 @@ async def add_block_types():
except Exception as exc:
logger.warn(f"Error occurred during block auto-registration: {exc!r}")

async def migrate_work_queues():
    """
    Duplicates work queues to work pool queues.

    Does nothing unless PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS is enabled.
    Completion is recorded under the "WORK_POOL_QUEUE_MIGRATION"
    configuration key, and the migration is skipped when that record
    already reports `has_run`.
    """
    if not PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value():
        return

    from prefect.orion.database.dependencies import provide_database_interface
    from prefect.orion.models.configuration import (
        read_configuration,
        write_configuration,
    )
    from prefect.orion.models.workers_migration import migrate_all_work_queues
    from prefect.orion.schemas.core import Configuration

    db = provide_database_interface()
    session = await db.session()

    # Entering the session as a context manager guarantees it is closed
    # (and any uncommitted work discarded) whether the migration succeeds
    # or raises.
    async with session:
        migration_status_configuration = await read_configuration(
            session=session, key="WORK_POOL_QUEUE_MIGRATION", db=db
        )
        has_run = (
            migration_status_configuration.value.get("has_run", False)
            if migration_status_configuration is not None
            else False
        )
        if not has_run:
            await migrate_all_work_queues(session=session, db=db)
            await write_configuration(
                session=session,
                configuration=Configuration(
                    key="WORK_POOL_QUEUE_MIGRATION",
                    value={"has_run": True},
                ),
                db=db,
            )

        # The per-queue migration may commit this session itself while it
        # batches flow run updates, so the final commit is issued explicitly
        # rather than relying on an enclosing `session.begin()` block whose
        # transaction could already be finished by this point.
        await session.commit()

async def start_services():
"""Start additional services when the Orion API starts up."""

Expand Down Expand Up @@ -462,6 +497,7 @@ def on_service_exit(service, task):
run_migrations,
add_block_types,
start_services,
migrate_work_queues,
],
on_shutdown=[stop_services],
)
Expand Down
1 change: 1 addition & 0 deletions src/prefect/orion/api/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import prefect.orion.schemas as schemas
from prefect.orion.database.dependencies import provide_database_interface
from prefect.orion.database.interface import OrionDBInterface
from prefect.orion.models.workers_migration import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.orion.utilities.schemas import DateTimeTZ
from prefect.orion.utilities.server import OrionRouter
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS
Expand Down
1 change: 1 addition & 0 deletions src/prefect/orion/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
saved_searches,
logs,
workers,
workers_migration,
work_queues,
agents,
)
7 changes: 1 addition & 6 deletions src/prefect/orion/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from prefect.orion.exceptions import ObjectNotFoundError
from prefect.orion.utilities.database import json_contains
from prefect.settings import (
PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS,
PREFECT_ORION_SERVICES_SCHEDULER_MAX_RUNS,
PREFECT_ORION_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME,
PREFECT_ORION_SERVICES_SCHEDULER_MIN_RUNS,
Expand Down Expand Up @@ -186,12 +187,6 @@ async def update_deployment(
session=session, deployment_id=deployment_id, db=db, auto_scheduled_only=True
)

# create work queue if it doesn't exist
if update_data.get("work_queue_name"):
await models.work_queues._ensure_work_queue_exists(
session=session, name=update_data["work_queue_name"], db=db
)

return result.rowcount > 0


Expand Down
13 changes: 13 additions & 0 deletions src/prefect/orion/models/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from prefect.orion.schemas.responses import OrchestrationResult, SetStateStatus
from prefect.orion.schemas.states import State
from prefect.orion.utilities.schemas import PrefectBaseModel
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS


@inject_db
Expand Down Expand Up @@ -64,6 +65,18 @@ async def create_flow_run(
created=now,
)

# If the flow run has a work queue name but no work pool queue id, migrate it
# This is unusual and would only come from legacy internal systems.
if (
PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value()
and flow_run_dict.get("work_queue_name")
and not flow_run_dict.get("work_pool_queue_id")
):
work_pool_queue = await models.workers_migration.get_or_create_work_pool_queue(
session=session, work_queue_name=flow_run_dict["work_queue_name"]
)
flow_run_dict["work_pool_queue_id"] = work_pool_queue.id

# if no idempotency key was provided, create the run directly
if not flow_run.idempotency_key:
model = db.FlowRun(**flow_run_dict)
Expand Down
36 changes: 28 additions & 8 deletions src/prefect/orion/models/work_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from prefect.orion.exceptions import ObjectNotFoundError
from prefect.orion.models.workers import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.orion.schemas.states import StateType
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS


@inject_db
Expand Down Expand Up @@ -76,6 +77,11 @@ async def create_work_queue(
session.add(model)
await session.flush()

if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value() and model.filter is None:
await workers_migration.get_or_create_work_pool_queue(
session=session, work_queue_id=model.id
)

return model


Expand Down Expand Up @@ -236,14 +242,28 @@ async def get_runs_in_work_queue(
raise ObjectNotFoundError(f"Work queue with id {work_queue_id} not found.")

if work_queue.filter is None:
query = db.queries.get_scheduled_flow_runs_from_work_queues(
db=db,
limit_per_queue=limit,
work_queue_ids=[work_queue_id],
scheduled_before=scheduled_before,
)
result = await session.execute(query)
return result.scalars().unique().all()
# If workers are enabled, ensure that a corresponding work pool queue exists
# and retrieve runs from that work pool queue.
if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value():
worker_flow_run_response = (
await workers_migration.get_runs_from_work_pool_queue(
session=session,
work_queue_id=work_queue_id,
scheduled_before=scheduled_before,
limit=limit,
)
)
return [r.flow_run for r in worker_flow_run_response]
# Otherwise get runs from the legacy work queue.
else:
query = db.queries.get_scheduled_flow_runs_from_work_queues(
db=db,
limit_per_queue=limit,
work_queue_ids=[work_queue_id],
scheduled_before=scheduled_before,
)
result = await session.execute(query)
return result.scalars().unique().all()

# if the work queue has a filter, it's a deprecated tag-based work queue
# and uses an old approach
Expand Down
200 changes: 200 additions & 0 deletions src/prefect/orion/models/workers_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
"""
Utility functions for migrating from work queues to work pools
"""
from typing import Optional
from uuid import UUID

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession

from prefect.orion import models, schemas
from prefect.orion.database.dependencies import inject_db
from prefect.orion.database.interface import OrionDBInterface
from prefect.orion.exceptions import ObjectNotFoundError
from prefect.orion.utilities.schemas import DateTimeTZ

# Name of the work pool that migrated work queues are placed in; used both to
# look the pool up and to create it when it does not exist yet.
DEFAULT_AGENT_WORK_POOL_NAME = "default-agent-pool"
# Value passed as the `type` of the default agent work pool when it is created.
PREFECT_AGENT_WORK_POOL_TYPE = "prefect-agent"


@inject_db
async def get_or_create_default_agent_work_pool(
    session: AsyncSession, db: OrionDBInterface = None
):
    """
    Return the default work pool for agents, creating it on first use.

    The pool is looked up by `DEFAULT_AGENT_WORK_POOL_NAME`; when no such
    pool exists, one is created with that name and the
    `PREFECT_AGENT_WORK_POOL_TYPE` type.

    Args:
        session (AsyncSession): a database session
        db (OrionDBInterface, optional): a database interface. Defaults to None.

    Returns:
        db.WorkPool: The default agents work pool
    """
    existing_pool = await models.workers.read_work_pool_by_name(
        session=session, work_pool_name=DEFAULT_AGENT_WORK_POOL_NAME
    )
    if existing_pool is not None:
        return existing_pool

    # No default pool yet: describe it, then create it.
    default_pool_definition = schemas.actions.WorkPoolCreate(
        name=DEFAULT_AGENT_WORK_POOL_NAME,
        description="A default work pool for Prefect Agents",
        type=PREFECT_AGENT_WORK_POOL_TYPE,
    )
    return await models.workers.create_work_pool(
        session=session,
        work_pool=default_pool_definition,
    )


@inject_db
async def get_or_create_work_pool_queue(
    session: AsyncSession,
    work_queue_id: Optional[UUID] = None,
    work_queue_name: Optional[str] = None,
    db: OrionDBInterface = None,
):
    """
    Gets or creates a work pool queue for a work queue.

    Will update corresponding deployments and scheduled flow runs that do not
    yet reference a work pool queue to point at the work pool queue.

    Args:
        session (AsyncSession): a database session
        work_queue_id (UUID): the work queue id; takes precedence over the
            name when both are given
        work_queue_name (str): the work queue name
        db (OrionDBInterface, optional): a database interface. Defaults to None.

    Returns:
        db.WorkPoolQueue: The work pool queue

    Raises:
        ValueError: if neither `work_queue_id` nor `work_queue_name` is given
        ObjectNotFoundError: if the referenced work queue does not exist
    """
    if work_queue_id is None and work_queue_name is None:
        raise ValueError(
            "Either the work queue name or the work queue ID must be provided."
        )

    if work_queue_id is not None:
        work_queue = await models.work_queues.read_work_queue(
            session=session, work_queue_id=work_queue_id
        )
        missing_message = f"Work queue with id {work_queue_id} not found."
    else:
        work_queue = await models.work_queues.read_work_queue_by_name(
            session=session, name=work_queue_name
        )
        missing_message = f"Work queue with name {work_queue_name!r} not found."

    if not work_queue:
        raise ObjectNotFoundError(missing_message)

    # get or create the default agents pool
    pool = await get_or_create_default_agent_work_pool(session=session, db=db)

    # get or create the work pool queue
    work_pool_queue = await models.workers.read_work_pool_queue_by_name(
        session=session,
        work_pool_name=DEFAULT_AGENT_WORK_POOL_NAME,
        work_pool_queue_name=work_queue.name,
    )
    if work_pool_queue is None:
        work_pool_queue = await models.workers.create_work_pool_queue(
            session=session,
            work_pool_id=pool.id,
            work_pool_queue=schemas.actions.WorkPoolQueueCreate(
                name=work_queue.name,
                description=work_queue.description,
                is_paused=work_queue.is_paused,
                concurrency_limit=work_queue.concurrency_limit,
            ),
        )

    # update deployments to point at the new work queue
    update_deployments_stmt = (
        sa.update(db.Deployment)
        .where(
            db.Deployment.work_queue_name == work_queue.name,
            db.Deployment.work_pool_queue_id.is_(None),
        )
        .values(work_pool_queue_id=work_pool_queue.id)
    )
    await session.execute(update_deployments_stmt)

    # update scheduled flow runs to point at the new work queue, in batches.
    #
    # Each batch is always taken from the *start* of the remaining rows: once
    # a batch is updated its rows no longer satisfy the
    # `work_pool_queue_id IS NULL` filter, so advancing an OFFSET as well
    # would skip over rows that still need to be migrated.
    batch_size = 50
    pending_flow_run_ids_stmt = (
        sa.select(db.FlowRun.id)
        .where(
            db.FlowRun.work_queue_name == work_queue.name,
            db.FlowRun.state_type == "SCHEDULED",
            db.FlowRun.work_pool_queue_id.is_(None),
        )
        .order_by(db.FlowRun.id.asc())
        .limit(batch_size)
    )
    while True:
        result = await session.execute(pending_flow_run_ids_stmt)
        flow_run_ids = result.scalars().all()

        if not flow_run_ids:
            break

        update_flow_runs_stmt = (
            sa.update(db.FlowRun)
            .where(
                db.FlowRun.id.in_(flow_run_ids),
            )
            .values(work_pool_queue_id=work_pool_queue.id)
        )
        await session.execute(update_flow_runs_stmt)
        # NOTE(review): this commits the caller's session after every batch,
        # which also commits any other pending work on that session. Kept
        # as-is because callers may depend on it — confirm before changing.
        await session.commit()

    # return the new queue
    return work_pool_queue


@inject_db
async def get_runs_from_work_pool_queue(
    session: AsyncSession,
    work_queue_id: UUID,
    scheduled_before: Optional[DateTimeTZ] = None,
    limit: Optional[int] = None,
    db: Optional[OrionDBInterface] = None,
):
    """
    Gets scheduled flow runs for a work queue via its work pool queue.

    The work pool queue matching the work queue is fetched (and created if it
    does not exist yet), then scheduled runs are read for that queue only.

    Args:
        session (AsyncSession): a database session
        work_queue_id (UUID): the id of the work queue to get runs for
        scheduled_before (DateTimeTZ, optional): forwarded unchanged to
            `models.workers.get_scheduled_flow_runs`
        limit (int, optional): forwarded unchanged to
            `models.workers.get_scheduled_flow_runs`
        db (OrionDBInterface, optional): a database interface. Defaults to None.

    Returns:
        Whatever `models.workers.get_scheduled_flow_runs` returns for the
        matching work pool queue.
    """
    target_queue = await get_or_create_work_pool_queue(
        session=session, work_queue_id=work_queue_id, db=db
    )

    # Restrict the lookup to the single pool/queue pair that mirrors the
    # work queue.
    pool_ids = [target_queue.work_pool_id]
    queue_ids = [target_queue.id]

    scheduled_runs = await models.workers.get_scheduled_flow_runs(
        session=session,
        work_pool_ids=pool_ids,
        work_pool_queue_ids=queue_ids,
        scheduled_before=scheduled_before,
        limit=limit,
        db=db,
    )
    return scheduled_runs


@inject_db
async def migrate_all_work_queues(
    session: AsyncSession,
    db: Optional[OrionDBInterface] = None,
):
    """
    Migrates every existing work queue to the default Prefect agent work pool.

    Work queues are read one page at a time and each one is handed to
    `get_or_create_work_pool_queue`. Can be run at any time to facilitate
    user-initiated migrations without waiting for an agent to poll.

    Args:
        session (AsyncSession): a database session
        db (OrionDBInterface, optional): a database interface. Defaults to None.
    """
    page_size = 25
    position = 0

    current_page = await models.work_queues.read_work_queues(
        session=session, offset=position, limit=page_size
    )
    # An empty page means every work queue has been visited.
    while current_page:
        for queue in current_page:
            await get_or_create_work_pool_queue(
                session=session, work_queue_id=queue.id, db=db
            )

        position += page_size
        current_page = await models.work_queues.read_work_queues(
            session=session, offset=position, limit=page_size
        )
1 change: 1 addition & 0 deletions src/prefect/orion/schemas/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
PrefectBaseModel,
copy_model_fields,
)
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS
from prefect.utilities.collections import AutoEnum

if TYPE_CHECKING:
Expand Down
Loading

0 comments on commit 81a610b

Please sign in to comment.