Commit

Merge pull request #81 from lsst-sqre/tickets/DM-31418
[DM-31418] Fix various race conditions
rra authored Aug 19, 2021
2 parents 4f9c8ce + c0cb94d commit 7005e60
Showing 6 changed files with 205 additions and 80 deletions.
3 changes: 2 additions & 1 deletion requirements/main.in
@@ -13,6 +13,7 @@ inflect
jinja2
jupyterhub==1.3.0
jupyterhub-idle-culler
https://github.com/cbanek/kubespawner/archive/f2810911a7d5206535db52a8fd5762bd28878975.zip
jupyterhub-kubespawner
psycopg2
ruamel.yaml
tornado
12 changes: 7 additions & 5 deletions requirements/main.txt
@@ -292,12 +292,13 @@ jupyterhub-idle-culler==1.1 \
--hash=sha256:45bceffeea8758b60e1f60650bc3de0d0d1e5f774623793db268ded1fa0cbdcb \
--hash=sha256:600b2713f90ecc475f690747d1ec02bcae8f44fa7f3e51a71e400727895631d8
# via -r requirements/main.in
https://github.com/cbanek/kubespawner/archive/f2810911a7d5206535db52a8fd5762bd28878975.zip \
--hash=sha256:dbf75280cbaedc5aef84f038edca34fc8a656bf1ef591beb67ee605c8bfe9ba1
jupyterhub-kubespawner==1.1.0 \
--hash=sha256:39775f30546de7dcdbf324ae468b50829daf3742604ab01e6023883230ee0ffd \
--hash=sha256:867d7d364be870e72b9ac81fa90c167cc208d8d1185d58e80e6e75692eb2bdeb
# via -r requirements/main.in
kubernetes==17.17.0 \
--hash=sha256:225a95a0aadbd5b645ab389d941a7980db8cdad2a776fde64d1b43fc3299bde9 \
--hash=sha256:c69b318696ba797dcf63eb928a8d4370c52319f4140023c502d7dfdf2080eb79
kubernetes==18.20.0 \
--hash=sha256:0c72d00e7883375bd39ae99758425f5e6cb86388417cf7cc84305c211b2192cf \
--hash=sha256:ff31ec17437293e7d4e1459f1228c42d27c7724dfb56b4868aba7a901a5b72c9
# via jupyterhub-kubespawner
mako==1.1.4 \
--hash=sha256:17831f0b7087c313c0ffae2bcbbd3c1d5ba9eeac9c38f2eb7b50e8c99fe9d5ab \
@@ -691,6 +692,7 @@ tornado==6.1 \
--hash=sha256:fa2ba70284fa42c2a5ecb35e322e68823288a4251f9ba9cc77be04ae15eada68 \
--hash=sha256:fba85b6cd9c39be262fcd23865652920832b61583de2a2ca907dbd8e8a8c81e5
# via
# -r requirements/main.in
# jupyterhub
# jupyterhub-idle-culler
traitlets==5.0.5 \
6 changes: 4 additions & 2 deletions src/nublado2/hooks.py
@@ -38,10 +38,12 @@ async def pre_spawn(self, spawner: Spawner) -> None:

await self.resourcemgr.create_user_resources(spawner, options)

def post_stop(self, spawner: Spawner) -> None:
async def post_stop(self, spawner: Spawner) -> None:
user = spawner.user.name
self.log.debug(f"Post stop-hook called for {user}")
self.resourcemgr.delete_user_resources(spawner.namespace)
await self.resourcemgr.delete_user_resources(
spawner, spawner.namespace
)

async def show_options(self, spawner: Spawner) -> str:
user = spawner.user.name
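The hooks.py change turns post_stop into a coroutine so that the now-asynchronous delete_user_resources can be awaited instead of being fired synchronously on the event loop. For reference, hooks like these are normally wired into JupyterHub configuration roughly as sketched below; the NubladoHooks class name, the import path, and the use of the pre_spawn_hook, post_stop_hook, and options_form settings are assumptions based on standard JupyterHub patterns, not something this diff shows.

# Hypothetical jupyterhub_config.py wiring (a sketch, not part of this commit).
# JupyterHub wraps hook results before awaiting them, so an async post_stop
# hook such as the one above is handled transparently.
from nublado2.hooks import NubladoHooks  # assumed module/class name

hooks = NubladoHooks()

c = get_config()  # noqa: F821  (provided when JupyterHub loads the config file)
c.JupyterHub.spawner_class = "kubespawner.KubeSpawner"
c.Spawner.pre_spawn_hook = hooks.pre_spawn
c.Spawner.post_stop_hook = hooks.post_stop
c.Spawner.options_form = hooks.show_options

With wiring of this shape, JupyterHub calls pre_spawn before KubeSpawner creates the lab pod and post_stop after the pod is stopped, which is what lets ResourceManager create and delete the per-user namespace at the right times.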
208 changes: 170 additions & 38 deletions src/nublado2/resourcemgr.py
@@ -1,13 +1,20 @@
"""Spawn and delete Kubernetes resources other than the pod."""

from __future__ import annotations

from datetime import timedelta
from functools import partial
from io import StringIO
from typing import TYPE_CHECKING

import kubernetes
from jinja2 import Template
from jupyterhub.utils import exponential_backoff
from kubernetes import client
from kubernetes.client.rest import ApiException
from kubernetes.utils import create_from_dict
from kubespawner.clients import shared_client
from ruamel.yaml import YAML
from tornado import gen
from traitlets.config import LoggingConfigurable

from nublado2.crdparser import CRDParser
@@ -17,23 +24,23 @@
if TYPE_CHECKING:
from typing import Any, Dict

from jupyterhub.spawner import Spawner
from kubespawner import KubeSpawner

from nublado2.selectedoptions import SelectedOptions

# Load Kubernetes configuration. Because we create global class variables for
# the Kubernetes API, this has to be done during module load.
kubernetes.config.load_incluster_config()


class ResourceManager(LoggingConfigurable):
# These k8s clients don't copy well with locks, connection,
# pools, locks, etc. Copying seems to happen under the hood of the
# LoggingConfigurable base class, so just have them be class variables.
# Should be safe to share these, and better to have fewer of them.
k8s_api = client.ApiClient()
custom_api = client.CustomObjectsApi()
k8s_client = client.CoreV1Api()
"""Create additional Kubernetes resources when spawning labs.
This is conceptually a subclass of KubeSpawner but it's patched in via
hooks rather than as a proper subclass. It creates (or deletes) all of
the other resources we want to create for a lab pod, and then delegates
creation of the pod itself to KubeSpawner.
This class makes extensive use of KubeSpawner internals to avoid
reimplementing the wheel and to work nicely with KubeSpawner and its
concurrency model.
"""

def __init__(self) -> None:
self.nublado_config = NubladoConfig()
@@ -42,18 +49,42 @@ def __init__(self) -> None:
self.yaml.indent(mapping=2, sequence=4, offset=2)

async def create_user_resources(
self, spawner: Spawner, options: SelectedOptions
self, spawner: KubeSpawner, options: SelectedOptions
) -> None:
"""Create the user resources for this spawning session."""
await self.provisioner.provision_homedir(spawner)
try:
await exponential_backoff(
partial(
self._wait_for_namespace_deletion,
spawner,
spawner.namespace,
),
f"Namespace {spawner.namespace} still being deleted",
timeout=spawner.k8s_api_request_retry_timeout,
)
await self._create_kubernetes_resources(spawner, options)
except Exception:
self.log.exception("Exception creating user resource!")
raise

def _create_lab_environment_configmap(
self, spawner: Spawner, template_values: Dict[str, Any]
async def delete_user_resources(
self, spawner: KubeSpawner, namespace: str
) -> None:
"""Clean up a Jupyter lab by deleting the whole namespace.
The reason is it's easier to do this than try to make a list of
resources to delete, especially when new things may be dynamically
created outside of the hub, like dask.
"""
api = shared_client("CoreV1Api")
await gen.with_timeout(
timedelta(seconds=spawner.k8s_api_request_timeout),
spawner.asynchronize(api.delete_namespace, namespace),
)

async def _create_lab_environment_configmap(
self, spawner: KubeSpawner, template_values: Dict[str, Any]
) -> None:
"""Create the ConfigMap that holds environment settings for the lab."""
environment = {}
@@ -73,16 +104,20 @@ def _create_lab_environment_configmap(
),
data=environment,
)
self.k8s_client.create_namespaced_config_map(spawner.namespace, body)
await exponential_backoff(
partial(spawner._make_create_resource_request, "config_map", body),
f"Could not create ConfigMap {spawner.namespace}/lab-environment",
timeout=spawner.k8s_api_request_retry_timeout,
)

async def _create_kubernetes_resources(
self, spawner: Spawner, options: SelectedOptions
self, spawner: KubeSpawner, options: SelectedOptions
) -> None:
api_client = shared_client("ApiClient")
custom_api = shared_client("CustomObjectsApi")
template_values = await self._build_template_values(spawner, options)

# Generate the list of additional user resources from the template.
self.log.debug("Template:")
self.log.debug(self.nublado_config.user_resources_template)
t = Template(self.nublado_config.user_resources_template)
templated_user_resources = t.render(template_values)
self.log.debug("Generated user resources:")
@@ -91,6 +126,7 @@ async def _create_kubernetes_resources(

# Add in the standard labels and annotations common to every resource
# and create the resources.
service_account = None
for resource in resources:
if "metadata" not in resource:
resource["metadata"] = {}
@@ -105,24 +141,53 @@ async def _create_kubernetes_resources(
api_version = resource["apiVersion"]
if "." in api_version and ".k8s.io/" not in api_version:
crd_parser = CRDParser.from_crd_body(resource)
self.custom_api.create_namespaced_custom_object(
body=resource,
group=crd_parser.group,
version=crd_parser.version,
namespace=spawner.namespace,
plural=crd_parser.plural,
await gen.with_timeout(
timedelta(seconds=spawner.k8s_api_request_timeout),
spawner.asynchronize(
custom_api.create_namespaced_custom_object,
body=resource,
group=crd_parser.group,
version=crd_parser.version,
namespace=spawner.namespace,
plural=crd_parser.plural,
),
)
else:
create_from_dict(self.k8s_api, resource)
await gen.with_timeout(
timedelta(seconds=spawner.k8s_api_request_timeout),
spawner.asynchronize(
create_from_dict, api_client, resource
),
)

# If this was a service account, note its name.
if resource["kind"] == "ServiceAccount":
service_account = resource["metadata"]["name"]

# Construct the lab environment ConfigMap. This is constructed from
# configuration settings and doesn't use a resource template like
# other resources. This has to be done last, because the namespace is
# created from the user resources template.
self._create_lab_environment_configmap(spawner, template_values)
await self._create_lab_environment_configmap(spawner, template_values)

# Wait for the service account to generate a token before proceeding.
# Otherwise, we may try to create the pod before the service account
# token exists and Kubernetes will object.
if service_account:
await exponential_backoff(
partial(
self._wait_for_service_account_token,
spawner,
service_account,
spawner.namespace,
),
f"Service account {service_account} has no token",
timeout=spawner.k8s_api_request_retry_timeout,
)

async def _build_dask_template(self, spawner: Spawner) -> str:
async def _build_dask_template(self, spawner: KubeSpawner) -> str:
"""Build a template for dask workers from the jupyter pod manifest."""
api_client = shared_client("ApiClient")
dask_template = await spawner.get_pod_manifest()

# Here we make a few mangles to the jupyter pod manifest
Expand All @@ -144,17 +209,16 @@ async def _build_dask_template(self, spawner: Spawner) -> str:
# alone doesn't.
dask_yaml_stream = StringIO()
self.yaml.dump(
self.k8s_api.sanitize_for_serialization(dask_template),
api_client.sanitize_for_serialization(dask_template),
dask_yaml_stream,
)
return dask_yaml_stream.getvalue()

async def _build_template_values(
self, spawner: Spawner, options: SelectedOptions
self, spawner: KubeSpawner, options: SelectedOptions
) -> Dict[str, Any]:
"""Construct the template variables for Jinja templating."""
auth_state = await spawner.user.get_auth_state()
self.log.debug(f"Auth state={auth_state}")
groups = auth_state["groups"]

# Build a comma separated list of group:gid
@@ -179,10 +243,78 @@ async def _build_template_values(
self.log.debug(f"Template values={template_values}")
return template_values

def delete_user_resources(self, namespace: str) -> None:
"""Clean up a jupyterlab by deleting the whole namespace.
async def _wait_for_namespace_deletion(
self, spawner: KubeSpawner, name: str
) -> bool:
"""Waits for the user's namespace to be deleted.
If the namespace exists but has not been marked for deletion, try to
delete it. If we're spawning a new lab while the namespace still
exists, that means something has gone wrong with the user's lab and
there's nothing salvageable.
Returns
-------
done : `bool`
`True` if the namespace has been deleted, `False` if it still
exists
"""
api = shared_client("CoreV1Api")
try:
namespace = await gen.with_timeout(
timedelta(seconds=spawner.k8s_api_request_timeout),
spawner.asynchronize(api.read_namespace, name),
)
if namespace.status.phase != "Terminating":
# Paranoia to ensure that we don't delete some random service
# namespace if something weird happens.
assert name.startswith("nublado2-")
self.log.warning(f"Deleting abandoned namespace {name}")
await gen.with_timeout(
timedelta(seconds=spawner.k8s_api_request_timeout),
spawner.asynchronize(api.delete_namespace, name),
)
return False
except gen.TimeoutError:
return False
except ApiException as e:
if e.status == 404:
return True
raise

async def _wait_for_service_account_token(
self, spawner: KubeSpawner, name: str, namespace: str
) -> bool:
"""Waits for a service account to spawn an associated token.
The reason is it's easier to do this than try to make a list
of resources to delete, especially when new things may be
dynamically created outside of the hub, like dask."""
self.k8s_client.delete_namespace(name=namespace)
Returns
-------
done : `bool`
`True` once the secret exists, `False` otherwise (so it can be
called from ``exponential_backoff``)
"""
api = shared_client("CoreV1Api")
try:
service_account = await gen.with_timeout(
timedelta(seconds=spawner.k8s_api_request_timeout),
spawner.asynchronize(
api.read_namespaced_service_account, name, namespace
),
)
if not service_account.secrets:
return False
secret_name = service_account.secrets[0].name
secret = await gen.with_timeout(
timedelta(seconds=spawner.k8s_api_request_timeout),
spawner.asynchronize(
api.read_namespaced_secret, secret_name, namespace
),
)
return secret.metadata.name == secret_name
except gen.TimeoutError:
return False
except ApiException as e:
if e.status == 404:
self.log.debug("Waiting for secret for service account {name}")
return False
raise
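Two concurrency patterns recur throughout the new resourcemgr.py code: every blocking kubernetes-client call is pushed onto KubeSpawner's executor with spawner.asynchronize and bounded by tornado's gen.with_timeout, and conditions that take a while to become true (a namespace finishing deletion, a service account acquiring a token) are polled with JupyterHub's exponential_backoff, whose callback returns True to stop and False to retry. The following is a minimal, generic sketch of that pattern; the helper names (call_with_timeout, wait_until) are invented for illustration, while the spawner attributes and library calls (asynchronize, k8s_api_request_timeout, k8s_api_request_retry_timeout, gen.with_timeout, exponential_backoff) are the same KubeSpawner and JupyterHub internals the diff relies on.

# Minimal sketch of the retry/timeout pattern used above; helper names are
# hypothetical and not part of this commit.
from datetime import timedelta
from typing import Any, Callable

from jupyterhub.utils import exponential_backoff
from tornado import gen


async def call_with_timeout(spawner: Any, sync_call: Callable[[], Any]) -> Any:
    """Run a blocking kubernetes-client call on KubeSpawner's executor,
    bounded by a timeout so a hung API call cannot wedge the hub's loop."""
    return await gen.with_timeout(
        timedelta(seconds=spawner.k8s_api_request_timeout),
        spawner.asynchronize(sync_call),
    )


async def wait_until(spawner: Any, check: Callable[[], Any], what: str) -> None:
    """Poll ``check`` until it returns something truthy or retries time out."""

    async def _poll() -> bool:
        try:
            return bool(await call_with_timeout(spawner, check))
        except gen.TimeoutError:
            # A single timed-out API call is not fatal; report "not ready
            # yet" so exponential_backoff schedules another attempt.
            return False

    # exponential_backoff re-invokes _poll with increasing delays and raises
    # asyncio.TimeoutError with the given message if it never returns True.
    await exponential_backoff(
        _poll,
        f"{what} not ready",
        timeout=spawner.k8s_api_request_retry_timeout,
    )

In the diff itself, _wait_for_namespace_deletion and _wait_for_service_account_token play the role of the polling callback, with the extra wrinkle that the namespace check also issues a corrective delete_namespace call before returning False.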
11 changes: 2 additions & 9 deletions tests/provisioner_test.py
@@ -7,18 +7,11 @@
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, Mock, patch

import kubernetes
import pytest
from aioresponses import CallbackResult, aioresponses
from jupyterhub.spawner import Spawner
from jupyterhub.user import User

# We have to patch out the Kubernetes configuration when importing
# nublado2.resourcemgr the first time, because it tries to load the Kubernetes
# configuration on module load.
with patch.object(kubernetes, "config"):
import nublado2.resourcemgr # noqa: F401

from nublado2.resourcemgr import ResourceManager

if TYPE_CHECKING:
@@ -73,15 +66,15 @@ def handler(url: str, **kwargs: Any) -> CallbackResult:
@pytest.mark.asyncio
async def test_provision() -> None:
resource_manager = ResourceManager()

# AsyncMock was introduced in Python 3.8, so sadly we can't use it yet.
spawner = Mock(spec=Spawner)
spawner.user = Mock(spec=User)
spawner.user.name = "someuser"
auth_state = {
"uid": 1234,
"groups": [{"name": "foo", "id": 1234}],
}

# AsyncMock was introduced in Python 3.8, so sadly we can't use it yet.
if sys.version_info < (3, 8):
spawner.user.get_auth_state.return_value = asyncio.Future()
spawner.user.get_auth_state.return_value.set_result(auth_state)
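The test changes drop the import-time patch of the Kubernetes configuration, which is no longer needed now that resourcemgr.py obtains its API clients from kubespawner's shared_client instead of module-level globals, and they keep the pre-Python-3.8 trick of faking an awaitable with a pre-resolved asyncio.Future. Below is a standalone sketch of that mocking pattern with invented names, not code from this diff; on 3.8 and later, unittest.mock.AsyncMock does the same job directly.

# Illustration only (assumed example, not from this diff): faking an async
# method on a Mock so the code under test can await it.
import asyncio
import sys
from unittest.mock import Mock

auth_state = {"uid": 1234, "groups": [{"name": "foo", "id": 1234}]}
user = Mock()

if sys.version_info < (3, 8):
    # A Future that is already resolved can be awaited immediately, so the
    # mocked method behaves like a coroutine returning auth_state.
    future = asyncio.Future()
    future.set_result(auth_state)
    user.get_auth_state.return_value = future
else:
    from unittest.mock import AsyncMock

    user.get_auth_state = AsyncMock(return_value=auth_state)


async def exercise() -> None:
    # Either branch lets calling code await the mocked method as usual.
    assert (await user.get_auth_state())["uid"] == 1234


asyncio.run(exercise())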