diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1e52a05bd43f6..536630de58ab7 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -927,6 +927,20 @@ logging:
      default: "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/\
        {%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}\
        attempt={{ try_number }}.log"
+    use_historical_filename_templates:
+      description: |
+        When set to ``True``, Airflow uses the historical filename templates stored in the
+        database to locate logs for historical tasks; the same applies to ``elasticsearch_id``,
+        which is otherwise not resolved correctly for historical tasks after you change it.
+        Both require the webserver to access the database to render the filename templates,
+        which could allow Dag authors to execute code on the webserver; that is why this option
+        is disabled by default, at the cost of old logs not being shown in the webserver UI.
+        Enable it if you changed ``log_filename_template`` in the past and want to see logs for
+        historical tasks, but only if you trust your Dag authors not to abuse this capability.
+      version_added: 2.11.1
+      type: boolean
+      example: ~
+      default: "False"
     log_processor_filename_template:
       description: |
         Formatting for how airflow generates file names for log
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 30d1bda13d904..224f7b79ed769 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -58,7 +58,7 @@
 from airflow.models.base import Base, StringID
 from airflow.models.expandinput import NotFullyPopulated
 from airflow.models.taskinstance import TaskInstance as TI
-from airflow.models.tasklog import LogTemplate
+from airflow.models.tasklog import LogTemplate, LogTemplateDataClass
 from airflow.stats import Stats
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
@@ -1648,9 +1648,25 @@ def schedule_tis(
         return count

     @provide_session
-    def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate | LogTemplatePydantic:
+    def get_db_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate | LogTemplatePydantic:
         return DagRun._get_log_template(log_template_id=self.log_template_id, session=session)

+    @provide_session
+    def get_log_template(
+        self, *, session: Session = NEW_SESSION
+    ) -> LogTemplate | LogTemplatePydantic | LogTemplateDataClass:
+        if airflow_conf.getboolean("core", "use_historical_filename_templates", fallback=False):
+            return self.get_db_log_template(session=session)
+        else:
+            return LogTemplateDataClass(
+                filename=airflow_conf.get_mandatory_value("core", "log_filename_template"),
+                elasticsearch_id=airflow_conf.get(
+                    "elasticsearch",
+                    "log_id_template",
+                    fallback="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
+                ),
+            )
+
     @staticmethod
     @internal_api_call
     @provide_session
diff --git a/airflow/models/tasklog.py b/airflow/models/tasklog.py
index d55eb94a266d7..758c145c55a91 100644
--- a/airflow/models/tasklog.py
+++ b/airflow/models/tasklog.py
@@ -17,6 +17,8 @@
 # under the License.
 from __future__ import annotations

+from dataclasses import dataclass
+
 from sqlalchemy import Column, Integer, Text

 from airflow.models.base import Base
@@ -42,3 +44,16 @@ class LogTemplate(Base):
     def __repr__(self) -> str:
         attrs = ", ".join(f"{k}={getattr(self, k)}" for k in ("filename", "elasticsearch_id"))
         return f"LogTemplate({attrs})"
+
+
+@dataclass
+class LogTemplateDataClass:
+    """
+    Dataclass for a log template read from configuration rather than from the database.
+
+    :param filename: log filename template
+    :param elasticsearch_id: Elasticsearch log ID template
+    """
+
+    filename: str
+    elasticsearch_id: str
diff --git a/newsfragments/61880.significant.rst b/newsfragments/61880.significant.rst
new file mode 100644
index 0000000000000..d00105206c4c6
--- /dev/null
+++ b/newsfragments/61880.significant.rst
@@ -0,0 +1,17 @@
+Retrieving historical log templates is disabled by default in Airflow 2.11.1
+
+Starting with Airflow 2.11.1, historical log templates are no longer retrieved from the database
+when you change the log template. This means that existing logs generated with a previous log
+template will not be accessible through the webserver UI.
+
+This change addresses a security issue: retrieving historical log templates requires the
+webserver to render templates stored in the database, which could allow Dag authors to execute
+arbitrary code in the webserver when logs are retrieved. Disabling the retrieval by default
+removes this attack vector for deployments where it matters.
+
+Users who need to access historical logs generated with a different log template can either
+manually rename their historical log files to match the latest ``log_filename_template``
+configured in Airflow, or set the ``core.use_historical_filename_templates`` configuration
+option to ``True`` to re-enable retrieval of historical log templates, provided they accept
+that Dag authors are then able to execute arbitrary code in the webserver when logs are
+retrieved.
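For reviewers, a minimal sketch of what opting back in looks like for a deployment, assuming the option is read from the ``[core]`` section as the code and tests in this patch do; the environment variable form follows Airflow's standard ``AIRFLOW__{SECTION}__{KEY}`` naming:

    # airflow.cfg -- opt back in to historical log templates
    # (only do this if you trust your Dag authors)
    [core]
    use_historical_filename_templates = True

    # Equivalent environment variable:
    # AIRFLOW__CORE__USE_HISTORICAL_FILENAME_TEMPLATES=True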
diff --git a/tests/utils/log/test_file_processor_handler.py b/tests/utils/log/test_file_processor_handler.py
index f50fc619ac5bd..9e7504e098d78 100644
--- a/tests/utils/log/test_file_processor_handler.py
+++ b/tests/utils/log/test_file_processor_handler.py
@@ -25,6 +25,7 @@

 from airflow.utils import timezone
 from airflow.utils.log.file_processor_handler import FileProcessorHandler
+from tests.test_utils.config import conf_vars


 class TestFileProcessorHandler:
@@ -60,6 +61,7 @@ def test_template(self):
         handler.set_context(filename=os.path.join(self.dag_dir, "logfile"))
         assert os.path.exists(os.path.join(path, "logfile.log"))

+    @conf_vars({("core", "use_historical_filename_templates"): "True"})
     def test_symlink_latest_log_directory(self):
         handler = FileProcessorHandler(base_log_folder=self.base_log_folder, filename_template=self.filename)
         handler.dag_dir = self.dag_dir
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index 2463db11a5bcd..6e9bd2d3ca877 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -120,6 +120,7 @@ def prepare_db(self, create_task_instance):
             session.delete(log_template)
             session.commit()

+    @conf_vars({("core", "use_historical_filename_templates"): "True"})
     def test_test_read_log_chunks_should_read_one_try(self):
         task_log_reader = TaskLogReader()
         ti = copy.copy(self.ti)
@@ -137,6 +138,7 @@ def test_test_read_log_chunks_should_read_one_try(self):
             ]
         assert metadatas == {"end_of_log": True, "log_pos": 13}

+    @conf_vars({("core", "use_historical_filename_templates"): "True"})
     def test_test_read_log_chunks_should_read_all_files(self):
         task_log_reader = TaskLogReader()
         ti = copy.copy(self.ti)
@@ -152,6 +154,7 @@
             assert f"try_number={i + 1}." in logs[i][0][1]
         assert metadatas == {"end_of_log": True, "log_pos": 13}

+    @conf_vars({("core", "use_historical_filename_templates"): "True"})
     def test_test_test_read_log_stream_should_read_one_try(self):
         task_log_reader = TaskLogReader()
         ti = copy.copy(self.ti)
@@ -163,6 +166,7 @@
             " INFO - ::endgroup::\ntry_number=1.\n"
         ]

+    @conf_vars({("core", "use_historical_filename_templates"): "True"})
     def test_test_test_read_log_stream_should_read_all_logs(self):
         task_log_reader = TaskLogReader()
         self.ti.state = TaskInstanceState.SUCCESS  # Ensure mocked instance is completed to return stream
@@ -262,6 +266,7 @@ def test_supports_external_link(self):
             mock_prop.return_value = True
             assert task_log_reader.supports_external_link

+    @conf_vars({("core", "use_historical_filename_templates"): "True"})
     def test_task_log_filename_unique(self, dag_maker):
         """Ensure the default log_filename_template produces a unique filename.
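As additional context (not part of the patch), a small Python sketch of what the new fallback in ``DagRun.get_log_template`` evaluates to when the flag is left at its default of ``False``, using the default templates from the config.yml hunk above (the ``%%`` in config.yml is config-parser escaping for a literal ``%``):

    from airflow.models.tasklog import LogTemplateDataClass

    # With use_historical_filename_templates=False, get_log_template() skips
    # the database lookup and builds the template purely from configuration.
    template = LogTemplateDataClass(
        filename=(
            "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/"
            "{% if ti.map_index >= 0 %}map_index={{ ti.map_index }}/{% endif %}"
            "attempt={{ try_number }}.log"
        ),
        elasticsearch_id="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
    )
    assert template.filename.endswith("attempt={{ try_number }}.log")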
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 95483f2285fa8..ab546332e3d2e 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -589,6 +589,7 @@ def test_set_context_trigger(self, create_dummy_dag, dag_maker, is_a_trigger, se


 class TestFilenameRendering:
+    @conf_vars({("core", "use_historical_filename_templates"): "True"})
     def test_python_formatting(self, create_log_template, create_task_instance):
         create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
         filename_rendering_ti = create_task_instance(
@@ -606,6 +607,24 @@
         rendered_filename = fth._render_filename(filename_rendering_ti, 42)
         assert expected_filename == rendered_filename

+    def test_python_formatting_historical_logs_not_enabled(self, create_log_template, create_task_instance):
+        create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
+        filename_rendering_ti = create_task_instance(
+            dag_id="dag_for_testing_filename_rendering",
+            task_id="task_for_testing_filename_rendering",
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+        )
+
+        expected_filename = (
+            f"dag_id=dag_for_testing_filename_rendering/"
+            f"run_id=scheduled__{DEFAULT_DATE.isoformat()}/task_id=task_for_testing_filename_rendering/attempt=42.log"
+        )
+        fth = FileTaskHandler("")
+        rendered_filename = fth._render_filename(filename_rendering_ti, 42)
+        assert expected_filename == rendered_filename
+
+    @conf_vars({("core", "use_historical_filename_templates"): "True"})
     def test_jinja_rendering(self, create_log_template, create_task_instance):
         create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")
         filename_rendering_ti = create_task_instance(
@@ -623,6 +642,23 @@
         rendered_filename = fth._render_filename(filename_rendering_ti, 42)
         assert expected_filename == rendered_filename

+    def test_jinja_rendering_historical_logs_not_enabled(self, create_log_template, create_task_instance):
+        create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")
+        filename_rendering_ti = create_task_instance(
+            dag_id="dag_for_testing_filename_rendering",
+            task_id="task_for_testing_filename_rendering",
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+        )
+
+        expected_filename = (
+            f"dag_id=dag_for_testing_filename_rendering/"
+            f"run_id=scheduled__{DEFAULT_DATE.isoformat()}/task_id=task_for_testing_filename_rendering/attempt=42.log"
+        )
+        fth = FileTaskHandler("")
+        rendered_filename = fth._render_filename(filename_rendering_ti, 42)
+        assert expected_filename == rendered_filename
+

 class TestLogUrl:
     def test_log_retrieval_valid(self, create_task_instance):
diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py
index 2607317c5fccc..c2fa9bffd655b 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -299,6 +299,7 @@ def dag_run_with_log_filename(tis):
     session.query(LogTemplate).filter(LogTemplate.id == log_template.id).delete()


+@conf_vars({("core", "use_historical_filename_templates"): "True"})
 def test_get_logs_for_changed_filename_format_db(
     log_admin_client, dag_run_with_log_filename, create_expected_log_file
 ):
@@ -323,6 +324,28 @@
     assert expected_filename in content_disposition


+def test_get_logs_for_changed_filename_format_db_historical_logs_not_enabled(
+    log_admin_client, dag_run_with_log_filename, create_expected_log_file
+):
+    try_number = 1
+    create_expected_log_file(try_number)
+    url = (
+        f"get_logs_with_metadata?dag_id={dag_run_with_log_filename.dag_id}&"
+        f"task_id={TASK_ID}&"
+        f"execution_date={urllib.parse.quote_plus(dag_run_with_log_filename.logical_date.isoformat())}&"
+        f"try_number={try_number}&metadata={{}}&format=file"
+    )
+    response = log_admin_client.get(url)
+
+    # With historical templates disabled, the log is served under the filename
+    # rendered from the current configuration template, not the db entry.
+    assert 200 == response.status_code
+    assert "Log for testing." in response.data.decode("utf-8")
+    content_disposition = response.headers["Content-Disposition"]
+    expected_filename = f"dag_id={dag_run_with_log_filename.dag_id}/run_id={dag_run_with_log_filename.run_id}/task_id={TASK_ID}/attempt={try_number}.log"
+    assert content_disposition.startswith("attachment")
+    assert expected_filename in content_disposition
+
+
 @unittest.mock.patch(
     "airflow.utils.log.file_task_handler.FileTaskHandler.read",
     side_effect=[