From fb7ec83072a213ba3784e9a76a1d7f609efd233a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?=
Date: Wed, 7 Jun 2023 16:49:20 -0700
Subject: [PATCH 1/4] rewrite method used in ecs to fetch fewer logs

The current behavior is to fetch all logs and only keep the last
message(s). This is wasteful, as there is an option to fetch from the
end directly, allowing us to send only the minimum number of requests.

Since a generator is used in get_log_events, stopping the iteration
after we've collected enough logs prevents it from making more
requests.

With this change, we can expect fewer API calls and faster execution
time for this method, especially in tasks that emit a lot of logs.
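The gain can be sketched like this (illustrative code only, not the
provider implementation; the paginated source is a stand-in):

    import itertools

    def get_pages():
        # stand-in for the get_log_events generator: each iteration of
        # this loop would be one CloudWatch API request
        for page in range(1000):
            print(f"request #{page}")
            yield from ({"message": f"{page}-{i}"} for i in range(3))

    # islice stops pulling from the generator after 5 events, so only
    # the first two "requests" are ever made
    last_events = list(itertools.islice(get_pages(), 5))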
---
 airflow/providers/amazon/aws/hooks/ecs.py    | 14 +++++++----
 airflow/providers/amazon/aws/hooks/logs.py   |  8 ++++++-
 tests/providers/amazon/aws/hooks/test_ecs.py | 25 +++++++++-----------
 3 files changed, 27 insertions(+), 20 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/ecs.py b/airflow/providers/amazon/aws/hooks/ecs.py
index 5f74b4c138d35..85f5fb1fc3f72 100644
--- a/airflow/providers/amazon/aws/hooks/ecs.py
+++ b/airflow/providers/amazon/aws/hooks/ecs.py
@@ -17,8 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+import itertools
 import time
-from collections import deque
 from datetime import datetime, timedelta
 from logging import Logger
 from threading import Event, Thread
@@ -168,7 +168,7 @@ def __init__(
 
         self.log_group = log_group
         self.log_stream_name = log_stream_name
-        self.hook = AwsLogsHook(aws_conn_id=aws_conn_id, region_name=region_name)
+        self.logs_hook = AwsLogsHook(aws_conn_id=aws_conn_id, region_name=region_name)
 
     def run(self) -> None:
         logs_to_skip = 0
@@ -181,11 +181,10 @@ def run(self) -> None:
 
     def _get_log_events(self, skip: int = 0) -> Generator:
         try:
-            yield from self.hook.get_log_events(self.log_group, self.log_stream_name, skip=skip)
+            yield from self.logs_hook.get_log_events(self.log_group, self.log_stream_name, skip=skip)
         except ClientError as error:
             if error.response["Error"]["Code"] != "ResourceNotFoundException":
                 self.logger.warning("Error on retrieving Cloudwatch log events", error)
-            yield from ()
         except ConnectionClosedError as error:
             self.logger.warning("ConnectionClosedError on retrieving Cloudwatch log events", error)
 
@@ -198,7 +197,12 @@ def _event_to_str(self, event: dict) -> str:
         return f"[{formatted_event_dt}] {message}"
 
     def get_last_log_messages(self, number_messages) -> list:
-        return [log["message"] for log in deque(self._get_log_events(), maxlen=number_messages)]
+        last_logs_iterator = self.logs_hook.get_log_events(
+            self.log_group, self.log_stream_name, start_from_head=False
+        )
+        truncated = list(itertools.islice(last_logs_iterator, number_messages))
+        # need to reverse the order to put the logs back in order after getting them from the end
+        return [log["message"] for log in reversed(truncated)]
 
     def get_last_log_message(self) -> str | None:
         try:
diff --git a/airflow/providers/amazon/aws/hooks/logs.py b/airflow/providers/amazon/aws/hooks/logs.py
index 6680e1939e903..a6126152669f3 100644
--- a/airflow/providers/amazon/aws/hooks/logs.py
+++ b/airflow/providers/amazon/aws/hooks/logs.py
@@ -72,6 +72,7 @@ def get_log_events(
             This is for when there are multiple entries at the same timestamp.
         :param start_from_head: whether to start from the beginning (True) of the log or
             at the end of the log (False).
+            If iterating from the end of the file, the logs are returned in reverse order.
         :return: | A CloudWatch log event with the following key-value pairs:
                  |   'timestamp' (int): The time in milliseconds of the event.
                  |   'message' (str): The log event data.
@@ -103,7 +104,12 @@ def get_log_events(
                     skip -= event_count
                     events = []
 
-            yield from events
+            if not start_from_head:
+                # if we are not reading from head, it doesn't make sense to return events in "normal" order
+                # while hiding the subsequent calls, bc 1-9 queried by batches of 3 would return 789 456 123
+                yield from reversed(events)
+            else:
+                yield from events
 
             if not event_count:
                 num_consecutive_empty_response += 1
diff --git a/tests/providers/amazon/aws/hooks/test_ecs.py b/tests/providers/amazon/aws/hooks/test_ecs.py
index d9a4f53fa8a75..f065c75ecda56 100644
--- a/tests/providers/amazon/aws/hooks/test_ecs.py
+++ b/tests/providers/amazon/aws/hooks/test_ecs.py
@@ -24,6 +24,7 @@
 
 from airflow.providers.amazon.aws.exceptions import EcsOperatorError, EcsTaskFailToStart
 from airflow.providers.amazon.aws.hooks.ecs import EcsHook, EcsTaskLogFetcher, should_retry, should_retry_eni
+from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
 
 DEFAULT_CONN_ID: str = "aws_default"
 REGION: str = "us-east-1"
@@ -167,29 +168,25 @@ def test_event_to_str(self):
     def test_get_last_log_message_with_no_log_events(self, mock_log_events):
         assert self.log_fetcher.get_last_log_message() is None
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=iter(
-            [
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_message_with_log_events(self, log_conn_mock):
+        log_conn_mock.get_log_events.return_value = {
+            "events": [
                 {"timestamp": 1617400267123, "message": "First"},
                 {"timestamp": 1617400367456, "message": "Second"},
             ]
-        ),
-    )
-    def test_get_last_log_message_with_log_events(self, mock_log_events):
+        }
         assert self.log_fetcher.get_last_log_message() == "Second"
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=iter(
-            [
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_messages_with_log_events(self, log_conn_mock):
+        log_conn_mock.get_log_events.return_value = {
+            "events": [
                 {"timestamp": 1617400267123, "message": "First"},
                 {"timestamp": 1617400367456, "message": "Second"},
                 {"timestamp": 1617400367458, "message": "Third"},
             ]
-        ),
-    )
-    def test_get_last_log_messages_with_log_events(self, mock_log_events):
+        }
         assert self.log_fetcher.get_last_log_messages(2) == ["Second", "Third"]
 
     @mock.patch(

From 1e678bd16c0f01b49d177ba81a20e0eb3642d2ae Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?=
Date: Thu, 8 Jun 2023 09:22:39 -0700
Subject: [PATCH 2/4] less memory allocation
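Extracting the message strings while iterating avoids materializing up
to `number_messages` full event dicts, and reversing in place avoids
building a second list. Roughly (an illustrative sketch, not the
provider code):

    import itertools

    def events():
        return ({"timestamp": i, "message": f"m{i}"} for i in range(100))

    # before: hold up to 10 full event dicts, then extract the messages
    truncated = list(itertools.islice(events(), 10))
    messages = [log["message"] for log in reversed(truncated)]

    # after: hold only the message strings, reverse the small list in
    # place -- same result, fewer intermediate objects
    messages = [log["message"] for log in itertools.islice(events(), 10)]
    messages.reverse()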
---
 airflow/providers/amazon/aws/hooks/ecs.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/ecs.py b/airflow/providers/amazon/aws/hooks/ecs.py
index 85f5fb1fc3f72..3fb39b90cd448 100644
--- a/airflow/providers/amazon/aws/hooks/ecs.py
+++ b/airflow/providers/amazon/aws/hooks/ecs.py
@@ -200,9 +200,11 @@ def get_last_log_messages(self, number_messages) -> list:
         last_logs_iterator = self.logs_hook.get_log_events(
             self.log_group, self.log_stream_name, start_from_head=False
         )
-        truncated = list(itertools.islice(last_logs_iterator, number_messages))
+        truncated_iterator = itertools.islice(last_logs_iterator, number_messages)
+        messages = [log["message"] for log in truncated_iterator]
         # need to reverse the order to put the logs back in order after getting them from the end
-        return [log["message"] for log in reversed(truncated)]
+        messages.reverse()
+        return messages
 
     def get_last_log_message(self) -> str | None:
         try:

From 33c66dba206e3a397995cb6190dc4975f3071aeb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?=
Date: Wed, 14 Jun 2023 14:59:00 -0700
Subject: [PATCH 3/4] other approach: use one request & deprecate param
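Instead of paginating through get_log_events, ask CloudWatch for the
tail of the stream directly. The shape of the call (a hedged sketch
using boto3 directly; the group and stream names are placeholders):

    import boto3

    client = boto3.client("logs")
    # a single GetLogEvents call; CloudWatch caps `limit` at 10,000
    # events, and startFromHead=False reads from the tail of the stream
    response = client.get_log_events(
        logGroupName="/ecs/example-task",
        logStreamName="example-stream",
        startFromHead=False,
        limit=100,
    )
    # events within one response are already in chronological order,
    # so no reversing is needed
    messages = [event["message"] for event in response["events"]]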
---
 airflow/providers/amazon/aws/hooks/ecs.py    | 19 ++++++-----
 airflow/providers/amazon/aws/hooks/logs.py   | 35 +++++++++++---------
 tests/providers/amazon/aws/hooks/test_ecs.py | 21 ++++--------
 3 files changed, 38 insertions(+), 37 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/ecs.py b/airflow/providers/amazon/aws/hooks/ecs.py
index 3fb39b90cd448..d74aadae52375 100644
--- a/airflow/providers/amazon/aws/hooks/ecs.py
+++ b/airflow/providers/amazon/aws/hooks/ecs.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import itertools
 import time
 from datetime import datetime, timedelta
 from logging import Logger
@@ -197,14 +196,18 @@ def _event_to_str(self, event: dict) -> str:
         return f"[{formatted_event_dt}] {message}"
 
     def get_last_log_messages(self, number_messages) -> list:
-        last_logs_iterator = self.logs_hook.get_log_events(
-            self.log_group, self.log_stream_name, start_from_head=False
+        """
+        Gets the last log messages in one single request, so restrictions apply:
+        - if logs are too old, the response will be empty
+        - the max number of messages we can retrieve is constrained by CloudWatch limits (10,000).
+        """
+        response = self.logs_hook.conn.get_log_events(
+            logGroupName=self.log_group,
+            logStreamName=self.log_stream_name,
+            startFromHead=False,
+            limit=number_messages,
         )
-        truncated_iterator = itertools.islice(last_logs_iterator, number_messages)
-        messages = [log["message"] for log in truncated_iterator]
-        # need to reverse the order to put the logs back in order after getting them from the end
-        messages.reverse()
-        return messages
+        return [log["message"] for log in response["events"]]
 
     def get_last_log_message(self) -> str | None:
         try:
diff --git a/airflow/providers/amazon/aws/hooks/logs.py b/airflow/providers/amazon/aws/hooks/logs.py
index a6126152669f3..0982bcd9ed0c6 100644
--- a/airflow/providers/amazon/aws/hooks/logs.py
+++ b/airflow/providers/amazon/aws/hooks/logs.py
@@ -21,8 +21,10 @@
 """
 from __future__ import annotations
 
+import warnings
 from typing import Generator
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 
 # Guidance received from the AWS team regarding the correct way to check for the end of a stream is that the
@@ -51,12 +53,7 @@ def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
 
     def get_log_events(
-        self,
-        log_group: str,
-        log_stream_name: str,
-        start_time: int = 0,
-        skip: int = 0,
-        start_from_head: bool = True,
+        self, log_group: str, log_stream_name: str, start_time: int = 0, skip: int = 0, **kwargs
     ) -> Generator:
         """
         A generator for log items in a single stream. This will yield all the
@@ -70,14 +67,27 @@ def get_log_events(
         :param start_time: The time stamp value to start reading the logs from (default: 0).
         :param skip: The number of log entries to skip at the start (default: 0).
             This is for when there are multiple entries at the same timestamp.
-        :param start_from_head: whether to start from the beginning (True) of the log or
-            at the end of the log (False).
-            If iterating from the end of the file, the logs are returned in reverse order.
+        :param start_from_head: Do not use with False, logs would be retrieved out of order.
+            If possible, retrieve logs in one query, or implement pagination yourself.
         :return: | A CloudWatch log event with the following key-value pairs:
                  |   'timestamp' (int): The time in milliseconds of the event.
                  |   'message' (str): The log event data.
                  |   'ingestionTime' (int): The time in milliseconds the event was ingested.
         """
+        start_from_head = kwargs.get("start_from_head", True)
+        if "start_from_head" in kwargs:
+            message = (
+                "start_from_head is deprecated, please remove this parameter."
+                if start_from_head
+                else "Do not use this method with start_from_head=False, logs will be returned out of order. "
+                "If possible, retrieve logs in one query, or implement pagination yourself."
+            )
+            warnings.warn(
+                message,
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
+
         num_consecutive_empty_response = 0
         next_token = None
         while True:
@@ -104,12 +114,7 @@ def get_log_events(
                     skip -= event_count
                     events = []
 
-            if not start_from_head:
-                # if we are not reading from head, it doesn't make sense to return events in "normal" order
-                # while hiding the subsequent calls, bc 1-9 queried by batches of 3 would return 789 456 123
-                yield from reversed(events)
-            else:
-                yield from events
+            yield from events
 
             if not event_count:
                 num_consecutive_empty_response += 1
diff --git a/tests/providers/amazon/aws/hooks/test_ecs.py b/tests/providers/amazon/aws/hooks/test_ecs.py
index f065c75ecda56..5fd54688b858c 100644
--- a/tests/providers/amazon/aws/hooks/test_ecs.py
+++ b/tests/providers/amazon/aws/hooks/test_ecs.py
@@ -161,22 +161,18 @@ def test_event_to_str(self):
         ]
     )
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=(),
-    )
-    def test_get_last_log_message_with_no_log_events(self, mock_log_events):
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_message_with_no_log_events(self, conn_mock):
         assert self.log_fetcher.get_last_log_message() is None
 
     @mock.patch.object(AwsLogsHook, "conn")
     def test_get_last_log_message_with_log_events(self, log_conn_mock):
         log_conn_mock.get_log_events.return_value = {
             "events": [
-                {"timestamp": 1617400267123, "message": "First"},
-                {"timestamp": 1617400367456, "message": "Second"},
+                {"timestamp": 1617400267123, "message": "Last"},
             ]
         }
-        assert self.log_fetcher.get_last_log_message() == "Second"
+        assert self.log_fetcher.get_last_log_message() == "Last"
 
     @mock.patch.object(AwsLogsHook, "conn")
     def test_get_last_log_messages_with_log_events(self, log_conn_mock):
@@ -187,11 +183,8 @@ def test_get_last_log_messages_with_log_events(self, log_conn_mock):
                 {"timestamp": 1617400367458, "message": "Third"},
             ]
         }
-        assert self.log_fetcher.get_last_log_messages(2) == ["Second", "Third"]
+        assert self.log_fetcher.get_last_log_messages(2) == ["First", "Second", "Third"]
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=(),
-    )
-    def test_get_last_log_messages_with_no_log_events(self, mock_log_events):
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_messages_with_no_log_events(self, mock_conn):
         assert self.log_fetcher.get_last_log_messages(2) == []

From 32ba6e0a272f04536373f1e2b5bd9621d2108b2b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?=
Date: Mon, 26 Jun 2023 16:02:10 -0700
Subject: [PATCH 4/4] +deprecated in comment
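For the record, the reason start_from_head=False is warned against:
yielding pages as-is while reading them from the tail scrambles the
overall order. A toy illustration (plain Python, no AWS involved):

    # events 1..9 read in pages of 3 from the tail come back as
    # 7 8 9, then 4 5 6, then 1 2 3 -- batches reversed, not the stream
    events = list(range(1, 10))
    pages = [events[i:i + 3] for i in range(0, len(events), 3)]
    for page in reversed(pages):
        print(*page)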
---
 airflow/providers/amazon/aws/hooks/logs.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/providers/amazon/aws/hooks/logs.py b/airflow/providers/amazon/aws/hooks/logs.py
index 69ae9f332c351..6d006a95a3e51 100644
--- a/airflow/providers/amazon/aws/hooks/logs.py
+++ b/airflow/providers/amazon/aws/hooks/logs.py
@@ -79,7 +79,7 @@ def get_log_events(
         :param start_time: The time stamp value to start reading the logs from (default: 0).
         :param skip: The number of log entries to skip at the start (default: 0).
             This is for when there are multiple entries at the same timestamp.
-        :param start_from_head: Do not use with False, logs would be retrieved out of order.
+        :param start_from_head: Deprecated. Do not use with False, logs would be retrieved out of order.
            If possible, retrieve logs in one query, or implement pagination yourself.
         :param continuation_token: a token indicating where to read logs from. Will be updated as this
            method reads new logs, to be reused in subsequent calls.