diff --git a/CHANGES.rst b/CHANGES.rst index b4beb745..cd29e947 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -11,6 +11,7 @@ Version master (UNRELEASED) - Improves error reporting on Docker image related failures. - Prefixes user workflows with the configured REANA prefix. - Switches CVMFS to be read-only mount. +- Instantiates jobs in the configured runtime namespace. - Enables running tests locally on MacOS. - Adds Black formatter support. diff --git a/reana_job_controller/job_monitor.py b/reana_job_controller/job_monitor.py index c9062960..0beede1e 100644 --- a/reana_job_controller/job_monitor.py +++ b/reana_job_controller/job_monitor.py @@ -14,6 +14,7 @@ import traceback from kubernetes import client, watch +from reana_commons.config import REANA_RUNTIME_KUBERNETES_NAMESPACE from reana_commons.k8s.api_client import ( current_k8s_batchv1_api_client, current_k8s_corev1_api_client, @@ -202,10 +203,6 @@ def get_job_logs(self, job_pod): """Get job pod's containers' logs.""" try: pod_logs = "" - # job_pod = current_k8s_corev1_api_client.read_namespaced_pod( - # namespace='default', - # name=job_pod.metadata.name) - # we probably don't need this call again... FIXME container_statuses = job_pod.status.container_statuses + ( job_pod.status.init_container_statuses or [] ) @@ -214,7 +211,7 @@ def get_job_logs(self, job_pod): for container in container_statuses: if container.state.terminated: container_log = current_k8s_corev1_api_client.read_namespaced_pod_log( - namespace="default", + namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, name=job_pod.metadata.name, container=container.name, ) @@ -244,7 +241,7 @@ def watch_jobs(self, job_db, app=None): w = watch.Watch() for event in w.stream( current_k8s_corev1_api_client.list_namespaced_pod, - namespace="default", + namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, label_selector="job-name", ): logging.info("New Pod event received: {0}".format(event["type"])) diff --git a/reana_job_controller/kubernetes_job_manager.py b/reana_job_controller/kubernetes_job_manager.py index 89c92959..c5be7416 100644 --- a/reana_job_controller/kubernetes_job_manager.py +++ b/reana_job_controller/kubernetes_job_manager.py @@ -22,8 +22,8 @@ CVMFS_REPOSITORIES, K8S_CERN_EOS_AVAILABLE, K8S_CERN_EOS_MOUNT_CONFIGURATION, - K8S_DEFAULT_NAMESPACE, REANA_COMPONENT_PREFIX, + REANA_RUNTIME_KUBERNETES_NAMESPACE, WORKFLOW_RUNTIME_USER_GID, WORKFLOW_RUNTIME_USER_UID, ) @@ -112,7 +112,8 @@ def execute(self): self.job = { "kind": "Job", "apiVersion": "batch/v1", - "metadata": {"name": backend_job_id, "namespace": K8S_DEFAULT_NAMESPACE}, + "metadata": {"name": backend_job_id, + "namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE}, "spec": { "backoffLimit": KubernetesJobManager.MAX_NUM_JOB_RESTARTS, "autoSelector": True, @@ -194,7 +195,7 @@ def _submit(self): """Submit job and return its backend id.""" try: api_response = current_k8s_batchv1_api_client.create_namespaced_job( - namespace=K8S_DEFAULT_NAMESPACE, body=self.job + namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, body=self.job ) return self.job["metadata"]["name"] except ApiException as e: @@ -214,7 +215,7 @@ def stop(backend_job_id, asynchronous=True): propagation_policy = "Background" if asynchronous else "Foreground" delete_options = V1DeleteOptions(propagation_policy=propagation_policy) current_k8s_batchv1_api_client.delete_namespaced_job( - backend_job_id, K8S_DEFAULT_NAMESPACE, body=delete_options + backend_job_id, REANA_RUNTIME_KUBERNETES_NAMESPACE, body=delete_options ) except ApiException as e: logging.error(