From 82bbb6a86fae24a38a7314f291ea5fcb87d74854 Mon Sep 17 00:00:00 2001
From: Tzu-ping Chung
Date: Mon, 3 Nov 2025 14:51:10 +0800
Subject: [PATCH] Send executor integration info in workload

---
 .../docs/core-concepts/executor/index.rst     |  4 +-
 .../src/airflow/executors/base_executor.py    |  2 +-
 .../src/airflow/executors/workloads.py        |  3 ++
 .../src/airflow/jobs/scheduler_job_runner.py  | 24 +++++++++-
 .../unit/executors/test_base_executor.py      |  6 +--
 .../unit/executors/test_local_executor.py     |  4 +-
 .../tests/unit/jobs/test_scheduler_job.py     |  2 +
 .../src/tests_common/pytest_plugin.py         |  7 ++-
 .../celery/executors/celery_executor.py       |  3 ++
 .../celery/executors/test_celery_executor.py  |  7 ++-
 .../executors/test_kubernetes_executor.py     | 13 ++++-
 .../edge3/openapi/v2-edge-generated.yaml      |  4 ++
 .../www/openapi-gen/requests/schemas.gen.ts   |  5 ++
 .../www/openapi-gen/requests/types.gen.ts     |  1 +
 providers/edge3/www-hash.txt                  |  2 +-
 task-sdk/src/airflow/sdk/definitions/dag.py   |  1 +
 .../src/airflow/sdk/execution_time/comms.py   |  1 +
 .../sdk/execution_time/execute_workload.py    |  1 +
 .../sdk/execution_time/sentry/configured.py   | 31 +++++++------
 .../airflow/sdk/execution_time/supervisor.py  | 22 ++++++++-
 .../airflow/sdk/execution_time/task_runner.py |  3 ++
 .../task_sdk/execution_time/test_comms.py     |  1 +
 .../task_sdk/execution_time/test_sentry.py    | 46 +++++++++++++++----
 .../execution_time/test_task_runner.py        | 11 +++++
 24 files changed, 163 insertions(+), 41 deletions(-)

diff --git a/airflow-core/docs/core-concepts/executor/index.rst b/airflow-core/docs/core-concepts/executor/index.rst
index 7a4b5e88fe353..d27b306c3dc97 100644
--- a/airflow-core/docs/core-concepts/executor/index.rst
+++ b/airflow-core/docs/core-concepts/executor/index.rst
@@ -308,12 +308,10 @@ Compatibility Attributes
 The ``BaseExecutor`` class interface contains a set of attributes that Airflow core code uses to check the features that your executor is compatible with. When writing your own Airflow executor be sure to set these correctly for your use case. Each attribute is simply a boolean to enable/disable a feature or indicate that a feature is supported/unsupported by the executor:
 
 * ``supports_pickling``: Whether or not the executor supports reading pickled Dags from the Database before execution (rather than reading the Dag definition from the file system).
-* ``supports_sentry``: Whether or not the executor supports `Sentry <https://sentry.io/>`_.
-
+* ``sentry_integration``: If the executor supports `Sentry <https://sentry.io/>`_, this should be an import path to a callable that creates the integration. For example, ``CeleryExecutor`` sets this to ``"sentry_sdk.integrations.celery.CeleryIntegration"``.
 * ``is_local``: Whether or not the executor is remote or local. See the `Executor Types`_ section above.
 * ``is_single_threaded``: Whether or not the executor is single threaded. This is particularly relevant to what database backends are supported. Single threaded executors can run with any backend, including SQLite.
 * ``is_production``: Whether or not the executor should be used for production purposes. A UI message is displayed to users when they are using a non-production ready executor.
-
 * ``serve_logs``: Whether or not the executor supports serving logs, see :doc:`/administration-and-deployment/logging-monitoring/logging-tasks`.
 
 CLI
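For illustration, a third-party executor would now opt into Sentry by declaring the new string attribute rather than flipping the old boolean. A minimal sketch, where ``my_plugin.MySentryIntegration`` is a hypothetical integration class, not a real module:

    from airflow.executors.base_executor import BaseExecutor


    class MyExecutor(BaseExecutor):
        # Dotted import path to a callable that returns a sentry_sdk integration;
        # the empty-string default means "no executor-specific integration".
        sentry_integration: str = "my_plugin.MySentryIntegration"  # hypothetical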
diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py
index 70b9ba9ef0843..985a6bcab8bc3 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -129,7 +129,7 @@ class BaseExecutor(LoggingMixin):
     active_spans = ThreadSafeDict()
 
     supports_ad_hoc_ti_run: bool = False
-    supports_sentry: bool = False
+    sentry_integration: str = ""
 
     is_local: bool = False
     is_production: bool = True
diff --git a/airflow-core/src/airflow/executors/workloads.py b/airflow-core/src/airflow/executors/workloads.py
index c453f153d2132..7cf1aae60ff21 100644
--- a/airflow-core/src/airflow/executors/workloads.py
+++ b/airflow-core/src/airflow/executors/workloads.py
@@ -112,6 +112,7 @@ class ExecuteTask(BaseDagBundleWorkload):
     """Execute the given Task."""
 
     ti: TaskInstance
+    sentry_integration: str = ""
 
     type: Literal["ExecuteTask"] = Field(init=False, default="ExecuteTask")
 
@@ -122,6 +123,7 @@ def make(
         dag_rel_path: Path | None = None,
         generator: JWTGenerator | None = None,
         bundle_info: BundleInfo | None = None,
+        sentry_integration: str = "",
     ) -> ExecuteTask:
         from airflow.utils.helpers import log_filename_template_renderer
 
@@ -140,6 +142,7 @@ def make(
             token=cls.generate_token(str(ti.id), generator),
             log_path=fname,
             bundle_info=bundle_info,
+            sentry_integration=sentry_integration,
         )
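Because the new field defaults to an empty string on the pydantic model, a payload serialized by an older scheduler that omits it still validates on newer workers. A quick check of that assumption, using the same Pydantic v2 ``model_fields`` API the patch itself relies on:

    from airflow.executors.workloads import ExecuteTask

    # An omitted sentry_integration falls back to "", i.e. "no integration".
    assert ExecuteTask.model_fields["sentry_integration"].default == ""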
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index b1d007c15e02f..057b3b06e76f0 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -791,13 +791,35 @@ def _enqueue_task_instances_with_queued_state(
         :param executor: The executor to enqueue tasks for
         :param session: The session object
         """
+
+        def _get_sentry_integration(executor: BaseExecutor) -> str:
+            try:
+                sentry_integration = executor.sentry_integration
+            except AttributeError:
+                # Old executor interface hard-codes the supports_sentry flag.
+                if getattr(executor, "supports_sentry", False):
+                    return "sentry_sdk.integrations.celery.CeleryIntegration"
+                return ""
+            if not isinstance(sentry_integration, str):
+                self.log.warning(
+                    "Ignoring invalid sentry_integration on executor",
+                    executor=executor,
+                    sentry_integration=sentry_integration,
+                )
+                return ""
+            return sentry_integration
+
         # actually enqueue them
         for ti in task_instances:
             if ti.dag_run.state in State.finished_dr_states:
                 ti.set_state(None, session=session)
                 continue
 
-            workload = workloads.ExecuteTask.make(ti, generator=executor.jwt_generator)
+            workload = workloads.ExecuteTask.make(
+                ti,
+                generator=executor.jwt_generator,
+                sentry_integration=_get_sentry_integration(executor),
+            )
             executor.queue_workload(workload, session=session)
 
     def _critical_section_enqueue_task_instances(self, session: Session) -> int:
diff --git a/airflow-core/tests/unit/executors/test_base_executor.py b/airflow-core/tests/unit/executors/test_base_executor.py
index 13ab21d3a41d9..ba540bbbddb9d 100644
--- a/airflow-core/tests/unit/executors/test_base_executor.py
+++ b/airflow-core/tests/unit/executors/test_base_executor.py
@@ -33,15 +33,15 @@
 from airflow.executors import workloads
 from airflow.executors.base_executor import BaseExecutor, RunningRetryAttemptType
 from airflow.executors.local_executor import LocalExecutor
-from airflow.models.baseoperator import BaseOperator
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
+from airflow.sdk import BaseOperator
 from airflow.utils.state import State, TaskInstanceState
 
 from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker
 
 
-def test_supports_sentry():
-    assert not BaseExecutor.supports_sentry
+def test_sentry_integration():
+    assert not BaseExecutor.sentry_integration
 
 
 def test_is_local_default_value():
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py b/airflow-core/tests/unit/executors/test_local_executor.py
index 845f43a863455..b0261baf04e13 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -46,8 +46,8 @@ class TestLocalExecutor:
     TEST_SUCCESS_COMMANDS = 5
 
-    def test_supports_sentry(self):
-        assert not LocalExecutor.supports_sentry
+    def test_sentry_integration(self):
+        assert not LocalExecutor.sentry_integration
 
     def test_is_local_default_value(self):
         assert LocalExecutor.is_local
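The fallback above can hard-code Celery's integration because ``CeleryExecutor`` was the only in-tree executor that ever set ``supports_sentry``. The same resolution logic, slightly condensed, as a standalone sketch (``LegacyExecutor`` is a hypothetical executor built against the old interface):

    class LegacyExecutor:
        # Hypothetical: an executor that only knows the old boolean flag.
        supports_sentry = True


    def resolve_sentry_integration(executor) -> str:
        integration = getattr(executor, "sentry_integration", None)
        if isinstance(integration, str):
            return integration
        # Only Celery ever advertised supports_sentry, so map the flag to it.
        if getattr(executor, "supports_sentry", False):
            return "sentry_sdk.integrations.celery.CeleryIntegration"
        return ""


    print(resolve_sentry_integration(LegacyExecutor()))
    # sentry_sdk.integrations.celery.CeleryIntegration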
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 2794b1005193b..3b7f15e2ee6e4 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -226,10 +226,12 @@ def mock_executors(self):
         default_executor.name = ExecutorName(alias="default_exec", module_path="default.exec.module.path")
         default_executor.jwt_generator = mock_jwt_generator
         default_executor.team_name = None  # Global executor
+        default_executor.sentry_integration = ""
         second_executor = mock.MagicMock(name="SeconadaryExecutor", slots_available=8, slots_occupied=0)
         second_executor.name = ExecutorName(alias="secondary_exec", module_path="secondary.exec.module.path")
         second_executor.jwt_generator = mock_jwt_generator
         second_executor.team_name = None  # Global executor
+        second_executor.sentry_integration = ""
         # TODO: Task-SDK Make it look like a bound method. Needed until we remove the old queue_workload
         # interface from executors
diff --git a/devel-common/src/tests_common/pytest_plugin.py b/devel-common/src/tests_common/pytest_plugin.py
index e70ebd0e6934c..6c32b001295ab 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -2490,6 +2490,11 @@ def _create_task_instance(
         if upstream_map_indexes is not None:
             ti_context.upstream_map_indexes = upstream_map_indexes
 
+        compat_fields = {
+            "requests_fd": 0,
+            "sentry_integration": "",
+        }
+
         startup_details = StartupDetails(
             ti=TaskInstance(
                 id=ti_id,
@@ -2505,7 +2510,7 @@ def _create_task_instance(
             ti_context=ti_context,
             start_date=start_date,  # type: ignore
             # Back-compat of task-sdk. Only affects us when we manually create these objects in tests.
-            **({"requests_fd": 0} if "requests_fd" in StartupDetails.model_fields else {}),  # type: ignore
+            **{k: v for k, v in compat_fields.items() if k in StartupDetails.model_fields},  # type: ignore
         )
 
         ti = mocked_parse(startup_details, dag_id, task)
diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
index c8107b9160ebe..58e83ff66d24a 100644
--- a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
+++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
@@ -289,6 +289,9 @@ class CeleryExecutor(BaseExecutor):
     """
 
     supports_ad_hoc_ti_run: bool = True
+    sentry_integration: str = "sentry_sdk.integrations.celery.CeleryIntegration"
+
+    # TODO: Remove this flag once providers depend on Airflow 3.2.
     supports_sentry: bool = True
 
     if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
diff --git a/providers/celery/tests/unit/celery/executors/test_celery_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
index c45090a0a4407..0a1a49345ed62 100644
--- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
+++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
@@ -44,7 +44,7 @@
 from tests_common.test_utils import db
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.dag import sync_dag_to_db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_2_PLUS
 
 if AIRFLOW_V_3_0_PLUS:
     from airflow.models.dag_version import DagVersion
@@ -121,6 +121,11 @@ def teardown_method(self) -> None:
         db.clear_db_runs()
         db.clear_db_jobs()
 
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Airflow 3.2+ prefers new configuration")
+    def test_sentry_integration(self):
+        assert CeleryExecutor.sentry_integration == "sentry_sdk.integrations.celery.CeleryIntegration"
+
+    @pytest.mark.skipif(AIRFLOW_V_3_2_PLUS, reason="Test only for Airflow < 3.2")
     def test_supports_sentry(self):
         assert CeleryExecutor.supports_sentry
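The ``compat_fields`` filter in the pytest plugin above generalizes the earlier one-off ``requests_fd`` check: only the kwargs that the installed ``StartupDetails`` version actually declares get passed. The same pattern in isolation, with ``FakeStartupDetails`` as a made-up stand-in model:

    from pydantic import BaseModel


    class FakeStartupDetails(BaseModel):
        # Stand-in that knows sentry_integration but predates requests_fd.
        sentry_integration: str = ""


    compat_fields = {"requests_fd": 0, "sentry_integration": ""}
    kwargs = {k: v for k, v in compat_fields.items() if k in FakeStartupDetails.model_fields}
    print(kwargs)  # {'sentry_integration': ''}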
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 24b8f4aff1be1..a80a031f66d6a 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -28,7 +28,6 @@
 from kubernetes.client.rest import ApiException
 from urllib3 import HTTPResponse
 
-from airflow import __version__
 from airflow.exceptions import AirflowException
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.providers.cncf.kubernetes import pod_generator
@@ -64,11 +63,12 @@
 from airflow.utils.state import State, TaskInstanceState
 
 from tests_common.test_utils.config import conf_vars
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
 
-if __version__.startswith("2."):
-    LOGICAL_DATE_KEY = "execution_date"
-else:
+if AIRFLOW_V_3_0_PLUS:
     LOGICAL_DATE_KEY = "logical_date"
+else:
+    LOGICAL_DATE_KEY = "execution_date"
 
 
 class TestAirflowKubernetesScheduler:
@@ -1438,6 +1438,11 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat
             "Reading from k8s pod logs failed: error_fetching_pod_log",
         ]
 
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Airflow 3.2+ prefers new configuration")
+    def test_sentry_integration(self):
+        assert not KubernetesExecutor.sentry_integration
+
+    @pytest.mark.skipif(AIRFLOW_V_3_2_PLUS, reason="Test only for Airflow < 3.2")
     def test_supports_sentry(self):
         assert not KubernetesExecutor.supports_sentry
diff --git a/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml b/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
index 015902e16ed9b..36e8a52641126 100644
--- a/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
+++ b/providers/edge3/src/airflow/providers/edge3/openapi/v2-edge-generated.yaml
@@ -950,6 +950,10 @@ components:
         title: Log Path
       ti:
         $ref: '#/components/schemas/TaskInstance'
+      sentry_integration:
+        type: string
+        title: Sentry Integration
+        default: ''
       type:
         type: string
         const: ExecuteTask
diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts
index fe7c26596d2a5..11e48528f69e6 100644
--- a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts
+++ b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts
@@ -103,6 +103,11 @@ export const $ExecuteTask = {
         ti: {
             '$ref': '#/components/schemas/TaskInstance'
         },
+        sentry_integration: {
+            type: 'string',
+            title: 'Sentry Integration',
+            default: ''
+        },
         type: {
             type: 'string',
             const: 'ExecuteTask',
diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts
index 6768f986c2080..12e2a71fd645b 100644
--- a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts
+++ b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts
@@ -56,6 +56,7 @@ export type ExecuteTask = {
     bundle_info: BundleInfo;
     log_path: string | null;
     ti: TaskInstance;
+    sentry_integration?: string;
     type?: "ExecuteTask";
 };
diff --git a/providers/edge3/www-hash.txt b/providers/edge3/www-hash.txt
index 399af71085acf..f328d185da2c3 100644
--- a/providers/edge3/www-hash.txt
+++ b/providers/edge3/www-hash.txt
@@ -1 +1 @@
-f1bc9efa109cfcd7587f48af7dd45c4a1f50c15cef2f00fd54bc7bdd61857177
+cb1cf4eb7022cf4ec04aa222cb39090796117014cb2dd8c9353955d2148e28a3
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index 862fedfa5e060..e0ce764d0160c 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -1339,6 +1339,7 @@ def test(
                 ti,
                 dag_rel_path=Path(self.fileloc),
                 generator=executor.jwt_generator,
+                sentry_integration=executor.sentry_integration,
                 # For the system test/debug purpose, we use the default bundle which uses
                 # local file system. If it turns out to be a feature people want, we could
                 # plumb the Bundle to use as a parameter to dag.test
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py b/task-sdk/src/airflow/sdk/execution_time/comms.py
index df24085ea66f4..934667c5e66d4 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -280,6 +280,7 @@ class StartupDetails(BaseModel):
     bundle_info: BundleInfo
     start_date: datetime
     ti_context: TIRunContext
+    sentry_integration: str
 
     type: Literal["StartupDetails"] = "StartupDetails"
diff --git a/task-sdk/src/airflow/sdk/execution_time/execute_workload.py b/task-sdk/src/airflow/sdk/execution_time/execute_workload.py
index 477e66e88c60a..410c676eeb913 100644
--- a/task-sdk/src/airflow/sdk/execution_time/execute_workload.py
+++ b/task-sdk/src/airflow/sdk/execution_time/execute_workload.py
@@ -71,6 +71,7 @@ def execute_workload(workload: ExecuteTask) -> None:
         token=workload.token,
         server=server,
         log_path=workload.log_path,
+        sentry_integration=workload.sentry_integration,
         # Include the output of the task to stdout too, so that in process logs can be read from via the
         # kubeapi as pod logs.
         subprocess_logs_to_stdout=True,
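Taken together, the value travels scheduler → ``ExecuteTask`` → ``execute_workload`` → ``supervise`` → ``StartupDetails`` without being interpreted along the way. A toy sketch of that relay, with the real objects reduced to plain dicts (function names are illustrative only, not the real call chain):

    def make_workload(integration: str) -> dict:
        # Scheduler side: stamp the workload with the executor's integration path.
        return {"sentry_integration": integration}


    def startup_details(workload: dict) -> dict:
        # Supervisor side: copy it verbatim into the subprocess startup message.
        return {"type": "StartupDetails", "sentry_integration": workload["sentry_integration"]}


    msg = startup_details(make_workload("sentry_sdk.integrations.celery.CeleryIntegration"))
    assert msg["sentry_integration"].endswith("CeleryIntegration")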
diff --git a/task-sdk/src/airflow/sdk/execution_time/sentry/configured.py b/task-sdk/src/airflow/sdk/execution_time/sentry/configured.py
index fef3662b6825f..d833964621662 100644
--- a/task-sdk/src/airflow/sdk/execution_time/sentry/configured.py
+++ b/task-sdk/src/airflow/sdk/execution_time/sentry/configured.py
@@ -27,7 +27,8 @@
 from __future__ import annotations
 
 import functools
-from typing import TYPE_CHECKING
+import importlib
+from typing import TYPE_CHECKING, Any
 
 import sentry_sdk
 import sentry_sdk.integrations.logging
@@ -62,7 +63,7 @@ class ConfiguredSentry(NoopSentry):
         )
     )
 
-    def __init__(self):
+    def prepare_to_enrich_errors(self, executor_integration: str) -> None:
         """Initialize the Sentry SDK."""
         from airflow.sdk.configuration import conf
 
@@ -71,16 +72,14 @@ def __init__(self):
         # LoggingIntegration is set by default.
         integrations = []
 
-        # TODO: How can we get executor info in the runner to support this?
-        # executor_class, _ = ExecutorLoader.import_default_executor_cls()
-        # if executor_class.supports_sentry:
-        #     from sentry_sdk.integrations.celery import CeleryIntegration
+        if executor_integration:
+            try:
+                mod_p, cls_n = executor_integration.rsplit(".", 1)
+                integrations.append(getattr(importlib.import_module(mod_p), cls_n)())
+            except Exception:
+                log.exception("Invalid executor Sentry integration", import_path=executor_integration)
 
-        #     sentry_celery = CeleryIntegration()
-        #     integrations.append(sentry_celery)
-
-        dsn = None
-        sentry_config_opts = conf.getsection("sentry") or {}
+        sentry_config_opts: dict[str, Any] = conf.getsection("sentry") or {}
         if sentry_config_opts:
             sentry_config_opts.pop("sentry_on")
             old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
@@ -93,9 +92,12 @@ def __init__(self):
                     "There are unsupported options in [sentry] section",
                     options=unsupported_options,
                 )
-
-        sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
-        sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)
+        else:
+            dsn = None
+        if before_send := conf.getimport("sentry", "before_send", fallback=None):
+            sentry_config_opts["before_send"] = before_send
+        if transport := conf.getimport("sentry", "transport", fallback=None):
+            sentry_config_opts["transport"] = transport
 
         if dsn:
             sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
@@ -137,6 +139,7 @@ def enrich_errors(self, run: Run) -> Run:
 
         @functools.wraps(run)
         def wrapped_run(ti: RuntimeTaskInstance, context: Context, log: Logger) -> RunReturn:
+            self.prepare_to_enrich_errors(ti.sentry_integration)
             with sentry_sdk.push_scope():
                 try:
                     self.add_tagging(context["dag_run"], ti)
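The ``rsplit``/``import_module``/``getattr`` sequence above is the usual way to turn a dotted path into an instance, and the broad ``except`` means a bad path is logged rather than failing the task. The same idea in a standalone, runnable form, using a stdlib class as a stand-in for a sentry_sdk integration:

    import importlib


    def load_integration(import_path: str):
        # "pkg.module.ClassName" -> import pkg.module, instantiate ClassName()
        module_path, _, class_name = import_path.rpartition(".")
        return getattr(importlib.import_module(module_path), class_name)()


    counter = load_integration("collections.Counter")
    print(type(counter).__name__)  # Counter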
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 51466535318fb..3fd3e23b15425 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -957,15 +957,28 @@ def start(  # type: ignore[override]
         client: Client,
         target: Callable[[], None] = _subprocess_main,
         logger: FilteringBoundLogger | None = None,
+        sentry_integration: str = "",
         **kwargs,
     ) -> Self:
         """Fork and start a new subprocess to execute the given task."""
         proc: Self = super().start(id=what.id, client=client, target=target, logger=logger, **kwargs)
 
         # Tell the task process what it needs to do!
-        proc._on_child_started(ti=what, dag_rel_path=dag_rel_path, bundle_info=bundle_info)
+        proc._on_child_started(
+            ti=what,
+            dag_rel_path=dag_rel_path,
+            bundle_info=bundle_info,
+            sentry_integration=sentry_integration,
+        )
         return proc
 
-    def _on_child_started(self, ti: TaskInstance, dag_rel_path: str | os.PathLike[str], bundle_info):
+    def _on_child_started(
+        self,
+        *,
+        ti: TaskInstance,
+        dag_rel_path: str | os.PathLike[str],
+        bundle_info,
+        sentry_integration: str,
+    ) -> None:
         """Send startup message to the subprocess."""
         self.ti = ti  # type: ignore[assignment]
         start_date = datetime.now(tz=timezone.utc)
@@ -987,6 +1000,7 @@ def _on_child_started(self, ti: TaskInstance, dag_rel_path: str | os.PathLike[st
             bundle_info=bundle_info,
             ti_context=ti_context,
             start_date=start_date,
+            sentry_integration=sentry_integration,
         )
 
         # Send the message to tell the process what it needs to execute
@@ -1943,6 +1957,7 @@ def supervise(
     log_path: str | None = None,
    subprocess_logs_to_stdout: bool = False,
    client: Client | None = None,
+    sentry_integration: str = "",
 ) -> int:
     """
     Run a single task execution to completion.
@@ -1956,6 +1971,8 @@ def supervise(
     :param log_path: Path to write logs, if required.
     :param subprocess_logs_to_stdout: Should task logs also be sent to stdout via the main logger.
     :param client: Optional preconfigured client for communication with the server (Mostly for tests).
+    :param sentry_integration: If the executor has a Sentry integration, the import
+        path to a callable that initializes it (an empty string means no integration).
     :return: Exit code of the process.
     :raises ValueError: If server URL is empty or invalid.
     """
@@ -2029,6 +2046,7 @@ def supervise(
         logger=logger,
         bundle_info=bundle_info,
         subprocess_logs_to_stdout=subprocess_logs_to_stdout,
+        sentry_integration=sentry_integration,
     )
 
     exit_code = process.wait()
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index dc3c9dba294a7..82807bba902b9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -156,6 +156,8 @@ class RuntimeTaskInstance(TaskInstance):
 
     rendered_map_index: str | None = None
 
+    sentry_integration: str = ""
+
     def __rich_repr__(self):
         yield "id", self.id
         yield "task_id", self.task_id
@@ -679,6 +681,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
         max_tries=what.ti_context.max_tries,
         start_date=what.start_date,
         state=TaskInstanceState.RUNNING,
+        sentry_integration=what.sentry_integration,
     )
diff --git a/task-sdk/tests/task_sdk/execution_time/test_comms.py b/task-sdk/tests/task_sdk/execution_time/test_comms.py
index fd5d352e14ddf..861b1d51c8ac6 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_comms.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_comms.py
@@ -104,6 +104,7 @@ def test_recv_StartupDetails(self):
             "start_date": "2024-12-01T01:00:00Z",
             "dag_rel_path": "/dev/null",
             "bundle_info": {"name": "any-name", "version": "any-version"},
+            "sentry_integration": "",
         }
         bytes = msgspec.msgpack.encode(_ResponseFrame(0, msg, None))
         w.sendall(len(bytes).to_bytes(4, byteorder="big") + bytes)
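Direct callers of ``supervise()`` pass the integration per task. A hedged sketch of such a call, mirroring ``execute_workload()`` above; the workload attribute names not shown in these hunks (``dag_rel_path``, ``bundle_info``) are assumptions:

    from airflow.executors.workloads import ExecuteTask
    from airflow.sdk.execution_time.supervisor import supervise


    def run_workload(workload: ExecuteTask, server: str) -> int:
        # Relay every field from the workload, including the new
        # sentry_integration string ("" simply skips the Sentry setup).
        return supervise(
            ti=workload.ti,
            dag_rel_path=workload.dag_rel_path,  # assumed attribute name
            bundle_info=workload.bundle_info,    # assumed attribute name
            token=workload.token,
            server=server,
            log_path=workload.log_path,
            sentry_integration=workload.sentry_integration,
        )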
diff --git a/task-sdk/tests/task_sdk/execution_time/test_sentry.py b/task-sdk/tests/task_sdk/execution_time/test_sentry.py
index 5aaaada2c5f4c..c4efb929052b4 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_sentry.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_sentry.py
@@ -57,6 +57,20 @@ def before_send(_):
     pass
 
 
+class CustomIntegration:
+    """
+    Integration object to use in tests.
+
+    All instances of this class are equal to each other.
+    """
+
+    def __hash__(self):  # Implemented to satisfy Ruff.
+        return 0
+
+    def __eq__(self, other):
+        return type(self) is type(other)
+
+
 class CustomTransport:
     pass
 
@@ -170,15 +184,30 @@ def sentry_minimum(self, mock_sentry_sdk):
 
         importlib.reload(sentry)
 
-    def test_init(self, mock_sentry_sdk, sentry):
+    def test_prepare_to_enrich_errors(self, mock_sentry_sdk, sentry):
         assert is_configured(sentry)
+
+        sentry.prepare_to_enrich_errors(executor_integration="")
         assert mock_sentry_sdk.integrations.logging.ignore_logger.mock_calls == [mock.call("airflow.task")]
         assert mock_sentry_sdk.init.mock_calls == [
             mock.call(
                 integrations=[],
                 default_integrations=False,
-                before_send=import_string("task_sdk.execution_time.test_sentry.before_send"),
-                transport=None,
+                before_send="task_sdk.execution_time.test_sentry.before_send",
             ),
         ]
+
+    def test_prepare_to_enrich_errors_with_executor_integration(self, mock_sentry_sdk, sentry):
+        assert is_configured(sentry)
+
+        executor_integration = "task_sdk.execution_time.test_sentry.CustomIntegration"
+        sentry.prepare_to_enrich_errors(executor_integration)
+        assert mock_sentry_sdk.integrations.logging.ignore_logger.mock_calls == [mock.call("airflow.task")]
+        assert mock_sentry_sdk.init.mock_calls == [
+            mock.call(
+                integrations=[import_string("task_sdk.execution_time.test_sentry.CustomIntegration")()],
+                default_integrations=False,
+                before_send="task_sdk.execution_time.test_sentry.before_send",
+            ),
+        ]
@@ -222,13 +251,14 @@ def test_custom_transport(self, mock_sentry_sdk, sentry_custom_transport):
         """
         Test transport gets passed to the sentry SDK
         """
         assert is_configured(sentry_custom_transport)
+
+        sentry_custom_transport.prepare_to_enrich_errors(executor_integration="")
         assert mock_sentry_sdk.integrations.logging.ignore_logger.mock_calls == [mock.call("airflow.task")]
         assert mock_sentry_sdk.init.mock_calls == [
             mock.call(
                 integrations=[],
                 default_integrations=False,
-                before_send=None,
-                transport=import_string("task_sdk.execution_time.test_sentry.CustomTransport"),
+                transport="task_sdk.execution_time.test_sentry.CustomTransport",
             ),
         ]
@@ -237,7 +267,7 @@ def test_minimum_config(self, mock_sentry_sdk, sentry_minimum):
         """
         Test before_send doesn't raise an exception when not set
         """
         assert is_configured(sentry_minimum)
+
+        sentry_minimum.prepare_to_enrich_errors(executor_integration="")
         assert mock_sentry_sdk.integrations.logging.ignore_logger.mock_calls == [mock.call("airflow.task")]
-        assert mock_sentry_sdk.init.mock_calls == [
-            mock.call(integrations=[], before_send=None, transport=None),
-        ]
+        assert mock_sentry_sdk.init.mock_calls == [mock.call(integrations=[])]
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 1f01de15b47b2..5d6a2dfbb42ca 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -164,6 +164,7 @@ def test_parse(test_dags_dir: Path, make_ti_context):
         bundle_info=BundleInfo(name="my-bundle", version=None),
         ti_context=make_ti_context(),
         start_date=timezone.utcnow(),
+        sentry_integration="",
     )
 
     with patch.dict(
@@ -210,6 +211,7 @@ def test_parse_dag_bag(mock_dagbag, test_dags_dir: Path, make_ti_context):
         bundle_info=BundleInfo(name="my-bundle", version=None),
         ti_context=make_ti_context(),
         start_date=timezone.utcnow(),
+        sentry_integration="",
     )
 
     with patch.dict(
@@ -269,6 +271,7 @@ def test_parse_not_found(test_dags_dir: Path, make_ti_context, dag_id, task_id,
         bundle_info=BundleInfo(name="my-bundle", version=None),
         ti_context=make_ti_context(),
         start_date=timezone.utcnow(),
+        sentry_integration="",
     )
 
     log = mock.Mock()
@@ -323,6 +326,7 @@ def test_parse_module_in_bundle_root(tmp_path: Path, make_ti_context):
         bundle_info=BundleInfo(name="my-bundle", version=None),
         ti_context=make_ti_context(),
         start_date=timezone.utcnow(),
+        sentry_integration="",
     )
 
     with patch.dict(
@@ -593,6 +597,7 @@ def test_basic_templated_dag(mocked_parse, make_ti_context, mock_supervisor_comm
         dag_rel_path="",
         ti_context=make_ti_context(),
         start_date=timezone.utcnow(),
+        sentry_integration="",
     )
     ti = mocked_parse(what, "basic_templated_dag", task)
 
@@ -708,6 +713,7 @@ def execute(self, context):
         bundle_info=FAKE_BUNDLE,
         ti_context=make_ti_context(),
         start_date=timezone.utcnow(),
+        sentry_integration="",
     )
 
     mocked_parse(what, "basic_dag", task)
@@ -755,6 +761,7 @@ def execute(self, context):
         bundle_info=FAKE_BUNDLE,
         ti_context=make_ti_context(),
         start_date=timezone.utcnow(),
+        sentry_integration="",
     )
 
     mocked_parse(what, "basic_dag", task)
@@ -802,6 +809,7 @@ def execute(self, context):
         bundle_info=FAKE_BUNDLE,
         ti_context=make_ti_context(),
         start_date=timezone.utcnow(),
+        sentry_integration="",
     )
 
     mocked_parse(what, "basic_dag", task)
@@ -841,6 +849,7 @@ def execute(self, context):
         bundle_info=FAKE_BUNDLE,
         ti_context=make_ti_context(),
         start_date=timezone.utcnow(),
+        sentry_integration="",
     )
 
     mocked_parse(what, "basic_dag", task)
@@ -980,6 +989,7 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch
         bundle_info=BundleInfo(name="my-bundle", version=None),
         ti_context=make_ti_context(dag_id=dag_id, run_id="c"),
         start_date=timezone.utcnow(),
+        sentry_integration="",
     )
 
     mock_supervisor_comms._get_response.return_value = what
@@ -2891,6 +2901,7 @@ def execute(self, context):
         bundle_info=FAKE_BUNDLE,
         ti_context=make_ti_context(),
         start_date=timezone.utcnow(),
+        sentry_integration="",
    )
     mock_supervisor_comms._get_response.return_value = what