Emit DataDog statsd metrics with metadata tags #28961

Merged · 10 commits · Jan 20, 2023
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
@@ -851,6 +851,13 @@ metrics:
type: string
example: ~
default: ""
statsd_datadog_metrics_tags:
description: |
Set to False to disable metadata tags for some of the emitted metrics
version_added: 2.6.0
type: boolean
example: ~
default: "True"
statsd_custom_client_path:
description: |
If you want to utilise your own custom StatsD client set the relevant
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -461,6 +461,9 @@ statsd_datadog_enabled = False
# List of datadog tags attached to all metrics(e.g: key1:value1,key2:value2)
statsd_datadog_tags =

# Set to False to disable metadata tags for some of the emitted metrics
statsd_datadog_metrics_tags = True

# If you want to utilise your own custom StatsD client set the relevant
# module path below.
# Note: The module path must exist on your PYTHONPATH for Airflow to pick it up
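
For reference, a minimal sketch of toggling the new flag (illustrative values; the environment-variable form assumes Airflow's standard AIRFLOW__{SECTION}__{KEY} override convention):

    import os

    # Equivalent to setting statsd_datadog_metrics_tags = False under
    # [metrics] in airflow.cfg (hypothetical value for illustration).
    os.environ["AIRFLOW__METRICS__STATSD_DATADOG_METRICS_TAGS"] = "False"

    from airflow.configuration import conf

    # getboolean parses the configured string into a bool.
    assert conf.getboolean("metrics", "statsd_datadog_metrics_tags") is False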
1 change: 1 addition & 0 deletions airflow/configuration.py
@@ -206,6 +206,7 @@ class AirflowConfigParser(ConfigParser):
("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"),
("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"),
("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"),
("metrics", "statsd_datadog_metrics_tags"): ("scheduler", "statsd_datadog_metrics_tags", "2.6.0"),
("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"),
("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"),
("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"),
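
Following the pattern of the neighboring entries, the new key is also mapped to a legacy [scheduler] location. A sketch of the effective read, assuming AirflowConfigParser's standard deprecated-option fallback (a value set only under [scheduler] would be honored with a DeprecationWarning):

    from airflow.configuration import conf

    # Checks [metrics] first, then the deprecated [scheduler] section,
    # then the fallback value.
    use_metrics_tags = conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True)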
14 changes: 8 additions & 6 deletions airflow/dag_processing/manager.py
@@ -946,7 +946,7 @@ def set_file_paths(self, new_file_paths):
filtered_processors[file_path] = processor
else:
self.log.warning("Stopping processor for %s", file_path)
Stats.decr("dag_processing.processes")
Stats.decr("dag_processing.processes", tags={"file_path": file_path, "action": "stop"})
processor.terminate()
self._file_stats.pop(file_path)

@@ -965,7 +965,7 @@ def wait_until_finished(self):

def _collect_results_from_processor(self, processor) -> None:
self.log.debug("Processor for %s finished", processor.file_path)
Stats.decr("dag_processing.processes")
Stats.decr("dag_processing.processes", tags={"file_path": processor.file_path, "action": "finish"})
last_finish_time = timezone.utcnow()

if processor.result is not None:
@@ -1037,7 +1037,7 @@ def start_new_processes(self):
)

del self._callback_to_execute[file_path]
Stats.incr("dag_processing.processes")
Stats.incr("dag_processing.processes", tags={"file_path": file_path, "action": "start"})

processor.start()
self.log.debug("Started a process (PID: %s) to generate tasks for %s", processor.pid, file_path)
@@ -1157,8 +1157,8 @@ def _kill_timed_out_processors(self):
processor.pid,
processor.start_time.isoformat(),
)
Stats.decr("dag_processing.processes")
Stats.incr("dag_processing.processor_timeouts")
Stats.decr("dag_processing.processes", tags={"file_path": file_path, "action": "timeout"})
Stats.incr("dag_processing.processor_timeouts", tags={"file_path": file_path})
# Deprecated; may be removed in a future Airflow release.
Stats.incr("dag_file_processor_timeouts")
processor.kill()
@@ -1194,7 +1194,9 @@ def max_runs_reached(self):
def terminate(self):
"""Stops all running processors."""
for processor in self._processors.values():
Stats.decr("dag_processing.processes")
Stats.decr(
"dag_processing.processes", tags={"file_path": processor.file_path, "action": "terminate"}
)
processor.terminate()

def end(self):
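
To see what this changes on the wire (a sketch, not part of the diff): SafeDogStatsdLogger below flattens the tags dict into DogStatsD's key:value list, so the lifecycle counters become distinguishable by action:

    # Tag flattening as performed in airflow/stats.py (path is illustrative).
    tags = {"file_path": "/dags/example_dag.py", "action": "stop"}
    tags_list = [f"{key}:{value}" for key, value in tags.items()]
    # -> ["file_path:/dags/example_dag.py", "action:stop"]
    # The datadog client then sends a datagram roughly like:
    #   dag_processing.processes:-1|c|#file_path:/dags/example_dag.py,action:stop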
18 changes: 13 additions & 5 deletions airflow/dag_processing/processor.py
@@ -437,7 +437,9 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
timestamp=ts,
)
sla_misses.append(sla_miss)
Stats.incr("sla_missed")
Stats.incr(
"sla_missed", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id}
)
if sla_misses:
session.add_all(sla_misses)
session.commit()
@@ -484,10 +486,16 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
callback(dag, task_list, blocking_task_list, slas, blocking_tis)
notification_sent = True
except Exception:
Stats.incr("sla_callback_notification_failure")
Stats.incr(
"sla_callback_notification_failure",
tags={
"dag_id": dag.dag_id,
"func_name": callback.__name__,
},
)
self.log.exception(
"Could not call sla_miss_callback(%s) for DAG %s",
callback.func_name, # type: ignore[attr-defined]
callback.__name__,
dag.dag_id,
)
email_content = f"""\
@@ -523,7 +531,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
email_sent = True
notification_sent = True
except Exception:
Stats.incr("sla_email_notification_failure")
Stats.incr("sla_email_notification_failure", tags={"dag_id": dag.dag_id})
self.log.exception("Could not send SLA Miss email notification for DAG %s", dag.dag_id)
# If we sent any notification, update the sla_miss table
if notification_sent:
@@ -761,7 +769,7 @@ def process_file(
dagbag = DagBag(file_path, include_examples=False)
except Exception:
self.log.exception("Failed at reloading the DAG file %s", file_path)
Stats.incr("dag_file_refresh_error", 1, 1)
Stats.incr("dag_file_refresh_error", 1, 1, tags={"file_path": file_path})
return 0, 0

if len(dagbag.dags) > 0:
9 changes: 7 additions & 2 deletions airflow/jobs/scheduler_job.py
@@ -686,7 +686,10 @@ def _process_executor_events(self, session: Session) -> int:
ti_requeued = ti.queued_by_job_id != self.id or self.executor.has_task(ti)

if ti_queued and not ti_requeued:
Stats.incr("scheduler.tasks.killed_externally")
Stats.incr(
"scheduler.tasks.killed_externally",
tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id},
)
msg = (
"Executor reports task instance %s finished (%s) although the "
"task says its %s. (Info: %s) Was the task killed externally?"
@@ -1564,7 +1567,9 @@ def _find_zombies(self) -> None:
)
self.log.error("Detected zombie job: %s", request)
self.executor.send_callback(request)
Stats.incr("zombies_killed")
Stats.incr(
"zombies_killed", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id}
)

@staticmethod
def _generate_zombie_message_details(ti: TaskInstance):
4 changes: 3 additions & 1 deletion airflow/models/dag.py
@@ -1327,7 +1327,9 @@ def handle_callback(self, dagrun, success=True, reason=None, session=NEW_SESSION
callback(context)
except Exception:
self.log.exception("failed to invoke dag state update callback")
Stats.incr("dag.callback_exceptions")
Stats.incr(
"dag.callback_exceptions", tags={"dag_id": dagrun.dag_id, "run_id": dagrun.run_id}
)

def get_active_runs(self):
"""
15 changes: 12 additions & 3 deletions airflow/models/taskinstance.py
@@ -1230,7 +1230,12 @@ def check_and_change_state_before_execution(
self.pid = None

if not ignore_all_deps and not ignore_ti_state and self.state == State.SUCCESS:
Stats.incr("previously_succeeded", 1, 1)
Stats.incr(
"previously_succeeded",
1,
1,
tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id},
)

if not mark_success:
# Firstly find non-runnable and non-requeueable tis.
@@ -1533,7 +1538,9 @@ def signal_handler(signum, frame):
self.task.post_execute(context=context, result=result)

Stats.incr(f"operator_successes_{self.task.task_type}", 1, 1)
Stats.incr("ti_successes")
Stats.incr(
"ti_successes", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id}
)

def _run_finished_callback(
self,
@@ -1797,7 +1804,9 @@ def handle_failure(
self.end_date = timezone.utcnow()
self.set_duration()
Stats.incr(f"operator_failures_{self.operator}")
Stats.incr("ti_failures")
Stats.incr(
"ti_failures", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id}
)
if not test_mode:
session.add(Log(State.FAILED, self))

77 changes: 48 additions & 29 deletions airflow/stats.py
@@ -54,19 +54,21 @@ class StatsLogger(Protocol):
"""This class is only used for TypeChecking (for IDEs, mypy, etc)."""

@classmethod
def incr(cls, stat: str, count: int = 1, rate: int = 1) -> None:
def incr(cls, stat: str, count: int = 1, rate: int = 1, tags: dict[str, str] | None = None) -> None:
"""Increment stat."""

@classmethod
def decr(cls, stat: str, count: int = 1, rate: int = 1) -> None:
def decr(cls, stat: str, count: int = 1, rate: int = 1, tags: dict[str, str] | None = None) -> None:
"""Decrement stat."""

@classmethod
def gauge(cls, stat: str, value: float, rate: int = 1, delta: bool = False) -> None:
def gauge(
cls, stat: str, value: float, rate: int = 1, delta: bool = False, tags: dict[str, str] | None = None
) -> None:
"""Gauge stat."""

@classmethod
def timing(cls, stat: str, dt: float | datetime.timedelta) -> None:
def timing(cls, stat: str, dt: float | datetime.timedelta, tags: dict[str, str] | None = None) -> None:
"""Stats timing."""

@classmethod
@@ -156,19 +158,19 @@ class DummyStatsLogger:
"""If no StatsLogger is configured, DummyStatsLogger is used as a fallback."""

@classmethod
def incr(cls, stat, count=1, rate=1):
def incr(cls, stat, count=1, rate=1, tags=None):
"""Increment stat."""

@classmethod
def decr(cls, stat, count=1, rate=1):
def decr(cls, stat, count=1, rate=1, tags=None):
"""Decrement stat."""

@classmethod
def gauge(cls, stat, value, rate=1, delta=False):
def gauge(cls, stat, value, rate=1, delta=False, tags=None):
"""Gauge stat."""

@classmethod
def timing(cls, stat, dt):
def timing(cls, stat, dt, tags=None):
"""Stats timing."""

@classmethod
@@ -256,35 +258,35 @@ def __init__(self, statsd_client, allow_list_validator=AllowListValidator()):
self.allow_list_validator = allow_list_validator

@validate_stat
def incr(self, stat, count=1, rate=1):
def incr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None):
"""Increment stat."""
if self.allow_list_validator.test(stat):
return self.statsd.incr(stat, count, rate)
return None

@validate_stat
def decr(self, stat, count=1, rate=1):
def decr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None):
"""Decrement stat."""
if self.allow_list_validator.test(stat):
return self.statsd.decr(stat, count, rate)
return None

@validate_stat
def gauge(self, stat, value, rate=1, delta=False):
def gauge(self, stat, value, rate=1, delta=False, tags: dict[str, str] | None = None):
"""Gauge stat."""
if self.allow_list_validator.test(stat):
return self.statsd.gauge(stat, value, rate, delta)
return None

@validate_stat
def timing(self, stat, dt):
def timing(self, stat, dt, tags: dict[str, str] | None = None):
"""Stats timing."""
if self.allow_list_validator.test(stat):
return self.statsd.timing(stat, dt)
return None

@validate_stat
def timer(self, stat=None, *args, **kwargs):
def timer(self, stat=None, *args, tags: dict[str, str] | None = None, **kwargs):
"""Timer metric that can be cancelled."""
if stat and self.allow_list_validator.test(stat):
return Timer(self.statsd.timer(stat, *args, **kwargs))
@@ -294,50 +296,66 @@ def timer(self, stat=None, *args, **kwargs):
class SafeDogStatsdLogger:
"""DogStatsd Logger."""

def __init__(self, dogstatsd_client, allow_list_validator=AllowListValidator()):
def __init__(self, dogstatsd_client, allow_list_validator=AllowListValidator(), metrics_tags=False):
self.dogstatsd = dogstatsd_client
self.allow_list_validator = allow_list_validator
self.metrics_tags = metrics_tags

@validate_stat
def incr(self, stat, count=1, rate=1, tags=None):
def incr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None):
"""Increment stat."""
if self.metrics_tags and isinstance(tags, dict):
tags_list = [f"{key}:{value}" for key, value in tags.items()]
else:
tags_list = []
if self.allow_list_validator.test(stat):
tags = tags or []
return self.dogstatsd.increment(metric=stat, value=count, tags=tags, sample_rate=rate)
return self.dogstatsd.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate)
return None

@validate_stat
def decr(self, stat, count=1, rate=1, tags=None):
def decr(self, stat, count=1, rate=1, tags: dict[str, str] | None = None):
"""Decrement stat."""
if self.metrics_tags and isinstance(tags, dict):
tags_list = [f"{key}:{value}" for key, value in tags.items()]
else:
tags_list = []
if self.allow_list_validator.test(stat):
tags = tags or []
return self.dogstatsd.decrement(metric=stat, value=count, tags=tags, sample_rate=rate)
return self.dogstatsd.decrement(metric=stat, value=count, tags=tags_list, sample_rate=rate)
return None

@validate_stat
def gauge(self, stat, value, rate=1, delta=False, tags=None):
def gauge(self, stat, value, rate=1, delta=False, tags: dict[str, str] | None = None):
"""Gauge stat."""
if self.metrics_tags and isinstance(tags, dict):
tags_list = [f"{key}:{value}" for key, value in tags.items()]
else:
tags_list = []
if self.allow_list_validator.test(stat):
tags = tags or []
return self.dogstatsd.gauge(metric=stat, value=value, tags=tags, sample_rate=rate)
return self.dogstatsd.gauge(metric=stat, value=value, tags=tags_list, sample_rate=rate)
return None

@validate_stat
def timing(self, stat, dt: float | datetime.timedelta, tags: list[str] | None = None):
def timing(self, stat, dt: float | datetime.timedelta, tags: dict[str, str] | None = None):
"""Stats timing."""
if self.metrics_tags and isinstance(tags, dict):
tags_list = [f"{key}:{value}" for key, value in tags.items()]
else:
tags_list = []
if self.allow_list_validator.test(stat):
tags = tags or []
if isinstance(dt, datetime.timedelta):
dt = dt.total_seconds()
return self.dogstatsd.timing(metric=stat, value=dt, tags=tags)
return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)
return None

@validate_stat
def timer(self, stat=None, *args, tags=None, **kwargs):
"""Timer metric that can be cancelled."""
if self.metrics_tags and isinstance(tags, dict):
tags_list = [f"{key}:{value}" for key, value in tags.items()]
else:
tags_list = []
if stat and self.allow_list_validator.test(stat):
tags = tags or []
return Timer(self.dogstatsd.timed(stat, *args, tags=tags, **kwargs))
return Timer(self.dogstatsd.timed(stat, *args, tags=tags_list, **kwargs))
return Timer()


@@ -407,7 +425,8 @@ def get_dogstatsd_logger(cls):
)
dogstatsd_allow_list = conf.get("metrics", "statsd_allow_list", fallback=None)
allow_list_validator = AllowListValidator(dogstatsd_allow_list)
return SafeDogStatsdLogger(dogstatsd, allow_list_validator)
datadog_metrics_tags = conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True)
return SafeDogStatsdLogger(dogstatsd, allow_list_validator, datadog_metrics_tags)

@classmethod
def get_constant_tags(cls):
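
A usage sketch of the new wiring (host, port, and tag values are illustrative; assumes the datadog package is installed and that AllowListValidator is importable from airflow.stats as in this module):

    from datadog import DogStatsd

    from airflow.stats import AllowListValidator, SafeDogStatsdLogger

    client = DogStatsd(host="localhost", port=8125)  # hypothetical local agent
    stats = SafeDogStatsdLogger(client, AllowListValidator(), metrics_tags=True)

    # With metrics_tags=True the dict is forwarded as DogStatsD tags; with
    # metrics_tags=False (the statsd_datadog_metrics_tags opt-out) it is dropped.
    stats.incr("ti_failures", tags={"dag_id": "example_dag", "run_id": "manual__2023-01-20", "task_id": "extract"})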