Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent workers from running flow runs scheduled for in process retry #15482

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/3.0/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -19164,6 +19164,22 @@
"title": "Resuming",
"description": "Indicates if this run is resuming from a pause.",
"default": false
},
"retry_type": {
"anyOf": [
{
"type": "string",
"enum": [
"in_process",
"reschedule"
]
},
{
"type": "null"
}
],
"title": "Retry Type",
"description": "The type of retry this run is undergoing."
}
},
"type": "object",
Expand Down
3 changes: 3 additions & 0 deletions src/prefect/client/schemas/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ class FlowRunPolicy(PrefectBaseModel):
resuming: Optional[bool] = Field(
default=False, description="Indicates if this run is resuming from a pause."
)
retry_type: Optional[Literal["in_process", "reschedule"]] = Field(
default=None, description="The type of retry this run is undergoing."
)

@model_validator(mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ FROM
WHERE
fr.work_queue_id = wq.id
AND fr.state_type = 'SCHEDULED'
AND (fr.empirical_policy->>'retry_type' IS NULL OR fr.empirical_policy->>'retry_type' != 'in_process')
{% if scheduled_after %}
AND fr.next_scheduled_start_time >= :scheduled_after
{% endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ scheduled_flow_runs AS (
ROW_NUMBER() OVER (PARTITION BY work_queue_id ORDER BY next_scheduled_start_time) AS work_queue_rank
FROM flow_run fr
WHERE fr.state_type = 'SCHEDULED'
AND (json_extract(fr.empirical_policy, '$.retry_type') IS NULL OR json_extract(fr.empirical_policy, '$.retry_type') != 'in_process')
{% if scheduled_after %}
AND fr.next_scheduled_start_time >= :scheduled_after
{% endif %}
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/server/orchestration/core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ async def before_transition(
updated_policy = context.run.empirical_policy.model_dump()
updated_policy["resuming"] = False
updated_policy["pause_keys"] = set()
updated_policy["retry_type"] = "in_process"
context.run.empirical_policy = core.FlowRunPolicy(**updated_policy)

# Generate a new state for the flow
Expand Down Expand Up @@ -1067,6 +1068,10 @@ async def before_transition(
updated_policy = context.run.empirical_policy.model_dump()
updated_policy["resuming"] = False
updated_policy["pause_keys"] = set()
if proposed_state.is_scheduled():
updated_policy["retry_type"] = "reschedule"
else:
updated_policy["retry_type"] = None
context.run.empirical_policy = core.FlowRunPolicy(**updated_policy)

async def cleanup(
Expand Down
5 changes: 3 additions & 2 deletions src/prefect/server/schemas/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ class Flow(ORMBaseModel):
class FlowRunPolicy(PrefectBaseModel):
"""Defines how a flow run should retry."""

# TODO: Determine how to separate between infrastructure and within-process level
# retries
max_retries: int = Field(
default=0,
description=(
Expand All @@ -121,6 +119,9 @@ class FlowRunPolicy(PrefectBaseModel):
resuming: Optional[bool] = Field(
default=False, description="Indicates if this run is resuming from a pause."
)
retry_type: Optional[Literal["in_process", "reschedule"]] = Field(
default=None, description="The type of retry this run is undergoing."
)

@model_validator(mode="before")
def populate_deprecated_fields(cls, values):
Expand Down
3 changes: 2 additions & 1 deletion tests/server/database/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def deployment_3(self, session, flow, work_queue_2):

@pytest.fixture
async def fr_1(self, session, deployment_1):
return await models.flow_runs.create_flow_run(
flow_run = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
name="fr1",
Expand All @@ -58,6 +58,7 @@ async def fr_1(self, session, deployment_1):
state=schemas.states.Scheduled(pendulum.now("UTC").subtract(minutes=2)),
),
)
return flow_run

@pytest.fixture
async def fr_2(self, session, deployment_2):
Expand Down
55 changes: 55 additions & 0 deletions tests/server/orchestration/test_core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,30 @@ async def test_stops_retrying_eventually(
assert ctx.response_status == SetStateStatus.ACCEPT
assert ctx.validated_state_type == states.StateType.FAILED

async def test_sets_retry_type(
    self,
    session,
    initialize_orchestration,
):
    """Check that `RetryFailedFlows` tags the run's policy as an in-process retry.

    A RUNNING -> FAILED transition is proposed under the `RetryFailedFlows`
    rule; afterwards the run's empirical policy must report
    `retry_type == "in_process"`.
    """
    transition = (states.StateType.RUNNING, states.StateType.FAILED)
    rules_under_test = [RetryFailedFlows]

    ctx = await initialize_orchestration(session, "flow", *transition)
    # NOTE(review): run_count=2 with retries=3 presumably leaves a retry
    # available so the rule takes its retry path — confirm against the rule.
    ctx.run.run_count = 2
    ctx.run_settings.retries = 3

    async with contextlib.AsyncExitStack() as stack:
        for rule_cls in rules_under_test:
            ctx = await stack.enter_async_context(rule_cls(ctx, *transition))
        await ctx.validate_proposed_state()

    assert ctx.run.empirical_policy.retry_type == "in_process"


class TestManualFlowRetries:
async def test_can_manual_retry_with_arbitrary_state_name(
Expand Down Expand Up @@ -655,6 +679,37 @@ async def test_manual_retrying_bypasses_terminal_state_protection(
assert ctx.response_status == SetStateStatus.ACCEPT
assert ctx.run.run_count == 3

@pytest.mark.parametrize(
    "proposed_state_type",
    [states.StateType.SCHEDULED, states.StateType.FAILED],
)
async def test_manual_retry_updates_retry_type(
    self,
    session,
    initialize_orchestration,
    proposed_state_type,
):
    """Check how `HandleFlowTerminalStateTransitions` sets `retry_type`.

    Starting from FAILED with a proposed state named "AwaitingRetry":
    a SCHEDULED proposal must leave `retry_type == "reschedule"`, while any
    other proposed type (here FAILED) must leave it as `None`.
    """
    transition = (states.StateType.FAILED, proposed_state_type)
    rules_under_test = [HandleFlowTerminalStateTransitions]

    ctx = await initialize_orchestration(session, "flow", *transition)
    ctx.proposed_state.name = "AwaitingRetry"
    ctx.run.deployment_id = uuid4()
    ctx.run.run_count = 2

    async with contextlib.AsyncExitStack() as stack:
        for rule_cls in rules_under_test:
            ctx = await stack.enter_async_context(rule_cls(ctx, *transition))

    if proposed_state_type != states.StateType.SCHEDULED:
        assert ctx.run.empirical_policy.retry_type is None
    else:
        assert ctx.run.empirical_policy.retry_type == "reschedule"


class TestUpdatingFlowRunTrackerOnTasks:
@pytest.mark.parametrize(
Expand Down
77 changes: 75 additions & 2 deletions tests/workers/test_base_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from prefect.client.base import ServerType
from prefect.client.orchestration import PrefectClient, get_client
from prefect.client.schemas import FlowRun
from prefect.client.schemas.objects import WorkerMetadata
from prefect.client.schemas.objects import StateType, WorkerMetadata
from prefect.exceptions import (
CrashedRun,
ObjectNotFound,
Expand All @@ -34,7 +34,13 @@
get_current_settings,
temporary_settings,
)
from prefect.states import Completed, Pending, Running, Scheduled
from prefect.states import (
Completed,
Failed,
Pending,
Running,
Scheduled,
)
from prefect.testing.utilities import AsyncMock
from prefect.utilities.pydantic import parse_obj_as
from prefect.workers.base import (
Expand Down Expand Up @@ -300,6 +306,73 @@ def create_run_with_deployment_2(state):
assert {flow_run.id for flow_run in submitted_flow_runs} == set(flow_run_ids[1:3])


async def test_workers_do_not_submit_flow_runs_awaiting_retry(
    prefect_client: PrefectClient,
    work_queue_1,
    work_pool,
):
    """
    Regression test for https://github.com/PrefectHQ/prefect/issues/15458

    Workers must not submit flow runs that are in an `AwaitingRetry` state.
    `AwaitingRetry` is a `SCHEDULED` state type, so with a long enough retry
    delay a worker used to pick such runs up and submit them, even though the
    process that originally started the run is responsible for retrying it.

    Steps:
        1. Create a flow and a deployment for it.
        2. Create a flow run for the deployment, already in `Running`.
        3. Record the retry count on the run's empirical policy.
        4. Propose `Failed`; the server rejects this and places the run in
           `AwaitingRetry` instead.
        5. Let a worker poll for runs and verify that it submits nothing.
    """

    @flow(retries=2)
    def test_flow():
        pass

    flow_id = await prefect_client.create_flow(flow=test_flow)
    deployment_id = await prefect_client.create_deployment(
        flow_id=flow_id,
        name="test-deployment",
        work_queue_name=work_queue_1.name,
        work_pool_name=work_pool.name,
    )
    created_run = await prefect_client.create_flow_run_from_deployment(
        deployment_id, state=Running()
    )

    # The server only knows about the retries once they are written to the
    # run's empirical policy.
    retry_policy = created_run.empirical_policy
    retry_policy.retries = 2
    await prefect_client.update_flow_run(
        flow_run_id=created_run.id,
        flow_version=test_flow.version,
        empirical_policy=retry_policy,
    )

    # Proposing `Failed` should be rejected in favour of `AwaitingRetry`.
    response = await prefect_client.set_flow_run_state(
        created_run.id, state=Failed()
    )
    assert response.state.name == "AwaitingRetry"
    assert response.state.type == StateType.SCHEDULED

    # The scheduled time must already be in the past; otherwise the worker
    # could be skipping the run simply because it is not due yet.
    refreshed_run = await prefect_client.read_flow_run(created_run.id)
    assert refreshed_run.state.state_details.scheduled_time < pendulum.now("utc")

    async with WorkerTestImpl(work_pool_name=work_pool.name) as worker:
        submitted = await worker.get_and_submit_flow_runs()

    assert submitted == []


async def test_priority_trumps_lateness(
prefect_client: PrefectClient,
worker_deployment_wq1,
Expand Down