diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 2efbd9a3..d89baeee 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -11,6 +11,7 @@ and [Historic GitHub Contributors](https://github.com/zalando-incubator/kopf/gra - [Daniel Middlecote](https://github.com/dlmiddlecote) - [Henning Jacobs](https://github.com/hjacobs) - [Ismail Kaboubi](https://github.com/smileisak) +- [Michael Narodovitch](https://github.com/michaelnarodovitch) - [Sergey Vasilyev](https://github.com/nolar) - [Soroosh Sarabadani](https://github.com/psycho-ir) - [Trond Hindenes](https://github.com/trondhindenes) diff --git a/kopf/clients/patching.py b/kopf/clients/patching.py index 66849c88..5ef49438 100644 --- a/kopf/clients/patching.py +++ b/kopf/clients/patching.py @@ -15,7 +15,7 @@ async def patch_obj( name: Optional[str] = None, body: Optional[bodies.Body] = None, context: Optional[auth.APIContext] = None, # injected by the decorator -) -> None: +) -> bodies.RawBody: """ Patch a resource of specific kind. @@ -25,6 +25,12 @@ async def patch_obj( Unlike the object listing, the namespaced call is always used for the namespaced resources, even if the operator serves the whole cluster (i.e. is not namespace-restricted). + + Returns the patched body. The patched body can be partial (status-only, + no-status, or empty) -- depending on whether there were fields in the body + or in the status to patch; if neither had fields for patching, the result + is an empty body. The result should only be used to check against the patch: + if there was nothing to patch, it does not matter if the fields are absent. """ if context is None: raise RuntimeError("API instance is not injected by the decorator.") @@ -47,24 +53,34 @@ async def patch_obj( body_patch = dict(patch) # shallow: for mutation of the top-level keys below. status_patch = body_patch.pop('status', None) if as_subresource else None + # Patch & reconstruct the actual body as reported by the server. The reconstructed body can be + # partial or empty -- if the body/status patches are empty. This is fine: it is only used + # to verify that the patched fields are matching the patch. No patch? No mismatch! + patched_body = bodies.RawBody() try: if body_patch: - await context.session.patch( + response = await context.session.patch( url=resource.get_url(server=context.server, namespace=namespace, name=name), headers={'Content-Type': 'application/merge-patch+json'}, json=body_patch, raise_for_status=True, ) + patched_body = await response.json() + if status_patch: - await context.session.patch( + response = await context.session.patch( url=resource.get_url(server=context.server, namespace=namespace, name=name, subresource='status' if as_subresource else None), headers={'Content-Type': 'application/merge-patch+json'}, json={'status': status_patch}, raise_for_status=True, ) + patched_body['status'] = await response.json() + except aiohttp.ClientResponseError as e: if e.status == 404: pass else: raise + + return patched_body diff --git a/kopf/reactor/daemons.py b/kopf/reactor/daemons.py index 9657529c..ae56b698 100644 --- a/kopf/reactor/daemons.py +++ b/kopf/reactor/daemons.py @@ -25,7 +25,6 @@ import warnings from typing import Collection, List, Mapping, MutableMapping, Sequence -from kopf.clients import patching from kopf.engines import loggers from kopf.reactor import causation, effects, handling, lifecycles from kopf.storage import states @@ -323,6 +322,8 @@ async def _runner( Note: synchronous daemons are awaited to the exit and postpone cancellation. The runner will not exit until the thread exits. See `invoke` for details. """ + stopper = cause.stopper + try: if isinstance(handler, handlers_.ResourceDaemonHandler): await _resource_daemon(settings=settings, handler=handler, cause=cause) @@ -335,7 +336,7 @@ async def _runner( # Prevent future re-spawns for those exited on their own, for no reason. # Only the filter-mismatching daemons can be re-spawned on future events. - if cause.stopper.reason == primitives.DaemonStoppingReason.NONE: + if stopper.reason == primitives.DaemonStoppingReason.NONE: memory.forever_stopped.add(handler.id) # Save the memory by not remembering the exited daemons (they may be never re-spawned). @@ -344,7 +345,7 @@ async def _runner( # Whatever happened, make sure the sync threads of asyncio threaded executor are notified: # in a hope that they will exit maybe some time later to free the OS/asyncio resources. # A possible case: operator is exiting and cancelling all "hung" non-root tasks, etc. - cause.stopper.set(reason=primitives.DaemonStoppingReason.DONE) + stopper.set(reason=primitives.DaemonStoppingReason.DONE) async def _resource_daemon( @@ -362,14 +363,18 @@ async def _resource_daemon( Few kinds of errors are suppressed, those expected from the daemons when they are cancelled due to the resource deletion. """ + resource = cause.resource + stopper = cause.stopper logger = cause.logger + patch = cause.patch + body = cause.body if handler.initial_delay is not None: await effects.sleep_or_wait(handler.initial_delay, cause.stopper) # Similar to activities (in-memory execution), but applies patches on every attempt. state = states.State.from_scratch(handlers=[handler]) - while not cause.stopper.is_set() and not state.done: + while not stopper.is_set() and not state.done: outcomes = await handling.execute_handlers_once( lifecycle=lifecycles.all_at_once, # there is only one anyway @@ -379,18 +384,15 @@ async def _resource_daemon( state=state, ) state = state.with_outcomes(outcomes) - states.deliver_results(outcomes=outcomes, patch=cause.patch) - - if cause.patch: - cause.logger.debug("Patching with: %r", cause.patch) - await patching.patch_obj(resource=cause.resource, patch=cause.patch, body=cause.body) - cause.patch.clear() + states.deliver_results(outcomes=outcomes, patch=patch) + await effects.patch_and_check(resource=resource, patch=patch, body=body, logger=logger) + patch.clear() # The in-memory sleep does not react to resource changes, but only to stopping. if state.delay: await effects.sleep_or_wait(state.delay, cause.stopper) - if cause.stopper.is_set(): + if stopper.is_set(): logger.debug(f"{handler} has exited on request and will not be retried or restarted.") else: logger.debug(f"{handler} has exited on its own and will not be retried or restarted.") @@ -424,13 +426,18 @@ async def _resource_timer( It is much easier to have an extra task which mostly sleeps, but calls the handling functions from time to time. """ + resource = cause.resource + stopper = cause.stopper + logger = cause.logger + patch = cause.patch + body = cause.body if handler.initial_delay is not None: - await effects.sleep_or_wait(handler.initial_delay, cause.stopper) + await effects.sleep_or_wait(handler.initial_delay, stopper) # Similar to activities (in-memory execution), but applies patches on every attempt. state = states.State.from_scratch(handlers=[handler]) - while not cause.stopper.is_set(): # NB: ignore state.done! it is checked below explicitly. + while not stopper.is_set(): # NB: ignore state.done! it is checked below explicitly. # Reset success/failure retry counters & timers if it has succeeded. Keep it if failed. # Every next invocation of a successful handler starts the retries from scratch (from zero). @@ -440,10 +447,10 @@ async def _resource_timer( # Both `now` and `last_seen_time` are moving targets: the last seen time is updated # on every watch-event received, and prolongs the sleep. The sleep is never shortened. if handler.idle is not None: - while not cause.stopper.is_set() and time.monotonic() - memory.idle_reset_time < handler.idle: + while not stopper.is_set() and time.monotonic() - memory.idle_reset_time < handler.idle: delay = memory.idle_reset_time + handler.idle - time.monotonic() - await effects.sleep_or_wait(delay, cause.stopper) - if cause.stopper.is_set(): + await effects.sleep_or_wait(delay, stopper) + if stopper.is_set(): continue # Remember the start time for the sharp timing and idle-time-waster below. @@ -458,18 +465,14 @@ async def _resource_timer( state=state, ) state = state.with_outcomes(outcomes) - states.deliver_results(outcomes=outcomes, patch=cause.patch) - - # Apply the accumulated patches after every invocation attempt (regardless of its outcome). - if cause.patch: - cause.logger.debug("Patching with: %r", cause.patch) - await patching.patch_obj(resource=cause.resource, patch=cause.patch, body=cause.body) - cause.patch.clear() + states.deliver_results(outcomes=outcomes, patch=patch) + await effects.patch_and_check(resource=resource, patch=patch, body=body, logger=logger) + patch.clear() # For temporary errors, override the schedule by the one provided by errors themselves. # It can be either a delay from TemporaryError, or a backoff for an arbitrary exception. if not state.done: - await effects.sleep_or_wait(state.delays, cause.stopper) + await effects.sleep_or_wait(state.delays, stopper) # For sharp timers, calculate how much time is left to fit the interval grid: # |-----|-----|-----|-----|-----|-----|---> (interval=5, sharp=True) @@ -477,19 +480,19 @@ async def _resource_timer( elif handler.interval is not None and handler.sharp: passed_duration = time.monotonic() - started remaining_delay = handler.interval - (passed_duration % handler.interval) - await effects.sleep_or_wait(remaining_delay, cause.stopper) + await effects.sleep_or_wait(remaining_delay, stopper) # For regular (non-sharp) timers, simply sleep from last exit to the next call: # |-----|-----|-----|-----|-----|-----|---> (interval=5, sharp=False) # [slow_handler].....[slow_handler].....[slow... elif handler.interval is not None: - await effects.sleep_or_wait(handler.interval, cause.stopper) + await effects.sleep_or_wait(handler.interval, stopper) # For idle-only no-interval timers, wait till the next change (i.e. idling reset). # NB: This will skip the handler in the same tact (1/64th of a second) even if changed. elif handler.idle is not None: while memory.idle_reset_time <= started: - await effects.sleep_or_wait(handler.idle, cause.stopper) + await effects.sleep_or_wait(handler.idle, stopper) # Only in case there are no intervals and idling, treat it as a one-shot handler. # This makes the handler practically meaningless, but technically possible. diff --git a/kopf/reactor/effects.py b/kopf/reactor/effects.py index dce7a4f4..3e8495f8 100644 --- a/kopf/reactor/effects.py +++ b/kopf/reactor/effects.py @@ -21,15 +21,23 @@ import asyncio import collections import datetime +import logging from typing import Collection, Optional, Union from kopf.clients import patching from kopf.engines import loggers -from kopf.structs import bodies, configuration, patches, primitives, resources +from kopf.structs import bodies, configuration, dicts, diffs, patches, primitives, resources # How often to wake up from the long sleep, to show liveness in the logs. WAITING_KEEPALIVE_INTERVAL = 10 * 60 +# K8s-managed fields that are removed completely when patched to an empty list/dict. +KNOWN_INCONSISTENCIES = ( + dicts.parse_field('metadata.annotations'), + dicts.parse_field('metadata.finalizers'), + dicts.parse_field('metadata.labels'), +) + async def apply( *, @@ -47,10 +55,8 @@ async def apply( if patch: # TODO: LATER: and the dummies are there (without additional methods?) settings.persistence.progress_storage.touch(body=body, patch=patch, value=None) - # Actually patch if it contained payload originally or after dummies removal. - if patch: - logger.debug("Patching with: %r", patch) - await patching.patch_obj(resource=resource, patch=patch, body=body) + # Actually patch if it was not empty originally or after the dummies removal. + await patch_and_check(resource=resource, patch=patch, body=body, logger=logger) # Sleep strictly after patching, never before -- to keep the status proper. # The patching above, if done, interrupts the sleep instantly, so we skip it at all. @@ -78,11 +84,43 @@ async def apply( else: # Any unique always-changing value will work; not necessary a timestamp. value = datetime.datetime.utcnow().isoformat() - touch_patch = patches.Patch() - settings.persistence.progress_storage.touch(body=body, patch=touch_patch, value=value) - if touch_patch: - logger.debug("Provoking reaction with: %r", touch_patch) - await patching.patch_obj(resource=resource, patch=touch_patch, body=body) + touch = patches.Patch() + settings.persistence.progress_storage.touch(body=body, patch=touch, value=value) + await patch_and_check(resource=resource, patch=touch, body=body, logger=logger) + + +async def patch_and_check( + *, + resource: resources.Resource, + body: bodies.Body, + patch: patches.Patch, + logger: Union[logging.Logger, logging.LoggerAdapter], +) -> None: + """ + Apply a patch and verify that it is applied correctly. + + The inconsistencies are checked only against what was in the patch. + Other unexpected changes in the body are ignored, including the system + fields, such as generations, resource versions, and other unrelated fields, + such as other statuses, spec, labels, annotations, etc. + + Selected false-positive inconsistencies are explicitly ignored + for K8s-managed fields, such as finalizers, labels or annotations: + whenever an empty list/dict is stored, such fields are completely removed. + For normal fields (e.g. in spec/status), an empty list/dict is still + a value and is persisted in the object and matches with the patch. + """ + if patch: + logger.debug(f"Patching with: {patch!r}") + resulting_body = await patching.patch_obj(resource=resource, patch=patch, body=body) + inconsistencies = diffs.diff(dict(patch), dict(resulting_body), scope=diffs.DiffScope.LEFT) + inconsistencies = diffs.Diff( + diffs.DiffItem(op, field, old, new) + for op, field, old, new in inconsistencies + if old or new or field not in KNOWN_INCONSISTENCIES + ) + if inconsistencies: + logger.warning(f"Patching failed with inconsistencies: {inconsistencies}") async def sleep_or_wait( diff --git a/kopf/structs/diffs.py b/kopf/structs/diffs.py index 1e99ede4..07397778 100644 --- a/kopf/structs/diffs.py +++ b/kopf/structs/diffs.py @@ -8,6 +8,26 @@ from kopf.structs import dicts +class DiffScope(enum.Flag): + """ + Scope limitation for the diffs' fields to be noticed or ignored. + + In the full-scoped diff (the default), both objects (diff source & target) + are treated equally important, and the diff is calculated from the left + to the right one for all fields. + + In the left-scoped diff, only the left object (diff source) is considered + important, and only the differences for the fields found in the left object + (source) are checked. Extra fields in the right object (target) are ignored. + + In the right-scoped diff, only the fields in the right object (diff target) + are scanned. Extra fields in the left object (diff source) are ignored. + """ + RIGHT = enum.auto() + LEFT = enum.auto() + FULL = LEFT | RIGHT + + class DiffOperation(str, enum.Enum): ADD = 'add' CHANGE = 'change' @@ -118,6 +138,8 @@ def diff_iter( a: Any, b: Any, path: dicts.FieldPath = (), + *, + scope: DiffScope = DiffScope.FULL, ) -> Iterator[DiffItem]: """ Calculate the diff between two dicts. @@ -148,12 +170,12 @@ def diff_iter( elif isinstance(a, collections.abc.Mapping): a_keys = frozenset(a.keys()) b_keys = frozenset(b.keys()) - for key in b_keys - a_keys: - yield from diff_iter(None, b[key], path=path+(key,)) - for key in a_keys - b_keys: - yield from diff_iter(a[key], None, path=path+(key,)) - for key in a_keys & b_keys: - yield from diff_iter(a[key], b[key], path=path+(key,)) + for key in (b_keys - a_keys if DiffScope.RIGHT in scope else ()): + yield from diff_iter(None, b[key], path=path+(key,), scope=scope) + for key in (a_keys - b_keys if DiffScope.LEFT in scope else ()): + yield from diff_iter(a[key], None, path=path+(key,), scope=scope) + for key in (a_keys & b_keys): + yield from diff_iter(a[key], b[key], path=path+(key,), scope=scope) else: yield DiffItem(DiffOperation.CHANGE, path, a, b) @@ -162,11 +184,13 @@ def diff( a: Any, b: Any, path: dicts.FieldPath = (), + *, + scope: DiffScope = DiffScope.FULL, ) -> Diff: """ Same as `diff`, but returns the whole tuple instead of iterator. """ - return Diff(diff_iter(a, b, path=path)) + return Diff(diff_iter(a, b, path=path, scope=scope)) EMPTY = diff(None, None) diff --git a/tests/diffs/test_calculation.py b/tests/diffs/test_calculation.py index 1f75c705..d5ca8a04 100644 --- a/tests/diffs/test_calculation.py +++ b/tests/diffs/test_calculation.py @@ -1,101 +1,133 @@ -from kopf.structs.diffs import diff +import pytest +from kopf.structs.diffs import DiffScope, diff -def test_none_for_old(): + +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_none_for_old(scope): a = None b = object() - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == (('add', (), None, b),) -def test_none_for_new(): +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_none_for_new(scope): a = object() b = None - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == (('remove', (), a, None),) -def test_nones_for_both(): +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_nones_for_both(scope): a = None b = None - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == () -def test_scalars_equal(): +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_scalars_equal(scope): a = 100 b = 100 - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == () -def test_scalars_unequal(): +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_scalars_unequal(scope): a = 100 b = 200 - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == (('change', (), 100, 200),) -def test_strings_equal(): +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_strings_equal(scope): a = 'hello' b = 'hello' - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == () -def test_strings_unequal(): +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_strings_unequal(scope): a = 'hello' b = 'world' - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == (('change', (), 'hello', 'world'),) -def test_lists_equal(): +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_lists_equal(scope): a = [100, 200, 300] b = [100, 200, 300] - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == () -def test_lists_unequal(): +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_lists_unequal(scope): a = [100, 200, 300] b = [100, 666, 300] - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == (('change', (), [100, 200, 300], [100, 666, 300]),) -def test_dicts_equal(): +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_dicts_equal(scope): a = {'hello': 'world', 'key': 'val'} b = {'key': 'val', 'hello': 'world'} - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == () -def test_dicts_with_keys_added(): +@pytest.mark.parametrize('scope', [DiffScope.FULL, DiffScope.RIGHT]) +def test_dicts_with_keys_added_and_noticed(scope): a = {'hello': 'world'} b = {'hello': 'world', 'key': 'val'} - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == (('add', ('key',), None, 'val'),) -def test_dicts_with_keys_removed(): +@pytest.mark.parametrize('scope', [DiffScope.LEFT]) +def test_dicts_with_keys_added_but_ignored(scope): + a = {'hello': 'world'} + b = {'hello': 'world', 'key': 'val'} + d = diff(a, b, scope=scope) + assert d == () + + +@pytest.mark.parametrize('scope', [DiffScope.FULL, DiffScope.LEFT]) +def test_dicts_with_keys_removed_and_noticed(scope): a = {'hello': 'world', 'key': 'val'} b = {'hello': 'world'} - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == (('remove', ('key',), 'val', None),) -def test_dicts_with_keys_changed(): +@pytest.mark.parametrize('scope', [DiffScope.RIGHT]) +def test_dicts_with_keys_removed_but_ignored(scope): + a = {'hello': 'world', 'key': 'val'} + b = {'hello': 'world'} + d = diff(a, b, scope=scope) + assert d == () + + +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_dicts_with_keys_changed(scope): a = {'hello': 'world', 'key': 'old'} b = {'hello': 'world', 'key': 'new'} - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == (('change', ('key',), 'old', 'new'),) -def test_dicts_with_subkeys_changed(): +@pytest.mark.parametrize('scope', list(DiffScope)) +def test_dicts_with_subkeys_changed(scope): a = {'main': {'hello': 'world', 'key': 'old'}} b = {'main': {'hello': 'world', 'key': 'new'}} - d = diff(a, b) + d = diff(a, b, scope=scope) assert d == (('change', ('main', 'key'), 'old', 'new'),) diff --git a/tests/handling/conftest.py b/tests/handling/conftest.py index f92069e6..6222a736 100644 --- a/tests/handling/conftest.py +++ b/tests/handling/conftest.py @@ -54,7 +54,7 @@ class K8sMocks: def k8s_mocked(mocker, resp_mocker): # We mock on the level of our own K8s API wrappers, not the K8s client. return K8sMocks( - patch_obj=mocker.patch('kopf.clients.patching.patch_obj'), + patch_obj=mocker.patch('kopf.clients.patching.patch_obj', return_value={}), post_event=mocker.patch('kopf.clients.events.post_event'), sleep_or_wait=mocker.patch('kopf.reactor.effects.sleep_or_wait', return_value=None), ) diff --git a/tests/k8s/test_patching.py b/tests/k8s/test_patching.py index 5216741c..707be751 100644 --- a/tests/k8s/test_patching.py +++ b/tests/k8s/test_patching.py @@ -2,6 +2,8 @@ import pytest from kopf.clients.patching import patch_obj +from kopf.structs.bodies import Body +from kopf.structs.patches import Patch @pytest.mark.resource_clustered # see `resp_mocker` @@ -11,7 +13,7 @@ async def test_by_name_clustered( patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) aresponses.add(hostname, resource.get_url(namespace=None, name='name1'), 'patch', patch_mock) - patch = {'x': 'y'} + patch = Patch({'x': 'y'}) await patch_obj(resource=resource, namespace=None, name='name1', patch=patch) assert patch_mock.called @@ -27,7 +29,7 @@ async def test_by_name_namespaced( patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) aresponses.add(hostname, resource.get_url(namespace='ns1', name='name1'), 'patch', patch_mock) - patch = {'x': 'y'} + patch = Patch({'x': 'y'}) await patch_obj(resource=resource, namespace='ns1', name='name1', patch=patch) assert patch_mock.called @@ -44,8 +46,8 @@ async def test_by_body_clustered( patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) aresponses.add(hostname, resource.get_url(namespace=None, name='name1'), 'patch', patch_mock) - patch = {'x': 'y'} - body = {'metadata': {'name': 'name1'}} + body = Body({'metadata': {'name': 'name1'}}) + patch = Patch({'x': 'y'}) await patch_obj(resource=resource, body=body, patch=patch) assert patch_mock.called @@ -61,8 +63,8 @@ async def test_by_body_namespaced( patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) aresponses.add(hostname, resource.get_url(namespace='ns1', name='name1'), 'patch', patch_mock) - patch = {'x': 'y'} - body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} + body = Body({'metadata': {'namespace': 'ns1', 'name': 'name1'}}) + patch = Patch({'x': 'y'}) await patch_obj(resource=resource, body=body, patch=patch) assert patch_mock.called @@ -75,16 +77,24 @@ async def test_by_body_namespaced( async def test_status_as_subresource_with_combined_payload( resp_mocker, aresponses, hostname, resource, version_api_with_substatus): + # Simulate Kopf's initial state and intention. + body = Body({'metadata': {'namespace': 'ns1', 'name': 'name1'}}) + patch = Patch({'spec': {'x': 'y'}, 'status': {'s': 't'}}) + + # Simulate K8s API's behaviour. Assume something extra is added remotely. + object_response = {'metadata': {'namespace': 'ns1', 'name': 'name1', 'extra': '123'}, + 'spec': {'x': 'y', 'extra': '456'}, + 'status': '...'} + status_response = {'s': 't', 'extra': '789'} + object_url = resource.get_url(namespace='ns1', name='name1') status_url = resource.get_url(namespace='ns1', name='name1', subresource='status') - object_patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) - status_patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) + object_patch_mock = resp_mocker(return_value=aiohttp.web.json_response(object_response)) + status_patch_mock = resp_mocker(return_value=aiohttp.web.json_response(status_response)) aresponses.add(hostname, object_url, 'patch', object_patch_mock) aresponses.add(hostname, status_url, 'patch', status_patch_mock) - patch = {'spec': {'x': 'y'}, 'status': {'s': 't'}} - body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} - await patch_obj(resource=resource, body=body, patch=patch) + reconstructed = await patch_obj(resource=resource, body=body, patch=patch) assert object_patch_mock.called assert object_patch_mock.call_count == 1 @@ -96,20 +106,32 @@ async def test_status_as_subresource_with_combined_payload( data = status_patch_mock.call_args_list[0][0][0].data # [callidx][args/kwargs][argidx] assert data == {'status': {'s': 't'}} + assert reconstructed == {'metadata': {'namespace': 'ns1', 'name': 'name1', 'extra': '123'}, + 'spec': {'x': 'y', 'extra': '456'}, + 'status': {'s': 't', 'extra': '789'}} -async def test_status_as_subresource_with_fields_only( + +async def test_status_as_subresource_with_object_fields_only( resp_mocker, aresponses, hostname, resource, version_api_with_substatus): + # Simulate Kopf's initial state and intention. + body = Body({'metadata': {'namespace': 'ns1', 'name': 'name1'}}) + patch = Patch({'spec': {'x': 'y'}}) + + # Simulate K8s API's behaviour. Assume something extra is added remotely. + object_response = {'metadata': {'namespace': 'ns1', 'name': 'name1', 'extra': '123'}, + 'spec': {'x': 'y', 'extra': '456'}, + 'status': '...'} + status_response = {'s': 't', 'extra': '789'} + object_url = resource.get_url(namespace='ns1', name='name1') status_url = resource.get_url(namespace='ns1', name='name1', subresource='status') - object_patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) - status_patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) + object_patch_mock = resp_mocker(return_value=aiohttp.web.json_response(object_response)) + status_patch_mock = resp_mocker(return_value=aiohttp.web.json_response(status_response)) aresponses.add(hostname, object_url, 'patch', object_patch_mock) aresponses.add(hostname, status_url, 'patch', status_patch_mock) - patch = {'spec': {'x': 'y'}} - body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} - await patch_obj(resource=resource, body=body, patch=patch) + reconstructed = await patch_obj(resource=resource, body=body, patch=patch) assert object_patch_mock.called assert object_patch_mock.call_count == 1 @@ -118,20 +140,32 @@ async def test_status_as_subresource_with_fields_only( data = object_patch_mock.call_args_list[0][0][0].data # [callidx][args/kwargs][argidx] assert data == {'spec': {'x': 'y'}} + assert reconstructed == {'metadata': {'namespace': 'ns1', 'name': 'name1', 'extra': '123'}, + 'spec': {'x': 'y', 'extra': '456'}, + 'status': '...'} -async def test_status_as_subresource_with_status_only( + +async def test_status_as_subresource_with_status_fields_only( resp_mocker, aresponses, hostname, resource, version_api_with_substatus): + # Simulate Kopf's initial state and intention. + body = Body({'metadata': {'namespace': 'ns1', 'name': 'name1'}}) + patch = Patch({'status': {'s': 't'}}) + + # Simulate K8s API's behaviour. Assume something extra is added remotely. + object_response = {'metadata': {'namespace': 'ns1', 'name': 'name1', 'extra': '123'}, + 'spec': {'x': 'y', 'extra': '456'}, + 'status': '...'} + status_response = {'s': 't', 'extra': '789'} + object_url = resource.get_url(namespace='ns1', name='name1') status_url = resource.get_url(namespace='ns1', name='name1', subresource='status') - object_patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) - status_patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) + object_patch_mock = resp_mocker(return_value=aiohttp.web.json_response(object_response)) + status_patch_mock = resp_mocker(return_value=aiohttp.web.json_response(status_response)) aresponses.add(hostname, object_url, 'patch', object_patch_mock) aresponses.add(hostname, status_url, 'patch', status_patch_mock) - patch = {'status': {'s': 't'}} - body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} - await patch_obj(resource=resource, body=body, patch=patch) + reconstructed = await patch_obj(resource=resource, body=body, patch=patch) assert not object_patch_mock.called assert status_patch_mock.called @@ -140,20 +174,30 @@ async def test_status_as_subresource_with_status_only( data = status_patch_mock.call_args_list[0][0][0].data # [callidx][args/kwargs][argidx] assert data == {'status': {'s': 't'}} + assert reconstructed == {'status': {'s': 't', 'extra': '789'}} + -async def test_status_as_body_field( +async def test_status_as_body_field_with_combined_payload( resp_mocker, aresponses, hostname, resource): + # Simulate Kopf's initial state and intention. + body = Body({'metadata': {'namespace': 'ns1', 'name': 'name1'}}) + patch = Patch({'spec': {'x': 'y'}, 'status': {'s': 't'}}) + + # Simulate K8s API's behaviour. Assume something extra is added remotely. + object_response = {'metadata': {'namespace': 'ns1', 'name': 'name1', 'extra': '123'}, + 'spec': {'x': 'y', 'extra': '456'}, + 'status': '...'} + status_response = {'s': 't', 'extra': '789'} + object_url = resource.get_url(namespace='ns1', name='name1') status_url = resource.get_url(namespace='ns1', name='name1', subresource='status') - object_patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) - status_patch_mock = resp_mocker(return_value=aiohttp.web.json_response({})) + object_patch_mock = resp_mocker(return_value=aiohttp.web.json_response(object_response)) + status_patch_mock = resp_mocker(return_value=aiohttp.web.json_response(status_response)) aresponses.add(hostname, object_url, 'patch', object_patch_mock) aresponses.add(hostname, status_url, 'patch', status_patch_mock) - patch = {'spec': {'x': 'y'}, 'status': {'s': 't'}} - body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} - await patch_obj(resource=resource, body=body, patch=patch) + reconstructed = await patch_obj(resource=resource, body=body, patch=patch) assert object_patch_mock.called assert object_patch_mock.call_count == 1 @@ -162,6 +206,10 @@ async def test_status_as_body_field( data = object_patch_mock.call_args_list[0][0][0].data # [callidx][args/kwargs][argidx] assert data == {'spec': {'x': 'y'}, 'status': {'s': 't'}} + assert reconstructed == {'metadata': {'namespace': 'ns1', 'name': 'name1', 'extra': '123'}, + 'spec': {'x': 'y', 'extra': '456'}, + 'status': '...'} + async def test_raises_when_body_conflicts_with_namespace( resp_mocker, aresponses, hostname, resource): @@ -219,7 +267,9 @@ async def test_ignores_absent_objects( patch = {'x': 'y'} body = {'metadata': {'namespace': namespace, 'name': 'name1'}} - await patch_obj(resource=resource, body=body, patch=patch) + reconstructed = await patch_obj(resource=resource, body=body, patch=patch) + + assert reconstructed == {} @pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace']) diff --git a/tests/reactor/test_patching_inconsistencies.py b/tests/reactor/test_patching_inconsistencies.py new file mode 100644 index 00000000..2fb42a6c --- /dev/null +++ b/tests/reactor/test_patching_inconsistencies.py @@ -0,0 +1,128 @@ +import logging + +import aiohttp.web +import pytest + +from kopf.engines.loggers import LocalObjectLogger +from kopf.reactor.effects import patch_and_check +from kopf.structs.bodies import Body +from kopf.structs.patches import Patch + +# Assume that the underlying patch_obj() is already tested with/without status as a sub-resource. +# Assume that the underlying diff() is already tested with left/right/full scopes and all values. +# Test ONLY the logging/warning on patch-vs-response inconsistencies here. + + +@pytest.mark.parametrize('patch, response', [ + + pytest.param({'spec': {'x': 'y'}, 'status': {'s': 't'}}, + {'spec': {'x': 'y'}, 'status': {'s': 't'}}, + id='response-exact'), + + pytest.param({'spec': {'x': 'y'}, 'status': {'s': 't'}}, + {'spec': {'x': 'y'}, 'status': {'s': 't'}, 'metadata': '...'}, + id='response-root-extra'), + + pytest.param({'spec': {'x': 'y'}, 'status': {'s': 't'}}, + {'spec': {'x': 'y', 'extra': '...'}, 'status': {'s': 't'}}, + id='response-spec-extra'), + + pytest.param({'spec': {'x': 'y'}, 'status': {'s': 't'}}, + {'spec': {'x': 'y'}, 'status': {'s': 't', 'extra': '...'}}, + id='response-status-extra'), + + pytest.param({'spec': {'x': None}, 'status': {'s': None}}, + {'spec': {}, 'status': {}}, + id='response-clean'), + + # False-positive inconsistencies for K8s-managed fields. + pytest.param({'metadata': {'annotations': {}}}, {'metadata': {}}, id='false-annotations'), + pytest.param({'metadata': {'finalizers': []}}, {'metadata': {}}, id='false-finalizers'), + pytest.param({'metadata': {'labels': {}}}, {'metadata': {}}, id='false-labels'), + +]) +async def test_patching_without_inconsistencies( + resource, settings, caplog, assert_logs, version_api, + aresponses, hostname, resp_mocker, + patch, response): + caplog.set_level(logging.DEBUG) + + url = resource.get_url(namespace='ns1', name='name1') + patch_mock = resp_mocker(return_value=aiohttp.web.json_response(response)) + aresponses.add(hostname, url, 'patch', patch_mock) + + body = Body({'metadata': {'namespace': 'ns1', 'name': 'name1'}}) + logger = LocalObjectLogger(body=body, settings=settings) + await patch_and_check( + resource=resource, + body=body, + patch=Patch(patch), + logger=logger, + ) + + assert_logs([ + "Patching with:", + ], prohibited=[ + "Patching failed with inconsistencies:", + ]) + + +@pytest.mark.parametrize('patch, response', [ + + pytest.param({'spec': {'x': 'y'}, 'status': {'s': 't'}}, + {}, + id='response-empty'), + + pytest.param({'spec': {'x': 'y'}, 'status': {'s': 't'}}, + {'spec': {'x': 'y'}}, + id='response-status-lost'), + + pytest.param({'spec': {'x': 'y'}, 'status': {'s': 't'}}, + {'status': {'s': 't'}}, + id='response-spec-lost'), + + pytest.param({'spec': {'x': 'y'}, 'status': {'s': 't'}}, + {'spec': {'x': 'not-y'}, 'status': {'s': 't'}}, + id='response-spec-altered'), + + pytest.param({'spec': {'x': 'y'}, 'status': {'s': 't'}}, + {'spec': {'x': 'y'}, 'status': {'s': 'not-t'}}, + id='response-status-altered'), + + pytest.param({'spec': {'x': None}, 'status': {'s': None}}, + {'spec': {'x': 'y'}, 'status': {}}, + id='response-spec-undeleted'), + + pytest.param({'spec': {'x': None}, 'status': {'s': None}}, + {'spec': {}, 'status': {'s': 't'}}, + id='response-status-undeleted'), + + # True-positive inconsistencies for K8s-managed fields with possible false-positives. + pytest.param({'metadata': {'annotations': {'x': 'y'}}}, {'metadata': {}}, id='true-annotations'), + pytest.param({'metadata': {'finalizers': ['x', 'y']}}, {'metadata': {}}, id='true-finalizers'), + pytest.param({'metadata': {'labels': {'x': 'y'}}}, {'metadata': {}}, id='true-labels'), + +]) +async def test_patching_with_inconsistencies( + resource, settings, caplog, assert_logs, version_api, + aresponses, hostname, resp_mocker, + patch, response): + caplog.set_level(logging.DEBUG) + + url = resource.get_url(namespace='ns1', name='name1') + patch_mock = resp_mocker(return_value=aiohttp.web.json_response(response)) + aresponses.add(hostname, url, 'patch', patch_mock) + + body = Body({'metadata': {'namespace': 'ns1', 'name': 'name1'}}) + logger = LocalObjectLogger(body=body, settings=settings) + await patch_and_check( + resource=resource, + body=body, + patch=Patch(patch), + logger=logger, + ) + + assert_logs([ + "Patching with:", + "Patching failed with inconsistencies:", + ])