diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index e20038c49d3bb..76ce946889ba7 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -36,13 +36,13 @@ from airflow.utils.helpers import parse_template_string from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.json_formatter import JSONFormatter -from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin # Elasticsearch hosted log type EsLogMsgType = List[Tuple[str, str]] -class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin): +class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin): """ ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch. Note logs are not directly @@ -350,6 +350,11 @@ def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> url = 'https://' + self.frontend.format(log_id=quote(log_id)) return url + @property + def supports_external_link(self) -> bool: + """Whether we can support external links""" + return bool(self.frontend) + class _ESJsonLogFmt: """Helper class to read ES Logs and re-format it to match settings.LOG_FORMAT""" diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index 0e1f69198d014..bebe369c2b0c4 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -98,9 +98,12 @@ def supports_read(self): return hasattr(self.log_handler, 'read') @property - def supports_external_link(self): + def supports_external_link(self) -> bool: """Check if the logging handler supports external links (e.g. to Elasticsearch, Stackdriver, etc).""" - return isinstance(self.log_handler, ExternalLoggingMixin) + if not isinstance(self.log_handler, ExternalLoggingMixin): + return False + + return self.log_handler.supports_external_link def render_log_filename(self, ti: TaskInstance, try_number: Optional[int] = None): """ diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index e101eb58fdc74..1fc075724223f 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -66,6 +66,11 @@ def log_name(self) -> str: def get_external_log_url(self, task_instance, try_number) -> str: """Return the URL for log visualization in the external service.""" + @property + @abc.abstractmethod + def supports_external_link(self) -> bool: + """Return whether handler is able to support external links.""" + # TODO: Formally inherit from io.IOBase class StreamLogWriter: diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index 9a5ec3b0ef1c8..d06e016726929 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -443,3 +443,13 @@ def test_get_external_log_url(self, es_frontend, expected_url): ) url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number) assert expected_url == url + + @parameterized.expand( + [ + ('localhost:5601/{log_id}', True), + (None, False), + ] + ) + def test_supports_external_link(self, frontend, expected): + self.es_task_handler.frontend = frontend + assert self.es_task_handler.supports_external_link == expected diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index cef374ca5cbed..301eed71bd06e 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -30,6 +30,7 @@ from airflow.operators.dummy import DummyOperator from airflow.utils import timezone from airflow.utils.log.log_reader import TaskLogReader +from airflow.utils.log.logging_mixin import ExternalLoggingMixin from airflow.utils.session import create_session from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_runs @@ -215,3 +216,23 @@ def test_read_log_stream_should_read_each_try_in_turn(self, mock_read): ], any_order=False, ) + + def test_supports_external_link(self): + task_log_reader = TaskLogReader() + + # Short circuit if log_handler doesn't include ExternalLoggingMixin + task_log_reader.log_handler = mock.MagicMock() + mock_prop = mock.PropertyMock() + mock_prop.return_value = False + type(task_log_reader.log_handler).supports_external_link = mock_prop + assert not task_log_reader.supports_external_link + mock_prop.assert_not_called() + + # Otherwise, defer to the log_handlers supports_external_link + task_log_reader.log_handler = mock.MagicMock(spec=ExternalLoggingMixin) + type(task_log_reader.log_handler).supports_external_link = mock_prop + assert not task_log_reader.supports_external_link + mock_prop.assert_called_once() + + mock_prop.return_value = True + assert task_log_reader.supports_external_link diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py index a56b0f0ea4213..152c43db666f4 100644 --- a/tests/www/views/test_views_log.py +++ b/tests/www/views/test_views_log.py @@ -379,17 +379,25 @@ def test_redirect_to_external_log_with_local_log_handler(log_admin_client, task_ assert 'http://localhost/home' == response.headers['Location'] -class ExternalHandler(ExternalLoggingMixin): +class _ExternalHandler(ExternalLoggingMixin): EXTERNAL_URL = 'http://external-service.com' - def get_external_log_url(self, *args, **kwargs): + @property + def log_name(self) -> str: + return 'ExternalLog' + + def get_external_log_url(self, *args, **kwargs) -> str: return self.EXTERNAL_URL + @property + def supports_external_link(self) -> bool: + return True + @unittest.mock.patch( 'airflow.utils.log.log_reader.TaskLogReader.log_handler', new_callable=unittest.mock.PropertyMock, - return_value=ExternalHandler(), + return_value=_ExternalHandler(), ) def test_redirect_to_external_log_with_external_log_handler(_, log_admin_client): url_template = "redirect_to_external_log?dag_id={}&task_id={}&execution_date={}&try_number={}" @@ -402,4 +410,4 @@ def test_redirect_to_external_log_with_external_log_handler(_, log_admin_client) ) response = log_admin_client.get(url) assert 302 == response.status_code - assert ExternalHandler.EXTERNAL_URL == response.headers['Location'] + assert _ExternalHandler.EXTERNAL_URL == response.headers['Location'] diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index b8c2d83cf5d6e..88f881cea96df 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -551,12 +551,20 @@ def test_show_external_log_redirect_link_with_local_log_handler(capture_template class _ExternalHandler(ExternalLoggingMixin): + _supports_external_link = True LOG_NAME = 'ExternalLog' @property - def log_name(self): + def log_name(self) -> str: return self.LOG_NAME + def get_external_log_url(self, *args, **kwargs) -> str: + return 'http://external-service.com' + + @property + def supports_external_link(self) -> bool: + return self._supports_external_link + @pytest.mark.parametrize("endpoint", ["graph", "tree"]) @unittest.mock.patch( @@ -574,3 +582,121 @@ def test_show_external_log_redirect_link_with_external_log_handler( ctx = templates[0].local_context assert ctx['show_external_log_redirect'] assert ctx['external_log_name'] == _ExternalHandler.LOG_NAME + + +@pytest.mark.parametrize("endpoint", ["graph", "tree"]) +@unittest.mock.patch( + 'airflow.utils.log.log_reader.TaskLogReader.log_handler', + new_callable=unittest.mock.PropertyMock, + return_value=_ExternalHandler(), +) +def test_external_log_redirect_link_with_external_log_handler_not_shown( + _external_handler, capture_templates, admin_client, endpoint +): + """Show external links if log handler is external.""" + _external_handler.return_value._supports_external_link = False + url = f'{endpoint}?dag_id=example_bash_operator' + with capture_templates() as templates: + admin_client.get(url, follow_redirects=True) + ctx = templates[0].local_context + assert not ctx['show_external_log_redirect'] + assert ctx['external_log_name'] is None + + +def _get_appbuilder_pk_string(model_view_cls, instance) -> str: + """Utility to get Flask-Appbuilder's string format "pk" for an object. + + Used to generate requests to FAB action views without *too* much difficulty. + The implementation relies on FAB internals, but unfortunately I don't see + a better way around it. + + Example usage:: + + >>> from airflow.www.views import TaskInstanceModelView + >>> ti = session.Query(TaskInstance).filter(...).one() + >>> pk = _get_appbuilder_pk_string(TaskInstanceModelView, ti) + >>> client.post("...", data={"action": "...", "rowid": pk}) + """ + pk_value = model_view_cls.datamodel.get_pk_value(instance) + return model_view_cls._serialize_pk_if_composite(model_view_cls, pk_value) + + +def test_task_instance_clear(session, admin_client): + task_id = "runme_0" + + # Set the state to success for clearing. + ti_q = session.query(TaskInstance).filter(TaskInstance.task_id == task_id) + ti_q.update({"state": State.SUCCESS}) + session.commit() + + # Send a request to clear. + rowid = _get_appbuilder_pk_string(TaskInstanceModelView, ti_q.one()) + resp = admin_client.post( + "/taskinstance/action_post", + data={"action": "clear", "rowid": rowid}, + follow_redirects=True, + ) + assert resp.status_code == 200 + + # Now the state should be None. + state = session.query(TaskInstance.state).filter(TaskInstance.task_id == task_id).scalar() + assert state == State.NONE + + +def test_task_instance_clear_failure(admin_client): + rowid = '["12345"]' # F.A.B. crashes if the rowid is *too* invalid. + resp = admin_client.post( + "/taskinstance/action_post", + data={"action": "clear", "rowid": rowid}, + follow_redirects=True, + ) + assert resp.status_code == 200 + check_content_in_response("Failed to clear task instances:", resp) + + +@pytest.mark.parametrize( + "action, expected_state", + [ + ("set_running", State.RUNNING), + ("set_failed", State.FAILED), + ("set_success", State.SUCCESS), + ("set_retry", State.UP_FOR_RETRY), + ], + ids=["running", "failed", "success", "retry"], +) +def test_task_instance_set_state(session, admin_client, action, expected_state): + task_id = "runme_0" + + # Send a request to clear. + ti_q = session.query(TaskInstance).filter(TaskInstance.task_id == task_id) + rowid = _get_appbuilder_pk_string(TaskInstanceModelView, ti_q.one()) + resp = admin_client.post( + "/taskinstance/action_post", + data={"action": action, "rowid": rowid}, + follow_redirects=True, + ) + assert resp.status_code == 200 + + # Now the state should be modified. + state = session.query(TaskInstance.state).filter(TaskInstance.task_id == task_id).scalar() + assert state == expected_state + + +@pytest.mark.parametrize( + "action", + [ + "set_running", + "set_failed", + "set_success", + "set_retry", + ], +) +def test_task_instance_set_state_failure(admin_client, action): + rowid = '["12345"]' # F.A.B. crashes if the rowid is *too* invalid. + resp = admin_client.post( + "/taskinstance/action_post", + data={"action": action, "rowid": rowid}, + follow_redirects=True, + ) + assert resp.status_code == 200 + check_content_in_response("Failed to set state", resp)