-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KubernetesPodOperator fails due to getting logs #39239
Comments
There is exception handling for def write_logs(self, pod: k8s.V1Pod):
try:
logs = self.pod_manager.read_pod_logs(
pod=pod,
container_name=self.base_container_name,
follow=False,
)
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
self.log.info("Container logs: %s", line)
except HTTPError as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
"Set log level to DEBUG for traceback.",
e,
) The error that bubbles up is an |
Hello @RNHTTR , Yes, I belive that too. I am doing a PR including this exception in that point but It is my firts PR and it's taking me longer. |
Something like this: def write_logs(self, pod: k8s.V1Pod):
try:
logs = self.pod_manager.read_pod_logs(
pod=pod,
container_name=self.base_container_name,
follow=False,
)
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
self.log.info("Container logs: %s", line)
except (HTTPError, ApiException) as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
"Set log level to DEBUG for traceback.",
e if not isinstance(e, ApiException) else e.reason,
) |
I think simply handling the ApiException won't be enough. We should perform additional checks because during cleanup, we also read the pod. |
Hi @pankajastro , Of course. I have added the following in the _clean: def _clean(self, event: dict[str, Any]):
if event["status"] == "running":
return
istio_enabled = self.is_istio_enabled(self.pod)
# Skip await_pod_completion when the event is 'timeout' due to the pod can hang
# on the ErrImagePull or ContainerCreating step and it will never complete
if event["status"] != "timeout":
try:
self.pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
except ApiException as e:
if e.status == 404:
self.pod = None
else:
raise e
if self.pod is not None:
self.post_complete_action(
pod=self.pod,
remote_pod=self.pod,
) I have done this due to the await_pod_completion can throw the same ApiException than write_logs. Handling this exception not is necessary make the post_complete_action. What do you think? |
would you like to create a PR? |
Yes, I would like it but It's my first time so I reading how do it |
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes 8.0.1
apache-airflow-providers-celery 3.6.1
Apache Airflow version
2.8.3
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
I have an Airflow instance deploy on GKE cluster with helm (keda is activate on workers to scale from 1 to 5 with a cooldownPeriod set to 240). I use CeleryKubernetesExecutor and I run the tasks with KubernetesPodOperator on the celery workers. To not have an active task on celery worker while KubernetesPodOperator running I activate deferrable mode on them. I set poll_interval to 10 seconds. After that many errors with 404 status code appears.
Situation:
example_task = KubernetesPodOperator(task_id="example_task", name="example_task", get_logs=True, on_finish_action="delete_pod", log_events_on_failure=True, deferrable=True, poll_interval=10, logging_interval=None)
We can suppose that the rest of input params are correct.
The steps are:
[2024-04-24T12:48:57.740+0000] {triggerer_job_runner.py:623} INFO - trigger example_task (ID 668324) completed
The task logs is:
[2024-04-24, 12:51:21 UTC] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1604, in resume_execution
return execute_callable(context)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 738, in trigger_reentry
self.write_logs(self.pod)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 771, in write_logs
logs = self.pod_manager.read_pod_logs(
File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 324, in wrapped_f
return self(f, *args, **kw)
File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 404, in __call__
do = self.iter(retry_state=retry_state)
File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 360, in iter
raise retry_exc.reraise()
File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 193, in reraise
raise self.last_attempt.result()
File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 407, in __call__
result = fn(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 668, in read_pod_logs
logs = self._client.read_namespaced_pod_log(
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 23957, in read_namespaced_pod_log
return self.read_namespaced_pod_log_with_http_info(name, namespace, **kwargs) # noqa: E501
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 24076, in read_namespaced_pod_log_with_http_info
return self.api_client.call_api(
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 348, in call_api
return self.__call_api(resource_path, method,
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
response_data = self.request(
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 373, in request
return self.rest_client.GET(url,
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 244, in GET
return self.request("GET", url,
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 238, in request
raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (404)
Reason: Not Found
The trigger_reentry method causes the error. I think that this is due to the POST_TERMINATION_TIMEOUT value. This is set to 120 on KubernetesPodOperator and the worker try to read the pod log 3 minutes later (pod finish at 12:48:51 and worker reading at 12:51:21). In that time the pod don't exists it not possible read the log and 404 error is back.
What you think should happen instead
The task should finish success with a warning indicating that logs can't reading or maybe other option but not mark the task as failed and set to up_for_retry.
How to reproduce
The steps are indicate on "What happened" section. You just activate deferrable mode on KubernetesPodOperators and use CeleryKubernetesExecutor. Derive the task to celery workers.
Anything else
No response
Are you willing to submit PR?
Code of Conduct
The text was updated successfully, but these errors were encountered: