diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 124da295bb08a..4dc9051c02559 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -862,13 +862,13 @@ def _read_from_local( def _read_from_logs_server( self, - ti: TaskInstance, + ti: TaskInstance | TaskInstanceHistory, worker_log_rel_path: str, ) -> LogResponse: sources: LogSourceInfo = [] log_streams: list[RawLogStream] = [] try: - log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER + log_type = LogType.TRIGGER if getattr(ti, "triggerer_job", False) else LogType.WORKER url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type) response = _fetch_logs_from_service(url, rel_path) if response.status_code == 403: diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py index 9c32e9b232aa6..ba9d09718cd66 100644 --- a/airflow-core/tests/unit/utils/test_log_handlers.py +++ b/airflow-core/tests/unit/utils/test_log_handlers.py @@ -562,6 +562,18 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs( assert extract_events(logs, False) == expected_logs assert metadata == {"end_of_log": True, "log_pos": 3} + @pytest.mark.parametrize("is_tih", [False, True]) + def test_read_served_logs(self, is_tih, create_task_instance): + ti = create_task_instance( + state=TaskInstanceState.SUCCESS, + hostname="test_hostname", + ) + if is_tih: + ti = TaskInstanceHistory(ti, ti.state) + fth = FileTaskHandler("") + sources, _ = fth._read_from_logs_server(ti, "test.log") + assert len(sources) > 0 + def test_add_triggerer_suffix(self): sample = "any/path/to/thing.txt" assert FileTaskHandler.add_triggerer_suffix(sample) == sample + ".trigger"