14 changes: 14 additions & 0 deletions airflow/config_templates/config.yml
@@ -927,6 +927,20 @@ logging:
default: "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/\
{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}\
attempt={{ try_number }}.log"
use_historical_filename_templates:
description: |
When this parameter is set to ``True``, Airflow uses the old filename templates for historical
tasks. Similarly, ``elasticsearch_id`` is not set properly for historical tasks if you change it.
Both require the webserver to access the database to render the template filenames, which could
allow Dag Authors to execute code on the webserver; that is why the option is disabled by
default, even though old logs may then not be displayed in the webserver UI.
You can enable it if you changed the value of ``log_filename_template`` in the past and want
to be able to see the logs for historical tasks, but you should only do so if you trust your
Dag Authors not to abuse the capability of executing arbitrary code on the webserver through
template rendering.
version_added: 2.11.1
type: boolean
example: ~
default: "False"
log_processor_filename_template:
description: |
Formatting for how airflow generates file names for log
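For illustration, a deployment that accepts the trade-off described above could enable the new option in airflow.cfg. This is a sketch; note that the code and tests in this PR read the option from the [core] section:

[core]
use_historical_filename_templates = True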
20 changes: 18 additions & 2 deletions airflow/models/dagrun.py
@@ -58,7 +58,7 @@
from airflow.models.base import Base, StringID
from airflow.models.expandinput import NotFullyPopulated
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.tasklog import LogTemplate
from airflow.models.tasklog import LogTemplate, LogTemplateDataClass
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
@@ -1648,9 +1648,25 @@ def schedule_tis(
return count

@provide_session
def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate | LogTemplatePydantic:
def get_db_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate | LogTemplatePydantic:
return DagRun._get_log_template(log_template_id=self.log_template_id, session=session)

@provide_session
def get_log_template(
self, session: Session = NEW_SESSION
) -> LogTemplate | LogTemplatePydantic | LogTemplateDataClass:
if airflow_conf.getboolean("core", "use_historical_filename_templates", fallback=False):
return self.get_db_log_template(session=session)
else:
return LogTemplateDataClass(
filename=airflow_conf.get_mandatory_value("core", "log_filename_template"),
elasticsearch_id=airflow_conf.get(
"elasticsearch",
"log_id_template",
fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
Member: Does this need to be {{ }}, like this?

Member Author: Not really, that string is already the default. The {{ is only needed when the value is read from config, where it is first processed as a Jinja template. But I will double-check that this works as expected. Also, this fallback is most likely never going to be used; it only matters if the code runs before ProvidersManager discovery, because once the providers manager processes it, a default defined in provider.yaml takes over.

),
)

@staticmethod
@internal_api_call
@provide_session
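To make the distinction from the review thread above concrete: the log_id_template fallback uses str.format-style {placeholders}, while log_filename_template read from config is rendered as a Jinja template (hence the {{ ti.dag_id }} and %% escaping in config.yml). A rough, self-contained sketch of the two rendering styles (illustrative only, not the actual FileTaskHandler code):

from types import SimpleNamespace

import jinja2

# str.format-style, as in the elasticsearch log_id_template fallback:
es_id_template = "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
log_id = es_id_template.format(
    dag_id="example_dag", task_id="t1", run_id="scheduled__2024-01-01", map_index=-1, try_number=1
)

# Jinja-style, as in the default log_filename_template:
ti = SimpleNamespace(dag_id="example_dag", run_id="scheduled__2024-01-01", task_id="t1")
filename = jinja2.Template(
    "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/"
    "attempt={{ try_number }}.log"
).render(ti=ti, try_number=1)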
15 changes: 15 additions & 0 deletions airflow/models/tasklog.py
@@ -17,6 +17,8 @@
# under the License.
from __future__ import annotations

from dataclasses import dataclass

from sqlalchemy import Column, Integer, Text

from airflow.models.base import Base
@@ -42,3 +44,16 @@ class LogTemplate(Base):
def __repr__(self) -> str:
attrs = ", ".join(f"{k}={getattr(self, k)}" for k in ("filename", "elasticsearch_id"))
return f"LogTemplate({attrs})"


@dataclass
class LogTemplateDataClass:
"""
Dataclass for a log template (used when the log template is read from configuration, not from the database).

:param filename: log filename template
:param elasticsearch_id: Elasticsearch log ID template
"""

filename: str
elasticsearch_id: str
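Since the dataclass exposes the same two attributes as the ORM LogTemplate model, code that only reads filename and elasticsearch_id can consume either one. A minimal illustrative sketch (the values are examples, not required defaults):

from airflow.models.tasklog import LogTemplateDataClass

template = LogTemplateDataClass(
    filename="dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/attempt={{ try_number }}.log",
    elasticsearch_id="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
)
assert template.filename.endswith(".log")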
17 changes: 17 additions & 0 deletions newsfragments/61880.significant.rst
@@ -0,0 +1,17 @@
Retrieving historical log templates is disabled in Airflow 2.11.1

As of Airflow 2.11.1, when you change the log template, historical log templates are no longer retrieved from the database.
This means that existing logs generated with a different log template will not be accessible
under the new log template.

This change addresses a potential security issue: retrieving historical log templates allowed
Dag Authors to execute arbitrary code on the webserver when logs were retrieved.
By disabling this retrieval by default, Airflow 2.11.1 closes that vector for deployments
where arbitrary code execution on the webserver is a concern.

Users who need to access historical logs generated with a different log template can either
manually rename their historical log files to match the latest log template configured in the
Airflow configuration, or set the ``core.use_historical_filename_templates`` configuration
option to ``True`` to re-enable the retrieval of historical log templates, provided they accept
that Dag Authors can then execute arbitrary code on the webserver when logs are retrieved.
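For operators who accept that trade-off, the option can also be set through Airflow's standard environment variable override, for example:

AIRFLOW__CORE__USE_HISTORICAL_FILENAME_TEMPLATES=True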
2 changes: 2 additions & 0 deletions tests/utils/log/test_file_processor_handler.py
@@ -25,6 +25,7 @@

from airflow.utils import timezone
from airflow.utils.log.file_processor_handler import FileProcessorHandler
from tests.test_utils.config import conf_vars


class TestFileProcessorHandler:
@@ -60,6 +61,7 @@ def test_template(self):
handler.set_context(filename=os.path.join(self.dag_dir, "logfile"))
assert os.path.exists(os.path.join(path, "logfile.log"))

@conf_vars({("core", "use_historical_filename_templates"): "True"})
def test_symlink_latest_log_directory(self):
handler = FileProcessorHandler(base_log_folder=self.base_log_folder, filename_template=self.filename)
handler.dag_dir = self.dag_dir
5 changes: 5 additions & 0 deletions tests/utils/log/test_log_reader.py
@@ -120,6 +120,7 @@ def prepare_db(self, create_task_instance):
session.delete(log_template)
session.commit()

@conf_vars({("core", "use_historical_filename_templates"): "True"})
def test_test_read_log_chunks_should_read_one_try(self):
task_log_reader = TaskLogReader()
ti = copy.copy(self.ti)
@@ -137,6 +138,7 @@ def test_test_read_log_chunks_should_read_one_try(self):
]
assert metadatas == {"end_of_log": True, "log_pos": 13}

@conf_vars({("core", "use_historical_filename_templates"): "True"})
def test_test_read_log_chunks_should_read_all_files(self):
task_log_reader = TaskLogReader()
ti = copy.copy(self.ti)
@@ -152,6 +154,7 @@ def test_test_read_log_chunks_should_read_all_files(self):
assert f"try_number={i + 1}." in logs[i][0][1]
assert metadatas == {"end_of_log": True, "log_pos": 13}

@conf_vars({("core", "use_historical_filename_templates"): "True"})
def test_test_test_read_log_stream_should_read_one_try(self):
task_log_reader = TaskLogReader()
ti = copy.copy(self.ti)
@@ -163,6 +166,7 @@ def test_test_test_read_log_stream_should_read_one_try(self):
" INFO - ::endgroup::\ntry_number=1.\n"
]

@conf_vars({("core", "use_historical_filename_templates"): "True"})
def test_test_test_read_log_stream_should_read_all_logs(self):
task_log_reader = TaskLogReader()
self.ti.state = TaskInstanceState.SUCCESS # Ensure mocked instance is completed to return stream
@@ -262,6 +266,7 @@ def test_supports_external_link(self):
mock_prop.return_value = True
assert task_log_reader.supports_external_link

@conf_vars({("core", "use_historical_filename_templates"): "True"})
def test_task_log_filename_unique(self, dag_maker):
"""Ensure the default log_filename_template produces a unique filename.

36 changes: 36 additions & 0 deletions tests/utils/test_log_handlers.py
@@ -589,6 +589,7 @@ def test_set_context_trigger(self, create_dummy_dag, dag_maker, is_a_trigger, se


class TestFilenameRendering:
@conf_vars({("core", "use_historical_filename_templates"): "True"})
def test_python_formatting(self, create_log_template, create_task_instance):
create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
filename_rendering_ti = create_task_instance(
@@ -606,6 +607,24 @@ def test_python_formatting(self, create_log_template, create_task_instance):
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename

def test_python_formatting_historical_logs_not_enabled(self, create_log_template, create_task_instance):
create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
filename_rendering_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)

expected_filename = (
f"dag_id=dag_for_testing_filename_rendering/"
f"run_id=scheduled__{DEFAULT_DATE.isoformat()}/task_id=task_for_testing_filename_rendering/attempt=42.log"
)
fth = FileTaskHandler("")
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename

@conf_vars({("core", "use_historical_filename_templates"): "True"})
def test_jinja_rendering(self, create_log_template, create_task_instance):
create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")
filename_rendering_ti = create_task_instance(
@@ -623,6 +642,23 @@ def test_jinja_rendering(self, create_log_template, create_task_instance):
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename

def test_jinja_rendering_historical_logs_not_enabled(self, create_log_template, create_task_instance):
create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")
filename_rendering_ti = create_task_instance(
dag_id="dag_for_testing_filename_rendering",
task_id="task_for_testing_filename_rendering",
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
)

expected_filename = (
f"dag_id=dag_for_testing_filename_rendering/"
f"run_id=scheduled__{DEFAULT_DATE.isoformat()}/task_id=task_for_testing_filename_rendering/attempt=42.log"
)
fth = FileTaskHandler("")
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename


class TestLogUrl:
def test_log_retrieval_valid(self, create_task_instance):
23 changes: 23 additions & 0 deletions tests/www/views/test_views_log.py
@@ -299,6 +299,7 @@ def dag_run_with_log_filename(tis):
session.query(LogTemplate).filter(LogTemplate.id == log_template.id).delete()


@conf_vars({("core", "use_historical_filename_templates"): "True"})
def test_get_logs_for_changed_filename_format_db(
log_admin_client, dag_run_with_log_filename, create_expected_log_file
):
@@ -323,6 +324,28 @@ def test_get_logs_for_changed_filename_format_db(
assert expected_filename in content_disposition


def test_get_logs_for_changed_filename_format_db_historical_logs_not_enabled(
log_admin_client, dag_run_with_log_filename, create_expected_log_file
):
try_number = 1
create_expected_log_file(try_number)
url = (
f"get_logs_with_metadata?dag_id={dag_run_with_log_filename.dag_id}&"
f"task_id={TASK_ID}&"
f"execution_date={urllib.parse.quote_plus(dag_run_with_log_filename.logical_date.isoformat())}&"
f"try_number={try_number}&metadata={{}}&format=file"
)
response = log_admin_client.get(url)

# Should find the log under corresponding db entry.
assert 200 == response.status_code
assert "Log for testing." in response.data.decode("utf-8")
content_disposition = response.headers["Content-Disposition"]
expected_filename = f"dag_id={dag_run_with_log_filename.dag_id}/run_id={dag_run_with_log_filename.run_id}/task_id={TASK_ID}/attempt={try_number}.log"
assert content_disposition.startswith("attachment")
assert expected_filename in content_disposition


@unittest.mock.patch(
"airflow.utils.log.file_task_handler.FileTaskHandler.read",
side_effect=[