diff --git a/kubespawner/reflector.py b/kubespawner/reflector.py index 92f705f2..5780b8bd 100644 --- a/kubespawner/reflector.py +++ b/kubespawner/reflector.py @@ -25,6 +25,14 @@ class NamespacedResourceReflector(LoggingConfigurable): """ ) + fields = Dict( + {}, + config=True, + help=""" + Fields to restrict the reflected objects + """ + ) + namespace = Unicode( None, allow_none=True, @@ -88,8 +96,10 @@ def __init__(self, *args, **kwargs): # FIXME: Protect against malicious labels? self.label_selector = ','.join(['{}={}'.format(k, v) for k, v in self.labels.items()]) + self.field_selector = ','.join(['{}={}'.format(k, v) for k, v in self.fields.items()]) self.first_load_future = Future() + self._stop_event = threading.Event() self.start() @@ -101,7 +111,8 @@ def _list_and_update(self): """ initial_resources = getattr(self.api, self.list_method_name)( self.namespace, - label_selector=self.label_selector + label_selector=self.label_selector, + field_selector=self.field_selector ) # This is an atomic operation on the dictionary! self.resources = {p.metadata.name: p for p in initial_resources.items} @@ -133,7 +144,7 @@ def _watch_and_update(self): """ cur_delay = 0.1 while True: - self.log.info("watching for %s with label selector %s in namespace %s", self.kind, self.label_selector, self.namespace) + self.log.info("watching for %s with label selector %s / field selector %s in namespace %s", self.kind, self.label_selector, self.field_selector, self.namespace) w = watch.Watch() try: resource_version = self._list_and_update() @@ -144,6 +155,7 @@ def _watch_and_update(self): getattr(self.api, self.list_method_name), self.namespace, label_selector=self.label_selector, + field_selector=self.field_selector, resource_version=resource_version, ): cur_delay = 0.1 @@ -154,6 +166,9 @@ def _watch_and_update(self): else: # This is an atomic operation on the dictionary! self.resources[resource.metadata.name] = resource + if self._stop_event.is_set(): + break + except Exception: cur_delay = cur_delay * 2 if cur_delay > 30: @@ -166,6 +181,9 @@ def _watch_and_update(self): continue finally: w.stop() + if self._stop_event.is_set(): + self.log.info("%s watcher stopped", self.kind) + break def start(self): """ @@ -186,4 +204,5 @@ def start(self): self.watch_thread.daemon = True self.watch_thread.start() - + def stop(self): + self._stop_event.set() diff --git a/kubespawner/spawner.py b/kubespawner/spawner.py index 27215fd3..4e28bfbe 100644 --- a/kubespawner/spawner.py +++ b/kubespawner/spawner.py @@ -43,6 +43,15 @@ class PodReflector(NamespacedResourceReflector): def pods(self): return self.resources +class EventReflector(NamespacedResourceReflector): + kind = 'events' + + list_method_name = 'list_namespaced_event' + + @property + def events(self): + return self.resources + class KubeSpawner(Spawner): """ Implement a JupyterHub spawner to spawn pods in a Kubernetes Cluster. @@ -1019,6 +1028,17 @@ def start(self): else: raise + main_loop = IOLoop.current() + def on_reflector_failure(): + self.log.critical("Events reflector failed, halting Hub.") + main_loop.stop() + + # events are selected based on pod name, which will include previous launch/stop + self.events = EventReflector( + parent=self, namespace=self.namespace, + fields={'involvedObject.kind': 'Pod', 'involvedObject.name': self.pod_name}, + on_failure=on_reflector_failure + ) # If we run into a 409 Conflict error, it means a pod with the # same name already exists. We stop it, wait for it to stop, and # try again. We try 4 times, and if it still fails we give up. @@ -1058,6 +1078,10 @@ def start(self): ) pod = self.pod_reflector.pods[self.pod_name] + self.log.debug('pod %s events before launch: %s', self.pod_name, self.events.events) + # Note: we stop the event watcher once launch is successful, but the reflector + # will only stop when the next event comes in, likely when it is stopped. + self.events.stop() return (pod.status.pod_ip, self.port) @gen.coroutine