From 9ec9eb79a0cc845d86e7380c73269d2ee1d3c210 Mon Sep 17 00:00:00 2001 From: Maxim Martynov Date: Tue, 23 Jul 2024 15:43:34 +0300 Subject: [PATCH] openlineage: Add AirflowRunFacet for dag runEvents (#40854) --- .../facets/AirflowDagRunFacet.json | 105 ++++++++++++++++++ .../providers/openlineage/plugins/adapter.py | 5 +- .../providers/openlineage/plugins/facets.py | 10 +- .../providers/openlineage/plugins/listener.py | 2 +- airflow/providers/openlineage/utils/utils.py | 12 ++ .../openlineage/plugins/test_adapter.py | 32 ++++-- .../openlineage/plugins/test_facets.py | 21 +++- .../providers/openlineage/utils/test_utils.py | 58 +++++++++- 8 files changed, 231 insertions(+), 14 deletions(-) create mode 100644 airflow/providers/openlineage/facets/AirflowDagRunFacet.json diff --git a/airflow/providers/openlineage/facets/AirflowDagRunFacet.json b/airflow/providers/openlineage/facets/AirflowDagRunFacet.json new file mode 100644 index 0000000000000..165a8e6a59855 --- /dev/null +++ b/airflow/providers/openlineage/facets/AirflowDagRunFacet.json @@ -0,0 +1,105 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$defs": { + "AirflowDagRunFacet": { + "allOf": [ + { + "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet" + }, + { + "type": "object", + "properties": { + "dag": { + "$ref": "#/$defs/DAG" + }, + "dagRun": { + "$ref": "#/$defs/DagRun" + } + }, + "required": [ + "dag", + "dagRun" + ] + } + ] + }, + "DAG": { + "type": "object", + "properties": { + "dag_id": { + "type": "string" + }, + "description": { + "type": "string" + }, + "owner": { + "type": "string" + }, + "schedule_interval": { + "type": "string" + }, + "start_date": { + "type": "string", + "format": "date-time" + }, + "tags": { + "type": "string" + }, + "timetable": { + "description": "Describes timetable (successor of schedule_interval)", + "type": "object", + "additionalProperties": true + } + }, + "additionalProperties": true, + "required": [ + "dag_id", + "start_date" + ] + }, + "DagRun": { + "type": "object", + "properties": { + "conf": { + "type": "object", + "additionalProperties": true + }, + "dag_id": { + "type": "string" + }, + "data_interval_start": { + "type": "string", + "format": "date-time" + }, + "data_interval_end": { + "type": "string", + "format": "date-time" + }, + "external_trigger": { + "type": "boolean" + }, + "run_id": { + "type": "string" + }, + "run_type": { + "type": "string" + }, + "start_date": { + "type": "string", + "format": "date-time" + } + }, + "additionalProperties": true, + "required": [ + "dag_id", + "run_id" + ] + } + }, + "type": "object", + "properties": { + "airflowDagRun": { + "$ref": "#/$defs/AirflowDagRunFacet" + } + } +} diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py index 9e2613d8db9ed..8e1d924bb979e 100644 --- a/airflow/providers/openlineage/plugins/adapter.py +++ b/airflow/providers/openlineage/plugins/adapter.py @@ -40,6 +40,7 @@ from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf from airflow.providers.openlineage.utils.utils import ( OpenLineageRedactor, + get_airflow_dag_run_facet, get_airflow_state_run_facet, ) from airflow.stats import Stats @@ -334,6 +335,7 @@ def dag_started( job_facets: dict[str, BaseFacet] | None = None, # Custom job facets ): try: + owner = [x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None event = RunEvent( eventType=RunState.START, eventTime=dag_run.start_date.isoformat(), @@ -341,7 +343,7 @@ def dag_started( job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG, job_description=dag_run.dag.description if dag_run.dag else None, - owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None, + owners=owner, job_facets=job_facets, ), run=self._build_run( @@ -352,6 +354,7 @@ def dag_started( job_name=dag_run.dag_id, nominal_start_time=nominal_start_time, nominal_end_time=nominal_end_time, + run_facets=get_airflow_dag_run_facet(dag_run), ), inputs=[], outputs=[], diff --git a/airflow/providers/openlineage/plugins/facets.py b/airflow/providers/openlineage/plugins/facets.py index fb642579fc766..d282c72ac8133 100644 --- a/airflow/providers/openlineage/plugins/facets.py +++ b/airflow/providers/openlineage/plugins/facets.py @@ -91,7 +91,7 @@ class AirflowStateRunFacet(BaseFacet): @define(slots=False) class AirflowRunFacet(BaseFacet): - """Composite Airflow run facet.""" + """Composite Airflow task run facet.""" dag: dict dagRun: dict @@ -100,6 +100,14 @@ class AirflowRunFacet(BaseFacet): taskUuid: str +@define(slots=False) +class AirflowDagRunFacet(BaseFacet): + """Composite Airflow DAG run facet.""" + + dag: dict + dagRun: dict + + @define(slots=False) class UnknownOperatorInstance(RedactMixin): """ diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 8798c542c1cd2..a552cb283b88a 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -420,7 +420,7 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None: nominal_end_time=data_interval_end, # AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects, # as it causes lack of some TaskGroup attributes and crashes event emission. - job_facets={**get_airflow_job_facet(dag_run=dag_run)}, + job_facets=get_airflow_job_facet(dag_run=dag_run), ) @hookimpl diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index a36f44b3d5220..171f35a775884 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -37,6 +37,7 @@ from airflow.models import DAG, BaseOperator, MappedOperator from airflow.providers.openlineage import conf from airflow.providers.openlineage.plugins.facets import ( + AirflowDagRunFacet, AirflowJobFacet, AirflowMappedTaskRunFacet, AirflowRunFacet, @@ -345,6 +346,17 @@ class TaskGroupInfo(InfoJsonEncodable): ] +def get_airflow_dag_run_facet(dag_run: DagRun) -> dict[str, BaseFacet]: + if not dag_run.dag: + return {} + return { + "airflowDagRun": AirflowDagRunFacet( + dag=DagInfo(dag_run.dag), + dagRun=DagRunInfo(dag_run), + ) + } + + def get_airflow_run_facet( dag_run: DagRun, dag: DAG, diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py index e588b25dcc23e..fb60b5cc8c504 100644 --- a/tests/providers/openlineage/plugins/test_adapter.py +++ b/tests/providers/openlineage/plugins/test_adapter.py @@ -43,14 +43,10 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceState from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator -from airflow.providers.openlineage.conf import ( - namespace, -) +from airflow.providers.openlineage.conf import namespace from airflow.providers.openlineage.extractors import OperatorLineage from airflow.providers.openlineage.plugins.adapter import _PRODUCER, OpenLineageAdapter -from airflow.providers.openlineage.plugins.facets import ( - AirflowStateRunFacet, -) +from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowStateRunFacet from airflow.providers.openlineage.utils.utils import get_airflow_job_facet from airflow.utils.task_group import TaskGroup from tests.test_utils.config import conf_vars @@ -518,6 +514,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat run_id=run_id, start_date=event_time, execution_date=event_time, + data_interval=(event_time, event_time), ) dag_run.dag = dag generate_static_uuid.return_value = random_uuid @@ -544,7 +541,28 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat "nominalTime": NominalTimeRunFacet( nominalStartTime=event_time.isoformat(), nominalEndTime=event_time.isoformat(), - ) + ), + "airflowDagRun": AirflowDagRunFacet( + dag={ + "timetable": {"delta": 86400.0}, + "dag_id": dag_id, + "description": "dag desc", + "owner": "airflow", + "schedule_interval": "86400.0 seconds", + "start_date": "2024-06-01T00:00:00+00:00", + "tags": [], + }, + dagRun={ + "conf": {}, + "dag_id": "dag_id", + "data_interval_start": event_time.isoformat(), + "data_interval_end": event_time.isoformat(), + "external_trigger": None, + "run_id": run_id, + "run_type": None, + "start_date": event_time.isoformat(), + }, + ), }, ), job=Job( diff --git a/tests/providers/openlineage/plugins/test_facets.py b/tests/providers/openlineage/plugins/test_facets.py index dd4e5851f2801..73eaebd0c03f6 100644 --- a/tests/providers/openlineage/plugins/test_facets.py +++ b/tests/providers/openlineage/plugins/test_facets.py @@ -16,7 +16,7 @@ # under the License. from __future__ import annotations -from airflow.providers.openlineage.plugins.facets import AirflowRunFacet +from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowRunFacet def test_airflow_run_facet(): @@ -27,7 +27,11 @@ def test_airflow_run_facet(): task_uuid = "XXX" airflow_run_facet = AirflowRunFacet( - dag=dag, dagRun=dag_run, task=task, taskInstance=task_instance, taskUuid=task_uuid + dag=dag, + dagRun=dag_run, + task=task, + taskInstance=task_instance, + taskUuid=task_uuid, ) assert airflow_run_facet.dag == dag @@ -35,3 +39,16 @@ def test_airflow_run_facet(): assert airflow_run_facet.task == task assert airflow_run_facet.taskInstance == task_instance assert airflow_run_facet.taskUuid == task_uuid + + +def test_airflow_dag_run_facet(): + dag = {"dag_id": "123"} + dag_run = {"dag_run_id": "456"} + + airflow_dag_run_facet = AirflowDagRunFacet( + dag=dag, + dagRun=dag_run, + ) + + assert airflow_dag_run_facet.dag == dag + assert airflow_dag_run_facet.dagRun == dag_run diff --git a/tests/providers/openlineage/utils/test_utils.py b/tests/providers/openlineage/utils/test_utils.py index d3a9d89445ffa..6f6fc104b3910 100644 --- a/tests/providers/openlineage/utils/test_utils.py +++ b/tests/providers/openlineage/utils/test_utils.py @@ -23,17 +23,19 @@ from airflow import DAG from airflow.decorators import task from airflow.models.baseoperator import BaseOperator +from airflow.models.dagrun import DagRun from airflow.models.mappedoperator import MappedOperator from airflow.models.taskinstance import TaskInstance from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator -from airflow.providers.openlineage.plugins.facets import AirflowJobFacet +from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowJobFacet from airflow.providers.openlineage.utils.utils import ( _get_parsed_dag_tree, _get_task_groups_details, _get_tasks_details, _safe_get_dag_tree_view, + get_airflow_dag_run_facet, get_airflow_job_facet, get_custom_facets, get_fully_qualified_class_name, @@ -42,6 +44,7 @@ ) from airflow.serialization.serialized_objects import SerializedBaseOperator from airflow.utils.task_group import TaskGroup +from airflow.utils.types import DagRunType from tests.test_utils.mock_operators import MockOperator @@ -62,7 +65,7 @@ def test_get_airflow_job_facet(): task_0 >> task_10 - dagrun_mock = MagicMock() + dagrun_mock = MagicMock(DagRun) dagrun_mock.dag = dag result = get_airflow_job_facet(dagrun_mock) @@ -104,6 +107,57 @@ def test_get_airflow_job_facet(): } +def test_get_airflow_dag_run_facet(): + with DAG( + dag_id="dag", + schedule="@once", + start_date=datetime.datetime(2024, 6, 1), + tags=["test"], + ) as dag: + task_0 = BashOperator(task_id="task_0", bash_command="exit 0;") + + with TaskGroup("section_1", prefix_group_id=True): + task_10 = PythonOperator(task_id="task_3", python_callable=lambda: 1) + + task_0 >> task_10 + + dagrun_mock = MagicMock(DagRun) + dagrun_mock.dag = dag + dagrun_mock.conf = {} + dagrun_mock.dag_id = dag.dag_id + dagrun_mock.data_interval_start = datetime.datetime(2024, 6, 1, 1, 2, 3, tzinfo=datetime.timezone.utc) + dagrun_mock.data_interval_end = datetime.datetime(2024, 6, 1, 2, 3, 4, tzinfo=datetime.timezone.utc) + dagrun_mock.external_trigger = True + dagrun_mock.run_id = "manual_2024-06-01T00:00:00+00:00" + dagrun_mock.run_type = DagRunType.MANUAL + dagrun_mock.start_date = datetime.datetime(2024, 6, 1, 1, 2, 4, tzinfo=datetime.timezone.utc) + + result = get_airflow_dag_run_facet(dagrun_mock) + assert result == { + "airflowDagRun": AirflowDagRunFacet( + dag={ + "dag_id": "dag", + "description": None, + "owner": "airflow", + "timetable": {}, + "schedule_interval": "@once", + "start_date": "2024-06-01T00:00:00+00:00", + "tags": ["test"], + }, + dagRun={ + "conf": {}, + "dag_id": "dag", + "data_interval_start": "2024-06-01T01:02:03+00:00", + "data_interval_end": "2024-06-01T02:03:04+00:00", + "external_trigger": True, + "run_id": "manual_2024-06-01T00:00:00+00:00", + "run_type": "manual", + "start_date": "2024-06-01T01:02:04+00:00", + }, + ) + } + + def test_get_fully_qualified_class_name_serialized_operator(): op_module_path = "airflow.operators.bash" op_name = "BashOperator"