diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index b0f2d6f262e55..45a43fdba684c 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -21,6 +21,7 @@ import logging.config import os import re +from contextlib import suppress from http import HTTPStatus from importlib import reload from pathlib import Path @@ -47,6 +48,7 @@ _fetch_logs_from_service, _get_parsed_log_stream, _interleave_logs, + _parse_timestamp, ) from airflow.utils.log.logging_mixin import set_context from airflow.utils.net import get_hostname @@ -68,6 +70,20 @@ FILE_TASK_HANDLER = "task" +def _log_sample_to_parsed_log_stream(log_sample: str): + lines = log_sample.splitlines() + timestamp = None + next_timestamp = None + for idx, line in enumerate(lines): + if line: + with suppress(Exception): + # next_timestamp unchanged if line can't be parsed + next_timestamp = _parse_timestamp(line) + if next_timestamp: + timestamp = next_timestamp + yield timestamp, idx, line + + class TestFileTaskLogHandler: def clean_up(self): with create_session() as session: @@ -633,6 +649,7 @@ def test_interleave_interleaves(): "[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1", ] ) + log_sample1_stream = _log_sample_to_parsed_log_stream(log_sample1) log_sample2 = "\n".join( [ "[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing on 2022-11-16 08:05:52.324532+00:00", @@ -643,6 +660,7 @@ def test_interleave_interleaves(): "[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait", ] ) + log_sample2_stream = _log_sample_to_parsed_log_stream(log_sample2) log_sample3 = "\n".join( [ "[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running on host daniels-mbp-2.lan", @@ -655,6 +673,7 @@ def test_interleave_interleaves(): "[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554", ] ) + log_sample3_stream = _log_sample_to_parsed_log_stream(log_sample3) expected = "\n".join( [ "[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1", @@ -672,7 +691,9 @@ def test_interleave_interleaves(): "[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554", ] ) - assert "\n".join(_interleave_logs(log_sample2, log_sample1, log_sample3)) == expected + interleave_log_stream = _interleave_logs(log_sample1_stream, log_sample2_stream, log_sample3_stream) + interleave_log_str = "\n".join(line for line in interleave_log_stream) + assert interleave_log_str == expected long_sample = """ @@ -749,22 +770,31 @@ def test_interleave_logs_correct_ordering(): [2023-01-17T12:47:11.883-0800] {triggerer_job.py:540} INFO - Trigger (ID 1) fired: TriggerEvent """ - assert sample_with_dupe == "\n".join(_interleave_logs(sample_with_dupe, "", sample_with_dupe)) + interleave_stream = _interleave_logs( + _log_sample_to_parsed_log_stream(sample_with_dupe), + _log_sample_to_parsed_log_stream(""), + _log_sample_to_parsed_log_stream(sample_with_dupe), + ) + interleave_str = "\n".join(line for line in interleave_stream) + assert interleave_str == sample_with_dupe def test_interleave_logs_correct_dedupe(): sample_without_dupe = """test, - test, - test, - test, - test, - test, - test, - test, - test, - test""" - - assert sample_without_dupe == "\n".join(_interleave_logs(",\n ".join(["test"] * 10))) +test, +test, +test, +test, +test, +test, +test, +test, +test""" + + interleave_stream = _interleave_logs( + _log_sample_to_parsed_log_stream(sample_without_dupe), + ) + assert "\n".join(line for line in interleave_stream) == "test,\ntest" def test_permissions_for_new_directories(tmp_path):