Skip to content

Commit

Permalink
Add test for allow_failure and quote (#8055)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Adkins <michael@prefect.io>
  • Loading branch information
ahuang11 and zanieb authored Jan 6, 2023
1 parent 136aecc commit 216c030
Showing 1 changed file with 46 additions and 1 deletion.
47 changes: 46 additions & 1 deletion src/prefect/testing/standard_test_suites/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
from prefect import flow, task
from prefect.client.schemas import TaskRun
from prefect.deprecated.data_documents import DataDocument
from prefect.logging import get_run_logger
from prefect.orion.schemas.states import StateType
from prefect.states import State
from prefect.states import Crashed, State
from prefect.task_runners import BaseTaskRunner, TaskConcurrencyType
from prefect.testing.utilities import exceptions_equal
from prefect.utilities.annotations import allow_failure, quote


class TaskRunnerStandardTestSuite(ABC):
Expand Down Expand Up @@ -654,3 +656,46 @@ async def test_flow():
await test_flow()

assert tmp_file.read_text() == "foo"

def test_allow_failure(self, task_runner, caplog):
@task
def failing_task():
raise ValueError("This is expected")

@task
def depdendent_task():
logger = get_run_logger()
logger.info("Dependent task still runs!")
return 1

@task
def another_dependent_task():
logger = get_run_logger()
logger.info("Sub-dependent task still runs!")
return 1

@flow(task_runner=task_runner)
def test_flow():
ft = failing_task.submit()
dt = depdendent_task.submit(wait_for=[allow_failure(ft)])
another_dependent_task.submit(wait_for=[dt])

with pytest.raises(ValueError, match="This is expected"):
test_flow()
assert len(caplog.records) == 2
assert caplog.records[0].msg == "Dependent task still runs!"
assert caplog.records[1].msg == "Sub-dependent task still runs!"

def test_passing_quoted_state(self, task_runner):
@task
def test_task():
state = Crashed()
return quote(state)

@flow(task_runner=task_runner)
def test_flow():
return test_task()

result = test_flow()
assert isinstance(result, quote)
assert isinstance(result.unquote(), State)

0 comments on commit 216c030

Please sign in to comment.