diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py
index 265bfca6bbe88..d4da248a84fa4 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import time
 from datetime import datetime, timezone
 
 import boto3
@@ -27,6 +28,9 @@ class TestRemoteLogging:
     airflow_client = AirflowClient()
 
     dag_id = "example_xcom_test"
+    task_count = 6
+    retry_interval_in_seconds = 1
+    max_retries = 5
 
     def test_dag_unpause(self):
         self.airflow_client.un_pause_dag(
@@ -72,9 +76,25 @@ def test_remote_logging_s3(self):
         bucket_name = "test-airflow-logs"
         response = s3_client.list_objects_v2(Bucket=bucket_name)
 
+        # Wait for logs to be available in S3
+        for _ in range(self.max_retries):
+            response = s3_client.list_objects_v2(Bucket=bucket_name)
+            contents = response.get("Contents", [])
+            if len(contents) >= self.task_count:
+                break
+
+            print(f"Expected at least {self.task_count} log files, found {len(contents)}. Retrying...")
+            time.sleep(self.retry_interval_in_seconds)
+
         if "Contents" not in response:
             pytest.fail("No objects found in S3 bucket %s", bucket_name)
 
+        if len(response["Contents"]) < self.task_count:
+            pytest.fail(
+                f"Expected at least {self.task_count} log files in S3 bucket {bucket_name}, "
+                f"but found {len(response['Contents'])} objects: {[obj.get('Key') for obj in response.get('Contents', [])]}"
+            )
+
         # s3 key format: dag_id=example_xcom/run_id=manual__2025-09-29T23:32:09.457215+00:00/task_id=bash_pull/attempt=1.log
         log_files = [f"s3://{bucket_name}/{obj['Key']}" for obj in response["Contents"]]
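
The retry loop added above is a standard poll-until-ready pattern: task logs are uploaded to S3 asynchronously after the DAG run completes, so a single `list_objects_v2` call immediately after the run can race the upload. A minimal sketch of how the same polling could be factored into a standalone helper — the function name `wait_for_s3_objects` and its defaults are hypothetical illustrations, not part of this change:

```python
from __future__ import annotations

import time


def wait_for_s3_objects(
    s3_client,
    bucket_name: str,
    expected_count: int,
    max_retries: int = 5,
    retry_interval_in_seconds: float = 1.0,
) -> list[dict]:
    """Poll list_objects_v2 until the bucket holds at least expected_count objects.

    Returns the final Contents list; it may still be shorter than
    expected_count if retries are exhausted, and the caller decides
    whether that counts as a test failure.
    """
    contents: list[dict] = []
    for _ in range(max_retries):
        # list_objects_v2 returns up to 1000 keys per call, which is
        # plenty for a test bucket holding a handful of log files.
        contents = s3_client.list_objects_v2(Bucket=bucket_name).get("Contents", [])
        if len(contents) >= expected_count:
            break
        # Back off briefly before re-listing; uploads land asynchronously.
        time.sleep(retry_interval_in_seconds)
    return contents
```

With a helper like this, the test body reduces to `contents = wait_for_s3_objects(s3_client, bucket_name, self.task_count)` followed by the existing assertions, keeping the polling and the pass/fail decision separate.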