Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unfreeze operators as soon as possible due to peers' expirations #581

Merged
merged 4 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions kopf/engines/peering.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import logging
import os
import random
from typing import Any, Dict, Iterable, Mapping, NewType, NoReturn, Optional, Union, cast
from typing import Any, Dict, Iterable, Mapping, NewType, NoReturn, Optional, cast

import iso8601

Expand All @@ -61,31 +61,31 @@ def __init__(
*,
identity: Identity,
priority: int = 0,
lastseen: Optional[Union[str, datetime.datetime]] = None,
lifetime: Union[int, datetime.timedelta] = 60,
lifetime: int = 60,
lastseen: Optional[str] = None,
**_: Any, # for the forward-compatibility with the new fields
):
super().__init__()
self.identity = identity
self.priority = priority
self.lifetime = (lifetime if isinstance(lifetime, datetime.timedelta) else
datetime.timedelta(seconds=int(lifetime)))
self.lastseen = (lastseen if isinstance(lastseen, datetime.datetime) else
iso8601.parse_date(lastseen) if lastseen is not None else
self.lifetime = datetime.timedelta(seconds=int(lifetime))
self.lastseen = (iso8601.parse_date(lastseen) if lastseen is not None else
datetime.datetime.utcnow())
self.lastseen = self.lastseen.replace(tzinfo=None) # only the naive utc -- for comparison
self.deadline = self.lastseen + self.lifetime
self.is_dead = self.deadline <= datetime.datetime.utcnow()

def __repr__(self) -> str:
return f"{self.__class__.__name__}(identity={self.identity}, priority={self.priority}, lastseen={self.lastseen}, lifetime={self.lifetime})"
clsname = self.__class__.__name__
options = ", ".join(f"{key!s}={val!r}" for key, val in self.as_dict().items())
return f"<{clsname} {self.identity}: {options}>"

def as_dict(self) -> Dict[str, Any]:
# Only the non-calculated and non-identifying fields.
return {
'priority': self.priority,
'lastseen': self.lastseen.isoformat(),
'lifetime': self.lifetime.total_seconds(),
'priority': int(self.priority),
'lifetime': int(self.lifetime.total_seconds()),
'lastseen': str(self.lastseen.isoformat()),
}


Expand Down Expand Up @@ -141,6 +141,18 @@ async def process_peering_event(
logger.info(f"Resuming operations after the freeze. Conflicting operators with the same priority are gone.")
await freeze_mode.turn_off()

# Either wait for external updates (and exit when they arrive), or until the blocking peers
# are expected to expire, and force the immediate re-evaluation by a certain change of self.
# This incurs an extra PATCH request besides usual keepalives, but in the complete silence
# from other peers that existed a moment earlier, this should not be a problem.
now = datetime.datetime.utcnow()
delay = max([0] + [(peer.deadline - now).total_seconds() for peer in same_peers + prio_peers])
if delay:
try:
await asyncio.wait_for(replenished.wait(), timeout=delay)
except asyncio.TimeoutError:
await touch(identity=identity, settings=settings, namespace=namespace)


async def keepalive(
*,
Expand All @@ -153,15 +165,14 @@ async def keepalive(
"""
try:
while True:
await touch(
identity=identity,
settings=settings,
namespace=namespace,
)
await touch(identity=identity, settings=settings, namespace=namespace)

# How often do we update. Keep limited to avoid k8s api flooding.
# Should be slightly less than the lifetime, enough for a patch request to finish.
await asyncio.sleep(max(1, int(settings.peering.lifetime - 10)))
# A little jitter is added to evenly distribute the keep-alives over time.
lifetime = settings.peering.lifetime
duration = min(lifetime, max(1, lifetime - random.randint(5, 10)))
await asyncio.sleep(max(1, duration))
finally:
try:
await asyncio.shield(touch(
Expand Down
5 changes: 5 additions & 0 deletions kopf/structs/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,13 @@ class PeeringSettings:
For how long (in seconds) the operator's record is considered actual
by other operators before assuming that the corresponding operator
is not functioning and the freeze mode should be re-evaluated.

The peered operators will update their records as long as they are running,
slightly faster than their records expire (5-10 seconds earlier).

Note that it is the lifetime of the current operator. For operators that
do not communicate their lifetime (broken?), it is always assumed to be
60 seconds regardless of this operator's configuration (a hard-coded value).
"""

mandatory: bool = False
Expand Down
112 changes: 98 additions & 14 deletions tests/peering/test_freeze_mode.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import dataclasses
from unittest.mock import Mock

import freezegun
import pytest
Expand All @@ -7,6 +9,27 @@
from kopf.structs import bodies, primitives


@dataclasses.dataclass(frozen=True, eq=False)
class K8sMocks:
    """Immutable container for the mocked K8s API wrappers used by the tests."""

    # Mock standing in for the patching wrapper; the tests inspect ``.called`` on it.
    patch_obj: Mock


@pytest.fixture(autouse=True)
def k8s_mocked(mocker, resp_mocker):
    """
    Patch the K8s patching wrapper with a mock and expose it to the tests.

    The fixture is auto-used, so the patch is in place even for tests
    that do not request it explicitly. The mocked ``patch_obj`` returns
    an empty dict.
    """
    # We mock on the level of our own K8s API wrappers, not the K8s client.
    # NOTE(review): ``resp_mocker`` is requested but not used here -- presumably
    # only to have that fixture active; confirm against its definition.
    patch_obj = mocker.patch('kopf.clients.patching.patch_obj', return_value={})
    return K8sMocks(patch_obj=patch_obj)


@pytest.fixture
async def replenished():
    """
    Provide an ``asyncio.Event`` that is already set.

    Make sure that freeze-sleeps are not actually executed, i.e. exit instantly:
    anything awaiting this event's ``wait()`` gets through without blocking.
    """
    # A distinct local name avoids shadowing the fixture function itself.
    event = asyncio.Event()
    event.set()
    return event


@pytest.mark.parametrize('our_name, our_namespace, their_name, their_namespace', [
['our-name', 'our-namespace', 'their-name', 'their-namespace'],
['our-name', 'our-namespace', 'their-name', 'our-namespace'],
Expand All @@ -20,7 +43,8 @@
['our-name', None, 'their-name', 'our-namespace'],
])
async def test_other_peering_objects_are_ignored(
mocker, settings, our_name, our_namespace, their_name, their_namespace):
mocker, k8s_mocked, settings, replenished,
our_name, our_namespace, their_name, their_namespace):

status = mocker.Mock()
status.items.side_effect = Exception("This should not be called.")
Expand All @@ -31,22 +55,26 @@ async def test_other_peering_objects_are_ignored(
'status': status,
})

wait_for = mocker.patch('asyncio.wait_for')

settings.peering.name = our_name
await process_peering_event(
raw_event=event,
freeze_mode=primitives.Toggle(),
replenished=asyncio.Event(),
replenished=replenished,
autoclean=False,
identity='id',
settings=settings,
namespace=our_namespace,
)
assert not status.items.called
assert not k8s_mocked.patch_obj.called
assert wait_for.call_count == 0


@freezegun.freeze_time('2020-12-31T23:59:59.123456')
async def test_toggled_on_for_higher_priority_peer_when_initially_off(
caplog, assert_logs, settings):
mocker, k8s_mocked, replenished, caplog, assert_logs, settings):

event = bodies.RawEvent(
type='ADDED', # irrelevant
Expand All @@ -63,8 +91,8 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off(
settings.peering.name = 'name'
settings.peering.priority = 100

replenished = asyncio.Event()
freeze_mode = primitives.Toggle(False)
wait_for = mocker.patch('asyncio.wait_for')

caplog.set_level(0)
assert freeze_mode.is_off()
Expand All @@ -78,6 +106,9 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off(
settings=settings,
)
assert freeze_mode.is_on()
assert wait_for.call_count == 1
assert 9 < wait_for.call_args[1]['timeout'] < 10
assert not k8s_mocked.patch_obj.called
assert_logs(["Freezing operations in favour of"], prohibited=[
"Possibly conflicting operators",
"Freezing all operators, including self",
Expand All @@ -87,7 +118,7 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off(

@freezegun.freeze_time('2020-12-31T23:59:59.123456')
async def test_ignored_for_higher_priority_peer_when_already_on(
caplog, assert_logs, settings):
mocker, k8s_mocked, replenished, caplog, assert_logs, settings):

event = bodies.RawEvent(
type='ADDED', # irrelevant
Expand All @@ -104,8 +135,8 @@ async def test_ignored_for_higher_priority_peer_when_already_on(
settings.peering.name = 'name'
settings.peering.priority = 100

replenished = asyncio.Event()
freeze_mode = primitives.Toggle(True)
wait_for = mocker.patch('asyncio.wait_for')

caplog.set_level(0)
assert freeze_mode.is_on()
Expand All @@ -119,6 +150,9 @@ async def test_ignored_for_higher_priority_peer_when_already_on(
settings=settings,
)
assert freeze_mode.is_on()
assert wait_for.call_count == 1
assert 9 < wait_for.call_args[1]['timeout'] < 10
assert not k8s_mocked.patch_obj.called
assert_logs([], prohibited=[
"Possibly conflicting operators",
"Freezing all operators, including self",
Expand All @@ -129,7 +163,7 @@ async def test_ignored_for_higher_priority_peer_when_already_on(

@freezegun.freeze_time('2020-12-31T23:59:59.123456')
async def test_toggled_off_for_lower_priority_peer_when_initially_on(
caplog, assert_logs, settings):
mocker, k8s_mocked, replenished, caplog, assert_logs, settings):

event = bodies.RawEvent(
type='ADDED', # irrelevant
Expand All @@ -146,8 +180,8 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on(
settings.peering.name = 'name'
settings.peering.priority = 100

replenished = asyncio.Event()
freeze_mode = primitives.Toggle(True)
wait_for = mocker.patch('asyncio.wait_for')

caplog.set_level(0)
assert freeze_mode.is_on()
Expand All @@ -161,6 +195,8 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on(
settings=settings,
)
assert freeze_mode.is_off()
assert wait_for.call_count == 0
assert not k8s_mocked.patch_obj.called
assert_logs(["Resuming operations after the freeze"], prohibited=[
"Possibly conflicting operators",
"Freezing all operators, including self",
Expand All @@ -170,7 +206,7 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on(

@freezegun.freeze_time('2020-12-31T23:59:59.123456')
async def test_ignored_for_lower_priority_peer_when_already_off(
caplog, assert_logs, settings):
mocker, k8s_mocked, replenished, caplog, assert_logs, settings):

event = bodies.RawEvent(
type='ADDED', # irrelevant
Expand All @@ -187,8 +223,8 @@ async def test_ignored_for_lower_priority_peer_when_already_off(
settings.peering.name = 'name'
settings.peering.priority = 100

replenished = asyncio.Event()
freeze_mode = primitives.Toggle(False)
wait_for = mocker.patch('asyncio.wait_for')

caplog.set_level(0)
assert freeze_mode.is_off()
Expand All @@ -202,6 +238,8 @@ async def test_ignored_for_lower_priority_peer_when_already_off(
settings=settings,
)
assert freeze_mode.is_off()
assert wait_for.call_count == 0
assert not k8s_mocked.patch_obj.called
assert_logs([], prohibited=[
"Possibly conflicting operators",
"Freezing all operators, including self",
Expand All @@ -212,7 +250,7 @@ async def test_ignored_for_lower_priority_peer_when_already_off(

@freezegun.freeze_time('2020-12-31T23:59:59.123456')
async def test_toggled_on_for_same_priority_peer_when_initially_off(
caplog, assert_logs, settings):
mocker, k8s_mocked, replenished, caplog, assert_logs, settings):

event = bodies.RawEvent(
type='ADDED', # irrelevant
Expand All @@ -229,8 +267,8 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off(
settings.peering.name = 'name'
settings.peering.priority = 100

replenished = asyncio.Event()
freeze_mode = primitives.Toggle(False)
wait_for = mocker.patch('asyncio.wait_for')

caplog.set_level(0)
assert freeze_mode.is_off()
Expand All @@ -244,6 +282,9 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off(
settings=settings,
)
assert freeze_mode.is_on()
assert wait_for.call_count == 1
assert 9 < wait_for.call_args[1]['timeout'] < 10
assert not k8s_mocked.patch_obj.called
assert_logs([
"Possibly conflicting operators",
"Freezing all operators, including self",
Expand All @@ -255,7 +296,7 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off(

@freezegun.freeze_time('2020-12-31T23:59:59.123456')
async def test_ignored_for_same_priority_peer_when_already_on(
caplog, assert_logs, settings):
mocker, k8s_mocked, replenished, caplog, assert_logs, settings):

event = bodies.RawEvent(
type='ADDED', # irrelevant
Expand All @@ -272,8 +313,8 @@ async def test_ignored_for_same_priority_peer_when_already_on(
settings.peering.name = 'name'
settings.peering.priority = 100

replenished = asyncio.Event()
freeze_mode = primitives.Toggle(True)
wait_for = mocker.patch('asyncio.wait_for')

caplog.set_level(0)
assert freeze_mode.is_on()
Expand All @@ -287,10 +328,53 @@ async def test_ignored_for_same_priority_peer_when_already_on(
settings=settings,
)
assert freeze_mode.is_on()
assert wait_for.call_count == 1
assert 9 < wait_for.call_args[1]['timeout'] < 10
assert not k8s_mocked.patch_obj.called
assert_logs([
"Possibly conflicting operators",
], prohibited=[
"Freezing all operators, including self",
"Freezing operations in favour of",
"Resuming operations after the freeze",
])


@freezegun.freeze_time('2020-12-31T23:59:59.123456')
@pytest.mark.parametrize('priority', [100, 101])
async def test_resumes_immediately_on_expiration_of_blocking_peers(
        mocker, k8s_mocked, replenished, caplog, assert_logs, settings, priority):
    """
    When the wait for a blocking peer's expiration times out, a patch is sent.

    Parametrised over a same-priority (100) and a higher-priority (101) peer,
    relative to our own priority of 100.
    """

    # The peer was last seen 0.123456s before the frozen "now" and lives for 10s,
    # so the remaining time until its deadline is a bit less than 10 seconds.
    event = bodies.RawEvent(
        type='ADDED',  # irrelevant
        object={
            'metadata': {'name': 'name', 'namespace': 'namespace'},  # for matching
            'status': {
                'higher-prio': {
                    'priority': priority,
                    'lifetime': 10,
                    'lastseen': '2020-12-31T23:59:59',
                },
            },
        })
    settings.peering.name = 'name'
    settings.peering.priority = 100

    freeze_mode = primitives.Toggle(True)

    # Simulate the timeout: no external updates arrive before the peer expires.
    wait_for = mocker.patch('asyncio.wait_for', side_effect=asyncio.TimeoutError)

    caplog.set_level(0)
    assert freeze_mode.is_on()
    await process_peering_event(
        raw_event=event,
        freeze_mode=freeze_mode,
        replenished=replenished,
        autoclean=False,
        namespace='namespace',
        identity='id',
        settings=settings,
    )

    # The freeze is still on after this single event, but a patch has been sent.
    assert freeze_mode.is_on()
    assert wait_for.call_count == 1
    assert 9 < wait_for.call_args[1]['timeout'] < 10
    assert k8s_mocked.patch_obj.called
Loading