From 51fad95d5c7d08712e5e9b1b3f0ae055704891b2 Mon Sep 17 00:00:00 2001
From: Alputer
Date: Tue, 26 Nov 2024 11:42:08 +0100
Subject: [PATCH] feat(opensearch): capture logs from Dask cluster pods (#616)

This commit collects the logs of the Dask scheduler and workers and
attaches them to every REANA job that uses the same Dask cluster. This
is not ideal, since the Dask logs may be duplicated across different
steps of the workflow, which could confuse the user. However, when a
user parallelises a workflow with Dask, the workflow steps are usually
defined only within Dask, so the duplication does not occur in
practice. Separating Dask scheduler and worker logs from regular
Kubernetes job logs would require a larger architectural change and is
therefore deferred to a future commit.

Closes #610
---
 reana_workflow_controller/opensearch.py | 77 ++++++++++++++++++++++---
 reana_workflow_controller/rest/utils.py | 53 +++++++++++++++--
 2 files changed, 116 insertions(+), 14 deletions(-)

diff --git a/reana_workflow_controller/opensearch.py b/reana_workflow_controller/opensearch.py
index aea2d6ca..8acc17b3 100644
--- a/reana_workflow_controller/opensearch.py
+++ b/reana_workflow_controller/opensearch.py
@@ -63,11 +63,13 @@ def __init__(
         os_client: OpenSearch | None = None,
         job_index: str = "fluentbit-job_log",
         workflow_index: str = "fluentbit-workflow_log",
+        dask_index: str = "fluentbit-dask_log",
         max_rows: int = 5000,
         log_key: str = "log",
         order: str = "asc",
         job_log_matcher: str = "kubernetes.labels.job-name.keyword",
         workflow_log_matcher: str = "kubernetes.labels.reana-run-batch-workflow-uuid.keyword",
+        dask_log_matcher: str = "kubernetes.labels.dask.org/cluster-name.keyword",
         timeout: int = 5,
     ) -> None:
         """
@@ -91,34 +93,57 @@ def __init__(
         self.os_client = os_client
         self.job_index = job_index
         self.workflow_index = workflow_index
+        self.dask_index = dask_index
         self.max_rows = max_rows
         self.log_key = log_key
         self.order = order
         self.job_log_matcher = job_log_matcher
         self.workflow_log_matcher = workflow_log_matcher
+        self.dask_log_matcher = dask_log_matcher
         self.timeout = timeout
 
-    def fetch_logs(self, id: str, index: str, match: str) -> str | None:
+    def fetch_logs(
+        self, id: str, index: str, match: str = None, matches: dict | None = None
+    ) -> str | None:
         """
-        Fetch logs of a specific job or workflow.
+        Fetch logs of a specific job, workflow or Dask cluster.
 
         :param id: Job or workflow ID.
         :param index: Index name for logs.
-        :param match: Matcher for logs.
+        :param match: Single matcher for logs (mutually exclusive with `matches`).
+        :param matches: Dictionary of field-to-value pairs for multiple match conditions.
 
-        :return: Job or workflow logs.
+        :return: Job, workflow or Dask cluster logs matching the conditions.
""" - query = { - "query": {"match": {match: id}}, - "sort": [{"@timestamp": {"order": self.order}}], - } + if matches: + # Build a bool query with multiple conditions + query = { + "query": { + "bool": { + "must": [ + {"match": {field: value}} + for field, value in matches.items() + ] + } + }, + "sort": [{"@timestamp": {"order": self.order}}], + } + elif match: + # Build a simple single-match query + query = { + "query": {"match": {match: id}}, + "sort": [{"@timestamp": {"order": self.order}}], + } + else: + logging.error("Either `match` or `matches` must be provided.") + return None try: response = self.os_client.search( index=index, body=query, size=self.max_rows, timeout=self.timeout ) except Exception as e: - logging.error("Failed to fetch logs for {0}: {1}".format(id, e)) + logging.error(f"Failed to fetch logs for {id}: {e}") return None return self._concat_rows(response["hits"]["hits"]) @@ -151,6 +176,40 @@ def fetch_workflow_logs(self, workflow_id: str) -> str | None: self.workflow_log_matcher, ) + def fetch_dask_scheduler_logs(self, workflow_id: str) -> str | None: + """ + Fetch logs of the scheduler of a Dask cluster. + + :param workflow_id: Workflow ID. + + :return: Dask cluster scheduler logs. + """ + return self.fetch_logs( + id=None, + index=self.dask_index, + matches={ + self.dask_log_matcher: f"reana-run-dask-{workflow_id}", + "kubernetes.labels.dask.org/component": "scheduler", + }, + ) + + def fetch_dask_worker_logs(self, workflow_id: str) -> str | None: + """ + Fetch logs of the workers of a Dask cluster. + + :param workflow_id: Workflow ID. + + :return: Dask cluster worker logs. + """ + return self.fetch_logs( + id=None, + index=self.dask_index, + matches={ + self.dask_log_matcher: f"reana-run-dask-{workflow_id}", + "kubernetes.labels.dask.org/component": "worker", + }, + ) + def _concat_rows(self, rows: list) -> str | None: """ Concatenate log messages from rows. 
diff --git a/reana_workflow_controller/rest/utils.py b/reana_workflow_controller/rest/utils.py
index 3449bf5d..d457a4d4 100644
--- a/reana_workflow_controller/rest/utils.py
+++ b/reana_workflow_controller/rest/utils.py
@@ -60,10 +60,12 @@
 from werkzeug.exceptions import BadRequest, NotFound
 
 from reana_workflow_controller.config import (
+    DASK_ENABLED,
     PROGRESS_STATUSES,
     REANA_GITLAB_HOST,
     PREVIEWABLE_MIME_TYPE_PREFIXES,
 )
+from reana_workflow_controller.dask import requires_dask
 from reana_workflow_controller.consumer import _update_workflow_status
 from reana_workflow_controller.errors import (
     REANAExternalCallError,
@@ -183,11 +185,52 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
 
         open_search_log_fetcher = build_opensearch_log_fetcher()
 
-        logs = (
-            open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
-            if open_search_log_fetcher
-            else None
-        )
+        logs = None
+
+        if DASK_ENABLED and requires_dask(workflow):
+            logs = (
+                (open_search_log_fetcher.fetch_job_logs(job.backend_job_id) or "")
+                + """
+
+    -------------------------------------------------------------------
+    -------------------------------------------------------------------
+    ----------------      DASK SCHEDULER LOGS       ----------------
+    -------------------------------------------------------------------
+    -------------------------------------------------------------------
+
+
+
+    """
+                + (
+                    open_search_log_fetcher.fetch_dask_scheduler_logs(job.workflow_uuid)
+                    or ""
+                )
+                + """
+
+    -------------------------------------------------------------------
+    -------------------------------------------------------------------
+    ----------------       DASK WORKER LOGS         ----------------
+    -------------------------------------------------------------------
+    -------------------------------------------------------------------
+
+
+
+
+    """
+                + (
+                    open_search_log_fetcher.fetch_dask_worker_logs(job.workflow_uuid)
+                    or ""
+                )
+                if open_search_log_fetcher
+                else None
+            )
+
+        else:
+            logs = (
+                open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
+                if open_search_log_fetcher
+                else None
+            )
 
         item = {
             "workflow_uuid": str(job.workflow_uuid) or "",
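
A rough usage sketch of the new Dask log fetchers follows. It assumes the
fetcher class in opensearch.py is named OpenSearchLogFetcher; the OpenSearch
connection details and the workflow ID are placeholders, since in REANA the
fetcher is normally obtained via build_opensearch_log_fetcher():

    from opensearchpy import OpenSearch

    from reana_workflow_controller.opensearch import OpenSearchLogFetcher

    # Placeholder connection; real deployments configure host and credentials
    # through build_opensearch_log_fetcher().
    os_client = OpenSearch(hosts=["https://opensearch:9200"])
    fetcher = OpenSearchLogFetcher(os_client=os_client)

    workflow_id = "00000000-0000-0000-0000-000000000000"  # placeholder UUID

    # Both calls match the cluster-name label "reana-run-dask-<workflow_id>"
    # and differ only in the dask.org/component label they filter on.
    scheduler_logs = fetcher.fetch_dask_scheduler_logs(workflow_id) or ""
    worker_logs = fetcher.fetch_dask_worker_logs(workflow_id) or ""

    # build_workflow_logs() concatenates these onto each job's own logs,
    # separated by the banner strings added in rest/utils.py above.
    print(scheduler_logs + worker_logs)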