Merge branch 'main' into patch-1
nolar committed Sep 21, 2021
2 parents 0a19b09 + 6031d38 commit 0c50c9f
Showing 13 changed files with 528 additions and 63 deletions.
31 changes: 21 additions & 10 deletions docs/embedding.rst
@@ -9,12 +9,12 @@ This can be used, for example, in desktop applications or web APIs/UIs
 to keep the state of the cluster and its resources in memory.


-Manual orchestration
-====================
+Manual execution
+================

 Since Kopf is fully asynchronous, the best way to run Kopf is to provide
-an event-loop dedicated to Kopf in a separate thread, while running
-the main application in the main thread.
+an event-loop in a separate thread, which is dedicated to Kopf,
+while running the main application in the main thread:

 .. code-block:: python
@@ -28,8 +28,7 @@ the main application in the main thread.
         pass

     def kopf_thread():
-        loop = asyncio.get_event_loop()
-        loop.run_until_complete(kopf.operator())
+        asyncio.run(kopf.operator())

     def main():
         thread = threading.Thread(target=kopf_thread)
@@ -45,13 +44,17 @@ so its event-loop runs in the main thread.
 the OS signal handlers, so a developer should implement the termination
 themselves (cancellation of an operator task is enough).

+
+Manual orchestration
+====================
+
 Alternatively, a developer can orchestrate the operator's tasks and sub-tasks
 themselves. The example above is an equivalent of the following:

 .. code-block:: python

     def kopf_thread():
-        loop = asyncio.get_event_loop()
+        loop = asyncio.get_event_loop_policy().get_event_loop()
         tasks = loop.run_until_complete(kopf.spawn_tasks())
         loop.run_until_complete(kopf.run_tasks(tasks, return_when=asyncio.FIRST_COMPLETED))
@@ -60,10 +63,19 @@ Or, if proper cancellation and termination are not expected, of the following:
 .. code-block:: python

     def kopf_thread():
-        loop = asyncio.get_event_loop()
+        loop = asyncio.get_event_loop_policy().get_event_loop()
         tasks = loop.run_until_complete(kopf.spawn_tasks())
         loop.run_until_complete(asyncio.wait(tasks))

+In all cases, make sure that asyncio event loops are properly used.
+Specifically, :func:`asyncio.run` creates and finalises a new event loop
+for a single call. Several calls cannot share the coroutines and tasks.
+To make several calls, either create a new event loop, or get the event loop
+of the current asyncio _context_ (by default, of the current thread).
+See more on the asyncio event loops and _contexts_ in `Asyncio Policies`__.
+
+__ https://docs.python.org/3/library/asyncio-policy.html
+

 Multiple operators
 ==================
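
As a rough illustration of the event-loop note added to the docs in this hunk (this sketch is not part of the commit): each `asyncio.run` call gets its own loop and closes it on exit, while an explicitly created loop can carry a task from one `run_until_complete` call to the next. The helper names `get_loop` and `make_task` are made up for this example.

```python
import asyncio


async def get_loop() -> asyncio.AbstractEventLoop:
    # Report which loop is executing this coroutine.
    return asyncio.get_running_loop()


async def make_task() -> "asyncio.Task[int]":
    # Create a task but do not await it here; it stays bound to this loop.
    return asyncio.create_task(asyncio.sleep(0, result=42))


# Each asyncio.run() call creates a fresh loop and closes it when done.
loop_a = asyncio.run(get_loop())
loop_b = asyncio.run(get_loop())

# A manually created loop survives between calls, so a task can be shared.
loop = asyncio.new_event_loop()
try:
    task = loop.run_until_complete(make_task())
    shared_result = loop.run_until_complete(task)
finally:
    loop.close()
```

Here `loop_a` and `loop_b` are two distinct, already closed loops, whereas the task created in the first `run_until_complete` call is completed by the second call on the same loop.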
@@ -90,8 +102,7 @@ in :mod:`contextvars` containers with values isolated per-loop and per-task.
         pass

     def kopf_thread():
-        loop = asyncio.get_event_loop()
-        loop.run_until_complete(kopf.operator(
+        asyncio.run(kopf.operator(
             registry=registry,
         ))
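
The docs touched by this file note that an operator running in a non-main thread gets no OS signal handlers, so the application has to stop it by cancelling the operator task. A minimal sketch of that idea with plain asyncio follows. It is an illustration only: `fake_operator` is a hypothetical stand-in for `kopf.operator()`, and the `state` dict and `ready` event are plumbing invented for this example, not Kopf APIs.

```python
import asyncio
import threading
from typing import Any, Dict

state: Dict[str, Any] = {}
ready = threading.Event()


async def fake_operator() -> None:
    # Hypothetical stand-in for kopf.operator(): runs until cancelled.
    await asyncio.Event().wait()


async def runner() -> None:
    # Expose the loop and the task so that another thread can cancel the task.
    state["loop"] = asyncio.get_running_loop()
    state["task"] = asyncio.current_task()
    ready.set()
    try:
        await fake_operator()
    except asyncio.CancelledError:
        state["stopped"] = True  # treat cancellation as a clean shutdown


def kopf_thread() -> None:
    asyncio.run(runner())


thread = threading.Thread(target=kopf_thread)
thread.start()
ready.wait()
# Task.cancel() is not thread-safe, so hand it over to the operator's loop.
state["loop"].call_soon_threadsafe(state["task"].cancel)
thread.join(timeout=5)
```

The cancellation goes through `call_soon_threadsafe` because asyncio tasks may only be manipulated from the thread that runs their loop.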
201 changes: 197 additions & 4 deletions kopf/_cogs/aiokits/aiotasks.py
@@ -7,13 +7,13 @@
 Anyway, ``asyncio`` wraps all awaitables and coroutines into tasks on almost
 all function calls with multiple awaiables (e.g. :func:`asyncio.wait`),
-so there is no added overhead; intstead, the implicit overhead is made explicit.
+so there is no added overhead; instead, the implicit overhead is made explicit.
 """
 import asyncio
 import logging
 import sys
-from typing import TYPE_CHECKING, Any, Awaitable, Collection, Coroutine, \
-    Generator, Optional, Set, Tuple, TypeVar, Union
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, Collection, Coroutine, \
+    Generator, NamedTuple, Optional, Set, Tuple, TypeVar, Union

 _T = TypeVar('_T')

@@ -38,6 +38,40 @@ def create_task(
         return asyncio.create_task(coro)


+async def cancel_coro(
+        coro: Coroutine[Any, Any, Any],
+        *,
+        name: Optional[str] = None,
+) -> None:
+    """
+    Cancel the coroutine if the wrapped code block is cancelled or fails.
+
+    All coroutines must be awaited to prevent RuntimeWarnings/ResourceWarnings.
+    As such, we generally need to create a dummy task and cancel it immediately.
+
+    Despite asyncio tasks are lightweight, they still create an object and thus
+    consume memory. This can be undesired when applied at scale --- e.g.
+    in the multiplexer: when the watcher exits, it cancels all pending workers.
+    To save memory, we first try to close the coroutine with no dummy task.
+    As a fallback, the coroutine is cancelled gracefully via a dummy task.
+
+    The context manager should be applied to all async code in the managing
+    (i.e. parent/wrapping) coroutine from the beginning of it till the managed
+    coro is actually awaited and executed.
+    """
+    try:
+        # A dirty (undocumented) way to close a coro, but it saves memory.
+        coro.close()  # OR: coro.throw(asyncio.CancelledError())
+    except AttributeError:
+        # The official way is to create an extra task object, thus to waste some memory.
+        corotask = create_task(coro=coro, name=name)
+        corotask.cancel()
+        try:
+            await corotask
+        except asyncio.CancelledError:
+            pass
+
+
 async def guard(
         coro: Coroutine[Any, Any, Any],
         name: str,
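
To see why `cancel_coro` in the hunk above tries `coro.close()` before falling back to a dummy task, here is a small standalone sketch (not Kopf code) comparing the two ways of disposing of a coroutine that never started. `worker`, `discard_by_close`, and `discard_by_task` are hypothetical names used only for illustration.

```python
import asyncio
import inspect
from typing import Any, Coroutine


async def worker() -> str:
    await asyncio.sleep(0)
    return "done"


def discard_by_close(coro: Coroutine[Any, Any, Any]) -> str:
    # Closing a never-started coroutine finalises it without any task object.
    coro.close()
    return inspect.getcoroutinestate(coro)


async def discard_by_task(coro: Coroutine[Any, Any, Any]) -> bool:
    # The fallback: wrap into a task, cancel it at once, and await the outcome.
    task = asyncio.create_task(coro)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


closed_state = discard_by_close(worker())
was_cancelled = asyncio.run(discard_by_task(worker()))
```

Both routes leave the coroutine finalised without running its body; the first one simply avoids allocating a task, which is the memory saving the docstring refers to.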
@@ -63,7 +97,11 @@ async def guard(

     # Guarded tasks can have prerequisites, which are set in other tasks.
     if flag is not None:
-        await flag.wait()
+        try:
+            await flag.wait()
+        except asyncio.CancelledError:
+            await cancel_coro(coro=coro, name=name)
+            raise

     try:
         await coro
@@ -215,3 +253,158 @@ async def all_tasks(
     current_task = asyncio.current_task()
     return {task for task in asyncio.all_tasks()
             if task is not current_task and task not in ignored}
+
+
+class SchedulerJob(NamedTuple):
+    coro: Coroutine[Any, Any, Any]
+    name: Optional[str]
+
+
+class Scheduler:
+    """
+    An scheduler/orchestrator/executor for "fire-and-forget" tasks.
+
+    Coroutines can be spawned via this scheduler and forgotten: no need to wait
+    for them or to check their status --- the scheduler will take care of it.
+
+    It is a simplified equivalent of aiojobs, but compatible with Python 3.10.
+    Python 3.10 removed the explicit event loops (deprecated since Python 3.7),
+    which broke aiojobs. At the same time, aiojobs looks unmaintained
+    and contains no essential changes since July 2019 (i.e. for 2+ years).
+    Hence the necessity to replicate the functionality.
+
+    The scheduler is needed only for internal use and is not exposed to users.
+    It is mainly used in the multiplexer (splitting a single stream of events
+    of a resource kind into multiple queues of individual resource objects).
+    Therefore, many of the features of aiojobs are removed as unnecessary:
+    no individual task/job handling or closing, no timeouts, etc.
+
+    .. note::
+        Despite all coros will be wrapped into tasks sooner or later,
+        and despite it is convincing to do this earlier and manage the tasks
+        rather than our own queue of coros+names, do not do this:
+        we want all tasks to refer to their true coros in their reprs,
+        not to wrappers which wait until the running capacity is available.
+    """
+
+    def __init__(
+            self,
+            *,
+            limit: Optional[int] = None,
+            exception_handler: Optional[Callable[[BaseException], None]] = None,
+    ) -> None:
+        super().__init__()
+        self._closed = False
+        self._limit = limit
+        self._exception_handler = exception_handler
+        self._condition = asyncio.Condition()
+        self._pending_coros: asyncio.Queue[SchedulerJob] = asyncio.Queue()
+        self._running_tasks: Set[Task] = set()
+        self._cleaning_queue: asyncio.Queue[Task] = asyncio.Queue()
+        self._cleaning_task = create_task(self._task_cleaner(), name=f"task cleaner of {self!r}")
+        self._spawning_task = create_task(self._task_spawner(), name=f"task spawner of {self!r}")
+
+    def empty(self) -> bool:
+        """ Check if the scheduler has nothing to do. """
+        return self._pending_coros.empty() and not self._running_tasks
+
+    async def wait(self) -> None:
+        """
+        Wait until the scheduler does nothing, i.e. idling (all tasks are done).
+        """
+        async with self._condition:
+            await self._condition.wait_for(self.empty)
+
+    async def close(self) -> None:
+        """
+        Stop accepting new tasks and cancel all running/pending ones.
+        """
+
+        # Ensure that all pending coros are awaited -- to prevent RuntimeWarnings/ResourceWarnings.
+        # But do it via the normal flow, i.e. without exceeding the limit of the scheduler (if any).
+        self._closed = True
+        for task in self._running_tasks:
+            task.cancel()
+
+        # Wait until all tasks are fully done (it can take some time). This also includes
+        # the pending coros, which are spawned and instantly cancelled (to prevent RuntimeWarnings).
+        await self.wait()
+
+        # Cleanup the scheduler's own resources.
+        await stop({self._spawning_task, self._cleaning_task}, title="scheduler", quiet=True)
+
+    async def spawn(
+            self,
+            coro: Coroutine[Any, Any, Any],
+            *,
+            name: Optional[str] = None,
+    ) -> None:
+        """
+        Schedule a coroutine for ownership and eventual execution.
+
+        Coroutine ownership ensures that all "fire-and-forget" coroutines
+        that were passed to the scheduler will be awaited (to prevent warnings),
+        even if the scheduler is closed before the coroutines are started.
+
+        If a coroutine is added to a closed scheduler, it will be instantly
+        cancelled before raising the scheduler's exception.
+        """
+        if self._closed:
+            await cancel_coro(coro=coro, name=name)
+            raise RuntimeError("Cannot add new coroutines to a closed and inactive scheduler.")
+        async with self._condition:
+            await self._pending_coros.put(SchedulerJob(coro=coro, name=name))
+            self._condition.notify_all()  # -> task_spawner()
+
+    def _can_spawn(self) -> bool:
+        return (not self._pending_coros.empty() and
+                (self._limit is None or len(self._running_tasks) < self._limit))
+
+    async def _task_spawner(self) -> None:
+        """ An internal meta-task to actually start pending coros as tasks. """
+        while True:
+            async with self._condition:
+                await self._condition.wait_for(self._can_spawn)
+
+                # Spawn as many tasks as allowed and as many coros as available at the moment.
+                # Since nothing monitors the tasks "actively", we configure them to report back
+                # when they are finished --- to be awaited and released "passively".
+                while self._can_spawn():
+                    coro, name = self._pending_coros.get_nowait()  # guaranteed by the predicate
+                    task = create_task(coro=coro, name=name)
+                    task.add_done_callback(self._task_done_callback)
+                    self._running_tasks.add(task)
+                    if self._closed:
+                        task.cancel()  # used to await the coros without executing them.
+
+    async def _task_cleaner(self) -> None:
+        """ An internal meta-task to cleanup the actually finished tasks. """
+        while True:
+            task = await self._cleaning_queue.get()
+
+            # Await the task from an outer context to prevent RuntimeWarnings/ResourceWarnings.
+            try:
+                await task
+            except BaseException:
+                pass
+
+            # Ping other tasks to refill the pool of running tasks (or to close the scheduler).
+            async with self._condition:
+                self._running_tasks.discard(task)
+                self._condition.notify_all()  # -> task_spawner() & close()
+
+    def _task_done_callback(self, task: Task) -> None:
+        # When a "fire-and-forget" task is done, release its system resources immediately:
+        # nothing else is going to explicitly "await" for it any time soon, so we must do it.
+        # But since a callback cannot be async, "awaiting" is done in a background utility task.
+        self._running_tasks.discard(task)
+        self._cleaning_queue.put_nowait(task)
+
+        # If failed, initiate a callback defined by the owner of the task (if any).
+        exc: Optional[BaseException]
+        try:
+            exc = task.exception()
+        except asyncio.CancelledError:
+            exc = None
+        if exc is not None and self._exception_handler is not None:
+            self._exception_handler(exc)
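
The done-callback bookkeeping used by `_task_done_callback` can be tried in isolation. The following is a simplified sketch, not the Kopf scheduler itself: it keeps only the callback that forgets finished tasks and reports their exceptions, with no concurrency limit, no pending queue, and no cleaner task. All names (`run_demo`, `ok`, `bad`, `slow`) are invented for this example.

```python
import asyncio
from typing import List, Set, Tuple


async def ok() -> None:
    await asyncio.sleep(0)


async def bad() -> None:
    raise ValueError("boom")


async def slow() -> None:
    await asyncio.sleep(3600)


async def run_demo() -> Tuple[List[BaseException], int]:
    errors: List[BaseException] = []
    running: Set["asyncio.Task[None]"] = set()

    def on_done(task: "asyncio.Task[None]") -> None:
        # Forget the finished task and report its failure, if any.
        running.discard(task)
        try:
            exc = task.exception()
        except asyncio.CancelledError:
            exc = None  # a cancelled task is not treated as a failure
        if exc is not None:
            errors.append(exc)

    for coro in (ok(), bad(), slow()):
        task = asyncio.create_task(coro)
        task.add_done_callback(on_done)
        running.add(task)

    await asyncio.sleep(0.05)  # let ok() and bad() finish
    pending = list(running)    # only the slow() task should remain
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return errors, len(running)


errors, leftover = asyncio.run(run_demo())
```

Calling `task.exception()` inside the callback also marks the exception as retrieved, so asyncio does not log it as "never retrieved" later.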
2 changes: 1 addition & 1 deletion kopf/_cogs/clients/auth.py
@@ -120,7 +120,7 @@ def __init__(
         context: ssl.SSLContext
         if certificate_path and private_key_path:
             context = ssl.create_default_context(
-                purpose=ssl.Purpose.CLIENT_AUTH,
+                purpose=ssl.Purpose.SERVER_AUTH,
                 cafile=ca_path)
             context.load_cert_chain(
                 certfile=certificate_path,
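
The naming of `ssl.Purpose` is easy to mix up: the value names the peer being authenticated, so client-side code (which `auth.py` presumably is, given its path under `clients/`) wants `SERVER_AUTH`. A quick check with the standard library alone, independent of Kopf, shows how the defaults differ between the two purposes:

```python
import ssl

# Context for a client that authenticates servers: verification is enforced.
client_side = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)

# Context for a server that may authenticate clients: no verification by default.
server_side = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)

client_checks = (client_side.verify_mode, client_side.check_hostname)
server_checks = (server_side.verify_mode, server_side.check_hostname)
```

With `CLIENT_AUTH`, a client connection would not verify the server certificate or hostname unless configured further, which is likely the problem this one-line change addresses.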
2 changes: 1 addition & 1 deletion kopf/_core/actions/invocation.py
@@ -131,7 +131,7 @@ async def invoke(
         # in the task than to have orphan threads which deplete the executor's pool capacity.
         # Cancellation is postponed until the thread exits, but it happens anyway (for consistency).
         # Note: the docs say the result is a future, but typesheds say it is a coroutine => cast()!
-        loop = asyncio.get_event_loop()
+        loop = asyncio.get_running_loop()
         executor = settings.execution.executor if settings is not None else None
         future = cast(aiotasks.Future, loop.run_in_executor(executor, real_fn))
         cancellation: Optional[asyncio.CancelledError] = None
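
`asyncio.get_running_loop()` only works while a loop is actually running in the current thread and raises `RuntimeError` otherwise, which fits code such as `invoke()` that is always called from async context. A small standalone sketch of the same pattern (the names `blocking_fn` and `invoke_sync` are hypothetical, not Kopf's):

```python
import asyncio


def blocking_fn() -> int:
    # Hypothetical synchronous handler that must not block the event loop.
    return sum(range(1000))


async def invoke_sync() -> int:
    loop = asyncio.get_running_loop()
    # None selects the loop's default thread pool executor.
    return await loop.run_in_executor(None, blocking_fn)


result = asyncio.run(invoke_sync())

try:
    asyncio.get_running_loop()
    outside = "unexpectedly found a loop"
except RuntimeError:
    outside = "no running loop"
```

Outside of a coroutine the call fails fast instead of silently creating or fetching some other loop.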
3 changes: 2 additions & 1 deletion kopf/_core/actions/loggers.py
@@ -192,7 +192,8 @@ def configure(
         if not debug:
             logger.handlers[:] = [logging.NullHandler()]

-    loop = asyncio.get_event_loop()
+    # Since Python 3.10, get_event_loop() is deprecated, issues a warning. Here is a way around:
+    loop = asyncio.get_event_loop_policy().get_event_loop()
     loop.set_debug(bool(debug))


26 changes: 14 additions & 12 deletions kopf/_core/engines/daemons.py
@@ -29,8 +29,6 @@
 from typing import Collection, Dict, Iterable, List, Mapping, \
     MutableMapping, Optional, Sequence, Set, Union

-import aiojobs
-
 from kopf._cogs.aiokits import aiotasks, aiotime, aiotoggles
 from kopf._cogs.configs import configuration
 from kopf._cogs.structs import bodies, ids, patches
@@ -172,7 +170,7 @@ async def stop_daemons(
     * EITHER the operator should block this resource's processing and wait until
       the daemons are terminated -- thus leaking daemon's abstractions and logic
-      and tools (e.g. aiojobs scheduler) to the upper level of processing;
+      and tools (e.g. a task scheduler) to the upper level of processing;

     * OR the daemons termination should mimic the change-detection handlers
       and simulate the delays with multiple handling cycles -- in order to
@@ -275,7 +273,7 @@ async def daemon_killer(
     """
     # Unlimited job pool size —- the same as if we would be managing the tasks directly.
     # Unlimited timeout in `close()` -- since we have our own per-daemon timeout management.
-    scheduler: aiojobs.Scheduler = await aiojobs.create_scheduler(limit=None, close_timeout=99999)
+    scheduler = aiotasks.Scheduler()
     try:
         while True:

@@ -286,10 +284,12 @@ async def daemon_killer(
             # The daemons remain resumable, since they exit not on their own accord.
             for memory in memories.iter_all_daemon_memories():
                 for daemon in memory.running_daemons.values():
-                    await scheduler.spawn(stop_daemon(
-                        settings=settings,
-                        daemon=daemon,
-                        reason=stoppers.DaemonStoppingReason.OPERATOR_PAUSING))
+                    await scheduler.spawn(
+                        name=f"pausing stopper of {daemon}",
+                        coro=stop_daemon(
+                            settings=settings,
+                            daemon=daemon,
+                            reason=stoppers.DaemonStoppingReason.OPERATOR_PAUSING))

             # Stay here while the operator is paused, until it is resumed.
             # The fresh stream of watch-events will spawn new daemons naturally.
@@ -299,10 +299,12 @@ async def daemon_killer(
     finally:
         for memory in memories.iter_all_daemon_memories():
             for daemon in memory.running_daemons.values():
-                await scheduler.spawn(stop_daemon(
-                    settings=settings,
-                    daemon=daemon,
-                    reason=stoppers.DaemonStoppingReason.OPERATOR_EXITING))
+                await scheduler.spawn(
+                    name=f"exiting stopper of {daemon}",
+                    coro=stop_daemon(
+                        settings=settings,
+                        daemon=daemon,
+                        reason=stoppers.DaemonStoppingReason.OPERATOR_EXITING))
         await scheduler.close()

