4 changes: 1 addition & 3 deletions airflow-core/docs/core-concepts/executor/index.rst
@@ -308,12 +308,10 @@ Compatibility Attributes
The ``BaseExecutor`` class interface contains a set of attributes that Airflow core code uses to check which features your executor is compatible with. When writing your own Airflow executor, be sure to set these correctly for your use case (a sketch follows this diff). Most attributes are simple booleans that enable/disable a feature or indicate that the executor supports it; ``sentry_integration`` is a string import path:

* ``supports_pickling``: Whether or not the executor supports reading pickled Dags from the database before execution (rather than reading the Dag definition from the file system).
* ``supports_sentry``: Whether or not the executor supports `Sentry <https://sentry.io>`_.

* ``sentry_integration``: If the executor supports `Sentry <https://sentry.io>`_, this should be an import path to a callable that creates the integration. For example, ``CeleryExecutor`` sets this to ``"sentry_sdk.integrations.celery.CeleryIntegration"``.
* ``is_local``: Whether the executor runs tasks locally or remotely. See the `Executor Types`_ section above.
* ``is_single_threaded``: Whether or not the executor is single threaded. This is particularly relevant to what database backends are supported. Single threaded executors can run with any backend, including SQLite.
* ``is_production``: Whether or not the executor should be used for production purposes. A UI message is displayed to users when they are using a non-production-ready executor.

* ``serve_logs``: Whether or not the executor supports serving logs, see :doc:`/administration-and-deployment/logging-monitoring/logging-tasks`.

CLI
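To make the attribute contract above concrete, here is a minimal sketch of a custom executor declaring these attributes. The class name is hypothetical; the attribute names and the Celery integration path come from the docs above:

```python
from airflow.executors.base_executor import BaseExecutor


class MyExecutor(BaseExecutor):
    """Hypothetical executor showing the compatibility attributes."""

    supports_pickling: bool = False
    is_local: bool = True
    is_single_threaded: bool = False
    is_production: bool = False  # the UI warns users about non-production executors
    serve_logs: bool = True

    # New in this PR: a dotted import path to a callable that creates the
    # Sentry integration ("" disables it). CeleryExecutor, for example, sets
    # "sentry_sdk.integrations.celery.CeleryIntegration".
    sentry_integration: str = ""
```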
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/executors/base_executor.py
@@ -129,7 +129,7 @@ class BaseExecutor(LoggingMixin):
active_spans = ThreadSafeDict()

supports_ad_hoc_ti_run: bool = False
supports_sentry: bool = False
sentry_integration: str = ""

is_local: bool = False
is_production: bool = True
3 changes: 3 additions & 0 deletions airflow-core/src/airflow/executors/workloads.py
@@ -112,6 +112,7 @@ class ExecuteTask(BaseDagBundleWorkload):
"""Execute the given Task."""

ti: TaskInstance
sentry_integration: str = ""

type: Literal["ExecuteTask"] = Field(init=False, default="ExecuteTask")

@@ -122,6 +123,7 @@ def make(
dag_rel_path: Path | None = None,
generator: JWTGenerator | None = None,
bundle_info: BundleInfo | None = None,
sentry_integration: str = "",
) -> ExecuteTask:
from airflow.utils.helpers import log_filename_template_renderer

@@ -140,6 +142,7 @@
token=cls.generate_token(str(ti.id), generator),
log_path=fname,
bundle_info=bundle_info,
sentry_integration=sentry_integration,
)


24 changes: 23 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -791,13 +791,35 @@ def _enqueue_task_instances_with_queued_state(
:param executor: The executor to enqueue tasks for
:param session: The session object
"""

def _get_sentry_integration(executor: BaseExecutor) -> str:
try:
sentry_integration = executor.sentry_integration
except AttributeError:
# Old executor interface hard-codes the supports_sentry flag.
if getattr(executor, "supports_sentry", False):
return "sentry_sdk.integrations.celery.CeleryIntegration"
return ""
if not isinstance(sentry_integration, str):
self.log.warning(
"Ignoring invalid sentry_integration on executor",
executor=executor,
sentry_integration=sentry_integration,
)
return ""
return sentry_integration

# actually enqueue them
for ti in task_instances:
if ti.dag_run.state in State.finished_dr_states:
ti.set_state(None, session=session)
continue

workload = workloads.ExecuteTask.make(ti, generator=executor.jwt_generator)
workload = workloads.ExecuteTask.make(
ti,
generator=executor.jwt_generator,
sentry_integration=_get_sentry_integration(executor),
)
executor.queue_workload(workload, session=session)

def _critical_section_enqueue_task_instances(self, session: Session) -> int:
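For clarity, a hedged sketch of how the ``_get_sentry_integration`` fallback above plays out, using two hypothetical executor classes:

```python
class OldStyleExecutor:
    """Hypothetical executor written against the old interface."""

    supports_sentry = True  # boolean flag only; no sentry_integration attribute


class NewStyleExecutor:
    """Hypothetical executor written against the new interface."""

    sentry_integration = "sentry_sdk.integrations.celery.CeleryIntegration"


# _get_sentry_integration(OldStyleExecutor()) hits the AttributeError branch
# and falls back to the hard-coded Celery path the old flag implied.
# _get_sentry_integration(NewStyleExecutor()) returns the attribute unchanged.
```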
6 changes: 3 additions & 3 deletions airflow-core/tests/unit/executors/test_base_executor.py
@@ -33,15 +33,15 @@
from airflow.executors import workloads
from airflow.executors.base_executor import BaseExecutor, RunningRetryAttemptType
from airflow.executors.local_executor import LocalExecutor
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.sdk import BaseOperator
from airflow.utils.state import State, TaskInstanceState

from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker


def test_supports_sentry():
assert not BaseExecutor.supports_sentry
def test_sentry_integration():
assert not BaseExecutor.sentry_integration


def test_is_local_default_value():
4 changes: 2 additions & 2 deletions airflow-core/tests/unit/executors/test_local_executor.py
@@ -46,8 +46,8 @@
class TestLocalExecutor:
TEST_SUCCESS_COMMANDS = 5

def test_supports_sentry(self):
assert not LocalExecutor.supports_sentry
def test_sentry_integration(self):
assert not LocalExecutor.sentry_integration

def test_is_local_default_value(self):
assert LocalExecutor.is_local
2 changes: 2 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -226,10 +226,12 @@ def mock_executors(self):
default_executor.name = ExecutorName(alias="default_exec", module_path="default.exec.module.path")
default_executor.jwt_generator = mock_jwt_generator
default_executor.team_name = None # Global executor
default_executor.sentry_integration = ""
second_executor = mock.MagicMock(name="SecondaryExecutor", slots_available=8, slots_occupied=0)
second_executor.name = ExecutorName(alias="secondary_exec", module_path="secondary.exec.module.path")
second_executor.jwt_generator = mock_jwt_generator
second_executor.team_name = None # Global executor
second_executor.sentry_integration = ""

# TODO: Task-SDK Make it look like a bound method. Needed until we remove the old queue_workload
# interface from executors
7 changes: 6 additions & 1 deletion devel-common/src/tests_common/pytest_plugin.py
@@ -2490,6 +2490,11 @@ def _create_task_instance(
if upstream_map_indexes is not None:
ti_context.upstream_map_indexes = upstream_map_indexes

compat_fields = {
"requests_fd": 0,
"sentry_integration": "",
}

startup_details = StartupDetails(
ti=TaskInstance(
id=ti_id,
@@ -2505,7 +2510,7 @@
ti_context=ti_context,
start_date=start_date, # type: ignore
# Back-compat of task-sdk. Only affects us when we manually create these objects in tests.
**({"requests_fd": 0} if "requests_fd" in StartupDetails.model_fields else {}), # type: ignore
**{k: v for k, v in compat_fields.items() if k in StartupDetails.model_fields}, # type: ignore
)

ti = mocked_parse(startup_details, dag_id, task)
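The dict comprehension above keeps the fixture compatible with task-sdk versions that lack one or both fields; a standalone sketch, using a trimmed stand-in model:

```python
from pydantic import BaseModel


class TrimmedStartupDetails(BaseModel):
    """Stand-in for a StartupDetails version without requests_fd."""

    sentry_integration: str = ""


compat_fields = {"requests_fd": 0, "sentry_integration": ""}
kwargs = {k: v for k, v in compat_fields.items() if k in TrimmedStartupDetails.model_fields}
assert kwargs == {"sentry_integration": ""}  # requests_fd is silently dropped
```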
@@ -289,6 +289,9 @@ class CeleryExecutor(BaseExecutor):
"""

supports_ad_hoc_ti_run: bool = True
sentry_integration: str = "sentry_sdk.integrations.celery.CeleryIntegration"

# TODO: Remove this flag once providers depend on Airflow 3.2.
supports_sentry: bool = True

if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
@@ -44,7 +44,7 @@
from tests_common.test_utils import db
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.dag import sync_dag_to_db
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_2_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.models.dag_version import DagVersion
@@ -121,6 +121,11 @@ def teardown_method(self) -> None:
db.clear_db_runs()
db.clear_db_jobs()

@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Airflow 3.2+ prefers new configuration")
def test_sentry_integration(self):
assert CeleryExecutor.sentry_integration == "sentry_sdk.integrations.celery.CeleryIntegration"

@pytest.mark.skipif(AIRFLOW_V_3_2_PLUS, reason="Test only for Airflow < 3.2")
def test_supports_sentry(self):
assert CeleryExecutor.supports_sentry

@@ -28,7 +28,6 @@
from kubernetes.client.rest import ApiException
from urllib3 import HTTPResponse

from airflow import __version__
from airflow.exceptions import AirflowException
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.cncf.kubernetes import pod_generator
@@ -64,11 +63,12 @@
from airflow.utils.state import State, TaskInstanceState

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS

if __version__.startswith("2."):
LOGICAL_DATE_KEY = "execution_date"
else:
if AIRFLOW_V_3_0_PLUS:
LOGICAL_DATE_KEY = "logical_date"
else:
LOGICAL_DATE_KEY = "execution_date"


class TestAirflowKubernetesScheduler:
@@ -1438,6 +1438,11 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat
"Reading from k8s pod logs failed: error_fetching_pod_log",
]

@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Airflow 3.2+ prefers new configuration")
def test_sentry_integration(self):
assert not KubernetesExecutor.sentry_integration

@pytest.mark.skipif(AIRFLOW_V_3_2_PLUS, reason="Test only for Airflow < 3.2")
def test_supports_sentry(self):
assert not KubernetesExecutor.supports_sentry

@@ -950,6 +950,10 @@ components:
title: Log Path
ti:
$ref: '#/components/schemas/TaskInstance'
sentry_integration:
type: string
title: Sentry Integration
default: ''
type:
type: string
const: ExecuteTask
@@ -103,6 +103,11 @@ export const $ExecuteTask = {
ti: {
'$ref': '#/components/schemas/TaskInstance'
},
sentry_integration: {
type: 'string',
title: 'Sentry Integration',
default: ''
},
type: {
type: 'string',
const: 'ExecuteTask',
@@ -56,6 +56,7 @@ export type ExecuteTask = {
bundle_info: BundleInfo;
log_path: string | null;
ti: TaskInstance;
sentry_integration?: string;
type?: "ExecuteTask";
};

2 changes: 1 addition & 1 deletion providers/edge3/www-hash.txt
@@ -1 +1 @@
f1bc9efa109cfcd7587f48af7dd45c4a1f50c15cef2f00fd54bc7bdd61857177
cb1cf4eb7022cf4ec04aa222cb39090796117014cb2dd8c9353955d2148e28a3
1 change: 1 addition & 0 deletions task-sdk/src/airflow/sdk/definitions/dag.py
@@ -1339,6 +1339,7 @@ def test(
ti,
dag_rel_path=Path(self.fileloc),
generator=executor.jwt_generator,
sentry_integration=executor.sentry_integration,
# For the system test/debug purpose, we use the default bundle which uses
# local file system. If it turns out to be a feature people want, we could
# plumb the Bundle to use as a parameter to dag.test
1 change: 1 addition & 0 deletions task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -280,6 +280,7 @@ class StartupDetails(BaseModel):
bundle_info: BundleInfo
start_date: datetime
ti_context: TIRunContext
sentry_integration: str
type: Literal["StartupDetails"] = "StartupDetails"


@@ -71,6 +71,7 @@ def execute_workload(workload: ExecuteTask) -> None:
token=workload.token,
server=server,
log_path=workload.log_path,
sentry_integration=workload.sentry_integration,
# Include the output of the task to stdout too, so that in process logs can be read from via the
# kubeapi as pod logs.
subprocess_logs_to_stdout=True,
31 changes: 17 additions & 14 deletions task-sdk/src/airflow/sdk/execution_time/sentry/configured.py
@@ -27,7 +27,8 @@
from __future__ import annotations

import functools
from typing import TYPE_CHECKING
import importlib
from typing import TYPE_CHECKING, Any

import sentry_sdk
import sentry_sdk.integrations.logging
@@ -62,7 +63,7 @@ class ConfiguredSentry(NoopSentry):
)
)

def __init__(self):
def prepare_to_enrich_errors(self, executor_integration: str) -> None:
"""Initialize the Sentry SDK."""
from airflow.sdk.configuration import conf

@@ -71,16 +72,14 @@ def __init__(self):
# LoggingIntegration is set by default.
integrations = []

# TODO: How can we get executor info in the runner to support this?
# executor_class, _ = ExecutorLoader.import_default_executor_cls()
# if executor_class.supports_sentry:
# from sentry_sdk.integrations.celery import CeleryIntegration
if executor_integration:
try:
mod_p, cls_n = executor_integration.rsplit(".", 1)
integrations.append(getattr(importlib.import_module(mod_p), cls_n)())
except Exception:
log.exception("Invalid executor Sentry integration", import_path=executor_integration)

# sentry_celery = CeleryIntegration()
# integrations.append(sentry_celery)

dsn = None
sentry_config_opts = conf.getsection("sentry") or {}
sentry_config_opts: dict[str, Any] = conf.getsection("sentry") or {}
if sentry_config_opts:
sentry_config_opts.pop("sentry_on")
old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
@@ -93,9 +92,12 @@
"There are unsupported options in [sentry] section",
options=unsupported_options,
)

sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)
else:
dsn = None
if before_send := conf.getimport("sentry", "before_send", fallback=None):
sentry_config_opts["before_send"] = before_send
if transport := conf.getimport("sentry", "transport", fallback=None):
sentry_config_opts["transport"] = transport

if dsn:
sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
@@ -137,6 +139,7 @@ def enrich_errors(self, run: Run) -> Run:

@functools.wraps(run)
def wrapped_run(ti: RuntimeTaskInstance, context: Context, log: Logger) -> RunReturn:
self.prepare_to_enrich_errors(ti.sentry_integration)
with sentry_sdk.push_scope():
try:
self.add_tagging(context["dag_run"], ti)
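The dynamic import in ``prepare_to_enrich_errors`` follows the standard dotted-path pattern; isolated from the error handling, it is roughly:

```python
import importlib


def load_integration(import_path: str):
    """Resolve a dotted path such as
    'sentry_sdk.integrations.celery.CeleryIntegration' and instantiate it.
    """
    module_path, class_name = import_path.rsplit(".", 1)
    return getattr(importlib.import_module(module_path), class_name)()
```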