diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml index 80ecef6f05388..8a91dbf7f2dec 100644 --- a/airflow-core/pyproject.toml +++ b/airflow-core/pyproject.toml @@ -227,6 +227,7 @@ exclude = [ "../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker" "../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones" "../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/_shared/secrets_backend" +"../shared/observability/src/airflow_shared/observability" = "src/airflow/_shared/observability" [tool.hatch.build.targets.custom] path = "./hatch_build.py" @@ -299,4 +300,5 @@ shared_distributions = [ "apache-airflow-shared-secrets-backend", "apache-airflow-shared-secrets-masker", "apache-airflow-shared-timezones", + "apache-airflow-shared-observability", ] diff --git a/airflow-core/src/airflow/__init__.py b/airflow-core/src/airflow/__init__.py index c357baaeba699..b86606f2f88d3 100644 --- a/airflow-core/src/airflow/__init__.py +++ b/airflow-core/src/airflow/__init__.py @@ -89,6 +89,10 @@ # Deprecated lazy imports "AirflowException": (".exceptions", "AirflowException", True), "Dataset": (".sdk", "Asset", True), + "Stats": (".observability.stats", "Stats", True), + "Trace": (".observability.trace", "Trace", True), + "metrics": (".observability.metrics", "", True), + "traces": (".observability.traces", "", True), } if TYPE_CHECKING: # These objects are imported by PEP-562, however, static analyzers and IDE's diff --git a/airflow-core/src/airflow/_shared/observability b/airflow-core/src/airflow/_shared/observability new file mode 120000 index 0000000000000..7b9e2789bc841 --- /dev/null +++ b/airflow-core/src/airflow/_shared/observability @@ -0,0 +1 @@ +../../../../shared/observability/src/airflow_shared/observability \ No newline at end of file diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index b1d1e85544378..74f5ef131741e 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -38,7 +38,7 @@ DagScheduleAssetUriReference, PartitionedAssetKeyLog, ) -from airflow.stats import Stats +from airflow.observability.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.sqlalchemy import get_dialect_name diff --git a/airflow-core/src/airflow/cli/commands/daemon_utils.py b/airflow-core/src/airflow/cli/commands/daemon_utils.py index c55c12b380461..e156d178a073a 100644 --- a/airflow-core/src/airflow/cli/commands/daemon_utils.py +++ b/airflow-core/src/airflow/cli/commands/daemon_utils.py @@ -75,7 +75,7 @@ def run_command_with_daemon_option( with ctx: # in daemon context stats client needs to be reinitialized. - from airflow.stats import Stats + from airflow.observability.stats import Stats Stats.instance = None callback() diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index ae45a690074c0..4fcc5a58fb09a 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -58,10 +58,10 @@ from airflow.models.dagwarning import DagWarning from airflow.models.db_callback_request import DbCallbackRequest from airflow.models.errors import ParseImportError +from airflow.observability.stats import Stats +from airflow.observability.trace import DebugTrace from airflow.sdk import SecretCache from airflow.sdk.log import init_log_file, logging_processors -from airflow.stats import Stats -from airflow.traces.tracer import DebugTrace from airflow.utils.file import list_py_file_paths, might_contain_dag from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index 506cf515d39d8..ae984a0eecfd2 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -36,6 +36,7 @@ ) from airflow.configuration import conf from airflow.dag_processing.dagbag import DagBag +from airflow.observability.stats import Stats from airflow.sdk.exceptions import TaskNotFound from airflow.sdk.execution_time.comms import ( ConnectionResult, @@ -66,7 +67,6 @@ from airflow.sdk.execution_time.supervisor import WatchedSubprocess from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, _send_error_email_notification from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG -from airflow.stats import Stats from airflow.utils.file import iter_airflow_imports from airflow.utils.state import TaskInstanceState diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py index 985a6bcab8bc3..e99aabd4baa06 100644 --- a/airflow-core/src/airflow/executors/base_executor.py +++ b/airflow-core/src/airflow/executors/base_executor.py @@ -27,15 +27,14 @@ import pendulum +from airflow._shared.observability.traces import NO_TRACE_ID from airflow.cli.cli_config import DefaultHelpParser from airflow.configuration import conf from airflow.executors import workloads from airflow.executors.executor_loader import ExecutorLoader from airflow.models import Log -from airflow.stats import Stats -from airflow.traces import NO_TRACE_ID -from airflow.traces.tracer import DebugTrace, Trace, add_debug_span, gen_context -from airflow.traces.utils import gen_span_id_from_ti_key +from airflow.observability.stats import Stats +from airflow.observability.trace import DebugTrace, Trace, add_debug_span from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import TaskInstanceState from airflow.utils.thread_safe_dict import ThreadSafeDict @@ -419,11 +418,9 @@ def fail(self, key: TaskInstanceKey, info=None) -> None: """ trace_id = Trace.get_current_span().get_span_context().trace_id if trace_id != NO_TRACE_ID: - span_id = int(gen_span_id_from_ti_key(key, as_int=True)) - with DebugTrace.start_span( + with DebugTrace.start_child_span( span_name="fail", component="BaseExecutor", - parent_sc=gen_context(trace_id=trace_id, span_id=span_id), ) as span: span.set_attributes( { @@ -446,11 +443,9 @@ def success(self, key: TaskInstanceKey, info=None) -> None: """ trace_id = Trace.get_current_span().get_span_context().trace_id if trace_id != NO_TRACE_ID: - span_id = int(gen_span_id_from_ti_key(key, as_int=True)) - with DebugTrace.start_span( + with DebugTrace.start_child_span( span_name="success", component="BaseExecutor", - parent_sc=gen_context(trace_id=trace_id, span_id=span_id), ) as span: span.set_attributes( { diff --git a/airflow-core/src/airflow/jobs/dag_processor_job_runner.py b/airflow-core/src/airflow/jobs/dag_processor_job_runner.py index 787b6bbab96e1..afb50afdf0c5a 100644 --- a/airflow-core/src/airflow/jobs/dag_processor_job_runner.py +++ b/airflow-core/src/airflow/jobs/dag_processor_job_runner.py @@ -21,7 +21,7 @@ from airflow.jobs.base_job_runner import BaseJobRunner from airflow.jobs.job import Job, perform_heartbeat -from airflow.stats import Stats +from airflow.observability.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: diff --git a/airflow-core/src/airflow/jobs/job.py b/airflow-core/src/airflow/jobs/job.py index 660f714698f6f..a13bf62206b55 100644 --- a/airflow-core/src/airflow/jobs/job.py +++ b/airflow-core/src/airflow/jobs/job.py @@ -35,8 +35,8 @@ from airflow.executors.executor_loader import ExecutorLoader from airflow.listeners.listener import get_listener_manager from airflow.models.base import ID_LEN, Base -from airflow.stats import Stats -from airflow.traces.tracer import DebugTrace, add_debug_span +from airflow.observability.stats import Stats +from airflow.observability.trace import DebugTrace, add_debug_span from airflow.utils.helpers import convert_camel_to_snake from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 87ed4646774e1..7a43f6416a65c 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -77,12 +77,11 @@ from airflow.models.taskinstance import TaskInstance from airflow.models.team import Team from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger, TriggerFailureReason +from airflow.observability.stats import Stats +from airflow.observability.trace import DebugTrace, Trace, add_debug_span from airflow.serialization.definitions.notset import NOTSET -from airflow.stats import Stats from airflow.ti_deps.dependencies_states import EXECUTION_STATES from airflow.timetables.simple import AssetTriggeredTimetable -from airflow.traces import utils as trace_utils -from airflow.traces.tracer import DebugTrace, Trace, add_debug_span from airflow.utils.dates import datetime_to_nano from airflow.utils.event_scheduler import EventScheduler from airflow.utils.log.logging_mixin import LoggingMixin @@ -2085,12 +2084,8 @@ def _schedule_dag_run( :param dag_run: The DagRun to schedule :return: Callback that needs to be executed """ - trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run, as_int=True)) - span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run, as_int=True)) - links = [{"trace_id": trace_id, "span_id": span_id}] - - with DebugTrace.start_span( - span_name="_schedule_dag_run", component="SchedulerJobRunner", links=links + with DebugTrace.start_root_span( + span_name="_schedule_dag_run", component="SchedulerJobRunner" ) as span: span.set_attributes( { diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index b56a58ca6ccd9..d741a103968ed 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -44,6 +44,8 @@ from airflow.jobs.base_job_runner import BaseJobRunner from airflow.jobs.job import perform_heartbeat from airflow.models.trigger import Trigger +from airflow.observability.stats import Stats +from airflow.observability.trace import DebugTrace, Trace, add_debug_span from airflow.sdk.api.datamodels._generated import HITLDetailResponse from airflow.sdk.execution_time.comms import ( CommsDecoder, @@ -73,8 +75,6 @@ _RequestFrame, ) from airflow.sdk.execution_time.supervisor import WatchedSubprocess, make_buffered_socket_reader -from airflow.stats import Stats -from airflow.traces.tracer import DebugTrace, Trace, add_debug_span from airflow.triggers import base as events from airflow.utils.helpers import log_filename_template_renderer from airflow.utils.log.logging_mixin import LoggingMixin diff --git a/airflow-core/src/airflow/models/callback.py b/airflow-core/src/airflow/models/callback.py index 15925e16dd4fb..4ea23b36e1078 100644 --- a/airflow-core/src/airflow/models/callback.py +++ b/airflow-core/src/airflow/models/callback.py @@ -29,7 +29,7 @@ from airflow._shared.timezones import timezone from airflow.models import Base -from airflow.stats import Stats +from airflow.observability.stats import Stats from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, mapped_column if TYPE_CHECKING: diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 07758be44842d..89999bdbf29f0 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -68,12 +68,12 @@ from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH from airflow.models.tasklog import LogTemplate from airflow.models.taskmap import TaskMap +from airflow.observability.stats import Stats +from airflow.observability.trace import Trace from airflow.sdk.definitions.deadline import DeadlineReference from airflow.serialization.definitions.notset import NOTSET, ArgNotSet, is_arg_set -from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES -from airflow.traces.tracer import EmptySpan, Trace from airflow.utils.dates import datetime_to_nano from airflow.utils.helpers import chunks, is_container, prune_dict from airflow.utils.log.logging_mixin import LoggingMixin @@ -102,6 +102,7 @@ from sqlalchemy.orm import Session from sqlalchemy.sql.elements import Case, ColumnElement + from airflow._shared.observability.traces.base_tracer import EmptySpan from airflow.models.dag_version import DagVersion from airflow.models.mappedoperator import MappedOperator from airflow.models.taskinstancekey import TaskInstanceKey diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py index 9e55f02c30754..3d83b46311d47 100644 --- a/airflow-core/src/airflow/models/deadline.py +++ b/airflow-core/src/airflow/models/deadline.py @@ -31,7 +31,7 @@ from airflow._shared.timezones import timezone from airflow.models.base import Base from airflow.models.callback import Callback, CallbackDefinitionProtocol -from airflow.stats import Stats +from airflow.observability.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import provide_session from airflow.utils.sqlalchemy import UtcDateTime, get_dialect_name, mapped_column diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 99ebcfd5f8568..feee9c4995929 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -82,8 +82,8 @@ from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComSelectSequence, XComModel +from airflow.observability.stats import Stats from airflow.settings import task_instance_mutation_hook -from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS from airflow.utils.helpers import prune_dict diff --git a/airflow-core/src/airflow/metrics/__init__.py b/airflow-core/src/airflow/observability/__init__.py similarity index 100% rename from airflow-core/src/airflow/metrics/__init__.py rename to airflow-core/src/airflow/observability/__init__.py diff --git a/airflow-core/src/airflow/observability/metrics/__init__.py b/airflow-core/src/airflow/observability/metrics/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow-core/src/airflow/observability/metrics/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/src/airflow/observability/metrics/datadog_logger.py b/airflow-core/src/airflow/observability/metrics/datadog_logger.py new file mode 100644 index 0000000000000..6ac9b6c4545ab --- /dev/null +++ b/airflow-core/src/airflow/observability/metrics/datadog_logger.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow._shared.observability.metrics import datadog_logger +from airflow.configuration import conf + +if TYPE_CHECKING: + from airflow._shared.observability.metrics.datadog_logger import SafeDogStatsdLogger + + +def get_dogstatsd_logger(cls) -> SafeDogStatsdLogger: + return datadog_logger.get_dogstatsd_logger( + cls, + host=conf.get("metrics", "statsd_host"), + port=conf.getint("metrics", "statsd_port"), + namespace=conf.get("metrics", "statsd_prefix"), + datadog_metrics_tags=conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True), + statsd_disabled_tags=conf.get("metrics", "statsd_disabled_tags", fallback=None), + metrics_allow_list=conf.get("metrics", "metrics_allow_list", fallback=None), + metrics_block_list=conf.get("metrics", "metrics_block_list", fallback=None), + stat_name_handler=conf.getimport("metrics", "stat_name_handler"), + statsd_influxdb_enabled=conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False), + ) diff --git a/airflow-core/src/airflow/observability/metrics/otel_logger.py b/airflow-core/src/airflow/observability/metrics/otel_logger.py new file mode 100644 index 0000000000000..bb68c60847f4d --- /dev/null +++ b/airflow-core/src/airflow/observability/metrics/otel_logger.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow._shared.observability.metrics import otel_logger +from airflow.configuration import conf + +if TYPE_CHECKING: + from airflow._shared.observability.metrics.otel_logger import SafeOtelLogger + + +def get_otel_logger(cls) -> SafeOtelLogger: + return otel_logger.get_otel_logger( + cls, + host=conf.get("metrics", "otel_host"), # ex: "breeze-otel-collector" + port=conf.getint("metrics", "otel_port"), # ex: 4318 + prefix=conf.get("metrics", "otel_prefix"), # ex: "airflow" + ssl_active=conf.getboolean("metrics", "otel_ssl_active"), + # PeriodicExportingMetricReader will default to an interval of 60000 millis. + conf_interval=conf.getfloat("metrics", "otel_interval_milliseconds", fallback=None), # ex: 30000 + debug=conf.getboolean("metrics", "otel_debugging_on"), + service_name=conf.get("metrics", "otel_service"), + metrics_allow_list=conf.get("metrics", "metrics_allow_list", fallback=None), + metrics_block_list=conf.get("metrics", "metrics_block_list", fallback=None), + stat_name_handler=conf.getimport("metrics", "stat_name_handler"), + statsd_influxdb_enabled=conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False), + ) diff --git a/airflow-core/src/airflow/observability/metrics/statsd_logger.py b/airflow-core/src/airflow/observability/metrics/statsd_logger.py new file mode 100644 index 0000000000000..2389d651f4ec4 --- /dev/null +++ b/airflow-core/src/airflow/observability/metrics/statsd_logger.py @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from airflow._shared.configuration import AirflowConfigException +from airflow._shared.observability.metrics import statsd_logger +from airflow.configuration import conf + +if TYPE_CHECKING: + from airflow._shared.observability.metrics.statsd_logger import SafeStatsdLogger + +log = logging.getLogger(__name__) + + +def get_statsd_logger(cls) -> SafeStatsdLogger: + stats_class = conf.getimport("metrics", "statsd_custom_client_path", fallback=None) + + # no need to check for the scheduler/statsd_on -> this method is only called when it is set + # and previously it would crash with None is callable if it was called without it. + from statsd import StatsClient + + if stats_class: + if not issubclass(stats_class, StatsClient): + raise AirflowConfigException( + "Your custom StatsD client must extend the statsd.StatsClient in order to ensure " + "backwards compatibility." + ) + log.info("Successfully loaded custom StatsD client") + + else: + stats_class = StatsClient + + return statsd_logger.get_statsd_logger( + cls, + stats_class=stats_class, + host=conf.get("metrics", "statsd_host"), + port=conf.getint("metrics", "statsd_port"), + prefix=conf.get("metrics", "statsd_prefix"), + ipv6=conf.getboolean("metrics", "statsd_ipv6", fallback=False), + influxdb_tags_enabled=conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False), + statsd_disabled_tags=conf.get("metrics", "statsd_disabled_tags", fallback=None), + metrics_allow_list=conf.get("metrics", "metrics_allow_list", fallback=None), + metrics_block_list=conf.get("metrics", "metrics_block_list", fallback=None), + stat_name_handler=conf.getimport("metrics", "stat_name_handler"), + statsd_influxdb_enabled=conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False), + ) diff --git a/airflow-core/src/airflow/stats.py b/airflow-core/src/airflow/observability/stats.py similarity index 87% rename from airflow-core/src/airflow/stats.py rename to airflow-core/src/airflow/observability/stats.py index 6cb9229ab7388..cf586e7f8f496 100644 --- a/airflow-core/src/airflow/stats.py +++ b/airflow-core/src/airflow/observability/stats.py @@ -22,11 +22,11 @@ from collections.abc import Callable from typing import TYPE_CHECKING +from airflow._shared.observability.metrics.base_stats_logger import NoStatsLogger from airflow.configuration import conf -from airflow.metrics.base_stats_logger import NoStatsLogger if TYPE_CHECKING: - from airflow.metrics.base_stats_logger import StatsLogger + from airflow._shared.observability.metrics.base_stats_logger import StatsLogger log = logging.getLogger(__name__) @@ -49,15 +49,15 @@ def __init__(cls, *args, **kwargs) -> None: if not hasattr(cls.__class__, "factory"): is_datadog_enabled_defined = conf.has_option("metrics", "statsd_datadog_enabled") if is_datadog_enabled_defined and conf.getboolean("metrics", "statsd_datadog_enabled"): - from airflow.metrics import datadog_logger + from airflow.observability.metrics import datadog_logger cls.__class__.factory = datadog_logger.get_dogstatsd_logger elif conf.getboolean("metrics", "statsd_on"): - from airflow.metrics import statsd_logger + from airflow.observability.metrics import statsd_logger cls.__class__.factory = statsd_logger.get_statsd_logger elif conf.getboolean("metrics", "otel_on"): - from airflow.metrics import otel_logger + from airflow.observability.metrics import otel_logger cls.__class__.factory = otel_logger.get_otel_logger else: diff --git a/airflow-core/src/airflow/observability/trace.py b/airflow-core/src/airflow/observability/trace.py new file mode 100644 index 0000000000000..bb074be0152b7 --- /dev/null +++ b/airflow-core/src/airflow/observability/trace.py @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +from collections.abc import Callable +from functools import wraps +from socket import socket +from typing import TYPE_CHECKING + +from airflow._shared.observability.traces.base_tracer import EmptyTrace, Tracer +from airflow.configuration import conf + +log = logging.getLogger(__name__) + + +def add_debug_span(func): + """Decorate a function with span.""" + func_name = func.__name__ + qual_name = func.__qualname__ + module_name = func.__module__ + component = qual_name.rsplit(".", 1)[0] if "." in qual_name else module_name + + @wraps(func) + def wrapper(*args, **kwargs): + with DebugTrace.start_span(span_name=func_name, component=component): + return func(*args, **kwargs) + + return wrapper + + +class _TraceMeta(type): + factory: Callable[[], Tracer] | None = None + instance: Tracer | EmptyTrace | None = None + + def __new__(cls, name, bases, attrs): + # Read the debug flag from the class body. + if "check_debug_traces_flag" not in attrs: + raise TypeError(f"Class '{name}' must define 'check_debug_traces_flag'.") + + return super().__new__(cls, name, bases, attrs) + + def __getattr__(cls, name: str): + if not cls.factory: + # Lazy initialization of the factory + cls.configure_factory() + if not cls.instance: + cls._initialize_instance() + return getattr(cls.instance, name) + + def _initialize_instance(cls): + """Initialize the trace instance.""" + try: + cls.instance = cls.factory() + except (socket.gaierror, ImportError) as e: + log.error("Could not configure Trace: %s. Using EmptyTrace instead.", e) + cls.instance = EmptyTrace() + + def __call__(cls, *args, **kwargs): + """Ensure the class behaves as a singleton.""" + if not cls.instance: + cls._initialize_instance() + return cls.instance + + def configure_factory(cls): + """Configure the trace factory based on settings.""" + otel_on = conf.getboolean("traces", "otel_on") + + if cls.check_debug_traces_flag: + debug_traces_on = conf.getboolean("traces", "otel_debug_traces_on") + else: + # Set to true so that it will be ignored during the evaluation for the factory instance. + # If this is true, then (otel_on and debug_traces_on) will always evaluate to + # whatever value 'otel_on' has and therefore it will be ignored. + debug_traces_on = True + + if otel_on and debug_traces_on: + from airflow.observability.traces import otel_tracer + + cls.factory = staticmethod( + lambda use_simple_processor=False: otel_tracer.get_otel_tracer(cls, use_simple_processor) + ) + else: + # EmptyTrace is a class and not inherently callable. + # Using a lambda ensures it can be invoked as a callable factory. + # staticmethod ensures the lambda is treated as a standalone function + # and avoids passing `cls` as an implicit argument. + cls.factory = staticmethod(lambda: EmptyTrace()) + + def get_constant_tags(cls) -> str | None: + """Get constant tags to add to all traces.""" + return conf.get("traces", "tags", fallback=None) + + +if TYPE_CHECKING: + Trace: EmptyTrace + DebugTrace: EmptyTrace +else: + + class Trace(metaclass=_TraceMeta): + """Empty class for Trace - we use metaclass to inject the right one.""" + + check_debug_traces_flag = False + + class DebugTrace(metaclass=_TraceMeta): + """Empty class for Trace and in case the debug traces flag is enabled.""" + + check_debug_traces_flag = True diff --git a/airflow-core/src/airflow/observability/traces/__init__.py b/airflow-core/src/airflow/observability/traces/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/airflow-core/src/airflow/observability/traces/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/src/airflow/observability/traces/otel_tracer.py b/airflow-core/src/airflow/observability/traces/otel_tracer.py new file mode 100644 index 0000000000000..73934c3da326e --- /dev/null +++ b/airflow-core/src/airflow/observability/traces/otel_tracer.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow._shared.observability.traces import otel_tracer +from airflow.configuration import conf + +if TYPE_CHECKING: + from airflow._shared.observability.traces.otel_tracer import OtelTrace + + +def get_otel_tracer(cls, use_simple_processor: bool = False) -> OtelTrace: + return otel_tracer.get_otel_tracer( + cls, + use_simple_processor, + host=conf.get("traces", "otel_host"), + port=conf.getint("traces", "otel_port"), + ssl_active=conf.getboolean("traces", "otel_ssl_active"), + otel_service=conf.get("traces", "otel_service"), + debug=conf.getboolean("traces", "otel_debugging_on"), + ) + + +def get_otel_tracer_for_task(cls) -> OtelTrace: + return get_otel_tracer(cls, use_simple_processor=True) diff --git a/airflow-core/src/airflow/plugins_manager.py b/airflow-core/src/airflow/plugins_manager.py index 6f74a8e958fe7..06f6fd3a28271 100644 --- a/airflow-core/src/airflow/plugins_manager.py +++ b/airflow-core/src/airflow/plugins_manager.py @@ -337,7 +337,7 @@ def ensure_plugins_loaded(): Plugins are only loaded if they have not been previously loaded. """ - from airflow.stats import Stats + from airflow.observability.stats import Stats global plugins diff --git a/airflow-core/src/airflow/serialization/serde.py b/airflow-core/src/airflow/serialization/serde.py index d4e86c25f2566..8aa90c5d9a1bf 100644 --- a/airflow-core/src/airflow/serialization/serde.py +++ b/airflow-core/src/airflow/serialization/serde.py @@ -32,8 +32,8 @@ import airflow.serialization.serializers from airflow.configuration import conf +from airflow.observability.stats import Stats from airflow.serialization.typing import is_pydantic_model -from airflow.stats import Stats from airflow.utils.module_loading import import_string, iter_namespace, qualname if TYPE_CHECKING: diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 258995e420c25..505fe2cca2efc 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -75,6 +75,7 @@ from airflow.models.tasklog import LogTemplate from airflow.models.xcom import XComModel from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg +from airflow.observability.stats import Stats from airflow.sdk import DAG, Asset, AssetAlias, AssetAll, AssetAny, BaseOperator, XComArg from airflow.sdk.bases.operator import OPERATOR_DEFAULTS # TODO: Copy this into the scheduler? from airflow.sdk.definitions._internal.node import DAGNode @@ -111,7 +112,6 @@ from airflow.serialization.helpers import TimetableNotRegistered, serialize_template_field from airflow.serialization.json_schema import load_dag_schema from airflow.settings import DAGS_FOLDER, json -from airflow.stats import Stats from airflow.task.priority_strategy import ( PriorityWeightStrategy, airflow_priority_weight_strategies, diff --git a/airflow-core/src/airflow/traces/utils.py b/airflow-core/src/airflow/traces/utils.py deleted file mode 100644 index 9932c249f0772..0000000000000 --- a/airflow-core/src/airflow/traces/utils.py +++ /dev/null @@ -1,114 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import logging -from typing import TYPE_CHECKING - -from airflow.traces import NO_TRACE_ID -from airflow.utils.hashlib_wrapper import md5 - -if TYPE_CHECKING: - from airflow.models import DagRun, TaskInstance - from airflow.models.taskinstancekey import TaskInstanceKey - -TRACE_ID = 0 -SPAN_ID = 16 - -log = logging.getLogger(__name__) - - -def _gen_id(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str | int: - seed_str = "_".join(seeds).encode("utf-8") - hash_hex = md5(seed_str).hexdigest()[type:] - return int(hash_hex, 16) if as_int else hash_hex - - -def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int: - if dag_run.start_date is None: - return NO_TRACE_ID - - """Generate trace id from DagRun.""" - return _gen_id( - [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())], - as_int, - ) - - -def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> str | int: - """Generate span id from TI key.""" - return _gen_id( - [ti_key.dag_id, str(ti_key.run_id), ti_key.task_id, str(ti_key.try_number)], - as_int, - SPAN_ID, - ) - - -def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int: - """Generate dag's root span id using dag_run.""" - if dag_run.start_date is None: - return NO_TRACE_ID - - return _gen_id( - [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())], - as_int, - SPAN_ID, - ) - - -def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int: - """Generate span id from the task instance.""" - dag_run = ti.dag_run - return _gen_id( - [dag_run.dag_id, dag_run.run_id, ti.task_id, str(ti.try_number)], - as_int, - SPAN_ID, - ) - - -def parse_traceparent(traceparent_str: str | None = None) -> dict: - """Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01.""" - if traceparent_str is None: - return {} - tokens = traceparent_str.split("-") - if len(tokens) != 4: - raise ValueError("The traceparent string does not have the correct format.") - return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]} - - -def parse_tracestate(tracestate_str: str | None = None) -> dict: - """Parse tracestate string: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE.""" - if tracestate_str is None or len(tracestate_str) == 0: - return {} - tokens = tracestate_str.split(",") - result = {} - for pair in tokens: - if "=" in pair: - key, value = pair.split("=") - result[key.strip()] = value.strip() - return result - - -def is_valid_trace_id(trace_id: str) -> bool: - """Check whether trace id is valid.""" - return trace_id is not None and len(trace_id) == 34 and int(trace_id, 16) != 0 - - -def is_valid_span_id(span_id: str) -> bool: - """Check whether span id is valid.""" - return span_id is not None and len(span_id) == 18 and int(span_id, 16) != 0 diff --git a/airflow-core/tests/integration/otel/dags/otel_test_dag.py b/airflow-core/tests/integration/otel/dags/otel_test_dag.py index d742de938bafd..6c005a9927ee9 100644 --- a/airflow-core/tests/integration/otel/dags/otel_test_dag.py +++ b/airflow-core/tests/integration/otel/dags/otel_test_dag.py @@ -23,8 +23,8 @@ from airflow import DAG from airflow.sdk import chain, task -from airflow.traces import otel_tracer -from airflow.traces.tracer import Trace +from airflow.sdk.observability.trace import Trace +from airflow.sdk.observability.traces import otel_tracer logger = logging.getLogger("airflow.otel_test_dag") diff --git a/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_between_tasks.py b/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_between_tasks.py index 3bdd0d3314eda..72fb9148a40e5 100644 --- a/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_between_tasks.py +++ b/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_between_tasks.py @@ -28,8 +28,8 @@ from airflow.models import TaskInstance from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS from airflow.sdk import chain, task -from airflow.traces import otel_tracer -from airflow.traces.tracer import Trace +from airflow.sdk.observability.trace import Trace +from airflow.sdk.observability.traces import otel_tracer from airflow.utils.session import create_session logger = logging.getLogger("airflow.otel_test_dag_with_pause") diff --git a/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_in_task.py b/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_in_task.py index 92c174fb5547c..dfc5c30243f08 100644 --- a/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_in_task.py +++ b/airflow-core/tests/integration/otel/dags/otel_test_dag_with_pause_in_task.py @@ -28,8 +28,8 @@ from airflow.models import TaskInstance from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS from airflow.sdk import chain, task -from airflow.traces import otel_tracer -from airflow.traces.tracer import Trace +from airflow.sdk.observability.trace import Trace +from airflow.sdk.observability.traces import otel_tracer from airflow.utils.session import create_session logger = logging.getLogger("airflow.otel_test_dag_with_pause_in_task") diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py index 5bb6a45534196..81ab540c501dc 100644 --- a/airflow-core/tests/integration/otel/test_otel.py +++ b/airflow-core/tests/integration/otel/test_otel.py @@ -610,7 +610,7 @@ class TestOtelIntegration: "--daemon", ] - dags: dict[str, DAG] = {} + dags: dict[str, SerializedDAG] = {} @classmethod def setup_class(cls): diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 7779e3b519da4..5f03ab7aa4b3b 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -67,6 +67,7 @@ from airflow.models.taskinstance import TaskInstance from airflow.models.team import Team from airflow.models.trigger import Trigger +from airflow.observability.trace import Trace from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.triggers.temporal import DateTimeTrigger @@ -74,7 +75,6 @@ from airflow.sdk.definitions.callback import AsyncCallback, SyncCallback from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG from airflow.timetables.base import DataInterval -from airflow.traces.tracer import Trace from airflow.utils.session import create_session, provide_session from airflow.utils.span_status import SpanStatus from airflow.utils.state import DagRunState, State, TaskInstanceState diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index edee23978301f..ef6b5095fc5c2 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -41,6 +41,7 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceNote, clear_task_instances from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule +from airflow.observability.stats import Stats from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.python import PythonOperator, ShortCircuitOperator @@ -48,7 +49,6 @@ from airflow.sdk.definitions.callback import AsyncCallback from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG -from airflow.stats import Stats from airflow.task.trigger_rule import TriggerRule from airflow.triggers.base import StartTriggerArgs from airflow.utils.span_status import SpanStatus @@ -564,7 +564,7 @@ def test_end_dr_span_if_needed(self, testing_dag_bundle, dag_maker, session): active_spans = ThreadSafeDict() dag_run.set_active_spans(active_spans) - from airflow.traces.tracer import Trace + from airflow.observability.trace import Trace dr_span = Trace.start_root_span(span_name="test_span", start_as_current=False) diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index ef8122de39e29..2e50fee1eac62 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -65,6 +65,7 @@ from airflow.models.taskreschedule import TaskReschedule from airflow.models.variable import Variable from airflow.models.xcom import XComModel +from airflow.observability.stats import Stats from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.hitl import ( @@ -81,7 +82,6 @@ AssetEventsResult, ) from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG -from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS from airflow.ti_deps.dependencies_states import RUNNABLE_STATES diff --git a/airflow-core/tests/unit/observability/__init__.py b/airflow-core/tests/unit/observability/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow-core/tests/unit/observability/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/tests/unit/observability/metrics/__init__.py b/airflow-core/tests/unit/observability/metrics/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow-core/tests/unit/observability/metrics/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/tests/unit/core/test_stats.py b/airflow-core/tests/unit/observability/metrics/test_stats.py similarity index 80% rename from airflow-core/tests/unit/core/test_stats.py rename to airflow-core/tests/unit/observability/metrics/test_stats.py index 7fe0a469188e0..5dec1c0f71ab7 100644 --- a/airflow-core/tests/unit/core/test_stats.py +++ b/airflow-core/tests/unit/observability/metrics/test_stats.py @@ -28,10 +28,11 @@ import statsd import airflow -from airflow.exceptions import AirflowConfigException, InvalidStatsNameException -from airflow.metrics.datadog_logger import SafeDogStatsdLogger -from airflow.metrics.statsd_logger import SafeStatsdLogger -from airflow.metrics.validators import ( +from airflow._shared.configuration import AirflowConfigException +from airflow._shared.observability.exceptions import InvalidStatsNameException +from airflow._shared.observability.metrics.datadog_logger import SafeDogStatsdLogger +from airflow._shared.observability.metrics.statsd_logger import SafeStatsdLogger +from airflow._shared.observability.metrics.validators import ( PatternAllowListValidator, PatternBlockListValidator, ) @@ -101,11 +102,11 @@ def test_decr(self): def test_enabled_by_config(self): """Test that enabling this sets the right instance properties""" with conf_vars({("metrics", "statsd_on"): "True"}): - importlib.reload(airflow.stats) - assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient) - assert not hasattr(airflow.stats.Stats, "dogstatsd") + importlib.reload(airflow.observability.stats) + assert isinstance(airflow.observability.stats.Stats.statsd, statsd.StatsClient) + assert not hasattr(airflow.observability.stats.Stats, "dogstatsd") # Avoid side-effects - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) def test_load_custom_statsd_client(self): with conf_vars( @@ -114,10 +115,10 @@ def test_load_custom_statsd_client(self): ("metrics", "statsd_custom_client_path"): f"{__name__}.CustomStatsd", } ): - importlib.reload(airflow.stats) - assert isinstance(airflow.stats.Stats.statsd, CustomStatsd) + importlib.reload(airflow.observability.stats) + assert isinstance(airflow.observability.stats.Stats.statsd, CustomStatsd) # Avoid side-effects - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) def test_load_invalid_custom_stats_client(self): with conf_vars( @@ -126,14 +127,14 @@ def test_load_invalid_custom_stats_client(self): ("metrics", "statsd_custom_client_path"): f"{__name__}.InvalidCustomStatsd", } ): - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) error_message = re.escape( "Your custom StatsD client must extend the statsd." "StatsClient in order to ensure backwards compatibility." ) with pytest.raises(AirflowConfigException, match=error_message): - airflow.stats.Stats.incr("empty_key") - importlib.reload(airflow.stats) + airflow.observability.stats.Stats.incr("empty_key") + importlib.reload(airflow.observability.stats) def test_load_allow_list_validator(self): with conf_vars( @@ -142,11 +143,14 @@ def test_load_allow_list_validator(self): ("metrics", "metrics_allow_list"): "name1,name2", } ): - importlib.reload(airflow.stats) - assert type(airflow.stats.Stats.metrics_validator) is PatternAllowListValidator - assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2") + importlib.reload(airflow.observability.stats) + assert type(airflow.observability.stats.Stats.metrics_validator) is PatternAllowListValidator + assert airflow.observability.stats.Stats.metrics_validator.validate_list == ( + "name1", + "name2", + ) # Avoid side-effects - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) def test_load_block_list_validator(self): with conf_vars( @@ -155,11 +159,14 @@ def test_load_block_list_validator(self): ("metrics", "metrics_block_list"): "name1,name2", } ): - importlib.reload(airflow.stats) - assert type(airflow.stats.Stats.metrics_validator) is PatternBlockListValidator - assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2") + importlib.reload(airflow.observability.stats) + assert type(airflow.observability.stats.Stats.metrics_validator) is PatternBlockListValidator + assert airflow.observability.stats.Stats.metrics_validator.validate_list == ( + "name1", + "name2", + ) # Avoid side-effects - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) def test_load_allow_and_block_list_validator_loads_only_allow_list_validator(self): with conf_vars( @@ -169,11 +176,14 @@ def test_load_allow_and_block_list_validator_loads_only_allow_list_validator(sel ("metrics", "metrics_block_list"): "name1,name2", } ): - importlib.reload(airflow.stats) - assert type(airflow.stats.Stats.metrics_validator) is PatternAllowListValidator - assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2") + importlib.reload(airflow.observability.stats) + assert type(airflow.observability.stats.Stats.metrics_validator) is PatternAllowListValidator + assert airflow.observability.stats.Stats.metrics_validator.validate_list == ( + "name1", + "name2", + ) # Avoid side-effects - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) class TestDogStats: @@ -259,11 +269,11 @@ def test_enabled_by_config(self): from datadog import DogStatsd with conf_vars({("metrics", "statsd_datadog_enabled"): "True"}): - importlib.reload(airflow.stats) - assert isinstance(airflow.stats.Stats.dogstatsd, DogStatsd) - assert not hasattr(airflow.stats.Stats, "statsd") + importlib.reload(airflow.observability.stats) + assert isinstance(airflow.observability.stats.Stats.dogstatsd, DogStatsd) + assert not hasattr(airflow.observability.stats.Stats, "statsd") # Avoid side-effects - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) def test_does_not_send_stats_using_statsd_when_statsd_and_dogstatsd_both_on(self): from datadog import DogStatsd @@ -274,10 +284,10 @@ def test_does_not_send_stats_using_statsd_when_statsd_and_dogstatsd_both_on(self ("metrics", "statsd_datadog_enabled"): "True", } ): - importlib.reload(airflow.stats) - assert isinstance(airflow.stats.Stats.dogstatsd, DogStatsd) - assert not hasattr(airflow.stats.Stats, "statsd") - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) + assert isinstance(airflow.observability.stats.Stats.dogstatsd, DogStatsd) + assert not hasattr(airflow.observability.stats.Stats, "statsd") + importlib.reload(airflow.observability.stats) class TestStatsAllowAndBlockLists: @@ -336,7 +346,7 @@ def test_regex_matches(self, match_pattern, expect_incr): class TestPatternValidatorConfigOption: def teardown_method(self): # Avoid side-effects - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) stats_on = {("metrics", "statsd_on"): "True"} allow_list = {("metrics", "metrics_allow_list"): "foo,bar"} @@ -369,18 +379,21 @@ def teardown_method(self): ) def test_pattern_picker(self, config, expected): with conf_vars(config): - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) - assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient) - assert type(airflow.stats.Stats.instance.metrics_validator) is expected + assert isinstance(airflow.observability.stats.Stats.statsd, statsd.StatsClient) + assert type(airflow.observability.stats.Stats.instance.metrics_validator) is expected @conf_vars({**stats_on, **block_list, ("metrics", "metrics_allow_list"): "baz,qux"}) def test_setting_allow_and_block_logs_warning(self, caplog): with caplog.at_level(logging.WARNING): - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) - assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient) - assert type(airflow.stats.Stats.instance.metrics_validator) is PatternAllowListValidator + assert isinstance(airflow.observability.stats.Stats.statsd, statsd.StatsClient) + assert ( + type(airflow.observability.stats.Stats.instance.metrics_validator) + is PatternAllowListValidator + ) assert "Ignoring metrics_block_list" in caplog.text @@ -450,7 +463,6 @@ def setup_method(self): with conf_vars( { ("metrics", "statsd_on"): "True", - ("metrics", "statsd_influxdb_enabled"): "True", } ): self.statsd_client = Mock(spec=statsd.StatsClient) @@ -458,6 +470,7 @@ def setup_method(self): self.statsd_client, influxdb_tags_enabled=True, metric_tags_validator=PatternBlockListValidator("key2,key3"), + statsd_influxdb_enabled=True, ) def test_increment_counter(self): @@ -497,55 +510,56 @@ class TestCustomStatsName: @conf_vars( { ("metrics", "statsd_on"): "True", - ("metrics", "stat_name_handler"): "unit.core.test_stats.always_invalid", + ("metrics", "stat_name_handler"): "unit.observability.metrics.test_stats.always_invalid", } ) @mock.patch("statsd.StatsClient") def test_does_not_send_stats_using_statsd_when_the_name_is_not_valid(self, mock_statsd): - importlib.reload(airflow.stats) - airflow.stats.Stats.incr("empty_key") + importlib.reload(airflow.observability.stats) + airflow.observability.stats.Stats.incr("empty_key") mock_statsd.return_value.assert_not_called() @skip_if_force_lowest_dependencies_marker @conf_vars( { ("metrics", "statsd_datadog_enabled"): "True", - ("metrics", "stat_name_handler"): "unit.core.test_stats.always_invalid", + ("metrics", "stat_name_handler"): "unit.observability.metrics.test_stats.always_invalid", } ) @mock.patch("datadog.DogStatsd") def test_does_not_send_stats_using_dogstatsd_when_the_name_is_not_valid(self, mock_dogstatsd): - importlib.reload(airflow.stats) - airflow.stats.Stats.incr("empty_key") + importlib.reload(airflow.observability.stats) + airflow.observability.stats.Stats.incr("empty_key") mock_dogstatsd.return_value.assert_not_called() @conf_vars( { ("metrics", "statsd_on"): "True", - ("metrics", "stat_name_handler"): "unit.core.test_stats.always_valid", + ("metrics", "stat_name_handler"): "unit.observability.metrics.test_stats.always_valid", } ) @mock.patch("statsd.StatsClient") def test_does_send_stats_using_statsd_when_the_name_is_valid(self, mock_statsd): - importlib.reload(airflow.stats) - airflow.stats.Stats.incr("empty_key") + importlib.reload(airflow.observability.stats) + importlib.reload(airflow.observability.stats) + airflow.observability.stats.Stats.incr("empty_key") mock_statsd.return_value.incr.assert_called_once_with("empty_key", 1, 1) @skip_if_force_lowest_dependencies_marker @conf_vars( { ("metrics", "statsd_datadog_enabled"): "True", - ("metrics", "stat_name_handler"): "unit.core.test_stats.always_valid", + ("metrics", "stat_name_handler"): "unit.observability.metrics.test_stats.always_valid", } ) @mock.patch("datadog.DogStatsd") def test_does_send_stats_using_dogstatsd_when_the_name_is_valid(self, mock_dogstatsd): - importlib.reload(airflow.stats) - airflow.stats.Stats.incr("empty_key") + importlib.reload(airflow.observability.stats) + airflow.observability.stats.Stats.incr("empty_key") mock_dogstatsd.return_value.increment.assert_called_once_with( metric="empty_key", sample_rate=1, tags=[], value=1 ) def teardown_method(self) -> None: # To avoid side-effect - importlib.reload(airflow.stats) + importlib.reload(airflow.observability.stats) diff --git a/airflow-core/tests/unit/observability/traces/__init__.py b/airflow-core/tests/unit/observability/traces/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow-core/tests/unit/observability/traces/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/tests/unit/core/test_otel_tracer.py b/airflow-core/tests/unit/observability/traces/test_otel_tracer.py similarity index 90% rename from airflow-core/tests/unit/core/test_otel_tracer.py rename to airflow-core/tests/unit/observability/traces/test_otel_tracer.py index c6be7ba51ea13..ec124e1a24e49 100644 --- a/airflow-core/tests/unit/core/test_otel_tracer.py +++ b/airflow-core/tests/unit/observability/traces/test_otel_tracer.py @@ -25,13 +25,13 @@ from opentelemetry.sdk import util from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter -from airflow.configuration import conf -from airflow.traces import otel_tracer -from airflow.traces.otel_tracer import OtelTrace -from airflow.traces.tracer import DebugTrace, EmptyTrace, Trace -from airflow.utils.dates import datetime_to_nano +from airflow._shared.observability.traces.base_tracer import EmptyTrace +from airflow._shared.observability.traces.otel_tracer import OtelTrace +from airflow._shared.observability.traces.utils import datetime_to_nano +from airflow.observability.trace import DebugTrace, Trace +from airflow.observability.traces import otel_tracer -from tests_common.test_utils.config import env_vars +from tests_common.test_utils.config import conf_vars, env_vars @pytest.fixture @@ -40,11 +40,14 @@ def name(): class TestOtelTrace: + @conf_vars( + { + ("traces", "otel_on"): "True", + ("traces", "otel_debugging_on"): "True", + } + ) def test_get_otel_tracer_from_trace_metaclass(self): """Test that `Trace.some_method()`, uses an `OtelTrace` instance when otel is configured.""" - conf.set("traces", "otel_on", "True") - conf.set("traces", "otel_debugging_on", "True") - tracer = otel_tracer.get_otel_tracer(Trace) assert tracer.use_simple_processor is False @@ -59,11 +62,14 @@ def test_get_otel_tracer_from_trace_metaclass(self): task_tracer.get_otel_tracer_provider() assert task_tracer.use_simple_processor is True + @conf_vars( + { + ("traces", "otel_on"): "True", + ("traces", "otel_debug_traces_on"): "False", + } + ) def test_debug_trace_metaclass(self): """Test that `DebugTrace.some_method()`, uses the correct instance when the debug_traces flag is configured.""" - conf.set("traces", "otel_on", "True") - conf.set("traces", "otel_debug_traces_on", "False") - assert DebugTrace.check_debug_traces_flag is True # Factory hasn't been configured, it defaults to EmptyTrace. @@ -76,7 +82,7 @@ def test_debug_trace_metaclass(self): assert isinstance(DebugTrace.factory(), EmptyTrace) @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") - @patch("airflow.traces.otel_tracer.conf") + @patch("airflow.observability.traces.otel_tracer.conf") def test_tracer(self, conf_a, exporter): # necessary to speed up the span to be emitted with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}): @@ -113,7 +119,7 @@ def test_tracer(self, conf_a, exporter): assert span2["resource"]["attributes"]["service.name"] == "abc" @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") - @patch("airflow.traces.otel_tracer.conf") + @patch("airflow.observability.traces.otel_tracer.conf") def test_dag_tracer(self, conf_a, exporter): # necessary to speed up the span to be emitted with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}): @@ -146,7 +152,7 @@ def test_dag_tracer(self, conf_a, exporter): assert span1["context"]["span_id"] == span2["parent_id"] @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") - @patch("airflow.traces.otel_tracer.conf") + @patch("airflow.observability.traces.otel_tracer.conf") def test_context_propagation(self, conf_a, exporter): # necessary to speed up the span to be emitted with env_vars({"OTEL_BSP_SCHEDULE_DELAY": "1"}): diff --git a/contributing-docs/05_pull_requests.rst b/contributing-docs/05_pull_requests.rst index fe801d1e3d798..a0d41e7ffc353 100644 --- a/contributing-docs/05_pull_requests.rst +++ b/contributing-docs/05_pull_requests.rst @@ -213,7 +213,7 @@ will be timed and submitted automatically: .. code-block:: python - from airflow.stats import Stats + from airflow.observability.stats import Stats ... @@ -224,7 +224,7 @@ or to time but not send a metric: .. code-block:: python - from airflow.stats import Stats + from airflow.observability.stats import Stats ... diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py b/dev/breeze/tests/test_pytest_args_for_test_types.py index d3b8abd00125b..b2b8193c5ebbf 100644 --- a/dev/breeze/tests/test_pytest_args_for_test_types.py +++ b/dev/breeze/tests/test_pytest_args_for_test_types.py @@ -167,6 +167,7 @@ def _find_all_integration_folders() -> list[str]: "airflow-core/tests/unit/listeners", "airflow-core/tests/unit/logging", "airflow-core/tests/unit/macros", + "airflow-core/tests/unit/observability", "airflow-core/tests/unit/plugins", "airflow-core/tests/unit/security", "airflow-core/tests/unit/sensors", diff --git a/providers/amazon/pyproject.toml b/providers/amazon/pyproject.toml index 1296686e9a6ce..8b570e36774f3 100644 --- a/providers/amazon/pyproject.toml +++ b/providers/amazon/pyproject.toml @@ -59,7 +59,7 @@ requires-python = ">=3.10" # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build`` dependencies = [ "apache-airflow>=2.11.0", - "apache-airflow-providers-common-compat>=1.10.0", + "apache-airflow-providers-common-compat>=1.10.0", # use next version "apache-airflow-providers-common-sql>=1.27.0", "apache-airflow-providers-http", # We should update minimum version of boto3 and here regularly to avoid `pip` backtracking with the number diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py index d27ca04ce159d..46cc3d0751d51 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py @@ -42,7 +42,7 @@ ) from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook from airflow.providers.amazon.aws.hooks.sqs import SqsHook -from airflow.stats import Stats +from airflow.providers.common.compat.sdk import Stats try: from airflow.sdk import timezone diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py index 7afb061df8b31..11cf6aa676791 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py @@ -37,7 +37,7 @@ ) from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS -from airflow.stats import Stats +from airflow.providers.common.compat.sdk import Stats try: from airflow.sdk import timezone diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py index 396e998af611a..af1815fba0ec4 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py @@ -49,7 +49,7 @@ ) from airflow.providers.amazon.aws.hooks.ecs import EcsHook from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS -from airflow.stats import Stats +from airflow.providers.common.compat.sdk import Stats try: from airflow.sdk import timezone diff --git a/providers/celery/pyproject.toml b/providers/celery/pyproject.toml index e1099fe937738..563bbd98236f3 100644 --- a/providers/celery/pyproject.toml +++ b/providers/celery/pyproject.toml @@ -59,7 +59,7 @@ requires-python = ">=3.10" # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build`` dependencies = [ "apache-airflow>=2.11.0", - "apache-airflow-providers-common-compat>=1.10.0", + "apache-airflow-providers-common-compat>=1.10.0", # use next version # The Celery is known to introduce problems when upgraded to a MAJOR version. Airflow Core # Uses Celery for CeleryExecutor, and we also know that Kubernetes Python client follows SemVer # (https://docs.celeryq.dev/en/stable/contributing.html?highlight=semver#versions). diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py index 2fa79da95ec47..367a8c7969ae4 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py @@ -54,8 +54,7 @@ from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.executors.base_executor import BaseExecutor from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS -from airflow.providers.common.compat.sdk import AirflowTaskTimeout -from airflow.stats import Stats +from airflow.providers.common.compat.sdk import AirflowTaskTimeout, Stats from airflow.utils.state import TaskInstanceState log = logging.getLogger(__name__) diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py index fc2ad5e44003c..ab2d0201a5ec1 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py @@ -45,8 +45,7 @@ from airflow.exceptions import AirflowException from airflow.executors.base_executor import BaseExecutor from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS -from airflow.providers.common.compat.sdk import AirflowTaskTimeout, timeout -from airflow.stats import Stats +from airflow.providers.common.compat.sdk import AirflowTaskTimeout, Stats, timeout from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname from airflow.utils.providers_configuration_loader import providers_configuration_loaded diff --git a/providers/cncf/kubernetes/pyproject.toml b/providers/cncf/kubernetes/pyproject.toml index c2b770ec56bab..349229b8ca8ac 100644 --- a/providers/cncf/kubernetes/pyproject.toml +++ b/providers/cncf/kubernetes/pyproject.toml @@ -60,7 +60,7 @@ requires-python = ">=3.10" dependencies = [ "aiofiles>=23.2.0", "apache-airflow>=2.11.0", - "apache-airflow-providers-common-compat>=1.10.0", + "apache-airflow-providers-common-compat>=1.10.0", # use next version "asgiref>=3.5.2", # TODO(potiuk): We should bump cryptography to >=46.0.0 when sqlalchemy>=2.0 is required "cryptography>=41.0.0,<46.0.0", diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index efb018b5cbf80..4756405523cda 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -30,7 +30,6 @@ import multiprocessing import time from collections import Counter, defaultdict -from collections.abc import Sequence from contextlib import suppress from datetime import datetime from queue import Empty, Queue @@ -71,7 +70,7 @@ ) from airflow.providers.cncf.kubernetes.kube_config import KubeConfig from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key -from airflow.stats import Stats +from airflow.providers.common.compat.sdk import Stats from airflow.utils.log.logging_mixin import remove_escape_codes from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import TaskInstanceState diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py b/providers/common/compat/src/airflow/providers/common/compat/sdk.py index 7be62f61b7904..200a79b547c50 100644 --- a/providers/common/compat/src/airflow/providers/common/compat/sdk.py +++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py @@ -86,6 +86,7 @@ ParamValidationError as ParamValidationError, TaskDeferred as TaskDeferred, ) + from airflow.sdk.observability.stats import Stats # noqa: F401 # Airflow 3-only exceptions (conditionally imported) if AIRFLOW_V_3_0_PLUS: @@ -100,6 +101,7 @@ from airflow.sdk.execution_time.timeout import timeout as timeout from airflow.sdk.execution_time.xcom import XCom as XCom + from airflow.providers.common.compat._compat_utils import create_module_getattr # Rename map for classes that changed names between Airflow 2.x and 3.x @@ -224,6 +226,10 @@ "AirflowFailException": ("airflow.sdk.exceptions", "airflow.exceptions"), "ParamValidationError": ("airflow.sdk.exceptions", "airflow.exceptions"), "TaskDeferred": ("airflow.sdk.exceptions", "airflow.exceptions"), + # ============================================================================ + # Observability + # ============================================================================ + "Stats": ("airflow.sdk.observability.stats", "airflow.stats"), } # Airflow 3-only exceptions (not available in Airflow 2) diff --git a/providers/edge3/pyproject.toml b/providers/edge3/pyproject.toml index eb98938e7cbbd..41fd6d063c6ed 100644 --- a/providers/edge3/pyproject.toml +++ b/providers/edge3/pyproject.toml @@ -59,7 +59,7 @@ requires-python = ">=3.10" # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build`` dependencies = [ "apache-airflow>=2.11.0,!=3.1.0", - "apache-airflow-providers-common-compat>=1.10.0", + "apache-airflow-providers-common-compat>=1.10.0", # use next version "pydantic>=2.11.0", "retryhttp>=1.2.0,!=1.3.0", ] diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index ea818b6fb7043..dc2821831cb1d 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -31,13 +31,12 @@ from airflow.configuration import conf from airflow.executors.base_executor import BaseExecutor from airflow.models.taskinstance import TaskInstance -from airflow.providers.common.compat.sdk import timezone +from airflow.providers.common.compat.sdk import Stats, timezone from airflow.providers.edge3.cli.edge_command import EDGE_COMMANDS from airflow.providers.edge3.models.edge_job import EdgeJobModel from airflow.providers.edge3.models.edge_logs import EdgeLogsModel from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, reset_metrics from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS -from airflow.stats import Stats from airflow.utils.db import DBLocks, create_global_lock from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import TaskInstanceState diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py index f8d3ff95f43e2..f508af7ce0142 100644 --- a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py +++ b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py @@ -28,9 +28,8 @@ from airflow.exceptions import AirflowException from airflow.models.base import Base -from airflow.providers.common.compat.sdk import timezone +from airflow.providers.common.compat.sdk import Stats, timezone from airflow.providers.common.compat.sqlalchemy.orm import mapped_column -from airflow.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.providers_configuration_loader import providers_configuration_loaded from airflow.utils.session import NEW_SESSION, provide_session diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py index a162ffc9db70a..9f0c282fb7025 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py @@ -21,7 +21,7 @@ from sqlalchemy import select, update -from airflow.providers.common.compat.sdk import timezone +from airflow.providers.common.compat.sdk import Stats, timezone from airflow.providers.edge3.models.edge_job import EdgeJobModel from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest from airflow.providers.edge3.worker_api.datamodels import ( @@ -38,7 +38,6 @@ parse_command, status, ) -from airflow.stats import Stats from airflow.utils.state import TaskInstanceState jobs_router = AirflowRouter(tags=["Jobs"], prefix="/jobs") diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py index c9e9f5078b43c..4e2298c8adb8b 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py @@ -22,7 +22,7 @@ from sqlalchemy import select -from airflow.providers.common.compat.sdk import timezone +from airflow.providers.common.compat.sdk import Stats, timezone from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest from airflow.providers.edge3.worker_api.datamodels import ( @@ -41,7 +41,6 @@ create_openapi_http_exception_doc, status, ) -from airflow.stats import Stats worker_router = AirflowRouter( tags=["Worker"], diff --git a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py index 990ae25d77f5b..c49f03ead5ed0 100644 --- a/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py +++ b/providers/edge3/tests/unit/edge3/executors/test_edge_executor.py @@ -25,7 +25,7 @@ from airflow.configuration import conf from airflow.models.taskinstancekey import TaskInstanceKey -from airflow.providers.common.compat.sdk import timezone +from airflow.providers.common.compat.sdk import Stats, timezone from airflow.providers.edge3.executors.edge_executor import EdgeExecutor from airflow.providers.edge3.models.edge_job import EdgeJobModel from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState @@ -86,7 +86,7 @@ def test__process_tasks_ok_command(self, pool_slots, expected_concurrency): assert jobs[0].task_id == "test_task" assert jobs[0].concurrency_slots == expected_concurrency - @patch("airflow.stats.Stats.incr") + @patch(f"{Stats.__module__}.Stats.incr") def test_sync_orphaned_tasks(self, mock_stats_incr): executor = EdgeExecutor() diff --git a/providers/edge3/tests/unit/edge3/worker_api/routes/test_jobs.py b/providers/edge3/tests/unit/edge3/worker_api/routes/test_jobs.py index d203cc65fbad6..6845a6d2257ff 100644 --- a/providers/edge3/tests/unit/edge3/worker_api/routes/test_jobs.py +++ b/providers/edge3/tests/unit/edge3/worker_api/routes/test_jobs.py @@ -21,6 +21,7 @@ import pytest +from airflow.providers.common.compat.sdk import Stats from airflow.providers.edge3.models.edge_job import EdgeJobModel from airflow.providers.edge3.worker_api.routes.jobs import state from airflow.utils.session import create_session @@ -44,7 +45,7 @@ def setup_test_cases(self, dag_maker, session: Session): session.query(EdgeJobModel).delete() session.commit() - @patch("airflow.stats.Stats.incr") + @patch(f"{Stats.__module__}.Stats.incr") def test_state(self, mock_stats_incr, session: Session): with create_session() as session: job = EdgeJobModel( diff --git a/providers/openlineage/pyproject.toml b/providers/openlineage/pyproject.toml index b88195362fe20..6018b489f9117 100644 --- a/providers/openlineage/pyproject.toml +++ b/providers/openlineage/pyproject.toml @@ -60,7 +60,7 @@ requires-python = ">=3.10" dependencies = [ "apache-airflow>=2.11.0", "apache-airflow-providers-common-sql>=1.20.0", - "apache-airflow-providers-common-compat>=1.8.0", + "apache-airflow-providers-common-compat>=1.8.0", # use next version "attrs>=22.2", "openlineage-integration-common>=1.40.0", "openlineage-python>=1.40.0", diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py index 091dd68318b24..4fb8a8f76f71e 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py @@ -37,6 +37,7 @@ from openlineage.client.uuid import generate_static_uuid from airflow.configuration import conf as airflow_conf +from airflow.providers.common.compat.sdk import Stats from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf from airflow.providers.openlineage.utils.utils import ( OpenLineageRedactor, @@ -44,7 +45,6 @@ get_airflow_state_run_facet, get_processing_engine_facet, ) -from airflow.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 5f7ed228070a0..cb90f80d19879 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -30,7 +30,7 @@ from airflow import settings from airflow.listeners import hookimpl from airflow.models import DagRun, TaskInstance -from airflow.providers.common.compat.sdk import timeout, timezone +from airflow.providers.common.compat.sdk import Stats, timeout, timezone from airflow.providers.openlineage import conf from airflow.providers.openlineage.extractors import ExtractorManager, OperatorLineage from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState @@ -52,7 +52,6 @@ print_warning, ) from airflow.settings import configure_orm -from airflow.stats import Stats from airflow.utils.state import TaskInstanceState if TYPE_CHECKING: diff --git a/pyproject.toml b/pyproject.toml index ca6caf0ac6083..9b7d51684f147 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,9 +93,6 @@ packages = [] "all-task-sdk" = [ "apache-airflow-task-sdk[all]" ] -"sentry" = [ - "apache-airflow-task-sdk[sentry]" -] "airbyte" = [ "apache-airflow-providers-airbyte>=5.0.0" ] @@ -528,6 +525,9 @@ packages = [] "rabbitmq" = [ "amqp>=5.2.0", ] +"sentry" = [ + "sentry-sdk>=2.30.0", +] "s3fs" = [ # This is required for support of S3 file system which uses aiobotocore # which can have a conflict with boto3 as mentioned in aiobotocore extra @@ -537,7 +537,6 @@ packages = [] "uv>=0.9.15", ] - [project.urls] "Bug Tracker" = "https://github.com/apache/airflow/issues" Documentation = "https://airflow.apache.org/docs/" @@ -1296,6 +1295,7 @@ dev = [ "apache-airflow-shared-secrets-backend", "apache-airflow-shared-secrets-masker", "apache-airflow-shared-timezones", + "apache-airflow-shared-observability", ] # To build docs: @@ -1349,6 +1349,7 @@ apache-airflow-shared-logging = { workspace = true } apache-airflow-shared-secrets-backend = { workspace = true } apache-airflow-shared-secrets-masker = { workspace = true } apache-airflow-shared-timezones = { workspace = true } +apache-airflow-shared-observability = { workspace = true } # Automatically generated provider workspace items (update_airflow_pyproject_toml.py) apache-airflow-providers-airbyte = { workspace = true } apache-airflow-providers-alibaba = { workspace = true } @@ -1471,6 +1472,7 @@ members = [ "shared/secrets_backend", "shared/secrets_masker", "shared/timezones", + "shared/observability", # Automatically generated provider workspace members (update_airflow_pyproject_toml.py) "providers/airbyte", "providers/alibaba", diff --git a/scripts/ci/prek/check_tests_in_right_folders.py b/scripts/ci/prek/check_tests_in_right_folders.py index b0c0ed168e055..43d28d7e4af75 100755 --- a/scripts/ci/prek/check_tests_in_right_folders.py +++ b/scripts/ci/prek/check_tests_in_right_folders.py @@ -63,6 +63,7 @@ "macros", "models", "notifications", + "observability", "operators", "otel", "plugins", diff --git a/scripts/ci/prek/update_airflow_pyproject_toml.py b/scripts/ci/prek/update_airflow_pyproject_toml.py index 2d43e4e2df6e2..5c09d7cb54466 100755 --- a/scripts/ci/prek/update_airflow_pyproject_toml.py +++ b/scripts/ci/prek/update_airflow_pyproject_toml.py @@ -182,13 +182,7 @@ def get_python_exclusion(provider_dependencies: dict[str, Any]) -> str: else: all_optional_dependencies.append(f'"{optional}" = [\n "apache-airflow-core[{optional}]"\n]\n') optional_airflow_task_sdk_dependencies = get_optional_dependencies(AIRFLOW_TASK_SDK_PYPROJECT_TOML_FILE) - for optional in sorted(optional_airflow_task_sdk_dependencies): - if optional == "all": - all_optional_dependencies.append('"all-task-sdk" = [\n "apache-airflow-task-sdk[all]"\n]\n') - else: - all_optional_dependencies.append( - f'"{optional}" = [\n "apache-airflow-task-sdk[{optional}]"\n]\n' - ) + all_optional_dependencies.append('"all-task-sdk" = [\n "apache-airflow-task-sdk[all]"\n]\n') all_providers = sorted(get_all_provider_ids()) all_provider_lines = [] for provider_id in all_providers: diff --git a/shared/observability/pyproject.toml b/shared/observability/pyproject.toml new file mode 100644 index 0000000000000..cdf1755c46873 --- /dev/null +++ b/shared/observability/pyproject.toml @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[project] +name = "apache-airflow-shared-observability" +description = "Shared observability code for Airflow distributions" +version = "0.0" +classifiers = [ + "Private :: Do Not Upload", +] + +dependencies = [ + "pendulum>=3.1.0", + "structlog>=25.4.0", + "methodtools>=0.4.7", +] + +[project.optional-dependencies] +"otel" = [ + "opentelemetry-api>=1.27.0", + "opentelemetry-exporter-otlp>=1.27.0", + "opentelemetry-proto<9999,>=1.27.0", +] +"statsd" = [ + "statsd>=3.3.0", +] +"datadog" = [ + "datadog>=0.50.0", +] +"all" = ["apache-airflow-shared-observability[otel,statsd,datadog]"] + +[dependency-groups] +dev = [ + "apache-airflow-devel-common", + "apache-airflow-shared-observability[all]", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/airflow_shared"] + +[tool.ruff] +extend = "../../pyproject.toml" +src = ["src"] + +[tool.ruff.lint.per-file-ignores] +# Ignore Doc rules et al for anything outside of tests +"!src/*" = ["D", "S101", "TRY002"] diff --git a/shared/observability/src/airflow_shared/observability/__init__.py b/shared/observability/src/airflow_shared/observability/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/shared/observability/src/airflow_shared/observability/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/shared/observability/src/airflow_shared/observability/exceptions.py b/shared/observability/src/airflow_shared/observability/exceptions.py new file mode 100644 index 0000000000000..15d701e37a40d --- /dev/null +++ b/shared/observability/src/airflow_shared/observability/exceptions.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Exceptions for observability.""" + +from __future__ import annotations + + +class InvalidStatsNameException(Exception): + """Raise when name of the stats is invalid.""" diff --git a/shared/observability/src/airflow_shared/observability/metrics/__init__.py b/shared/observability/src/airflow_shared/observability/metrics/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/shared/observability/src/airflow_shared/observability/metrics/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/src/airflow/metrics/base_stats_logger.py b/shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py similarity index 96% rename from airflow-core/src/airflow/metrics/base_stats_logger.py rename to shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py index 6ee3634f567f8..f64fb5b75ed9c 100644 --- a/airflow-core/src/airflow/metrics/base_stats_logger.py +++ b/shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.py @@ -19,10 +19,10 @@ from typing import TYPE_CHECKING, Any, Protocol -from airflow.metrics.protocols import Timer +from .protocols import Timer if TYPE_CHECKING: - from airflow.metrics.protocols import DeltaType + from .protocols import DeltaType class StatsLogger(Protocol): diff --git a/airflow-core/src/airflow/metrics/datadog_logger.py b/shared/observability/src/airflow_shared/observability/metrics/datadog_logger.py similarity index 80% rename from airflow-core/src/airflow/metrics/datadog_logger.py rename to shared/observability/src/airflow_shared/observability/metrics/datadog_logger.py index aa1925d8b4476..13eaedc40e1d9 100644 --- a/airflow-core/src/airflow/metrics/datadog_logger.py +++ b/shared/observability/src/airflow_shared/observability/metrics/datadog_logger.py @@ -19,11 +19,11 @@ import datetime import logging +from collections.abc import Callable from typing import TYPE_CHECKING -from airflow.configuration import conf -from airflow.metrics.protocols import Timer -from airflow.metrics.validators import ( +from .protocols import Timer +from .validators import ( PatternAllowListValidator, PatternBlockListValidator, get_validator, @@ -33,10 +33,8 @@ if TYPE_CHECKING: from datadog import DogStatsd - from airflow.metrics.protocols import DeltaType - from airflow.metrics.validators import ( - ListValidator, - ) + from .protocols import DeltaType + from .validators import ListValidator log = logging.getLogger(__name__) @@ -50,11 +48,15 @@ def __init__( metrics_validator: ListValidator = PatternAllowListValidator(), metrics_tags: bool = False, metric_tags_validator: ListValidator = PatternAllowListValidator(), + stat_name_handler: Callable[[str], str] | None = None, + statsd_influxdb_enabled: bool = False, ) -> None: self.dogstatsd = dogstatsd_client self.metrics_validator = metrics_validator self.metrics_tags = metrics_tags self.metric_tags_validator = metric_tags_validator + self.stat_name_handler = stat_name_handler + self.statsd_influxdb_enabled = statsd_influxdb_enabled @validate_stat def incr( @@ -157,18 +159,35 @@ def timer( return Timer() -def get_dogstatsd_logger(cls) -> SafeDogStatsdLogger: +def get_dogstatsd_logger( + cls, + *, + host: str | None = None, + port: int | None = None, + namespace: str | None = None, + datadog_metrics_tags: bool = True, + statsd_disabled_tags: str | None = None, + metrics_allow_list: str | None = None, + metrics_block_list: str | None = None, + stat_name_handler: Callable[[str], str] | None = None, + statsd_influxdb_enabled: bool = False, +) -> SafeDogStatsdLogger: """Get DataDog StatsD logger.""" from datadog import DogStatsd dogstatsd = DogStatsd( - host=conf.get("metrics", "statsd_host"), - port=conf.getint("metrics", "statsd_port"), - namespace=conf.get("metrics", "statsd_prefix"), + host, + port, + namespace, constant_tags=cls.get_constant_tags(), ) - datadog_metrics_tags = conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True) - metric_tags_validator = PatternBlockListValidator( - conf.get("metrics", "statsd_disabled_tags", fallback=None) + metric_tags_validator = PatternBlockListValidator(statsd_disabled_tags) + validator = get_validator(metrics_allow_list, metrics_block_list) + return SafeDogStatsdLogger( + dogstatsd, + validator, + datadog_metrics_tags, + metric_tags_validator, + stat_name_handler, + statsd_influxdb_enabled, ) - return SafeDogStatsdLogger(dogstatsd, get_validator(), datadog_metrics_tags, metric_tags_validator) diff --git a/airflow-core/src/airflow/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py similarity index 92% rename from airflow-core/src/airflow/metrics/otel_logger.py rename to shared/observability/src/airflow_shared/observability/metrics/otel_logger.py index 61c66190c681c..c318ba2c46a32 100644 --- a/airflow-core/src/airflow/metrics/otel_logger.py +++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py @@ -28,24 +28,22 @@ from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics._internal.export import ConsoleMetricExporter, PeriodicExportingMetricReader -from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource +from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from airflow.configuration import conf -from airflow.metrics.protocols import Timer -from airflow.metrics.validators import ( +from .protocols import Timer +from .validators import ( OTEL_NAME_MAX_LENGTH, ListValidator, PatternAllowListValidator, get_validator, stat_name_otel_handler, ) -from airflow.utils.net import get_hostname if TYPE_CHECKING: from opentelemetry.metrics import Instrument from opentelemetry.util.types import Attributes - from airflow.metrics.protocols import DeltaType + from .protocols import DeltaType log = logging.getLogger(__name__) @@ -172,12 +170,16 @@ def __init__( otel_provider, prefix: str = DEFAULT_METRIC_NAME_PREFIX, metrics_validator: ListValidator = PatternAllowListValidator(), + stat_name_handler: Callable[[str], str] | None = None, + statsd_influxdb_enabled: bool = False, ): self.otel: Callable = otel_provider self.prefix: str = prefix self.metrics_validator = metrics_validator self.meter = otel_provider.get_meter(__name__) self.metrics_map = MetricsMap(self.meter) + self.stat_name_handler = stat_name_handler + self.statsd_influxdb_enabled = statsd_influxdb_enabled def incr( self, @@ -371,17 +373,22 @@ def set_gauge_value(self, name: str, value: int | float, delta: bool, tags: Attr self.map[key].set_value(value, delta) -def get_otel_logger(cls) -> SafeOtelLogger: - host = conf.get("metrics", "otel_host") # ex: "breeze-otel-collector" - port = conf.getint("metrics", "otel_port") # ex: 4318 - prefix = conf.get("metrics", "otel_prefix") # ex: "airflow" - ssl_active = conf.getboolean("metrics", "otel_ssl_active") - # PeriodicExportingMetricReader will default to an interval of 60000 millis. - conf_interval = conf.getfloat("metrics", "otel_interval_milliseconds", fallback=None) # ex: 30000 - debug = conf.getboolean("metrics", "otel_debugging_on") - service_name = conf.get("metrics", "otel_service") - - resource = Resource.create(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: service_name}) +def get_otel_logger( + cls, + *, + host: str | None = None, + port: int | None = None, + prefix: str | None = None, + ssl_active: bool = False, + conf_interval: float | None = None, + debug: bool = False, + service_name: str | None = None, + metrics_allow_list: str | None = None, + metrics_block_list: str | None = None, + stat_name_handler: Callable[[str], str] | None = None, + statsd_influxdb_enabled: bool = False, +) -> SafeOtelLogger: + resource = Resource.create(attributes={SERVICE_NAME: service_name}) protocol = "https" if ssl_active else "http" # Allow transparent support for standard OpenTelemetry SDK environment variables. # https://opentelemetry.io/docs/specs/otel/protocol/exporter/#configuration-options @@ -410,4 +417,8 @@ def get_otel_logger(cls) -> SafeOtelLogger: ), ) - return SafeOtelLogger(metrics.get_meter_provider(), prefix, get_validator()) + validator = get_validator(metrics_allow_list, metrics_block_list) + + return SafeOtelLogger( + metrics.get_meter_provider(), prefix, validator, stat_name_handler, statsd_influxdb_enabled + ) diff --git a/airflow-core/src/airflow/metrics/protocols.py b/shared/observability/src/airflow_shared/observability/metrics/protocols.py similarity index 100% rename from airflow-core/src/airflow/metrics/protocols.py rename to shared/observability/src/airflow_shared/observability/metrics/protocols.py diff --git a/airflow-core/src/airflow/metrics/statsd_logger.py b/shared/observability/src/airflow_shared/observability/metrics/statsd_logger.py similarity index 73% rename from airflow-core/src/airflow/metrics/statsd_logger.py rename to shared/observability/src/airflow_shared/observability/metrics/statsd_logger.py index d952693eb230d..535e0d3055e2b 100644 --- a/airflow-core/src/airflow/metrics/statsd_logger.py +++ b/shared/observability/src/airflow_shared/observability/metrics/statsd_logger.py @@ -22,10 +22,8 @@ from functools import wraps from typing import TYPE_CHECKING, TypeVar, cast -from airflow.configuration import conf -from airflow.exceptions import AirflowConfigException -from airflow.metrics.protocols import Timer -from airflow.metrics.validators import ( +from .protocols import Timer +from .validators import ( PatternAllowListValidator, PatternBlockListValidator, get_validator, @@ -35,10 +33,8 @@ if TYPE_CHECKING: from statsd import StatsClient - from airflow.metrics.protocols import DeltaType - from airflow.metrics.validators import ( - ListValidator, - ) + from .protocols import DeltaType + from .validators import ListValidator T = TypeVar("T", bound=Callable) @@ -74,11 +70,15 @@ def __init__( metrics_validator: ListValidator = PatternAllowListValidator(), influxdb_tags_enabled: bool = False, metric_tags_validator: ListValidator = PatternAllowListValidator(), + stat_name_handler: Callable[[str], str] | None = None, + statsd_influxdb_enabled: bool = False, ) -> None: self.statsd = statsd_client self.metrics_validator = metrics_validator self.influxdb_tags_enabled = influxdb_tags_enabled self.metric_tags_validator = metric_tags_validator + self.stat_name_handler = stat_name_handler + self.statsd_influxdb_enabled = statsd_influxdb_enabled @prepare_stat_with_tags @validate_stat @@ -155,33 +155,31 @@ def timer( return Timer() -def get_statsd_logger(cls) -> SafeStatsdLogger: +def get_statsd_logger( + cls, + *, + stats_class: type[StatsClient], + host: str | None = None, + port: int | None = None, + prefix: str | None = None, + ipv6: bool = False, + influxdb_tags_enabled: bool = False, + statsd_disabled_tags: str | None = None, + metrics_allow_list: str | None = None, + metrics_block_list: str | None = None, + stat_name_handler: Callable[[str], str] | None = None, + statsd_influxdb_enabled: bool = False, +) -> SafeStatsdLogger: """Return logger for StatsD.""" - # no need to check for the scheduler/statsd_on -> this method is only called when it is set - # and previously it would crash with None is callable if it was called without it. - from statsd import StatsClient - - stats_class = conf.getimport("metrics", "statsd_custom_client_path", fallback=None) - if stats_class: - if not issubclass(stats_class, StatsClient): - raise AirflowConfigException( - "Your custom StatsD client must extend the statsd.StatsClient in order to ensure " - "backwards compatibility." - ) - log.info("Successfully loaded custom StatsD client") - - else: - stats_class = StatsClient - - statsd = stats_class( - host=conf.get("metrics", "statsd_host"), - port=conf.getint("metrics", "statsd_port"), - prefix=conf.get("metrics", "statsd_prefix"), - ipv6=conf.getboolean("metrics", "statsd_ipv6", fallback=False), - ) - - influxdb_tags_enabled = conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False) - metric_tags_validator = PatternBlockListValidator( - conf.get("metrics", "statsd_disabled_tags", fallback=None) + statsd = stats_class(host, port, prefix, ipv6) + + metric_tags_validator = PatternBlockListValidator(statsd_disabled_tags) + validator = get_validator(metrics_allow_list, metrics_block_list) + return SafeStatsdLogger( + statsd, + validator, + influxdb_tags_enabled, + metric_tags_validator, + stat_name_handler, + statsd_influxdb_enabled, ) - return SafeStatsdLogger(statsd, get_validator(), influxdb_tags_enabled, metric_tags_validator) diff --git a/airflow-core/src/airflow/metrics/validators.py b/shared/observability/src/airflow_shared/observability/metrics/validators.py similarity index 91% rename from airflow-core/src/airflow/metrics/validators.py rename to shared/observability/src/airflow_shared/observability/metrics/validators.py index 252524455e022..3d9d3d3d3e5ab 100644 --- a/airflow-core/src/airflow/metrics/validators.py +++ b/shared/observability/src/airflow_shared/observability/metrics/validators.py @@ -29,8 +29,7 @@ from re import Pattern from typing import cast -from airflow.configuration import conf -from airflow.exceptions import InvalidStatsNameException +from ..exceptions import InvalidStatsNameException log = logging.getLogger(__name__) @@ -87,23 +86,26 @@ class MetricNameLengthExemptionWarning(Warning): DEFAULT_VALIDATOR_TYPE = "allow" -def get_validator() -> ListValidator: +def get_validator( + metrics_allow_list: str | None = None, + metrics_block_list: str | None = None, +) -> ListValidator: validators = { "allow": PatternAllowListValidator, "block": PatternBlockListValidator, } metric_lists = { - "allow": (metric_allow_list := conf.get("metrics", "metrics_allow_list", fallback=None)), - "block": (metric_block_list := conf.get("metrics", "metrics_block_list", fallback=None)), + "allow": metrics_allow_list, + "block": metrics_block_list, } - if metric_allow_list: + if metrics_allow_list: list_type = "allow" - if metric_block_list: + if metrics_block_list: log.warning( "Ignoring metrics_block_list as both metrics_allow_list and metrics_block_list have been set." ) - elif metric_block_list: + elif metrics_block_list: list_type = "block" else: list_type = DEFAULT_VALIDATOR_TYPE @@ -118,7 +120,9 @@ def validate_stat(fn: Callable) -> Callable: def wrapper(self, stat: str | None = None, *args, **kwargs) -> Callable | None: try: if stat is not None: - handler_stat_name_func = get_current_handler_stat_name_func() + handler_stat_name_func = get_current_handler_stat_name_func( + self.stat_name_handler, self.statsd_influxdb_enabled + ) stat = handler_stat_name_func(stat) return fn(self, stat, *args, **kwargs) except InvalidStatsNameException: @@ -214,15 +218,19 @@ def stat_name_default_handler( return stat_name -def get_current_handler_stat_name_func() -> Callable[[str], str]: +def get_current_handler_stat_name_func( + stat_name_handler: Callable[[str], str] | None = None, + statsd_influxdb_enabled: bool = False, +) -> Callable[[str], str]: """Get Stat Name Handler from airflow.cfg.""" - handler = conf.getimport("metrics", "stat_name_handler") - if handler is None: - if conf.get("metrics", "statsd_influxdb_enabled", fallback=False): - handler = partial(stat_name_default_handler, allowed_chars={*ALLOWED_CHARACTERS, ",", "="}) + if stat_name_handler is None: + if statsd_influxdb_enabled: + stat_name_handler = partial( + stat_name_default_handler, allowed_chars={*ALLOWED_CHARACTERS, ",", "="} + ) else: - handler = stat_name_default_handler - return handler + stat_name_handler = stat_name_default_handler + return stat_name_handler class ListValidator(metaclass=abc.ABCMeta): diff --git a/airflow-core/src/airflow/traces/__init__.py b/shared/observability/src/airflow_shared/observability/traces/__init__.py similarity index 100% rename from airflow-core/src/airflow/traces/__init__.py rename to shared/observability/src/airflow_shared/observability/traces/__init__.py diff --git a/airflow-core/src/airflow/traces/tracer.py b/shared/observability/src/airflow_shared/observability/traces/base_tracer.py similarity index 60% rename from airflow-core/src/airflow/traces/tracer.py rename to shared/observability/src/airflow_shared/observability/traces/base_tracer.py index 327516a063817..87766df528e60 100644 --- a/airflow-core/src/airflow/traces/tracer.py +++ b/shared/observability/src/airflow_shared/observability/traces/base_tracer.py @@ -17,49 +17,30 @@ # under the License. from __future__ import annotations -import logging -import socket -from collections.abc import Callable -from functools import wraps from typing import TYPE_CHECKING, Any, Protocol -from airflow.configuration import conf +import structlog if TYPE_CHECKING: from airflow.typing_compat import Self -log = logging.getLogger(__name__) +log = structlog.getLogger(__name__) def gen_context(trace_id, span_id): """Generate span context from trace_id and span_id.""" - from airflow.traces.otel_tracer import gen_context as otel_gen_context + from .otel_tracer import gen_context as otel_gen_context return otel_gen_context(trace_id, span_id) def gen_links_from_kv_list(list): """Generate links from kv list of {trace_id:int, span_id:int}.""" - from airflow.traces.otel_tracer import gen_links_from_kv_list + from .otel_tracer import gen_links_from_kv_list return gen_links_from_kv_list(list) -def add_debug_span(func): - """Decorate a function with span.""" - func_name = func.__name__ - qual_name = func.__qualname__ - module_name = func.__module__ - component = qual_name.rsplit(".", 1)[0] if "." in qual_name else module_name - - @wraps(func) - def wrapper(*args, **kwargs): - with DebugTrace.start_span(span_name=func_name, component=component): - return func(*args, **kwargs) - - return wrapper - - class EmptyContext: """If no Tracer is configured, EmptyContext is used as a fallback.""" @@ -249,82 +230,3 @@ def inject(cls): def extract(cls, carrier) -> EmptyContext: """Extract the span context from a provided carrier.""" return EMPTY_CTX - - -class _TraceMeta(type): - factory: Callable[[], Tracer] | None = None - instance: Tracer | EmptyTrace | None = None - - def __new__(cls, name, bases, attrs): - # Read the debug flag from the class body. - if "check_debug_traces_flag" not in attrs: - raise TypeError(f"Class '{name}' must define 'check_debug_traces_flag'.") - - return super().__new__(cls, name, bases, attrs) - - def __getattr__(cls, name: str): - if not cls.factory: - # Lazy initialization of the factory - cls.configure_factory() - if not cls.instance: - cls._initialize_instance() - return getattr(cls.instance, name) - - def _initialize_instance(cls): - """Initialize the trace instance.""" - try: - cls.instance = cls.factory() - except (socket.gaierror, ImportError) as e: - log.error("Could not configure Trace: %s. Using EmptyTrace instead.", e) - cls.instance = EmptyTrace() - - def __call__(cls, *args, **kwargs): - """Ensure the class behaves as a singleton.""" - if not cls.instance: - cls._initialize_instance() - return cls.instance - - def configure_factory(cls): - """Configure the trace factory based on settings.""" - otel_on = conf.getboolean("traces", "otel_on") - - if cls.check_debug_traces_flag: - debug_traces_on = conf.getboolean("traces", "otel_debug_traces_on") - else: - # Set to true so that it will be ignored during the evaluation for the factory instance. - # If this is true, then (otel_on and debug_traces_on) will always evaluate to - # whatever value 'otel_on' has and therefore it will be ignored. - debug_traces_on = True - - if otel_on and debug_traces_on: - from airflow.traces import otel_tracer - - cls.factory = staticmethod( - lambda use_simple_processor=False: otel_tracer.get_otel_tracer(cls, use_simple_processor) - ) - else: - # EmptyTrace is a class and not inherently callable. - # Using a lambda ensures it can be invoked as a callable factory. - # staticmethod ensures the lambda is treated as a standalone function - # and avoids passing `cls` as an implicit argument. - cls.factory = staticmethod(lambda: EmptyTrace()) - - def get_constant_tags(cls) -> str | None: - """Get constant tags to add to all traces.""" - return conf.get("traces", "tags", fallback=None) - - -if TYPE_CHECKING: - Trace: EmptyTrace - DebugTrace: EmptyTrace -else: - - class Trace(metaclass=_TraceMeta): - """Empty class for Trace - we use metaclass to inject the right one.""" - - check_debug_traces_flag = False - - class DebugTrace(metaclass=_TraceMeta): - """Empty class for Trace and in case the debug traces flag is enabled.""" - - check_debug_traces_flag = True diff --git a/airflow-core/src/airflow/traces/otel_tracer.py b/shared/observability/src/airflow_shared/observability/traces/otel_tracer.py similarity index 92% rename from airflow-core/src/airflow/traces/otel_tracer.py rename to shared/observability/src/airflow_shared/observability/traces/otel_tracer.py index aa1da87f6cec8..ff859e168a228 100644 --- a/airflow-core/src/airflow/traces/otel_tracer.py +++ b/shared/observability/src/airflow_shared/observability/traces/otel_tracer.py @@ -23,10 +23,11 @@ from contextlib import AbstractContextManager from typing import TYPE_CHECKING +import pendulum from opentelemetry import trace from opentelemetry.context import attach, create_key from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource +from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import Span, SpanProcessor, Tracer as OpenTelemetryTracer, TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor from opentelemetry.sdk.trace.id_generator import IdGenerator @@ -34,14 +35,11 @@ from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID -from airflow._shared.timezones import timezone -from airflow.configuration import conf -from airflow.traces.utils import ( +from .utils import ( + datetime_to_nano, parse_traceparent, parse_tracestate, ) -from airflow.utils.dates import datetime_to_nano -from airflow.utils.net import get_hostname if TYPE_CHECKING: from opentelemetry.context.context import Context @@ -63,6 +61,8 @@ def __init__( span_exporter: OTLPSpanExporter, use_simple_processor: bool, tag_string: str | None = None, + otel_service: str | None = None, + debug: bool = False, ): self.span_exporter = span_exporter self.use_simple_processor = use_simple_processor @@ -77,13 +77,14 @@ def __init__( log.info("(otel_tracer.__init__) - [BatchSpanProcessor] is being used") self.span_processor = BatchSpanProcessor(self.span_exporter) self.tag_string = tag_string - self.otel_service = conf.get("traces", "otel_service") - self.resource = Resource.create( - attributes={HOST_NAME: get_hostname(), SERVICE_NAME: self.otel_service} - ) + self.otel_service = otel_service + self.resource = Resource.create(attributes={SERVICE_NAME: self.otel_service}) + self.debug = debug def get_otel_tracer_provider( - self, trace_id: int | None = None, span_id: int | None = None + self, + trace_id: int | None = None, + span_id: int | None = None, ) -> TracerProvider: """ Tracer that will use special AirflowOtelIdGenerator to control producing certain span and trace id. @@ -98,8 +99,7 @@ def get_otel_tracer_provider( ) else: tracer_provider = TracerProvider(resource=self.resource) - debug = conf.getboolean("traces", "otel_debugging_on") - if debug is True: + if self.debug is True: log.info("[ConsoleSpanExporter] is being used") if self.use_simple_processor: log.info("[SimpleSpanProcessor] is being used") @@ -114,7 +114,10 @@ def get_otel_tracer_provider( return tracer_provider def get_tracer( - self, component: str, trace_id: int | None = None, span_id: int | None = None + self, + component: str, + trace_id: int | None = None, + span_id: int | None = None, ) -> OpenTelemetryTracer | Tracer: tracer_provider = self.get_otel_tracer_provider(trace_id=trace_id, span_id=span_id) tracer = tracer_provider.get_tracer(component) @@ -256,7 +259,7 @@ def _new_span( tracer = self.get_tracer(component=component) if start_time is None: - start_time = timezone.utcnow() + start_time = pendulum.now(tz=pendulum.UTC) if links is None: links = [] @@ -326,11 +329,17 @@ def gen_link_from_traceparent(traceparent: str): return Link(context=span_ctx, attributes={"meta.annotation_type": "link", "from": "traceparent"}) -def get_otel_tracer(cls, use_simple_processor: bool = False) -> OtelTrace: +def get_otel_tracer( + cls, + use_simple_processor: bool = False, + *, + host: str | None = None, + port: int | None = None, + ssl_active: bool = False, + otel_service: str | None = None, + debug: bool = False, +) -> OtelTrace: """Get OTEL tracer from airflow configuration.""" - host = conf.get("traces", "otel_host") - port = conf.getint("traces", "otel_port") - ssl_active = conf.getboolean("traces", "otel_ssl_active") tag_string = cls.get_constant_tags() protocol = "https" if ssl_active else "http" @@ -344,13 +353,11 @@ def get_otel_tracer(cls, use_simple_processor: bool = False) -> OtelTrace: span_exporter=OTLPSpanExporter(endpoint=endpoint), use_simple_processor=use_simple_processor, tag_string=tag_string, + otel_service=otel_service, + debug=debug, ) -def get_otel_tracer_for_task(cls) -> OtelTrace: - return get_otel_tracer(cls, use_simple_processor=True) - - class AirflowOtelIdGenerator(IdGenerator): """ ID Generator for span id and trace id. diff --git a/shared/observability/src/airflow_shared/observability/traces/utils.py b/shared/observability/src/airflow_shared/observability/traces/utils.py new file mode 100644 index 0000000000000..2a75051c504af --- /dev/null +++ b/shared/observability/src/airflow_shared/observability/traces/utils.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import calendar +import logging + +TRACE_ID = 0 +SPAN_ID = 16 + +log = logging.getLogger(__name__) + + +def datetime_to_nano(datetime) -> int | None: + """Convert datetime to nanoseconds.""" + if datetime: + if datetime.tzinfo is None: + # There is no timezone info, handle it the same as UTC. + timestamp = calendar.timegm(datetime.timetuple()) + datetime.microsecond / 1e6 + else: + # The datetime is timezone-aware. Use timestamp directly. + timestamp = datetime.timestamp() + return int(timestamp * 1e9) + return None + + +def parse_traceparent(traceparent_str: str | None = None) -> dict: + """Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01.""" + if traceparent_str is None: + return {} + tokens = traceparent_str.split("-") + if len(tokens) != 4: + raise ValueError("The traceparent string does not have the correct format.") + return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]} + + +def parse_tracestate(tracestate_str: str | None = None) -> dict: + """Parse tracestate string: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE.""" + if tracestate_str is None or len(tracestate_str) == 0: + return {} + tokens = tracestate_str.split(",") + result = {} + for pair in tokens: + if "=" in pair: + key, value = pair.split("=") + result[key.strip()] = value.strip() + return result diff --git a/shared/observability/tests/conftest.py b/shared/observability/tests/conftest.py new file mode 100644 index 0000000000000..93aecf261843a --- /dev/null +++ b/shared/observability/tests/conftest.py @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os + +os.environ["_AIRFLOW__AS_LIBRARY"] = "true" diff --git a/shared/observability/tests/observability/__init__.py b/shared/observability/tests/observability/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/shared/observability/tests/observability/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/shared/observability/tests/observability/metrics/__init__.py b/shared/observability/tests/observability/metrics/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/shared/observability/tests/observability/metrics/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/tests/unit/core/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py similarity index 97% rename from airflow-core/tests/unit/core/test_otel_logger.py rename to shared/observability/tests/observability/metrics/test_otel_logger.py index 0796bf8cf3c8d..22578392c4bae 100644 --- a/airflow-core/tests/unit/core/test_otel_logger.py +++ b/shared/observability/tests/observability/metrics/test_otel_logger.py @@ -23,8 +23,8 @@ import pytest from opentelemetry.metrics import MeterProvider -from airflow.exceptions import InvalidStatsNameException -from airflow.metrics.otel_logger import ( +from airflow_shared.observability.exceptions import InvalidStatsNameException +from airflow_shared.observability.metrics.otel_logger import ( OTEL_NAME_MAX_LENGTH, UP_DOWN_COUNTERS, MetricsMap, @@ -33,7 +33,10 @@ _is_up_down_counter, full_name, ) -from airflow.metrics.validators import BACK_COMPAT_METRIC_NAMES, MetricNameLengthExemptionWarning +from airflow_shared.observability.metrics.validators import ( + BACK_COMPAT_METRIC_NAMES, + MetricNameLengthExemptionWarning, +) INVALID_STAT_NAME_CASES = [ (None, "can not be None"), diff --git a/airflow-core/tests/unit/utils/test_otel_utils.py b/shared/observability/tests/observability/test_otel_utils.py similarity index 98% rename from airflow-core/tests/unit/utils/test_otel_utils.py rename to shared/observability/tests/observability/test_otel_utils.py index 9711db804c71e..5152a74c430ed 100644 --- a/airflow-core/tests/unit/utils/test_otel_utils.py +++ b/shared/observability/tests/observability/test_otel_utils.py @@ -423,11 +423,11 @@ class TestUtilsUnit: """ example_task_output = r""" -{"timestamp":"2025-03-31T18:03:17.087597","level":"info","event":"[SimpleSpanProcessor] is being used","logger":"airflow.traces.otel_tracer"} +{"timestamp":"2025-03-31T18:03:17.087597","level":"info","event":"[SimpleSpanProcessor] is being used","logger":"airflow_shared.observability.traces.otel_tracer"} {"timestamp":"2025-03-31T18:03:17.087693","level":"info","event":"From task sub_span2.","logger":"airflow.otel_test_dag"} {"timestamp":"2025-03-31T18:03:17.087763","level":"info","event":"From task sub_span3.","logger":"airflow.otel_test_dag"} -{"timestamp":"2025-03-31T18:03:17.088075","level":"info","event":"[ConsoleSpanExporter] is being used","logger":"airflow.traces.otel_tracer"} -{"timestamp":"2025-03-31T18:03:17.088105","level":"info","event":"[SimpleSpanProcessor] is being used","logger":"airflow.traces.otel_tracer"} +{"timestamp":"2025-03-31T18:03:17.088075","level":"info","event":"[ConsoleSpanExporter] is being used","logger":"airflow_shared.observability.traces.otel_tracer"} +{"timestamp":"2025-03-31T18:03:17.088105","level":"info","event":"[SimpleSpanProcessor] is being used","logger":"airflow_shared.observability.traces.otel_tracer"} {"timestamp":"2025-04-01T17:20:08.687523Z","level":"info","event":"{","chan":"stdout","logger":"task"} {"timestamp":"2025-04-01T17:20:08.687579Z","level":"info","event":" \"name\": \"task1_sub_span1\",","chan":"stdout","logger":"task"} {"timestamp":"2025-04-01T17:20:08.687629Z","level":"info","event":" \"context\": {","chan":"stdout","logger":"task"} @@ -505,7 +505,7 @@ class TestUtilsUnit: {"timestamp":"2025-04-01T17:20:08.691483Z","level":"info","event":" \"attributes\": {","chan":"stdout","logger":"task"} {"timestamp":"2025-04-01T17:20:08.691538Z","level":"info","event":" \"telemetry.sdk.language\": \"python\",","chan":"stdout","logger":"task"} {"timestamp":"2025-04-01T17:20:08.691591Z","level":"info","event":" \"telemetry.sdk.name\": \"opentelemetry\",","chan":"stdout","logger":"task"} -{"timestamp":"2025-04-01T17:20:08.678865","level":"info","event":"[ConsoleSpanExporter] is being used","logger":"airflow.traces.otel_tracer"}""" +{"timestamp":"2025-04-01T17:20:08.678865","level":"info","event":"[ConsoleSpanExporter] is being used","logger":"airflow_shared.observability.traces.otel_tracer"}""" example_task_output_after_processing = r""" [SimpleSpanProcessor] is being used diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst index 5e51c77aceb0e..bcf9250ecc52a 100644 --- a/task-sdk/docs/api.rst +++ b/task-sdk/docs/api.rst @@ -158,6 +158,10 @@ Execution Time Components .. autofunction:: airflow.sdk.log.mask_secret +Observability +------------- +.. autoclass:: airflow.sdk.Trace + Everything else --------------- diff --git a/task-sdk/pyproject.toml b/task-sdk/pyproject.toml index 8c9204946a5e2..97e299fa16400 100644 --- a/task-sdk/pyproject.toml +++ b/task-sdk/pyproject.toml @@ -83,7 +83,18 @@ dependencies = [ "sentry" = [ "sentry-sdk>=2.30.0", ] -"all" = ["apache-airflow-task-sdk[sentry]"] +"otel" = [ + "opentelemetry-api>=1.27.0", + "opentelemetry-exporter-otlp>=1.27.0", + "opentelemetry-proto<9999,>=1.27.0", +] +"statsd" = [ + "statsd>=3.3.0", +] +"datadog" = [ + "datadog>=0.50.0", +] +"all" = ["apache-airflow-task-sdk[sentry,otel,statsd,datadog]"] [project.urls] "Bug Tracker" = "https://github.com/apache/airflow/issues" @@ -109,6 +120,7 @@ path = "src/airflow/sdk/__init__.py" "../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/sdk/_shared/secrets_masker" "../shared/timezones/src/airflow_shared/timezones" = "src/airflow/sdk/_shared/timezones" "../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/sdk/_shared/secrets_backend" +"../shared/observability/src/airflow_shared/observability" = "src/airflow/_shared/observability" [tool.hatch.build.targets.wheel] packages = ["src/airflow"] @@ -255,4 +267,5 @@ shared_distributions = [ "apache-airflow-shared-secrets-backend", "apache-airflow-shared-secrets-masker", "apache-airflow-shared-timezones", + "apache-airflow-shared-observability", ] diff --git a/task-sdk/src/airflow/sdk/__init__.py b/task-sdk/src/airflow/sdk/__init__.py index 49b09727c0a8e..c0b892327a403 100644 --- a/task-sdk/src/airflow/sdk/__init__.py +++ b/task-sdk/src/airflow/sdk/__init__.py @@ -49,6 +49,7 @@ "PokeReturnValue", "TaskGroup", "TaskInstanceState", + "Trace", "TriggerRule", "Variable", "WeightRule", @@ -69,6 +70,8 @@ __version__ = "1.2.0" +from airflow.sdk.observability.trace import Trace + if TYPE_CHECKING: from airflow.sdk.api.datamodels._generated import DagRunState, TaskInstanceState, TriggerRule, WeightRule from airflow.sdk.bases.hook import BaseHook @@ -134,6 +137,7 @@ "SecretCache": ".execution_time.cache", "TaskGroup": ".definitions.taskgroup", "TaskInstanceState": ".api.datamodels._generated", + "Trace": ".observability.trace", "TriggerRule": ".api.datamodels._generated", "Variable": ".definitions.variable", "WeightRule": ".api.datamodels._generated", diff --git a/task-sdk/src/airflow/sdk/_shared/observability b/task-sdk/src/airflow/sdk/_shared/observability new file mode 120000 index 0000000000000..a31cf64c72407 --- /dev/null +++ b/task-sdk/src/airflow/sdk/_shared/observability @@ -0,0 +1 @@ +../../../../../shared/observability/src/airflow_shared/observability \ No newline at end of file diff --git a/task-sdk/src/airflow/sdk/configuration.py b/task-sdk/src/airflow/sdk/configuration.py index a9f3d09f58486..6ff386f5c53dd 100644 --- a/task-sdk/src/airflow/sdk/configuration.py +++ b/task-sdk/src/airflow/sdk/configuration.py @@ -26,8 +26,8 @@ from io import StringIO from typing import Any -from airflow._shared.configuration.parser import AirflowConfigParser as _SharedAirflowConfigParser from airflow.sdk import yaml +from airflow.sdk._shared.configuration.parser import AirflowConfigParser as _SharedAirflowConfigParser from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH log = logging.getLogger(__name__) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index aa08a8d065f6f..08c7d7da6d5f5 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -115,8 +115,8 @@ ) from airflow.sdk.execution_time.sentry import Sentry from airflow.sdk.execution_time.xcom import XCom +from airflow.sdk.observability.stats import Stats from airflow.sdk.timezone import coerce_datetime -from airflow.stats import Stats if TYPE_CHECKING: import jinja2 diff --git a/task-sdk/src/airflow/sdk/io/fs.py b/task-sdk/src/airflow/sdk/io/fs.py index a49d7c6ffcc53..c9184bc1967f5 100644 --- a/task-sdk/src/airflow/sdk/io/fs.py +++ b/task-sdk/src/airflow/sdk/io/fs.py @@ -26,7 +26,7 @@ from airflow.providers_manager import ProvidersManager from airflow.sdk.module_loading import import_string -from airflow.stats import Stats +from airflow.sdk.observability.stats import Stats if TYPE_CHECKING: from fsspec import AbstractFileSystem diff --git a/task-sdk/src/airflow/sdk/observability/__init__.py b/task-sdk/src/airflow/sdk/observability/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/task-sdk/src/airflow/sdk/observability/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/task-sdk/src/airflow/sdk/observability/metrics/__init__.py b/task-sdk/src/airflow/sdk/observability/metrics/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/task-sdk/src/airflow/sdk/observability/metrics/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/task-sdk/src/airflow/sdk/observability/metrics/datadog_logger.py b/task-sdk/src/airflow/sdk/observability/metrics/datadog_logger.py new file mode 100644 index 0000000000000..6e8590b09d567 --- /dev/null +++ b/task-sdk/src/airflow/sdk/observability/metrics/datadog_logger.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.sdk._shared.observability.metrics import datadog_logger +from airflow.sdk.configuration import conf + +if TYPE_CHECKING: + from airflow.sdk._shared.observability.metrics.datadog_logger import SafeDogStatsdLogger + + +def get_dogstatsd_logger(cls) -> SafeDogStatsdLogger: + return datadog_logger.get_dogstatsd_logger( + cls, + host=conf.get("metrics", "statsd_host"), + port=conf.getint("metrics", "statsd_port"), + namespace=conf.get("metrics", "statsd_prefix"), + datadog_metrics_tags=conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True), + statsd_disabled_tags=conf.get("metrics", "statsd_disabled_tags", fallback=None), + metrics_allow_list=conf.get("metrics", "metrics_allow_list", fallback=None), + metrics_block_list=conf.get("metrics", "metrics_block_list", fallback=None), + stat_name_handler=conf.getimport("metrics", "stat_name_handler"), + statsd_influxdb_enabled=conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False), + ) diff --git a/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py b/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py new file mode 100644 index 0000000000000..a204b607e6a80 --- /dev/null +++ b/task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.sdk._shared.observability.metrics import otel_logger +from airflow.sdk.configuration import conf + +if TYPE_CHECKING: + from airflow.sdk._shared.observability.metrics.otel_logger import SafeOtelLogger + + +def get_otel_logger(cls) -> SafeOtelLogger: + return otel_logger.get_otel_logger( + cls, + host=conf.get("metrics", "otel_host"), # ex: "breeze-otel-collector" + port=conf.getint("metrics", "otel_port"), # ex: 4318 + prefix=conf.get("metrics", "otel_prefix"), # ex: "airflow" + ssl_active=conf.getboolean("metrics", "otel_ssl_active"), + # PeriodicExportingMetricReader will default to an interval of 60000 millis. + conf_interval=conf.getfloat("metrics", "otel_interval_milliseconds", fallback=None), # ex: 30000 + debug=conf.getboolean("metrics", "otel_debugging_on"), + service_name=conf.get("metrics", "otel_service"), + metrics_allow_list=conf.get("metrics", "metrics_allow_list", fallback=None), + metrics_block_list=conf.get("metrics", "metrics_block_list", fallback=None), + stat_name_handler=conf.getimport("metrics", "stat_name_handler"), + statsd_influxdb_enabled=conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False), + ) diff --git a/task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py b/task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py new file mode 100644 index 0000000000000..fe976fbbc66f1 --- /dev/null +++ b/task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from airflow.sdk._shared.configuration import AirflowConfigException +from airflow.sdk._shared.observability.metrics import statsd_logger +from airflow.sdk.configuration import conf + +if TYPE_CHECKING: + from airflow.sdk._shared.observability.metrics.statsd_logger import SafeStatsdLogger + +log = logging.getLogger(__name__) + + +def get_statsd_logger(cls) -> SafeStatsdLogger: + stats_class = conf.getimport("metrics", "statsd_custom_client_path", fallback=None) + + # no need to check for the scheduler/statsd_on -> this method is only called when it is set + # and previously it would crash with None is callable if it was called without it. + from statsd import StatsClient + + if stats_class: + if not issubclass(stats_class, StatsClient): + raise AirflowConfigException( + "Your custom StatsD client must extend the statsd.StatsClient in order to ensure " + "backwards compatibility." + ) + log.info("Successfully loaded custom StatsD client") + + else: + stats_class = StatsClient + + return statsd_logger.get_statsd_logger( + cls, + stats_class=stats_class, + host=conf.get("metrics", "statsd_host"), + port=conf.getint("metrics", "statsd_port"), + prefix=conf.get("metrics", "statsd_prefix"), + ipv6=conf.getboolean("metrics", "statsd_ipv6", fallback=False), + influxdb_tags_enabled=conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False), + statsd_disabled_tags=conf.get("metrics", "statsd_disabled_tags", fallback=None), + metrics_allow_list=conf.get("metrics", "metrics_allow_list", fallback=None), + metrics_block_list=conf.get("metrics", "metrics_block_list", fallback=None), + stat_name_handler=conf.getimport("metrics", "stat_name_handler"), + statsd_influxdb_enabled=conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False), + ) diff --git a/task-sdk/src/airflow/sdk/observability/stats.py b/task-sdk/src/airflow/sdk/observability/stats.py new file mode 100644 index 0000000000000..bd6fb2d2afaaa --- /dev/null +++ b/task-sdk/src/airflow/sdk/observability/stats.py @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +import socket +from collections.abc import Callable +from typing import TYPE_CHECKING + +from airflow.sdk._shared.observability.metrics.base_stats_logger import NoStatsLogger +from airflow.sdk.configuration import conf + +if TYPE_CHECKING: + from airflow.sdk._shared.observability.metrics.base_stats_logger import StatsLogger + +log = logging.getLogger(__name__) + + +class _Stats(type): + factory: Callable + instance: StatsLogger | NoStatsLogger | None = None + + def __getattr__(cls, name: str) -> str: + if not cls.instance: + try: + cls.instance = cls.factory() + except (socket.gaierror, ImportError) as e: + log.error("Could not configure StatsClient: %s, using NoStatsLogger instead.", e) + cls.instance = NoStatsLogger() + return getattr(cls.instance, name) + + def __init__(cls, *args, **kwargs) -> None: + super().__init__(cls) + if not hasattr(cls.__class__, "factory"): + is_datadog_enabled_defined = conf.has_option("metrics", "statsd_datadog_enabled") + if is_datadog_enabled_defined and conf.getboolean("metrics", "statsd_datadog_enabled"): + from airflow.sdk.observability.metrics import datadog_logger + + cls.__class__.factory = datadog_logger.get_dogstatsd_logger + elif conf.getboolean("metrics", "statsd_on"): + from airflow.sdk.observability.metrics import statsd_logger + + cls.__class__.factory = statsd_logger.get_statsd_logger + elif conf.getboolean("metrics", "otel_on"): + from airflow.sdk.observability.metrics import otel_logger + + cls.__class__.factory = otel_logger.get_otel_logger + else: + cls.__class__.factory = NoStatsLogger + + @classmethod + def get_constant_tags(cls) -> list[str]: + """Get constant DataDog tags to add to all stats.""" + tags_in_string = conf.get("metrics", "statsd_datadog_tags", fallback=None) + if not tags_in_string: + return [] + return tags_in_string.split(",") + + +if TYPE_CHECKING: + Stats: StatsLogger +else: + + class Stats(metaclass=_Stats): + """Empty class for Stats - we use metaclass to inject the right one.""" diff --git a/task-sdk/src/airflow/sdk/observability/trace.py b/task-sdk/src/airflow/sdk/observability/trace.py new file mode 100644 index 0000000000000..7f3d44710592d --- /dev/null +++ b/task-sdk/src/airflow/sdk/observability/trace.py @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +from collections.abc import Callable +from functools import wraps +from socket import socket +from typing import TYPE_CHECKING + +from airflow.sdk._shared.observability.traces.base_tracer import EmptyTrace, Tracer +from airflow.sdk.configuration import conf + +log = logging.getLogger(__name__) + + +def add_debug_span(func): + """Decorate a function with span.""" + func_name = func.__name__ + qual_name = func.__qualname__ + module_name = func.__module__ + component = qual_name.rsplit(".", 1)[0] if "." in qual_name else module_name + + @wraps(func) + def wrapper(*args, **kwargs): + with DebugTrace.start_span(span_name=func_name, component=component): + return func(*args, **kwargs) + + return wrapper + + +class _TraceMeta(type): + factory: Callable[[], Tracer] | None = None + instance: Tracer | EmptyTrace | None = None + + def __new__(cls, name, bases, attrs): + # Read the debug flag from the class body. + if "check_debug_traces_flag" not in attrs: + raise TypeError(f"Class '{name}' must define 'check_debug_traces_flag'.") + + return super().__new__(cls, name, bases, attrs) + + def __getattr__(cls, name: str): + if not cls.factory: + # Lazy initialization of the factory + cls.configure_factory() + if not cls.instance: + cls._initialize_instance() + return getattr(cls.instance, name) + + def _initialize_instance(cls): + """Initialize the trace instance.""" + try: + cls.instance = cls.factory() + except (socket.gaierror, ImportError) as e: + log.error("Could not configure Trace: %s. Using EmptyTrace instead.", e) + cls.instance = EmptyTrace() + + def __call__(cls, *args, **kwargs): + """Ensure the class behaves as a singleton.""" + if not cls.instance: + cls._initialize_instance() + return cls.instance + + def configure_factory(cls): + """Configure the trace factory based on settings.""" + otel_on = conf.getboolean("traces", "otel_on") + + if cls.check_debug_traces_flag: + debug_traces_on = conf.getboolean("traces", "otel_debug_traces_on") + else: + # Set to true so that it will be ignored during the evaluation for the factory instance. + # If this is true, then (otel_on and debug_traces_on) will always evaluate to + # whatever value 'otel_on' has and therefore it will be ignored. + debug_traces_on = True + + if otel_on and debug_traces_on: + from airflow.sdk.observability.traces import otel_tracer + + cls.factory = staticmethod( + lambda use_simple_processor=False: otel_tracer.get_otel_tracer(cls, use_simple_processor) + ) + else: + # EmptyTrace is a class and not inherently callable. + # Using a lambda ensures it can be invoked as a callable factory. + # staticmethod ensures the lambda is treated as a standalone function + # and avoids passing `cls` as an implicit argument. + cls.factory = staticmethod(lambda: EmptyTrace()) + + def get_constant_tags(cls) -> str | None: + """Get constant tags to add to all traces.""" + return conf.get("traces", "tags", fallback=None) + + +if TYPE_CHECKING: + Trace: EmptyTrace + DebugTrace: EmptyTrace +else: + + class Trace(metaclass=_TraceMeta): + """Empty class for Trace - we use metaclass to inject the right one.""" + + check_debug_traces_flag = False + + class DebugTrace(metaclass=_TraceMeta): + """Empty class for Trace and in case the debug traces flag is enabled.""" + + check_debug_traces_flag = True diff --git a/task-sdk/src/airflow/sdk/observability/traces/__init__.py b/task-sdk/src/airflow/sdk/observability/traces/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/task-sdk/src/airflow/sdk/observability/traces/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/task-sdk/src/airflow/sdk/observability/traces/otel_tracer.py b/task-sdk/src/airflow/sdk/observability/traces/otel_tracer.py new file mode 100644 index 0000000000000..ef5a3542c62dd --- /dev/null +++ b/task-sdk/src/airflow/sdk/observability/traces/otel_tracer.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.sdk._shared.observability.traces import otel_tracer +from airflow.sdk.configuration import conf + +if TYPE_CHECKING: + from airflow.sdk._shared.observability.traces.otel_tracer import OtelTrace + + +def get_otel_tracer(cls, use_simple_processor: bool = False) -> OtelTrace: + return otel_tracer.get_otel_tracer( + cls, + use_simple_processor, + host=conf.get("traces", "otel_host"), + port=conf.getint("traces", "otel_port"), + ssl_active=conf.getboolean("traces", "otel_ssl_active"), + otel_service=conf.get("traces", "otel_service"), + debug=conf.getboolean("traces", "otel_debugging_on"), + ) + + +def get_otel_tracer_for_task(cls) -> OtelTrace: + return get_otel_tracer(cls, use_simple_processor=True) diff --git a/task-sdk/tests/task_sdk/docs/test_public_api.py b/task-sdk/tests/task_sdk/docs/test_public_api.py index 99c57c42526ec..fc8331c586707 100644 --- a/task-sdk/tests/task_sdk/docs/test_public_api.py +++ b/task-sdk/tests/task_sdk/docs/test_public_api.py @@ -57,6 +57,7 @@ def test_airflow_sdk_no_unexpected_exports(): "configuration", "module_loading", "yaml", + "observability", } unexpected = actual - public - ignore assert not unexpected, f"Unexpected exports in airflow.sdk: {sorted(unexpected)}" diff --git a/task-sdk/tests/task_sdk/execution_time/test_sentry.py b/task-sdk/tests/task_sdk/execution_time/test_sentry.py index c4efb929052b4..c50dda40371c4 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_sentry.py +++ b/task-sdk/tests/task_sdk/execution_time/test_sentry.py @@ -26,8 +26,8 @@ import pytest import uuid6 -from airflow._shared.timezones import timezone from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk._shared.timezones import timezone from airflow.sdk.api.datamodels._generated import DagRun, DagRunState, DagRunType from airflow.sdk.execution_time.comms import GetTaskBreadcrumbs, TaskBreadcrumbsResult from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance