From fdc0a6d69c8d97c3b14f06e6ecf367a6c0423a78 Mon Sep 17 00:00:00 2001 From: Valeriia Klestova Date: Fri, 1 Nov 2024 16:48:15 +0100 Subject: [PATCH 1/6] extracted resource watching logic into a separate class; implemented logic for observer in a ResourceWatcher class; added method to stop a thread gracefully --- scrapyd_k8s.sample-k8s.conf | 3 + scrapyd_k8s/joblogs/__init__.py | 4 + scrapyd_k8s/joblogs/log_handler_k8s.py | 60 +++++++------- scrapyd_k8s/k8s_resource_watcher.py | 106 +++++++++++++++++++++++++ 4 files changed, 142 insertions(+), 31 deletions(-) create mode 100644 scrapyd_k8s/k8s_resource_watcher.py diff --git a/scrapyd_k8s.sample-k8s.conf b/scrapyd_k8s.sample-k8s.conf index 44aa99e..9ab3536 100644 --- a/scrapyd_k8s.sample-k8s.conf +++ b/scrapyd_k8s.sample-k8s.conf @@ -19,6 +19,9 @@ namespace = default # Optional pull secret, in case you have private spiders. #pull_secret = ghcr-registry +# Maximum number of jobs running in parallel +max_proc = 10 + # For each project, define a project section. # This contains a repository that points to the remote container repository. 
# An optional env_secret is the name of a secret with additional environment diff --git a/scrapyd_k8s/joblogs/__init__.py b/scrapyd_k8s/joblogs/__init__.py index 483fb7b..2a7b597 100644 --- a/scrapyd_k8s/joblogs/__init__.py +++ b/scrapyd_k8s/joblogs/__init__.py @@ -1,5 +1,6 @@ import logging from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler +from scrapyd_k8s.k8s_resource_watcher import ResourceWatcher logger = logging.getLogger(__name__) @@ -16,10 +17,13 @@ def joblogs_init(config): ------- None """ + namespace = config.namespace() + resource_watcher = ResourceWatcher(namespace) joblogs_config = config.joblogs() if joblogs_config and joblogs_config.get('storage_provider') is not None: log_handler = KubernetesJobLogHandler(config) log_handler.start() + resource_watcher.subscribe(log_handler.handle_events) logger.info("Job logs handler started.") else: logger.warning("No storage provider configured; job logs will not be uploaded.") diff --git a/scrapyd_k8s/joblogs/log_handler_k8s.py b/scrapyd_k8s/joblogs/log_handler_k8s.py index f20cd37..2241052 100644 --- a/scrapyd_k8s/joblogs/log_handler_k8s.py +++ b/scrapyd_k8s/joblogs/log_handler_k8s.py @@ -75,7 +75,7 @@ def start(self): """ if self.config.joblogs() and self.config.joblogs().get('storage_provider') is not None: pod_watcher_thread = threading.Thread( - target=self.watch_pods + target=self.handle_events ) pod_watcher_thread.daemon = True pod_watcher_thread.start() @@ -236,7 +236,7 @@ def stream_logs(self, job_name): except Exception as e: logger.exception(f"Error streaming logs for job '{job_name}': {e}") - def watch_pods(self): + def handle_events(self, event): """ Watches Kubernetes pods and handles events such as starting log streaming or uploading logs. 
@@ -245,36 +245,34 @@ def watch_pods(self): None """ self.object_storage_provider = LibcloudObjectStorage(self.config) - w = watch.Watch() - v1 = client.CoreV1Api() try: - for event in w.stream(v1.list_namespaced_pod, namespace=self.namespace): - pod = event['object'] - if pod.metadata.labels.get("org.scrapy.job_id"): - job_id = pod.metadata.labels.get("org.scrapy.job_id") - pod_name = pod.metadata.name - thread_name = f"{self.namespace}_{pod_name}" - if pod.status.phase == 'Running': - if (thread_name in self.watcher_threads - and self.watcher_threads[thread_name] is not None - and self.watcher_threads[thread_name].is_alive()): - pass - else: - self.watcher_threads[thread_name] = threading.Thread( - target=self.stream_logs, - args=(pod_name,) - ) - self.watcher_threads[thread_name].start() - elif pod.status.phase in ['Succeeded', 'Failed']: - log_filename = self.pod_tmp_mapping.get(pod_name) - if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0: - if self.object_storage_provider.object_exists(job_id): - logger.info(f"Log file for job '{job_id}' already exists in storage.") - else: - self.object_storage_provider.upload_file(log_filename) + + pod = event['object'] + if pod.metadata.labels.get("org.scrapy.job_id"): + job_id = pod.metadata.labels.get("org.scrapy.job_id") + pod_name = pod.metadata.name + thread_name = f"{self.namespace}_{pod_name}" + if pod.status.phase == 'Running': + if (thread_name in self.watcher_threads + and self.watcher_threads[thread_name] is not None + and self.watcher_threads[thread_name].is_alive()): + pass + else: + self.watcher_threads[thread_name] = threading.Thread( + target=self.stream_logs, + args=(pod_name,) + ) + self.watcher_threads[thread_name].start() + elif pod.status.phase in ['Succeeded', 'Failed']: + log_filename = self.pod_tmp_mapping.get(pod_name) + if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0: + if 
self.object_storage_provider.object_exists(job_id): + logger.info(f"Log file for job '{job_id}' already exists in storage.") else: - logger.info(f"Logfile not found for job '{job_id}'") - else: - logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'") + self.object_storage_provider.upload_file(log_filename) + else: + logger.info(f"Logfile not found for job '{job_id}'") + else: + logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'") except Exception as e: logger.exception(f"Error watching pods in namespace '{self.namespace}': {e}") diff --git a/scrapyd_k8s/k8s_resource_watcher.py b/scrapyd_k8s/k8s_resource_watcher.py new file mode 100644 index 0000000..4429d44 --- /dev/null +++ b/scrapyd_k8s/k8s_resource_watcher.py @@ -0,0 +1,106 @@ +import threading +import logging +import time +from kubernetes import client, watch +from typing import Callable, List + +logger = logging.getLogger(__name__) + +class ResourceWatcher: + """ + Watches Kubernetes pod events and notifies subscribers about relevant events. + + Attributes + ---------- + namespace : str + Kubernetes namespace to watch pods in. + subscribers : List[Callable] + List of subscriber callback functions to notify on events. + """ + + def __init__(self, namespace: str): + """ + Initializes the ResourceWatcher. + + Parameters + ---------- + namespace : str + Kubernetes namespace to watch pods in. + """ + self.namespace = namespace + self.subscribers: List[Callable] = [] + self._stop_event = threading.Event() + self.watcher_thread = threading.Thread(target=self.watch_pods, daemon=True) + self.watcher_thread.start() + logger.info(f"ResourceWatcher thread started for namespace '{self.namespace}'.") + + def subscribe(self, callback: Callable): + """ + Adds a subscriber callback to be notified on events. + + Parameters + ---------- + callback : Callable + A function to call when an event is received. 
+ """ + if callback not in self.subscribers: + self.subscribers.append(callback) + logger.debug(f"Subscriber {callback.__name__} added.") + + def unsubscribe(self, callback: Callable): + """ + Removes a subscriber callback. + + Parameters + ---------- + callback : Callable + The subscriber function to remove. + """ + if callback in self.subscribers: + self.subscribers.remove(callback) + logger.debug(f"Subscriber {callback.__name__} removed.") + + def notify_subscribers(self, event: dict): + """ + Notifies all subscribers about an event. + + Parameters + ---------- + event : dict + The Kubernetes event data. + """ + for subscriber in self.subscribers: + try: + subscriber(event) + except Exception as e: + logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}") + + def watch_pods(self): + """ + Watches Kubernetes pod events and notifies subscribers. + Runs in a separate thread. + """ + v1 = client.CoreV1Api() + w = watch.Watch() + + logger.info(f"Started watching pods in namespace '{self.namespace}'.") + + while not self._stop_event.is_set(): + try: + for event in w.stream(v1.list_namespaced_pod, namespace=self.namespace, timeout_seconds=0): + pod_name = event['object'].metadata.name + event_type = event['type'] + logger.debug(f"Received event: {event_type} for pod: {pod_name}") + self.notify_subscribers(event) + except Exception as e: + logger.exception(f"Error watching pods in namespace '{self.namespace}': {e}") + logger.info("Retrying to watch pods after a short delay...") + time.sleep(5) + + def stop(self): + """ + Stops the watcher thread gracefully. 
+ """ + self._stop_event.set() + self.watcher_thread.join() + logger.info(f"ResourceWatcher thread stopped for namespace '{self.namespace}'.") From 768757bcfb9ff0e49692d5c73467627c2507f555 Mon Sep 17 00:00:00 2001 From: Valeriia Klestova Date: Thu, 7 Nov 2024 17:48:30 +0100 Subject: [PATCH 2/6] finish the observer logic extraction; merge changes from main; add resource watcher instance to enable_joblogs to subscribe to the event watcher if the log feature is configured; delete logic about event watcher from main; pass container for list objects function instead of container name; remove start method from log handler class; modify joblogs init to subscribe to event watcher --- scrapyd_k8s/__main__.py | 4 +--- scrapyd_k8s/api.py | 2 +- scrapyd_k8s/joblogs/__init__.py | 4 +--- scrapyd_k8s/joblogs/log_handler_k8s.py | 20 +------------------ scrapyd_k8s/launcher/k8s.py | 4 ++-- scrapyd_k8s/object_storage/libcloud_driver.py | 3 ++- 6 files changed, 8 insertions(+), 29 deletions(-) diff --git a/scrapyd_k8s/__main__.py b/scrapyd_k8s/__main__.py index 6620da8..098fe6c 100644 --- a/scrapyd_k8s/__main__.py +++ b/scrapyd_k8s/__main__.py @@ -1,7 +1,6 @@ import logging import sys -from .api import run, config -from .joblogs import joblogs_init +from .api import run def setup_logging(): logging.basicConfig( @@ -14,5 +13,4 @@ def setup_logging(): if __name__ == "__main__": setup_logging() - joblogs_init(config) run() diff --git a/scrapyd_k8s/api.py b/scrapyd_k8s/api.py index dc45740..8725d5b 100644 --- a/scrapyd_k8s/api.py +++ b/scrapyd_k8s/api.py @@ -156,7 +156,7 @@ def run(): enable_authentication(app, config_username, config_password) if config.joblogs() is not None: - launcher.enable_joblogs(config) + launcher.enable_joblogs(config, resource_watcher) logger.info("Job logs handling enabled.") else: logger.debug("Job logs handling not enabled; 'joblogs' configuration section is missing.") diff --git a/scrapyd_k8s/joblogs/__init__.py b/scrapyd_k8s/joblogs/__init__.py index 
2a7b597..a82df1a 100644 --- a/scrapyd_k8s/joblogs/__init__.py +++ b/scrapyd_k8s/joblogs/__init__.py @@ -4,7 +4,7 @@ logger = logging.getLogger(__name__) -def joblogs_init(config): +def joblogs_init(config, resource_watcher): """ Initializes job logs handling by starting the Kubernetes job log handler. @@ -18,11 +18,9 @@ def joblogs_init(config): None """ namespace = config.namespace() - resource_watcher = ResourceWatcher(namespace) joblogs_config = config.joblogs() if joblogs_config and joblogs_config.get('storage_provider') is not None: log_handler = KubernetesJobLogHandler(config) - log_handler.start() resource_watcher.subscribe(log_handler.handle_events) logger.info("Job logs handler started.") else: diff --git a/scrapyd_k8s/joblogs/log_handler_k8s.py b/scrapyd_k8s/joblogs/log_handler_k8s.py index 2241052..7102ed6 100644 --- a/scrapyd_k8s/joblogs/log_handler_k8s.py +++ b/scrapyd_k8s/joblogs/log_handler_k8s.py @@ -63,25 +63,7 @@ def __init__(self, config): self.pod_tmp_mapping = {} self.namespace = config.namespace() self.num_lines_to_check = int(config.joblogs().get('num_lines_to_check', 0)) - self.object_storage_provider = None - - def start(self): - """ - Starts the pod watcher thread for job logs. 
- - Returns - ------- - None - """ - if self.config.joblogs() and self.config.joblogs().get('storage_provider') is not None: - pod_watcher_thread = threading.Thread( - target=self.handle_events - ) - pod_watcher_thread.daemon = True - pod_watcher_thread.start() - logger.info("Started pod watcher thread for job logs.") - else: - logger.warning("No storage provider configured; job logs will not be uploaded.") + self.object_storage_provider = LibcloudObjectStorage(self.config) def get_last_n_lines(self, file_path, num_lines): """ diff --git a/scrapyd_k8s/launcher/k8s.py b/scrapyd_k8s/launcher/k8s.py index 76910e7..d9408cf 100644 --- a/scrapyd_k8s/launcher/k8s.py +++ b/scrapyd_k8s/launcher/k8s.py @@ -121,8 +121,8 @@ def cancel(self, project, job_id, signal): ) return prevstate - def enable_joblogs(self, config): - joblogs_init(config) + def enable_joblogs(self, config, resource_watcher): + joblogs_init(config, resource_watcher) def _parse_job(self, job): state = self._k8s_job_to_scrapyd_status(job) diff --git a/scrapyd_k8s/object_storage/libcloud_driver.py b/scrapyd_k8s/object_storage/libcloud_driver.py index 90cb02a..6d33976 100644 --- a/scrapyd_k8s/object_storage/libcloud_driver.py +++ b/scrapyd_k8s/object_storage/libcloud_driver.py @@ -165,8 +165,9 @@ def object_exists(self, prefix): ---- Logs information about the existence check or errors encountered. 
""" + container = self.driver.get_container(container_name=self._container_name) try: - objects = self.driver.list_container_objects(container=self._container_name, prefix=prefix) + objects = self.driver.list_container_objects(container=container, prefix=prefix) if objects: logger.debug(f"At least one object with prefix '{prefix}' exists in container '{self._container_name}'.") return True From 8fec9a7d4ac869aca060f647867e24edee1ee3f1 Mon Sep 17 00:00:00 2001 From: Valeriia Klestova Date: Thu, 14 Nov 2024 17:00:02 +0100 Subject: [PATCH 3/6] add number of retry attempts and exponential backoff time to the reconnect loop for event watcher; make number of reconnect attempts, backoff time and a coefficient for exponential growth configurable via config; add backoff_time, reconnection_attempts and backoff_coefficient as attributes to the resource watcher init; add resource_version as a param to w.stream so a failed stream can read from the last resource it was able to catch; add urllib3.exceptions.ProtocolError and handle reconnection after some exponential backoff time to avoid api flooding; add config as a param for init for resource watcher; modify config in kubernetes.yaml and k8s config to contain add backoff_time, reconnection_attempts and backoff_coefficient --- kubernetes.yaml | 5 +++++ scrapyd_k8s.sample-k8s.conf | 10 +++++++++ scrapyd_k8s/api.py | 2 ++ scrapyd_k8s/k8s_resource_watcher.py | 33 +++++++++++++++++++++++------ scrapyd_k8s/launcher/k8s.py | 2 ++ 5 files changed, 45 insertions(+), 7 deletions(-) diff --git a/kubernetes.yaml b/kubernetes.yaml index 11853ba..e852608 100644 --- a/kubernetes.yaml +++ b/kubernetes.yaml @@ -87,6 +87,11 @@ data: launcher = scrapyd_k8s.launcher.K8s namespace = default + + max_proc = 2 + reconnection_attempts = 5 + backoff_time = 5 + backoff_coefficient = 2 # This is an example spider that should work out of the box. # Adapt the spider config to your use-case. 
diff --git a/scrapyd_k8s.sample-k8s.conf b/scrapyd_k8s.sample-k8s.conf index 9ab3536..8f84802 100644 --- a/scrapyd_k8s.sample-k8s.conf +++ b/scrapyd_k8s.sample-k8s.conf @@ -22,6 +22,16 @@ namespace = default # Maximum number of jobs running in parallel max_proc = 10 +# Number of attempts to reconnect with k8s API to watch events, default is 5 +reconnection_attempts = 5 + +# Minimum time in seconds to wait before reconnecting to k8s API to watch events, default is 5 +backoff_time = 5 + +# Coefficient that is multiplied by backoff_time to provide exponential backoff to prevent k8s API from being overwhelmed +# default is 2, every reconnection attempt will take backoff_time*backoff_coefficient +backoff_coefficient = 2 + # For each project, define a project section. # This contains a repository that points to the remote container repository. # An optional env_secret is the name of a secret with additional environment diff --git a/scrapyd_k8s/api.py b/scrapyd_k8s/api.py index 8725d5b..d7a4766 100644 --- a/scrapyd_k8s/api.py +++ b/scrapyd_k8s/api.py @@ -156,6 +156,8 @@ def run(): enable_authentication(app, config_username, config_password) if config.joblogs() is not None: + namespace = config.namespace() + resource_watcher = ResourceWatcher(namespace, config) launcher.enable_joblogs(config, resource_watcher) logger.info("Job logs handling enabled.") else: diff --git a/scrapyd_k8s/k8s_resource_watcher.py b/scrapyd_k8s/k8s_resource_watcher.py index 4429d44..e87032b 100644 --- a/scrapyd_k8s/k8s_resource_watcher.py +++ b/scrapyd_k8s/k8s_resource_watcher.py @@ -3,6 +3,7 @@ import time from kubernetes import client, watch from typing import Callable, List +import urllib3 logger = logging.getLogger(__name__) @@ -18,7 +19,7 @@ class ResourceWatcher: List of subscriber callback functions to notify on events. """ - def __init__(self, namespace: str): + def __init__(self, namespace, config): """ Initializes the ResourceWatcher. 
@@ -28,6 +29,9 @@ def __init__(self, namespace: str): Kubernetes namespace to watch pods in. """ self.namespace = namespace + self.reconnection_attempts = int(config.scrapyd().get('reconnection_attempts', 5)) + self.backoff_time = int(config.scrapyd().get('backoff_time', 5)) + self.backoff_coefficient = int(config.scrapyd().get('backoff_coefficient', 2)) self.subscribers: List[Callable] = [] self._stop_event = threading.Event() self.watcher_thread = threading.Thread(target=self.watch_pods, daemon=True) @@ -82,20 +86,35 @@ def watch_pods(self): """ v1 = client.CoreV1Api() w = watch.Watch() + resource_version = None logger.info(f"Started watching pods in namespace '{self.namespace}'.") - - while not self._stop_event.is_set(): + backoff_time = self.backoff_time + reconnection_attempts = self.reconnection_attempts + print(f"RECONNECTION: {reconnection_attempts}") + print(f"BACKOFF TIME: {backoff_time}") + while not self._stop_event.is_set() and reconnection_attempts > 0: try: - for event in w.stream(v1.list_namespaced_pod, namespace=self.namespace, timeout_seconds=0): + kwargs = { + 'namespace': self.namespace, + 'timeout_seconds': 0, + } + if resource_version: + kwargs['resource_version'] = resource_version + + for event in w.stream(v1.list_namespaced_pod, **kwargs): pod_name = event['object'].metadata.name + resource_version = event['object'].metadata.resource_version event_type = event['type'] logger.debug(f"Received event: {event_type} for pod: {pod_name}") self.notify_subscribers(event) - except Exception as e: - logger.exception(f"Error watching pods in namespace '{self.namespace}': {e}") + except urllib3.exceptions.ProtocolError as e: + reconnection_attempts -= 1 + logger.exception(f"Encountered ProtocolError: {e}") logger.info("Retrying to watch pods after a short delay...") - time.sleep(5) + time.sleep(backoff_time) + backoff_time = backoff_time * self.backoff_coefficient + def stop(self): """ diff --git a/scrapyd_k8s/launcher/k8s.py 
b/scrapyd_k8s/launcher/k8s.py index d9408cf..84130ce 100644 --- a/scrapyd_k8s/launcher/k8s.py +++ b/scrapyd_k8s/launcher/k8s.py @@ -25,6 +25,8 @@ def __init__(self, config): self._k8s = kubernetes.client.CoreV1Api() self._k8s_batch = kubernetes.client.BatchV1Api() + self.resource_watcher = ResourceWatcher(self._namespace, config) + def get_node_name(self): deployment = os.getenv('MY_DEPLOYMENT_NAME', 'default') namespace = os.getenv('MY_NAMESPACE') From 393a0d869af90b69e5b7a67e12926b3a1ecf907b Mon Sep 17 00:00:00 2001 From: Valeriia Klestova Date: Fri, 15 Nov 2024 14:37:12 +0100 Subject: [PATCH 4/6] add logic to reset number of reconnect attempts and backoff time when connection to the k8s was achieved so only sequential failures detected; add exception handling to watch_pods to handle failure in urllib3, when source version is old and not available anymore, and when stream is ended; remove k8s resource watcher initialization from run function in api.py and move it to k8s.py launcher as _init_resource_watcher; refactor existing logic from joblogs/__init__.py to keep it in _init_resource_watcher and enable_joblogs in k8s launcher --- scrapyd_k8s/api.py | 8 ------ scrapyd_k8s/joblogs/__init__.py | 28 +------------------- scrapyd_k8s/k8s_resource_watcher.py | 40 ++++++++++++++++++++++++----- scrapyd_k8s/launcher/k8s.py | 25 +++++++++++++++--- 4 files changed, 56 insertions(+), 45 deletions(-) diff --git a/scrapyd_k8s/api.py b/scrapyd_k8s/api.py index d7a4766..ff1e36c 100644 --- a/scrapyd_k8s/api.py +++ b/scrapyd_k8s/api.py @@ -155,13 +155,5 @@ def run(): if config_username is not None and config_password is not None: enable_authentication(app, config_username, config_password) - if config.joblogs() is not None: - namespace = config.namespace() - resource_watcher = ResourceWatcher(namespace, config) - launcher.enable_joblogs(config, resource_watcher) - logger.info("Job logs handling enabled.") - else: - logger.debug("Job logs handling not enabled; 'joblogs' 
configuration section is missing.") - # run server app.run(host=host, port=port) diff --git a/scrapyd_k8s/joblogs/__init__.py b/scrapyd_k8s/joblogs/__init__.py index a82df1a..e3632da 100644 --- a/scrapyd_k8s/joblogs/__init__.py +++ b/scrapyd_k8s/joblogs/__init__.py @@ -1,27 +1 @@ -import logging -from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler -from scrapyd_k8s.k8s_resource_watcher import ResourceWatcher - -logger = logging.getLogger(__name__) - -def joblogs_init(config, resource_watcher): - """ - Initializes job logs handling by starting the Kubernetes job log handler. - - Parameters - ---------- - config : Config - Configuration object containing settings for job logs and storage. - - Returns - ------- - None - """ - namespace = config.namespace() - joblogs_config = config.joblogs() - if joblogs_config and joblogs_config.get('storage_provider') is not None: - log_handler = KubernetesJobLogHandler(config) - resource_watcher.subscribe(log_handler.handle_events) - logger.info("Job logs handler started.") - else: - logger.warning("No storage provider configured; job logs will not be uploaded.") +from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler \ No newline at end of file diff --git a/scrapyd_k8s/k8s_resource_watcher.py b/scrapyd_k8s/k8s_resource_watcher.py index e87032b..9a9d89e 100644 --- a/scrapyd_k8s/k8s_resource_watcher.py +++ b/scrapyd_k8s/k8s_resource_watcher.py @@ -91,8 +91,6 @@ def watch_pods(self): logger.info(f"Started watching pods in namespace '{self.namespace}'.") backoff_time = self.backoff_time reconnection_attempts = self.reconnection_attempts - print(f"RECONNECTION: {reconnection_attempts}") - print(f"BACKOFF TIME: {backoff_time}") while not self._stop_event.is_set() and reconnection_attempts > 0: try: kwargs = { @@ -101,19 +99,47 @@ def watch_pods(self): } if resource_version: kwargs['resource_version'] = resource_version - + first_event = True for event in w.stream(v1.list_namespaced_pod, **kwargs): + 
if first_event: + # Reset reconnection attempts and backoff time upon successful reconnection + reconnection_attempts = self.reconnection_attempts + backoff_time = self.backoff_time + first_event = False # Ensure this only happens once per connection pod_name = event['object'].metadata.name resource_version = event['object'].metadata.resource_version event_type = event['type'] logger.debug(f"Received event: {event_type} for pod: {pod_name}") self.notify_subscribers(event) - except urllib3.exceptions.ProtocolError as e: + except (urllib3.exceptions.ProtocolError, + urllib3.exceptions.ReadTimeoutError, + urllib3.exceptions.ConnectionError) as e: + reconnection_attempts -= 1 + logger.exception(f"Encountered network error: {e}") + logger.info(f"Retrying to watch pods after {backoff_time} seconds...") + time.sleep(backoff_time) + backoff_time *= self.backoff_coefficient + except client.ApiException as e: + # Resource version is too old and cannot be accessed anymore + if e.status == 410: + logger.error("Received 410 Gone error, resetting resource_version and restarting watch.") + resource_version = None + continue + else: + reconnection_attempts -= 1 + logger.exception(f"Encountered ApiException: {e}") + logger.info(f"Retrying to watch pods after {backoff_time} seconds...") + time.sleep(backoff_time) + backoff_time *= self.backoff_coefficient + except StopIteration: + logger.info("Watch stream ended, restarting watch.") + continue + except Exception as e: reconnection_attempts -= 1 - logger.exception(f"Encountered ProtocolError: {e}") - logger.info("Retrying to watch pods after a short delay...") + logger.exception(f"Watcher encountered exception: {e}") + logger.info(f"Retrying to watch pods after {backoff_time} seconds...") time.sleep(backoff_time) - backoff_time = backoff_time * self.backoff_coefficient + backoff_time *= self.backoff_coefficient def stop(self): diff --git a/scrapyd_k8s/launcher/k8s.py b/scrapyd_k8s/launcher/k8s.py index 84130ce..00ed30a 100644 --- 
a/scrapyd_k8s/launcher/k8s.py +++ b/scrapyd_k8s/launcher/k8s.py @@ -1,11 +1,15 @@ import os +import logging import kubernetes import kubernetes.stream from signal import Signals +from ..k8s_resource_watcher import ResourceWatcher from ..utils import format_datetime_object, native_stringify_dict -from scrapyd_k8s.joblogs import joblogs_init +from scrapyd_k8s.joblogs import KubernetesJobLogHandler + +logger = logging.getLogger(__name__) class K8s: @@ -25,8 +29,16 @@ def __init__(self, config): self._k8s = kubernetes.client.CoreV1Api() self._k8s_batch = kubernetes.client.BatchV1Api() + self._init_resource_watcher(config) + + def _init_resource_watcher(self, config): self.resource_watcher = ResourceWatcher(self._namespace, config) + if config.joblogs() is not None: + self.enable_joblogs(config) + else: + logger.debug("Job logs handling not enabled; 'joblogs' configuration section is missing.") + def get_node_name(self): deployment = os.getenv('MY_DEPLOYMENT_NAME', 'default') namespace = os.getenv('MY_NAMESPACE') @@ -123,8 +135,15 @@ def cancel(self, project, job_id, signal): ) return prevstate - def enable_joblogs(self, config, resource_watcher): - joblogs_init(config, resource_watcher) + def enable_joblogs(self, config): + joblogs_config = config.joblogs() + if joblogs_config and joblogs_config.get('storage_provider') is not None: + log_handler = KubernetesJobLogHandler(config) + self.resource_watcher.subscribe(log_handler.handle_events) + logger.info("Job logs handler started.") + else: + logger.warning("No storage provider configured; job logs will not be uploaded.") + def _parse_job(self, job): state = self._k8s_job_to_scrapyd_status(job) From 515642fca96338aee05d7b2a891da4b933dc3c9d Mon Sep 17 00:00:00 2001 From: Valeriia Klestova Date: Tue, 19 Nov 2024 11:53:54 +0100 Subject: [PATCH 5/6] added a CONFIG.md file with detailed explanations about parameters used to re-establish connection to the Kubernetes watcher --- CONFIG.md | 26 ++++++++++++++++++++++++++ 1 file 
changed, 26 insertions(+) create mode 100644 CONFIG.md diff --git a/CONFIG.md b/CONFIG.md new file mode 100644 index 0000000..d5022d8 --- /dev/null +++ b/CONFIG.md @@ -0,0 +1,26 @@ +## About +This file provides you with the detailed description of parameters listed in the config file, and explaining why they are used +and when you are expected to provide or change them. + +## [scrapyd] section, reconnection_attempts, backoff_time, backoff_coefficient + +### Context +The Kubernetes event watcher is used in the code as part of the joblogs feature and is also utilized for limiting the +number of jobs running in parallel on the cluster. Both features are not enabled by default and can be activated if you +choose to use them. + +The event watcher establishes a connection to the Kubernetes API and receives a stream of events from it. However, the +nature of this long-lived connection is unstable; it can be interrupted by network issues, proxies configured to terminate +long-lived connections, and other factors. For this reason, a mechanism was implemented to re-establish the long-lived +connection to the Kubernetes API. To achieve this, three parameters were introduced: `reconnection_attempts`, +`backoff_time` and `backoff_coefficient`. + +### What are these parameters about? +- `reconnection_attempts` - defines how many consecutive attempts will be made to reconnect if the connection fails; +- `backoff_time` and `backoff_coefficient` - are used to gradually slow down each subsequent attempt to establish a +connection with the Kubernetes API, preventing the API from becoming overloaded with requests. The `backoff_time` increases +exponentially and is calculated as `backoff_time *= self.backoff_coefficient`. + +### When do I need to change it in the config file? +Default values for these parameters are provided in the code and are tuned to an "average" cluster setting. 
If your network +requirements or other conditions are unusual, you may need to adjust these values to better suit your specific setup. \ No newline at end of file From 8fdcb6075e1dc1de8620db166c19ffde41630659 Mon Sep 17 00:00:00 2001 From: Valeriia Klestova Date: Tue, 19 Nov 2024 13:28:20 +0100 Subject: [PATCH 6/6] move section about config file from README.md to CONFIG.md; add a link to the CONFIG.md in the README.md; remove variables for reconnection_attempts, backoff_time and backoff_coefficient from the sample config since default values are provided in the code. --- CONFIG.md | 12 ++++++++++++ README.md | 11 +---------- kubernetes.yaml | 3 --- scrapyd_k8s.sample-k8s.conf | 10 ---------- 4 files changed, 13 insertions(+), 23 deletions(-) diff --git a/CONFIG.md b/CONFIG.md index d5022d8..767c287 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -2,6 +2,18 @@ This file provides you with the detailed description of parameters listed in the config file, and explaining why they are used and when you are expected to provide or change them. 
+## Configuration file + +* `http_port` - defaults to `6800` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#http-port)) +* `bind_address` - defaults to `127.0.0.1` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#bind-address)) +* `max_proc` - _(implementation pending)_, if unset or `0` it will use the number of nodes in the cluster, defaults to `0` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#max-proc)) +* `repository` - Python class for accessing the image repository, defaults to `scrapyd_k8s.repository.Remote` +* `launcher` - Python class for managing jobs on the cluster, defaults to `scrapyd_k8s.launcher.K8s` +* `username` - Set this and `password` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#username)) +* `password` - Set this and `username` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#password)) + +The Docker and Kubernetes launchers have their own additional options. + ## [scrapyd] section, reconnection_attempts, backoff_time, backoff_coefficient ### Context diff --git a/README.md b/README.md index f53ebec..a495739 100644 --- a/README.md +++ b/README.md @@ -241,16 +241,7 @@ Not supported, by design. If you want to delete a project, remove it from the configuration file. 
## Configuration file - -* `http_port` - defaults to `6800` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#http-port)) -* `bind_address` - defaults to `127.0.0.1` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#bind-address)) -* `max_proc` - _(implementation pending)_, if unset or `0` it will use the number of nodes in the cluster, defaults to `0` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#max-proc)) -* `repository` - Python class for accessing the image repository, defaults to `scrapyd_k8s.repository.Remote` -* `launcher` - Python class for managing jobs on the cluster, defaults to `scrapyd_k8s.launcher.K8s` -* `username` - Set this and `password` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#username)) -* `password` - Set this and `username` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#password)) - -The Docker and Kubernetes launchers have their own additional options. +To read in detail about the config file, please, navigate to the [Configuration Guide](CONFIG.md) ## License diff --git a/kubernetes.yaml b/kubernetes.yaml index e852608..75813a6 100644 --- a/kubernetes.yaml +++ b/kubernetes.yaml @@ -89,9 +89,6 @@ data: namespace = default max_proc = 2 - reconnection_attempts = 5 - backoff_time = 5 - backoff_coefficient = 2 # This is an example spider that should work out of the box. # Adapt the spider config to your use-case. 
diff --git a/scrapyd_k8s.sample-k8s.conf b/scrapyd_k8s.sample-k8s.conf index 8f84802..9ab3536 100644 --- a/scrapyd_k8s.sample-k8s.conf +++ b/scrapyd_k8s.sample-k8s.conf @@ -22,16 +22,6 @@ namespace = default # Maximum number of jobs running in parallel max_proc = 10 -# Number of attempts to reconnect with k8s API to watch events, default is 5 -reconnection_attempts = 5 - -# Minimum time in seconds to wait before reconnecting to k8s API to watch events, default is 5 -backoff_time = 5 - -# Coefficient that is multiplied by backoff_time to provide exponential backoff to prevent k8s API from being overwhelmed -# default is 2, every reconnection attempt will take backoff_time*backoff_coefficient -backoff_coefficient = 2 - # For each project, define a project section. # This contains a repository that points to the remote container repository. # An optional env_secret is the name of a secret with additional environment