diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index e20038c49d3bb..76ce946889ba7 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -36,13 +36,13 @@ from airflow.utils.helpers import parse_template_string from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.json_formatter import JSONFormatter -from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin # Elasticsearch hosted log type EsLogMsgType = List[Tuple[str, str]] -class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin): +class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin): """ ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch. Note logs are not directly @@ -350,6 +350,11 @@ def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> url = 'https://' + self.frontend.format(log_id=quote(log_id)) return url + @property + def supports_external_link(self) -> bool: + """Whether we can support external links""" + return bool(self.frontend) + class _ESJsonLogFmt: """Helper class to read ES Logs and re-format it to match settings.LOG_FORMAT""" diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index 0e1f69198d014..bebe369c2b0c4 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -98,9 +98,12 @@ def supports_read(self): return hasattr(self.log_handler, 'read') @property - def supports_external_link(self): + def supports_external_link(self) -> bool: """Check if the logging handler supports external links (e.g. to Elasticsearch, Stackdriver, etc).""" - return isinstance(self.log_handler, ExternalLoggingMixin) + if not isinstance(self.log_handler, ExternalLoggingMixin): + return False + + return self.log_handler.supports_external_link def render_log_filename(self, ti: TaskInstance, try_number: Optional[int] = None): """ diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index e101eb58fdc74..1fc075724223f 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -66,6 +66,11 @@ def log_name(self) -> str: def get_external_log_url(self, task_instance, try_number) -> str: """Return the URL for log visualization in the external service.""" + @property + @abc.abstractmethod + def supports_external_link(self) -> bool: + """Return whether handler is able to support external links.""" + # TODO: Formally inherit from io.IOBase class StreamLogWriter: diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index 9a5ec3b0ef1c8..d06e016726929 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -443,3 +443,13 @@ def test_get_external_log_url(self, es_frontend, expected_url): ) url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number) assert expected_url == url + + @parameterized.expand( + [ + ('localhost:5601/{log_id}', True), + (None, False), + ] + ) + def test_supports_external_link(self, frontend, expected): + self.es_task_handler.frontend = frontend + assert self.es_task_handler.supports_external_link == expected diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index cef374ca5cbed..301eed71bd06e 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -30,6 +30,7 @@ from airflow.operators.dummy import DummyOperator from airflow.utils import timezone from airflow.utils.log.log_reader import TaskLogReader +from airflow.utils.log.logging_mixin import ExternalLoggingMixin from airflow.utils.session import create_session from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_runs @@ -215,3 +216,23 @@ def test_read_log_stream_should_read_each_try_in_turn(self, mock_read): ], any_order=False, ) + + def test_supports_external_link(self): + task_log_reader = TaskLogReader() + + # Short circuit if log_handler doesn't include ExternalLoggingMixin + task_log_reader.log_handler = mock.MagicMock() + mock_prop = mock.PropertyMock() + mock_prop.return_value = False + type(task_log_reader.log_handler).supports_external_link = mock_prop + assert not task_log_reader.supports_external_link + mock_prop.assert_not_called() + + # Otherwise, defer to the log_handlers supports_external_link + task_log_reader.log_handler = mock.MagicMock(spec=ExternalLoggingMixin) + type(task_log_reader.log_handler).supports_external_link = mock_prop + assert not task_log_reader.supports_external_link + mock_prop.assert_called_once() + + mock_prop.return_value = True + assert task_log_reader.supports_external_link diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py index a56b0f0ea4213..152c43db666f4 100644 --- a/tests/www/views/test_views_log.py +++ b/tests/www/views/test_views_log.py @@ -379,17 +379,25 @@ def test_redirect_to_external_log_with_local_log_handler(log_admin_client, task_ assert 'http://localhost/home' == response.headers['Location'] -class ExternalHandler(ExternalLoggingMixin): +class _ExternalHandler(ExternalLoggingMixin): EXTERNAL_URL = 'http://external-service.com' - def get_external_log_url(self, *args, **kwargs): + @property + def log_name(self) -> str: + return 'ExternalLog' + + def get_external_log_url(self, *args, **kwargs) -> str: return self.EXTERNAL_URL + @property + def supports_external_link(self) -> bool: + return True + @unittest.mock.patch( 'airflow.utils.log.log_reader.TaskLogReader.log_handler', new_callable=unittest.mock.PropertyMock, - return_value=ExternalHandler(), + return_value=_ExternalHandler(), ) def test_redirect_to_external_log_with_external_log_handler(_, log_admin_client): url_template = "redirect_to_external_log?dag_id={}&task_id={}&execution_date={}&try_number={}" @@ -402,4 +410,4 @@ def test_redirect_to_external_log_with_external_log_handler(_, log_admin_client) ) response = log_admin_client.get(url) assert 302 == response.status_code - assert ExternalHandler.EXTERNAL_URL == response.headers['Location'] + assert _ExternalHandler.EXTERNAL_URL == response.headers['Location'] diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index b8c2d83cf5d6e..7c94006f288ae 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -551,12 +551,20 @@ def test_show_external_log_redirect_link_with_local_log_handler(capture_template class _ExternalHandler(ExternalLoggingMixin): + _supports_external_link = True LOG_NAME = 'ExternalLog' @property - def log_name(self): + def log_name(self) -> str: return self.LOG_NAME + def get_external_log_url(self, *args, **kwargs) -> str: + return 'http://external-service.com' + + @property + def supports_external_link(self) -> bool: + return self._supports_external_link + @pytest.mark.parametrize("endpoint", ["graph", "tree"]) @unittest.mock.patch( @@ -574,3 +582,22 @@ def test_show_external_log_redirect_link_with_external_log_handler( ctx = templates[0].local_context assert ctx['show_external_log_redirect'] assert ctx['external_log_name'] == _ExternalHandler.LOG_NAME + + +@pytest.mark.parametrize("endpoint", ["graph", "tree"]) +@unittest.mock.patch( + 'airflow.utils.log.log_reader.TaskLogReader.log_handler', + new_callable=unittest.mock.PropertyMock, + return_value=_ExternalHandler(), +) +def test_external_log_redirect_link_with_external_log_handler_not_shown( + _external_handler, capture_templates, admin_client, endpoint +): + """Show external links if log handler is external.""" + _external_handler.return_value._supports_external_link = False + url = f'{endpoint}?dag_id=example_bash_operator' + with capture_templates() as templates: + admin_client.get(url, follow_redirects=True) + ctx = templates[0].local_context + assert not ctx['show_external_log_redirect'] + assert ctx['external_log_name'] is None