@@ -116,8 +116,18 @@ def processors(self) -> tuple[structlog.typing.Processor, ...]:
import structlog.stdlib

logRecordFactory = getLogRecordFactory()
# The handler MUST be initialized here, before the processor is actually used to log anything.
# Otherwise, logging that occurs during the creation of the handler can create infinite loops.
_handler = self.handler
from airflow.sdk.log import relative_path_from_logger

def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: structlog.typing.EventDict):
if not logger or not (stream_name := relative_path_from_logger(logger)):
return event
# Only set the handler's log_stream_name once. We cannot do it above when we create the handler
# because we don't yet know the log path at that point.
if not _handler.log_stream_name:
_handler.log_stream_name = stream_name.as_posix().replace(":", "_")
name = event.get("logger_name") or event.get("logger", "")
level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(), logging.INFO)
msg = copy.copy(event)
@@ -134,7 +144,7 @@ def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: struct
ct = created.timestamp()
record.created = ct
record.msecs = int((ct - int(ct)) * 1000) + 0.0 # Copied from stdlib logging
self.handler.handle(record)
_handler.handle(record)
return event

return (proc,)
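The ordering called out in the comments above is the crux of this hunk: the handler is created eagerly, before any event flows through the processor, while its stream name can only be filled in lazily once the first event reveals the log path. A minimal self-contained sketch of that pattern (stub class and illustrative stream name, not the provider's real CloudWatch handler):

```python
class _StubHandler:
    def __init__(self):
        # Constructing the real handler can itself emit log records, so it is
        # built before the processor runs, avoiding re-entrant logging loops.
        self.log_stream_name = None

    def handle(self, record):
        print(f"[{self.log_stream_name}] {record}")

_handler = _StubHandler()  # created once, up front

def proc(logger, method_name, event):
    if _handler.log_stream_name is None:
        # First event: the log path is now known, so the stream can be named.
        _handler.log_stream_name = "dag_id=a/0_0.log"  # illustrative value
    _handler.handle(event)
    return event
```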
@@ -177,6 +187,7 @@ def get_cloudwatch_logs(self, stream_name: str, task_instance: TaskInstance | No
:param task_instance: the task instance to get logs about
:return: string of all logs from the given log stream
"""
stream_name = stream_name.replace(":", "_")
# If the task instance has an end_date, fetch logs until that date + 30 seconds.
# 30 seconds is an arbitrary buffer so that we don't miss any logs that were emitted.
end_time = (
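Both the processor above and get_cloudwatch_logs apply the same substitution, because CloudWatch log stream names may not contain ":" (or "*") while Airflow's mapped-task log paths do. A minimal sketch of the shared rule (the PR inlines the replace() calls rather than factoring out a helper like this):

```python
from pathlib import Path

def to_stream_name(relative_log_path: Path) -> str:
    # CloudWatch stream names disallow ":" and "*"; mapped-task log paths
    # contain ":" (e.g. "dag_id=a/0:0.log"), so swap it for "_".
    return relative_log_path.as_posix().replace(":", "_")

assert to_stream_name(Path("dag_id=a/0:0.log")) == "dag_id=a/0_0.log"
```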
@@ -75,19 +75,24 @@ def setup_tests(self, create_runtime_ti, tmp_path, monkeypatch):

self.remote_log_group = "log_group_name"
self.region_name = "us-west-2"
self.task_log_path = "dag_id=a/0:0.log"
self.local_log_location = tmp_path / "local-cloudwatch-log-location"
self.local_log_location.mkdir()
# Create the local log file structure
task_log_path_parts = self.task_log_path.split("/")
dag_dir = self.local_log_location / task_log_path_parts[0]
dag_dir.mkdir()
task_log_file = dag_dir / task_log_path_parts[1]
task_log_file.touch()

# The subject under test
self.subject = CloudWatchRemoteLogIO(
base_log_folder=self.local_log_location,
log_group_arn=f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
log_stream_name="dag_id=a/0.log",
)

conn = boto3.client("logs", region_name=self.region_name)
conn.create_log_group(logGroupName=self.remote_log_group)
conn.create_log_stream(logGroupName=self.remote_log_group, logStreamName=self.subject.log_stream_name)

processors = structlog.get_config()["processors"]
old_processors = processors.copy()
@@ -108,7 +113,13 @@ def drop(*args):
raise structlog.DropEvent()

processors[-1] = drop
structlog.configure(processors=processors)
structlog.configure(
processors=processors,
# Create a logger factory and pass in the file path we want it to use,
# because the CloudWatchRemoteLogIO processor uses the logger to determine
# the stream name / file path.
logger_factory=structlog.PrintLoggerFactory(file=task_log_file.open("w+")),
)
yield
finally:
# remove LogCapture and restore original processors
@@ -118,32 +129,38 @@ def drop(*args):

@time_machine.travel(datetime(2025, 3, 27, 21, 58, 1, 2345), tick=False)
def test_log_message(self):
import structlog
# Use a context manager instead of a decorator on the test method because we need access to
# self to get the path from the setup method.
with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}):
import structlog

log = structlog.get_logger()
log.info("Hi", foo="bar")
# We need to close in order to flush the logs etc.
self.subject.close()
log = structlog.get_logger()
log.info("Hi", foo="bar")
# We need to close in order to flush the logs etc.
self.subject.close()

logs = self.subject.read("dag_id=a/0.log", self.ti)
# Inside the Cloudwatch logger we swap colons for underscores since colons are not allowed in
# stream names.
stream_name = self.task_log_path.replace(":", "_")
logs = self.subject.read(stream_name, self.ti)

if AIRFLOW_V_3_0_PLUS:
from airflow.utils.log.file_task_handler import StructuredLogMessage
if AIRFLOW_V_3_0_PLUS:
from airflow.utils.log.file_task_handler import StructuredLogMessage

metadata, logs = logs
metadata, logs = logs

results = TypeAdapter(list[StructuredLogMessage]).dump_python(logs)
assert metadata == [
"Reading remote log from Cloudwatch log_group: log_group_name log_stream: dag_id=a/0.log"
]
assert results == [
{
"event": "Hi",
"foo": "bar",
"level": "info",
"timestamp": datetime(2025, 3, 27, 21, 58, 1, 2000, tzinfo=TzInfo(0)),
},
]
results = TypeAdapter(list[StructuredLogMessage]).dump_python(logs)
assert metadata == [
f"Reading remote log from Cloudwatch log_group: log_group_name log_stream: {stream_name}"
]
assert results == [
{
"event": "Hi",
"foo": "bar",
"level": "info",
"timestamp": datetime(2025, 3, 27, 21, 58, 1, 2000, tzinfo=TzInfo(0)),
},
]

def test_event_to_str(self):
handler = self.subject
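For context on the logger_factory wiring in the test above: relative_path_from_logger (next file) recovers the on-disk path from the wrapped logger's _file handle, which is exactly what PrintLoggerFactory(file=...) stores. A hedged sketch of that round trip, assuming a writable temp directory:

```python
import tempfile
from pathlib import Path

import structlog

log_file = (Path(tempfile.mkdtemp()) / "0_0.log").open("w+")
structlog.configure(
    processors=[structlog.processors.KeyValueRenderer()],
    logger_factory=structlog.PrintLoggerFactory(file=log_file),
)
# bind() materializes the lazy proxy so the wrapped PrintLogger is reachable.
log = structlog.get_logger().bind()
# PrintLogger keeps the handle on `_file`; its .name is the on-disk path from
# which the Cloudwatch processor derives the stream name.
print(log._logger._file.name)
```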
task-sdk/src/airflow/sdk/log.py (28 changes: 17 additions & 11 deletions)
@@ -496,26 +496,32 @@ def load_remote_log_handler() -> RemoteLogIO | None:
return airflow.logging_config.REMOTE_TASK_LOG


def upload_to_remote(logger: FilteringBoundLogger):
from airflow.configuration import conf

raw_logger = getattr(logger, "_logger")

if not raw_logger or not hasattr(raw_logger, "_file"):
def relative_path_from_logger(logger) -> Path | None:
if not logger:
return None
if not hasattr(logger, "_file"):
logger.warning("Unable to find log file, logger was of unexpected type", type=type(logger))
return
return None

fh = raw_logger._file
fh = logger._file
fname = fh.name

if fh.fileno() == 1 or not isinstance(fname, str):
# Logging to stdout, or something odd about this logger, don't try to upload!
return
return None
from airflow.configuration import conf

base_log_folder = conf.get("logging", "base_log_folder")
relative_path = Path(fname).relative_to(base_log_folder)
return Path(fname).relative_to(base_log_folder)


def upload_to_remote(logger: FilteringBoundLogger):
raw_logger = getattr(logger, "_logger")

relative_path = relative_path_from_logger(raw_logger)

handler = load_remote_log_handler()
if not handler:
if not handler or not relative_path:
return

log_relative_path = relative_path.as_posix()
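The net effect of the refactor above is that upload_to_remote and the Cloudwatch processor now share one rule for turning a logger into a relative log path. A self-contained sketch of that rule, with a stand-in logger and a hard-coded base folder (the real helper reads base_log_folder from Airflow's config):

```python
import io
from pathlib import Path

class _FakeFile(io.StringIO):
    """File-like object reporting a path and a non-stdout file descriptor."""
    name = "/logs/dag_id=a/0:0.log"

    def fileno(self) -> int:
        return 99  # anything but stdout's fd 1

class _FakeLogger:
    _file = _FakeFile()

def relative_path_sketch(logger, base_log_folder="/logs"):
    if not logger or not hasattr(logger, "_file"):
        return None
    fh = logger._file
    if fh.fileno() == 1 or not isinstance(fh.name, str):
        return None  # logging to stdout: nothing to upload
    return Path(fh.name).relative_to(base_log_folder)

# -> dag_id=a/0:0.log; the Cloudwatch processor then swaps ":" for "_".
print(relative_path_sketch(_FakeLogger()))
```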