15 changes: 10 additions & 5 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -70,6 +70,7 @@
from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
from airflow.jobs.job import Job
from airflow.sdk.api.client import Client
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.triggers.base import BaseTrigger

HANDLER_SUPPORTS_TRIGGERER = False
@@ -235,6 +236,8 @@ class TriggerStateChanges(BaseModel):
class TriggerLoggingFactory:
log_path: str

ti: RuntimeTI

bound_logger: WrappedLogger = attrs.field(init=False)

def __call__(self, processors: Iterable[structlog.typing.Processor]) -> WrappedLogger:
@@ -261,7 +264,7 @@ def upload_to_remote(self):
# Never actually called, nothing to do
return

upload_to_remote(self.bound_logger)
upload_to_remote(self.bound_logger, self.ti)


def in_process_api_server() -> InProcessExecutionAPI:
@@ -515,14 +518,16 @@ def update_triggers(self, requested_trigger_ids: set[int]):
)
if new_trigger_orm.task_instance:
log_path = render_log_fname(ti=new_trigger_orm.task_instance)
# When producing logs from TIs, include the job id producing the logs to disambiguate it.
self.logger_cache[new_id] = TriggerLoggingFactory(
log_path=f"{log_path}.trigger.{self.job.id}.log"
)

ser_ti = workloads.TaskInstance.model_validate(
new_trigger_orm.task_instance, from_attributes=True
)
# When producing logs from TIs, include the job id producing the logs to disambiguate it.
self.logger_cache[new_id] = TriggerLoggingFactory(
log_path=f"{log_path}.trigger.{self.job.id}.log",
ti=ser_ti, # type: ignore
)

workload.ti = ser_ti
workload.timeout_after = new_trigger_orm.task_instance.trigger_timeout

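The TriggerLoggingFactory construction moves below the workloads.TaskInstance.model_validate(...) call because the factory now needs the serialized TI at construction time: when the trigger's logs are flushed, the factory hands both its bound logger and that TI to upload_to_remote. A minimal sketch of the new flow, assuming only the names visible in this diff (flush_trigger_logs is a hypothetical wrapper for illustration):

    # Sketch: how the factory's stored TI reaches the remote uploader.
    def flush_trigger_logs(factory: "TriggerLoggingFactory") -> None:
        from airflow.sdk.log import upload_to_remote

        # bound_logger only exists once __call__ has produced a logger;
        # the real code above substitutes a no-op when nothing was logged.
        upload_to_remote(factory.bound_logger, factory.ti)
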
5 changes: 3 additions & 2 deletions airflow-core/src/airflow/logging/remote.py
@@ -23,6 +23,7 @@
if TYPE_CHECKING:
import structlog.typing

from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo


@@ -39,10 +40,10 @@ def processors(self) -> tuple[structlog.typing.Processor, ...]: ...
are being written to a file, or if you want to upload messages as they are generated.
"""

def upload(self, path: os.PathLike | str) -> None:
def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None:
"""Upload the given log path to the remote storage."""
...

def read(self, relative_path: str) -> tuple[LogSourceInfo, LogMessages | None]:
def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages | None]:
"""Read logs from the given remote log path."""
...
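
With this protocol change, every RemoteLogIO implementation must accept the task instance in both upload() and read(), even when it has no use for it. A minimal conforming implementation, purely illustrative (the class and its behavior are hypothetical; only the signatures come from the diff):

    import os
    from pathlib import Path

    class NoopRemoteLogIO:
        """Hypothetical implementation of the updated RemoteLogIO protocol."""

        processors = ()

        def upload(self, path: os.PathLike | str, ti) -> None:
            # ti carries per-task metadata (e.g. ti.try_number), but
            # implementations that don't need it may simply ignore it.
            print(f"would upload {Path(path)} for task {ti.task_id}")

        def read(self, relative_path: str, ti):
            # Returns (sources, logs); logs=None signals "nothing found".
            return ([f"no remote logs at {relative_path}"], None)
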
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -638,5 +638,5 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[LogSourceInf
# This living here is not really a good plan, but it just about works for now.
# Ideally we move all the read+combine logic in to TaskLogReader and out of the task handler.
path = self._render_filename(ti, try_number)
sources, logs = remote_io.read(path)
sources, logs = remote_io.read(path, ti)
return sources, logs or []
@@ -32,7 +32,7 @@
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo


@@ -44,7 +44,7 @@ class OSSRemoteLogIO(LoggingMixin): # noqa: D101

processors = ()

def upload(self, path: os.PathLike | str):
def upload(self, path: os.PathLike | str, ti: RuntimeTI):
"""Upload the given log path to the remote storage."""
path = Path(path)
if path.is_absolute():
@@ -86,7 +86,7 @@ def hook(self):
remote_conn_id,
)

def read(self, relative_path, ti: TaskInstance | None = None) -> tuple[LogSourceInfo, LogMessages | None]:
def read(self, relative_path, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages | None]:
logs: list[str] = []
messages = [relative_path]

@@ -191,6 +191,7 @@ def set_context(self, ti):
self.log_relative_path = self._render_filename(ti, ti.try_number)
self.upload_on_close = not ti.raw

self.ti = ti
# Clear the file first so that duplicate data is not uploaded
# when reusing the same path (e.g. with rescheduled sensors)
if self.upload_on_close:
@@ -211,7 +212,8 @@ def close(self):
if not self.upload_on_close:
return

self.io.upload(self.log_relative_path)
if hasattr(self, "ti"):
self.io.upload(self.log_relative_path, self.ti)

# Mark closed so we don't double write if close is called twice
self.closed = True
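
The hasattr guard above matters because close() can run on a handler whose set_context() never stored a TI, for example when the handler was never used for a task. The same pattern is applied to the S3, GCS, and WASB handlers below; condensed, it looks like this (a sketch, not the full method, which also flushes and closes the underlying file handler first):

    def close(self):
        if self.closed or not self.upload_on_close:
            return
        if hasattr(self, "ti"):  # only present if set_context() ran
            self.io.upload(self.log_relative_path, self.ti)
        self.closed = True  # guard against a second close() re-uploading
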
@@ -40,6 +40,7 @@
import structlog.typing

from airflow.models.taskinstance import TaskInstance
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo


@@ -152,13 +153,13 @@ def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: struct
def close(self):
self.handler.close()

def upload(self, path: os.PathLike | str):
def upload(self, path: os.PathLike | str, ti: RuntimeTI):
# No-op, as we upload via the processor as we go
# But we need to give the handler time to finish off its business
self.close()
return

def read(self, relative_path, ti: TaskInstance | None = None) -> tuple[LogSourceInfo, LogMessages | None]:
def read(self, relative_path, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages | None]:
logs: LogMessages | None = []
messages = [
f"Reading remote log from Cloudwatch log_group: {self.log_group} log_stream: {relative_path}"
@@ -179,7 +180,7 @@ def read(self, relative_path, ti: TaskInstance | None = None) -> tuple[LogSource

return messages, logs

def get_cloudwatch_logs(self, stream_name: str, task_instance: TaskInstance | None):
def get_cloudwatch_logs(self, stream_name: str, task_instance: RuntimeTI):
"""
Return all logs from the given log stream.

@@ -192,8 +193,8 @@ def get_cloudwatch_logs(self, stream_name: str, task_instance: TaskInstance | No
# 30 seconds is an arbitrary buffer so that we don't miss any logs that were emitted
end_time = (
None
if task_instance is None or task_instance.end_date is None
else datetime_to_epoch_utc_ms(task_instance.end_date + timedelta(seconds=30))
if (end_date := getattr(task_instance, "end_date", None)) is None
else datetime_to_epoch_utc_ms(end_date + timedelta(seconds=30))
)
events = self.hook.get_log_events(
log_group=self.log_group,
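Because the ti parameter is no longer Optional, the None check moves from the task instance itself onto its end_date, and getattr keeps this safe even for TI objects that don't expose end_date at all. A self-contained sketch of the cutoff computation (datetime_to_epoch_utc_ms is assumed to behave like the helper referenced above):

    from datetime import datetime, timedelta

    def datetime_to_epoch_utc_ms(dt: datetime) -> int:
        # Assumed behavior of the helper used in the diff.
        return int(dt.timestamp() * 1000)

    def cloudwatch_end_time(task_instance) -> int | None:
        # 30 seconds is the same arbitrary buffer as above, so that
        # events emitted just after the task ended are not cut off.
        if (end_date := getattr(task_instance, "end_date", None)) is None:
            return None
        return datetime_to_epoch_utc_ms(end_date + timedelta(seconds=30))
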
@@ -34,6 +34,7 @@

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo


@@ -45,7 +46,7 @@ class S3RemoteLogIO(LoggingMixin): # noqa: D101

processors = ()

def upload(self, path: os.PathLike | str):
def upload(self, path: os.PathLike | str, ti: RuntimeTI):
"""Upload the given log path to the remote storage."""
path = pathlib.Path(path)
if path.is_absolute():
@@ -146,7 +147,7 @@ def write(
return False
return True

def read(self, relative_path: str) -> tuple[LogSourceInfo, LogMessages | None]:
def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages | None]:
logs: list[str] = []
messages = []
bucket, prefix = self.hook.parse_s3_url(s3url=os.path.join(self.remote_base, relative_path))
@@ -195,6 +196,8 @@ def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> Non
if TYPE_CHECKING:
assert self.handler is not None

self.ti = ti

full_path = self.handler.baseFilename
self.log_relative_path = pathlib.Path(full_path).relative_to(self.local_base).as_posix()
is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
@@ -219,7 +222,8 @@ def close(self):
if not self.upload_on_close:
return

self.io.upload(self.log_relative_path)
if hasattr(self, "ti"):
self.io.upload(self.log_relative_path, self.ti)

# Mark closed so we don't double write if close is called twice
self.closed = True
@@ -230,7 +234,7 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[LogSourceInf
# in set_context method.
worker_log_rel_path = self._render_filename(ti, try_number)

messages, logs = self.io.read(worker_log_rel_path)
messages, logs = self.io.read(worker_log_rel_path, ti)

if logs is None:
logs = []
@@ -44,6 +44,7 @@

if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo

_DEFAULT_SCOPESS = frozenset(
@@ -66,7 +67,7 @@ class GCSRemoteLogIO(LoggingMixin): # noqa: D101
scopes: Collection[str] | None
project_id: str

def upload(self, path: os.PathLike):
def upload(self, path: os.PathLike, ti: RuntimeTI):
"""Upload the given log path to the remote storage."""
path = Path(path)
if path.is_absolute():
@@ -146,7 +147,7 @@ def no_log_found(exc):
exc, "resp", {}
).get("status") == "404"

def read(self, relative_path) -> tuple[LogSourceInfo, LogMessages | None]:
def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages | None]:
messages = []
logs = []
remote_loc = os.path.join(self.remote_base, relative_path)
@@ -237,6 +238,8 @@ def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> Non
if TYPE_CHECKING:
assert self.handler is not None

self.ti = ti

full_path = self.handler.baseFilename
self.log_relative_path = Path(full_path).relative_to(self.local_base).as_posix()
is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
@@ -256,7 +259,8 @@ def close(self):
if not self.upload_on_close:
return

self.io.upload(self.log_relative_path)
if hasattr(self, "ti"):
self.io.upload(self.log_relative_path, self.ti)

# Mark closed so we don't double write if close is called twice
self.closed = True
@@ -267,7 +271,7 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[LogSourceInf
# in set_context method.
worker_log_rel_path = self._render_filename(ti, try_number)

messages, logs = self.io.read(worker_log_rel_path)
messages, logs = self.io.read(worker_log_rel_path, ti)

if logs is None:
logs = []
@@ -35,6 +35,7 @@
import logging

from airflow.models.taskinstance import TaskInstance
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo


@@ -48,7 +49,7 @@ class WasbRemoteLogIO(LoggingMixin): # noqa: D101

processors = ()

def upload(self, path: str | os.PathLike):
def upload(self, path: str | os.PathLike, ti: RuntimeTI):
"""Upload the given log path to the remote storage."""
path = Path(path)
if path.is_absolute():
@@ -83,7 +84,7 @@ def hook(self):
)
return None

def read(self, relative_path) -> tuple[LogSourceInfo, LogMessages | None]:
def read(self, relative_path, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages | None]:
messages = []
logs = []
# TODO: fix this - "relative path" i.e currently REMOTE_BASE_LOG_FOLDER should start with "wasb"
@@ -210,6 +211,7 @@ def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> Non
if TYPE_CHECKING:
assert self.handler is not None

self.ti = ti
full_path = self.handler.baseFilename
self.log_relative_path = Path(full_path).relative_to(self.local_base).as_posix()
is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
@@ -229,7 +231,8 @@ def close(self) -> None:
if not self.upload_on_close:
return

self.io.upload(self.log_relative_path)
if hasattr(self, "ti"):
self.io.upload(self.log_relative_path, self.ti)

# Mark closed so we don't double write if close is called twice
self.closed = True
@@ -240,7 +243,7 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[LogSourceInf
# in set_context method.
worker_log_rel_path = self._render_filename(ti, try_number)

messages, logs = self.io.read(worker_log_rel_path)
messages, logs = self.io.read(worker_log_rel_path, ti)

if logs is None:
logs = []
6 changes: 5 additions & 1 deletion task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -104,6 +104,7 @@
from structlog.typing import FilteringBoundLogger, WrappedLogger

from airflow.executors.workloads import BundleInfo
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.secrets import BaseSecretsBackend
from airflow.typing_compat import Self

@@ -697,6 +698,8 @@ class ActivitySubprocess(WatchedSubprocess):

decoder: ClassVar[TypeAdapter[ToSupervisor]] = TypeAdapter(ToSupervisor)

ti: RuntimeTI | None = None

@classmethod
def start( # type: ignore[override]
cls,
@@ -717,6 +720,7 @@

def _on_child_started(self, ti: TaskInstance, dag_rel_path: str | os.PathLike[str], bundle_info):
"""Send startup message to the subprocess."""
self.ti = ti # type: ignore[assignment]
start_date = datetime.now(tz=timezone.utc)
try:
# We've forked, but the task won't start doing anything until we send it the StartupDetails
@@ -785,7 +789,7 @@ def _upload_logs(self):
"""
from airflow.sdk.log import upload_to_remote

upload_to_remote(self.process_log)
upload_to_remote(self.process_log, self.ti)

def _monitor_subprocess(self):
"""
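ActivitySubprocess now remembers which TI it launched so that, after the child exits, _upload_logs can forward it to the remote handler. A reduced sketch of that lifecycle (attribute and method names come from the diff; everything else is elided):

    class ActivitySubprocessSketch:
        ti = None  # populated once the child process has started

        def _on_child_started(self, ti, dag_rel_path, bundle_info):
            self.ti = ti  # remember the TI this subprocess is running

        def _upload_logs(self):
            from airflow.sdk.log import upload_to_remote

            # self.ti can still be None if the child never started; whether
            # upload_to_remote tolerates that is an assumption of this sketch.
            upload_to_remote(self.process_log, self.ti)
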
5 changes: 3 additions & 2 deletions task-sdk/src/airflow/sdk/log.py
@@ -35,6 +35,7 @@
from structlog.typing import EventDict, ExcInfo, FilteringBoundLogger, Processor

from airflow.logging_config import RemoteLogIO
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI


__all__ = [
Expand Down Expand Up @@ -515,7 +516,7 @@ def relative_path_from_logger(logger) -> Path | None:
return Path(fname).relative_to(base_log_folder)


def upload_to_remote(logger: FilteringBoundLogger):
def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI):
raw_logger = getattr(logger, "_logger")

relative_path = relative_path_from_logger(raw_logger)
@@ -525,4 +526,4 @@ def upload_to_remote(logger: FilteringBoundLogger):
return

log_relative_path = relative_path.as_posix()
handler.upload(log_relative_path)
handler.upload(log_relative_path, ti)
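
Taken together, upload_to_remote now has a two-argument contract: the caller supplies the bound logger, from which the relative log path is derived, and the TI, which is forwarded verbatim to the remote handler's upload(). A condensed sketch of the whole helper, with load_remote_log_handler standing in for however the handler is actually resolved in the elided lines above:

    def upload_to_remote_sketch(logger, ti) -> None:
        raw_logger = getattr(logger, "_logger", None)
        relative_path = relative_path_from_logger(raw_logger)
        if relative_path is None:
            return  # nothing was written locally, so nothing to upload
        handler = load_remote_log_handler()  # hypothetical resolver
        handler.upload(relative_path.as_posix(), ti)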