Skip to content

Commit

Permalink
Fix TaskHandlerWithCustomFormatter now adds prefix only once (#38502)
Browse files Browse the repository at this point in the history
When using the TaskHandlerWithCustomFormatter to add a prefix to logs, the prefix was previously added multiple times. This happened because the handler's set_context was called multiple times from logging_mixin.py, and it was made worse because an additional prefix was still added even when the handler's formatter was a TimezoneAware formatter (used to include the UTC offset). Because of this, I felt that any solution outside of TaskHandlerWithCustomFormatter itself would either require restructuring the handlers or slow down execution for all other handlers. The solution I settled on was therefore to add a simple 'or self.prefix_jinja_template is not None' to the initial 'if' statement of TaskHandlerWithCustomFormatter's set_context, so that it returns early if the prefix has already been set. This is similar to what the Elasticsearch handler does in es_task_handler.py.

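Note: this commit also fixes the documentation's example for the handler in config.yml, as the previous one was incorrect and didn't work (it used single braces instead of Jinja's double braces, and referenced try_number without the ti. prefix).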
(cherry picked from commit 61d1c95)
  • Loading branch information
TiDeane authored and ephraimbuddy committed Jun 4, 2024
1 parent c4624fd commit 3562374
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 6 deletions.
2 changes: 1 addition & 1 deletion airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ logging:
Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter
version_added: 2.0.0
type: string
example: "{ti.dag_id}-{ti.task_id}-{execution_date}-{try_number}"
example: "{{ti.dag_id}}-{{ti.task_id}}-{{execution_date}}-{{ti.try_number}}"
is_template: true
default: ""
log_filename_template:
Expand Down
3 changes: 2 additions & 1 deletion airflow/utils/log/task_handler_with_custom_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def set_context(self, ti) -> None:
:param ti:
:return:
"""
if ti.raw or self.formatter is None:
# Returns if there is no formatter or if the prefix has already been set
if ti.raw or self.formatter is None or self.prefix_jinja_template is not None:
return
prefix = conf.get("logging", "task_log_prefix_template")

Expand Down
24 changes: 20 additions & 4 deletions tests/utils/test_task_handler_with_custom_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,36 @@ def task_instance():
clear_db_runs()


def assert_prefix(task_instance: TaskInstance, prefix: str) -> None:
def assert_prefix_once(task_instance: TaskInstance, prefix: str) -> None:
handler = next((h for h in task_instance.log.handlers if h.name == TASK_HANDLER), None)
assert handler is not None, "custom task log handler not set up correctly"
assert handler.formatter is not None, "custom task log formatter not set up correctly"
previous_formatter = handler.formatter
expected_format = f"{prefix}:{handler.formatter._fmt}"
set_context(task_instance.log, task_instance)
assert expected_format == handler.formatter._fmt
handler.setFormatter(previous_formatter)


def assert_prefix_multiple(task_instance: TaskInstance, prefix: str) -> None:
    """Assert that repeated ``set_context`` calls add ``prefix`` to the log format only once.

    Looks up the handler named ``TASK_HANDLER`` on the task instance's logger,
    calls ``set_context`` three times, and checks that the handler's format
    string is the original format preceded by a single ``"<prefix>:"``.

    :param task_instance: task instance whose logger carries the handler under test
    :param prefix: rendered prefix expected in front of the original format
    """
    handler = next((h for h in task_instance.log.handlers if h.name == TASK_HANDLER), None)
    assert handler is not None, "custom task log handler not set up correctly"
    assert handler.formatter is not None, "custom task log formatter not set up correctly"
    previous_formatter = handler.formatter
    expected_format = f"{prefix}:{previous_formatter._fmt}"
    try:
        # Call set_context repeatedly; only the first call may add the prefix.
        for _ in range(3):
            set_context(task_instance.log, task_instance)
        assert expected_format == handler.formatter._fmt
    finally:
        # Restore the original formatter even if the assertion fails, so a
        # failing test does not leave a prefixed formatter on the handler.
        handler.setFormatter(previous_formatter)


def test_custom_formatter_default_format(task_instance):
"""The default format provides no prefix."""
assert_prefix(task_instance, "")
assert_prefix_once(task_instance, "")


@conf_vars({("logging", "task_log_prefix_template"): "{{ti.dag_id }}-{{ ti.task_id }}"})
@conf_vars({("logging", "task_log_prefix_template"): "{{ ti.dag_id }}-{{ ti.task_id }}"})
def test_custom_formatter_custom_format_not_affected_by_config(task_instance):
assert_prefix(task_instance, f"{DAG_ID}-{TASK_ID}")
"""Certifies that the prefix is only added once, even after repeated calls"""
assert_prefix_multiple(task_instance, f"{DAG_ID}-{TASK_ID}")

0 comments on commit 3562374

Please sign in to comment.