Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow runs from deployments #7047

Merged
merged 50 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
84ab10e
Spike out immediately-scheduled run endpoint
anticorrelator Sep 22, 2022
2e2d81f
Spike out run-deployment coordination utility
anticorrelator Sep 23, 2022
e031671
Add basic error handling
anticorrelator Sep 26, 2022
50e16f9
Merge branch 'main' of github.com:PrefectHQ/prefect into single-deplo…
anticorrelator Sep 28, 2022
2d5cb50
Merge branch 'main' of github.com:PrefectHQ/prefect into single-deplo…
anticorrelator Sep 29, 2022
3834076
Add scheduled run coordination utility
anticorrelator Sep 29, 2022
4e0b65a
Allow parameter customization
anticorrelator Sep 30, 2022
c844c9d
Improve route names
anticorrelator Sep 30, 2022
518e1e1
Start adding coordination plane tests
anticorrelator Sep 30, 2022
015ee52
Test all terminal state types
anticorrelator Sep 30, 2022
09ddd58
Add schedule deployment tests
anticorrelator Sep 30, 2022
454be65
Update API test
anticorrelator Sep 30, 2022
21f65af
Remove needless import
anticorrelator Sep 30, 2022
3bd733d
Merge branch 'main' into single-deployment-runs
anticorrelator Sep 30, 2022
6721675
anticorrelator Sep 30, 2022
2d18a1d
isort
anticorrelator Sep 30, 2022
4c0ab89
Revert module name and unintentional Block changes
anticorrelator Oct 1, 2022
810ba00
Only include `auto-schedule` tag if necessary
anticorrelator Oct 1, 2022
3febd46
Finish reverting unintentional Block changes
anticorrelator Oct 1, 2022
3e004b0
Prevent flow name collisions
anticorrelator Oct 3, 2022
91d24c2
Merge branch 'main' of github.com:PrefectHQ/prefect into single-deplo…
anticorrelator Oct 3, 2022
e526a2a
Ensure that parametrized test order is deterministic
anticorrelator Oct 3, 2022
8fe69f5
Preserve auto-scheduled tag order
anticorrelator Oct 3, 2022
f03668c
Coordination -> client/utilities
anticorrelator Oct 5, 2022
2583b49
Use Orion client
anticorrelator Oct 5, 2022
10a38f1
Add async tests
anticorrelator Oct 5, 2022
5c8abb6
Return flow runs / implement "fire and forget"
anticorrelator Oct 5, 2022
55208fd
Implement unlimited polling
anticorrelator Oct 5, 2022
6f5e1cb
Remove unused exceptions
anticorrelator Oct 5, 2022
894fb67
Remove needless imports
anticorrelator Oct 5, 2022
76656c9
Remove extra route
anticorrelator Oct 5, 2022
5129bc5
anticorrelator Oct 5, 2022
2abfb42
Merge branch 'main' of github.com:PrefectHQ/prefect into single-deplo…
anticorrelator Oct 5, 2022
bb0970f
Remove unused model methods
anticorrelator Oct 5, 2022
8691414
Remove needless imports
anticorrelator Oct 5, 2022
5630b75
Remove another needless import
anticorrelator Oct 5, 2022
d8d7d1a
Improve utility docstring
anticorrelator Oct 5, 2022
2d19f1a
Update src/prefect/client/utilities.py
anticorrelator Oct 5, 2022
b2b1277
Test auto-schedule kwarg
anticorrelator Oct 5, 2022
cd6da72
Add ability to set a scheduled time
anticorrelator Oct 5, 2022
434fe73
Add `scheduled_time` to docstring
anticorrelator Oct 5, 2022
005e34d
client/utilities -> deployments
anticorrelator Oct 6, 2022
42eb408
Use `timeout` semantics instead of `max_polls`
anticorrelator Oct 6, 2022
7b2b240
anticorrelator Oct 6, 2022
bbbd20f
⚫ keeps flipping back and forth
anticorrelator Oct 6, 2022
58c9187
Indicate that client is optional
zanieb Oct 6, 2022
6153308
Refer to the timeout instead of polling duration in docstring
zanieb Oct 6, 2022
c138b2f
Minor copy in docstring
zanieb Oct 6, 2022
c7f469c
Fix pytest.Config type hint
zanieb Oct 6, 2022
f90fe51
Attempt to fix ephemeral API test
zanieb Oct 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions src/prefect/client/utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import time

from prefect.client.orion import get_client
from prefect.utilities.asyncutils import sync_compatible


@sync_compatible
async def run_deployment(
    name: str,
    parameters: dict = None,
    max_polls: int = 60,
    poll_interval: float = 5,
):
    """
    Create a flow run for a deployment and poll it until it reaches a final state.

    The deployment is looked up by name, a flow run is created from it, and the
    flow run is then re-read every `poll_interval` seconds. Polling stops as soon
    as the flow run's state is final or once `max_polls` reads have been made.

    Args:
        name: The deployment name in the form: '<flow-name>/<deployment-name>'
        parameters: Parameter overrides for this flow run; passed through
            unchanged when the flow run is created.
        max_polls: The maximum number of times to poll the flow run before
            returning. Setting `max_polls` to 0 returns the newly created flow
            run immediately without polling. Setting `max_polls` to -1 polls
            indefinitely until the flow run reaches a final state.
        poll_interval: The number of seconds to wait before each poll. Must be
            non-negative.

    Returns:
        The flow run as of the most recent read (or as created, when no poll was
        made). Its state is not guaranteed to be final if `max_polls` ran out.

    Raises:
        ValueError: If `max_polls` is less than -1 or `poll_interval` is negative.
    """
    # Imported locally so the function does not rely on a new module-level import.
    import asyncio

    if max_polls < -1:
        raise ValueError(
            "`max_polls` must be -1 (unlimited polling), 0 (return immediately) or any positive integer"
        )
    if poll_interval < 0:
        # `time.sleep` used to reject negative values mid-poll; fail early instead,
        # before a flow run has been created.
        raise ValueError("`poll_interval` must be a non-negative number of seconds")

    async with get_client() as client:
        deployment = await client.read_deployment_by_name(name)
        flow_run = await client.create_flow_run_from_deployment(
            deployment.id,
            parameters=parameters,
        )

        flow_run_id = flow_run.id

        polls = 0
        while max_polls == -1 or polls < max_polls:
            # Await the sleep so the event loop is not blocked while waiting;
            # a synchronous `time.sleep` here would stall every other coroutine.
            await asyncio.sleep(poll_interval)
            flow_run = await client.read_flow_run(flow_run_id)
            flow_state = flow_run.state
            polls += 1

            if flow_state and flow_state.is_final():
                return flow_run

        # Poll budget exhausted (or zero): hand back the latest known flow run.
        return flow_run
9 changes: 7 additions & 2 deletions src/prefect/orion/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ async def _generate_scheduled_flow_runs(
end_time: datetime.datetime,
max_runs: int,
db: OrionDBInterface,
auto_scheduled: bool = True,
) -> List[Dict]:
"""
Given a `deployment_id` and schedule, generates a list of flow run objects and
Expand Down Expand Up @@ -436,6 +437,10 @@ async def _generate_scheduled_flow_runs(
n=max_runs, start=start_time, end=end_time
)

tags = deployment.tags
if auto_scheduled:
tags = ["auto-scheduled"] + tags
cicdw marked this conversation as resolved.
Show resolved Hide resolved

for date in dates:
runs.append(
{
Expand All @@ -446,8 +451,8 @@ async def _generate_scheduled_flow_runs(
"parameters": deployment.parameters,
"infrastructure_document_id": deployment.infrastructure_document_id,
"idempotency_key": f"scheduled {deployment.id} {date}",
"tags": ["auto-scheduled"] + deployment.tags,
"auto_scheduled": True,
"tags": tags,
"auto_scheduled": auto_scheduled,
"state": schemas.states.Scheduled(
scheduled_time=date,
message="Flow run scheduled",
Expand Down
272 changes: 272 additions & 0 deletions tests/client/test_client_utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
import re
from uuid import uuid4

import pytest
import respx
from httpx import Response

from prefect import flow
from prefect.client.utilities import run_deployment
from prefect.deployments import Deployment
from prefect.orion.schemas import states
from prefect.settings import PREFECT_API_URL
from prefect.testing.cli import invoke_and_assert


# No-op flow defined at import time. A `#` comment is used instead of a
# docstring so the function object itself stays exactly as before.
@flow
def ad_hoc_flow():
    return None


@pytest.fixture
def dep_path():
    """Provide the relative path string used for a deployment file."""
    path = "./dog.py"
    return path


@pytest.fixture
def patch_import(monkeypatch):
    """Patch `prefect.utilities.importtools.import_object` to always return a stub flow."""

    # NOTE(review): the inner function must stay named `fn`; the `test_deployment`
    # fixture refers to the flow as "fn" (presumably derived from this name).
    @flow(description="Need a non-trivial description here.", version="A")
    def fn():
        pass

    def _import_stub(path):
        # Ignore the requested path and hand back the stub flow.
        return fn

    monkeypatch.setattr("prefect.utilities.importtools.import_object", _import_stub)


@pytest.fixture
def test_deployment(patch_import, tmp_path):
    """
    Apply a deployment named "TEST" for flow "fn" and rebuild it through the CLI.

    Returns:
        A `(deployment, deployment_id)` tuple, where `deployment_id` is the value
        returned by `Deployment.apply()`.
    """
    deployment = Deployment(name="TEST", flow_name="fn")
    deployment_id = deployment.apply()

    command = [
        "deployment",
        "build",
        "fake-path.py:fn",
        "-n",
        "TEST",
        "-o",
        str(tmp_path / "test.yaml"),
        "--apply",
    ]
    # The CLI build with `--apply` is expected to report the same deployment id.
    success_message = f"Deployment '{deployment.flow_name}/{deployment.name}' successfully created with id '{deployment_id}'."

    invoke_and_assert(
        command,
        expected_code=0,
        expected_output_contains=[success_message],
        temp_dir=tmp_path,
    )
    return deployment, deployment_id


class TestRunDeployment:
    """
    Tests for the polling behaviour of `run_deployment`.

    Most tests let the deployment lookup and flow run creation pass through to
    the API while the flow run reads (`GET /flow_runs/...`) are mocked with
    respx, so the sequence of observed states is fully controlled.
    """

    @pytest.mark.parametrize(
        "terminal_state", sorted(s.name for s in states.TERMINAL_STATES)
    )
    def test_running_a_deployment_blocks_until_termination(
        self,
        test_deployment,
        use_hosted_orion,
        terminal_state,
    ):
        """Polling stops on the first terminal state (third mocked response)."""
        d, deployment_id = test_deployment

        mock_flowrun_response = {
            "id": str(uuid4()),
            "flow_id": str(uuid4()),
        }

        with respx.mock(
            base_url=PREFECT_API_URL.value(),
            assert_all_mocked=True,
        ) as router:
            poll_responses = [
                Response(
                    200, json={**mock_flowrun_response, "state": {"type": "PENDING"}}
                ),
                Response(
                    200, json={**mock_flowrun_response, "state": {"type": "RUNNING"}}
                ),
                Response(
                    200,
                    json={**mock_flowrun_response, "state": {"type": terminal_state}},
                ),
            ]

            router.get(f"/deployments/name/{d.flow_name}/{d.name}").pass_through()
            router.post(f"/deployments/{deployment_id}/create_flow_run").pass_through()
            flow_polls = router.get(re.compile("/flow_runs/.*")).mock(
                side_effect=poll_responses
            )

            flow_run = run_deployment(
                f"{d.flow_name}/{d.name}", max_polls=5, poll_interval=0
            )
            # The message must be an f-string so the failing state is reported.
            assert (
                flow_run.state.type == terminal_state
            ), f"run_deployment does not exit on {terminal_state}"
            assert len(flow_polls.calls) == 3

    @pytest.mark.parametrize(
        "terminal_state", sorted(s.name for s in states.TERMINAL_STATES)
    )
    async def test_running_a_deployment_blocks_until_termination_async(
        self,
        test_deployment,
        use_hosted_orion,
        terminal_state,
    ):
        """Same as the sync variant, but awaiting `run_deployment` directly."""
        d, deployment_id = test_deployment

        mock_flowrun_response = {
            "id": str(uuid4()),
            "flow_id": str(uuid4()),
        }

        async with respx.mock(
            base_url=PREFECT_API_URL.value(),
            assert_all_mocked=True,
        ) as router:
            poll_responses = [
                Response(
                    200, json={**mock_flowrun_response, "state": {"type": "PENDING"}}
                ),
                Response(
                    200, json={**mock_flowrun_response, "state": {"type": "RUNNING"}}
                ),
                Response(
                    200,
                    json={**mock_flowrun_response, "state": {"type": terminal_state}},
                ),
            ]

            router.get(f"/deployments/name/{d.flow_name}/{d.name}").pass_through()
            router.post(f"/deployments/{deployment_id}/create_flow_run").pass_through()
            flow_polls = router.get(re.compile("/flow_runs/.*")).mock(
                side_effect=poll_responses
            )

            flow_run = await run_deployment(
                f"{d.flow_name}/{d.name}", max_polls=5, poll_interval=0
            )
            # The message must be an f-string so the failing state is reported.
            assert (
                flow_run.state.type == terminal_state
            ), f"run_deployment does not exit on {terminal_state}"
            assert len(flow_polls.calls) == 3

    def test_ephemeral_api_works(
        self,
        test_deployment,
        orion_client,
    ):
        """Without any mocking, the returned flow run is still in a scheduled state."""
        # NOTE(review): presumably runs against the ephemeral API, per the test
        # name and the absence of `use_hosted_orion` - confirm.
        d, _ = test_deployment

        assert run_deployment(
            f"{d.flow_name}/{d.name}", max_polls=5, poll_interval=0
        ).state.is_scheduled()

    def test_returns_flow_run_on_max_polls(
        self,
        test_deployment,
        use_hosted_orion,
    ):
        """A never-finishing flow run is returned after exactly `max_polls` reads."""
        d, deployment_id = test_deployment

        mock_flowrun_response = {
            "id": str(uuid4()),
            "flow_id": str(uuid4()),
        }

        with respx.mock(
            base_url=PREFECT_API_URL.value(), assert_all_mocked=True
        ) as router:
            router.get(f"/deployments/name/{d.flow_name}/{d.name}").pass_through()
            router.post(f"/deployments/{deployment_id}/create_flow_run").pass_through()
            flow_polls = router.request(
                "GET", re.compile(PREFECT_API_URL.value() + "/flow_runs/.*")
            ).mock(
                return_value=Response(
                    200, json={**mock_flowrun_response, "state": {"type": "SCHEDULED"}}
                )
            )

            flow_run = run_deployment(
                f"{d.flow_name}/{d.name}", max_polls=5, poll_interval=0
            )
            assert len(flow_polls.calls) == 5
            assert flow_run.state.is_scheduled()

    def test_returns_flow_run_immediately_when_max_polls_is_zero(
        self,
        test_deployment,
        use_hosted_orion,
    ):
        """With `max_polls=0` the flow run is returned without a single read."""
        d, deployment_id = test_deployment

        mock_flowrun_response = {
            "id": str(uuid4()),
            "flow_id": str(uuid4()),
        }

        # `assert_all_called=False`: the flow run route is registered but must
        # never be hit in this scenario.
        with respx.mock(
            base_url=PREFECT_API_URL.value(),
            assert_all_mocked=True,
            assert_all_called=False,
        ) as router:
            router.get(f"/deployments/name/{d.flow_name}/{d.name}").pass_through()
            router.post(f"/deployments/{deployment_id}/create_flow_run").pass_through()
            flow_polls = router.request(
                "GET", re.compile(PREFECT_API_URL.value() + "/flow_runs/.*")
            ).mock(
                return_value=Response(
                    200, json={**mock_flowrun_response, "state": {"type": "SCHEDULED"}}
                )
            )

            flow_run = run_deployment(
                f"{d.flow_name}/{d.name}", max_polls=0, poll_interval=0
            )
            assert len(flow_polls.calls) == 0
            assert flow_run.state.is_scheduled()

    def test_polls_indefinitely(
        self,
        test_deployment,
        use_hosted_orion,
    ):
        """With `max_polls=-1` polling continues until a final state is seen."""
        d, deployment_id = test_deployment

        mock_flowrun_response = {
            "id": str(uuid4()),
            "flow_id": str(uuid4()),
        }

        # 99 non-final reads followed by one final read: 100 polls in total.
        side_effects = [
            Response(
                200, json={**mock_flowrun_response, "state": {"type": "SCHEDULED"}}
            )
        ] * 99
        side_effects.append(
            Response(
                200, json={**mock_flowrun_response, "state": {"type": "COMPLETED"}}
            )
        )

        with respx.mock(
            base_url=PREFECT_API_URL.value(),
            assert_all_mocked=True,
            assert_all_called=False,
        ) as router:
            router.get(f"/deployments/name/{d.flow_name}/{d.name}").pass_through()
            router.post(f"/deployments/{deployment_id}/create_flow_run").pass_through()
            flow_polls = router.request(
                "GET", re.compile(PREFECT_API_URL.value() + "/flow_runs/.*")
            ).mock(side_effect=side_effects)

            run_deployment(f"{d.flow_name}/{d.name}", max_polls=-1, poll_interval=0)
            assert len(flow_polls.calls) == 100