From 51177602ab7e86b0893176cfafed4dc763f1d9d8 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sat, 14 Jan 2023 22:37:49 +0100 Subject: [PATCH 1/8] improve Stats tags argument --- airflow/stats.py | 56 +++++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/airflow/stats.py b/airflow/stats.py index d452b6010078a..268114df75d65 100644 --- a/airflow/stats.py +++ b/airflow/stats.py @@ -54,19 +54,21 @@ class StatsLogger(Protocol): """This class is only used for TypeChecking (for IDEs, mypy, etc).""" @classmethod - def incr(cls, stat: str, count: int = 1, rate: int = 1) -> None: + def incr(cls, stat: str, count: int = 1, rate: int = 1, tags: dict[str, str] | None = None) -> None: """Increment stat.""" @classmethod - def decr(cls, stat: str, count: int = 1, rate: int = 1) -> None: + def decr(cls, stat: str, count: int = 1, rate: int = 1, tags: dict[str, str] | None = None) -> None: """Decrement stat.""" @classmethod - def gauge(cls, stat: str, value: float, rate: int = 1, delta: bool = False) -> None: + def gauge( + cls, stat: str, value: float, rate: int = 1, delta: bool = False, tags: dict[str, str] | None = None + ) -> None: """Gauge stat.""" @classmethod - def timing(cls, stat: str, dt: float | datetime.timedelta) -> None: + def timing(cls, stat: str, dt: float | datetime.timedelta, tags: dict[str, str] | None = None) -> None: """Stats timing.""" @classmethod @@ -156,19 +158,19 @@ class DummyStatsLogger: """If no StatsLogger is configured, DummyStatsLogger is used as a fallback.""" @classmethod - def incr(cls, stat, count=1, rate=1): + def incr(cls, stat, count=1, rate=1, tags=None): """Increment stat.""" @classmethod - def decr(cls, stat, count=1, rate=1): + def decr(cls, stat, count=1, rate=1, tags=None): """Decrement stat.""" @classmethod - def gauge(cls, stat, value, rate=1, delta=False): + def gauge(cls, stat, value, rate=1, delta=False, tags=None): """Gauge stat.""" @classmethod - def timing(cls, stat, dt): + def timing(cls, stat, dt, tags=None): """Stats timing.""" @classmethod @@ -256,35 +258,35 @@ def __init__(self, statsd_client, allow_list_validator=AllowListValidator()): self.allow_list_validator = allow_list_validator @validate_stat - def incr(self, stat, count=1, rate=1): + def incr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None): """Increment stat.""" if self.allow_list_validator.test(stat): return self.statsd.incr(stat, count, rate) return None @validate_stat - def decr(self, stat, count=1, rate=1): + def decr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None): """Decrement stat.""" if self.allow_list_validator.test(stat): return self.statsd.decr(stat, count, rate) return None @validate_stat - def gauge(self, stat, value, rate=1, delta=False): + def gauge(self, stat, value, rate=1, delta=False, tags: dict[str, str] | None = None): """Gauge stat.""" if self.allow_list_validator.test(stat): return self.statsd.gauge(stat, value, rate, delta) return None @validate_stat - def timing(self, stat, dt): + def timing(self, stat, dt, tags: dict[str, str] | None = None): """Stats timing.""" if self.allow_list_validator.test(stat): return self.statsd.timing(stat, dt) return None @validate_stat - def timer(self, stat=None, *args, **kwargs): + def timer(self, stat=None, *args, tags: dict[str, str] | None = None, **kwargs): """Timer metric that can be cancelled.""" if stat and self.allow_list_validator.test(stat): return Timer(self.statsd.timer(stat, *args, **kwargs)) @@ -299,45 +301,45 @@ def __init__(self, dogstatsd_client, allow_list_validator=AllowListValidator()): self.allow_list_validator = allow_list_validator @validate_stat - def incr(self, stat, count=1, rate=1, tags=None): + def incr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None): """Increment stat.""" + tags_list = [f"{key}:{value}" for key, value in tags.items()] if isinstance(tags, dict) else [] if self.allow_list_validator.test(stat): - tags = tags or [] - return self.dogstatsd.increment(metric=stat, value=count, tags=tags, sample_rate=rate) + return self.dogstatsd.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate) return None @validate_stat - def decr(self, stat, count=1, rate=1, tags=None): + def decr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None): """Decrement stat.""" + tags_list = [f"{key}:{value}" for key, value in tags.items()] if isinstance(tags, dict) else [] if self.allow_list_validator.test(stat): - tags = tags or [] - return self.dogstatsd.decrement(metric=stat, value=count, tags=tags, sample_rate=rate) + return self.dogstatsd.decrement(metric=stat, value=count, tags=tags_list, sample_rate=rate) return None @validate_stat - def gauge(self, stat, value, rate=1, delta=False, tags=None): + def gauge(self, stat, value, rate=1, delta=False, tags: dict[str, str] | None = None): """Gauge stat.""" + tags_list = [f"{key}:{value}" for key, value in tags.items()] if isinstance(tags, dict) else [] if self.allow_list_validator.test(stat): - tags = tags or [] - return self.dogstatsd.gauge(metric=stat, value=value, tags=tags, sample_rate=rate) + return self.dogstatsd.gauge(metric=stat, value=value, tags=tags_list, sample_rate=rate) return None @validate_stat - def timing(self, stat, dt: float | datetime.timedelta, tags: list[str] | None = None): + def timing(self, stat, dt: float | datetime.timedelta, tags: dict[str, str] | None = None): """Stats timing.""" + tags_list = [f"{key}:{value}" for key, value in tags.items()] if isinstance(tags, dict) else [] if self.allow_list_validator.test(stat): - tags = tags or [] if isinstance(dt, datetime.timedelta): dt = dt.total_seconds() - return self.dogstatsd.timing(metric=stat, value=dt, tags=tags) + return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list) return None @validate_stat def timer(self, stat=None, *args, tags=None, **kwargs): """Timer metric that can be cancelled.""" + tags_list = [f"{key}:{value}" for key, value in tags.items()] if isinstance(tags, dict) else [] if stat and self.allow_list_validator.test(stat): - tags = tags or [] - return Timer(self.dogstatsd.timed(stat, *args, tags=tags, **kwargs)) + return Timer(self.dogstatsd.timed(stat, *args, tags=tags_list, **kwargs)) return Timer() From 8f96b42edc086004d144639242f2ebc56e942c2e Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 15 Jan 2023 01:58:42 +0100 Subject: [PATCH 2/8] add some counters tags --- airflow/dag_processing/manager.py | 14 ++++++++------ airflow/dag_processing/processor.py | 16 ++++++++++++---- airflow/jobs/scheduler_job.py | 9 +++++++-- airflow/models/dag.py | 4 +++- airflow/models/taskinstance.py | 15 ++++++++++++--- 5 files changed, 42 insertions(+), 16 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 106f1cdca6795..fef7081041af8 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -946,7 +946,7 @@ def set_file_paths(self, new_file_paths): filtered_processors[file_path] = processor else: self.log.warning("Stopping processor for %s", file_path) - Stats.decr("dag_processing.processes") + Stats.decr("dag_processing.processes", tags={"file_path": file_path, "action": "stop"}) processor.terminate() self._file_stats.pop(file_path) @@ -965,7 +965,7 @@ def wait_until_finished(self): def _collect_results_from_processor(self, processor) -> None: self.log.debug("Processor for %s finished", processor.file_path) - Stats.decr("dag_processing.processes") + Stats.decr("dag_processing.processes", tags={"file_path": processor.file_path, "action": "finish"}) last_finish_time = timezone.utcnow() if processor.result is not None: @@ -1037,7 +1037,7 @@ def start_new_processes(self): ) del self._callback_to_execute[file_path] - Stats.incr("dag_processing.processes") + Stats.incr("dag_processing.processes", tags={"file_path": file_path, "action": "start"}) processor.start() self.log.debug("Started a process (PID: %s) to generate tasks for %s", processor.pid, file_path) @@ -1157,8 +1157,8 @@ def _kill_timed_out_processors(self): processor.pid, processor.start_time.isoformat(), ) - Stats.decr("dag_processing.processes") - Stats.incr("dag_processing.processor_timeouts") + Stats.decr("dag_processing.processes", tags={"file_path": file_path, "action": "timeout"}) + Stats.incr("dag_processing.processor_timeouts", tags={"file_path": file_path}) # Deprecated; may be removed in a future Airflow release. Stats.incr("dag_file_processor_timeouts") processor.kill() @@ -1194,7 +1194,9 @@ def max_runs_reached(self): def terminate(self): """Stops all running processors.""" for processor in self._processors.values(): - Stats.decr("dag_processing.processes") + Stats.decr( + "dag_processing.processes", tags={"file_path": processor.file_path, "action": "terminate"} + ) processor.terminate() def end(self): diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index d577c93ee791f..8437f1d9c5494 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -437,7 +437,9 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: timestamp=ts, ) sla_misses.append(sla_miss) - Stats.incr("sla_missed") + Stats.incr( + "sla_missed", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id} + ) if sla_misses: session.add_all(sla_misses) session.commit() @@ -484,7 +486,13 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: callback(dag, task_list, blocking_task_list, slas, blocking_tis) notification_sent = True except Exception: - Stats.incr("sla_callback_notification_failure") + Stats.incr( + "sla_callback_notification_failure", + tags={ + "dag_id": dag.dag_id, + "func_name": callback.func_name, # type: ignore[attr-defined] + }, + ) self.log.exception( "Could not call sla_miss_callback(%s) for DAG %s", callback.func_name, # type: ignore[attr-defined] @@ -523,7 +531,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: email_sent = True notification_sent = True except Exception: - Stats.incr("sla_email_notification_failure") + Stats.incr("sla_email_notification_failure", tags={"dag_id": dag.dag_id}) self.log.exception("Could not send SLA Miss email notification for DAG %s", dag.dag_id) # If we sent any notification, update the sla_miss table if notification_sent: @@ -761,7 +769,7 @@ def process_file( dagbag = DagBag(file_path, include_examples=False) except Exception: self.log.exception("Failed at reloading the DAG file %s", file_path) - Stats.incr("dag_file_refresh_error", 1, 1) + Stats.incr("dag_file_refresh_error", 1, 1, tags={"file_path": file_path}) return 0, 0 if len(dagbag.dags) > 0: diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 5977a0cc9bbc6..433c425c1de79 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -686,7 +686,10 @@ def _process_executor_events(self, session: Session) -> int: ti_requeued = ti.queued_by_job_id != self.id or self.executor.has_task(ti) if ti_queued and not ti_requeued: - Stats.incr("scheduler.tasks.killed_externally") + Stats.incr( + "scheduler.tasks.killed_externally", + tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id}, + ) msg = ( "Executor reports task instance %s finished (%s) although the " "task says its %s. (Info: %s) Was the task killed externally?" @@ -1563,7 +1566,9 @@ def _find_zombies(self) -> None: ) self.log.error("Detected zombie job: %s", request) self.executor.send_callback(request) - Stats.incr("zombies_killed") + Stats.incr( + "zombies_killed", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id} + ) @staticmethod def _generate_zombie_message_details(ti: TaskInstance): diff --git a/airflow/models/dag.py b/airflow/models/dag.py index ca5cf94b684b4..4abd789a02e8c 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1327,7 +1327,9 @@ def handle_callback(self, dagrun, success=True, reason=None, session=NEW_SESSION callback(context) except Exception: self.log.exception("failed to invoke dag state update callback") - Stats.incr("dag.callback_exceptions") + Stats.incr( + "dag.callback_exceptions", tags={"dag_id": dagrun.dag_id, "run_id": dagrun.run_id} + ) def get_active_runs(self): """ diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index e8d9fe8ceec47..e2fe24a803184 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1230,7 +1230,12 @@ def check_and_change_state_before_execution( self.pid = None if not ignore_all_deps and not ignore_ti_state and self.state == State.SUCCESS: - Stats.incr("previously_succeeded", 1, 1) + Stats.incr( + "previously_succeeded", + 1, + 1, + tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id}, + ) if not mark_success: # Firstly find non-runnable and non-requeueable tis. @@ -1533,7 +1538,9 @@ def signal_handler(signum, frame): self.task.post_execute(context=context, result=result) Stats.incr(f"operator_successes_{self.task.task_type}", 1, 1) - Stats.incr("ti_successes") + Stats.incr( + "ti_successes", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id} + ) def _run_finished_callback( self, @@ -1797,7 +1804,9 @@ def handle_failure( self.end_date = timezone.utcnow() self.set_duration() Stats.incr(f"operator_failures_{self.operator}") - Stats.incr("ti_failures") + Stats.incr( + "ti_failures", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id} + ) if not test_mode: session.add(Log(State.FAILED, self)) From c61777a2c0140c8d82ff7d0c3a2734dd032a3a74 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 15 Jan 2023 20:15:49 +0100 Subject: [PATCH 3/8] add a conf to enable datadog tags --- airflow/config_templates/config.yml | 7 +++++ airflow/config_templates/default_airflow.cfg | 3 ++ airflow/configuration.py | 1 + airflow/stats.py | 31 +++++++++++++++----- 4 files changed, 35 insertions(+), 7 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 1fed76a25cece..3ca84483e6b1b 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -851,6 +851,13 @@ metrics: type: string example: ~ default: "" + statsd_datadog_metrics_tags: + description: | + If set to True, Airflow will add metadata as tags for some of the emitted metrics + version_added: 2.6.0 + type: boolean + example: ~ + default: "False" statsd_custom_client_path: description: | If you want to utilise your own custom StatsD client set the relevant diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 5a6e03e404465..974e1406dfe5d 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -461,6 +461,9 @@ statsd_datadog_enabled = False # List of datadog tags attached to all metrics(e.g: key1:value1,key2:value2) statsd_datadog_tags = +# If set to True, Airflow will add metadata as tags for some of the emitted metrics +statsd_datadog_metrics_tags = False + # If you want to utilise your own custom StatsD client set the relevant # module path below. # Note: The module path must exist on your PYTHONPATH for Airflow to pick it up diff --git a/airflow/configuration.py b/airflow/configuration.py index 48f680ff3052c..387b6d5303482 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -206,6 +206,7 @@ class AirflowConfigParser(ConfigParser): ("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"), ("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"), ("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"), + ("metrics", "statsd_datadog_metrics_tags"): ("scheduler", "statsd_datadog_metrics_tags", "2.6.0"), ("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"), ("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"), ("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"), diff --git a/airflow/stats.py b/airflow/stats.py index 268114df75d65..990d2c73a73de 100644 --- a/airflow/stats.py +++ b/airflow/stats.py @@ -296,14 +296,18 @@ def timer(self, stat=None, *args, tags: dict[str, str] | None = None, **kwargs): class SafeDogStatsdLogger: """DogStatsd Logger.""" - def __init__(self, dogstatsd_client, allow_list_validator=AllowListValidator()): + def __init__(self, dogstatsd_client, allow_list_validator=AllowListValidator(), metrics_tags=False): self.dogstatsd = dogstatsd_client self.allow_list_validator = allow_list_validator + self.metrics_tags = metrics_tags @validate_stat def incr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None): """Increment stat.""" - tags_list = [f"{key}:{value}" for key, value in tags.items()] if isinstance(tags, dict) else [] + if self.metrics_tags and isinstance(tags, dict): + tags_list = [f"{key}:{value}" for key, value in tags.items()] + else: + tags_list = [] if self.allow_list_validator.test(stat): return self.dogstatsd.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate) return None @@ -311,7 +315,10 @@ def incr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None): @validate_stat def decr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None): """Decrement stat.""" - tags_list = [f"{key}:{value}" for key, value in tags.items()] if isinstance(tags, dict) else [] + if self.metrics_tags and isinstance(tags, dict): + tags_list = [f"{key}:{value}" for key, value in tags.items()] + else: + tags_list = [] if self.allow_list_validator.test(stat): return self.dogstatsd.decrement(metric=stat, value=count, tags=tags_list, sample_rate=rate) return None @@ -319,7 +326,10 @@ def decr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None): @validate_stat def gauge(self, stat, value, rate=1, delta=False, tags: dict[str, str] | None = None): """Gauge stat.""" - tags_list = [f"{key}:{value}" for key, value in tags.items()] if isinstance(tags, dict) else [] + if self.metrics_tags and isinstance(tags, dict): + tags_list = [f"{key}:{value}" for key, value in tags.items()] + else: + tags_list = [] if self.allow_list_validator.test(stat): return self.dogstatsd.gauge(metric=stat, value=value, tags=tags_list, sample_rate=rate) return None @@ -327,7 +337,10 @@ def gauge(self, stat, value, rate=1, delta=False, tags: dict[str, str] | None = @validate_stat def timing(self, stat, dt: float | datetime.timedelta, tags: dict[str, str] | None = None): """Stats timing.""" - tags_list = [f"{key}:{value}" for key, value in tags.items()] if isinstance(tags, dict) else [] + if self.metrics_tags and isinstance(tags, dict): + tags_list = [f"{key}:{value}" for key, value in tags.items()] + else: + tags_list = [] if self.allow_list_validator.test(stat): if isinstance(dt, datetime.timedelta): dt = dt.total_seconds() @@ -337,7 +350,10 @@ def timing(self, stat, dt: float | datetime.timedelta, tags: dict[str, str] | No @validate_stat def timer(self, stat=None, *args, tags=None, **kwargs): """Timer metric that can be cancelled.""" - tags_list = [f"{key}:{value}" for key, value in tags.items()] if isinstance(tags, dict) else [] + if self.metrics_tags and isinstance(tags, dict): + tags_list = [f"{key}:{value}" for key, value in tags.items()] + else: + tags_list = [] if stat and self.allow_list_validator.test(stat): return Timer(self.dogstatsd.timed(stat, *args, tags=tags_list, **kwargs)) return Timer() @@ -409,7 +425,8 @@ def get_dogstatsd_logger(cls): ) dogstatsd_allow_list = conf.get("metrics", "statsd_allow_list", fallback=None) allow_list_validator = AllowListValidator(dogstatsd_allow_list) - return SafeDogStatsdLogger(dogstatsd, allow_list_validator) + datadog_metrics_tags = conf.get("metrics", "statsd_datadog_metrics_tags", fallback=False) + return SafeDogStatsdLogger(dogstatsd, allow_list_validator, datadog_metrics_tags) @classmethod def get_constant_tags(cls): From 9844ce31b468de39d3216e35f344b5cc2c9e0d2d Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 15 Jan 2023 20:45:33 +0100 Subject: [PATCH 4/8] add UTests --- tests/core/test_stats.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py index 99b59e4f83d8d..e971ad148f6a3 100644 --- a/tests/core/test_stats.py +++ b/tests/core/test_stats.py @@ -159,10 +159,10 @@ def test_does_send_stats_using_dogstatsd_when_dogstatsd_on(self): metric="empty_key", sample_rate=1, tags=[], value=1 ) - def test_does_send_stats_using_dogstatsd_with_tags(self): - self.dogstatsd.incr("empty_key", 1, 1, ["key1:value1", "key2:value2"]) + def test_does_send_stats_using_dogstatsd_with_tags_without_enabled_metrics_tags(self): + self.dogstatsd.incr("empty_key", 1, 1, {"key1": "value1", "key2": "value2"}) self.dogstatsd_client.increment.assert_called_once_with( - metric="empty_key", sample_rate=1, tags=["key1:value1", "key2:value2"], value=1 + metric="empty_key", sample_rate=1, tags=[], value=1 ) def test_does_send_stats_using_dogstatsd_when_statsd_and_dogstatsd_both_on(self): @@ -265,6 +265,21 @@ def test_not_increment_counter_if_not_allowed(self): self.dogstatsd_client.assert_not_called() +class TestDogStatsWithMetricsTags: + def setup_method(self): + pytest.importorskip("datadog") + from datadog import DogStatsd + + self.dogstatsd_client = Mock(speck=DogStatsd) + self.dogstatsd = SafeDogStatsdLogger(self.dogstatsd_client, metrics_tags=True) + + def test_does_send_stats_using_dogstatsd_with_tags(self): + self.dogstatsd.incr("empty_key", 1, 1, {"key1": "value1", "key2": "value2"}) + self.dogstatsd_client.increment.assert_called_once_with( + metric="empty_key", sample_rate=1, tags=["key1:value1", "key2:value2"], value=1 + ) + + def always_invalid(stat_name): raise InvalidStatsNameException(f"Invalid name: {stat_name}") From a6c3ee1b303662ccc7c7544616b143d07f4f5c3a Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Mon, 16 Jan 2023 00:14:34 +0100 Subject: [PATCH 5/8] update tests --- tests/dag_processing/test_processor.py | 17 +++++++++++++---- tests/jobs/test_scheduler_job.py | 25 ++++++++++++++++++++----- tests/models/test_dag.py | 5 ++++- tests/models/test_taskinstance.py | 4 +++- 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 05e1a6ab42105..a662b2d886566 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -242,7 +242,9 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, mock_st .count() ) assert sla_miss_count == 1 - mock_stats_incr.assert_called_with("sla_missed") + mock_stats_incr.assert_called_with( + "sla_missed", tags={"dag_id": "test_sla_miss", "run_id": "test", "task_id": "dummy"} + ) # Now call manage_slas and see that it runs without errors # because of existing SlaMiss above. # Since this is run often, it's possible that it runs before another @@ -290,7 +292,9 @@ def test_dag_file_processor_sla_miss_continue_checking_the_task_instances_after_ .count() ) assert sla_miss_count == 2 - mock_stats_incr.assert_called_with("sla_missed") + mock_stats_incr.assert_called_with( + "sla_missed", tags={"dag_id": "test_sla_miss", "run_id": "test", "task_id": "dummy"} + ) @mock.patch("airflow.dag_processing.processor.Stats.incr") def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, create_dummy_dag): @@ -330,7 +334,10 @@ def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, c sla_callback.func_name, # type: ignore[attr-defined] f"test_sla_miss_{i}", ) - mock_stats_incr.assert_called_once_with("sla_callback_notification_failure") + mock_stats_incr.assert_called_once_with( + "sla_callback_notification_failure", + tags={"dag_id": f"test_sla_miss_{i}", "func_name": sla_callback.func_name}, + ) @mock.patch("airflow.dag_processing.processor.send_email") def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( @@ -401,7 +408,9 @@ def test_dag_file_processor_sla_miss_email_exception( mock_log.exception.assert_called_once_with( "Could not send SLA Miss email notification for DAG %s", "test_sla_miss" ) - mock_stats_incr.assert_called_once_with("sla_email_notification_failure") + mock_stats_incr.assert_called_once_with( + "sla_email_notification_failure", tags={"dag_id": "test_sla_miss"} + ) def test_dag_file_processor_sla_miss_deleted_task(self, create_dummy_dag): """ diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index b6858cf085774..caf9d8327f7b0 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -255,9 +255,14 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_ self.scheduler_job.executor.callback_sink.send.assert_not_called() mock_stats_incr.assert_has_calls( [ - mock.call("scheduler.tasks.killed_externally"), + mock.call( + "scheduler.tasks.killed_externally", + tags={"dag_id": dag_id, "run_id": ti1.run_id, "task_id": ti1.task_id}, + ), mock.call("operator_failures_EmptyOperator"), - mock.call("ti_failures"), + mock.call( + "ti_failures", tags={"dag_id": dag_id, "run_id": ti1.run_id, "task_id": ti1.task_id} + ), ], any_order=True, ) @@ -312,9 +317,12 @@ def test_process_executor_events_with_no_callback(self, mock_stats_incr, mock_ta self.scheduler_job.executor.callback_sink.send.assert_not_called() mock_stats_incr.assert_has_calls( [ - mock.call("scheduler.tasks.killed_externally"), + mock.call( + "scheduler.tasks.killed_externally", + tags={"dag_id": dag_id, "run_id": "dr2", "task_id": task_id_1}, + ), mock.call("operator_failures_EmptyOperator"), - mock.call("ti_failures"), + mock.call("ti_failures", tags={"dag_id": dag_id, "run_id": "dr2", "task_id": task_id_1}), ], any_order=True, ) @@ -360,7 +368,14 @@ def test_process_executor_events_with_callback(self, mock_stats_incr, mock_task_ ) self.scheduler_job.executor.callback_sink.send.assert_called_once_with(task_callback) self.scheduler_job.executor.callback_sink.reset_mock() - mock_stats_incr.assert_called_once_with("scheduler.tasks.killed_externally") + mock_stats_incr.assert_called_once_with( + "scheduler.tasks.killed_externally", + tags={ + "dag_id": "test_process_executor_events_with_callback", + "run_id": "test", + "task_id": "dummy_task", + }, + ) @mock.patch("airflow.jobs.scheduler_job.TaskCallbackRequest") @mock.patch("airflow.jobs.scheduler_job.Stats.incr") diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 9f1ba1e743062..7d3b2bd5f0f6d 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -1324,7 +1324,10 @@ def test_dag_handle_callback_crash(self, mock_stats): dag.handle_callback(dag_run, success=False) dag.handle_callback(dag_run, success=True) - mock_stats.incr.assert_called_with("dag.callback_exceptions") + mock_stats.incr.assert_called_with( + "dag.callback_exceptions", + tags={"dag_id": "test_dag_callback_crash", "run_id": "manual__2015-01-02T00:00:00+00:00"}, + ) dag.clear() self._clean_up(dag_id) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 45ea3d75e9c46..a7c0d90bcea75 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2506,7 +2506,9 @@ def test_handle_failure_no_task(self, Stats_incr, dag_maker): # Check 'ti.try_number' is bumped to 2. This is try_number for next run assert ti.try_number == 2 - Stats_incr.assert_any_call("ti_failures") + Stats_incr.assert_any_call( + "ti_failures", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id} + ) Stats_incr.assert_any_call("operator_failures_EmptyOperator") def test_handle_failure_task_undefined(self, create_task_instance): From 1221252adce93e0a716ba190d994c8783bb8ea21 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Tue, 17 Jan 2023 00:05:56 +0100 Subject: [PATCH 6/8] change the default value to True --- airflow/config_templates/config.yml | 4 ++-- airflow/config_templates/default_airflow.cfg | 4 ++-- airflow/stats.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 3ca84483e6b1b..a7fb39881e511 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -853,11 +853,11 @@ metrics: default: "" statsd_datadog_metrics_tags: description: | - If set to True, Airflow will add metadata as tags for some of the emitted metrics + Set to False to disable metadata tags for some of the emitted metrics version_added: 2.6.0 type: boolean example: ~ - default: "False" + default: "True" statsd_custom_client_path: description: | If you want to utilise your own custom StatsD client set the relevant diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 974e1406dfe5d..d99075a01035a 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -461,8 +461,8 @@ statsd_datadog_enabled = False # List of datadog tags attached to all metrics(e.g: key1:value1,key2:value2) statsd_datadog_tags = -# If set to True, Airflow will add metadata as tags for some of the emitted metrics -statsd_datadog_metrics_tags = False +# Set to False to disable metadata tags for some of the emitted metrics +statsd_datadog_metrics_tags = True # If you want to utilise your own custom StatsD client set the relevant # module path below. diff --git a/airflow/stats.py b/airflow/stats.py index 990d2c73a73de..d2708f1e134ed 100644 --- a/airflow/stats.py +++ b/airflow/stats.py @@ -425,7 +425,7 @@ def get_dogstatsd_logger(cls): ) dogstatsd_allow_list = conf.get("metrics", "statsd_allow_list", fallback=None) allow_list_validator = AllowListValidator(dogstatsd_allow_list) - datadog_metrics_tags = conf.get("metrics", "statsd_datadog_metrics_tags", fallback=False) + datadog_metrics_tags = conf.get("metrics", "statsd_datadog_metrics_tags", fallback=True) return SafeDogStatsdLogger(dogstatsd, allow_list_validator, datadog_metrics_tags) @classmethod From 73c8596a29161c358a3d22c2023cb24ef81335aa Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Wed, 18 Jan 2023 21:10:10 +0100 Subject: [PATCH 7/8] fix callback name attribute --- airflow/dag_processing/processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 8437f1d9c5494..5c9b7ebfd6e8d 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -490,12 +490,12 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: "sla_callback_notification_failure", tags={ "dag_id": dag.dag_id, - "func_name": callback.func_name, # type: ignore[attr-defined] + "func_name": callback.__name__, }, ) self.log.exception( "Could not call sla_miss_callback(%s) for DAG %s", - callback.func_name, # type: ignore[attr-defined] + callback.__name__, dag.dag_id, ) email_content = f"""\ From ce41d1e8369849b931c77c8d60fc4788e345028c Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Wed, 18 Jan 2023 22:05:20 +0100 Subject: [PATCH 8/8] fix UTest --- tests/dag_processing/test_processor.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index a662b2d886566..8bad4c8cb85ac 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -304,7 +304,9 @@ def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, c """ session = settings.Session() - sla_callback = MagicMock(side_effect=RuntimeError("Could not call function")) + sla_callback = MagicMock( + __name__="function_name", side_effect=RuntimeError("Could not call function") + ) test_start_date = timezone.utcnow() - datetime.timedelta(days=1) @@ -331,12 +333,12 @@ def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, c assert sla_callback.called mock_log.exception.assert_called_once_with( "Could not call sla_miss_callback(%s) for DAG %s", - sla_callback.func_name, # type: ignore[attr-defined] + sla_callback.__name__, f"test_sla_miss_{i}", ) mock_stats_incr.assert_called_once_with( "sla_callback_notification_failure", - tags={"dag_id": f"test_sla_miss_{i}", "func_name": sla_callback.func_name}, + tags={"dag_id": f"test_sla_miss_{i}", "func_name": sla_callback.__name__}, ) @mock.patch("airflow.dag_processing.processor.send_email")