6 scheduling observer extraction #38

Merged
merged 6 commits, Nov 19, 2024
Changes from 4 commits
5 changes: 5 additions & 0 deletions kubernetes.yaml
@@ -87,6 +87,11 @@ data:
launcher = scrapyd_k8s.launcher.K8s

namespace = default

max_proc = 2
Member:
Why is max_proc here 2, and in scrapyd_k8s.sample-k8s.conf 10?

reconnection_attempts = 5
backoff_time = 5
backoff_coefficient = 2
Member:
Can we have sensible defaults, and not have to think about this yet?
Then we can add more detailed configuration documentation later. Ideally, no one would need to touch these (unless you have very different network requirements) - I'd like the defaults to work for most cases.

Member:
I think a section in the README would make sense here. Though might it be time to create a new file explaining the configuration options? That would give more freedom to explain them in detail.

Collaborator Author:
There are defaults in the code, but they need to be tested to confirm they're enough for our prod cluster; I suspect we might need to increase the number of attempts. I tried to make it resilient in the sense that every time the connection is successfully re-established, the number of attempts resets to the configured default, so we really catch the cases where we have several connection breaks in a row.
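For illustration, the reset-on-success behaviour described above boils down to this simplified sketch (the real loop is in scrapyd_k8s/k8s_resource_watcher.py further down in this diff; watch_stream and handle are placeholder names):

import time

attempts = reconnection_attempts            # configured maximum
backoff = backoff_time
while attempts > 0:
    try:
        for event in watch_stream():        # placeholder for the k8s watch stream
            # A successful (re)connection resets both counters, so only
            # several connection breaks in a row can exhaust the attempts.
            attempts = reconnection_attempts
            backoff = backoff_time
            handle(event)
    except ConnectionError:
        attempts -= 1
        time.sleep(backoff)
        backoff *= backoff_coefficient      # exponential backoff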

Collaborator Author:
Yes, I do agree that a section about the config file is needed. How do you see this new file: just a CONFIG.md, or something else as part of the repo?

Member:
Yes, CONFIG.md sounds fine 👍

Member @wvengen (Nov 19, 2024):
Super, thanks! Pending:

  • Remove the variables from the sample config files (defaults are meant to be OK, and are documented)
  • Move config info from README.md to CONFIG.md
  • Make sure CONFIG.md is linked from README.md in a sensible place.

Collaborator Author:
Thank you for the feedback, done!


# This is an example spider that should work out of the box.
# Adapt the spider config to your use-case.
13 changes: 13 additions & 0 deletions scrapyd_k8s.sample-k8s.conf
@@ -19,6 +19,19 @@ namespace = default
# Optional pull secret, in case you have private spiders.
#pull_secret = ghcr-registry

# Maximum number of jobs running in parallel
max_proc = 10

# Number of attempts to reconnect to the k8s API to watch events, default is 5
reconnection_attempts = 5

# Minimum time in seconds to wait before reconnecting to the k8s API to watch events, default is 5
backoff_time = 5

# Coefficient that the wait time is multiplied by after each failed attempt, providing exponential
# backoff to prevent the k8s API from being overwhelmed; default is 2, so the wait before
# reconnection attempt n is backoff_time * backoff_coefficient^(n-1)
backoff_coefficient = 2
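To illustrate the settings above: with these defaults, the wait between consecutive failed reconnection attempts grows exponentially (a quick sketch, not part of the config file):

backoff_time, backoff_coefficient = 5, 2
for attempt in range(1, 6):                 # reconnection_attempts = 5
    print(f"attempt {attempt}: wait {backoff_time}s")
    backoff_time *= backoff_coefficient
# prints waits of 5s, 10s, 20s, 40s and 80s before giving up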

Member:
(see previous comment)

# For each project, define a project section.
# This contains a repository that points to the remote container repository.
# An optional env_secret is the name of a secret with additional environment
4 changes: 1 addition & 3 deletions scrapyd_k8s/__main__.py
@@ -1,7 +1,6 @@
import logging
import sys
from .api import run, config
from .joblogs import joblogs_init
from .api import run

def setup_logging():
logging.basicConfig(
@@ -14,5 +13,4 @@ def setup_logging():

if __name__ == "__main__":
setup_logging()
joblogs_init(config)
run()
6 changes: 0 additions & 6 deletions scrapyd_k8s/api.py
@@ -155,11 +155,5 @@ def run():
if config_username is not None and config_password is not None:
enable_authentication(app, config_username, config_password)

if config.joblogs() is not None:
launcher.enable_joblogs(config)
logger.info("Job logs handling enabled.")
else:
logger.debug("Job logs handling not enabled; 'joblogs' configuration section is missing.")

# run server
app.run(host=host, port=port)
26 changes: 1 addition & 25 deletions scrapyd_k8s/joblogs/__init__.py
@@ -1,25 +1 @@
import logging
from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler

logger = logging.getLogger(__name__)

def joblogs_init(config):
"""
Initializes job logs handling by starting the Kubernetes job log handler.

Parameters
----------
config : Config
Configuration object containing settings for job logs and storage.

Returns
-------
None
"""
joblogs_config = config.joblogs()
if joblogs_config and joblogs_config.get('storage_provider') is not None:
log_handler = KubernetesJobLogHandler(config)
log_handler.start()
logger.info("Job logs handler started.")
else:
logger.warning("No storage provider configured; job logs will not be uploaded.")
from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler
78 changes: 29 additions & 49 deletions scrapyd_k8s/joblogs/log_handler_k8s.py
@@ -63,25 +63,7 @@ def __init__(self, config):
self.pod_tmp_mapping = {}
self.namespace = config.namespace()
self.num_lines_to_check = int(config.joblogs().get('num_lines_to_check', 0))
self.object_storage_provider = None

def start(self):
"""
Starts the pod watcher thread for job logs.

Returns
-------
None
"""
if self.config.joblogs() and self.config.joblogs().get('storage_provider') is not None:
pod_watcher_thread = threading.Thread(
target=self.watch_pods
)
pod_watcher_thread.daemon = True
pod_watcher_thread.start()
logger.info("Started pod watcher thread for job logs.")
else:
logger.warning("No storage provider configured; job logs will not be uploaded.")
self.object_storage_provider = LibcloudObjectStorage(self.config)

def get_last_n_lines(self, file_path, num_lines):
"""
@@ -236,7 +218,7 @@ def stream_logs(self, job_name):
except Exception as e:
logger.exception(f"Error streaming logs for job '{job_name}': {e}")

def watch_pods(self):
def handle_events(self, event):
"""
Watches Kubernetes pods and handles events such as starting log streaming or uploading logs.

@@ -245,36 +227,34 @@ def watch_pods(self):
None
"""
self.object_storage_provider = LibcloudObjectStorage(self.config)
w = watch.Watch()
v1 = client.CoreV1Api()
try:
for event in w.stream(v1.list_namespaced_pod, namespace=self.namespace):
pod = event['object']
if pod.metadata.labels.get("org.scrapy.job_id"):
job_id = pod.metadata.labels.get("org.scrapy.job_id")
pod_name = pod.metadata.name
thread_name = f"{self.namespace}_{pod_name}"
if pod.status.phase == 'Running':
if (thread_name in self.watcher_threads
and self.watcher_threads[thread_name] is not None
and self.watcher_threads[thread_name].is_alive()):
pass
else:
self.watcher_threads[thread_name] = threading.Thread(
target=self.stream_logs,
args=(pod_name,)
)
self.watcher_threads[thread_name].start()
elif pod.status.phase in ['Succeeded', 'Failed']:
log_filename = self.pod_tmp_mapping.get(pod_name)
if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
if self.object_storage_provider.object_exists(job_id):
logger.info(f"Log file for job '{job_id}' already exists in storage.")
else:
self.object_storage_provider.upload_file(log_filename)

pod = event['object']
if pod.metadata.labels.get("org.scrapy.job_id"):
job_id = pod.metadata.labels.get("org.scrapy.job_id")
pod_name = pod.metadata.name
thread_name = f"{self.namespace}_{pod_name}"
if pod.status.phase == 'Running':
if (thread_name in self.watcher_threads
and self.watcher_threads[thread_name] is not None
and self.watcher_threads[thread_name].is_alive()):
pass
else:
self.watcher_threads[thread_name] = threading.Thread(
target=self.stream_logs,
args=(pod_name,)
)
self.watcher_threads[thread_name].start()
elif pod.status.phase in ['Succeeded', 'Failed']:
    log_filename = self.pod_tmp_mapping.get(pod_name)
    if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
        if self.object_storage_provider.object_exists(job_id):
            logger.info(f"Log file for job '{job_id}' already exists in storage.")
        else:
            self.object_storage_provider.upload_file(log_filename)
    else:
        logger.info(f"Logfile not found for job '{job_id}'")
else:
    logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'")
except Exception as e:
logger.exception(f"Error watching pods in namespace '{self.namespace}': {e}")
151 changes: 151 additions & 0 deletions scrapyd_k8s/k8s_resource_watcher.py
@@ -0,0 +1,151 @@
import threading
import logging
import time
from kubernetes import client, watch
from typing import Callable, List
import urllib3

logger = logging.getLogger(__name__)

class ResourceWatcher:
"""
Watches Kubernetes pod events and notifies subscribers about relevant events.

Attributes
----------
namespace : str
Kubernetes namespace to watch pods in.
subscribers : List[Callable]
List of subscriber callback functions to notify on events.
"""

def __init__(self, namespace, config):
"""
Initializes the ResourceWatcher.

Parameters
----------
namespace : str
Kubernetes namespace to watch pods in.
config : Config
Configuration object providing reconnection_attempts, backoff_time and backoff_coefficient.
"""
self.namespace = namespace
self.reconnection_attempts = int(config.scrapyd().get('reconnection_attempts', 5))
self.backoff_time = int(config.scrapyd().get('backoff_time', 5))
self.backoff_coefficient = int(config.scrapyd().get('backoff_coefficient', 2))
self.subscribers: List[Callable] = []
self._stop_event = threading.Event()
self.watcher_thread = threading.Thread(target=self.watch_pods, daemon=True)
self.watcher_thread.start()
logger.info(f"ResourceWatcher thread started for namespace '{self.namespace}'.")

def subscribe(self, callback: Callable):
"""
Adds a subscriber callback to be notified on events.

Parameters
----------
callback : Callable
A function to call when an event is received.
"""
if callback not in self.subscribers:
self.subscribers.append(callback)
logger.debug(f"Subscriber {callback.__name__} added.")

def unsubscribe(self, callback: Callable):
"""
Removes a subscriber callback.

Parameters
----------
callback : Callable
The subscriber function to remove.
"""
if callback in self.subscribers:
self.subscribers.remove(callback)
logger.debug(f"Subscriber {callback.__name__} removed.")

def notify_subscribers(self, event: dict):
"""
Notifies all subscribers about an event.

Parameters
----------
event : dict
The Kubernetes event data.
"""
for subscriber in self.subscribers:
try:
subscriber(event)
except Exception as e:
logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}")

def watch_pods(self):
"""
Watches Kubernetes pod events and notifies subscribers.
Runs in a separate thread.
"""
v1 = client.CoreV1Api()
w = watch.Watch()
resource_version = None

logger.info(f"Started watching pods in namespace '{self.namespace}'.")
backoff_time = self.backoff_time
reconnection_attempts = self.reconnection_attempts
while not self._stop_event.is_set() and reconnection_attempts > 0:
try:
kwargs = {
'namespace': self.namespace,
'timeout_seconds': 0,
}
if resource_version:
kwargs['resource_version'] = resource_version
first_event = True
for event in w.stream(v1.list_namespaced_pod, **kwargs):
if first_event:
# Reset reconnection attempts and backoff time upon successful reconnection
reconnection_attempts = self.reconnection_attempts
backoff_time = self.backoff_time
first_event = False # Ensure this only happens once per connection
pod_name = event['object'].metadata.name
resource_version = event['object'].metadata.resource_version
event_type = event['type']
logger.debug(f"Received event: {event_type} for pod: {pod_name}")
self.notify_subscribers(event)
except (urllib3.exceptions.ProtocolError,
urllib3.exceptions.ReadTimeoutError,
urllib3.exceptions.ConnectionError) as e:
reconnection_attempts -= 1
logger.exception(f"Encountered network error: {e}")
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
time.sleep(backoff_time)
backoff_time *= self.backoff_coefficient
except client.ApiException as e:
# Resource version is too old and cannot be accessed anymore
if e.status == 410:
logger.error("Received 410 Gone error, resetting resource_version and restarting watch.")
resource_version = None
continue
else:
reconnection_attempts -= 1
logger.exception(f"Encountered ApiException: {e}")
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
time.sleep(backoff_time)
backoff_time *= self.backoff_coefficient
except StopIteration:
logger.info("Watch stream ended, restarting watch.")
continue
except Exception as e:
reconnection_attempts -= 1
logger.exception(f"Watcher encountered exception: {e}")
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
time.sleep(backoff_time)
backoff_time *= self.backoff_coefficient


def stop(self):
"""
Stops the watcher thread gracefully.
"""
self._stop_event.set()
self.watcher_thread.join()
logger.info(f"ResourceWatcher thread stopped for namespace '{self.namespace}'.")
25 changes: 23 additions & 2 deletions scrapyd_k8s/launcher/k8s.py
@@ -1,11 +1,15 @@
import os
import logging

import kubernetes
import kubernetes.stream
from signal import Signals

from ..k8s_resource_watcher import ResourceWatcher
from ..utils import format_datetime_object, native_stringify_dict
from scrapyd_k8s.joblogs import joblogs_init
from scrapyd_k8s.joblogs import KubernetesJobLogHandler

logger = logging.getLogger(__name__)

class K8s:

@@ -25,6 +29,16 @@ def __init__(self, config):
self._k8s = kubernetes.client.CoreV1Api()
self._k8s_batch = kubernetes.client.BatchV1Api()

self._init_resource_watcher(config)

def _init_resource_watcher(self, config):
self.resource_watcher = ResourceWatcher(self._namespace, config)

if config.joblogs() is not None:
self.enable_joblogs(config)
else:
logger.debug("Job logs handling not enabled; 'joblogs' configuration section is missing.")

def get_node_name(self):
deployment = os.getenv('MY_DEPLOYMENT_NAME', 'default')
namespace = os.getenv('MY_NAMESPACE')
@@ -122,7 +136,14 @@ def cancel(self, project, job_id, signal):
return prevstate

def enable_joblogs(self, config):
joblogs_init(config)
joblogs_config = config.joblogs()
if joblogs_config and joblogs_config.get('storage_provider') is not None:
log_handler = KubernetesJobLogHandler(config)
self.resource_watcher.subscribe(log_handler.handle_events)
logger.info("Job logs handler started.")
else:
logger.warning("No storage provider configured; job logs will not be uploaded.")


def _parse_job(self, job):
state = self._k8s_job_to_scrapyd_status(job)
3 changes: 2 additions & 1 deletion scrapyd_k8s/object_storage/libcloud_driver.py
@@ -165,8 +165,9 @@ def object_exists(self, prefix):
----
Logs information about the existence check or errors encountered.
"""
container = self.driver.get_container(container_name=self._container_name)
try:
objects = self.driver.list_container_objects(container=self._container_name, prefix=prefix)
objects = self.driver.list_container_objects(container=container, prefix=prefix)
if objects:
logger.debug(f"At least one object with prefix '{prefix}' exists in container '{self._container_name}'.")
return True
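For context, the change above matches apache-libcloud's API: list_container_objects expects a Container object rather than a container name, so the container is resolved first with get_container. A minimal sketch of the corrected pattern (provider, credentials and names here are illustrative):

from libcloud.storage.types import Provider
from libcloud.storage.providers import get_driver

cls = get_driver(Provider.S3)
driver = cls('access-key-id', 'secret-key')                  # illustrative credentials
container = driver.get_container(container_name='my-logs')   # resolve name -> Container
objects = driver.list_container_objects(container=container, prefix='job-id')
print(bool(objects))                                          # True if any object matches the prefix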