Skip to content

Commit

Permalink
Fix external elasticsearch logs link (#16357)
Browse files Browse the repository at this point in the history
During the 2.0 upgrade, the external log link shown when using Elasticsearch
remote logs was broken. This fixes the link and ensures it is only shown if
`[elasticsearch] frontend` is set.

(cherry picked from commit e31e515)
  • Loading branch information
jedcunningham authored and jhtimmins committed Aug 12, 2021
1 parent 3d6db6d commit 5ea02cc
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 9 deletions.
9 changes: 7 additions & 2 deletions airflow/providers/elasticsearch/log/es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.json_formatter import JSONFormatter
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin

# Elasticsearch hosted log type
EsLogMsgType = List[Tuple[str, str]]


class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
"""
ElasticsearchTaskHandler is a python log handler that
reads logs from Elasticsearch. Note logs are not directly
Expand Down Expand Up @@ -350,6 +350,11 @@ def get_external_log_url(self, task_instance: TaskInstance, try_number: int) ->
url = 'https://' + self.frontend.format(log_id=quote(log_id))
return url

@property
def supports_external_link(self) -> bool:
    """Report whether an external log link can be offered.

    The answer is the truthiness of ``self.frontend``: an unset or empty
    frontend means no link is available.
    """
    return True if self.frontend else False


class _ESJsonLogFmt:
"""Helper class to read ES Logs and re-format it to match settings.LOG_FORMAT"""
Expand Down
7 changes: 5 additions & 2 deletions airflow/utils/log/log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ def supports_read(self):
return hasattr(self.log_handler, 'read')

@property
def supports_external_link(self) -> bool:
    """Check if the logging handler supports external links (e.g. to Elasticsearch, Stackdriver, etc).

    A handler that is not an ``ExternalLoggingMixin`` never supports external
    links. Otherwise the decision is delegated to the handler's own
    ``supports_external_link`` property.
    """
    # Guard first so the handler's property is only read on handlers that
    # implement the external-logging interface.
    if not isinstance(self.log_handler, ExternalLoggingMixin):
        return False

    return self.log_handler.supports_external_link

def render_log_filename(self, ti: TaskInstance, try_number: Optional[int] = None):
"""
Expand Down
5 changes: 5 additions & 0 deletions airflow/utils/log/logging_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ def log_name(self) -> str:
def get_external_log_url(self, task_instance, try_number) -> str:
"""Return the URL for log visualization in the external service."""

@property
@abc.abstractmethod
def supports_external_link(self) -> bool:
    """Indicate whether this handler can provide links to the external log service."""


# TODO: Formally inherit from io.IOBase
class StreamLogWriter:
Expand Down
10 changes: 10 additions & 0 deletions tests/providers/elasticsearch/log/test_es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,13 @@ def test_get_external_log_url(self, es_frontend, expected_url):
)
url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number)
assert expected_url == url

@parameterized.expand(
    [
        ('localhost:5601/{log_id}', True),
        (None, False),
    ]
)
def test_supports_external_link(self, frontend, expected):
    """The handler's ``supports_external_link`` should match whether a frontend is set."""
    handler = self.es_task_handler
    handler.frontend = frontend
    assert handler.supports_external_link == expected
21 changes: 21 additions & 0 deletions tests/utils/log/test_log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from airflow.operators.dummy import DummyOperator
from airflow.utils import timezone
from airflow.utils.log.log_reader import TaskLogReader
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import create_session
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_runs
Expand Down Expand Up @@ -215,3 +216,23 @@ def test_read_log_stream_should_read_each_try_in_turn(self, mock_read):
],
any_order=False,
)

def test_supports_external_link(self):
    """TaskLogReader reads the handler's flag only for ExternalLoggingMixin handlers."""
    reader = TaskLogReader()
    link_prop = mock.PropertyMock(return_value=False)

    # A handler without ExternalLoggingMixin short-circuits to False and its
    # own property must never be read.
    reader.log_handler = mock.MagicMock()
    type(reader.log_handler).supports_external_link = link_prop
    assert not reader.supports_external_link
    link_prop.assert_not_called()

    # A handler that does implement the mixin is asked for its own answer.
    reader.log_handler = mock.MagicMock(spec=ExternalLoggingMixin)
    type(reader.log_handler).supports_external_link = link_prop
    assert not reader.supports_external_link
    link_prop.assert_called_once()

    # Flipping the handler's answer flips the reader's answer.
    link_prop.return_value = True
    assert reader.supports_external_link
16 changes: 12 additions & 4 deletions tests/www/views/test_views_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,17 +379,25 @@ def test_redirect_to_external_log_with_local_log_handler(log_admin_client, task_
assert 'http://localhost/home' == response.headers['Location']


class ExternalHandler(ExternalLoggingMixin):
class _ExternalHandler(ExternalLoggingMixin):
EXTERNAL_URL = 'http://external-service.com'

def get_external_log_url(self, *args, **kwargs):
@property
def log_name(self) -> str:
return 'ExternalLog'

def get_external_log_url(self, *args, **kwargs) -> str:
return self.EXTERNAL_URL

@property
def supports_external_link(self) -> bool:
return True


@unittest.mock.patch(
'airflow.utils.log.log_reader.TaskLogReader.log_handler',
new_callable=unittest.mock.PropertyMock,
return_value=ExternalHandler(),
return_value=_ExternalHandler(),
)
def test_redirect_to_external_log_with_external_log_handler(_, log_admin_client):
url_template = "redirect_to_external_log?dag_id={}&task_id={}&execution_date={}&try_number={}"
Expand All @@ -402,4 +410,4 @@ def test_redirect_to_external_log_with_external_log_handler(_, log_admin_client)
)
response = log_admin_client.get(url)
assert 302 == response.status_code
assert ExternalHandler.EXTERNAL_URL == response.headers['Location']
assert _ExternalHandler.EXTERNAL_URL == response.headers['Location']
128 changes: 127 additions & 1 deletion tests/www/views/test_views_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,12 +551,20 @@ def test_show_external_log_redirect_link_with_local_log_handler(capture_template


class _ExternalHandler(ExternalLoggingMixin):
    """Minimal ``ExternalLoggingMixin`` implementation used as a stand-in log handler in view tests."""

    # Backing value for ``supports_external_link``; defaults to True and can be
    # overridden on an instance to simulate a handler with no external link.
    _supports_external_link = True
    LOG_NAME = 'ExternalLog'

    @property
    def log_name(self) -> str:
        """Return the display name of the external log service."""
        return self.LOG_NAME

    def get_external_log_url(self, *args, **kwargs) -> str:
        """Return a fixed external URL, ignoring all arguments."""
        return 'http://external-service.com'

    @property
    def supports_external_link(self) -> bool:
        """Return the current value of ``_supports_external_link``."""
        return self._supports_external_link


@pytest.mark.parametrize("endpoint", ["graph", "tree"])
@unittest.mock.patch(
Expand All @@ -574,3 +582,121 @@ def test_show_external_log_redirect_link_with_external_log_handler(
ctx = templates[0].local_context
assert ctx['show_external_log_redirect']
assert ctx['external_log_name'] == _ExternalHandler.LOG_NAME


@pytest.mark.parametrize("endpoint", ["graph", "tree"])
@unittest.mock.patch(
    'airflow.utils.log.log_reader.TaskLogReader.log_handler',
    new_callable=unittest.mock.PropertyMock,
    return_value=_ExternalHandler(),
)
def test_external_log_redirect_link_with_external_log_handler_not_shown(
    _external_handler, capture_templates, admin_client, endpoint
):
    """Do not show the external link when the external handler reports no link support."""
    # Override the flag on the handler instance returned by the patched
    # property so that its supports_external_link evaluates to False.
    _external_handler.return_value._supports_external_link = False
    url = f'{endpoint}?dag_id=example_bash_operator'
    with capture_templates() as templates:
        admin_client.get(url, follow_redirects=True)
    ctx = templates[0].local_context
    assert not ctx['show_external_log_redirect']
    assert ctx['external_log_name'] is None


def _get_appbuilder_pk_string(model_view_cls, instance) -> str:
    """Build Flask-Appbuilder's string-formatted "pk" for a model instance.

    This makes it easier to craft requests against FAB action views. It leans
    on FAB internals (``datamodel.get_pk_value`` and
    ``_serialize_pk_if_composite``) because no better alternative is apparent.

    Example usage::

        >>> from airflow.www.views import TaskInstanceModelView
        >>> ti = session.query(TaskInstance).filter(...).one()
        >>> pk = _get_appbuilder_pk_string(TaskInstanceModelView, ti)
        >>> client.post("...", data={"action": "...", "rowid": pk})
    """
    # The serializer is looked up on the class, so the class itself is passed
    # explicitly as the first argument.
    return model_view_cls._serialize_pk_if_composite(
        model_view_cls,
        model_view_cls.datamodel.get_pk_value(instance),
    )


def test_task_instance_clear(session, admin_client):
    """Posting the "clear" action for a successful task instance resets its state."""
    task_id = "runme_0"
    matching_tis = session.query(TaskInstance).filter(TaskInstance.task_id == task_id)

    # Put the task instance into SUCCESS so there is something to clear.
    matching_tis.update({"state": State.SUCCESS})
    session.commit()

    # Issue the clear request against the FAB action endpoint.
    payload = {
        "action": "clear",
        "rowid": _get_appbuilder_pk_string(TaskInstanceModelView, matching_tis.one()),
    }
    resp = admin_client.post("/taskinstance/action_post", data=payload, follow_redirects=True)
    assert resp.status_code == 200

    # After clearing, the stored state is expected to be State.NONE.
    final_state = session.query(TaskInstance.state).filter(TaskInstance.task_id == task_id).scalar()
    assert final_state == State.NONE


def test_task_instance_clear_failure(admin_client):
    """Posting "clear" with a bogus rowid renders a failure message instead of erroring."""
    # F.A.B. crashes if the rowid is *too* invalid, so use a well-formed but unknown one.
    payload = {"action": "clear", "rowid": '["12345"]'}
    resp = admin_client.post("/taskinstance/action_post", data=payload, follow_redirects=True)
    assert resp.status_code == 200
    check_content_in_response("Failed to clear task instances:", resp)


@pytest.mark.parametrize(
    "action, expected_state",
    [
        ("set_running", State.RUNNING),
        ("set_failed", State.FAILED),
        ("set_success", State.SUCCESS),
        ("set_retry", State.UP_FOR_RETRY),
    ],
    ids=["running", "failed", "success", "retry"],
)
def test_task_instance_set_state(session, admin_client, action, expected_state):
    """Each set-state action on the task instance view stores the matching state."""
    task_id = "runme_0"

    # Send a request to set the state via the parametrized action.
    ti_q = session.query(TaskInstance).filter(TaskInstance.task_id == task_id)
    rowid = _get_appbuilder_pk_string(TaskInstanceModelView, ti_q.one())
    resp = admin_client.post(
        "/taskinstance/action_post",
        data={"action": action, "rowid": rowid},
        follow_redirects=True,
    )
    assert resp.status_code == 200

    # Now the state should be modified.
    state = session.query(TaskInstance.state).filter(TaskInstance.task_id == task_id).scalar()
    assert state == expected_state


@pytest.mark.parametrize("action", ["set_running", "set_failed", "set_success", "set_retry"])
def test_task_instance_set_state_failure(admin_client, action):
    """Each set-state action with a bogus rowid renders a failure message instead of erroring."""
    # F.A.B. crashes if the rowid is *too* invalid, so use a well-formed but unknown one.
    payload = {"action": action, "rowid": '["12345"]'}
    resp = admin_client.post("/taskinstance/action_post", data=payload, follow_redirects=True)
    assert resp.status_code == 200
    check_content_in_response("Failed to set state", resp)

0 comments on commit 5ea02cc

Please sign in to comment.