Skip to content

Commit

Permalink
Adds migration logic for work pools (#8214)
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle authored and zanieb committed Feb 3, 2023
1 parent 7aea9cf commit c1fd1f3
Show file tree
Hide file tree
Showing 10 changed files with 494 additions and 24 deletions.
36 changes: 36 additions & 0 deletions src/prefect/orion/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from prefect.orion.utilities.server import method_paths_from_routes
from prefect.settings import (
PREFECT_DEBUG_MODE,
PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS,
PREFECT_MEMO_STORE_PATH,
PREFECT_MEMOIZE_BLOCK_AUTO_REGISTRATION,
PREFECT_ORION_DATABASE_CONNECTION_URL,
Expand Down Expand Up @@ -365,6 +366,40 @@ async def add_block_types():
except Exception as exc:
logger.warn(f"Error occurred during block auto-registration: {exc!r}")

async def migrate_work_queues():
"""Duplicates work queues to work pool queues"""
if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value():
from prefect.orion.database.dependencies import provide_database_interface
from prefect.orion.models.configuration import (
read_configuration,
write_configuration,
)
from prefect.orion.models.workers_migration import migrate_all_work_queues
from prefect.orion.schemas.core import Configuration

db = provide_database_interface()
session = await db.session()

async with session.begin():
migration_status_configuration = await read_configuration(
session=session, key="WORK_POOL_QUEUE_MIGRATION", db=db
)
has_run = (
migration_status_configuration.value.get("has_run", False)
if migration_status_configuration is not None
else False
)
if not has_run:
await migrate_all_work_queues(session=session, db=db)
await write_configuration(
session=session,
configuration=Configuration(
key="WORK_POOL_QUEUE_MIGRATION",
value={"has_run": True},
),
db=db,
)

async def start_services():
"""Start additional services when the Orion API starts up."""

Expand Down Expand Up @@ -459,6 +494,7 @@ def on_service_exit(service, task):
run_migrations,
add_block_types,
start_services,
migrate_work_queues,
],
on_shutdown=[stop_services],
)
Expand Down
33 changes: 23 additions & 10 deletions src/prefect/orion/api/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import prefect.orion.schemas as schemas
from prefect.orion.database.dependencies import provide_database_interface
from prefect.orion.database.interface import OrionDBInterface
from prefect.orion.models.workers_migration import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.orion.utilities.schemas import DateTimeTZ
from prefect.orion.utilities.server import OrionRouter
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS
Expand Down Expand Up @@ -103,16 +104,28 @@ async def _get_work_pool_queue_id_from_name(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Work pool queue '{work_pool_name}/{work_pool_queue_name}' not found.",
)
work_pool_id = await self._get_work_pool_id_from_name(
session=session, work_pool_name=work_pool_name
)
work_pool_queue = await models.workers.create_work_pool_queue(
session=session,
work_pool_id=work_pool_id,
work_pool_queue=schemas.actions.WorkPoolQueueCreate(
name=work_pool_queue_name
),
)
if work_pool_name == DEFAULT_AGENT_WORK_POOL_NAME:
work_pool = await models.workers_migration.get_or_create_default_agent_work_pool(
session=session
)
work_pool_queue = await models.workers.create_work_pool_queue(
session=session,
work_pool_id=work_pool.id,
work_pool_queue=schemas.actions.WorkPoolQueueCreate(
name=work_pool_queue_name
),
)
else:
work_pool_id = await self._get_work_pool_id_from_name(
session=session, work_pool_name=work_pool_name
)
work_pool_queue = await models.workers.create_work_pool_queue(
session=session,
work_pool_id=work_pool_id,
work_pool_queue=schemas.actions.WorkPoolQueueCreate(
name=work_pool_queue_name
),
)

return work_pool_queue.id

Expand Down
1 change: 1 addition & 0 deletions src/prefect/orion/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
saved_searches,
logs,
workers,
workers_migration,
work_queues,
agents,
)
6 changes: 0 additions & 6 deletions src/prefect/orion/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,6 @@ async def update_deployment(
session=session, deployment_id=deployment_id, db=db, auto_scheduled_only=True
)

# create work queue if it doesn't exist
if update_data.get("work_queue_name"):
await models.work_queues._ensure_work_queue_exists(
session=session, name=update_data["work_queue_name"], db=db
)

return result.rowcount > 0


Expand Down
13 changes: 13 additions & 0 deletions src/prefect/orion/models/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from prefect.orion.schemas.responses import OrchestrationResult, SetStateStatus
from prefect.orion.schemas.states import State
from prefect.orion.utilities.schemas import PrefectBaseModel
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS


@inject_db
Expand Down Expand Up @@ -64,6 +65,18 @@ async def create_flow_run(
created=now,
)

# If the flow run has a work queue name but no worker pool queue id, migrate it
# This is unusual and would only come from legacy internal systems.
if (
PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value()
and flow_run_dict.get("work_queue_name")
and not flow_run_dict.get("work_pool_queue_id")
):
work_pool_queue = await models.workers_migration.get_or_create_work_pool_queue(
session=session, work_queue_name=flow_run_dict["work_queue_name"]
)
flow_run_dict["work_pool_queue_id"] = work_pool_queue.id

# if no idempotency key was provided, create the run directly
if not flow_run.idempotency_key:
model = db.FlowRun(**flow_run_dict)
Expand Down
37 changes: 29 additions & 8 deletions src/prefect/orion/models/work_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
from prefect.orion.database.dependencies import inject_db
from prefect.orion.database.interface import OrionDBInterface
from prefect.orion.exceptions import ObjectNotFoundError
from prefect.orion.models import workers_migration
from prefect.orion.schemas.states import StateType
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS


@inject_db
Expand All @@ -43,6 +45,11 @@ async def create_work_queue(
session.add(model)
await session.flush()

if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value() and model.filter is None:
await workers_migration.get_or_create_work_pool_queue(
session=session, work_queue_id=model.id
)

return model


Expand Down Expand Up @@ -195,14 +202,28 @@ async def get_runs_in_work_queue(
raise ObjectNotFoundError(f"Work queue with id {work_queue_id} not found.")

if work_queue.filter is None:
query = db.queries.get_scheduled_flow_runs_from_work_queues(
db=db,
limit_per_queue=limit,
work_queue_ids=[work_queue_id],
scheduled_before=scheduled_before,
)
result = await session.execute(query)
return result.scalars().unique().all()
# If workers are enabled, ensure that a corresponding work pool queue exists
# and retrieve runs from that work pool queue.
if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value():
worker_flow_run_response = (
await workers_migration.get_runs_from_work_pool_queue(
session=session,
work_queue_id=work_queue_id,
scheduled_before=scheduled_before,
limit=limit,
)
)
return [r.flow_run for r in worker_flow_run_response]
# Otherwise get runs from the legacy work queue.
else:
query = db.queries.get_scheduled_flow_runs_from_work_queues(
db=db,
limit_per_queue=limit,
work_queue_ids=[work_queue_id],
scheduled_before=scheduled_before,
)
result = await session.execute(query)
return result.scalars().unique().all()

# if the work queue has a filter, it's a deprecated tag-based work queue
# and uses an old approach
Expand Down
Loading

0 comments on commit c1fd1f3

Please sign in to comment.