task-sdk/src/airflow/sdk/execution_time/supervisor.py: 8 additions, 2 deletions
@@ -35,6 +35,7 @@
 from socket import SO_SNDBUF, SOL_SOCKET, SocketIO, socket, socketpair
 from typing import (
     TYPE_CHECKING,
+    BinaryIO,
     Callable,
     ClassVar,
     NoReturn,
@@ -1512,6 +1513,7 @@ def supervise(
 
     # TODO: Use logging providers to handle the chunked upload for us etc.
     logger: FilteringBoundLogger | None = None
+    log_file_descriptor: BinaryIO | TextIO | None = None
     if log_path:
         # If we are told to write logs to a file, redirect the task logger to it. Make sure we append to the
         # file though, otherwise when we resume we would lose the logs from the start->deferral segment if it
@@ -1522,9 +1524,11 @@
 
         pretty_logs = False
         if pretty_logs:
-            underlying_logger: WrappedLogger = structlog.WriteLogger(log_file.open("a", buffering=1))
+            log_file_descriptor = log_file.open("a", buffering=1)
+            underlying_logger: WrappedLogger = structlog.WriteLogger(cast("TextIO", log_file_descriptor))
         else:
-            underlying_logger = structlog.BytesLogger(log_file.open("ab"))
+            log_file_descriptor = log_file.open("ab")
+            underlying_logger = structlog.BytesLogger(cast("BinaryIO", log_file_descriptor))
         processors = logging_processors(enable_pretty_log=pretty_logs)[0]
         logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind()
 
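For context, the hunk above switches from passing the opened file inline to keeping the handle in log_file_descriptor so it can be closed later. Below is a minimal standalone sketch of that pattern, not the Airflow code itself: the path, the pretty_logs flag, and the log lines are illustrative, and the real code builds its processors via Airflow's logging_processors() helper instead of calling the raw structlog loggers directly.

from __future__ import annotations

from pathlib import Path
from typing import BinaryIO, TextIO, cast

import structlog

log_file = Path("/tmp/task.log")  # illustrative path, not the Airflow log layout
pretty_logs = False
log_file_descriptor: BinaryIO | TextIO | None = None

if pretty_logs:
    # Text mode, line-buffered, opened for append so earlier log segments survive.
    log_file_descriptor = log_file.open("a", buffering=1)
    underlying_logger = structlog.WriteLogger(cast("TextIO", log_file_descriptor))
    underlying_logger.info("task started")  # WriteLogger writes str lines
else:
    # Binary append mode; BytesLogger expects pre-rendered bytes.
    log_file_descriptor = log_file.open("ab")
    underlying_logger = structlog.BytesLogger(cast("BinaryIO", log_file_descriptor))
    underlying_logger.info(b'{"event": "task started"}')

# Because the handle was kept instead of being passed inline, it can be
# closed deterministically once logging is finished.
log_file_descriptor.close()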
@@ -1549,4 +1553,6 @@
     exit_code = process.wait()
     end = time.monotonic()
     log.info("Task finished", exit_code=exit_code, duration=end - start, final_state=process.final_state)
+    if log_path and log_file_descriptor:
+        log_file_descriptor.close()
     return exit_code
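One note on the design choice: as written in the diff, the close runs in straight-line code after process.wait(), so it only executes when supervision completes normally. If the descriptor should also be released when supervision raises, the same bookkeeping fits a try/finally. The sketch below shows that variant purely as an alternative, not what this PR implements; run_supervision is a hypothetical stand-in for the work supervise() does while the log file is open, and the path is illustrative.

from __future__ import annotations

from pathlib import Path
from typing import BinaryIO


def run_supervision() -> int:
    """Hypothetical stand-in for the body of supervise() while the log file is open."""
    return 0


log_path: Path | None = Path("/tmp/task.log")  # illustrative; None would mean "no file logging"
log_file_descriptor: BinaryIO | None = None
try:
    if log_path:
        log_file_descriptor = log_path.open("ab")
    exit_code = run_supervision()
finally:
    # Runs on both normal return and exceptions, so the handle never leaks.
    if log_file_descriptor is not None:
        log_file_descriptor.close()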