Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds migration logic for work pools #8214

Merged
merged 30 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
cff8b75
WIP: Adds migration plumbing
desertaxle Jan 19, 2023
2c2a8cb
Updates warning for Deployment work pool name field
desertaxle Jan 19, 2023
2d220ea
Fixes failing tests
desertaxle Jan 19, 2023
e850cf4
Adds additional migration logic
desertaxle Jan 20, 2023
4344504
Adds handling for attempts to delete non-existant work queue
desertaxle Jan 20, 2023
dfcf682
Adds work pool to deployment build CLI
desertaxle Jan 20, 2023
e1c1809
Merge branch 'main' into work-pool-deployments-refactor
desertaxle Jan 20, 2023
5e11688
Fixes typo
desertaxle Jan 20, 2023
84e44fc
Adds handling for default agent work pool
desertaxle Jan 20, 2023
7d07dad
Adds work queue migration on server startup
desertaxle Jan 21, 2023
504918b
Hides dynamic migration logic behind a experimental flag
desertaxle Jan 21, 2023
6032e78
Attempts to fix tests
desertaxle Jan 21, 2023
2d2bfe0
Enables workers feature for migration tests
desertaxle Jan 21, 2023
51b9a90
Enables workers feature for migration tests
desertaxle Jan 21, 2023
c1c70ab
Adds session management to migration on server start up
desertaxle Jan 21, 2023
8d2898d
Merge branch 'main' into work-pool-deployments-refactor
cicdw Jan 23, 2023
2b2ee90
Merge branch 'main' into work-pool-deployments-refactor
desertaxle Jan 24, 2023
83676a3
Addresses review comments
desertaxle Jan 24, 2023
61bd9aa
Creates work pool experimental flag
desertaxle Jan 24, 2023
acb693a
Moves default work pool creation logic
desertaxle Jan 24, 2023
eeca540
Updates experimental decorators with work_pools group
desertaxle Jan 24, 2023
90fd891
Makes work pool type required
desertaxle Jan 24, 2023
c205c29
Fixes fixture
desertaxle Jan 24, 2023
8620c45
Updates UI to work with new feature flag
desertaxle Jan 24, 2023
1b69953
Adds batching to migration logic
desertaxle Jan 24, 2023
74950f3
Fixes tests
desertaxle Jan 24, 2023
664d24f
Fixes tests
desertaxle Jan 24, 2023
8a220c6
Fixes after QA
desertaxle Jan 25, 2023
eee0dbe
Reverts work_queue_name removal from deployment
desertaxle Jan 25, 2023
5120945
Fixes tests
desertaxle Jan 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions orion-ui/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion orion-ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"test": "playwright test"
},
"dependencies": {
"@prefecthq/orion-design": "1.2.6",
"@prefecthq/orion-design": "1.2.7",
"@prefecthq/prefect-design": "1.2.2",
"@prefecthq/vue-compositions": "1.2.9",
"@types/lodash.debounce": "4.0.7",
Expand Down
4 changes: 2 additions & 2 deletions orion-ui/src/components/ContextSidebar.vue
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<p-context-nav-item title="Flow Runs" icon="FlowRun" :to="routes.flowRuns()" />
<p-context-nav-item title="Flows" icon="Flow" :to="routes.flows()" />
<p-context-nav-item title="Deployments" icon="LocationMarkerIcon" :to="routes.deployments()" />
<p-context-nav-item v-if="canSeeWorkers" title="Work Pools" icon="DatabaseIcon" :to="routes.workPools()" />
<p-context-nav-item v-if="canSeeWorkPools" title="Work Pools" icon="DatabaseIcon" :to="routes.workPools()" />
<p-context-nav-item title="Work Queues" icon="DatabaseIcon" :to="routes.workQueues()" />
<p-context-nav-item title="Blocks" icon="CubeIcon" :to="routes.blocks()" />
<p-context-nav-item title="Notifications" icon="BellIcon" :to="routes.notifications()" />
Expand All @@ -22,5 +22,5 @@
import { routes } from '@/router'

const can = useCan()
const canSeeWorkers = computed(() => can.access.workers && can.read.work_pool)
const canSeeWorkPools = computed(() => can.access.work_pools && can.read.work_pool)
</script>
2 changes: 2 additions & 0 deletions orion-ui/src/maps/featureFlag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ export const mapFlagResponseToFeatureFlag: MapFunction<FlagResponse, FeatureFlag
switch (source) {
case 'workers':
return 'access:workers'
case 'work_pools':
return 'access:work_pools'
default:
return null
}
Expand Down
3 changes: 2 additions & 1 deletion orion-ui/src/types/flagResponse.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export type FlagResponse =
| 'workers'
| 'workers'
| 'work_pools'
2 changes: 1 addition & 1 deletion orion-ui/src/utilities/permissions.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Can, WorkspacePermission, WorkspaceFeatureFlag } from '@prefecthq/orion-design'
import { InjectionKey } from 'vue'

const featureFlags = ['access:workers'] as const
const featureFlags = ['access:workers', 'access:work_pools'] as const

export type FeatureFlag = typeof featureFlags[number]

Expand Down
2 changes: 1 addition & 1 deletion src/prefect/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

class OrionAgent:
@experimental_parameter(
"work_pool_name", group="workers", when=lambda y: y is not None
"work_pool_name", group="work_pools", when=lambda y: y is not None
)
def __init__(
self,
Expand Down
8 changes: 8 additions & 0 deletions src/prefect/cli/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,12 @@ async def build(
"Note that if a work queue is not set, work will not be scheduled."
),
),
work_pool_name: str = typer.Option(
None,
"-p",
"--pool",
help="The work pool that will handle this deployment's runs.",
),
work_queue_concurrency: int = typer.Option(
None,
"--limit",
Expand Down Expand Up @@ -934,6 +940,8 @@ async def build(
init_kwargs.update(infrastructure=infrastructure)
if work_queue_name:
init_kwargs.update(work_queue_name=work_queue_name)
if work_pool_name:
init_kwargs.update(work_pool_name=work_pool_name)

deployment_loc = output_file or f"{obj_name}-deployment.yaml"
deployment = await Deployment.build_from_flow(
Expand Down
3 changes: 0 additions & 3 deletions src/prefect/client/orion.py
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,6 @@ async def create_deployment(
description: str = None,
work_queue_name: str = None,
work_pool_name: str = None,
work_pool_queue_name: str = None,
tags: List[str] = None,
storage_document_id: UUID = None,
manifest_path: str = None,
Expand Down Expand Up @@ -1323,8 +1322,6 @@ async def create_deployment(

if work_pool_name is not None:
deployment_create.work_pool_name = work_pool_name
if work_pool_queue_name is not None:
deployment_create.work_pool_queue_name = work_pool_queue_name

# Exclude newer fields that are not set to avoid compatibility issues
exclude = {
Expand Down
14 changes: 4 additions & 10 deletions src/prefect/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from prefect.infrastructure import Infrastructure, Process
from prefect.logging.loggers import flow_run_logger
from prefect.orion import schemas
from prefect.orion.models.workers_migration import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.states import Scheduled
from prefect.tasks import Task
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
Expand Down Expand Up @@ -210,12 +211,10 @@ def load_deployments_from_yaml(
return registry


@experimental_field("work_pool_name", group="workers", when=lambda x: x is not None)
@experimental_field(
"work_pool_queue_name",
group="workers",
when=lambda x: x is not None,
stacklevel=4,
"work_pool_name",
group="work_pools",
when=lambda x: x is not None and x != DEFAULT_AGENT_WORK_POOL_NAME,
)
class Deployment(BaseModel):
"""
Expand Down Expand Up @@ -290,7 +289,6 @@ def _editable_fields(self) -> List[str]:
"version",
"work_queue_name",
"work_pool_name",
"work_pool_queue_name",
"tags",
"parameters",
"schedule",
Expand Down Expand Up @@ -398,9 +396,6 @@ def _yaml_dict(self) -> dict:
work_pool_name: Optional[str] = Field(
default=None, description="The work pool for the deployment"
)
work_pool_queue_name: Optional[str] = Field(
default=None, description="The work pool queue for the deployment."
)
# flow data
parameters: Dict[str, Any] = Field(default_factory=dict)
manifest_path: Optional[str] = Field(
Expand Down Expand Up @@ -658,7 +653,6 @@ async def apply(
name=self.name,
work_queue_name=self.work_queue_name,
work_pool_name=self.work_pool_name,
work_pool_queue_name=self.work_pool_queue_name,
version=self.version,
schedule=self.schedule,
is_schedule_active=self.is_schedule_active,
Expand Down
67 changes: 43 additions & 24 deletions src/prefect/orion/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from prefect.orion.database.dependencies import provide_database_interface
from prefect.orion.database.interface import OrionDBInterface
from prefect.orion.exceptions import MissingVariableError, ObjectNotFoundError
from prefect.orion.models.workers_migration import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.orion.utilities.schemas import DateTimeTZ
from prefect.orion.utilities.server import OrionRouter
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS

router = OrionRouter(prefix="/deployments", tags=["Deployments"])

Expand All @@ -39,7 +41,10 @@ async def create_deployment(
"""

async with db.session_context(begin_transaction=True) as session:
if deployment.work_pool_name:
if (
deployment.work_pool_name
and deployment.work_pool_name != DEFAULT_AGENT_WORK_POOL_NAME
):
# Make sure that deployment is valid before beginning creation process
work_pool = await models.workers.read_work_pool_by_name(
session=session, work_pool_name=deployment.work_pool_name
Expand All @@ -58,29 +63,43 @@ async def create_deployment(
)

# hydrate the input model into a full model
deployment_dict = deployment.dict(
exclude={"work_pool_name", "work_pool_queue_name"}
)
if deployment.work_pool_name and deployment.work_pool_queue_name:
# If a specific pool name/queue name combination was provided, get the
# ID for that work pool queue.
deployment_dict[
"work_pool_queue_id"
] = await worker_lookups._get_work_pool_queue_id_from_name(
session=session,
work_pool_name=deployment.work_pool_name,
work_pool_queue_name=deployment.work_pool_queue_name,
create_queue_if_not_found=True,
)
elif deployment.work_pool_name:
# If just a pool name was provided, get the ID for its default
# work pool queue.
deployment_dict[
"work_pool_queue_id"
] = await worker_lookups._get_default_work_pool_queue_id_from_work_pool_name(
session=session,
work_pool_name=deployment.work_pool_name,
)
deployment_dict = deployment.dict(exclude={"work_pool_name"})
if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value():
if deployment.work_pool_name and deployment.work_queue_name:
# If a specific pool name/queue name combination was provided, get the
# ID for that work pool queue.
deployment_dict[
"work_pool_queue_id"
] = await worker_lookups._get_work_pool_queue_id_from_name(
cicdw marked this conversation as resolved.
Show resolved Hide resolved
session=session,
work_pool_name=deployment.work_pool_name,
work_pool_queue_name=deployment.work_queue_name,
create_queue_if_not_found=True,
)
# Pop work queue name to prevent work from being pulled from
# a legacy work queue.
deployment_dict.pop("work_queue_name", None)
elif deployment.work_pool_name:
# If just a pool name was provided, get the ID for its default
# work pool queue.
deployment_dict[
"work_pool_queue_id"
] = await worker_lookups._get_default_work_pool_queue_id_from_work_pool_name(
session=session,
work_pool_name=deployment.work_pool_name,
)
elif deployment.work_queue_name:
# If just a work queue name was provided, we assume this deployment is using
# an agent and create a queue in the default agents work pool. This is a
# legacy case and can be removed once agents are removed.
_, work_pool_queue = await models.work_queues._ensure_work_queue_exists(
session=session, name=deployment.work_queue_name, db=db
)
if work_pool_queue:
deployment_dict["work_pool_queue_id"] = work_pool_queue.id
# Pop work queue name to prevent work from being pulled from
# a legacy work queue.
deployment_dict.pop("work_queue_name", None)

deployment = schemas.core.Deployment(**deployment_dict)
# check to see if relevant blocks exist, allowing us throw a useful error message
Expand Down
36 changes: 36 additions & 0 deletions src/prefect/orion/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from prefect.orion.utilities.server import method_paths_from_routes
from prefect.settings import (
PREFECT_DEBUG_MODE,
PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS,
PREFECT_MEMO_STORE_PATH,
PREFECT_MEMOIZE_BLOCK_AUTO_REGISTRATION,
PREFECT_ORION_DATABASE_CONNECTION_URL,
Expand Down Expand Up @@ -365,6 +366,40 @@ async def add_block_types():
except Exception as exc:
logger.warn(f"Error occurred during block auto-registration: {exc!r}")

async def migrate_work_queues():
"""Duplicates work queues to work pool queues"""
if PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS.value():
from prefect.orion.database.dependencies import provide_database_interface
from prefect.orion.models.configuration import (
read_configuration,
write_configuration,
)
from prefect.orion.models.workers_migration import migrate_all_work_queues
from prefect.orion.schemas.core import Configuration

db = provide_database_interface()
session = await db.session()

async with session.begin():
migration_status_configuration = await read_configuration(
session=session, key="WORK_POOL_QUEUE_MIGRATION", db=db
)
has_run = (
migration_status_configuration.value.get("has_run", False)
if migration_status_configuration is not None
else False
)
if not has_run:
await migrate_all_work_queues(session=session, db=db)
await write_configuration(
session=session,
configuration=Configuration(
key="WORK_POOL_QUEUE_MIGRATION",
value={"has_run": True},
),
db=db,
)

async def start_services():
"""Start additional services when the Orion API starts up."""

Expand Down Expand Up @@ -435,6 +470,7 @@ def on_service_exit(service, task):
run_migrations,
add_block_types,
start_services,
migrate_work_queues,
],
on_shutdown=[stop_services],
)
Expand Down
34 changes: 21 additions & 13 deletions src/prefect/orion/api/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
import prefect.orion.schemas as schemas
from prefect.orion.database.dependencies import provide_database_interface
from prefect.orion.database.interface import OrionDBInterface
from prefect.orion.models.workers_migration import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.orion.utilities.schemas import DateTimeTZ
from prefect.orion.utilities.server import OrionRouter
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORKERS
from prefect.settings import PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS


def error_404_if_workers_not_enabled():
if not PREFECT_EXPERIMENTAL_ENABLE_WORKERS:
raise HTTPException(status_code=404, detail="Workers are not enabled")
if not PREFECT_EXPERIMENTAL_ENABLE_WORK_POOLS:
raise HTTPException(status_code=404, detail="Work pools are not enabled")


router = OrionRouter(
Expand Down Expand Up @@ -103,16 +104,23 @@ async def _get_work_pool_queue_id_from_name(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Work pool queue '{work_pool_name}/{work_pool_queue_name}' not found.",
)
work_pool_id = await self._get_work_pool_id_from_name(
session=session, work_pool_name=work_pool_name
)
work_pool_queue = await models.workers.create_work_pool_queue(
session=session,
work_pool_id=work_pool_id,
work_pool_queue=schemas.actions.WorkPoolQueueCreate(
name=work_pool_queue_name
),
)
if work_pool_name.lower() == DEFAULT_AGENT_WORK_POOL_NAME:
work_pool_queue = (
await models.workers_migration.get_or_create_work_pool_queue(
session=session, work_queue_name=work_pool_queue_name
)
)
else:
work_pool_id = await self._get_work_pool_id_from_name(
session=session, work_pool_name=work_pool_name
)
work_pool_queue = await models.workers.create_work_pool_queue(
session=session,
work_pool_id=work_pool_id,
work_pool_queue=schemas.actions.WorkPoolQueueCreate(
name=work_pool_queue_name
),
)

return work_pool_queue.id

Expand Down
1 change: 1 addition & 0 deletions src/prefect/orion/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
saved_searches,
logs,
workers,
workers_migration,
work_queues,
agents,
)
Loading