Classify handler- and processing states by purpose
This fix matters only for states restored from storage with specific handlers referenced: those handlers should be taken into account when purging the state (#557), but not for `.done` & `.delays`, which must be calculated only for the handlers currently in scope.

For log messages with counts of succeeded and failed handlers (#540), only the handlers relevant to the current cause should be counted. However, the presence of irrelevant handlers means that one cause was interrupted (intercepted, overridden) by another: e.g., a deletion happened while a creation/update/resume was still being processed.

Technically, without the fix, this merges two unrelated causes and their handlers, since, from Kopf's point of view, the irrelevant handlers look the same as relevant handlers that were merely filtered out (e.g. by labels). To address this, we introduce "purposes" of the handlers and persist them in the handlers' states.

For compatibility, if the purpose is not stored (`None`), it is treated as a catch-all value, reproducing the behaviour from before the fix. This fallback only applies if the operator is upgraded in the middle of processing (e.g. during temporary errors or intentional delays); all new handling cycles are stored with the purpose. In case of a rollback, the purpose is ignored and also overwritten back to `None` by the old versions of Kopf. This is fine.
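
A minimal sketch of the matching rule, with a hypothetical `is_relevant` helper (the real checks live inline in `State.done`, `State.delays`, and `State.counts` below): a stored purpose of `None` acts as a catch-all and matches any current purpose.

```python
from typing import Optional

def is_relevant(stored_purpose: Optional[str], current_purpose: Optional[str]) -> bool:
    # None on either side is a catch-all: it matches everything.
    # This reproduces the pre-fix behaviour for records stored without a purpose.
    return (stored_purpose is None
            or current_purpose is None
            or stored_purpose == current_purpose)

assert is_relevant(None, 'delete')           # legacy record: still counted
assert is_relevant('delete', 'delete')       # same purpose: counted
assert not is_relevant('create', 'delete')   # superseded purpose: ignored
```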

---

An example case:

```python
import kopf

@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def create_fn1(**_):
    pass

@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def create_fn2(**_):
    raise kopf.TemporaryError("temporary failure", delay=5)

@kopf.on.delete('zalando.org', 'v1', 'kopfexamples')
def delete_fn1(**_):
    pass

@kopf.on.delete('zalando.org', 'v1', 'kopfexamples')
def delete_fn2(**_):
    pass
```

```
kubectl create -f examples/obj.yaml && sleep 2 && kubectl delete -f examples/obj.yaml
```

Here, on the object's deletion, the state is restored with 4 handlers: `create_fn1` & `create_fn2` & `delete_fn1` & `delete_fn2`. Since `create_fn2` was delayed and never finished, without the fix it would yield its own delay and prevent `.done` from reaching `True`, even though it is no longer relevant and only `delete_fn1` & `delete_fn2` should be considered. However, all four should still be purged from the annotations when processing is done.
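
A simplified sketch of the intended filtering for this example (the field names here are illustrative, not the real `HandlerState` fields): `.done` and `.delays` consider only the handlers of the current purpose, while purging covers all owned handlers.

```python
# Restored state at deletion time (simplified; real records carry timestamps, retries, etc.).
restored = {
    'create_fn1': {'purpose': 'create', 'finished': True},
    'create_fn2': {'purpose': 'create', 'finished': False},  # delayed, never finished
    'delete_fn1': {'purpose': 'delete', 'finished': False},
    'delete_fn2': {'purpose': 'delete', 'finished': False},
}
current_purpose = 'delete'

# Only the handlers of the current purpose affect .done (and, similarly, .delays):
relevant = {hid: s for hid, s in restored.items() if s['purpose'] == current_purpose}
done = all(s['finished'] for s in relevant.values())  # False until delete_fn1/2 finish

# Purging, however, covers all owned handlers, including the superseded ones:
to_purge = list(restored)  # all four ids are removed from the annotations when done
```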

Now, with this fix, the logs report the interruption of the creation and the beginning of the deletion, each with its own counts.

```
Creation is in progress: …
Sleeping for 4.866814 seconds for the delayed handlers.
…
Sleeping was interrupted by new changes, 3.3 seconds left.
Creation is superseded by deletion: 1 succeeded; 0 failed; 1 left.
Deletion is in progress: …
…
Deletion is processed: 2 succeeded; 0 failed.
```

---

Note that only some transitions can be caused by external reasons, not all of them:

* creation/update/resume → deletion (when the object is marked for deletion, the pending handlers are abandoned).
* resume → update (when the object is changed; the resuming handlers are mixed into the updating process).

Other transitions within the creation/update/resume group are impossible due to their nature, except when initiated by the framework itself.

A transition back from deletion to creation/update/resume is impossible in practice, but would be handled correctly if it ever happened.
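
For illustration only (this mapping is not an actual Kopf structure), the externally possible supersessions boil down to:

```python
# Hypothetical summary: current purpose -> purposes that may supersede it externally.
EXTERNAL_SUPERSESSIONS = {
    'create': {'delete'},
    'update': {'delete'},
    'resume': {'update', 'delete'},
    'delete': set(),  # in practice, nothing supersedes deletion
}
```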
nolar committed Dec 10, 2020
1 parent c5086f0 commit d898c70
Showing 6 changed files with 396 additions and 32 deletions.
2 changes: 1 addition & 1 deletion kopf/reactor/handling.py
@@ -153,7 +153,7 @@ async def execute(
cause_handlers = subregistry.get_handlers(cause=cause)
storage = settings.persistence.progress_storage
state = states.State.from_storage(body=cause.body, storage=storage, handlers=owned_handlers)
state = state.with_handlers(cause_handlers)
state = state.with_purpose(cause.reason).with_handlers(cause_handlers)
outcomes = await execute_handlers_once(
lifecycle=lifecycle,
settings=settings,
38 changes: 30 additions & 8 deletions kopf/reactor/processing.py
@@ -337,16 +337,38 @@ async def process_resource_changing_cause(
# Regular causes invoke the handlers.
if cause.reason in handlers_.HANDLER_REASONS:
title = handlers_.TITLES.get(cause.reason, repr(cause.reason))
logger.debug(f"{title.capitalize()} event: %r", body)
if cause.diff and cause.old is not None and cause.new is not None:
logger.debug(f"{title.capitalize()} diff: %r", cause.diff)

resource_registry = registry.resource_changing_handlers[cause.resource]
cause_handlers = resource_registry.get_handlers(cause=cause)
owned_handlers = resource_registry.get_all_handlers()
cause_handlers = resource_registry.get_handlers(cause=cause)
storage = settings.persistence.progress_storage
state = states.State.from_storage(body=cause.body, storage=storage, handlers=owned_handlers)
state = state.with_handlers(cause_handlers)
state = state.with_purpose(cause.reason).with_handlers(cause_handlers)

# Report the causes that have been superseded (intercepted, overridden) by the current one.
# The mix-in causes (i.e. resuming) is re-purposed if its handlers are still selected.
# To the next cycle, all extras are purged or re-purposed, so the message does not repeat.
for extra_reason, counters in state.extras.items(): # usually 0..1 items, rarely 2+.
extra_title = handlers_.TITLES.get(extra_reason, repr(extra_reason))
logger.info(f"{extra_title.capitalize()} is superseded by {title.lower()}: "
f"{counters.success} succeeded; "
f"{counters.failure} failed; "
f"{counters.running} left to the moment.")
state = state.with_purpose(purpose=cause.reason, handlers=cause_handlers)

# Purge the now-irrelevant handlers if they were not re-purposed (extras are recalculated!).
# The current cause continues afterwards, and overrides its own pre-purged handler states.
# TODO: purge only the handlers that fell out of current purpose; but it is not critical
if state.extras:
state.purge(body=cause.body, patch=cause.patch,
storage=storage, handlers=owned_handlers)

# Inform on the current cause/event on every processing cycle. Even if there are
# no handlers -- to show what has happened and why the diff-base is patched.
logger.debug(f"{title.capitalize()} event: %r", body)
if cause.diff and cause.old is not None and cause.new is not None:
logger.debug(f"{title.capitalize()} diff: %r", cause.diff)

if cause_handlers:
outcomes = await handling.execute_handlers_once(
lifecycle=lifecycle,
@@ -360,10 +382,10 @@ async def process_resource_changing_cause(
states.deliver_results(outcomes=outcomes, patch=cause.patch)

if state.done:
success_count, failure_count = state.counts
counters = state.counts # calculate only once
logger.info(f"{title.capitalize()} event is processed: "
f"{success_count} succeeded; "
f"{failure_count} failed.")
f"{counters.success} succeeded; "
f"{counters.failure} failed.")
state.purge(body=cause.body, patch=cause.patch,
storage=storage, handlers=owned_handlers)

1 change: 1 addition & 0 deletions kopf/storage/progress.py
@@ -54,6 +54,7 @@ class ProgressRecord(TypedDict, total=True):
started: Optional[str]
stopped: Optional[str]
delayed: Optional[str]
purpose: Optional[str]
retries: Optional[int]
success: Optional[bool]
failure: Optional[bool]
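With `StatusProgressStorage` (as used in the new test below), a persisted record would then look roughly like this; the values are illustrative and abridged, with `purpose` being the only new field:

```python
# Roughly what a persisted record looks like with StatusProgressStorage
# (abridged; the full field set is defined by ProgressRecord above):
body = {'status': {'kopf': {'progress': {
    'create_fn2': {
        'purpose': 'create',                        # the new field
        'started': '2020-12-31T23:59:59.123456',
        'delayed': '2021-01-01T00:00:04.123456',    # retry scheduled in ~5s
        'retries': 1,
        'success': False,
        'failure': False,
    },
}}}}
```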
85 changes: 72 additions & 13 deletions kopf/storage/states.py
@@ -15,7 +15,8 @@
import copy
import dataclasses
import datetime
from typing import Any, Collection, Dict, Iterable, Iterator, Mapping, Optional, Tuple, overload
from typing import Any, Collection, Dict, Iterable, Iterator, \
Mapping, NamedTuple, Optional, overload

from kopf.storage import progress
from kopf.structs import bodies, callbacks, handlers as handlers_, patches
@@ -52,6 +53,7 @@ class HandlerState:
started: Optional[datetime.datetime] = None # None means this information was lost.
stopped: Optional[datetime.datetime] = None # None means it is still running (e.g. delayed).
delayed: Optional[datetime.datetime] = None # None means it is finished (succeeded/failed).
purpose: Optional[handlers_.Reason] = None # None is a catch-all marker for upgrades/rollbacks.
retries: int = 0
success: bool = False
failure: bool = False
@@ -60,9 +62,10 @@ class HandlerState:
_origin: Optional[progress.ProgressRecord] = None # to check later if it has actually changed.

@classmethod
def from_scratch(cls) -> "HandlerState":
def from_scratch(cls, *, purpose: Optional[handlers_.Reason] = None) -> "HandlerState":
return cls(
started=datetime.datetime.utcnow(),
purpose=purpose,
)

@classmethod
@@ -71,6 +74,7 @@ def from_storage(cls, __d: progress.ProgressRecord) -> "HandlerState":
started=_datetime_fromisoformat(__d.get('started')) or datetime.datetime.utcnow(),
stopped=_datetime_fromisoformat(__d.get('stopped')),
delayed=_datetime_fromisoformat(__d.get('delayed')),
purpose=handlers_.Reason(__d.get('purpose')) if __d.get('purpose') else None,
retries=__d.get('retries') or 0,
success=__d.get('success') or False,
failure=__d.get('failure') or False,
@@ -84,6 +88,7 @@ def for_storage(self) -> progress.ProgressRecord:
started=None if self.started is None else _datetime_toisoformat(self.started),
stopped=None if self.stopped is None else _datetime_toisoformat(self.stopped),
delayed=None if self.delayed is None else _datetime_toisoformat(self.delayed),
purpose=None if self.purpose is None else str(self.purpose.value),
retries=None if self.retries is None else int(self.retries),
success=None if self.success is None else bool(self.success),
failure=None if self.failure is None else bool(self.failure),
@@ -95,13 +100,20 @@ def as_in_storage(self) -> Mapping[str, Any]:
# Nones are not stored by Kubernetes, so we filter them out for comparison.
return {key: val for key, val in self.for_storage().items() if val is not None}

def with_purpose(
self,
purpose: Optional[handlers_.Reason],
) -> "HandlerState":
return dataclasses.replace(self, purpose=purpose)

def with_outcome(
self,
outcome: HandlerOutcome,
) -> "HandlerState":
now = datetime.datetime.utcnow()
cls = type(self)
return cls(
purpose=self.purpose,
started=self.started if self.started else now,
stopped=self.stopped if self.stopped else now if outcome.final else None,
delayed=now + datetime.timedelta(seconds=outcome.delay) if outcome.delay is not None else None,
@@ -133,6 +145,12 @@ def runtime(self) -> datetime.timedelta:
return now - (self.started if self.started else now)


class StateCounters(NamedTuple):
success: int
failure: int
running: int


class State(Mapping[handlers_.HandlerId, HandlerState]):
"""
A state of selected handlers, as persisted in the object's status.
@@ -149,9 +167,12 @@ class State(Mapping[handlers_.HandlerId, HandlerState]):
def __init__(
self,
__src: Mapping[handlers_.HandlerId, HandlerState],
*,
purpose: Optional[handlers_.Reason] = None,
):
super().__init__()
self._states = dict(__src)
self.purpose = purpose

@classmethod
def from_scratch(cls) -> "State":
@@ -173,17 +194,27 @@ def from_storage(
handler_states[handler_id] = HandlerState.from_storage(content)
return cls(handler_states)

def with_purpose(
self,
purpose: Optional[handlers_.Reason],
handlers: Iterable[handlers_.BaseHandler] = (), # to be re-purposed
) -> "State":
handler_states: Dict[handlers_.HandlerId, HandlerState] = dict(self)
for handler in handlers:
handler_states[handler.id] = handler_states[handler.id].with_purpose(purpose)
cls = type(self)
return cls(handler_states, purpose=purpose)

def with_handlers(
self,
handlers: Iterable[handlers_.BaseHandler],
) -> "State":
handler_ids = {handler.id for handler in handlers}
handler_states: Dict[handlers_.HandlerId, HandlerState] = dict(self)
for handler_id in handler_ids:
if handler_id not in handler_states:
handler_states[handler_id] = HandlerState.from_scratch()
for handler in handlers:
if handler.id not in handler_states:
handler_states[handler.id] = HandlerState.from_scratch(purpose=self.purpose)
cls = type(self)
return cls(handler_states)
return cls(handler_states, purpose=self.purpose)

def with_outcomes(
self,
@@ -198,7 +229,7 @@ def with_outcomes(
handler_id: (handler_state if handler_id not in outcomes else
handler_state.with_outcome(outcomes[handler_id]))
for handler_id, handler_state in self.items()
})
}, purpose=self.purpose)

def store(
self,
@@ -245,13 +276,39 @@ def __getitem__(self, item: handlers_.HandlerId) -> HandlerState:
@property
def done(self) -> bool:
# In particular, no handlers means that it is "done" even before doing.
return all(handler_state.finished for handler_state in self._states.values())
return all(
handler_state.finished for handler_state in self._states.values()
if self.purpose is None or handler_state.purpose is None
or handler_state.purpose == self.purpose
)

@property
def extras(self) -> Mapping[handlers_.Reason, StateCounters]:
return {
reason: StateCounters(
success=len([1 for handler_state in self._states.values()
if handler_state.purpose == reason and handler_state.success]),
failure=len([1 for handler_state in self._states.values()
if handler_state.purpose == reason and handler_state.failure]),
running=len([1 for handler_state in self._states.values()
if handler_state.purpose == reason and not handler_state.finished]),
)
for reason in handlers_.HANDLER_REASONS
if self.purpose is not None and reason != self.purpose
if any(handler_state.purpose == reason for handler_state in self._states.values())
}

@property
def counts(self) -> Tuple[int, int]:
return (
len([1 for handler_state in self._states.values() if handler_state.success]),
len([1 for handler_state in self._states.values() if handler_state.failure]),
def counts(self) -> StateCounters:
purposeful_states = [
handler_state for handler_state in self._states.values()
if self.purpose is None or handler_state.purpose is None
or handler_state.purpose == self.purpose
]
return StateCounters(
success=len([1 for handler_state in purposeful_states if handler_state.success]),
failure=len([1 for handler_state in purposeful_states if handler_state.failure]),
running=len([1 for handler_state in purposeful_states if not handler_state.finished]),
)

@property
@@ -273,6 +330,8 @@ def delays(self) -> Collection[float]:
max(0, (handler_state.delayed - now).total_seconds()) if handler_state.delayed else 0
for handler_state in self._states.values()
if not handler_state.finished
if self.purpose is None or handler_state.purpose is None
or handler_state.purpose == self.purpose
]


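A usage sketch of the reworked counters, constructing the internal classes directly for brevity (normally the state comes from `State.from_storage()`; the assertions assume the semantics shown in the diff above):

```python
import datetime

from kopf.storage.states import HandlerState, State, StateCounters
from kopf.structs.handlers import Reason

# One finished creation handler and one pending deletion handler,
# with the current processing purpose being the deletion.
state = State({
    'create_fn1': HandlerState(started=datetime.datetime.utcnow(),
                               purpose=Reason.CREATE, success=True),
    'delete_fn1': HandlerState(started=datetime.datetime.utcnow(),
                               purpose=Reason.DELETE),
}, purpose=Reason.DELETE)

assert state.counts == StateCounters(success=0, failure=0, running=1)  # deletion only
assert Reason.CREATE in state.extras   # the superseded creation is reported separately
assert not state.done                  # delete_fn1 has not finished yet
```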
46 changes: 46 additions & 0 deletions tests/handling/test_cause_logging.py
@@ -1,10 +1,13 @@
import asyncio
import datetime
import logging

import freezegun
import pytest

import kopf
from kopf.reactor.processing import process_resource_event
from kopf.storage.progress import StatusProgressStorage
from kopf.structs.containers import ResourceMemories
from kopf.structs.handlers import ALL_REASONS, HANDLER_REASONS, Reason

@@ -91,3 +94,46 @@ async def test_diffs_not_logged_if_absent(registry, settings, resource, handlers
], prohibited=[
" diff: "
])



# Timestamps: time zero (0), before (B), after (A), and time zero+1s (1).
TS0 = datetime.datetime(2020, 12, 31, 23, 59, 59, 123456)
TS1_ISO = '2021-01-01T00:00:00.123456'


@pytest.mark.parametrize('cause_types', [
# All combinations except for same-to-same (it is not an "extra" then).
(a, b) for a in HANDLER_REASONS for b in HANDLER_REASONS if a != b
])
@freezegun.freeze_time(TS0)
async def test_supersession_is_logged(
registry, settings, resource, handlers, cause_types, cause_mock, caplog, assert_logs):
caplog.set_level(logging.DEBUG)

settings.persistence.progress_storage = StatusProgressStorage()
body = {'status': {'kopf': {'progress': {
'create_fn': {'purpose': cause_types[0]},
'update_fn': {'purpose': cause_types[0]},
'resume_fn': {'purpose': cause_types[0]},
'delete_fn': {'purpose': cause_types[0]},
}}}}

cause_mock.reason = cause_types[1]
event_type = None if cause_types[1] == Reason.RESUME else 'irrelevant'

await process_resource_event(
lifecycle=kopf.lifecycles.all_at_once,
registry=registry,
settings=settings,
resource=resource,
memories=ResourceMemories(),
raw_event={'type': event_type, 'object': body},
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
assert_logs([
"(Creation|Updating|Resuming|Deletion) is superseded by (creation|updating|resuming|deletion): ",
"(Creation|Updating|Resuming|Deletion) is in progress: ",
"(Creation|Updating|Resuming|Deletion) is processed: ",
])
