Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
71421c2
move metrics and traces packages under observability
xBis7 Sep 28, 2025
a98fc65
rename tracer to base_tracer
xBis7 Sep 28, 2025
5460548
fix backwards compatibility issues with providers
xBis7 Sep 30, 2025
276a5a1
move observability package under shared
xBis7 Oct 11, 2025
e8db75c
change the imports
xBis7 Oct 13, 2025
16cca8e
providers get packages from sdk observability
xBis7 Oct 13, 2025
3c725eb
fix task-sdk imports for backwards compat
xBis7 Oct 13, 2025
288f731
fix providers leftover imports
xBis7 Oct 14, 2025
15cef85
moved tests under shared
xBis7 Oct 14, 2025
d909eb5
fix failing test
xBis7 Oct 14, 2025
37ab2ed
fix observability dependencies
xBis7 Oct 14, 2025
9106524
fix errors in ci image checks
xBis7 Nov 6, 2025
a535864
fix wrong import in edge_executor
xBis7 Nov 7, 2025
0199d05
make dependencies optional
xBis7 Nov 7, 2025
f37c57a
fix duplicate dependencies
xBis7 Nov 7, 2025
09c9aee
more duplicate dependencies fixed
xBis7 Nov 7, 2025
a56287d
fixup! more duplicate dependencies fixed
potiuk Nov 11, 2025
2a8b677
fixup! fixup! more duplicate dependencies fixed
potiuk Nov 11, 2025
704d2ac
reorganize imports
xBis7 Nov 14, 2025
346ac14
fix failing tests
xBis7 Nov 15, 2025
74e1c79
cleanup airflow.utils and airflow.models imports
xBis7 Nov 17, 2025
6a7fcf8
move InvalidStatsNameException under the new package
xBis7 Nov 18, 2025
0cf60e4
fix pyproject accidental removal
xBis7 Nov 20, 2025
f462e95
add the observability package to the project workspace
xBis7 Nov 21, 2025
146949b
metrics
xBis7 Nov 24, 2025
d7b43df
traces
xBis7 Nov 24, 2025
3f90bfd
fix shared tests
xBis7 Nov 25, 2025
5f222f1
fix shared observability tests
xBis7 Nov 25, 2025
2e1cc9a
fix stats and traces imports
xBis7 Nov 25, 2025
5f0d2dd
fix otel integration tests
xBis7 Nov 26, 2025
2fec150
fix ruff error
xBis7 Nov 27, 2025
f9a1bf3
remove unused dependencies
xBis7 Nov 27, 2025
10b1a78
add sdk stats
xBis7 Nov 27, 2025
10dda6b
add observability factory methods under task-sdk
xBis7 Nov 27, 2025
8bf750f
remove imports from other shared modules
xBis7 Nov 27, 2025
9443906
move stats_class check to the factory method
xBis7 Nov 27, 2025
925affb
use relative imports for _shared in observability
xBis7 Nov 28, 2025
eaf5882
add Stats imports for providers in common sdk
xBis7 Nov 28, 2025
ef3605c
disable ban-relative-imports check
xBis7 Nov 28, 2025
7899fba
fix sdk/test_public_api.py
xBis7 Nov 29, 2025
80fb034
adjust pyproject toml for relative imports
xBis7 Nov 29, 2025
30ce940
fix mypy providers error
xBis7 Nov 29, 2025
ddb2e7f
upgrade providers-common-compat version where needed
xBis7 Nov 30, 2025
f4f7972
add 'use next version' comment where needed
xBis7 Nov 30, 2025
598f938
undo version upgrade for openlineage common-compat
xBis7 Nov 30, 2025
bc91ecd
fix observability imports for the task-sdk
xBis7 Nov 30, 2025
925122b
remove config import from observability tests
xBis7 Nov 30, 2025
bced033
move tests under core where needed
xBis7 Dec 1, 2025
dcd6739
fix failing test_pytest_args_for_test_types
xBis7 Dec 1, 2025
8465a5b
add observability to the task-sdk public api
xBis7 Dec 2, 2025
600b776
add deprecation warning when accessing Stats from the old path
xBis7 Dec 3, 2025
e50f3b1
make the factory method params, keywords-only
xBis7 Dec 3, 2025
7cd8ac6
use pendulum instead of datetime in otel_tracer
xBis7 Dec 3, 2025
8b8168f
finish making the methods keywords-only
xBis7 Dec 3, 2025
b67575f
fix mypy errors
xBis7 Dec 3, 2025
61c1691
fix imports for otel test dags
xBis7 Dec 3, 2025
0d74d25
fix static checks error
xBis7 Dec 3, 2025
099cf22
add Trace to the sdk docs
xBis7 Dec 4, 2025
88ffc7e
remove test for deprecation warning in test_stats.py
xBis7 Dec 4, 2025
912e9bc
remove configuration dependency from shared observability
xBis7 Dec 4, 2025
db24c3b
remove leftover dependencies from airflow-core
xBis7 Dec 4, 2025
650c995
add deprecation warnings for metrics and traces
xBis7 Dec 4, 2025
cd0a464
add methodtools dependency to observability
xBis7 Dec 4, 2025
5a90c0b
fix test_docs_inventory failure
xBis7 Dec 4, 2025
18a4de9
Merge branch 'main' into observability-refactoring
amoghrajesh Dec 4, 2025
5b715fd
Merge branch 'main' into observability-refactoring
amoghrajesh Dec 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ exclude = [
"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker"
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones"
"../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/_shared/secrets_backend"
"../shared/observability/src/airflow_shared/observability" = "src/airflow/_shared/observability"

[tool.hatch.build.targets.custom]
path = "./hatch_build.py"
Expand Down Expand Up @@ -299,4 +300,5 @@ shared_distributions = [
"apache-airflow-shared-secrets-backend",
"apache-airflow-shared-secrets-masker",
"apache-airflow-shared-timezones",
"apache-airflow-shared-observability",
]
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@
# Deprecated lazy imports
"AirflowException": (".exceptions", "AirflowException", True),
"Dataset": (".sdk", "Asset", True),
"Stats": (".observability.stats", "Stats", True),
"Trace": (".observability.trace", "Trace", True),
"metrics": (".observability.metrics", "", True),
"traces": (".observability.traces", "", True),
}
if TYPE_CHECKING:
# These objects are imported by PEP-562, however, static analyzers and IDE's
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/_shared/observability
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/assets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
DagScheduleAssetUriReference,
PartitionedAssetKeyLog,
)
from airflow.stats import Stats
from airflow.observability.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.sqlalchemy import get_dialect_name

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/cli/commands/daemon_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def run_command_with_daemon_option(

with ctx:
# in daemon context stats client needs to be reinitialized.
from airflow.stats import Stats
from airflow.observability.stats import Stats

Stats.instance = None
callback()
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ParseImportError
from airflow.observability.stats import Stats
from airflow.observability.trace import DebugTrace
from airflow.sdk import SecretCache
from airflow.sdk.log import init_log_file, logging_processors
from airflow.stats import Stats
from airflow.traces.tracer import DebugTrace
from airflow.utils.file import list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
)
from airflow.configuration import conf
from airflow.dag_processing.dagbag import DagBag
from airflow.observability.stats import Stats
from airflow.sdk.exceptions import TaskNotFound
from airflow.sdk.execution_time.comms import (
ConnectionResult,
Expand Down Expand Up @@ -66,7 +67,6 @@
from airflow.sdk.execution_time.supervisor import WatchedSubprocess
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, _send_error_email_notification
from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
from airflow.stats import Stats
from airflow.utils.file import iter_airflow_imports
from airflow.utils.state import TaskInstanceState

Expand Down
15 changes: 5 additions & 10 deletions airflow-core/src/airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@

import pendulum

from airflow._shared.observability.traces import NO_TRACE_ID
from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
from airflow.executors import workloads
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import Log
from airflow.stats import Stats
from airflow.traces import NO_TRACE_ID
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span, gen_context
from airflow.traces.utils import gen_span_id_from_ti_key
from airflow.observability.stats import Stats
from airflow.observability.trace import DebugTrace, Trace, add_debug_span
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
from airflow.utils.thread_safe_dict import ThreadSafeDict
Expand Down Expand Up @@ -419,11 +418,9 @@ def fail(self, key: TaskInstanceKey, info=None) -> None:
"""
trace_id = Trace.get_current_span().get_span_context().trace_id
if trace_id != NO_TRACE_ID:
span_id = int(gen_span_id_from_ti_key(key, as_int=True))
with DebugTrace.start_span(
with DebugTrace.start_child_span(
span_name="fail",
component="BaseExecutor",
parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
) as span:
span.set_attributes(
{
Expand All @@ -446,11 +443,9 @@ def success(self, key: TaskInstanceKey, info=None) -> None:
"""
trace_id = Trace.get_current_span().get_span_context().trace_id
if trace_id != NO_TRACE_ID:
span_id = int(gen_span_id_from_ti_key(key, as_int=True))
with DebugTrace.start_span(
with DebugTrace.start_child_span(
span_name="success",
component="BaseExecutor",
parent_sc=gen_context(trace_id=trace_id, span_id=span_id),
) as span:
span.set_attributes(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import Job, perform_heartbeat
from airflow.stats import Stats
from airflow.observability.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
from airflow.executors.executor_loader import ExecutorLoader
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.stats import Stats
from airflow.traces.tracer import DebugTrace, add_debug_span
from airflow.observability.stats import Stats
from airflow.observability.trace import DebugTrace, add_debug_span
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
Expand Down
13 changes: 4 additions & 9 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,11 @@
from airflow.models.taskinstance import TaskInstance
from airflow.models.team import Team
from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger, TriggerFailureReason
from airflow.observability.stats import Stats
from airflow.observability.trace import DebugTrace, Trace, add_debug_span
from airflow.serialization.definitions.notset import NOTSET
from airflow.stats import Stats
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
from airflow.timetables.simple import AssetTriggeredTimetable
from airflow.traces import utils as trace_utils
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span
from airflow.utils.dates import datetime_to_nano
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -2085,12 +2084,8 @@ def _schedule_dag_run(
:param dag_run: The DagRun to schedule
:return: Callback that needs to be executed
"""
trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run, as_int=True))
span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run, as_int=True))
links = [{"trace_id": trace_id, "span_id": span_id}]

with DebugTrace.start_span(
span_name="_schedule_dag_run", component="SchedulerJobRunner", links=links
with DebugTrace.start_root_span(
span_name="_schedule_dag_run", component="SchedulerJobRunner"
) as span:
span.set_attributes(
{
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import perform_heartbeat
from airflow.models.trigger import Trigger
from airflow.observability.stats import Stats
from airflow.observability.trace import DebugTrace, Trace, add_debug_span
from airflow.sdk.api.datamodels._generated import HITLDetailResponse
from airflow.sdk.execution_time.comms import (
CommsDecoder,
Expand Down Expand Up @@ -73,8 +75,6 @@
_RequestFrame,
)
from airflow.sdk.execution_time.supervisor import WatchedSubprocess, make_buffered_socket_reader
from airflow.stats import Stats
from airflow.traces.tracer import DebugTrace, Trace, add_debug_span
from airflow.triggers import base as events
from airflow.utils.helpers import log_filename_template_renderer
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from airflow._shared.timezones import timezone
from airflow.models import Base
from airflow.stats import Stats
from airflow.observability.stats import Stats
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, mapped_column

if TYPE_CHECKING:
Expand Down
5 changes: 3 additions & 2 deletions airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.models.tasklog import LogTemplate
from airflow.models.taskmap import TaskMap
from airflow.observability.stats import Stats
from airflow.observability.trace import Trace
from airflow.sdk.definitions.deadline import DeadlineReference
from airflow.serialization.definitions.notset import NOTSET, ArgNotSet, is_arg_set
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
from airflow.traces.tracer import EmptySpan, Trace
from airflow.utils.dates import datetime_to_nano
from airflow.utils.helpers import chunks, is_container, prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -102,6 +102,7 @@
from sqlalchemy.orm import Session
from sqlalchemy.sql.elements import Case, ColumnElement

from airflow._shared.observability.traces.base_tracer import EmptySpan
from airflow.models.dag_version import DagVersion
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskinstancekey import TaskInstanceKey
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/deadline.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from airflow._shared.timezones import timezone
from airflow.models.base import Base
from airflow.models.callback import Callback, CallbackDefinitionProtocol
from airflow.stats import Stats
from airflow.observability.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime, get_dialect_name, mapped_column
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@
from airflow.models.taskmap import TaskMap
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComSelectSequence, XComModel
from airflow.observability.stats import Stats
from airflow.settings import task_instance_mutation_hook
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
from airflow.utils.helpers import prune_dict
Expand Down
16 changes: 16 additions & 0 deletions airflow-core/src/airflow/observability/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
40 changes: 40 additions & 0 deletions airflow-core/src/airflow/observability/metrics/datadog_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow._shared.observability.metrics import datadog_logger
from airflow.configuration import conf

if TYPE_CHECKING:
from airflow._shared.observability.metrics.datadog_logger import SafeDogStatsdLogger


def get_dogstatsd_logger(cls) -> SafeDogStatsdLogger:
return datadog_logger.get_dogstatsd_logger(
cls,
host=conf.get("metrics", "statsd_host"),
port=conf.getint("metrics", "statsd_port"),
namespace=conf.get("metrics", "statsd_prefix"),
datadog_metrics_tags=conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True),
statsd_disabled_tags=conf.get("metrics", "statsd_disabled_tags", fallback=None),
metrics_allow_list=conf.get("metrics", "metrics_allow_list", fallback=None),
metrics_block_list=conf.get("metrics", "metrics_block_list", fallback=None),
stat_name_handler=conf.getimport("metrics", "stat_name_handler"),
statsd_influxdb_enabled=conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False),
)
43 changes: 43 additions & 0 deletions airflow-core/src/airflow/observability/metrics/otel_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow._shared.observability.metrics import otel_logger
from airflow.configuration import conf

if TYPE_CHECKING:
from airflow._shared.observability.metrics.otel_logger import SafeOtelLogger


def get_otel_logger(cls) -> SafeOtelLogger:
return otel_logger.get_otel_logger(
cls,
host=conf.get("metrics", "otel_host"), # ex: "breeze-otel-collector"
port=conf.getint("metrics", "otel_port"), # ex: 4318
prefix=conf.get("metrics", "otel_prefix"), # ex: "airflow"
ssl_active=conf.getboolean("metrics", "otel_ssl_active"),
# PeriodicExportingMetricReader will default to an interval of 60000 millis.
conf_interval=conf.getfloat("metrics", "otel_interval_milliseconds", fallback=None), # ex: 30000
debug=conf.getboolean("metrics", "otel_debugging_on"),
service_name=conf.get("metrics", "otel_service"),
metrics_allow_list=conf.get("metrics", "metrics_allow_list", fallback=None),
metrics_block_list=conf.get("metrics", "metrics_block_list", fallback=None),
stat_name_handler=conf.getimport("metrics", "stat_name_handler"),
statsd_influxdb_enabled=conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False),
)
Loading
Loading