
Conversation

@jason810496
Member

related: #56811

Why

After #56811, we can ensure there are 6 files present in S3 (the same number of tasks as in the example_xcom_test Dag) before validating source task_log_sources and log_files. However, the Additional PROD image tests / Remote logging tests with PROD image / test-e2e-integration-tests CI job is still failing.

IMO, the main root cause is the /opt/airflow/logs/ prefix in task_log_sources, which makes the any(source in log_files for source in task_log_sources) condition always fail.

Example fail runs:

=========================== short test summary info ============================
FAILED tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py::TestRemoteLogging::test_remote_logging_s3 - AssertionError: None of the log sources ['/opt/airflow/logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log'] were found in S3 bucket logs ['s3://test-airflow-logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log', 's3://test-airflow-logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_push/attempt=1.log', 's3://test-airflow-logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=pull_value_from_bash_push/attempt=1.log', 's3://test-airflow-logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=puller/attempt=1.log', 's3://test-airflow-logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=push/attempt=1.log', 's3://test-airflow-logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=push_by_returning/attempt=1.log']
assert False
 +  where False = any(<generator object TestRemoteLogging.test_remote_logging_s3.<locals>.<genexpr> at 0x7fe6880d4c10>)
=================== 1 failed, 1 passed, 1 warning in 42.88s ====================

There are already 6 files in log_files (which should mean success), but the test still fails with the traceback above.
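The failing check can be reproduced in isolation (paths copied from the traceback above, trimmed to one task for brevity):

```python
# Minimal reproduction of the failing assertion, using paths from the traceback above.
task_log_sources = [
    "/opt/airflow/logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log",
]
log_files = [
    "s3://test-airflow-logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log",
]

# The local /opt/airflow/logs/ prefix never appears in the s3:// URIs,
# so this is always False no matter how many files landed in S3.
assert not any(source in log_files for source in task_log_sources)
```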

What

  • Remove /opt/airflow/logs/ prefix from task_log_sources
  • task_log_sources format before fix:
    • /opt/airflow/logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log
  • task_log_sources format after fix:
    • dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log
  • log_files format:
    • s3://test-airflow-logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log
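A sketch of why the trimmed format matches (this assumes the test compares each source as a substring of the S3 URIs; the exact comparison lives in test_remote_logging.py):

```python
# After the fix: task_log_sources no longer carries the local prefix.
task_log_sources = [
    "dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log",
]
log_files = [
    "s3://test-airflow-logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log",
]

# Without the local prefix, every source is a substring of its S3 URI.
assert all(any(source in log_file for log_file in log_files) for source in task_log_sources)
```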

Member Author

@jason810496 jason810496 left a comment

I added the area:API label explicitly to trigger the test_remote_logging_s3 test.

@gopidesupavan
Member

This looks to me like an issue with the API; in these tests we are explicitly validating whether the S3 log source path is returned correctly.

I am not sure why the API is returning the local log location instead of the S3 source.

@jason810496
Member Author

This looks to me like an issue with the API; in these tests we are explicitly validating whether the S3 log source path is returned correctly.

I am not sure why the API is returning the local log location instead of the S3 source.

Good point. I found out why we sometimes get the local source path instead of the S3 path:

The FileTaskHandler will try to read from the remote log first, and if we are not able to get the remote log (e.g. it is still uploading), we will fall back to reading from local:

sources: LogSourceInfo = []
source_list: list[str] = []
remote_logs: list[RawLogStream] = []
local_logs: list[RawLogStream] = []
executor_logs: list[RawLogStream] = []
served_logs: list[RawLogStream] = []
with suppress(NotImplementedError):
    sources, logs = self._read_remote_logs(ti, try_number, metadata)
    if not logs:
        remote_logs = []
    elif isinstance(logs, list) and isinstance(logs[0], str):
        # If the logs are in legacy format, convert them to a generator of log lines
        remote_logs = [
            # We don't need to use the log_pos here, as we are using the metadata to track the position
            _get_compatible_log_stream(cast("list[str]", logs))
        ]
    elif isinstance(logs, list) and _is_logs_stream_like(logs[0]):
        # If the logs are already in a stream-like format, we can use them directly
        remote_logs = cast("list[RawLogStream]", logs)
    else:
        # If the logs are in a different format, raise an error
        raise TypeError("Logs should be either a list of strings or a generator of log lines.")
    # Extend LogSourceInfo
    source_list.extend(sources)
has_k8s_exec_pod = False
if ti.state == TaskInstanceState.RUNNING:
    executor_get_task_log = self._get_executor_get_task_log(ti)
    response = executor_get_task_log(ti, try_number)
    if response:
        sources, logs = response
        # make the logs stream-like compatible
        executor_logs = [_get_compatible_log_stream(logs)]
        if sources:
            source_list.extend(sources)
            has_k8s_exec_pod = True
if not (remote_logs and ti.state not in State.unfinished):
    # when finished, if we have remote logs, no need to check local
    worker_log_full_path = Path(self.local_base, worker_log_rel_path)
    sources, local_logs = self._read_from_local(worker_log_full_path)
    source_list.extend(sources)

It's not an issue in the API but a fallback feature for fetching logs.

Member Author

@jason810496 jason810496 left a comment

I moved the retry logic to before we call get_task_logs, which should prevent the read path of FileTaskHandler from falling back to _read_from_local.
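For illustration, a minimal sketch of that idea (the wait_for_remote_logs helper and its hook interface are hypothetical; the actual retry lives in the e2e test):

```python
import time


def wait_for_remote_logs(s3_hook, bucket, prefix, expected_count, timeout=60.0, interval=5.0):
    """Poll S3 until expected_count log files exist under prefix (hypothetical helper).

    Retrying *before* calling get_task_logs means the remote logs already exist
    when FileTaskHandler reads them, so it never falls back to _read_from_local.
    """
    deadline = time.monotonic() + timeout
    keys = []
    while time.monotonic() < deadline:
        # list_keys returns the object keys under the prefix (or None if empty)
        keys = s3_hook.list_keys(bucket_name=bucket, prefix=prefix) or []
        if len(keys) >= expected_count:
            return keys
        time.sleep(interval)
    raise TimeoutError(f"Only found {len(keys)} of {expected_count} logs under {prefix}")
```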

By the way, may I ask how to trigger the Additional PROD image tests / Remote logging tests with PROD image / test-e2e-integration-tests test? I added the area:API label but it doesn't work.

Thanks!

@gopidesupavan
Member

gopidesupavan commented Oct 20, 2025

I moved the retry logic to before we call get_task_logs, which should prevent the read path of FileTaskHandler from falling back to _read_from_local.

By the way, may I ask how to trigger the Additional PROD image tests / Remote logging tests with PROD image / test-e2e-integration-tests test? I added the area:API label but it doesn't work.

Thanks!

These are the conditions: https://github.com/apache/airflow/blob/main/.github/workflows/ci-amd.yml#L752 and https://github.com/apache/airflow/blob/main/dev/breeze/src/airflow_breeze/utils/selective_checks.py#L872

You could probably update prod_image_build to return true for an area:API label change; that should work. :)

Member

@potiuk potiuk left a comment

Nice

@gopidesupavan
Member

The reason we are comparing the name is to check whether the API is able to read the log by fetching the connection. In the last release we had this issue: writing logs to remote was fine, but the reading part was broken as it was not able to fetch the connection.

@jason810496 jason810496 added the full tests needed We need to run full set of tests for this PR to merge label Oct 20, 2025
@jason810496 jason810496 reopened this Oct 20, 2025
@jason810496
Member Author

The reason we are comparing the name is to check whether the API is able to read the log by fetching the connection. In the last release we had this issue: writing logs to remote was fine, but the reading part was broken as it was not able to fetch the connection.

Shouldn't "Fix Connection or Variable access in Server context" (#56602) resolve the issue?

@jason810496
Member Author

CI keeps failing because of a Docker registry service error after a few reruns:

Unable to find image 'bash:latest' locally
docker: Error response from daemon: Head "https://registry-1.docker.io/v2/library/bash/manifests/latest": received unexpected HTTP status: 503 Service Unavailable

@gopidesupavan
Member

CI keeps failing because of a Docker registry service error after a few reruns:

Unable to find image 'bash:latest' locally
docker: Error response from daemon: Head "https://registry-1.docker.io/v2/library/bash/manifests/latest": received unexpected HTTP status: 503 Service Unavailable

Seems Docker has active incidents; see: https://www.dockerstatus.com/

@jason810496
Member Author

@jason810496 jason810496 merged commit b606608 into apache:main Oct 20, 2025
247 of 250 checks passed
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Oct 20, 2025
* Fix task_log_sources naming for test_remote_logging_s3

* Move retry before get_task_logs
TyrellHaywood pushed a commit to TyrellHaywood/airflow that referenced this pull request Oct 22, 2025
* Fix task_log_sources naming for test_remote_logging_s3

* Move retry before get_task_logs

Labels

area:API (Airflow's REST/HTTP API), full tests needed (We need to run full set of tests for this PR to merge)
