diff --git a/airflow/providers/amazon/aws/hooks/logs.py b/airflow/providers/amazon/aws/hooks/logs.py
index 2dff0aaaf3dfd..6d006a95a3e51 100644
--- a/airflow/providers/amazon/aws/hooks/logs.py
+++ b/airflow/providers/amazon/aws/hooks/logs.py
@@ -21,8 +21,10 @@
 """
 from __future__ import annotations
 
+import warnings
 from typing import Generator
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 
 # Guidance received from the AWS team regarding the correct way to check for the end of a stream is that the
@@ -62,7 +64,7 @@ def get_log_events(
         log_stream_name: str,
         start_time: int = 0,
         skip: int = 0,
-        start_from_head: bool = True,
+        start_from_head: bool | None = None,
         continuation_token: ContinuationToken | None = None,
     ) -> Generator:
         """
@@ -77,8 +79,8 @@ def get_log_events(
         :param start_time: The time stamp value to start reading the logs from (default: 0).
         :param skip: The number of log entries to skip at the start (default: 0).
             This is for when there are multiple entries at the same timestamp.
-        :param start_from_head: whether to start from the beginning (True) of the log or
-            at the end of the log (False).
+        :param start_from_head: Deprecated. Do not use this with False; logs would be retrieved out of order.
+            If possible, retrieve logs in one query, or implement pagination yourself.
         :param continuation_token: a token indicating where to read logs from.
             Will be updated as this method reads new logs, to be reused in subsequent calls.
         :return: | A CloudWatch log event with the following key-value pairs:
@@ -86,6 +88,21 @@ def get_log_events(
             | 'message' (str): The log event data.
             | 'ingestionTime' (int): The time in milliseconds the event was ingested.
         """
+        if start_from_head is not None:
+            message = (
+                "start_from_head is deprecated, please remove this parameter."
+                if start_from_head
+                else "Do not use this method with start_from_head=False; logs will be returned out of order. "
+                "If possible, retrieve logs in one query, or implement pagination yourself."
+            )
+            warnings.warn(
+                message,
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
+        else:
+            start_from_head = True
+
         if continuation_token is None:
             continuation_token = AwsLogsHook.ContinuationToken()
 
diff --git a/airflow/providers/amazon/aws/utils/task_log_fetcher.py b/airflow/providers/amazon/aws/utils/task_log_fetcher.py
index 22a5e5f2a1afe..fc33219d72d41 100644
--- a/airflow/providers/amazon/aws/utils/task_log_fetcher.py
+++ b/airflow/providers/amazon/aws/utils/task_log_fetcher.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 import time
-from collections import deque
 from datetime import datetime, timedelta
 from logging import Logger
 from threading import Event, Thread
@@ -95,7 +94,18 @@ def event_to_str(event: dict) -> str:
         return f"[{formatted_event_dt}] {message}"
 
     def get_last_log_messages(self, number_messages) -> list:
-        return [log["message"] for log in deque(self._get_log_events(), maxlen=number_messages)]
+        """
+        Get the last log messages in a single request, so restrictions apply:
+        - if the logs are too old, the response will be empty
+        - the maximum number of messages we can retrieve is constrained by CloudWatch limits (10,000).
+        """
+        response = self.hook.conn.get_log_events(
+            logGroupName=self.log_group,
+            logStreamName=self.log_stream_name,
+            startFromHead=False,
+            limit=number_messages,
+        )
+        return [log["message"] for log in response["events"]]
 
     def get_last_log_message(self) -> str | None:
         try:
diff --git a/tests/providers/amazon/aws/utils/test_task_log_fetcher.py b/tests/providers/amazon/aws/utils/test_task_log_fetcher.py
index a5598ebf552c0..1170bb643f500 100644
--- a/tests/providers/amazon/aws/utils/test_task_log_fetcher.py
+++ b/tests/providers/amazon/aws/utils/test_task_log_fetcher.py
@@ -120,41 +120,30 @@ def test_event_to_str(self):
             ]
         )
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=(),
-    )
-    def test_get_last_log_message_with_no_log_events(self, mock_log_events):
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_message_with_no_log_events(self, mock_conn):
         assert self.log_fetcher.get_last_log_message() is None
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=iter(
-            [
-                {"timestamp": 1617400267123, "message": "First"},
-                {"timestamp": 1617400367456, "message": "Second"},
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_message_with_log_events(self, log_conn_mock):
+        log_conn_mock.get_log_events.return_value = {
+            "events": [
+                {"timestamp": 1617400267123, "message": "Last"},
             ]
-        ),
-    )
-    def test_get_last_log_message_with_log_events(self, mock_log_events):
-        assert self.log_fetcher.get_last_log_message() == "Second"
+        }
+        assert self.log_fetcher.get_last_log_message() == "Last"
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=iter(
-            [
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_messages_with_log_events(self, log_conn_mock):
+        log_conn_mock.get_log_events.return_value = {
+            "events": [
                 {"timestamp": 1617400267123, "message": "First"},
                 {"timestamp": 1617400367456, "message": "Second"},
                 {"timestamp": 1617400367458, "message": "Third"},
             ]
-        ),
-    )
-    def test_get_last_log_messages_with_log_events(self, mock_log_events):
-        assert self.log_fetcher.get_last_log_messages(2) == ["Second", "Third"]
+        }
+        assert self.log_fetcher.get_last_log_messages(2) == ["First", "Second", "Third"]
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=(),
-    )
-    def test_get_last_log_messages_with_no_log_events(self, mock_log_events):
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_messages_with_no_log_events(self, mock_conn):
         assert self.log_fetcher.get_last_log_messages(2) == []