diff --git a/kopf/engines/peering.py b/kopf/engines/peering.py index 46c942ea..ed8e0157 100644 --- a/kopf/engines/peering.py +++ b/kopf/engines/peering.py @@ -35,7 +35,7 @@ import logging import os import random -from typing import Any, Dict, Iterable, Mapping, NewType, NoReturn, Optional, Union, cast +from typing import Any, Dict, Iterable, Mapping, NewType, NoReturn, Optional, cast import iso8601 @@ -61,31 +61,31 @@ def __init__( *, identity: Identity, priority: int = 0, - lastseen: Optional[Union[str, datetime.datetime]] = None, - lifetime: Union[int, datetime.timedelta] = 60, + lifetime: int = 60, + lastseen: Optional[str] = None, **_: Any, # for the forward-compatibility with the new fields ): super().__init__() self.identity = identity self.priority = priority - self.lifetime = (lifetime if isinstance(lifetime, datetime.timedelta) else - datetime.timedelta(seconds=int(lifetime))) - self.lastseen = (lastseen if isinstance(lastseen, datetime.datetime) else - iso8601.parse_date(lastseen) if lastseen is not None else + self.lifetime = datetime.timedelta(seconds=int(lifetime)) + self.lastseen = (iso8601.parse_date(lastseen) if lastseen is not None else datetime.datetime.utcnow()) self.lastseen = self.lastseen.replace(tzinfo=None) # only the naive utc -- for comparison self.deadline = self.lastseen + self.lifetime self.is_dead = self.deadline <= datetime.datetime.utcnow() def __repr__(self) -> str: - return f"{self.__class__.__name__}(identity={self.identity}, priority={self.priority}, lastseen={self.lastseen}, lifetime={self.lifetime})" + clsname = self.__class__.__name__ + options = ", ".join(f"{key!s}={val!r}" for key, val in self.as_dict().items()) + return f"<{clsname} {self.identity}: {options}>" def as_dict(self) -> Dict[str, Any]: # Only the non-calculated and non-identifying fields. return { - 'priority': self.priority, - 'lastseen': self.lastseen.isoformat(), - 'lifetime': self.lifetime.total_seconds(), + 'priority': int(self.priority), + 'lifetime': int(self.lifetime.total_seconds()), + 'lastseen': str(self.lastseen.isoformat()), } @@ -141,6 +141,18 @@ async def process_peering_event( logger.info(f"Resuming operations after the freeze. Conflicting operators with the same priority are gone.") await freeze_mode.turn_off() + # Either wait for external updates (and exit when they arrive), or until the blocking peers + # are expected to expire, and force the immediate re-evaluation by a certain change of self. + # This incurs an extra PATCH request besides usual keepalives, but in the complete silence + # from other peers that existed a moment earlier, this should not be a problem. + now = datetime.datetime.utcnow() + delay = max([0] + [(peer.deadline - now).total_seconds() for peer in same_peers + prio_peers]) + if delay: + try: + await asyncio.wait_for(replenished.wait(), timeout=delay) + except asyncio.TimeoutError: + await touch(identity=identity, settings=settings, namespace=namespace) + async def keepalive( *, @@ -153,15 +165,14 @@ async def keepalive( """ try: while True: - await touch( - identity=identity, - settings=settings, - namespace=namespace, - ) + await touch(identity=identity, settings=settings, namespace=namespace) # How often do we update. Keep limited to avoid k8s api flooding. # Should be slightly less than the lifetime, enough for a patch request to finish. - await asyncio.sleep(max(1, int(settings.peering.lifetime - 10))) + # A little jitter is added to evenly distribute the keep-alives over time. + lifetime = settings.peering.lifetime + duration = min(lifetime, max(1, lifetime - random.randint(5, 10))) + await asyncio.sleep(max(1, duration)) finally: try: await asyncio.shield(touch( diff --git a/kopf/structs/configuration.py b/kopf/structs/configuration.py index 21cd243f..ea125df1 100644 --- a/kopf/structs/configuration.py +++ b/kopf/structs/configuration.py @@ -117,8 +117,13 @@ class PeeringSettings: For how long (in seconds) the operator's record is considered actual by other operators before assuming that the corresponding operator is not functioning and the freeze mode should be re-evaluated. + The peered operators will update their records as long as they are running, slightly faster than their records expires (5-10 seconds earlier). + + Note that it is the lifetime of the current operator. For operators that + do not communicate their lifetime (broken?), it is always assumed to be + 60 seconds regardless of this operator's configuration (a hard-coded value). """ mandatory: bool = False diff --git a/tests/peering/test_freeze_mode.py b/tests/peering/test_freeze_mode.py index c5c608f4..5cf49937 100644 --- a/tests/peering/test_freeze_mode.py +++ b/tests/peering/test_freeze_mode.py @@ -1,4 +1,6 @@ import asyncio +import dataclasses +from unittest.mock import Mock import freezegun import pytest @@ -7,6 +9,27 @@ from kopf.structs import bodies, primitives +@dataclasses.dataclass(frozen=True, eq=False) +class K8sMocks: + patch_obj: Mock + + +@pytest.fixture(autouse=True) +def k8s_mocked(mocker, resp_mocker): + # We mock on the level of our own K8s API wrappers, not the K8s client. + return K8sMocks( + patch_obj=mocker.patch('kopf.clients.patching.patch_obj', return_value={}), + ) + + +@pytest.fixture +async def replenished(): + # Make sure that freeze-sleeps are not actually executed, i.e. exit instantly. + replenished = asyncio.Event() + replenished.set() + return replenished + + @pytest.mark.parametrize('our_name, our_namespace, their_name, their_namespace', [ ['our-name', 'our-namespace', 'their-name', 'their-namespace'], ['our-name', 'our-namespace', 'their-name', 'our-namespace'], @@ -20,7 +43,8 @@ ['our-name', None, 'their-name', 'our-namespace'], ]) async def test_other_peering_objects_are_ignored( - mocker, settings, our_name, our_namespace, their_name, their_namespace): + mocker, k8s_mocked, settings, replenished, + our_name, our_namespace, their_name, their_namespace): status = mocker.Mock() status.items.side_effect = Exception("This should not be called.") @@ -31,22 +55,26 @@ async def test_other_peering_objects_are_ignored( 'status': status, }) + wait_for = mocker.patch('asyncio.wait_for') + settings.peering.name = our_name await process_peering_event( raw_event=event, freeze_mode=primitives.Toggle(), - replenished=asyncio.Event(), + replenished=replenished, autoclean=False, identity='id', settings=settings, namespace=our_namespace, ) assert not status.items.called + assert not k8s_mocked.patch_obj.called + assert wait_for.call_count == 0 @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_toggled_on_for_higher_priority_peer_when_initially_off( - caplog, assert_logs, settings): + mocker, k8s_mocked, replenished, caplog, assert_logs, settings): event = bodies.RawEvent( type='ADDED', # irrelevant @@ -63,8 +91,8 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off( settings.peering.name = 'name' settings.peering.priority = 100 - replenished = asyncio.Event() freeze_mode = primitives.Toggle(False) + wait_for = mocker.patch('asyncio.wait_for') caplog.set_level(0) assert freeze_mode.is_off() @@ -78,6 +106,9 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off( settings=settings, ) assert freeze_mode.is_on() + assert wait_for.call_count == 1 + assert 9 < wait_for.call_args[1]['timeout'] < 10 + assert not k8s_mocked.patch_obj.called assert_logs(["Freezing operations in favour of"], prohibited=[ "Possibly conflicting operators", "Freezing all operators, including self", @@ -87,7 +118,7 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off( @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_ignored_for_higher_priority_peer_when_already_on( - caplog, assert_logs, settings): + mocker, k8s_mocked, replenished, caplog, assert_logs, settings): event = bodies.RawEvent( type='ADDED', # irrelevant @@ -104,8 +135,8 @@ async def test_ignored_for_higher_priority_peer_when_already_on( settings.peering.name = 'name' settings.peering.priority = 100 - replenished = asyncio.Event() freeze_mode = primitives.Toggle(True) + wait_for = mocker.patch('asyncio.wait_for') caplog.set_level(0) assert freeze_mode.is_on() @@ -119,6 +150,9 @@ async def test_ignored_for_higher_priority_peer_when_already_on( settings=settings, ) assert freeze_mode.is_on() + assert wait_for.call_count == 1 + assert 9 < wait_for.call_args[1]['timeout'] < 10 + assert not k8s_mocked.patch_obj.called assert_logs([], prohibited=[ "Possibly conflicting operators", "Freezing all operators, including self", @@ -129,7 +163,7 @@ async def test_ignored_for_higher_priority_peer_when_already_on( @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_toggled_off_for_lower_priority_peer_when_initially_on( - caplog, assert_logs, settings): + mocker, k8s_mocked, replenished, caplog, assert_logs, settings): event = bodies.RawEvent( type='ADDED', # irrelevant @@ -146,8 +180,8 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on( settings.peering.name = 'name' settings.peering.priority = 100 - replenished = asyncio.Event() freeze_mode = primitives.Toggle(True) + wait_for = mocker.patch('asyncio.wait_for') caplog.set_level(0) assert freeze_mode.is_on() @@ -161,6 +195,8 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on( settings=settings, ) assert freeze_mode.is_off() + assert wait_for.call_count == 0 + assert not k8s_mocked.patch_obj.called assert_logs(["Resuming operations after the freeze"], prohibited=[ "Possibly conflicting operators", "Freezing all operators, including self", @@ -170,7 +206,7 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on( @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_ignored_for_lower_priority_peer_when_already_off( - caplog, assert_logs, settings): + mocker, k8s_mocked, replenished, caplog, assert_logs, settings): event = bodies.RawEvent( type='ADDED', # irrelevant @@ -187,8 +223,8 @@ async def test_ignored_for_lower_priority_peer_when_already_off( settings.peering.name = 'name' settings.peering.priority = 100 - replenished = asyncio.Event() freeze_mode = primitives.Toggle(False) + wait_for = mocker.patch('asyncio.wait_for') caplog.set_level(0) assert freeze_mode.is_off() @@ -202,6 +238,8 @@ async def test_ignored_for_lower_priority_peer_when_already_off( settings=settings, ) assert freeze_mode.is_off() + assert wait_for.call_count == 0 + assert not k8s_mocked.patch_obj.called assert_logs([], prohibited=[ "Possibly conflicting operators", "Freezing all operators, including self", @@ -212,7 +250,7 @@ async def test_ignored_for_lower_priority_peer_when_already_off( @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_toggled_on_for_same_priority_peer_when_initially_off( - caplog, assert_logs, settings): + mocker, k8s_mocked, replenished, caplog, assert_logs, settings): event = bodies.RawEvent( type='ADDED', # irrelevant @@ -229,8 +267,8 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off( settings.peering.name = 'name' settings.peering.priority = 100 - replenished = asyncio.Event() freeze_mode = primitives.Toggle(False) + wait_for = mocker.patch('asyncio.wait_for') caplog.set_level(0) assert freeze_mode.is_off() @@ -244,6 +282,9 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off( settings=settings, ) assert freeze_mode.is_on() + assert wait_for.call_count == 1 + assert 9 < wait_for.call_args[1]['timeout'] < 10 + assert not k8s_mocked.patch_obj.called assert_logs([ "Possibly conflicting operators", "Freezing all operators, including self", @@ -255,7 +296,7 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off( @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_ignored_for_same_priority_peer_when_already_on( - caplog, assert_logs, settings): + mocker, k8s_mocked, replenished, caplog, assert_logs, settings): event = bodies.RawEvent( type='ADDED', # irrelevant @@ -272,8 +313,8 @@ async def test_ignored_for_same_priority_peer_when_already_on( settings.peering.name = 'name' settings.peering.priority = 100 - replenished = asyncio.Event() freeze_mode = primitives.Toggle(True) + wait_for = mocker.patch('asyncio.wait_for') caplog.set_level(0) assert freeze_mode.is_on() @@ -287,6 +328,9 @@ async def test_ignored_for_same_priority_peer_when_already_on( settings=settings, ) assert freeze_mode.is_on() + assert wait_for.call_count == 1 + assert 9 < wait_for.call_args[1]['timeout'] < 10 + assert not k8s_mocked.patch_obj.called assert_logs([ "Possibly conflicting operators", ], prohibited=[ @@ -294,3 +338,43 @@ async def test_ignored_for_same_priority_peer_when_already_on( "Freezing operations in favour of", "Resuming operations after the freeze", ]) + + +@freezegun.freeze_time('2020-12-31T23:59:59.123456') +@pytest.mark.parametrize('priority', [100, 101]) +async def test_resumes_immediately_on_expiration_of_blocking_peers( + mocker, k8s_mocked, replenished, caplog, assert_logs, settings, priority): + + event = bodies.RawEvent( + type='ADDED', # irrelevant + object={ + 'metadata': {'name': 'name', 'namespace': 'namespace'}, # for matching + 'status': { + 'higher-prio': { + 'priority': priority, + 'lifetime': 10, + 'lastseen': '2020-12-31T23:59:59' + }, + }, + }) + settings.peering.name = 'name' + settings.peering.priority = 100 + + freeze_mode = primitives.Toggle(True) + wait_for = mocker.patch('asyncio.wait_for', side_effect=asyncio.TimeoutError) + + caplog.set_level(0) + assert freeze_mode.is_on() + await process_peering_event( + raw_event=event, + freeze_mode=freeze_mode, + replenished=replenished, + autoclean=False, + namespace='namespace', + identity='id', + settings=settings, + ) + assert freeze_mode.is_on() + assert wait_for.call_count == 1 + assert 9 < wait_for.call_args[1]['timeout'] < 10 + assert k8s_mocked.patch_obj.called diff --git a/tests/peering/test_keepalive.py b/tests/peering/test_keepalive.py index 40594dff..912c30f7 100644 --- a/tests/peering/test_keepalive.py +++ b/tests/peering/test_keepalive.py @@ -13,13 +13,17 @@ async def test_background_task_runs(mocker, settings): sleep_mock = mocker.patch('asyncio.sleep') sleep_mock.side_effect = [None, None, StopInfiniteCycleException] + randint_mock = mocker.patch('random.randint') + randint_mock.side_effect = [7, 5, 9] + settings.peering.lifetime = 33 with pytest.raises(StopInfiniteCycleException): await keepalive(settings=settings, identity='id', namespace='namespace') + assert randint_mock.call_count == 3 # only to be sure that we test the right thing assert sleep_mock.call_count == 3 - assert sleep_mock.call_args_list[0][0][0] == 33 - 10 - assert sleep_mock.call_args_list[1][0][0] == 33 - 10 - assert sleep_mock.call_args_list[2][0][0] == 33 - 10 + assert sleep_mock.call_args_list[0][0][0] == 33 - 7 + assert sleep_mock.call_args_list[1][0][0] == 33 - 5 + assert sleep_mock.call_args_list[2][0][0] == 33 - 9 assert touch_mock.call_count == 4 # 3 updates + 1 clean-up diff --git a/tests/peering/test_peers.py b/tests/peering/test_peers.py index 4694f8a5..1fff7642 100644 --- a/tests/peering/test_peers.py +++ b/tests/peering/test_peers.py @@ -17,12 +17,7 @@ def test_defaults(): def test_repr(): peer = Peer(identity='some-id') text = repr(peer) - assert text.startswith('Peer(') - assert text.endswith(')') - assert 'identity=some-id' in text - assert 'priority=0' in text - assert 'lastseen=' in text - assert 'lifetime=' in text + assert text == "" @freezegun.freeze_time('2020-12-31T23:59:59.123456') @@ -37,12 +32,6 @@ def test_priority_unspecified(): assert peer.priority == 0 -@freezegun.freeze_time('2020-12-31T23:59:59.123456') -def test_creation_with_lifetime_as_timedelta(): - peer = Peer(identity='id', lifetime=datetime.timedelta(seconds=123)) - assert peer.lifetime == datetime.timedelta(seconds=123) - - @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_creation_with_lifetime_as_number(): peer = Peer(identity='id', lifetime=123) @@ -55,12 +44,6 @@ def test_creation_with_lifetime_unspecified(): assert peer.lifetime == datetime.timedelta(seconds=60) -@freezegun.freeze_time('2020-12-31T23:59:59.123456') -def test_creation_with_lastseen_as_datetime(): - peer = Peer(identity='id', lastseen=datetime.datetime(2020, 1, 1, 12, 34, 56, 789123)) - assert peer.lastseen == datetime.datetime(2020, 1, 1, 12, 34, 56, 789123) - - @freezegun.freeze_time('2020-12-31T23:59:59.123456') def test_creation_with_lastseen_as_string(): peer = Peer(identity='id', lastseen='2020-01-01T12:34:56.789123')