Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions airflow/providers/amazon/aws/hooks/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
"""
from __future__ import annotations

import warnings
from typing import Generator

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# Guidance received from the AWS team regarding the correct way to check for the end of a stream is that the
Expand Down Expand Up @@ -62,7 +64,7 @@ def get_log_events(
log_stream_name: str,
start_time: int = 0,
skip: int = 0,
start_from_head: bool = True,
start_from_head: bool | None = None,
continuation_token: ContinuationToken | None = None,
) -> Generator:
"""
Expand All @@ -77,15 +79,30 @@ def get_log_events(
:param start_time: The time stamp value to start reading the logs from (default: 0).
:param skip: The number of log entries to skip at the start (default: 0).
This is for when there are multiple entries at the same timestamp.
:param start_from_head: whether to start from the beginning (True) of the log or
at the end of the log (False).
:param start_from_head: Deprecated. Do not use this parameter with False; logs would be retrieved out of order.
If possible, retrieve logs in one query, or implement pagination yourself.
:param continuation_token: a token indicating where to read logs from.
Will be updated as this method reads new logs, to be reused in subsequent calls.
:return: | A CloudWatch log event with the following key-value pairs:
| 'timestamp' (int): The time in milliseconds of the event.
| 'message' (str): The log event data.
| 'ingestionTime' (int): The time in milliseconds the event was ingested.
"""
if start_from_head is not None:
message = (
"start_from_head is deprecated, please remove this parameter."
if start_from_head
else "Do not use this method with start_from_head=False, logs will be returned out of order. "
"If possible, retrieve logs in one query, or implement pagination yourself."
)
warnings.warn(
message,
AirflowProviderDeprecationWarning,
stacklevel=2,
)
else:
start_from_head = True

if continuation_token is None:
continuation_token = AwsLogsHook.ContinuationToken()

Expand Down
14 changes: 12 additions & 2 deletions airflow/providers/amazon/aws/utils/task_log_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from __future__ import annotations

import time
from collections import deque
from datetime import datetime, timedelta
from logging import Logger
from threading import Event, Thread
Expand Down Expand Up @@ -95,7 +94,18 @@ def event_to_str(event: dict) -> str:
return f"[{formatted_event_dt}] {message}"

def get_last_log_messages(self, number_messages) -> list:
return [log["message"] for log in deque(self._get_log_events(), maxlen=number_messages)]
"""
Gets the last log messages in one single request, so restrictions apply:
- if logs are too old, the response will be empty
- the max number of messages we can retrieve is constrained by CloudWatch limits (10,000).
"""
response = self.hook.conn.get_log_events(
logGroupName=self.log_group,
logStreamName=self.log_stream_name,
startFromHead=False,
limit=number_messages,
)
return [log["message"] for log in response["events"]]

def get_last_log_message(self) -> str | None:
try:
Expand Down
45 changes: 17 additions & 28 deletions tests/providers/amazon/aws/utils/test_task_log_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,41 +120,30 @@ def test_event_to_str(self):
]
)

@mock.patch(
"airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
return_value=(),
)
def test_get_last_log_message_with_no_log_events(self, mock_log_events):
@mock.patch.object(AwsLogsHook, "conn")
def test_get_last_log_message_with_no_log_events(self, mock_conn):
assert self.log_fetcher.get_last_log_message() is None

@mock.patch(
"airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
return_value=iter(
[
{"timestamp": 1617400267123, "message": "First"},
{"timestamp": 1617400367456, "message": "Second"},
@mock.patch.object(AwsLogsHook, "conn")
def test_get_last_log_message_with_log_events(self, log_conn_mock):
log_conn_mock.get_log_events.return_value = {
"events": [
{"timestamp": 1617400267123, "message": "Last"},
]
),
)
def test_get_last_log_message_with_log_events(self, mock_log_events):
assert self.log_fetcher.get_last_log_message() == "Second"
}
assert self.log_fetcher.get_last_log_message() == "Last"

@mock.patch(
"airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
return_value=iter(
[
@mock.patch.object(AwsLogsHook, "conn")
def test_get_last_log_messages_with_log_events(self, log_conn_mock):
log_conn_mock.get_log_events.return_value = {
"events": [
{"timestamp": 1617400267123, "message": "First"},
{"timestamp": 1617400367456, "message": "Second"},
{"timestamp": 1617400367458, "message": "Third"},
]
),
)
def test_get_last_log_messages_with_log_events(self, mock_log_events):
assert self.log_fetcher.get_last_log_messages(2) == ["Second", "Third"]
}
assert self.log_fetcher.get_last_log_messages(2) == ["First", "Second", "Third"]

@mock.patch(
"airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
return_value=(),
)
def test_get_last_log_messages_with_no_log_events(self, mock_log_events):
@mock.patch.object(AwsLogsHook, "conn")
def test_get_last_log_messages_with_no_log_events(self, mock_conn):
assert self.log_fetcher.get_last_log_messages(2) == []