Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Convert client wrappers to coroutines, use thread pools for actual calls #217

Merged
merged 1 commit into from
Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions kopf/clients/classes.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import asyncio
import functools
from typing import Type

import pykube

from kopf import config
from kopf.clients import auth
from kopf.structs import resources


def _make_cls(
async def _make_cls(
resource: resources.Resource,
) -> Type[pykube.objects.APIObject]:

loop = asyncio.get_running_loop()
api = auth.get_pykube_api()
api_resources = api.resource_list(resource.api_version)['resources']
fn = functools.partial(api.resource_list, resource.api_version)
rsp = await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), fn)

api_resources = rsp['resources']
resource_kind = next((r['kind'] for r in api_resources if r['name'] == resource.plural), None)
is_namespaced = next((r['namespaced'] for r in api_resources if r['name'] == resource.plural), None)
if not resource_kind:
Expand Down
32 changes: 21 additions & 11 deletions kopf/clients/fetching.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import asyncio
import enum
import functools
from typing import TypeVar, Optional, Union, Collection, List, Tuple, cast

import pykube
import requests
from typing import TypeVar, Optional, Union, Collection, List, Tuple, cast

from kopf import config
from kopf.clients import auth
from kopf.clients import classes
from kopf.structs import bodies
Expand All @@ -16,17 +19,19 @@ class _UNSET(enum.Enum):
token = enum.auto()


def read_crd(
async def read_crd(
*,
resource: resources.Resource,
default: Union[_T, _UNSET] = _UNSET.token,
) -> Union[bodies.Body, _T]:
try:
loop = asyncio.get_running_loop()
api = auth.get_pykube_api()
cls = pykube.CustomResourceDefinition
obj = cls.objects(api, namespace=None).get_by_name(name=resource.name)
qry = cls.objects(api, namespace=None)
fn = functools.partial(qry.get_by_name, name=resource.name)
obj = await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), fn)
return cast(bodies.Body, obj.obj)

except pykube.ObjectDoesNotExist:
if not isinstance(default, _UNSET):
return default
Expand All @@ -37,18 +42,21 @@ def read_crd(
raise


def read_obj(
async def read_obj(
*,
resource: resources.Resource,
namespace: Optional[str] = None,
name: Optional[str] = None,
default: Union[_T, _UNSET] = _UNSET.token,
) -> Union[bodies.Body, _T]:
try:
loop = asyncio.get_running_loop()
api = auth.get_pykube_api()
cls = classes._make_cls(resource=resource)
cls = await classes._make_cls(resource=resource)
namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None
obj = cls.objects(api, namespace=namespace).get_by_name(name=name)
qry = cls.objects(api, namespace=namespace)
fn = functools.partial(qry.get_by_name, name=name)
obj = await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), fn)
return cast(bodies.Body, obj.obj)
except pykube.ObjectDoesNotExist:
if not isinstance(default, _UNSET):
Expand All @@ -60,7 +68,7 @@ def read_obj(
raise


def list_objs_rv(
async def list_objs_rv(
*,
resource: resources.Resource,
namespace: Optional[str] = None,
Expand All @@ -77,11 +85,13 @@ def list_objs_rv(

* The resource is namespace-scoped AND operator is namespaced-restricted.
"""
loop = asyncio.get_running_loop()
api = auth.get_pykube_api()
cls = classes._make_cls(resource=resource)
cls = await classes._make_cls(resource=resource)
namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None
lst = cls.objects(api, namespace=pykube.all if namespace is None else namespace)
rsp = lst.response
qry = cls.objects(api, namespace=pykube.all if namespace is None else namespace)
fn = lambda: qry.response # it is a property, so cannot be threaded without lambdas.
rsp = await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), fn)

items: List[bodies.Body] = []
resource_version = rsp.get('metadata', {}).get('resourceVersion', None)
Expand Down
2 changes: 1 addition & 1 deletion kopf/clients/patching.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def patch_obj(
body['metadata']['namespace'] = namespace

api = auth.get_pykube_api()
cls = classes._make_cls(resource=resource)
cls = await classes._make_cls(resource=resource)
obj = cls(api, body)

# The handler could delete its own object, so we have nothing to patch. It is okay, ignore.
Expand Down
39 changes: 23 additions & 16 deletions kopf/clients/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""

import asyncio
import collections
import concurrent.futures
import logging
from typing import Optional, Iterator, AsyncIterator, cast
Expand All @@ -34,6 +35,9 @@

logger = logging.getLogger(__name__)

# Pykube declares it inside of a function, not importable from the package/module.
PykubeWatchEvent = collections.namedtuple("WatchEvent", "type object")


class WatchingError(Exception):
"""
Expand All @@ -48,7 +52,7 @@ class StopStreaming(RuntimeError):
"""


def streaming_next(__src: Iterator[bodies.RawEvent]) -> bodies.RawEvent:
def streaming_next(__src: Iterator[PykubeWatchEvent]) -> PykubeWatchEvent:
"""
Same as `next`, but replaces the `StopIteration` with `StopStreaming`.
"""
Expand All @@ -59,11 +63,11 @@ def streaming_next(__src: Iterator[bodies.RawEvent]) -> bodies.RawEvent:


async def streaming_aiter(
__src: Iterator[bodies.RawEvent],
__src: Iterator[PykubeWatchEvent],
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
executor: Optional[concurrent.futures.Executor] = None,
) -> AsyncIterator[bodies.RawEvent]:
) -> AsyncIterator[PykubeWatchEvent]:
"""
Same as `iter`, but asynchronous and stops on `StopStreaming`, not on `StopIteration`.
"""
Expand Down Expand Up @@ -107,16 +111,17 @@ async def streaming_watch(

# First, list the resources regularly, and get the list's resource version.
# Simulate the events with type "None" event - used in detection of causes.
items, resource_version = fetching.list_objs_rv(resource=resource, namespace=namespace)
items, resource_version = await fetching.list_objs_rv(resource=resource, namespace=namespace)
for item in items:
yield {'type': None, 'object': item}

# Then, watch the resources starting from the list's resource version.
loop = asyncio.get_event_loop()
stream = watch_objs(resource=resource, namespace=namespace,
timeout=config.WatchersConfig.default_stream_timeout,
since=resource_version)
async for event in streaming_aiter(stream, loop=loop):
stream = watch_objs(
resource=resource, namespace=namespace,
timeout=config.WatchersConfig.default_stream_timeout,
since=resource_version,
)
async for event in stream:

# "410 Gone" is for the "resource version too old" error, we must restart watching.
# The resource versions are lost by k8s after few minutes (as per the official doc).
Expand All @@ -138,13 +143,13 @@ async def streaming_watch(
yield cast(bodies.Event, event)


def watch_objs(
async def watch_objs(
*,
resource: resources.Resource,
namespace: Optional[str] = None,
timeout: Optional[float] = None,
since: Optional[str] = None,
) -> Iterator[bodies.RawEvent]:
) -> AsyncIterator[bodies.RawEvent]:
"""
Watch objects of a specific resource type.

Expand All @@ -157,17 +162,19 @@ def watch_objs(

* The resource is namespace-scoped AND operator is namespaced-restricted.
"""
src: Iterator[PykubeWatchEvent]

params = {}
if timeout is not None:
params['timeoutSeconds'] = timeout

api = auth.get_pykube_api(timeout=None)
cls = classes._make_cls(resource=resource)
cls = await classes._make_cls(resource=resource)
namespace = namespace if issubclass(cls, pykube.objects.NamespacedAPIObject) else None
lst = cls.objects(api, namespace=pykube.all if namespace is None else namespace)
src = lst.watch(since=since, params=params)
return iter(cast(bodies.RawEvent, {
'type': event.type,
'object': event.object.obj,
}) for event in src)
async for event in streaming_aiter(iter(src)):
yield cast(bodies.RawEvent, {
'type': event.type,
'object': event.object.obj,
})
20 changes: 10 additions & 10 deletions kopf/engines/peering.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def resource(self) -> resources.Resource:
return LEGACY_PEERING_RESOURCE if self.legacy else CLUSTER_PEERING_RESOURCE if self.namespace is None else NAMESPACED_PEERING_RESOURCE

@classmethod
def detect(
async def detect(
cls,
standalone: bool,
namespace: Optional[str],
Expand All @@ -106,16 +106,16 @@ def detect(
return None

if name:
if Peer._is_peering_exist(name, namespace=namespace):
if await Peer._is_peering_exist(name, namespace=namespace):
return cls(name=name, namespace=namespace, **kwargs)
elif Peer._is_peering_legacy(name, namespace=namespace):
elif await Peer._is_peering_legacy(name, namespace=namespace):
return cls(name=name, namespace=namespace, legacy=True, **kwargs)
else:
raise Exception(f"The peering {name!r} was not found")

if Peer._is_peering_exist(name=PEERING_DEFAULT_NAME, namespace=namespace):
if await Peer._is_peering_exist(name=PEERING_DEFAULT_NAME, namespace=namespace):
return cls(name=PEERING_DEFAULT_NAME, namespace=namespace, **kwargs)
elif Peer._is_peering_legacy(name=PEERING_DEFAULT_NAME, namespace=namespace):
elif await Peer._is_peering_legacy(name=PEERING_DEFAULT_NAME, namespace=namespace):
return cls(name=PEERING_DEFAULT_NAME, namespace=namespace, legacy=True, **kwargs)

logger.warning(f"Default peering object not found, falling back to the standalone mode.")
Expand Down Expand Up @@ -153,13 +153,13 @@ async def disappear(self) -> None:
await apply_peers([self], name=self.name, namespace=self.namespace, legacy=self.legacy)

@staticmethod
def _is_peering_exist(name: str, namespace: Optional[str]) -> bool:
async def _is_peering_exist(name: str, namespace: Optional[str]) -> bool:
resource = CLUSTER_PEERING_RESOURCE if namespace is None else NAMESPACED_PEERING_RESOURCE
obj = fetching.read_obj(resource=resource, namespace=namespace, name=name, default=None)
obj = await fetching.read_obj(resource=resource, namespace=namespace, name=name, default=None)
return obj is not None

@staticmethod
def _is_peering_legacy(name: str, namespace: Optional[str]) -> bool:
async def _is_peering_legacy(name: str, namespace: Optional[str]) -> bool:
"""
Legacy mode for the peering: cluster-scoped KopfPeering (new mode: namespaced).

Expand All @@ -168,14 +168,14 @@ def _is_peering_legacy(name: str, namespace: Optional[str]) -> bool:
This logic will be removed since 1.0.
Deploy ``ClusterKopfPeering`` as per documentation, and use it normally.
"""
crd = fetching.read_crd(resource=LEGACY_PEERING_RESOURCE, default=None)
crd = await fetching.read_crd(resource=LEGACY_PEERING_RESOURCE, default=None)
if crd is None:
return False

if str(crd.get('spec', {}).get('scope', '')).lower() != 'cluster':
return False # no legacy mode detected

obj = fetching.read_obj(resource=LEGACY_PEERING_RESOURCE, name=name, default=None)
obj = await fetching.read_obj(resource=LEGACY_PEERING_RESOURCE, name=name, default=None)
return obj is not None


Expand Down
2 changes: 1 addition & 1 deletion kopf/reactor/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ async def spawn_tasks(
])

# Monitor the peers, unless explicitly disabled.
ourselves: Optional[peering.Peer] = peering.Peer.detect(
ourselves: Optional[peering.Peer] = await peering.Peer.detect(
id=peering.detect_own_id(), priority=priority,
standalone=standalone, namespace=namespace, name=peering_name,
)
Expand Down
12 changes: 6 additions & 6 deletions tests/k8s/test_list_objs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from kopf.clients.fetching import list_objs_rv


def test_when_successful_clustered(req_mock, resource):
async def test_when_successful_clustered(req_mock, resource):
result = {'items': [{}, {}]}
req_mock.get.return_value.json.return_value = result

items, resource_version = list_objs_rv(resource=resource, namespace=None)
items, resource_version = await list_objs_rv(resource=resource, namespace=None)
assert items == result['items']

assert req_mock.get.called
Expand All @@ -19,11 +19,11 @@ def test_when_successful_clustered(req_mock, resource):
assert 'namespaces/' not in url


def test_when_successful_namespaced(req_mock, resource):
async def test_when_successful_namespaced(req_mock, resource):
result = {'items': [{}, {}]}
req_mock.get.return_value.json.return_value = result

items, resource_version = list_objs_rv(resource=resource, namespace='ns1')
items, resource_version = await list_objs_rv(resource=resource, namespace='ns1')
assert items == result['items']

assert req_mock.get.called
Expand All @@ -35,12 +35,12 @@ def test_when_successful_namespaced(req_mock, resource):

@pytest.mark.parametrize('namespace', [None, 'ns1'], ids=['without-namespace', 'with-namespace'])
@pytest.mark.parametrize('status', [400, 401, 403, 500, 666])
def test_raises_api_error(req_mock, resource, namespace, status):
async def test_raises_api_error(req_mock, resource, namespace, status):
response = requests.Response()
response.status_code = status
error = requests.exceptions.HTTPError("boo!", response=response)
req_mock.get.side_effect = error

with pytest.raises(requests.exceptions.HTTPError) as e:
list_objs_rv(resource=resource, namespace=namespace)
await list_objs_rv(resource=resource, namespace=namespace)
assert e.value.response.status_code == status
Loading