Skip to content

Commit

Permalink
Emit telemetry to Scarf during DAG run (#1397)
Browse files Browse the repository at this point in the history
Export telemetry related to Cosmos usage to
[Scarf](https://about.scarf.sh/).

This data assists the project maintainers in better understanding how
Cosmos is used. Insights from this telemetry are critical for
prioritizing patches, minor releases, and security fixes. Additionally,
this information supports critical decisions related to the development
road map.

Deployments and individual users can opt out of analytics by setting the
configuration:

```
[cosmos]
enable_telemetry: False
```

As described in the [official
documentation](https://docs.scarf.sh/gateway/#do-not-track), it is also
possible to opt-out by setting one of the following environment
variables:

```commandline
AIRFLOW__COSMOS__ENABLE_TELEMETRY=False
DO_NOT_TRACK=True
SCARF_NO_ANALYTICS=True
```

In addition to Scarf's default data collection, Cosmos collects the
following information when running Cosmos-powered DAGs:

- Cosmos version
- Airflow version
- Python version
- Operating system & machine architecture
- Event type
- DAG hash
- Total tasks
- Total Cosmos tasks

No user-identifiable information (IP included) is stored in Scarf, even
though Scarf infers information from the IP, such as location, and
stores that. The data collection is GDPR compliant.

The Apache Foundation supports this same strategy in many of its
OpenSource projects, including Airflow
([#39510](apache/airflow#39510)).

Example of visualisation of the data via the Scarf UI:

<img width="1235" alt="Screenshot 2024-12-19 at 10 22 59"
src="https://github.com/user-attachments/assets/12b9fbd4-2fdd-4e62-9876-defee3c4d8da"
/>

<img width="1231" alt="Screenshot 2024-12-19 at 10 23 13"
src="https://github.com/user-attachments/assets/f98b849c-99be-4764-9e6d-cb7730da3688"
/>

<img width="1227" alt="Screenshot 2024-12-19 at 10 23 21"
src="https://github.com/user-attachments/assets/421b7581-c641-422a-8469-252ba5a2fd33"
/>

<img width="1237" alt="Screenshot 2024-12-19 at 10 23 28"
src="https://github.com/user-attachments/assets/2e5995a2-fe09-4017-a625-4dd4a60028d0"
/>

<img width="1248" alt="Screenshot 2024-12-19 at 10 23 51"
src="https://github.com/user-attachments/assets/64a8a07f-df56-493c-a3f5-0f5165fd58e8"
/>

<img width="1229" alt="Screenshot 2024-12-19 at 10 24 01"
src="https://github.com/user-attachments/assets/1e3e8b8d-b11d-4b31-8b46-853d541b01b8"
/>

<img width="1240" alt="Screenshot 2024-12-19 at 10 24 11"
src="https://github.com/user-attachments/assets/b5e79cc7-4e2e-44b2-a94b-891b9226b152"
/>

<img width="1241" alt="Screenshot 2024-12-19 at 10 24 20"
src="https://github.com/user-attachments/assets/2fb5d666-d749-416d-acf8-4a3bc94ba014"
/>

<img width="1234" alt="Screenshot 2024-12-19 at 10 24 31"
src="https://github.com/user-attachments/assets/353eb82c-44d2-44ec-87e2-ace7138132f5"
/>

<img width="1245" alt="Screenshot 2024-12-19 at 10 24 39"
src="https://github.com/user-attachments/assets/4a637a2a-14ad-41a8-b7fd-db186ec74357"
/>

<img width="1233" alt="Screenshot 2024-12-19 at 10 24 48"
src="https://github.com/user-attachments/assets/bec4e2b0-49c3-4289-8f9b-3285db9ec40c"
/>


Closes: #1143
  • Loading branch information
tatiana authored Dec 20, 2024
1 parent 4d958f5 commit 103c2ae
Show file tree
Hide file tree
Showing 11 changed files with 477 additions and 5 deletions.
41 changes: 41 additions & 0 deletions PRIVACY_NOTICE.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
Privacy Notice
==============

This project follows the `Privacy Policy of Astronomer <https://www.astronomer.io/privacy/>`_.

Collection of Data
------------------

Astronomer Cosmos integrates `Scarf <https://about.scarf.sh/>`_ to collect basic telemetry data during operation.
This data assists the project maintainers in better understanding how Cosmos is used.
Insights gained from this telemetry are critical for prioritizing patches, minor releases, and
security fixes. Additionally, this information supports key decisions related to the development road map.

Deployments and individual users can opt-out of analytics by setting the configuration:


.. code-block::
[cosmos] enable_telemetry False
As described in the `official documentation <https://docs.scarf.sh/gateway/#do-not-track>`_, it is also possible to opt out by setting one of the following environment variables:

.. code-block::
DO_NOT_TRACK=True
SCARF_NO_ANALYTICS=True
In addition to Scarf's default data collection, Cosmos collect the following information when running Cosmos-powered DAGs:

- Cosmos version
- Airflow version
- Python version
- Operating system & machine architecture
- Event type
- The DAG hash
- Total tasks
- Total Cosmos tasks

No user-identifiable information (IP included) is stored in Scarf.
4 changes: 3 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ _______
Privacy Notice
______________

This project follows `Astronomer's Privacy Policy <https://www.astronomer.io/privacy/>`_
The application and this website collect telemetry to support the project's development. These can be disabled by the end-users.

Read the `Privacy Notice <https://github.com/astronomer/astronomer-cosmos/blob/main/PRIVACY_NOTICE.rst>`_ to learn more about it.

.. Tracking pixel for Scarf
Expand Down
4 changes: 4 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,7 @@ def _missing_value_(cls, value): # type: ignore
TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED}

DBT_COMPILE_TASK_ID = "dbt_compile"

TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{task_count}/{cosmos_task_count}"
TELEMETRY_VERSION = "v1"
TELEMETRY_TIMEOUT = 1.0
Empty file added cosmos/listeners/__init__.py
Empty file.
84 changes: 84 additions & 0 deletions cosmos/listeners/dag_run_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from __future__ import annotations

from airflow.listeners import hookimpl
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun

from cosmos import telemetry
from cosmos.log import get_logger

logger = get_logger(__name__)


class EventStatus:
SUCCESS = "success"
FAILED = "failed"


DAG_RUN = "dag_run"


def total_cosmos_tasks(dag: DAG) -> int:
"""
Identify if there are any Cosmos DAGs on a given serialized `airflow.serialization.serialized_objects.SerializedDAG`.
The approach is naive, from the perspective it does not take into account subclasses, but it is inexpensive and
works.
"""
cosmos_tasks = 0
for task in dag.task_dict.values():
# In a real Airflow deployment, the following `task` is an instance of
# `airflow.serialization.serialized_objects.SerializedBaseOperator`
# and the only reference to Cosmos is in the _task_module.
# It is suboptimal, but works as of Airflow 2.10
task_module = getattr(task, "_task_module", None) or task.__class__.__module__
if task_module.startswith("cosmos."):
cosmos_tasks += 1
return cosmos_tasks


# @provide_session
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str) -> None:
logger.debug("Running on_dag_run_success")
# In a real Airflow deployment, the following `serialized_dag` is an instance of
# `airflow.serialization.serialized_objects.SerializedDAG`
# and it is not a subclass of DbtDag, nor contain any references to Cosmos
serialized_dag = dag_run.get_dag()

if not total_cosmos_tasks(serialized_dag):
logger.debug("The DAG does not use Cosmos")
return

additional_telemetry_metrics = {
"dag_hash": dag_run.dag_hash,
"status": EventStatus.SUCCESS,
"task_count": len(serialized_dag.task_ids),
"cosmos_task_count": total_cosmos_tasks(serialized_dag),
}

telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)
logger.debug("Completed on_dag_run_success")


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str) -> None:
logger.debug("Running on_dag_run_failed")
# In a real Airflow deployment, the following `serialized_dag` is an instance of
# `airflow.serialization.serialized_objects.SerializedDAG`
# and it is not a subclass of DbtDag, nor contain any references to Cosmos
serialized_dag = dag_run.get_dag()

if not total_cosmos_tasks(serialized_dag):
logger.debug("The DAG does not use Cosmos")
return

additional_telemetry_metrics = {
"dag_hash": dag_run.dag_hash,
"status": EventStatus.FAILED,
"task_count": len(serialized_dag.task_ids),
"cosmos_task_count": total_cosmos_tasks(serialized_dag),
}

telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)
logger.debug("Completed on_dag_run_failed")
2 changes: 2 additions & 0 deletions cosmos/plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from flask import abort, url_for
from flask_appbuilder import AppBuilder, expose

from cosmos.listeners import dag_run_listener
from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud

if in_astro_cloud:
Expand Down Expand Up @@ -269,3 +270,4 @@ class CosmosPlugin(AirflowPlugin):
"href": conf.get("webserver", "base_url") + "/cosmos/dbt_docs",
}
appbuilder_views = [item]
listeners = [dag_run_listener]
22 changes: 19 additions & 3 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,28 @@
remote_target_path = conf.get("cosmos", "remote_target_path", fallback=None)
remote_target_path_conn_id = conf.get("cosmos", "remote_target_path_conn_id", fallback=None)

AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")

# The following environment variable is populated in Astro Cloud
in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud"

try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)

AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")

# The following environment variable is populated in Astro Cloud
in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud"
def convert_to_boolean(value: str | None) -> bool:
"""
Convert a string that represents a boolean to a Python boolean.
"""
value = str(value).lower().strip()
if value in ("f", "false", "0", "", "none"):
return False
return True


# Telemetry-related settings
enable_telemetry = conf.getboolean("cosmos", "enable_telemetry", fallback=True)
do_not_track = convert_to_boolean(os.getenv("DO_NOT_TRACK"))
no_analytics = convert_to_boolean(os.getenv("SCARF_NO_ANALYTICS"))
77 changes: 77 additions & 0 deletions cosmos/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from __future__ import annotations

import platform
from urllib import parse
from urllib.parse import urlencode

import httpx
from airflow import __version__ as airflow_version

import cosmos
from cosmos import constants, settings
from cosmos.log import get_logger

logger = get_logger(__name__)


def should_emit() -> bool:
"""
Identify if telemetry metrics should be emitted or not.
"""
return settings.enable_telemetry and not settings.do_not_track and not settings.no_analytics


def collect_standard_usage_metrics() -> dict[str, object]:
"""
Return standard telemetry metrics.
"""
metrics = {
"cosmos_version": cosmos.__version__, # type: ignore[attr-defined]
"airflow_version": parse.quote(airflow_version),
"python_version": platform.python_version(),
"platform_system": platform.system(),
"platform_machine": platform.machine(),
"variables": {},
}
return metrics


def emit_usage_metrics(metrics: dict[str, object]) -> bool:
"""
Emit desired telemetry metrics to remote telemetry endpoint.
The metrics must contain the necessary fields to build the TELEMETRY_URL.
"""
query_string = urlencode(metrics)
telemetry_url = constants.TELEMETRY_URL.format(
**metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string
)
logger.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics)
response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True)
if not response.is_success:
logger.warning(
"Unable to emit usage metrics to %s. Status code: %s. Message: %s",
telemetry_url,
response.status_code,
response.text,
)
return response.is_success


def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, object]) -> bool:
"""
Checks if telemetry should be emitted, fetch standard metrics, complement with custom metrics
and emit them to remote telemetry endpoint.
:returns: If the event was successfully sent to the telemetry backend or not.
"""
if should_emit():
metrics = collect_standard_usage_metrics()
metrics["event_type"] = event_type
metrics["variables"].update(additional_metrics) # type: ignore[attr-defined]
metrics.update(additional_metrics)
is_success = emit_usage_metrics(metrics)
return is_success
else:
logger.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.")
return False
6 changes: 5 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,14 @@ _______

`Apache License 2.0 <https://github.com/astronomer/astronomer-cosmos/blob/main/LICENSE>`_


Privacy Notice
______________

This project follows `Astronomer's Privacy Policy <https://www.astronomer.io/privacy/>`_
The application and this website collect telemetry to support the project's development. These can be disabled by the end-users.

Read the `Privacy Notice <https://github.com/astronomer/astronomer-cosmos/blob/main/PRIVACY_NOTICE.rst>`_ to learn more about it.


.. Tracking pixel for Scarf
.. raw:: html
Expand Down
Loading

0 comments on commit 103c2ae

Please sign in to comment.