From f36de79d19db9c2a7a75d1329ed0921bc429d9ec Mon Sep 17 00:00:00 2001
From: Valeriia Klestova <123654509+vlerkin@users.noreply.github.com>
Date: Tue, 19 Nov 2024 14:11:10 +0100
Subject: [PATCH] Improve watching resources (PR #38)

* extracted resource watching logic into a separate class; implemented the observer logic in a ResourceWatcher class; added a method to stop the thread gracefully

* finish the observer logic extraction; merge changes from main; add a resource watcher instance to enable_joblogs to subscribe to the event watcher if the log feature is configured; delete event watcher logic from main; pass a container object to the list objects function instead of the container name; remove the start method from the log handler class; modify the joblogs init to subscribe to the event watcher

* add a number of retry attempts and an exponential backoff time to the reconnect loop for the event watcher; make the number of reconnect attempts, the backoff time and the coefficient for exponential growth configurable via config; add backoff_time, reconnection_attempts and backoff_coefficient as attributes to the resource watcher init; add resource_version as a param to w.stream so a failed stream can resume from the last resource it was able to catch; add urllib3.exceptions.ProtocolError and handle reconnection after an exponential backoff time to avoid API flooding; add config as a param to the resource watcher init; modify the config in kubernetes.yaml and the k8s config to contain backoff_time, reconnection_attempts and backoff_coefficient

* add logic to reset the number of reconnect attempts and the backoff time once a connection to the k8s API is achieved, so only sequential failures are detected; add exception handling to watch_pods for urllib3 failures, for the case when the resource version is too old and no longer available, and for when the stream ends; remove the k8s resource watcher initialization from the run function in api.py and move it to the k8s.py launcher as _init_resource_watcher; refactor existing logic from joblogs/__init__.py to keep it in _init_resource_watcher and enable_joblogs in the k8s launcher

* added a CONFIG.md file with detailed explanations of the parameters used to re-establish the connection to the Kubernetes watcher

* move the section about the config file from README.md to CONFIG.md; add a link to CONFIG.md in the README.md; remove the variables for reconnection_attempts, backoff_time and backoff_coefficient from the sample config since default values are provided in the code.
---
 CONFIG.md                                     |  38 +++++
 README.md                                     |  11 +-
 kubernetes.yaml                               |   2 +
 scrapyd_k8s.sample-k8s.conf                   |   3 +
 scrapyd_k8s/__main__.py                       |   4 +-
 scrapyd_k8s/api.py                            |   6 -
 scrapyd_k8s/joblogs/__init__.py               |  26 +--
 scrapyd_k8s/joblogs/log_handler_k8s.py        |  78 ++++-----
 scrapyd_k8s/k8s_resource_watcher.py           | 151 ++++++++++++++++++
 scrapyd_k8s/launcher/k8s.py                   |  25 ++-
 scrapyd_k8s/object_storage/libcloud_driver.py |   3 +-
 11 files changed, 251 insertions(+), 96 deletions(-)
 create mode 100644 CONFIG.md
 create mode 100644 scrapyd_k8s/k8s_resource_watcher.py
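Taken together, the changes listed above replace the joblogs-owned watch loop with a shared observer: the Kubernetes launcher owns a single ResourceWatcher thread and interested features subscribe to it. A condensed, purely illustrative sketch of that flow, simplified from the diffs below (not an extra file in this patch):

```python
# Simplified wiring introduced by this patch (illustrative only, not part of the diff).
from scrapyd_k8s.k8s_resource_watcher import ResourceWatcher
from scrapyd_k8s.joblogs import KubernetesJobLogHandler

class K8s:
    def __init__(self, config):
        # The watcher starts its background thread immediately and streams pod events.
        self.resource_watcher = ResourceWatcher(config.namespace(), config)
        if config.joblogs() is not None:
            # The log handler no longer runs its own watch loop; it only receives events.
            log_handler = KubernetesJobLogHandler(config)
            self.resource_watcher.subscribe(log_handler.handle_events)
```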
+## About
+This file provides a detailed description of the parameters listed in the config file, explaining why they are used
+and when you are expected to provide or change them.
+
+## Configuration file
+
+* `http_port` - defaults to `6800` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#http-port))
+* `bind_address` - defaults to `127.0.0.1` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#bind-address))
+* `max_proc` - _(implementation pending)_, if unset or `0` it will use the number of nodes in the cluster, defaults to `0` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#max-proc))
+* `repository` - Python class for accessing the image repository, defaults to `scrapyd_k8s.repository.Remote`
+* `launcher` - Python class for managing jobs on the cluster, defaults to `scrapyd_k8s.launcher.K8s`
+* `username` - Set this and `password` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#username))
+* `password` - Set this and `username` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#password))
+
+The Docker and Kubernetes launchers have their own additional options.
+
+## [scrapyd] section, reconnection_attempts, backoff_time, backoff_coefficient
+
+### Context
+The Kubernetes event watcher is used in the code as part of the joblogs feature and is also utilized for limiting the
+number of jobs running in parallel on the cluster. Neither feature is enabled by default; both can be activated if you
+choose to use them.
+
+The event watcher establishes a connection to the Kubernetes API and receives a stream of events from it. However, this
+long-lived connection is inherently unstable; it can be interrupted by network issues, proxies configured to terminate
+long-lived connections, and other factors. For this reason, a mechanism was implemented to re-establish the long-lived
+connection to the Kubernetes API. To achieve this, three parameters were introduced: `reconnection_attempts`,
+`backoff_time` and `backoff_coefficient`.
+
+### What are these parameters about?
+- `reconnection_attempts` - defines how many consecutive attempts will be made to reconnect if the connection fails;
+- `backoff_time` and `backoff_coefficient` - are used to gradually slow down each subsequent attempt to establish a
+connection with the Kubernetes API, preventing the API from becoming overloaded with requests. The `backoff_time` grows
+exponentially: after every failed attempt it is recalculated as `backoff_time *= backoff_coefficient`.
+
+### When do I need to change these values in the config file?
+Default values for these parameters are provided in the code and are tuned to an "average" cluster setting. If your network
+requirements or other conditions are unusual, you may need to adjust these values to better suit your specific setup.
\ No newline at end of file
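The three parameters described in CONFIG.md above are read from the `[scrapyd]` section of the scrapyd-k8s config file. A minimal illustrative snippet — the values shown are simply the defaults hard-coded in `ResourceWatcher`, so the keys only need to be set when those defaults do not fit your cluster:

```ini
[scrapyd]
# Reconnection behaviour of the Kubernetes event watcher (values shown are the code defaults).
reconnection_attempts = 5
backoff_time = 5
backoff_coefficient = 2
```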
diff --git a/README.md b/README.md
index f53ebec..a495739 100644
--- a/README.md
+++ b/README.md
@@ -241,16 +241,7 @@ Not supported, by design. If you want to delete a project, remove it from the
 configuration file.
 
 ## Configuration file
-
-* `http_port` - defaults to `6800` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#http-port))
-* `bind_address` - defaults to `127.0.0.1` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#bind-address))
-* `max_proc` - _(implementation pending)_, if unset or `0` it will use the number of nodes in the cluster, defaults to `0` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#max-proc))
-* `repository` - Python class for accessing the image repository, defaults to `scrapyd_k8s.repository.Remote`
-* `launcher` - Python class for managing jobs on the cluster, defaults to `scrapyd_k8s.launcher.K8s`
-* `username` - Set this and `password` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#username))
-* `password` - Set this and `username` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#password))
-
-The Docker and Kubernetes launchers have their own additional options.
+For a detailed description of the configuration file, see the [Configuration Guide](CONFIG.md).
 
 ## License
diff --git a/kubernetes.yaml b/kubernetes.yaml
index 11853ba..75813a6 100644
--- a/kubernetes.yaml
+++ b/kubernetes.yaml
@@ -87,6 +87,8 @@ data:
     launcher = scrapyd_k8s.launcher.K8s
 
     namespace = default
+
+    max_proc = 2
 
     # This is an example spider that should work out of the box.
     # Adapt the spider config to your use-case.
diff --git a/scrapyd_k8s.sample-k8s.conf b/scrapyd_k8s.sample-k8s.conf
index 44aa99e..9ab3536 100644
--- a/scrapyd_k8s.sample-k8s.conf
+++ b/scrapyd_k8s.sample-k8s.conf
@@ -19,6 +19,9 @@ namespace = default
 # Optional pull secret, in case you have private spiders.
 #pull_secret = ghcr-registry
 
+# Maximum number of jobs running in parallel
+max_proc = 10
+
 # For each project, define a project section.
 # This contains a repository that points to the remote container repository.
 # An optional env_secret is the name of a secret with additional environment
diff --git a/scrapyd_k8s/__main__.py b/scrapyd_k8s/__main__.py
index 6620da8..098fe6c 100644
--- a/scrapyd_k8s/__main__.py
+++ b/scrapyd_k8s/__main__.py
@@ -1,7 +1,6 @@
 import logging
 import sys
-from .api import run, config
-from .joblogs import joblogs_init
+from .api import run
 
 def setup_logging():
     logging.basicConfig(
@@ -14,5 +13,4 @@ def setup_logging():
 
 if __name__ == "__main__":
     setup_logging()
-    joblogs_init(config)
     run()
diff --git a/scrapyd_k8s/api.py b/scrapyd_k8s/api.py
index dc45740..ff1e36c 100644
--- a/scrapyd_k8s/api.py
+++ b/scrapyd_k8s/api.py
@@ -155,11 +155,5 @@ def run():
     if config_username is not None and config_password is not None:
         enable_authentication(app, config_username, config_password)
 
-    if config.joblogs() is not None:
-        launcher.enable_joblogs(config)
-        logger.info("Job logs handling enabled.")
-    else:
-        logger.debug("Job logs handling not enabled; 'joblogs' configuration section is missing.")
-
     # run server
     app.run(host=host, port=port)
diff --git a/scrapyd_k8s/joblogs/__init__.py b/scrapyd_k8s/joblogs/__init__.py
index 483fb7b..e3632da 100644
--- a/scrapyd_k8s/joblogs/__init__.py
+++ b/scrapyd_k8s/joblogs/__init__.py
@@ -1,25 +1 @@
-import logging
-from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler
-
-logger = logging.getLogger(__name__)
-
-def joblogs_init(config):
-    """
-    Initializes job logs handling by starting the Kubernetes job log handler.
-
-    Parameters
-    ----------
-    config : Config
-        Configuration object containing settings for job logs and storage.
-
-    Returns
-    -------
-    None
-    """
-    joblogs_config = config.joblogs()
-    if joblogs_config and joblogs_config.get('storage_provider') is not None:
-        log_handler = KubernetesJobLogHandler(config)
-        log_handler.start()
-        logger.info("Job logs handler started.")
-    else:
-        logger.warning("No storage provider configured; job logs will not be uploaded.")
+from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler
\ No newline at end of file
diff --git a/scrapyd_k8s/joblogs/log_handler_k8s.py b/scrapyd_k8s/joblogs/log_handler_k8s.py
index f20cd37..7102ed6 100644
--- a/scrapyd_k8s/joblogs/log_handler_k8s.py
+++ b/scrapyd_k8s/joblogs/log_handler_k8s.py
@@ -63,25 +63,7 @@ def __init__(self, config):
         self.pod_tmp_mapping = {}
         self.namespace = config.namespace()
         self.num_lines_to_check = int(config.joblogs().get('num_lines_to_check', 0))
-        self.object_storage_provider = None
-
-    def start(self):
-        """
-        Starts the pod watcher thread for job logs.
-
-        Returns
-        -------
-        None
-        """
-        if self.config.joblogs() and self.config.joblogs().get('storage_provider') is not None:
-            pod_watcher_thread = threading.Thread(
-                target=self.watch_pods
-            )
-            pod_watcher_thread.daemon = True
-            pod_watcher_thread.start()
-            logger.info("Started pod watcher thread for job logs.")
-        else:
-            logger.warning("No storage provider configured; job logs will not be uploaded.")
+        self.object_storage_provider = LibcloudObjectStorage(self.config)
 
     def get_last_n_lines(self, file_path, num_lines):
         """
@@ -236,7 +218,7 @@ def stream_logs(self, job_name):
         except Exception as e:
             logger.exception(f"Error streaming logs for job '{job_name}': {e}")
 
-    def watch_pods(self):
+    def handle_events(self, event):
         """
         Watches Kubernetes pods and handles events such as starting log streaming or uploading logs.
 
@@ -245,36 +227,34 @@ def watch_pods(self):
         None
         """
         self.object_storage_provider = LibcloudObjectStorage(self.config)
-        w = watch.Watch()
-        v1 = client.CoreV1Api()
         try:
-            for event in w.stream(v1.list_namespaced_pod, namespace=self.namespace):
-                pod = event['object']
-                if pod.metadata.labels.get("org.scrapy.job_id"):
-                    job_id = pod.metadata.labels.get("org.scrapy.job_id")
-                    pod_name = pod.metadata.name
-                    thread_name = f"{self.namespace}_{pod_name}"
-                    if pod.status.phase == 'Running':
-                        if (thread_name in self.watcher_threads
-                                and self.watcher_threads[thread_name] is not None
-                                and self.watcher_threads[thread_name].is_alive()):
-                            pass
-                        else:
-                            self.watcher_threads[thread_name] = threading.Thread(
-                                target=self.stream_logs,
-                                args=(pod_name,)
-                            )
-                            self.watcher_threads[thread_name].start()
-                    elif pod.status.phase in ['Succeeded', 'Failed']:
-                        log_filename = self.pod_tmp_mapping.get(pod_name)
-                        if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
-                            if self.object_storage_provider.object_exists(job_id):
-                                logger.info(f"Log file for job '{job_id}' already exists in storage.")
-                            else:
-                                self.object_storage_provider.upload_file(log_filename)
+
+            pod = event['object']
+            if pod.metadata.labels.get("org.scrapy.job_id"):
+                job_id = pod.metadata.labels.get("org.scrapy.job_id")
+                pod_name = pod.metadata.name
+                thread_name = f"{self.namespace}_{pod_name}"
+                if pod.status.phase == 'Running':
+                    if (thread_name in self.watcher_threads
+                            and self.watcher_threads[thread_name] is not None
+                            and self.watcher_threads[thread_name].is_alive()):
+                        pass
+                    else:
+                        self.watcher_threads[thread_name] = threading.Thread(
+                            target=self.stream_logs,
+                            args=(pod_name,)
+                        )
+                        self.watcher_threads[thread_name].start()
+                elif pod.status.phase in ['Succeeded', 'Failed']:
+                    log_filename = self.pod_tmp_mapping.get(pod_name)
+                    if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
+                        if self.object_storage_provider.object_exists(job_id):
+                            logger.info(f"Log file for job '{job_id}' already exists in storage.")
                         else:
-                            logger.info(f"Logfile not found for job '{job_id}'")
-                else:
-                    logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'")
+                            self.object_storage_provider.upload_file(log_filename)
+                    else:
+                        logger.info(f"Logfile not found for job '{job_id}'")
+            else:
+                logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'")
         except Exception as e:
             logger.exception(f"Error watching pods in namespace '{self.namespace}': {e}")
diff --git a/scrapyd_k8s/k8s_resource_watcher.py b/scrapyd_k8s/k8s_resource_watcher.py
new file mode 100644
index 0000000..9a9d89e
--- /dev/null
+++ b/scrapyd_k8s/k8s_resource_watcher.py
@@ -0,0 +1,151 @@
+import threading
+import logging
+import time
+from kubernetes import client, watch
+from typing import Callable, List
+import urllib3
+
+logger = logging.getLogger(__name__)
+
+class ResourceWatcher:
+    """
+    Watches Kubernetes pod events and notifies subscribers about relevant events.
+
+    Attributes
+    ----------
+    namespace : str
+        Kubernetes namespace to watch pods in.
+    subscribers : List[Callable]
+        List of subscriber callback functions to notify on events.
+    """
+
+    def __init__(self, namespace, config):
+        """
+        Initializes the ResourceWatcher.
+
+        Parameters
+        ----------
+        namespace : str
+            Kubernetes namespace to watch pods in.
+        """
+        self.namespace = namespace
+        self.reconnection_attempts = int(config.scrapyd().get('reconnection_attempts', 5))
+        self.backoff_time = int(config.scrapyd().get('backoff_time', 5))
+        self.backoff_coefficient = int(config.scrapyd().get('backoff_coefficient', 2))
+        self.subscribers: List[Callable] = []
+        self._stop_event = threading.Event()
+        self.watcher_thread = threading.Thread(target=self.watch_pods, daemon=True)
+        self.watcher_thread.start()
+        logger.info(f"ResourceWatcher thread started for namespace '{self.namespace}'.")
+
+    def subscribe(self, callback: Callable):
+        """
+        Adds a subscriber callback to be notified on events.
+
+        Parameters
+        ----------
+        callback : Callable
+            A function to call when an event is received.
+        """
+        if callback not in self.subscribers:
+            self.subscribers.append(callback)
+            logger.debug(f"Subscriber {callback.__name__} added.")
+
+    def unsubscribe(self, callback: Callable):
+        """
+        Removes a subscriber callback.
+
+        Parameters
+        ----------
+        callback : Callable
+            The subscriber function to remove.
+        """
+        if callback in self.subscribers:
+            self.subscribers.remove(callback)
+            logger.debug(f"Subscriber {callback.__name__} removed.")
+
+    def notify_subscribers(self, event: dict):
+        """
+        Notifies all subscribers about an event.
+
+        Parameters
+        ----------
+        event : dict
+            The Kubernetes event data.
+        """
+        for subscriber in self.subscribers:
+            try:
+                subscriber(event)
+            except Exception as e:
+                logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}")
+
+    def watch_pods(self):
+        """
+        Watches Kubernetes pod events and notifies subscribers.
+        Runs in a separate thread.
+        """
+        v1 = client.CoreV1Api()
+        w = watch.Watch()
+        resource_version = None
+
+        logger.info(f"Started watching pods in namespace '{self.namespace}'.")
+        backoff_time = self.backoff_time
+        reconnection_attempts = self.reconnection_attempts
+        while not self._stop_event.is_set() and reconnection_attempts > 0:
+            try:
+                kwargs = {
+                    'namespace': self.namespace,
+                    'timeout_seconds': 0,
+                }
+                if resource_version:
+                    kwargs['resource_version'] = resource_version
+                first_event = True
+                for event in w.stream(v1.list_namespaced_pod, **kwargs):
+                    if first_event:
+                        # Reset reconnection attempts and backoff time upon successful reconnection
+                        reconnection_attempts = self.reconnection_attempts
+                        backoff_time = self.backoff_time
+                        first_event = False  # Ensure this only happens once per connection
+                    pod_name = event['object'].metadata.name
+                    resource_version = event['object'].metadata.resource_version
+                    event_type = event['type']
+                    logger.debug(f"Received event: {event_type} for pod: {pod_name}")
+                    self.notify_subscribers(event)
+            except (urllib3.exceptions.ProtocolError,
+                    urllib3.exceptions.ReadTimeoutError,
+                    urllib3.exceptions.ConnectionError) as e:
+                reconnection_attempts -= 1
+                logger.exception(f"Encountered network error: {e}")
+                logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
+                time.sleep(backoff_time)
+                backoff_time *= self.backoff_coefficient
+            except client.ApiException as e:
+                # Resource version is too old and cannot be accessed anymore
+                if e.status == 410:
+                    logger.error("Received 410 Gone error, resetting resource_version and restarting watch.")
+                    resource_version = None
+                    continue
+                else:
+                    reconnection_attempts -= 1
+                    logger.exception(f"Encountered ApiException: {e}")
+                    logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
+                    time.sleep(backoff_time)
+                    backoff_time *= self.backoff_coefficient
+            except StopIteration:
+                logger.info("Watch stream ended, restarting watch.")
+                continue
+            except Exception as e:
+                reconnection_attempts -= 1
+                logger.exception(f"Watcher encountered exception: {e}")
+                logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
+                time.sleep(backoff_time)
+                backoff_time *= self.backoff_coefficient
+
+
+    def stop(self):
+        """
+        Stops the watcher thread gracefully.
+        """
+        self._stop_event.set()
+        self.watcher_thread.join()
+        logger.info(f"ResourceWatcher thread stopped for namespace '{self.namespace}'.")
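With the default values read in `__init__` above (`reconnection_attempts = 5`, `backoff_time = 5`, `backoff_coefficient = 2`), the reconnect loop sleeps 5, 10, 20, 40 and 80 seconds across five consecutive failures before giving up. A tiny illustrative calculation of that schedule (not part of the patch):

```python
# Illustrative only: the backoff schedule produced by ResourceWatcher.watch_pods with the defaults.
attempts, backoff_time, coefficient = 5, 5, 2
schedule = []
for _ in range(attempts):
    schedule.append(backoff_time)   # sleep this long after a failed connection attempt
    backoff_time *= coefficient     # exponential growth, as in the code above
print(schedule)  # [5, 10, 20, 40, 80] -> about 155 seconds in total before giving up
```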
diff --git a/scrapyd_k8s/launcher/k8s.py b/scrapyd_k8s/launcher/k8s.py
index 76910e7..00ed30a 100644
--- a/scrapyd_k8s/launcher/k8s.py
+++ b/scrapyd_k8s/launcher/k8s.py
@@ -1,11 +1,15 @@
 import os
+import logging
 
 import kubernetes
 import kubernetes.stream
 from signal import Signals
 
+from ..k8s_resource_watcher import ResourceWatcher
 from ..utils import format_datetime_object, native_stringify_dict
-from scrapyd_k8s.joblogs import joblogs_init
+from scrapyd_k8s.joblogs import KubernetesJobLogHandler
+
+logger = logging.getLogger(__name__)
 
 class K8s:
 
@@ -25,6 +29,16 @@ def __init__(self, config):
         self._k8s = kubernetes.client.CoreV1Api()
         self._k8s_batch = kubernetes.client.BatchV1Api()
 
+        self._init_resource_watcher(config)
+
+    def _init_resource_watcher(self, config):
+        self.resource_watcher = ResourceWatcher(self._namespace, config)
+
+        if config.joblogs() is not None:
+            self.enable_joblogs(config)
+        else:
+            logger.debug("Job logs handling not enabled; 'joblogs' configuration section is missing.")
+
     def get_node_name(self):
         deployment = os.getenv('MY_DEPLOYMENT_NAME', 'default')
         namespace = os.getenv('MY_NAMESPACE')
@@ -122,7 +136,14 @@ def cancel(self, project, job_id, signal):
         return prevstate
 
     def enable_joblogs(self, config):
-        joblogs_init(config)
+        joblogs_config = config.joblogs()
+        if joblogs_config and joblogs_config.get('storage_provider') is not None:
+            log_handler = KubernetesJobLogHandler(config)
+            self.resource_watcher.subscribe(log_handler.handle_events)
+            logger.info("Job logs handler started.")
+        else:
+            logger.warning("No storage provider configured; job logs will not be uploaded.")
+
     def _parse_job(self, job):
         state = self._k8s_job_to_scrapyd_status(job)
diff --git a/scrapyd_k8s/object_storage/libcloud_driver.py b/scrapyd_k8s/object_storage/libcloud_driver.py
index 90cb02a..6d33976 100644
--- a/scrapyd_k8s/object_storage/libcloud_driver.py
+++ b/scrapyd_k8s/object_storage/libcloud_driver.py
@@ -165,8 +165,9 @@ def object_exists(self, prefix):
         ----
         Logs information about the existence check or errors encountered.
         """
+        container = self.driver.get_container(container_name=self._container_name)
         try:
-            objects = self.driver.list_container_objects(container=self._container_name, prefix=prefix)
+            objects = self.driver.list_container_objects(container=container, prefix=prefix)
             if objects:
                 logger.debug(f"At least one object with prefix '{prefix}' exists in container '{self._container_name}'.")
                 return True
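Because the watcher exposes a plain subscribe/unsubscribe API, other features can reuse the same event stream without opening their own connections to the Kubernetes API; this is how the job-limiting feature mentioned in CONFIG.md could plug in later. A hypothetical subscriber, sketched only to illustrate the API added above (`config` stands for the scrapyd-k8s Config instance; the callback name is made up):

```python
import logging
from scrapyd_k8s.k8s_resource_watcher import ResourceWatcher

logger = logging.getLogger(__name__)

def log_job_phases(event):
    # Hypothetical subscriber: log the phase of every scrapyd-k8s job pod.
    pod = event['object']
    if pod.metadata.labels.get("org.scrapy.job_id"):
        logger.info("job pod %s is %s", pod.metadata.name, pod.status.phase)

# `config` is assumed to be the scrapyd-k8s Config object used elsewhere in the code base.
watcher = ResourceWatcher("default", config)   # starts its watch thread immediately
watcher.subscribe(log_job_phases)              # called for every pod event in the namespace
# ... later, e.g. on shutdown:
watcher.stop()                                 # sets the stop event and joins the watcher thread
```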