diff --git a/docs/configuration.rst b/docs/configuration.rst index 2616230b..24b7e019 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -364,3 +364,36 @@ e.g. by arbitrarily labelling them, so that a new diff-base is generated: kubectl label kex -l somelabel=somevalue ping=pong Then, switch to the new storage alone, without the transitional setup. + + +Error throttling +================ + +To prevent an uncontrollable flood of activities in case of errors that prevent +the resources from being marked as handled, which could lead to Kubernetes API +flooding, it is possible to throttle the activities on a per-resource basis: + +.. code-block:: python + + import kopf + + @kopf.on.startup() + def configure(settings: kopf.OperatorSettings, **_): + settings.batching.error_delays = [10, 20, 30] + +In that case, all unhandled errors in the framework or in the Kubernetes API +would be backed off by 10s after the 1st error, then by 20s after the 2nd one, +and then by 30s after the 3rd, 4th, 5th errors and so on. On the first success, +the backoff intervals will be reset and re-used again on the next error. + +The default is a sequence of Fibonacci numbers from 1 second to 10 minutes. + +The back-offs are not persisted, so they are lost on operator restarts. + +These back-offs do not cover errors in the handlers -- the handlers have their +own configurable per-handler back-off intervals. These back-offs are for Kopf +and for the Kubernetes API mostly (and other environment issues). + +To disable throttling (at your own risk!), set the error delays to +an empty list (``[]``) or an empty tuple (``()``). +Interpret as: no throttling delays set -- no throttling sleeps done. 
diff --git a/kopf/reactor/effects.py b/kopf/reactor/effects.py index 3e8495f8..ea26b007 100644 --- a/kopf/reactor/effects.py +++ b/kopf/reactor/effects.py @@ -20,13 +20,16 @@ """ import asyncio import collections +import contextlib import datetime import logging -from typing import Collection, Optional, Union +import time +from typing import AsyncGenerator, Collection, Iterable, Optional, Tuple, Type, Union from kopf.clients import patching from kopf.engines import loggers -from kopf.structs import bodies, configuration, dicts, diffs, patches, primitives, resources +from kopf.structs import bodies, configuration, containers, dicts, \ + diffs, patches, primitives, resources # How often to wake up from the long sleep, to show liveness in the logs. WAITING_KEEPALIVE_INTERVAL = 10 * 60 @@ -139,6 +142,10 @@ async def sleep_or_wait( actual_delays = [delay for delay in passed_delays if delay is not None] minimal_delay = min(actual_delays) if actual_delays else 0 + # Do not go for the real low-level system sleep if there is no need to sleep. + if minimal_delay <= 0: + return None + awakening_event = ( wakeup.async_event if isinstance(wakeup, primitives.DaemonStopper) else wakeup if wakeup is not None else @@ -154,3 +161,66 @@ async def sleep_or_wait( end_time = loop.time() duration = end_time - start_time return max(0, minimal_delay - duration) + + +@contextlib.asynccontextmanager +async def throttled( + *, + throttler: containers.Throttler, + delays: Iterable[float], + wakeup: Optional[Union[asyncio.Event, primitives.DaemonStopper]] = None, + logger: Union[logging.Logger, logging.LoggerAdapter], + errors: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception, +) -> AsyncGenerator[bool, None]: + """ + A helper to throttle any arbitrary operation. + """ + + # The 1st sleep: if throttling is already active, but was interrupted by a queue replenishment. + # It is needed to properly process the latest known event after the successful sleep. 
+ if throttler.active_until is not None: + remaining_time = throttler.active_until - time.monotonic() + unslept_time = await sleep_or_wait(remaining_time, wakeup=wakeup) + if unslept_time is None: + logger.info("Throttling is over. Switching back to normal operations.") + throttler.active_until = None + + # Run only if throttling either is not active initially, or has just finished sleeping. + should_run = throttler.active_until is None + try: + yield should_run + + except Exception as e: + + # If it is not an error-of-interest, escalate normally. BaseExceptions are escalated always. + if not isinstance(e, errors): + raise + + # If the code does not follow the recommendation to not run, escalate. + if not should_run: + raise + + # Activate throttling if not yet active, or reuse the active sequence of delays. + if throttler.source_of_delays is None: + throttler.source_of_delays = iter(delays) + + # Choose a delay. If there are none, avoid throttling at all. + throttle_delay = next(throttler.source_of_delays, throttler.last_used_delay) + if throttle_delay is not None: + throttler.last_used_delay = throttle_delay + throttler.active_until = time.monotonic() + throttle_delay + logger.exception(f"Throttling for {throttle_delay} seconds due to an unexpected error:") + + else: + # Reset the throttling. Release the iterator to keep the memory free during normal run. + if should_run: + throttler.source_of_delays = throttler.last_used_delay = None + + # The 2nd sleep: if throttling has been just activated (i.e. there was a fresh error). + # It is needed to have better logging/sleeping without workers exiting for "no events". + if throttler.active_until is not None and should_run: + remaining_time = throttler.active_until - time.monotonic() + unslept_time = await sleep_or_wait(remaining_time, wakeup=wakeup) + if unslept_time is None: + throttler.active_until = None + logger.info("Throttling is over. 
Switching back to normal operations.") diff --git a/kopf/reactor/processing.py b/kopf/reactor/processing.py index b8665591..19d47332 100644 --- a/kopf/reactor/processing.py +++ b/kopf/reactor/processing.py @@ -39,11 +39,7 @@ async def process_resource_event( Convert the low-level events, as provided by the watching/queueing tasks, to the high-level causes, and then call the cause-handling logic. - - All the internally provoked changes are intercepted, do not create causes, - and therefore do not call the handling logic. """ - finalizer = settings.persistence.finalizer # Recall what is stored about that object. Share it in little portions with the consumers. # And immediately forget it if the object is deleted from the cluster (but keep in memory). @@ -62,11 +58,64 @@ async def process_resource_event( body = memory.live_fresh_body if memory.live_fresh_body is not None else bodies.Body(raw_body) patch = patches.Patch() - # Each object has its own prefixed logger, to distinguish parallel handling. - logger = loggers.ObjectLogger(body=body, settings=settings) - posting.event_queue_loop_var.set(asyncio.get_running_loop()) - posting.event_queue_var.set(event_queue) # till the end of this object's task. + # Throttle the non-handler-related errors. The regular event watching/batching continues + # to prevent queue overfilling, but the processing is skipped (events are ignored). + # Choice of place: late enough to have a per-resource memory for a throttler; also, a logger. + # But early enough to catch environment errors from K8s API, and from most of the complex code. + async with effects.throttled( + throttler=memory.error_throttler, + logger=loggers.LocalObjectLogger(body=body, settings=settings), + delays=settings.batching.error_delays, + wakeup=replenished, + ) as should_run: + if should_run: + + # Each object has its own prefixed logger, to distinguish parallel handling. 
+ logger = loggers.ObjectLogger(body=body, settings=settings) + posting.event_queue_loop_var.set(asyncio.get_running_loop()) + posting.event_queue_var.set(event_queue) # till the end of this object's task. + + # Do the magic -- do the job. + delays = await process_resource_causes( + lifecycle=lifecycle, + registry=registry, + settings=settings, + resource=resource, + raw_event=raw_event, + body=body, + patch=patch, + logger=logger, + memory=memory, + ) + + # Whatever was done, apply the accumulated changes to the object, or sleep-n-touch for delays. + # But only once, to reduce the number of API calls and the generated irrelevant events. + # And only if the object is at least supposed to exist (not "GONE"), even if actually does not. + if raw_event['type'] != 'DELETED': + await effects.apply( + settings=settings, + resource=resource, + body=body, + patch=patch, + logger=logger, + delays=delays, + replenished=replenished, + ) + + +async def process_resource_causes( + lifecycle: lifecycles.LifeCycleFn, + registry: registries.OperatorRegistry, + settings: configuration.OperatorSettings, + resource: resources.Resource, + raw_event: bodies.RawEvent, + body: bodies.Body, + patch: patches.Patch, + logger: loggers.ObjectLogger, + memory: containers.ResourceMemory, +) -> Collection[float]: + finalizer = settings.persistence.finalizer extra_fields = registry.resource_changing_handlers[resource].get_extra_fields() old = settings.persistence.diffbase_storage.fetch(body=body) new = settings.persistence.diffbase_storage.build(body=body, extra_fields=extra_fields) @@ -173,19 +222,7 @@ async def process_resource_event( logger.debug("Removing the finalizer, thus allowing the actual deletion.") finalizers.allow_deletion(body=body, patch=patch, finalizer=finalizer) - # Whatever was done, apply the accumulated changes to the object, or sleep-n-touch for delays. - # But only once, to reduce the number of API calls and the generated irrelevant events. 
- # And only if the object is at least supposed to exist (not "GONE"), even if actually does not. - if raw_event['type'] != 'DELETED': - await effects.apply( - settings=settings, - resource=resource, - body=body, - patch=patch, - logger=logger, - delays=list(resource_spawning_delays) + list(resource_changing_delays), - replenished=replenished, - ) + return list(resource_spawning_delays) + list(resource_changing_delays) async def process_resource_watching_cause( diff --git a/kopf/structs/configuration.py b/kopf/structs/configuration.py index f860b588..51d6579b 100644 --- a/kopf/structs/configuration.py +++ b/kopf/structs/configuration.py @@ -27,7 +27,7 @@ """ import concurrent.futures import dataclasses -from typing import Optional +from typing import Iterable, Optional from kopf import config # for legacy defaults only from kopf.storage import diffbase, progress @@ -122,6 +122,24 @@ class BatchingSettings: This is the time given to the worker to deplete and process the queue. """ + error_delays: Iterable[float] = (1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610) + """ + Backoff intervals in case of unexpected errors in the framework (not the handlers). + + Per-resource workers freeze all activities for this number of seconds in case of errors. + Once they are back to work, they process only the latest event seen (due to event batching). + + Every further error leads to the next, even bigger delay (10m is enough for a default maximum). + Every success resets the backoff intervals, and it goes from the beginning on the next error. + + If needed, this value can be an arbitrary collection/iterator/object: + only ``iter()`` is called on every new throttling cycle, no other protocols + are required; but make sure that it is re-iterable for multiple uses. + + To disable throttling (on your own risk), set it to ``[]`` or ``()``. 
+ """ + + @dataclasses.dataclass class ExecutionSettings: diff --git a/kopf/structs/containers.py b/kopf/structs/containers.py index ed35883a..1a3bb60e 100644 --- a/kopf/structs/containers.py +++ b/kopf/structs/containers.py @@ -31,6 +31,14 @@ class Daemon: stopper: primitives.DaemonStopper # a signaller for the termination and its reason. +@dataclasses.dataclass(frozen=False) +class Throttler: + """ A state of throttling for one specific purpose (there can be a few). """ + source_of_delays: Optional[Iterator[float]] = None + last_used_delay: Optional[float] = None + active_until: Optional[float] = None # internal clock + + class Memo(Dict[Any, Any]): """ A container to hold arbitrary keys-fields assigned by the users. """ @@ -61,6 +69,9 @@ class ResourceMemory: noticed_by_listing: bool = False fully_handled_once: bool = False + # Throttling for API errors (mostly from PATCHing) and for processing in general. + error_throttler: Throttler = dataclasses.field(default_factory=Throttler) + # For background and timed threads/tasks (invoked with the kwargs of the last-seen body). 
live_fresh_body: Optional[bodies.Body] = None idle_reset_time: float = dataclasses.field(default_factory=time.monotonic) diff --git a/tests/settings/test_defaults.py b/tests/settings/test_defaults.py index 12d546cf..f2110c9f 100644 --- a/tests/settings/test_defaults.py +++ b/tests/settings/test_defaults.py @@ -14,5 +14,6 @@ async def test_declared_public_interface_and_promised_defaults(): assert settings.batching.idle_timeout == 5.0 assert settings.batching.exit_timeout == 2.0 assert settings.batching.batch_window == 0.1 + assert settings.batching.error_delays == (1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610) assert settings.execution.executor is not None assert settings.execution.max_workers is None diff --git a/tests/test_sleeping.py b/tests/timing/test_sleeping.py similarity index 86% rename from tests/test_sleeping.py rename to tests/timing/test_sleeping.py index c7630168..61560eff 100644 --- a/tests/test_sleeping.py +++ b/tests/timing/test_sleeping.py @@ -26,9 +26,14 @@ async def test_specific_delays_only_are_awaited(timer): assert unslept is None -async def test_passed_delays_skip_sleeping(timer): +@pytest.mark.parametrize('delays', [ + pytest.param([1000, -10], id='mixed-signs'), + pytest.param([-100, -10], id='all-negative'), + pytest.param(-10, id='alone'), +]) +async def test_negative_delays_skip_sleeping(timer, delays): with timer: - unslept = await asyncio.wait_for(sleep_or_wait([0.10, -10]), timeout=1.0) + unslept = await asyncio.wait_for(sleep_or_wait(delays), timeout=1.0) assert timer.seconds < 0.01 assert unslept is None diff --git a/tests/timing/test_throttling.py b/tests/timing/test_throttling.py new file mode 100644 index 00000000..8eb91466 --- /dev/null +++ b/tests/timing/test_throttling.py @@ -0,0 +1,317 @@ +import asyncio +import logging +from unittest.mock import call + +import pytest + +from kopf.reactor.effects import throttled +from kopf.structs.containers import Throttler + + +@pytest.fixture(autouse=True) +def clock(mocker): 
+ return mocker.patch('time.monotonic', return_value=0) + + +@pytest.fixture(autouse=True) +def sleep(mocker): + return mocker.patch('kopf.reactor.effects.sleep_or_wait', return_value=None) + + +async def test_remains_inactive_on_success(): + logger = logging.getLogger() + throttler = Throttler() + async with throttled(throttler=throttler, logger=logger, delays=[123]): + pass + assert throttler.source_of_delays is None + assert throttler.last_used_delay is None + + +@pytest.mark.parametrize('exc_cls, kwargs', [ + pytest.param(BaseException, dict(), id='none'), + pytest.param(BaseException, dict(errors=BaseException), id='base'), + pytest.param(Exception, dict(errors=ValueError), id='child'), + pytest.param(RuntimeError, dict(errors=ValueError), id='sibling'), + pytest.param(RuntimeError, dict(errors=(ValueError, TypeError)), id='tuple'), +]) +async def test_escalates_unexpected_errors(exc_cls, kwargs): + logger = logging.getLogger() + throttler = Throttler() + with pytest.raises(exc_cls): + async with throttled(throttler=throttler, logger=logger, delays=[123], **kwargs): + raise exc_cls() + + +@pytest.mark.parametrize('exc_cls, kwargs', [ + pytest.param(Exception, dict(), id='none'), + pytest.param(RuntimeError, dict(errors=Exception), id='parent'), + pytest.param(RuntimeError, dict(errors=(RuntimeError, EnvironmentError)), id='tuple'), +]) +async def test_activates_on_expected_errors(exc_cls, kwargs): + logger = logging.getLogger() + throttler = Throttler() + async with throttled(throttler=throttler, logger=logger, delays=[123], **kwargs): + raise exc_cls() + assert throttler.source_of_delays is not None + assert throttler.last_used_delay is not None + + +async def test_sleeps_for_the_first_delay_when_inactive(sleep): + logger = logging.getLogger() + throttler = Throttler() + + async with throttled(throttler=throttler, logger=logger, delays=[123, 234]): + raise Exception() + + assert throttler.last_used_delay == 123 + assert throttler.source_of_delays is not None 
+ assert next(throttler.source_of_delays) == 234 + + assert throttler.active_until is None # means: no sleep time left + assert sleep.mock_calls == [call(123, wakeup=None)] + + +async def test_sleeps_for_the_next_delay_when_active(sleep): + logger = logging.getLogger() + throttler = Throttler() + + async with throttled(throttler=throttler, logger=logger, delays=[123, 234]): + raise Exception() + + sleep.reset_mock() + async with throttled(throttler=throttler, logger=logger, delays=[...]): + raise Exception() + + assert throttler.last_used_delay == 234 + assert throttler.source_of_delays is not None + assert next(throttler.source_of_delays, 999) == 999 + + assert throttler.active_until is None # means: no sleep time left + assert sleep.mock_calls == [call(234, wakeup=None)] + + +async def test_sleeps_for_the_last_known_delay_when_depleted(sleep): + logger = logging.getLogger() + throttler = Throttler() + + async with throttled(throttler=throttler, logger=logger, delays=[123, 234]): + raise Exception() + + async with throttled(throttler=throttler, logger=logger, delays=[...]): + raise Exception() + + sleep.reset_mock() + async with throttled(throttler=throttler, logger=logger, delays=[...]): + raise Exception() + + assert throttler.last_used_delay == 234 + assert throttler.source_of_delays is not None + assert next(throttler.source_of_delays, 999) == 999 + + assert throttler.active_until is None # means: no sleep time left + assert sleep.mock_calls == [call(234, wakeup=None)] + + +async def test_resets_on_success(sleep): + logger = logging.getLogger() + throttler = Throttler() + + async with throttled(throttler=throttler, logger=logger, delays=[123]): + raise Exception() + + sleep.reset_mock() + async with throttled(throttler=throttler, logger=logger, delays=[...]): + pass + + assert throttler.last_used_delay is None + assert throttler.source_of_delays is None + assert throttler.active_until is None + assert sleep.mock_calls == [] + + +async def 
test_skips_on_no_delays(sleep): + logger = logging.getLogger() + throttler = Throttler() + + async with throttled(throttler=throttler, logger=logger, delays=[]): + raise Exception() + + assert throttler.last_used_delay is None + assert throttler.source_of_delays is not None + assert next(throttler.source_of_delays, 999) == 999 + + assert throttler.active_until is None # means: no sleep time left + assert sleep.mock_calls == [] + + +async def test_renews_on_repeated_failure(sleep): + logger = logging.getLogger() + throttler = Throttler() + + async with throttled(throttler=throttler, logger=logger, delays=[123]): + raise Exception() + + async with throttled(throttler=throttler, logger=logger, delays=[...]): + pass + + sleep.reset_mock() + async with throttled(throttler=throttler, logger=logger, delays=[234]): + raise Exception() + + assert throttler.last_used_delay == 234 + assert throttler.source_of_delays is not None + assert throttler.active_until is None + assert sleep.mock_calls == [call(234, wakeup=None)] + + +async def test_interruption(clock, sleep): + wakeup = asyncio.Event() + logger = logging.getLogger() + throttler = Throttler() + + clock.return_value = 1000 # simulated "now" + sleep.return_value = 55 # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[123, 234], wakeup=wakeup): + raise Exception() + + assert throttler.last_used_delay == 123 + assert throttler.source_of_delays is not None + assert throttler.active_until == 1123 # means: some sleep time is left + assert sleep.mock_calls == [call(123, wakeup=wakeup)] + + +async def test_continuation_with_success(clock, sleep): + wakeup = asyncio.Event() + logger = logging.getLogger() + throttler = Throttler() + + clock.return_value = 1000 # simulated "now" + sleep.return_value = 55 # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[123, 234], wakeup=wakeup): + raise Exception() + + sleep.reset_mock() + clock.return_value 
= 1077 # simulated "now" + sleep.return_value = None # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[...], wakeup=wakeup): + pass + + assert throttler.last_used_delay is None + assert throttler.source_of_delays is None + assert throttler.active_until is None # means: no sleep time is left + assert sleep.mock_calls == [call(123 - 77, wakeup=wakeup)] + + +async def test_continuation_with_error(clock, sleep): + wakeup = asyncio.Event() + logger = logging.getLogger() + throttler = Throttler() + + clock.return_value = 1000 # simulated "now" + sleep.return_value = 55 # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[123, 234], wakeup=wakeup): + raise Exception() + + sleep.reset_mock() + clock.return_value = 1077 # simulated "now" + sleep.return_value = None # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[...], wakeup=wakeup): + raise Exception() + + assert throttler.last_used_delay == 234 + assert throttler.source_of_delays is not None + assert throttler.active_until is None # means: no sleep time is left + assert sleep.mock_calls == [call(123 - 77, wakeup=wakeup), call(234, wakeup=wakeup)] + + +async def test_continuation_when_overdue(clock, sleep): + wakeup = asyncio.Event() + logger = logging.getLogger() + throttler = Throttler() + + clock.return_value = 1000 # simulated "now" + sleep.return_value = 55 # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[123, 234], wakeup=wakeup): + raise Exception() + + sleep.reset_mock() + clock.return_value = 2000 # simulated "now" + sleep.return_value = None # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[...], wakeup=wakeup): + raise Exception() + + assert throttler.last_used_delay == 234 + assert throttler.source_of_delays is not None + assert throttler.active_until is None # means: no sleep time is left 
+ assert sleep.mock_calls == [call(123 - 1000, wakeup=wakeup), call(234, wakeup=wakeup)] + + +async def test_recommends_running_initially(): + logger = logging.getLogger() + throttler = Throttler() + async with throttled(throttler=throttler, logger=logger, delays=[123]) as should_run: + remembered_should_run = should_run + assert remembered_should_run is True + + +async def test_recommends_skipping_immediately_after_interrupted_error(sleep): + logger = logging.getLogger() + throttler = Throttler() + + sleep.return_value = 33 # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[123]): + raise Exception() + + sleep.return_value = 33 # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[...]) as should_run: + remembered_should_run = should_run + + assert remembered_should_run is False + + +async def test_recommends_running_immediately_after_continued(sleep): + logger = logging.getLogger() + throttler = Throttler() + + sleep.return_value = 33 # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[123]): + raise Exception() + + sleep.return_value = None # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[...]) as should_run: + remembered_should_run = should_run + + assert remembered_should_run is True + + +async def test_logging_when_deactivates_immediately(caplog): + caplog.set_level(0) + logger = logging.getLogger() + throttler = Throttler() + + async with throttled(throttler=throttler, logger=logger, delays=[123]): + raise Exception() + + assert caplog.messages == [ + "Throttling for 123 seconds due to an unexpected error:", + "Throttling is over. 
Switching back to normal operations.", + ] + + +async def test_logging_when_deactivates_on_reentry(sleep, caplog): + caplog.set_level(0) + logger = logging.getLogger() + throttler = Throttler() + + sleep.return_value = 55 # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[123]): + raise Exception() + + sleep.return_value = None # simulated sleep time left + async with throttled(throttler=throttler, logger=logger, delays=[...]): + pass + + assert caplog.messages == [ + "Throttling for 123 seconds due to an unexpected error:", + "Throttling is over. Switching back to normal operations.", + ]