Skip to content

Commit

Permalink
Add run_deployment to create flow runs for deployments (#7047)
Browse files Browse the repository at this point in the history
Co-authored-by: Chris White <chris@prefect.io>
Co-authored-by: Michael Adkins <michael@prefect.io>
  • Loading branch information
3 people authored Oct 6, 2022
1 parent fab0783 commit 94b9e76
Show file tree
Hide file tree
Showing 5 changed files with 406 additions and 4 deletions.
62 changes: 62 additions & 0 deletions src/prefect/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
import importlib
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from uuid import UUID

import anyio
import pendulum
import yaml
from pydantic import BaseModel, Field, parse_obj_as, validator

Expand All @@ -29,6 +32,65 @@
from prefect.utilities.importtools import import_object


@sync_compatible
@inject_client
async def run_deployment(
name: str,
client: OrionClient = None,
parameters: dict = None,
scheduled_time: datetime = None,
timeout: float = None,
poll_interval: float = 5,
):
"""
Create a flow run for a deployment and return it after completion or a timeout.
This function will return when the created flow run enters any terminal state or
the timeout is reached. If the timeout is reached and the flow run has not reached
a terminal state, it will still be returned. When using a timeout, we suggest
checking the state of the flow run if completion is important moving forward.
Args:
name: The deployment name in the form: '<flow-name>/<deployment-name>'
parameters: Parameter overrides for this flow run. Merged with the deployment
defaults
scheduled_time: The time to schedule the flow run for, defaults to scheduling
the flow run to start now.
timeout: The amount of time to wait for the flow run to complete before
returning. Setting `timeout` to 0 will return the flow run immediately.
Setting `timeout` to None will allow this function to poll indefinitely.
Defaults to None
poll_interval: The number of seconds between polls
"""
if timeout is not None and timeout < 0:
raise ValueError("`timeout` cannot be negative")

if scheduled_time is None:
scheduled_time = pendulum.now("UTC")

deployment = await client.read_deployment_by_name(name)
flow_run = await client.create_flow_run_from_deployment(
deployment.id,
state=schemas.states.Scheduled(scheduled_time=scheduled_time),
parameters=parameters,
)

flow_run_id = flow_run.id

if timeout == 0:
return flow_run

with anyio.move_on_after(timeout):
while True:
flow_run = await client.read_flow_run(flow_run_id)
flow_state = flow_run.state
if flow_state and flow_state.is_final():
return flow_run
await anyio.sleep(poll_interval)

return flow_run


@inject_client
async def load_flow_from_flow_run(
flow_run: schemas.core.FlowRun, client: OrionClient, ignore_storage: bool = False
Expand Down
11 changes: 9 additions & 2 deletions src/prefect/orion/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ async def schedule_runs(
start_time: datetime.datetime = None,
end_time: datetime.datetime = None,
max_runs: int = None,
auto_scheduled: bool = True,
) -> List[UUID]:
"""
Schedule flow runs for a deployment
Expand Down Expand Up @@ -392,6 +393,7 @@ async def schedule_runs(
start_time=start_time,
end_time=end_time,
max_runs=max_runs,
auto_scheduled=auto_scheduled,
)
return await _insert_scheduled_flow_runs(session=session, runs=runs)

Expand All @@ -404,6 +406,7 @@ async def _generate_scheduled_flow_runs(
end_time: datetime.datetime,
max_runs: int,
db: OrionDBInterface,
auto_scheduled: bool = True,
) -> List[Dict]:
"""
Given a `deployment_id` and schedule, generates a list of flow run objects and
Expand Down Expand Up @@ -436,6 +439,10 @@ async def _generate_scheduled_flow_runs(
n=max_runs, start=start_time, end=end_time
)

tags = deployment.tags
if auto_scheduled:
tags = ["auto-scheduled"] + tags

for date in dates:
runs.append(
{
Expand All @@ -446,8 +453,8 @@ async def _generate_scheduled_flow_runs(
"parameters": deployment.parameters,
"infrastructure_document_id": deployment.infrastructure_document_id,
"idempotency_key": f"scheduled {deployment.id} {date}",
"tags": ["auto-scheduled"] + deployment.tags,
"auto_scheduled": True,
"tags": tags,
"auto_scheduled": auto_scheduled,
"state": schemas.states.Scheduled(
scheduled_time=date,
message="Flow run scheduled",
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def cleanup_all_new_docker_objects(docker: DockerClient, worker_id: str):

@pytest.mark.timeout(120)
@pytest.fixture(scope="session")
def prefect_base_image(pytestconfig: pytest.Config, docker: DockerClient):
def prefect_base_image(pytestconfig: "pytest.Config", docker: DockerClient):
"""Ensure that the prefect dev image is available and up-to-date"""
image_name = get_prefect_image_name()

Expand Down
30 changes: 30 additions & 0 deletions tests/orion/models/test_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,36 @@ async def test_schedule_runs_applies_tags(self, tags, flow, flow_function, sessi
)
assert run.tags == ["auto-scheduled"] + tags

@pytest.mark.parametrize("tags", [[], ["foo"]])
async def test_schedule_runs_does_not_set_auto_schedule_tags(
self, tags, flow, flow_function, session
):
deployment = await models.deployments.create_deployment(
session=session,
deployment=schemas.core.Deployment(
name="My Deployment",
manifest_path="file.json",
schedule=schemas.schedules.IntervalSchedule(
interval=datetime.timedelta(days=1)
),
flow_id=flow.id,
tags=tags,
),
)
scheduled_runs = await models.deployments.schedule_runs(
session,
deployment_id=deployment.id,
max_runs=2,
auto_scheduled=False,
)
assert len(scheduled_runs) == 2
for run_id in scheduled_runs:
run = await models.flow_runs.read_flow_run(
session=session, flow_run_id=run_id
)
assert run.tags == tags
run.auto_scheduled = False

async def test_schedule_runs_applies_work_queue(self, flow, flow_function, session):
deployment = await models.deployments.create_deployment(
session=session,
Expand Down
Loading

0 comments on commit 94b9e76

Please sign in to comment.