Skip to content

Commit

Permalink
Fix interleave related test
Browse files Browse the repository at this point in the history
  • Loading branch information
jason810496 committed Dec 21, 2024
1 parent afa5b1b commit 8617e5b
Showing 1 changed file with 43 additions and 13 deletions.
56 changes: 43 additions & 13 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import logging.config
import os
import re
from contextlib import suppress
from http import HTTPStatus
from importlib import reload
from pathlib import Path
Expand All @@ -47,6 +48,7 @@
_fetch_logs_from_service,
_get_parsed_log_stream,
_interleave_logs,
_parse_timestamp,
)
from airflow.utils.log.logging_mixin import set_context
from airflow.utils.net import get_hostname
Expand All @@ -68,6 +70,20 @@
FILE_TASK_HANDLER = "task"


def _log_sample_to_parsed_log_stream(log_sample: str):
    """Turn a raw multi-line log sample into a parsed log stream for tests.

    Yields one ``(timestamp, index, line)`` tuple per non-empty line of
    *log_sample*, where ``index`` is the line's position in
    ``log_sample.splitlines()`` (blank lines are skipped but still counted).

    ``timestamp`` is the most recent truthy value returned by
    ``_parse_timestamp``; a line that cannot be parsed reuses the timestamp
    of the preceding parsed line, and it is ``None`` until the first
    successful parse.
    """
    current_ts = None
    parsed_ts = None
    for position, text in enumerate(log_sample.splitlines()):
        if not text:
            # Blank lines are never emitted.
            continue
        # Any failure while parsing is ignored on purpose: parsed_ts then
        # keeps the value obtained from an earlier line.
        with suppress(Exception):
            parsed_ts = _parse_timestamp(text)
        if parsed_ts:
            current_ts = parsed_ts
        yield current_ts, position, text


class TestFileTaskLogHandler:
def clean_up(self):
with create_session() as session:
Expand Down Expand Up @@ -633,6 +649,7 @@ def test_interleave_interleaves():
"[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1",
]
)
log_sample1_stream = _log_sample_to_parsed_log_stream(log_sample1)
log_sample2 = "\n".join(
[
"[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2022-11-16 08:05:52.324532+00:00",
Expand All @@ -643,6 +660,7 @@ def test_interleave_interleaves():
"[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait",
]
)
log_sample2_stream = _log_sample_to_parsed_log_stream(log_sample2)
log_sample3 = "\n".join(
[
"[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running <TaskInstance: simple_async_timedelta.wait manual__2022-11-16T08:05:52.324532+00:00 [running]> on host daniels-mbp-2.lan",
Expand All @@ -655,6 +673,7 @@ def test_interleave_interleaves():
"[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554",
]
)
log_sample3_stream = _log_sample_to_parsed_log_stream(log_sample3)
expected = "\n".join(
[
"[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1",
Expand All @@ -672,7 +691,9 @@ def test_interleave_interleaves():
"[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554",
]
)
assert "\n".join(_interleave_logs(log_sample2, log_sample1, log_sample3)) == expected
interleave_log_stream = _interleave_logs(log_sample1_stream, log_sample2_stream, log_sample3_stream)
interleave_log_str = "\n".join(line for line in interleave_log_stream)
assert interleave_log_str == expected


long_sample = """
Expand Down Expand Up @@ -749,22 +770,31 @@ def test_interleave_logs_correct_ordering():
[2023-01-17T12:47:11.883-0800] {triggerer_job.py:540} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2023-01-17T20:47:11.254388+00:00> (ID 1) fired: TriggerEvent<DateTime(2023, 1, 17, 20, 47, 11, 254388, tzinfo=Timezone('UTC'))>
"""

assert sample_with_dupe == "\n".join(_interleave_logs(sample_with_dupe, "", sample_with_dupe))
interleave_stream = _interleave_logs(
_log_sample_to_parsed_log_stream(sample_with_dupe),
_log_sample_to_parsed_log_stream(""),
_log_sample_to_parsed_log_stream(sample_with_dupe),
)
interleave_str = "\n".join(line for line in interleave_stream)
assert interleave_str == sample_with_dupe


def test_interleave_logs_correct_dedupe():
sample_without_dupe = """test,
test,
test,
test,
test,
test,
test,
test,
test,
test"""

assert sample_without_dupe == "\n".join(_interleave_logs(",\n ".join(["test"] * 10)))
test,
test,
test,
test,
test,
test,
test,
test,
test"""

interleave_stream = _interleave_logs(
_log_sample_to_parsed_log_stream(sample_without_dupe),
)
assert "\n".join(line for line in interleave_stream) == "test,\ntest"


def test_permissions_for_new_directories(tmp_path):
Expand Down

0 comments on commit 8617e5b

Please sign in to comment.