From e7c3d375a06c0fe7ff5b635a2087bd5e7d060194 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Sat, 18 Oct 2025 17:51:37 +0800 Subject: [PATCH 1/3] Add retry for test_remote_logging_s3 e2e test --- .../remote_log_tests/test_remote_logging.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py index 265bfca6bbe88..55e8c6d9639d1 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import time from datetime import datetime, timezone import boto3 @@ -27,6 +28,9 @@ class TestRemoteLogging: airflow_client = AirflowClient() dag_id = "example_xcom_test" + task_count = 6 + retry_delay_seconds = 1 + max_retries = 5 def test_dag_unpause(self): self.airflow_client.un_pause_dag( @@ -72,9 +76,25 @@ def test_remote_logging_s3(self): bucket_name = "test-airflow-logs" response = s3_client.list_objects_v2(Bucket=bucket_name) + # Wait for logs to be available in S3 + for _ in range(self.max_retries): + response = s3_client.list_objects_v2(Bucket=bucket_name) + contents = response.get("Contents", []) + if len(contents) >= self.task_count: + break + + print(f"Expected at least {self.task_count} log files, found {len(contents)}. Retrying...") + time.sleep(self.retry_delay_seconds) + if "Contents" not in response: pytest.fail("No objects found in S3 bucket %s", bucket_name) + if len(response["Contents"]) < self.task_count: + pytest.fail( + f"Expected at least {self.task_count} log files in S3 bucket {bucket_name}, " + f"but found {len(response['Contents'])}." + ) + # s3 key format: dag_id=example_xcom/run_id=manual__2025-09-29T23:32:09.457215+00:00/task_id=bash_pull/attempt=1.log log_files = [f"s3://{bucket_name}/{obj['Key']}" for obj in response["Contents"]] From f248bc3393cce1cc8ab370381546825cb78fae71 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Sat, 18 Oct 2025 18:20:20 +0800 Subject: [PATCH 2/3] Print list of contents found --- .../airflow_e2e_tests/remote_log_tests/test_remote_logging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py index 55e8c6d9639d1..9dabe70ea8aef 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py @@ -92,7 +92,7 @@ def test_remote_logging_s3(self): if len(response["Contents"]) < self.task_count: pytest.fail( f"Expected at least {self.task_count} log files in S3 bucket {bucket_name}, " - f"but found {len(response['Contents'])}." + f"but found {len(response['Contents'])} objects: {[obj.get('Key') for obj in response.get('Contents', [])]}" ) # s3 key format: dag_id=example_xcom/run_id=manual__2025-09-29T23:32:09.457215+00:00/task_id=bash_pull/attempt=1.log From b52c2c7f714f051847eb5952ca38ea3b9ec8b797 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Sat, 18 Oct 2025 18:28:50 +0800 Subject: [PATCH 3/3] Rename retry_delay_seconds as retry_interval_in_seconds --- .../airflow_e2e_tests/remote_log_tests/test_remote_logging.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py index 9dabe70ea8aef..d4da248a84fa4 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py @@ -29,7 +29,7 @@ class TestRemoteLogging: airflow_client = AirflowClient() dag_id = "example_xcom_test" task_count = 6 - retry_delay_seconds = 1 + retry_interval_in_seconds = 1 max_retries = 5 def test_dag_unpause(self): @@ -84,7 +84,7 @@ def test_remote_logging_s3(self): break print(f"Expected at least {self.task_count} log files, found {len(contents)}. Retrying...") - time.sleep(self.retry_delay_seconds) + time.sleep(self.retry_interval_in_seconds) if "Contents" not in response: pytest.fail("No objects found in S3 bucket %s", bucket_name)