Skip to content

Commit

Permalink
Update critical_service_loop to throw a runtime error on failure (P…
Browse files Browse the repository at this point in the history
  • Loading branch information
zanieb authored and Åsmund Østvold committed May 11, 2023
1 parent d77a868 commit ec7ee01
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 18 deletions.
7 changes: 3 additions & 4 deletions src/prefect/utilities/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ async def critical_service_loop(
failures.append((exc, sys.exc_info()[-1]))
else:
raise
except KeyboardInterrupt:
return

# Decide whether to exit now based on recent history.
#
Expand All @@ -89,7 +87,7 @@ async def critical_service_loop(
# We've failed enough times to be sure something is wrong, the writing is
# on the wall. Let's explain what we've seen and exit.
printer(
f"\nFailed the last {consecutive} attempts. "
f"\nFailed the last {consecutive} attempts. "
"Please check your environment and configuration."
)

Expand All @@ -102,7 +100,8 @@ async def critical_service_loop(
for exception, traceback in failures_by_type:
printer("".join(format_exception(None, exception, traceback)))
printer()
return

raise RuntimeError("Service exceeded error threshold.")

if run_once:
return
Expand Down
52 changes: 38 additions & 14 deletions tests/utilities/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
from prefect.utilities.services import critical_service_loop


class UncapturedException(BaseException):
pass


async def test_critical_service_loop_operates_normally():
workload = AsyncMock(
side_effect=[
Expand All @@ -15,15 +19,25 @@ async def test_critical_service_loop_operates_normally():
None,
None,
None,
KeyboardInterrupt,
UncapturedException,
]
)

await critical_service_loop(workload, 0.0)
with pytest.raises(UncapturedException):
await critical_service_loop(workload, 0.0)

assert workload.await_count == 6


async def test_critical_service_loop_does_not_capture_keyboard_interrupt():
workload = AsyncMock(side_effect=KeyboardInterrupt)

with pytest.raises(KeyboardInterrupt):
await critical_service_loop(workload, 0.0)

assert workload.await_count == 1


async def test_tolerates_single_intermittent_error():
workload = AsyncMock(
side_effect=[
Expand All @@ -32,11 +46,12 @@ async def test_tolerates_single_intermittent_error():
None,
None,
None,
KeyboardInterrupt,
UncapturedException,
]
)

await critical_service_loop(workload, 0.0)
with pytest.raises(UncapturedException):
await critical_service_loop(workload, 0.0)

assert workload.await_count == 6

Expand All @@ -49,11 +64,12 @@ async def test_tolerates_two_consecutive_errors():
httpx.TimeoutException("oofta"),
None,
None,
KeyboardInterrupt,
UncapturedException,
]
)

await critical_service_loop(workload, 0.0)
with pytest.raises(UncapturedException):
await critical_service_loop(workload, 0.0)

assert workload.await_count == 6

Expand All @@ -66,11 +82,12 @@ async def test_tolerates_majority_errors():
httpx.TimeoutException("oofta"),
httpx.TimeoutException("boo"),
None,
KeyboardInterrupt,
UncapturedException,
]
)

await critical_service_loop(workload, 0.0)
with pytest.raises(UncapturedException):
await critical_service_loop(workload, 0.0)

assert workload.await_count == 6

Expand All @@ -84,11 +101,12 @@ async def test_quits_after_3_consecutive_errors(capsys: pytest.CaptureFixture):
httpx.TimeoutException("boo"),
httpx.ConnectError("woops"),
None,
KeyboardInterrupt,
None,
]
)

await critical_service_loop(workload, 0.0, consecutive=3)
with pytest.raises(RuntimeError, match="Service exceeded error threshold"):
await critical_service_loop(workload, 0.0, consecutive=3)

assert workload.await_count == 4
result = capsys.readouterr()
Expand All @@ -106,13 +124,16 @@ async def test_consistent_sleeps_between_loops(monkeypatch):
None,
None,
None,
KeyboardInterrupt,
UncapturedException,
]
)
sleeper = AsyncMock()

monkeypatch.setattr("prefect.utilities.services.anyio.sleep", sleeper)
await critical_service_loop(workload, 0.0)

with pytest.raises(UncapturedException):
await critical_service_loop(workload, 0.0)

assert workload.await_count == 6

sleep_times = [call.args[0] for call in sleeper.await_args_list]
Expand All @@ -133,13 +154,16 @@ async def test_jittered_sleeps_between_loops(monkeypatch):
None,
None,
None,
KeyboardInterrupt,
UncapturedException,
]
)
sleeper = AsyncMock()

monkeypatch.setattr("prefect.utilities.services.anyio.sleep", sleeper)
await critical_service_loop(workload, 42, jitter_range=0.3)

with pytest.raises(UncapturedException):
await critical_service_loop(workload, 42, jitter_range=0.3)

assert workload.await_count == 6

sleep_times = [call.args[0] for call in sleeper.await_args_list]
Expand Down

0 comments on commit ec7ee01

Please sign in to comment.