Process events instantly and consistently, stop skipping the events due to "batching" #844

Status: Draft. Wants to merge 4 commits into base: main.
53 changes: 53 additions & 0 deletions docs/configuration.rst
@@ -262,6 +262,59 @@ The default is 0.1 seconds (nearly instant, but not flooding).
settings.watching.server_timeout = 10 * 60


.. _consistency:

Consistency
===========

Generally, a resource's events and updates streamed from the Kubernetes API
are processed as soon as possible, with no delays or skipping. However,
high-level change-detection handlers (on-creation/resume/update/deletion)
require a consistent state of the resource. *Consistency* means that all
patches applied by Kopf itself have arrived back via the watch-stream.
If Kopf did not patch the resource recently, it is consistent by definition.

*Inconsistent* states can be seen in relatively rare circumstances
on slow networks (operator⟺apiservers) or under high load, especially
when the operator or a third party patches the resource on its own.
In those cases, the framework can misinterpret the intermediate states
and perform double-processing (i.e. double handler execution).

To prevent this, all state-dependent handlers are postponed until
consistency is reached in one of the following two ways:

* The expected resource version from the PATCH API operation arrives
via the watch-stream of the resource within the specified time window.
* The expected resource version from the PATCH API operation does not arrive
  via the watch-stream within the specified time window, in which case
  consistency is assumed (implied) and the processing continues as if
  the version had arrived, possibly causing the side effects mentioned above.

The time window is measured relative to the time of the last ``PATCH`` call.
The timeout should be long enough to assume that if the expected resource
version did not arrive within the specified time, it will never arrive.

.. code-block:: python

import kopf

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.persistence.consistency_timeout = 10

The default value (5 seconds) aims at the safest scenario out of the box.

A value of ``0`` effectively disables consistency tracking
and declares all resource states consistent -- even if they are not.
Use this with care -- e.g. with self-made persistence storages instead of
Kopf's annotations (see :ref:`progress-storing` and :ref:`diffbase-storing`).
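
For instance, a self-made persistence setup might look like this sketch
(the storage classes and field/prefix names below are illustrative
assumptions about Kopf's public API, not a prescribed configuration):

```python
import kopf

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
    # Store handler progress and the diff-base in the resource's status
    # and annotations instead of Kopf's default annotations.
    settings.persistence.progress_storage = kopf.StatusProgressStorage(
        field='status.my-operator')
    settings.persistence.diffbase_storage = kopf.AnnotationsDiffBaseStorage(
        prefix='my-operator.example.com', key='last-handled-configuration')
    # With self-made storages, consistency tracking can be disabled:
    settings.persistence.consistency_timeout = 0
```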

The consistency timeout does not affect low-level handlers with no persistence,
such as ``@kopf.on.event``, ``@kopf.index``, ``@kopf.daemon``, ``@kopf.timer``
-- these handlers are invoked for each and every watch-event with no delay
(if they match the :doc:`filters <filters>`, of course).
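
For contrast, a minimal low-level handler that is unaffected by the
consistency timeout might look like this sketch (the handler name and
resource kind are illustrative):

```python
import kopf

@kopf.on.event('kopfexamples')
def on_any_event(event, meta, logger, **_):
    # Invoked for every raw watch-event immediately, consistent or not.
    logger.info(f"type={event['type']} rv={meta.get('resourceVersion')}")
```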


Finalizers
==========

12 changes: 9 additions & 3 deletions examples/01-minimal/example.py
@@ -2,6 +2,12 @@


@kopf.on.create('kopfexamples')
def create_fn(spec, **kwargs):
    print(f"And here we are! Creating: {spec}")
    return {'message': 'hello world'}  # will be the new status


@kopf.on.update('kopfexamples')
def update_fn(meta, spec, reason, logger, **kwargs):
    rv = meta.get('resourceVersion')
    logger.warning(f">>> {rv=} And here we are! {reason=}: {spec}")


# @kopf.on.create('kopfexamples')
# def create_fn2(spec, **kwargs):
# print(f"And here we are! Creating2: {spec}")
41 changes: 4 additions & 37 deletions examples/05-handlers/example.py
@@ -1,41 +1,8 @@
import kopf


@kopf.on.resume('kopfexamples')
def resume_fn_1(**kwargs):
print(f'RESUMED 1st')


@kopf.on.create('kopfexamples')
def create_fn_1(**kwargs):
print('CREATED 1st')


@kopf.on.resume('kopfexamples')
def resume_fn_2(**kwargs):
print(f'RESUMED 2nd')


@kopf.on.create('kopfexamples')
def create_fn_2(**kwargs):
print('CREATED 2nd')


@kopf.on.update('kopfexamples')
def update_fn(old, new, diff, **kwargs):
print('UPDATED')


@kopf.on.delete('kopfexamples')
def delete_fn_1(**kwargs):
print('DELETED 1st')


@kopf.on.delete('kopfexamples')
def delete_fn_2(**kwargs):
print('DELETED 2nd')


@kopf.on.field('kopfexamples', field='spec.field')
def field_fn(old, new, **kwargs):
print(f'FIELD CHANGED: {old} -> {new}')
def delete_fn(retry, logger, **_):
    if retry < 3:
        raise kopf.TemporaryError("not yet", delay=5)
    logger.info('DELETED')
30 changes: 24 additions & 6 deletions kopf/_cogs/configs/configuration.py
@@ -28,6 +28,7 @@
import concurrent.futures
import dataclasses
import logging
import warnings
from typing import Iterable, Optional, Union

from kopf._cogs.configs import diffbase, progress
@@ -187,6 +188,7 @@ class WatchingSettings:
"""


# TODO: is it now WorkerSettings/MultiplexerSettings?
@dataclasses.dataclass
class BatchingSettings:
"""
@@ -204,12 +206,6 @@ class BatchingSettings:
How soon an idle worker is exited and garbage-collected if no events arrive.
"""

batch_window: float = 0.1
"""
How fast/slow does a worker deplete the queue when an event is received.
All events arriving within this window will be ignored except the last one.
"""

exit_timeout: float = 2.0
"""
How soon a worker is cancelled when the parent watcher is going to exit.
@@ -223,6 +219,21 @@
For more information on error throttling, see :ref:`error-throttling`.
"""

    _batch_window: float = 0.1  # deprecated

    @property
    def batch_window(self) -> float:
        """ Deprecated and affects nothing. """
        warnings.warn("Time-based event batching was removed. Please stop configuring it.",
                      DeprecationWarning)
        return self._batch_window

    @batch_window.setter
    def batch_window(self, value: float) -> None:
        warnings.warn("Time-based event batching was removed. Please stop configuring it.",
                      DeprecationWarning)
        self._batch_window = value
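
The same deprecation pattern, extracted into a standalone sketch (the class
and field names are illustrative, not Kopf's actual code): reads and writes
keep working, but both emit a ``DeprecationWarning`` so callers can migrate.

```python
import dataclasses
import warnings


@dataclasses.dataclass
class LegacySettings:
    _batch_window: float = 0.1  # kept only for backwards compatibility

    @property
    def batch_window(self) -> float:
        # The getter still returns the stored value, but warns on every read.
        warnings.warn("Time-based event batching was removed. Please stop configuring it.",
                      DeprecationWarning, stacklevel=2)
        return self._batch_window

    @batch_window.setter
    def batch_window(self, value: float) -> None:
        # The setter accepts the value (which now affects nothing) and warns.
        warnings.warn("Time-based event batching was removed. Please stop configuring it.",
                      DeprecationWarning, stacklevel=2)
        self._batch_window = value
```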


@dataclasses.dataclass
class ScanningSettings:
@@ -374,6 +385,13 @@ class PersistenceSettings:
How the resource's essence (non-technical, contentful fields) are stored.
"""

    consistency_timeout: float = 5.0
    """
    For how long a patched resource version is awaited (seconds).

    See :ref:`consistency` for detailed explanation.
    """


@dataclasses.dataclass
class BackgroundSettings:
14 changes: 8 additions & 6 deletions kopf/_core/actions/application.py
@@ -20,7 +20,7 @@
"""
import asyncio
import datetime
from typing import Collection, Optional
from typing import Collection, Optional, Tuple

from kopf._cogs.aiokits import aiotime
from kopf._cogs.clients import patching
@@ -49,15 +49,15 @@ async def apply(
delays: Collection[float],
logger: loggers.ObjectLogger,
stream_pressure: Optional[asyncio.Event] = None, # None for tests
) -> bool:
) -> Tuple[bool, Optional[str]]:
delay = min(delays) if delays else None

# Delete dummies on occasion, but don't trigger special patching for them [discussable].
if patch: # TODO: LATER: and the dummies are there (without additional methods?)
settings.persistence.progress_storage.touch(body=body, patch=patch, value=None)

# Actually patch if it was not empty originally or after the dummies removal.
await patch_and_check(
resource_version = await patch_and_check(
settings=settings,
resource=resource,
logger=logger,
@@ -92,7 +92,7 @@ async def apply(
value = datetime.datetime.utcnow().isoformat()
touch = patches.Patch()
settings.persistence.progress_storage.touch(body=body, patch=touch, value=value)
await patch_and_check(
resource_version = await patch_and_check(
settings=settings,
resource=resource,
logger=logger,
@@ -101,7 +101,7 @@
)
elif not patch: # no patch/touch and no delay
applied = True
return applied
return applied, resource_version


async def patch_and_check(
Expand All @@ -111,7 +111,7 @@ async def patch_and_check(
body: bodies.Body,
patch: patches.Patch,
logger: typedefs.Logger,
) -> None:
) -> Optional[str]: # patched resource version
"""
Apply a patch and verify that it is applied correctly.

@@ -146,3 +146,5 @@
logger.debug(f"Patching was skipped: the object does not exist anymore.")
elif inconsistencies:
logger.warning(f"Patching failed with inconsistencies: {inconsistencies}")
return (resulting_body or {}).get('metadata', {}).get('resourceVersion')
return None
1 change: 1 addition & 0 deletions kopf/_core/engines/peering.py
@@ -100,6 +100,7 @@ async def process_peering_event(
# Must be accepted whether used or not -- as passed by watcher()/worker().
resource_indexed: Optional[aiotoggles.Toggle] = None, # None for tests & observation
operator_indexed: Optional[aiotoggles.ToggleSet] = None, # None for tests & observation
consistency_time: Optional[float] = None, # None for tests & observation
) -> None:
"""
Handle a single update of the peers by us or by other operators.
2 changes: 2 additions & 0 deletions kopf/_core/reactor/observation.py
@@ -150,6 +150,7 @@ async def process_discovered_namespace_event(
stream_pressure: Optional[asyncio.Event] = None, # None for tests
resource_indexed: Optional[aiotoggles.Toggle] = None, # None for tests & observation
operator_indexed: Optional[aiotoggles.ToggleSet] = None, # None for tests & observation
consistency_time: Optional[float] = None, # None for tests & observation
) -> None:
if raw_event['type'] is None:
return
@@ -169,6 +170,7 @@ async def process_discovered_resource_event(
stream_pressure: Optional[asyncio.Event] = None, # None for tests
resource_indexed: Optional[aiotoggles.Toggle] = None, # None for tests & observation
operator_indexed: Optional[aiotoggles.ToggleSet] = None, # None for tests & observation
consistency_time: Optional[float] = None, # None for tests & observation
) -> None:
# Ignore the initial listing, as all custom resources were already noticed by API listing.
# This prevents numerous unneccessary API requests at the the start of the operator.