diff --git a/src/prefect/deployments.py b/src/prefect/deployments.py
index 9ddaf98c9191..e9cb7b6eb2eb 100644
--- a/src/prefect/deployments.py
+++ b/src/prefect/deployments.py
@@ -5,10 +5,13 @@
 import importlib
 import json
 import sys
+from datetime import datetime
 from pathlib import Path
 from typing import Any, Dict, List, Optional
 from uuid import UUID
 
+import anyio
+import pendulum
 import yaml
 from pydantic import BaseModel, Field, parse_obj_as, validator
 
@@ -29,6 +32,65 @@
 from prefect.utilities.importtools import import_object
 
 
+@sync_compatible
+@inject_client
+async def run_deployment(
+    name: str,
+    client: OrionClient = None,
+    parameters: dict = None,
+    scheduled_time: datetime = None,
+    timeout: float = None,
+    poll_interval: float = 5,
+):
+    """
+    Create a flow run for a deployment and return it after completion or a timeout.
+
+    This function will return when the created flow run enters any terminal state or
+    the timeout is reached. If the timeout is reached and the flow run has not reached
+    a terminal state, it will still be returned. When using a timeout, we suggest
+    checking the state of the flow run if completion is important moving forward.
+
+    Args:
+        name: The deployment name in the form: '<flow-name>/<deployment-name>'
+        parameters: Parameter overrides for this flow run. Merged with the deployment
+            defaults.
+        scheduled_time: The time to schedule the flow run for, defaults to scheduling
+            the flow run to start now.
+        timeout: The amount of time to wait for the flow run to complete before
+            returning. Setting `timeout` to 0 will return the flow run immediately.
+            Setting `timeout` to None will allow this function to poll indefinitely.
+            Defaults to None.
+        poll_interval: The number of seconds between polls.
+    """
+    if timeout is not None and timeout < 0:
+        raise ValueError("`timeout` cannot be negative")
+
+    if scheduled_time is None:
+        scheduled_time = pendulum.now("UTC")
+
+    deployment = await client.read_deployment_by_name(name)
+    flow_run = await client.create_flow_run_from_deployment(
+        deployment.id,
+        state=schemas.states.Scheduled(scheduled_time=scheduled_time),
+        parameters=parameters,
+    )
+
+    flow_run_id = flow_run.id
+
+    if timeout == 0:
+        return flow_run
+
+    with anyio.move_on_after(timeout):
+        while True:
+            flow_run = await client.read_flow_run(flow_run_id)
+            flow_state = flow_run.state
+            if flow_state and flow_state.is_final():
+                return flow_run
+            await anyio.sleep(poll_interval)
+
+    return flow_run
+
+
 @inject_client
 async def load_flow_from_flow_run(
     flow_run: schemas.core.FlowRun, client: OrionClient, ignore_storage: bool = False
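For reviewers, a minimal usage sketch of the new `run_deployment` helper. The deployment name and parameters below are placeholders; because the function is decorated with `@sync_compatible`, it can be called directly from synchronous code:

    from prefect.deployments import run_deployment

    # Block until the flow run reaches a terminal state, or give up after
    # 60 seconds; "my-flow/prod" and the parameters are hypothetical.
    flow_run = run_deployment(
        name="my-flow/prod",
        parameters={"x": 1},  # merged with the deployment's default parameters
        timeout=60,           # None (the default) polls indefinitely
    )
    print(flow_run.state)

Note that if the timeout elapses first, the run is returned in whatever non-terminal state it last reported, so callers that care about completion should check the returned state.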
diff --git a/src/prefect/orion/models/deployments.py b/src/prefect/orion/models/deployments.py
index 7b02ce24eeb6..5530440ab4b6 100644
--- a/src/prefect/orion/models/deployments.py
+++ b/src/prefect/orion/models/deployments.py
@@ -361,6 +361,7 @@ async def schedule_runs(
     start_time: datetime.datetime = None,
     end_time: datetime.datetime = None,
     max_runs: int = None,
+    auto_scheduled: bool = True,
 ) -> List[UUID]:
     """
     Schedule flow runs for a deployment
@@ -392,6 +393,7 @@ async def schedule_runs(
         start_time=start_time,
         end_time=end_time,
         max_runs=max_runs,
+        auto_scheduled=auto_scheduled,
     )
     return await _insert_scheduled_flow_runs(session=session, runs=runs)
 
@@ -404,6 +406,7 @@ async def _generate_scheduled_flow_runs(
     end_time: datetime.datetime,
     max_runs: int,
     db: OrionDBInterface,
+    auto_scheduled: bool = True,
 ) -> List[Dict]:
     """
     Given a `deployment_id` and schedule, generates a list of flow run objects and
@@ -436,6 +439,10 @@ async def _generate_scheduled_flow_runs(
         n=max_runs, start=start_time, end=end_time
     )
 
+    tags = deployment.tags
+    if auto_scheduled:
+        tags = ["auto-scheduled"] + tags
+
     for date in dates:
         runs.append(
             {
@@ -446,8 +453,8 @@ async def _generate_scheduled_flow_runs(
                 "parameters": deployment.parameters,
                 "infrastructure_document_id": deployment.infrastructure_document_id,
                 "idempotency_key": f"scheduled {deployment.id} {date}",
-                "tags": ["auto-scheduled"] + deployment.tags,
-                "auto_scheduled": True,
+                "tags": tags,
+                "auto_scheduled": auto_scheduled,
                 "state": schemas.states.Scheduled(
                     scheduled_time=date,
                     message="Flow run scheduled",
diff --git a/tests/fixtures/docker.py b/tests/fixtures/docker.py
index d8c4adf4fb6a..489fabd132d8 100644
--- a/tests/fixtures/docker.py
+++ b/tests/fixtures/docker.py
@@ -67,7 +67,7 @@ def cleanup_all_new_docker_objects(docker: DockerClient, worker_id: str):
 
 @pytest.mark.timeout(120)
 @pytest.fixture(scope="session")
-def prefect_base_image(pytestconfig: pytest.Config, docker: DockerClient):
+def prefect_base_image(pytestconfig: "pytest.Config", docker: DockerClient):
     """Ensure that the prefect dev image is available and up-to-date"""
     image_name = get_prefect_image_name()
 
diff --git a/tests/orion/models/test_deployments.py b/tests/orion/models/test_deployments.py
index c42b77ab03b3..457f8d625b55 100644
--- a/tests/orion/models/test_deployments.py
+++ b/tests/orion/models/test_deployments.py
@@ -621,6 +621,36 @@ async def test_schedule_runs_applies_tags(self, tags, flow, flow_function, sessi
             )
             assert run.tags == ["auto-scheduled"] + tags
 
+    @pytest.mark.parametrize("tags", [[], ["foo"]])
+    async def test_schedule_runs_does_not_set_auto_schedule_tags(
+        self, tags, flow, flow_function, session
+    ):
+        deployment = await models.deployments.create_deployment(
+            session=session,
+            deployment=schemas.core.Deployment(
+                name="My Deployment",
+                manifest_path="file.json",
+                schedule=schemas.schedules.IntervalSchedule(
+                    interval=datetime.timedelta(days=1)
+                ),
+                flow_id=flow.id,
+                tags=tags,
+            ),
+        )
+        scheduled_runs = await models.deployments.schedule_runs(
+            session,
+            deployment_id=deployment.id,
+            max_runs=2,
+            auto_scheduled=False,
+        )
+        assert len(scheduled_runs) == 2
+        for run_id in scheduled_runs:
+            run = await models.flow_runs.read_flow_run(
+                session=session, flow_run_id=run_id
+            )
+            assert run.tags == tags
+            assert run.auto_scheduled is False
+
     async def test_schedule_runs_applies_work_queue(self, flow, flow_function, session):
         deployment = await models.deployments.create_deployment(
             session=session,
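The `auto_scheduled` flag threaded through `schedule_runs` above lets callers schedule runs manually without the scheduler's bookkeeping. A minimal sketch, assuming an Orion async database `session` and an existing `deployment` (both normally supplied by service code or test fixtures):

    from prefect.orion import models

    # Runs created this way keep only the deployment's own tags and are
    # not marked auto_scheduled, matching the new test above.
    run_ids = await models.deployments.schedule_runs(
        session,
        deployment_id=deployment.id,
        max_runs=2,
        auto_scheduled=False,
    )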
diff --git a/tests/test_deployments.py b/tests/test_deployments.py
index d3817a182f4d..69f281263b4f 100644
--- a/tests/test_deployments.py
+++ b/tests/test_deployments.py
@@ -1,12 +1,22 @@
+import re
+from uuid import uuid4
+
+import pendulum
 import pytest
+import respx
 import yaml
+from httpx import Response
 from pydantic.error_wrappers import ValidationError
 
+from prefect import flow
 from prefect.blocks.core import Block
-from prefect.deployments import Deployment
+from prefect.deployments import Deployment, run_deployment
 from prefect.exceptions import BlockMissingCapabilities
 from prefect.filesystems import S3, GitHub, LocalFileSystem
 from prefect.infrastructure import DockerContainer, Infrastructure, Process
+from prefect.orion.schemas import states
+from prefect.settings import PREFECT_API_URL
+from prefect.testing.cli import invoke_and_assert
 
 
 class TestDeploymentBasicInterface:
@@ -409,3 +419,296 @@ def test_yaml_comment_for_work_queue(self, tmp_path):
             "# The work queue that will handle this deployment's runs\n"
         )
         assert contents[comment_index + 1] == "work_queue_name: default\n"
+
+
+@flow
+def ad_hoc_flow():
+    pass
+
+
+@pytest.fixture
+def dep_path():
+    return "./dog.py"
+
+
+@pytest.fixture
+def patch_import(monkeypatch):
+    @flow(description="Need a non-trivial description here.", version="A")
+    def fn():
+        pass
+
+    monkeypatch.setattr("prefect.utilities.importtools.import_object", lambda path: fn)
+
+
+@pytest.fixture
+def test_deployment(patch_import, tmp_path):
+    d = Deployment(
+        name="TEST",
+        flow_name="fn",
+    )
+    deployment_id = d.apply()
+
+    invoke_and_assert(
+        [
+            "deployment",
+            "build",
+            "fake-path.py:fn",
+            "-n",
+            "TEST",
+            "-o",
+            str(tmp_path / "test.yaml"),
+            "--apply",
+        ],
+        expected_code=0,
+        expected_output_contains=[
+            f"Deployment '{d.flow_name}/{d.name}' successfully created with id '{deployment_id}'."
+        ],
+        temp_dir=tmp_path,
+    )
+    return d, deployment_id
+
+
+class TestRunDeployment:
+    @pytest.mark.parametrize(
+        "terminal_state", list(sorted(s.name for s in states.TERMINAL_STATES))
+    )
+    def test_running_a_deployment_blocks_until_termination(
+        self,
+        test_deployment,
+        use_hosted_orion,
+        terminal_state,
+    ):
+        d, deployment_id = test_deployment
+
+        mock_flowrun_response = {
+            "id": str(uuid4()),
+            "flow_id": str(uuid4()),
+        }
+
+        with respx.mock(
+            base_url=PREFECT_API_URL.value(),
+            assert_all_mocked=True,
+        ) as router:
+            poll_responses = [
+                Response(
+                    200, json={**mock_flowrun_response, "state": {"type": "PENDING"}}
+                ),
+                Response(
+                    200, json={**mock_flowrun_response, "state": {"type": "RUNNING"}}
+                ),
+                Response(
+                    200,
+                    json={**mock_flowrun_response, "state": {"type": terminal_state}},
+                ),
+            ]
+
+            router.get(f"/deployments/name/{d.flow_name}/{d.name}").pass_through()
+            router.post(f"/deployments/{deployment_id}/create_flow_run").pass_through()
+            flow_polls = router.get(re.compile("/flow_runs/.*")).mock(
+                side_effect=poll_responses
+            )
+
+            assert (
+                run_deployment(
+                    f"{d.flow_name}/{d.name}", timeout=2, poll_interval=0
+                ).state.type
+                == terminal_state
+            ), f"run_deployment does not exit on {terminal_state}"
+            assert len(flow_polls.calls) == 3
+
+    @pytest.mark.parametrize(
+        "terminal_state", list(sorted(s.name for s in states.TERMINAL_STATES))
+    )
+    async def test_running_a_deployment_blocks_until_termination_async(
+        self,
+        test_deployment,
+        use_hosted_orion,
+        terminal_state,
+    ):
+        d, deployment_id = test_deployment
+
+        mock_flowrun_response = {
+            "id": str(uuid4()),
+            "flow_id": str(uuid4()),
+        }
+
+        async with respx.mock(
+            base_url=PREFECT_API_URL.value(),
+            assert_all_mocked=True,
+        ) as router:
+            poll_responses = [
+                Response(
+                    200, json={**mock_flowrun_response, "state": {"type": "PENDING"}}
+                ),
+                Response(
+                    200, json={**mock_flowrun_response, "state": {"type": "RUNNING"}}
+                ),
+                Response(
+                    200,
+                    json={**mock_flowrun_response, "state": {"type": terminal_state}},
+                ),
+            ]
+
+            router.get(f"/deployments/name/{d.flow_name}/{d.name}").pass_through()
+            router.post(f"/deployments/{deployment_id}/create_flow_run").pass_through()
+            flow_polls = router.get(re.compile("/flow_runs/.*")).mock(
+                side_effect=poll_responses
+            )
+
+            assert (
+                await run_deployment(
+                    f"{d.flow_name}/{d.name}",
+                    timeout=2,
+                    poll_interval=0,
+                )
+            ).state.type == terminal_state, (
+                f"run_deployment does not exit on {terminal_state}"
+            )
+            assert len(flow_polls.calls) == 3
+
+    async def test_run_deployment_with_ephemeral_api(
+        self,
+        test_deployment,
+        orion_client,
+    ):
+        d, deployment_id = test_deployment
+
+        flow_run = await run_deployment(
+            f"{d.flow_name}/{d.name}",
+            timeout=2,
+            poll_interval=0,
+            client=orion_client,
+        )
+        assert flow_run.deployment_id == deployment_id
+        assert flow_run.state
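The tests above share one respx pattern worth calling out: deployment lookup and flow-run creation pass through to the real API, while polls of `/flow_runs/...` are scripted so the run appears to advance through its states. A condensed sketch (the base URL is a hypothetical stand-in for `PREFECT_API_URL`):

    import re

    import respx
    from httpx import Response

    with respx.mock(base_url="http://127.0.0.1:4200/api") as router:
        router.get(re.compile("/deployments/name/.*")).pass_through()
        router.post(re.compile("/deployments/.*/create_flow_run")).pass_through()
        # Each poll returns the next canned response: PENDING -> RUNNING -> COMPLETED.
        router.get(re.compile("/flow_runs/.*")).mock(
            side_effect=[
                Response(200, json={"state": {"type": "PENDING"}}),
                Response(200, json={"state": {"type": "RUNNING"}}),
                Response(200, json={"state": {"type": "COMPLETED"}}),
            ]
        )
        # ...code under test issues its HTTP calls here...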
+    def test_returns_flow_run_on_timeout(
+        self,
+        test_deployment,
+        use_hosted_orion,
+    ):
+        d, deployment_id = test_deployment
+
+        mock_flowrun_response = {
+            "id": str(uuid4()),
+            "flow_id": str(uuid4()),
+        }
+
+        with respx.mock(
+            base_url=PREFECT_API_URL.value(), assert_all_mocked=True
+        ) as router:
+            router.get(f"/deployments/name/{d.flow_name}/{d.name}").pass_through()
+            router.post(f"/deployments/{deployment_id}/create_flow_run").pass_through()
+            flow_polls = router.request(
+                "GET", re.compile(PREFECT_API_URL.value() + "/flow_runs/.*")
+            ).mock(
+                return_value=Response(
+                    200, json={**mock_flowrun_response, "state": {"type": "SCHEDULED"}}
+                )
+            )
+
+            flow_run = run_deployment(
+                f"{d.flow_name}/{d.name}", timeout=1, poll_interval=0
+            )
+            assert len(flow_polls.calls) > 0
+            assert flow_run.state
+
+    def test_returns_flow_run_immediately_when_timeout_is_zero(
+        self,
+        test_deployment,
+        use_hosted_orion,
+    ):
+        d, deployment_id = test_deployment
+
+        mock_flowrun_response = {
+            "id": str(uuid4()),
+            "flow_id": str(uuid4()),
+        }
+
+        with respx.mock(
+            base_url=PREFECT_API_URL.value(),
+            assert_all_mocked=True,
+            assert_all_called=False,
+        ) as router:
+            router.get(f"/deployments/name/{d.flow_name}/{d.name}").pass_through()
+            router.post(f"/deployments/{deployment_id}/create_flow_run").pass_through()
+            flow_polls = router.request(
+                "GET", re.compile(PREFECT_API_URL.value() + "/flow_runs/.*")
+            ).mock(
+                return_value=Response(
+                    200, json={**mock_flowrun_response, "state": {"type": "SCHEDULED"}}
+                )
+            )
+
+            flow_run = run_deployment(
+                f"{d.flow_name}/{d.name}", timeout=0, poll_interval=0
+            )
+            assert len(flow_polls.calls) == 0
+            assert flow_run.state.is_scheduled()
+
+    def test_polls_indefinitely(
+        self,
+        test_deployment,
+        use_hosted_orion,
+    ):
+        d, deployment_id = test_deployment
+
+        mock_flowrun_response = {
+            "id": str(uuid4()),
+            "flow_id": str(uuid4()),
+        }
+
+        side_effects = [
+            Response(
+                200, json={**mock_flowrun_response, "state": {"type": "SCHEDULED"}}
+            )
+        ] * 99
+        side_effects.append(
+            Response(
+                200, json={**mock_flowrun_response, "state": {"type": "COMPLETED"}}
+            )
+        )
+
+        with respx.mock(
+            base_url=PREFECT_API_URL.value(),
+            assert_all_mocked=True,
+            assert_all_called=False,
+        ) as router:
+            router.get(f"/deployments/name/{d.flow_name}/{d.name}").pass_through()
+            router.post(f"/deployments/{deployment_id}/create_flow_run").pass_through()
+            flow_polls = router.request(
+                "GET", re.compile(PREFECT_API_URL.value() + "/flow_runs/.*")
+            ).mock(side_effect=side_effects)
+
+            run_deployment(f"{d.flow_name}/{d.name}", timeout=None, poll_interval=0)
+            assert len(flow_polls.calls) == 100
+
+    def test_schedule_deployment_schedules_immediately_by_default(
+        self, test_deployment, use_hosted_orion
+    ):
+        d, deployment_id = test_deployment
+
+        scheduled_time = pendulum.now()
+        flow_run = run_deployment(
+            f"{d.flow_name}/{d.name}",
+            timeout=0,
+            poll_interval=0,
+        )
+
+        assert (flow_run.expected_start_time - scheduled_time).total_seconds() < 1
+
+    def test_schedule_deployment_accepts_custom_scheduled_time(
+        self, test_deployment, use_hosted_orion
+    ):
+        d, deployment_id = test_deployment
+
+        scheduled_time = pendulum.now() + pendulum.Duration(minutes=5)
+        flow_run = run_deployment(
+            f"{d.flow_name}/{d.name}",
+            scheduled_time=scheduled_time,
+            timeout=0,
+            poll_interval=0,
+        )
+
+        assert (flow_run.expected_start_time - scheduled_time).total_seconds() < 1
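Finally, a short sketch of the fire-and-forget behavior the last two tests cover (the deployment name is a placeholder): `timeout=0` returns the newly scheduled run without polling, and `scheduled_time` pushes the start into the future.

    import pendulum

    from prefect.deployments import run_deployment

    # Schedule a run five minutes from now and return immediately.
    flow_run = run_deployment(
        "my-flow/prod",
        scheduled_time=pendulum.now("UTC").add(minutes=5),
        timeout=0,
    )
    assert flow_run.state.is_scheduled()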