Skip to content

Commit

Permalink
Merge pull request #527 from nolar/warn-on-patch-inconsistencies
Browse files Browse the repository at this point in the history
Warn on inconsistencies of a patch and a patched object
  • Loading branch information
nolar authored Sep 3, 2020
2 parents b826073 + ff18ca2 commit 2e1df0c
Show file tree
Hide file tree
Showing 9 changed files with 399 additions and 107 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and [Historic GitHub Contributors](https://github.com/zalando-incubator/kopf/gra
- [Daniel Middlecote](https://github.com/dlmiddlecote)
- [Henning Jacobs](https://github.com/hjacobs)
- [Ismail Kaboubi](https://github.com/smileisak)
- [Michael Narodovitch](https://github.com/michaelnarodovitch)
- [Sergey Vasilyev](https://github.com/nolar)
- [Soroosh Sarabadani](https://github.com/psycho-ir)
- [Trond Hindenes](https://github.com/trondhindenes)
Expand Down
22 changes: 19 additions & 3 deletions kopf/clients/patching.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async def patch_obj(
name: Optional[str] = None,
body: Optional[bodies.Body] = None,
context: Optional[auth.APIContext] = None, # injected by the decorator
) -> None:
) -> bodies.RawBody:
"""
Patch a resource of specific kind.
Expand All @@ -25,6 +25,12 @@ async def patch_obj(
Unlike the object listing, the namespaced call is always
used for the namespaced resources, even if the operator serves
the whole cluster (i.e. is not namespace-restricted).
Returns the patched body. The patched body can be partial (status-only,
no-status, or empty) -- depending on whether there were fields in the body
or in the status to patch; if neither had fields for patching, the result
is an empty body. The result should only be used to check against the patch:
if there was nothing to patch, it does not matter if the fields are absent.
"""
if context is None:
raise RuntimeError("API instance is not injected by the decorator.")
Expand All @@ -47,24 +53,34 @@ async def patch_obj(
body_patch = dict(patch) # shallow: for mutation of the top-level keys below.
status_patch = body_patch.pop('status', None) if as_subresource else None

# Patch & reconstruct the actual body as reported by the server. The reconstructed body can be
# partial or empty -- if the body/status patches are empty. This is fine: it is only used
# to verify that the patched fields are matching the patch. No patch? No mismatch!
patched_body = bodies.RawBody()
try:
if body_patch:
await context.session.patch(
response = await context.session.patch(
url=resource.get_url(server=context.server, namespace=namespace, name=name),
headers={'Content-Type': 'application/merge-patch+json'},
json=body_patch,
raise_for_status=True,
)
patched_body = await response.json()

if status_patch:
await context.session.patch(
response = await context.session.patch(
url=resource.get_url(server=context.server, namespace=namespace, name=name,
subresource='status' if as_subresource else None),
headers={'Content-Type': 'application/merge-patch+json'},
json={'status': status_patch},
raise_for_status=True,
)
patched_body['status'] = await response.json()

except aiohttp.ClientResponseError as e:
if e.status == 404:
pass
else:
raise

return patched_body
57 changes: 30 additions & 27 deletions kopf/reactor/daemons.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import warnings
from typing import Collection, List, Mapping, MutableMapping, Sequence

from kopf.clients import patching
from kopf.engines import loggers
from kopf.reactor import causation, effects, handling, lifecycles
from kopf.storage import states
Expand Down Expand Up @@ -323,6 +322,8 @@ async def _runner(
Note: synchronous daemons are awaited to the exit and postpone cancellation.
The runner will not exit until the thread exits. See `invoke` for details.
"""
stopper = cause.stopper

try:
if isinstance(handler, handlers_.ResourceDaemonHandler):
await _resource_daemon(settings=settings, handler=handler, cause=cause)
Expand All @@ -335,7 +336,7 @@ async def _runner(

# Prevent future re-spawns for those exited on their own, for no reason.
# Only the filter-mismatching daemons can be re-spawned on future events.
if cause.stopper.reason == primitives.DaemonStoppingReason.NONE:
if stopper.reason == primitives.DaemonStoppingReason.NONE:
memory.forever_stopped.add(handler.id)

# Save the memory by not remembering the exited daemons (they may be never re-spawned).
Expand All @@ -344,7 +345,7 @@ async def _runner(
# Whatever happened, make sure the sync threads of asyncio threaded executor are notified:
# in a hope that they will exit maybe some time later to free the OS/asyncio resources.
# A possible case: operator is exiting and cancelling all "hung" non-root tasks, etc.
cause.stopper.set(reason=primitives.DaemonStoppingReason.DONE)
stopper.set(reason=primitives.DaemonStoppingReason.DONE)


async def _resource_daemon(
Expand All @@ -362,14 +363,18 @@ async def _resource_daemon(
Few kinds of errors are suppressed, those expected from the daemons when
they are cancelled due to the resource deletion.
"""
resource = cause.resource
stopper = cause.stopper
logger = cause.logger
patch = cause.patch
body = cause.body

if handler.initial_delay is not None:
await effects.sleep_or_wait(handler.initial_delay, cause.stopper)

# Similar to activities (in-memory execution), but applies patches on every attempt.
state = states.State.from_scratch(handlers=[handler])
while not cause.stopper.is_set() and not state.done:
while not stopper.is_set() and not state.done:

outcomes = await handling.execute_handlers_once(
lifecycle=lifecycles.all_at_once, # there is only one anyway
Expand All @@ -379,18 +384,15 @@ async def _resource_daemon(
state=state,
)
state = state.with_outcomes(outcomes)
states.deliver_results(outcomes=outcomes, patch=cause.patch)

if cause.patch:
cause.logger.debug("Patching with: %r", cause.patch)
await patching.patch_obj(resource=cause.resource, patch=cause.patch, body=cause.body)
cause.patch.clear()
states.deliver_results(outcomes=outcomes, patch=patch)
await effects.patch_and_check(resource=resource, patch=patch, body=body, logger=logger)
patch.clear()

# The in-memory sleep does not react to resource changes, but only to stopping.
if state.delay:
await effects.sleep_or_wait(state.delay, cause.stopper)

if cause.stopper.is_set():
if stopper.is_set():
logger.debug(f"{handler} has exited on request and will not be retried or restarted.")
else:
logger.debug(f"{handler} has exited on its own and will not be retried or restarted.")
Expand Down Expand Up @@ -424,13 +426,18 @@ async def _resource_timer(
It is much easier to have an extra task which mostly sleeps,
but calls the handling functions from time to time.
"""
resource = cause.resource
stopper = cause.stopper
logger = cause.logger
patch = cause.patch
body = cause.body

if handler.initial_delay is not None:
await effects.sleep_or_wait(handler.initial_delay, cause.stopper)
await effects.sleep_or_wait(handler.initial_delay, stopper)

# Similar to activities (in-memory execution), but applies patches on every attempt.
state = states.State.from_scratch(handlers=[handler])
while not cause.stopper.is_set(): # NB: ignore state.done! it is checked below explicitly.
while not stopper.is_set(): # NB: ignore state.done! it is checked below explicitly.

# Reset success/failure retry counters & timers if it has succeeded. Keep it if failed.
# Every next invocation of a successful handler starts the retries from scratch (from zero).
Expand All @@ -440,10 +447,10 @@ async def _resource_timer(
# Both `now` and `last_seen_time` are moving targets: the last seen time is updated
# on every watch-event received, and prolongs the sleep. The sleep is never shortened.
if handler.idle is not None:
while not cause.stopper.is_set() and time.monotonic() - memory.idle_reset_time < handler.idle:
while not stopper.is_set() and time.monotonic() - memory.idle_reset_time < handler.idle:
delay = memory.idle_reset_time + handler.idle - time.monotonic()
await effects.sleep_or_wait(delay, cause.stopper)
if cause.stopper.is_set():
await effects.sleep_or_wait(delay, stopper)
if stopper.is_set():
continue

# Remember the start time for the sharp timing and idle-time-waster below.
Expand All @@ -458,38 +465,34 @@ async def _resource_timer(
state=state,
)
state = state.with_outcomes(outcomes)
states.deliver_results(outcomes=outcomes, patch=cause.patch)

# Apply the accumulated patches after every invocation attempt (regardless of its outcome).
if cause.patch:
cause.logger.debug("Patching with: %r", cause.patch)
await patching.patch_obj(resource=cause.resource, patch=cause.patch, body=cause.body)
cause.patch.clear()
states.deliver_results(outcomes=outcomes, patch=patch)
await effects.patch_and_check(resource=resource, patch=patch, body=body, logger=logger)
patch.clear()

# For temporary errors, override the schedule by the one provided by errors themselves.
# It can be either a delay from TemporaryError, or a backoff for an arbitrary exception.
if not state.done:
await effects.sleep_or_wait(state.delays, cause.stopper)
await effects.sleep_or_wait(state.delays, stopper)

# For sharp timers, calculate how much time is left to fit the interval grid:
# |-----|-----|-----|-----|-----|-----|---> (interval=5, sharp=True)
# [slow_handler]....[slow_handler]....[slow...
elif handler.interval is not None and handler.sharp:
passed_duration = time.monotonic() - started
remaining_delay = handler.interval - (passed_duration % handler.interval)
await effects.sleep_or_wait(remaining_delay, cause.stopper)
await effects.sleep_or_wait(remaining_delay, stopper)

# For regular (non-sharp) timers, simply sleep from last exit to the next call:
# |-----|-----|-----|-----|-----|-----|---> (interval=5, sharp=False)
# [slow_handler].....[slow_handler].....[slow...
elif handler.interval is not None:
await effects.sleep_or_wait(handler.interval, cause.stopper)
await effects.sleep_or_wait(handler.interval, stopper)

# For idle-only no-interval timers, wait till the next change (i.e. idling reset).
# NB: This will skip the handler in the same tact (1/64th of a second) even if changed.
elif handler.idle is not None:
while memory.idle_reset_time <= started:
await effects.sleep_or_wait(handler.idle, cause.stopper)
await effects.sleep_or_wait(handler.idle, stopper)

# Only in case there are no intervals and idling, treat it as a one-shot handler.
# This makes the handler practically meaningless, but technically possible.
Expand Down
58 changes: 48 additions & 10 deletions kopf/reactor/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,23 @@
import asyncio
import collections
import datetime
import logging
from typing import Collection, Optional, Union

from kopf.clients import patching
from kopf.engines import loggers
from kopf.structs import bodies, configuration, patches, primitives, resources
from kopf.structs import bodies, configuration, dicts, diffs, patches, primitives, resources

# How often to wake up from the long sleep, to show liveness in the logs.
WAITING_KEEPALIVE_INTERVAL = 10 * 60

# K8s-managed fields that are removed completely when patched to an empty list/dict.
KNOWN_INCONSISTENCIES = (
dicts.parse_field('metadata.annotations'),
dicts.parse_field('metadata.finalizers'),
dicts.parse_field('metadata.labels'),
)


async def apply(
*,
Expand All @@ -47,10 +55,8 @@ async def apply(
if patch: # TODO: LATER: and the dummies are there (without additional methods?)
settings.persistence.progress_storage.touch(body=body, patch=patch, value=None)

# Actually patch if it contained payload originally or after dummies removal.
if patch:
logger.debug("Patching with: %r", patch)
await patching.patch_obj(resource=resource, patch=patch, body=body)
# Actually patch if it was not empty originally or after the dummies removal.
await patch_and_check(resource=resource, patch=patch, body=body, logger=logger)

# Sleep strictly after patching, never before -- to keep the status proper.
# The patching above, if done, interrupts the sleep instantly, so we skip it at all.
Expand Down Expand Up @@ -78,11 +84,43 @@ async def apply(
else:
# Any unique always-changing value will work; not necessary a timestamp.
value = datetime.datetime.utcnow().isoformat()
touch_patch = patches.Patch()
settings.persistence.progress_storage.touch(body=body, patch=touch_patch, value=value)
if touch_patch:
logger.debug("Provoking reaction with: %r", touch_patch)
await patching.patch_obj(resource=resource, patch=touch_patch, body=body)
touch = patches.Patch()
settings.persistence.progress_storage.touch(body=body, patch=touch, value=value)
await patch_and_check(resource=resource, patch=touch, body=body, logger=logger)


async def patch_and_check(
        *,
        resource: resources.Resource,
        body: bodies.Body,
        patch: patches.Patch,
        logger: Union[logging.Logger, logging.LoggerAdapter],
) -> None:
    """
    Patch the object and warn if the result diverges from the patch.

    The verification is left-scoped: only the fields mentioned in the patch
    are compared against the patched body returned by the server. Any other
    changes in the body -- system fields such as generations and resource
    versions, or unrelated statuses, spec, labels, annotations -- are ignored.

    Known false positives are tolerated for K8s-managed fields (finalizers,
    labels, annotations): when such a field is patched to an empty list/dict,
    the server removes it entirely, which is not treated as an inconsistency.
    For regular fields (e.g. in spec/status), an empty list/dict is a value
    that is persisted in the object and must match the patch.
    """
    if not patch:
        return  # nothing was requested, so there is nothing to verify.

    logger.debug(f"Patching with: {patch!r}")
    actual_body = await patching.patch_obj(resource=resource, patch=patch, body=body)

    # Compare only the patched fields (left scope); then drop the tolerated
    # empty-to-absent removals of the K8s-managed fields.
    raw_diff = diffs.diff(dict(patch), dict(actual_body), scope=diffs.DiffScope.LEFT)
    mismatches = diffs.Diff(
        diffs.DiffItem(op, field, old, new)
        for op, field, old, new in raw_diff
        if old or new or field not in KNOWN_INCONSISTENCIES
    )
    if mismatches:
        logger.warning(f"Patching failed with inconsistencies: {mismatches}")


async def sleep_or_wait(
Expand Down
38 changes: 31 additions & 7 deletions kopf/structs/diffs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,26 @@
from kopf.structs import dicts


class DiffScope(enum.Flag):
    """
    Which side's fields participate in the diff calculation.

    ``FULL`` (the default) treats both objects as equally important:
    the diff is calculated from the left (source) object to the right
    (target) object over all fields of both.

    ``LEFT`` restricts the diff to the fields present in the left object
    (the source); fields that exist only in the right object are ignored.

    ``RIGHT`` restricts the diff to the fields present in the right object
    (the target); fields that exist only in the left object are ignored.
    """
    RIGHT = 1
    LEFT = 2
    FULL = LEFT | RIGHT


class DiffOperation(str, enum.Enum):
ADD = 'add'
CHANGE = 'change'
Expand Down Expand Up @@ -118,6 +138,8 @@ def diff_iter(
a: Any,
b: Any,
path: dicts.FieldPath = (),
*,
scope: DiffScope = DiffScope.FULL,
) -> Iterator[DiffItem]:
"""
Calculate the diff between two dicts.
Expand Down Expand Up @@ -148,12 +170,12 @@ def diff_iter(
elif isinstance(a, collections.abc.Mapping):
a_keys = frozenset(a.keys())
b_keys = frozenset(b.keys())
for key in b_keys - a_keys:
yield from diff_iter(None, b[key], path=path+(key,))
for key in a_keys - b_keys:
yield from diff_iter(a[key], None, path=path+(key,))
for key in a_keys & b_keys:
yield from diff_iter(a[key], b[key], path=path+(key,))
for key in (b_keys - a_keys if DiffScope.RIGHT in scope else ()):
yield from diff_iter(None, b[key], path=path+(key,), scope=scope)
for key in (a_keys - b_keys if DiffScope.LEFT in scope else ()):
yield from diff_iter(a[key], None, path=path+(key,), scope=scope)
for key in (a_keys & b_keys):
yield from diff_iter(a[key], b[key], path=path+(key,), scope=scope)
else:
yield DiffItem(DiffOperation.CHANGE, path, a, b)

def diff(
        a: Any,
        b: Any,
        path: dicts.FieldPath = (),
        *,
        scope: DiffScope = DiffScope.FULL,
) -> Diff:
    """
    Same as `diff_iter`, but returns the whole tuple instead of an iterator.

    :param a: The left (source) object of the diff.
    :param b: The right (target) object of the diff.
    :param path: The field path prefix to prepend to all yielded diff items.
    :param scope: Which side's fields to consider (left/right/full).
    :return: A `Diff` tuple of all the differences between ``a`` and ``b``.
    """
    return Diff(diff_iter(a, b, path=path, scope=scope))


EMPTY = diff(None, None)
Loading

0 comments on commit 2e1df0c

Please sign in to comment.