From 6e1f1248ad50f25dada6d9beb82f6b01e05660da Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Fri, 20 Jan 2023 16:28:00 -0600 Subject: [PATCH 1/9] WIP: Makes agents work pool aware --- src/prefect/agent.py | 44 +++++++++++++++++++++++++++++-------- src/prefect/cli/agent.py | 7 ++++++ src/prefect/client/orion.py | 19 ++++++++++++++++ 3 files changed, 61 insertions(+), 9 deletions(-) diff --git a/src/prefect/agent.py b/src/prefect/agent.py index e5bf7b4f3803..a6a40598a114 100644 --- a/src/prefect/agent.py +++ b/src/prefect/agent.py @@ -3,7 +3,7 @@ their execution. """ import inspect -from typing import Iterator, List, Optional, Set, Union +from typing import AsyncIterator, List, Optional, Set, Union from uuid import UUID import anyio @@ -22,7 +22,8 @@ ) from prefect.infrastructure import Infrastructure, InfrastructureResult, Process from prefect.logging import get_logger -from prefect.orion.schemas.core import BlockDocument, FlowRun, WorkQueue +from prefect.orion import schemas +from prefect.orion.schemas.core import BlockDocument, FlowRun, WorkPoolQueue, WorkQueue from prefect.orion.schemas.filters import ( FlowRunFilter, FlowRunFilterId, @@ -52,6 +53,7 @@ def __init__( ) self.work_queues: Set[str] = set(work_queues) if work_queues else set() + self.work_pool_name = work_pool_name self.prefetch_seconds = prefetch_seconds self.submitting_flow_run_ids = set() self.cancelling_flow_run_ids = set() @@ -99,7 +101,7 @@ async def update_matched_agent_work_queues(self): ) self.work_queues = matched_queues - async def get_work_queues(self) -> Iterator[WorkQueue]: + async def get_work_queues(self) -> AsyncIterator[Union[WorkQueue, WorkPoolQueue]]: """ Loads the work queue objects corresponding to the agent's target work queues. If any of them don't exist, they are created. @@ -121,7 +123,12 @@ async def get_work_queues(self) -> Iterator[WorkQueue]: for name in self.work_queues: try: - work_queue = await self.client.read_work_queue_by_name(name) + if self.work_pool_name: + work_queue = await self.client.read_work_pool_queue( + work_pool_name=self.work_pool_name, work_pool_queue_name=name + ) + else: + work_queue = await self.client.read_work_queue_by_name(name) except ObjectNotFound: # if the work queue wasn't found, create it @@ -129,8 +136,19 @@ async def get_work_queues(self) -> Iterator[WorkQueue]: # do not attempt to create work queues if the agent is polling for # queues using a regex try: - work_queue = await self.client.create_work_queue(name=name) - self.logger.info(f"Created work queue '{name}'.") + if self.work_pool_name: + work_queue = await self.client.create_work_pool_queue( + work_pool_name=self.work_pool_name, + work_pool_queue=schemas.actions.WorkPoolQueueCreate( + name=name + ), + ) + self.logger.info( + f"Created work queue {name!r} in work pool {self.work_pool_name!r}." 
+ ) + else: + work_queue = await self.client.create_work_queue(name=name) + self.logger.info(f"Created work queue '{name}'.") # if creating it raises an exception, it was probably just # created by some other agent; rather than entering a re-read @@ -170,9 +188,17 @@ async def get_and_submit_flow_runs(self) -> List[FlowRun]: else: try: - queue_runs = await self.client.get_runs_in_work_queue( - id=work_queue.id, limit=10, scheduled_before=before - ) + if isinstance(work_queue, WorkPoolQueue): + responses = await self.client.get_scheduled_flow_runs_for_work_pool_queues( + work_pool_name=self.work_pool_name, + work_pool_queue_names=[work_queue.name], + scheduled_before=before, + ) + queue_runs = [response.flow_run for response in responses] + else: + queue_runs = await self.client.get_runs_in_work_queue( + id=work_queue.id, limit=10, scheduled_before=before + ) submittable_runs.extend(queue_runs) except ObjectNotFound: self.logger.error( diff --git a/src/prefect/cli/agent.py b/src/prefect/cli/agent.py index d3308accade9..ed450092efd2 100644 --- a/src/prefect/cli/agent.py +++ b/src/prefect/cli/agent.py @@ -60,6 +60,12 @@ async def start( "for example `dev-` will match all work queues with a name that starts with `dev-`" ), ), + work_pool_name: str = typer.Option( + None, + "-p", + "--pool", + help="A work pool name for the agent to pull from.", + ), hide_welcome: bool = typer.Option(False, "--hide-welcome"), api: str = SettingsOption(PREFECT_API_URL), run_once: bool = typer.Option( @@ -145,6 +151,7 @@ async def start( async with OrionAgent( work_queues=work_queues, work_queue_prefix=work_queue_prefix, + work_pool_name=work_pool_name, prefetch_seconds=prefetch_seconds, limit=limit, ) as agent: diff --git a/src/prefect/client/orion.py b/src/prefect/client/orion.py index b8bc0106bbd9..8f0f6b927a44 100644 --- a/src/prefect/client/orion.py +++ b/src/prefect/client/orion.py @@ -2078,6 +2078,25 @@ async def read_work_pool_queues( return pydantic.parse_obj_as(List[WorkPoolQueue], response.json()) + async def read_work_pool_queue( + self, work_pool_name: str, work_pool_queue_name: str + ) -> schemas.core.WorkPoolQueue: + """ + Retrieves queues for a work pool. + + Args: + work_pool_name: Name of the work pool the queue belong to. + work_pool_queue_name: Name of the work pool queue to get. + + Returns: + The specified work pool queue. 
+ """ + response = await self._client.get( + f"/experimental/work_pools/{work_pool_name}/queues/{work_pool_queue_name}" + ) + + return pydantic.parse_obj_as(WorkPoolQueue, response.json()) + async def create_work_pool_queue( self, work_pool_name: str, From aeb32a0d22206dc18382787970972700eecc828e Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Fri, 20 Jan 2023 20:35:59 -0600 Subject: [PATCH 2/9] Adds work pool queue name prefix handling --- src/prefect/agent.py | 15 +++++++++++++- src/prefect/client/orion.py | 30 ++++++++++++++++++++++++---- src/prefect/orion/api/workers.py | 12 +++++++++-- src/prefect/orion/models/workers.py | 17 +++++++++++++++- src/prefect/orion/schemas/filters.py | 18 +++++++++++++++++ 5 files changed, 84 insertions(+), 8 deletions(-) diff --git a/src/prefect/agent.py b/src/prefect/agent.py index a6a40598a114..ccf871ea5291 100644 --- a/src/prefect/agent.py +++ b/src/prefect/agent.py @@ -41,6 +41,7 @@ def __init__( self, work_queues: List[str] = None, work_queue_prefix: Union[str, List[str]] = None, + work_pool_name: str = None, prefetch_seconds: int = None, default_infrastructure: Infrastructure = None, default_infrastructure_document_id: UUID = None, @@ -86,7 +87,19 @@ def __init__( async def update_matched_agent_work_queues(self): if self.work_queue_prefix: - matched_queues = await self.client.match_work_queues(self.work_queue_prefix) + if self.work_pool_name: + matched_queues = await self.client.read_work_pool_queues( + work_pool_name=self.work_pool_name, + work_pool_queue_filter=schemas.filters.WorkPoolQueueFilter( + name=schemas.filters.WorkPoolQueueFilterName( + startswith_=self.work_queue_prefix + ) + ), + ) + else: + matched_queues = await self.client.match_work_queues( + self.work_queue_prefix + ) matched_queues = set(q.name for q in matched_queues) if matched_queues != self.work_queues: new_queues = matched_queues - self.work_queues diff --git a/src/prefect/client/orion.py b/src/prefect/client/orion.py index 8f0f6b927a44..d93ea379486d 100644 --- a/src/prefect/client/orion.py +++ b/src/prefect/client/orion.py @@ -34,7 +34,11 @@ WorkPool, WorkPoolQueue, ) -from prefect.orion.schemas.filters import FlowRunNotificationPolicyFilter, LogFilter +from prefect.orion.schemas.filters import ( + FlowRunNotificationPolicyFilter, + LogFilter, + WorkPoolQueueFilter, +) from prefect.orion.schemas.responses import WorkerFlowRunResponse from prefect.settings import ( PREFECT_API_ENABLE_HTTP2, @@ -2061,19 +2065,37 @@ async def update_work_pool( ) async def read_work_pool_queues( - self, work_pool_name: str + self, + work_pool_name: str, + work_pool_queue_filter: Optional[WorkPoolQueueFilter] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, ) -> List[schemas.core.WorkPoolQueue]: """ Retrieves queues for a work pool. Args: work_pool_name: Name of the work pool for which to get queues. + work_pool_queue_filter: Criteria by which to filter queues. + limit: Limit for the queue query. + offset: Limit for the queue query. Returns: List of queues for the specified work pool. 
""" - response = await self._client.get( - f"/experimental/work_pools/{work_pool_name}/queues" + json = { + "flow_run_notification_policy_filter": work_pool_queue_filter.dict( + json_compatible=True, exclude_unset=True + ) + if work_pool_queue_filter + else None, + "limit": limit, + "offset": offset, + } + + response = await self._client.post( + f"/experimental/work_pools/{work_pool_name}/queues/filter", + json=json, ) return pydantic.parse_obj_as(List[WorkPoolQueue], response.json()) diff --git a/src/prefect/orion/api/workers.py b/src/prefect/orion/api/workers.py index e6345c176330..dc33672aa824 100644 --- a/src/prefect/orion/api/workers.py +++ b/src/prefect/orion/api/workers.py @@ -373,9 +373,12 @@ async def read_work_pool_queue( ) -@router.get("/{work_pool_name}/queues") +@router.post("/{work_pool_name}/queues/filter") async def read_work_pool_queues( work_pool_name: str = Path(..., description="The work pool name"), + work_pool_queues: schemas.filters.WorkPoolQueueFilter = None, + limit: int = dependencies.LimitBody(), + offset: int = Body(0, ge=0), worker_lookups: WorkerLookups = Depends(WorkerLookups), db: OrionDBInterface = Depends(provide_database_interface), ) -> List[schemas.core.WorkPoolQueue]: @@ -388,7 +391,12 @@ async def read_work_pool_queues( work_pool_name=work_pool_name, ) return await models.workers.read_work_pool_queues( - session=session, work_pool_id=work_pool_id, db=db + session=session, + work_pool_id=work_pool_id, + work_pool_queue_filter=work_pool_queues, + limit=limit, + offset=offset, + db=db, ) diff --git a/src/prefect/orion/models/workers.py b/src/prefect/orion/models/workers.py index a3c40b45bde0..6ba237371527 100644 --- a/src/prefect/orion/models/workers.py +++ b/src/prefect/orion/models/workers.py @@ -3,7 +3,7 @@ Intended for internal use by the Orion API. """ import datetime -from typing import Dict, List +from typing import Dict, List, Optional from uuid import UUID import pendulum @@ -333,6 +333,9 @@ async def read_work_pool_queues( session: AsyncSession, work_pool_id: UUID, db: OrionDBInterface, + work_pool_queue_filter: Optional[schemas.filters.WorkPoolQueueFilter] = None, + offset: Optional[int] = None, + limit: Optional[int] = None, ) -> List[ORMWorkPoolQueue]: """ Read all work pool queues for a work pool. Results are ordered by ascending priority. 
@@ -340,6 +343,10 @@ async def read_work_pool_queues( Args: session (AsyncSession): a database session work_pool_id (UUID): a work pool id + work_pool_queue_filter: Filter criteria for work pool queues + offset: Query offset + limit: Query limit + Returns: List[db.WorkPoolQueue]: the WorkPoolQueues @@ -350,6 +357,14 @@ async def read_work_pool_queues( .where(db.WorkPoolQueue.work_pool_id == work_pool_id) .order_by(db.WorkPoolQueue.priority.asc()) ) + + if work_pool_queue_filter is not None: + query = query.where(work_pool_queue_filter.as_sql_filter(db)) + if offset is not None: + query = query.offset(offset) + if limit is not None: + query = query.limit(limit) + result = await session.execute(query) return result.scalars().unique().all() diff --git a/src/prefect/orion/schemas/filters.py b/src/prefect/orion/schemas/filters.py index 2ec722ff592b..8c8ab13dab9b 100644 --- a/src/prefect/orion/schemas/filters.py +++ b/src/prefect/orion/schemas/filters.py @@ -1372,11 +1372,29 @@ class WorkPoolQueueFilterName(PrefectFilterBaseModel): any_: Optional[List[str]] = Field( default=None, description="A list of work pool queue names to include" ) + startswith_: Optional[List[str]] = Field( + default=None, + description=( + "A list of case-insensitive starts-with matches. For example, " + " passing 'marvin' will match " + "'marvin', and 'Marvin-robot', but not 'sad-marvin'." + ), + example=["marvin", "Marvin-robot"], + ) def _get_filter_list(self, db: "OrionDBInterface") -> List: filters = [] if self.any_ is not None: filters.append(db.WorkPoolQueue.name.in_(self.any_)) + if self.startswith_ is not None: + filters.append( + sa.or_( + *[ + db.WorkPoolQueue.name.ilike(f"{item}%") + for item in self.startswith_ + ] + ) + ) return filters From 95744eaeefa06cefdb9995c9d20b55dd762c0811 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Fri, 20 Jan 2023 20:56:03 -0600 Subject: [PATCH 3/9] Fixes failing CLI tests --- tests/cli/test_agent.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/cli/test_agent.py b/tests/cli/test_agent.py index 0669cf65293d..e3fc2480ca5e 100644 --- a/tests/cli/test_agent.py +++ b/tests/cli/test_agent.py @@ -69,6 +69,7 @@ def test_start_agent_with_prefetch_seconds(monkeypatch): mock_agent.assert_called_once_with( work_queues=["test"], work_queue_prefix=ANY, + work_pool_name=None, prefetch_seconds=30, limit=None, ) @@ -91,6 +92,7 @@ def test_start_agent_with_prefetch_seconds_from_setting_by_default(monkeypatch): mock_agent.assert_called_once_with( work_queues=ANY, work_queue_prefix=ANY, + work_pool_name=None, prefetch_seconds=100, limit=None, ) @@ -106,6 +108,7 @@ def test_start_agent_respects_work_queue_names(monkeypatch): mock_agent.assert_called_once_with( work_queues=["a", "b"], work_queue_prefix=[], + work_pool_name=None, prefetch_seconds=ANY, limit=None, ) @@ -121,6 +124,7 @@ def test_start_agent_respects_work_queue_prefixes(monkeypatch): mock_agent.assert_called_once_with( work_queues=[], work_queue_prefix=["a", "b"], + work_pool_name=None, prefetch_seconds=ANY, limit=None, ) @@ -136,11 +140,28 @@ def test_start_agent_respects_limit(monkeypatch): mock_agent.assert_called_once_with( work_queues=["test"], work_queue_prefix=[], + work_pool_name=None, prefetch_seconds=ANY, limit=10, ) +def test_start_agent_respects_work_pool_name(monkeypatch): + mock_agent = MagicMock() + monkeypatch.setattr(prefect.cli.agent, "OrionAgent", mock_agent) + invoke_and_assert( + command=["agent", "start", "--pool", "test-pool", "--run-once", "-q", "test"], + 
expected_code=0, + ) + mock_agent.assert_called_once_with( + work_queues=["test"], + work_queue_prefix=[], + work_pool_name="test-pool", + prefetch_seconds=ANY, + limit=None, + ) + + def test_start_agent_with_work_queue_match_and_work_queue(): invoke_and_assert( command=["agent", "start", "hello", "-m", "blue"], From 8a688c145dc9f5a7a107e02210b050a70c658d6d Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Fri, 20 Jan 2023 21:26:27 -0600 Subject: [PATCH 4/9] Adds ability to get scheduled runs from entire work pool --- src/prefect/agent.py | 11 ++ tests/agent/test_agent_run_submission.py | 201 +++++++++++++++++++++++ 2 files changed, 212 insertions(+) diff --git a/src/prefect/agent.py b/src/prefect/agent.py index ccf871ea5291..a9820518c447 100644 --- a/src/prefect/agent.py +++ b/src/prefect/agent.py @@ -11,6 +11,7 @@ import anyio.to_process import pendulum +from prefect._internal.compatibility.experimental import experimental_parameter from prefect.blocks.core import Block from prefect.client.orion import OrionClient, get_client from prefect.engine import propose_state @@ -37,6 +38,9 @@ class OrionAgent: + @experimental_parameter( + "work_pool_name", group="workers", when=lambda y: y is not None + ) def __init__( self, work_queues: List[str] = None, @@ -190,6 +194,13 @@ async def get_and_submit_flow_runs(self) -> List[FlowRun]: submittable_runs: List[FlowRun] = [] + if self.work_pool_name and not self.work_queues: + responses = await self.client.get_scheduled_flow_runs_for_work_pool_queues( + work_pool_name=self.work_pool_name, + scheduled_before=before, + ) + submittable_runs.extend([response.flow_run for response in responses]) + # load runs from each work queue async for work_queue in self.get_work_queues(): diff --git a/tests/agent/test_agent_run_submission.py b/tests/agent/test_agent_run_submission.py index 76aa24d8d1c8..3681e56eb78d 100644 --- a/tests/agent/test_agent_run_submission.py +++ b/tests/agent/test_agent_run_submission.py @@ -8,6 +8,7 @@ from prefect import flow from prefect.agent import OrionAgent from prefect.blocks.core import Block +from prefect.client.orion import OrionClient from prefect.exceptions import Abort, CrashedRun, FailedRun from prefect.infrastructure.base import Infrastructure from prefect.orion import models, schemas @@ -886,3 +887,203 @@ async def test_agent_displays_message_on_work_queue_pause( await agent.get_and_submit_flow_runs() assert f"Work queue 'wq' ({work_queue.id}) is paused." 
in prefect_caplog.text + + +async def test_agent_with_work_pool_queue( + orion_client: OrionClient, + deployment: schemas.core.Deployment, + work_pool: schemas.core.WorkPool, + work_pool_queue: schemas.core.WorkPoolQueue, + enable_workers, +): + @flow + def foo(): + pass + + create_run_with_deployment = ( + lambda state: orion_client.create_flow_run_from_deployment( + deployment.id, state=state + ) + ) + + flow_runs = [ + await create_run_with_deployment(Pending()), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").subtract(days=1)) + ), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").add(seconds=5)) + ), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").add(seconds=5)) + ), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").add(seconds=20)) + ), + await create_run_with_deployment(Running()), + await create_run_with_deployment(Completed()), + await orion_client.create_flow_run(foo, state=Scheduled()), + ] + flow_run_ids = [run.id for run in flow_runs] + + # Pull runs from the work queue to get expected runs + work_pool_queue = await orion_client.read_work_pool_queue( + work_pool_name=work_pool.name, + work_pool_queue_name=work_pool_queue.name, + ) + responses = await orion_client.get_scheduled_flow_runs_for_work_pool_queues( + work_pool_name=work_pool.name, + work_pool_queue_names=[work_pool_queue.name], + scheduled_before=pendulum.now().add(seconds=10), + ) + work_queue_flow_run_ids = {response.flow_run.id for response in responses} + + # Should only include scheduled runs in the past or next prefetch seconds + # Should not include runs without deployments + assert work_queue_flow_run_ids == set(flow_run_ids[1:4]) + + agent = OrionAgent( + work_queues=[work_pool_queue.name], + work_pool_name=work_pool.name, + prefetch_seconds=10, + ) + + async with agent: + agent.submit_run = AsyncMock() # do not actually run anything + submitted_flow_runs = await agent.get_and_submit_flow_runs() + + submitted_flow_run_ids = {flow_run.id for flow_run in submitted_flow_runs} + assert submitted_flow_run_ids == work_queue_flow_run_ids + + +async def test_agent_with_work_pool( + orion_client: OrionClient, + deployment: schemas.core.Deployment, + work_pool: schemas.core.WorkPool, + work_pool_queue: schemas.core.WorkPoolQueue, + enable_workers, +): + @flow + def foo(): + pass + + create_run_with_deployment = ( + lambda state: orion_client.create_flow_run_from_deployment( + deployment.id, state=state + ) + ) + + flow_runs = [ + await create_run_with_deployment(Pending()), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").subtract(days=1)) + ), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").add(seconds=5)) + ), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").add(seconds=5)) + ), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").add(seconds=20)) + ), + await create_run_with_deployment(Running()), + await create_run_with_deployment(Completed()), + await orion_client.create_flow_run(foo, state=Scheduled()), + ] + flow_run_ids = [run.id for run in flow_runs] + + # Pull runs from the work queue to get expected runs + work_pool_queue = await orion_client.read_work_pool_queue( + work_pool_name=work_pool.name, + work_pool_queue_name=work_pool_queue.name, + ) + responses = await orion_client.get_scheduled_flow_runs_for_work_pool_queues( + 
work_pool_name=work_pool.name, + work_pool_queue_names=[work_pool_queue.name], + scheduled_before=pendulum.now().add(seconds=10), + ) + work_queue_flow_run_ids = {response.flow_run.id for response in responses} + + # Should only include scheduled runs in the past or next prefetch seconds + # Should not include runs without deployments + assert work_queue_flow_run_ids == set(flow_run_ids[1:4]) + + agent = OrionAgent( + work_pool_name=work_pool.name, + prefetch_seconds=10, + ) + + async with agent: + agent.submit_run = AsyncMock() # do not actually run anything + submitted_flow_runs = await agent.get_and_submit_flow_runs() + + submitted_flow_run_ids = {flow_run.id for flow_run in submitted_flow_runs} + assert submitted_flow_run_ids == work_queue_flow_run_ids + + +async def test_agent_with_work_pool_and_work_queue_prefix( + orion_client: OrionClient, + deployment: schemas.core.Deployment, + work_pool: schemas.core.WorkPool, + work_pool_queue: schemas.core.WorkPoolQueue, + enable_workers, +): + @flow + def foo(): + pass + + create_run_with_deployment = ( + lambda state: orion_client.create_flow_run_from_deployment( + deployment.id, state=state + ) + ) + + flow_runs = [ + await create_run_with_deployment(Pending()), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").subtract(days=1)) + ), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").add(seconds=5)) + ), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").add(seconds=5)) + ), + await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").add(seconds=20)) + ), + await create_run_with_deployment(Running()), + await create_run_with_deployment(Completed()), + await orion_client.create_flow_run(foo, state=Scheduled()), + ] + flow_run_ids = [run.id for run in flow_runs] + + # Pull runs from the work queue to get expected runs + work_pool_queue = await orion_client.read_work_pool_queue( + work_pool_name=work_pool.name, + work_pool_queue_name=work_pool_queue.name, + ) + responses = await orion_client.get_scheduled_flow_runs_for_work_pool_queues( + work_pool_name=work_pool.name, + work_pool_queue_names=[work_pool_queue.name], + scheduled_before=pendulum.now().add(seconds=10), + ) + work_queue_flow_run_ids = {response.flow_run.id for response in responses} + + # Should only include scheduled runs in the past or next prefetch seconds + # Should not include runs without deployments + assert work_queue_flow_run_ids == set(flow_run_ids[1:4]) + + agent = OrionAgent( + work_pool_name=work_pool.name, + work_queue_prefix="test", + prefetch_seconds=10, + ) + + async with agent: + agent.submit_run = AsyncMock() # do not actually run anything + submitted_flow_runs = await agent.get_and_submit_flow_runs() + + submitted_flow_run_ids = {flow_run.id for flow_run in submitted_flow_runs} + assert submitted_flow_run_ids == work_queue_flow_run_ids From d1dc665f9fb442d8ac69407eef4bdc826837ea1a Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Fri, 20 Jan 2023 21:39:01 -0600 Subject: [PATCH 5/9] Adds test for work pool queue creation by agent --- src/prefect/client/orion.py | 15 ++++++++++----- tests/agent/test_agent_run_submission.py | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/prefect/client/orion.py b/src/prefect/client/orion.py index d93ea379486d..f1a8e966ec2b 100644 --- a/src/prefect/client/orion.py +++ b/src/prefect/client/orion.py @@ -2113,11 +2113,16 @@ async def read_work_pool_queue( Returns: The 
specified work pool queue. """ - response = await self._client.get( - f"/experimental/work_pools/{work_pool_name}/queues/{work_pool_queue_name}" - ) - - return pydantic.parse_obj_as(WorkPoolQueue, response.json()) + try: + response = await self._client.get( + f"/experimental/work_pools/{work_pool_name}/queues/{work_pool_queue_name}" + ) + return pydantic.parse_obj_as(WorkPoolQueue, response.json()) + except httpx.HTTPStatusError as e: + if e.response.status_code == status.HTTP_404_NOT_FOUND: + raise prefect.exceptions.ObjectNotFound(http_exc=e) from e + else: + raise async def create_work_pool_queue( self, diff --git a/tests/agent/test_agent_run_submission.py b/tests/agent/test_agent_run_submission.py index 3681e56eb78d..6fe624ac3019 100644 --- a/tests/agent/test_agent_run_submission.py +++ b/tests/agent/test_agent_run_submission.py @@ -257,6 +257,24 @@ async def test_agent_creates_work_queue_if_doesnt_exist(session, prefect_caplog) assert f"Created work queue '{name}'." in prefect_caplog.text +async def test_agent_creates_work_pool_queue_if_doesnt_exist( + session, work_pool, prefect_caplog, enable_workers +): + name = "hello-there" + assert not await models.workers.read_work_pool_queue_by_name( + session=session, work_pool_name=work_pool.name, work_pool_queue_name=name + ) + async with OrionAgent(work_queues=[name], work_pool_name=work_pool.name) as agent: + await agent.get_and_submit_flow_runs() + assert await models.workers.read_work_pool_queue_by_name( + session=session, work_pool_name=work_pool.name, work_pool_queue_name=name + ) + assert ( + f"Created work queue '{name}' in work pool '{work_pool.name}'." + in prefect_caplog.text + ) + + async def test_agent_does_not_create_work_queues_if_matching_with_prefix( session, prefect_caplog ): From b3620f0ebb48fc5fa38c684c1d9e583382073b05 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Fri, 20 Jan 2023 21:41:04 -0600 Subject: [PATCH 6/9] Corrects docstring --- src/prefect/client/orion.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/client/orion.py b/src/prefect/client/orion.py index f1a8e966ec2b..23a062e2a0f0 100644 --- a/src/prefect/client/orion.py +++ b/src/prefect/client/orion.py @@ -2104,7 +2104,7 @@ async def read_work_pool_queue( self, work_pool_name: str, work_pool_queue_name: str ) -> schemas.core.WorkPoolQueue: """ - Retrieves queues for a work pool. + Retrieves a given queue for a work pool. Args: work_pool_name: Name of the work pool the queue belong to. 
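
Taken together, the patches above make the agent work-pool aware: the CLI gains a `--pool`/`-p` option, `OrionAgent` accepts a `work_pool_name` parameter, and `get_and_submit_flow_runs` pulls scheduled runs from work pool queues when a pool is set. A minimal sketch of how the new parameter is exercised, based on the tests and CLI option added in this series — the pool and queue names below are placeholders, and the experimental `workers` setting is assumed to be enabled (as the `enable_workers` fixture does in the tests):

import asyncio

from prefect.agent import OrionAgent


async def poll_pool_once() -> None:
    # Poll one named queue inside a work pool; omitting `work_queues`
    # pulls scheduled runs from every queue in the pool instead.
    async with OrionAgent(
        work_pool_name="my-pool",    # placeholder pool name
        work_queues=["my-queue"],    # placeholder queue name
        prefetch_seconds=10,
    ) as agent:
        await agent.get_and_submit_flow_runs()


# Roughly the programmatic equivalent of:
#   prefect agent start --pool my-pool -q my-queue --run-once
asyncio.run(poll_pool_once())
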
From c4e66a8f363383e018145fc02164762b87bcd56a Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Sat, 21 Jan 2023 13:10:08 -0600 Subject: [PATCH 7/9] Adds checking for when pool option is used --- src/prefect/cli/agent.py | 6 +++++- tests/cli/test_agent.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/prefect/cli/agent.py b/src/prefect/cli/agent.py index ed450092efd2..4391c4581de8 100644 --- a/src/prefect/cli/agent.py +++ b/src/prefect/cli/agent.py @@ -107,13 +107,17 @@ async def start( style="blue", ) - if not work_queues and not tags and not work_queue_prefix: + if not work_queues and not tags and not work_queue_prefix and not work_pool_name: exit_with_error("No work queues provided!", style="red") elif bool(work_queues) + bool(tags) + bool(work_queue_prefix) > 1: exit_with_error( "Only one of `work_queues`, `match`, or `tags` can be provided.", style="red", ) + if work_pool_name and tags: + exit_with_error( + "`tag` and `pool` options cannot be used together.", style="red" + ) if tags: work_queue_name = f"Agent queue {'-'.join(sorted(tags))}" diff --git a/tests/cli/test_agent.py b/tests/cli/test_agent.py index e3fc2480ca5e..742ef1632fa3 100644 --- a/tests/cli/test_agent.py +++ b/tests/cli/test_agent.py @@ -174,3 +174,35 @@ def test_start_agent_with_work_queue_match_and_work_queue(): expected_output_contains="Only one of `work_queues`, `match`, or `tags` can be provided.", expected_code=1, ) + + +def test_start_agent_with_just_work_pool(monkeypatch): + mock_agent = MagicMock() + monkeypatch.setattr(prefect.cli.agent, "OrionAgent", mock_agent) + invoke_and_assert( + command=["agent", "start", "--pool", "test-pool", "--run-once"], + expected_code=0, + ) + mock_agent.assert_called_once_with( + work_queues=[], + work_queue_prefix=[], + work_pool_name="test-pool", + prefetch_seconds=ANY, + limit=None, + ) + + +def test_start_agent_errors_with_work_pool_and_tags(): + invoke_and_assert( + command=[ + "agent", + "start", + "--pool", + "test-pool", + "--run-once", + "--tag", + "test", + ], + expected_output_contains="`tag` and `pool` options cannot be used together.", + expected_code=1, + ) From 3bfea5d6bc605d46824423b3b8e00ff9f4653f96 Mon Sep 17 00:00:00 2001 From: Chris White Date: Sun, 22 Jan 2023 13:53:37 -0800 Subject: [PATCH 8/9] Tweaks to agents <-> work pools (#8223) --- orion-ui/src/components/ContextSidebar.vue | 2 +- orion-ui/src/pages/WorkPool.vue | 7 +-- src/prefect/agent.py | 56 +++++++++++----------- src/prefect/cli/agent.py | 7 ++- src/prefect/cli/deployment.py | 9 ++++ src/prefect/client/orion.py | 2 +- src/prefect/orion/api/workers.py | 16 +++---- src/prefect/orion/schemas/actions.py | 31 ++++++------ 8 files changed, 70 insertions(+), 60 deletions(-) diff --git a/orion-ui/src/components/ContextSidebar.vue b/orion-ui/src/components/ContextSidebar.vue index 93118c1677a2..54b0226bc13a 100644 --- a/orion-ui/src/components/ContextSidebar.vue +++ b/orion-ui/src/components/ContextSidebar.vue @@ -3,7 +3,7 @@ - + diff --git a/orion-ui/src/pages/WorkPool.vue b/orion-ui/src/pages/WorkPool.vue index 7baf1528f3dc..b06071bf502e 100644 --- a/orion-ui/src/pages/WorkPool.vue +++ b/orion-ui/src/pages/WorkPool.vue @@ -17,9 +17,6 @@ -