Skip to content

Commit

Permalink
openlineage: Add AirflowRunFacet for dag runEvents (#40854)
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus authored Jul 23, 2024
1 parent 8a912f9 commit 9ec9eb7
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 14 deletions.
105 changes: 105 additions & 0 deletions airflow/providers/openlineage/facets/AirflowDagRunFacet.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$defs": {
"AirflowDagRunFacet": {
"allOf": [
{
"$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
},
{
"type": "object",
"properties": {
"dag": {
"$ref": "#/$defs/DAG"
},
"dagRun": {
"$ref": "#/$defs/DagRun"
}
},
"required": [
"dag",
"dagRun"
]
}
]
},
"DAG": {
"type": "object",
"properties": {
"dag_id": {
"type": "string"
},
"description": {
"type": "string"
},
"owner": {
"type": "string"
},
"schedule_interval": {
"type": "string"
},
"start_date": {
"type": "string",
"format": "date-time"
},
"tags": {
"type": "string"
},
"timetable": {
"description": "Describes timetable (successor of schedule_interval)",
"type": "object",
"additionalProperties": true
}
},
"additionalProperties": true,
"required": [
"dag_id",
"start_date"
]
},
"DagRun": {
"type": "object",
"properties": {
"conf": {
"type": "object",
"additionalProperties": true
},
"dag_id": {
"type": "string"
},
"data_interval_start": {
"type": "string",
"format": "date-time"
},
"data_interval_end": {
"type": "string",
"format": "date-time"
},
"external_trigger": {
"type": "boolean"
},
"run_id": {
"type": "string"
},
"run_type": {
"type": "string"
},
"start_date": {
"type": "string",
"format": "date-time"
}
},
"additionalProperties": true,
"required": [
"dag_id",
"run_id"
]
}
},
"type": "object",
"properties": {
"airflowDagRun": {
"$ref": "#/$defs/AirflowDagRunFacet"
}
}
}
5 changes: 4 additions & 1 deletion airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.utils.utils import (
OpenLineageRedactor,
get_airflow_dag_run_facet,
get_airflow_state_run_facet,
)
from airflow.stats import Stats
Expand Down Expand Up @@ -334,14 +335,15 @@ def dag_started(
job_facets: dict[str, BaseFacet] | None = None, # Custom job facets
):
try:
owner = [x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None
event = RunEvent(
eventType=RunState.START,
eventTime=dag_run.start_date.isoformat(),
job=self._build_job(
job_name=dag_run.dag_id,
job_type=_JOB_TYPE_DAG,
job_description=dag_run.dag.description if dag_run.dag else None,
owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
owners=owner,
job_facets=job_facets,
),
run=self._build_run(
Expand All @@ -352,6 +354,7 @@ def dag_started(
job_name=dag_run.dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets=get_airflow_dag_run_facet(dag_run),
),
inputs=[],
outputs=[],
Expand Down
10 changes: 9 additions & 1 deletion airflow/providers/openlineage/plugins/facets.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class AirflowStateRunFacet(BaseFacet):

@define(slots=False)
class AirflowRunFacet(BaseFacet):
"""Composite Airflow run facet."""
"""Composite Airflow task run facet."""

dag: dict
dagRun: dict
Expand All @@ -100,6 +100,14 @@ class AirflowRunFacet(BaseFacet):
taskUuid: str


@define(slots=False)
class AirflowDagRunFacet(BaseFacet):
"""Composite Airflow DAG run facet."""

dag: dict
dagRun: dict


@define(slots=False)
class UnknownOperatorInstance(RedactMixin):
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
nominal_end_time=data_interval_end,
# AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
# as it causes lack of some TaskGroup attributes and crashes event emission.
job_facets={**get_airflow_job_facet(dag_run=dag_run)},
job_facets=get_airflow_job_facet(dag_run=dag_run),
)

@hookimpl
Expand Down
12 changes: 12 additions & 0 deletions airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from airflow.models import DAG, BaseOperator, MappedOperator
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
AirflowJobFacet,
AirflowMappedTaskRunFacet,
AirflowRunFacet,
Expand Down Expand Up @@ -345,6 +346,17 @@ class TaskGroupInfo(InfoJsonEncodable):
]


def get_airflow_dag_run_facet(dag_run: DagRun) -> dict[str, BaseFacet]:
if not dag_run.dag:
return {}
return {
"airflowDagRun": AirflowDagRunFacet(
dag=DagInfo(dag_run.dag),
dagRun=DagRunInfo(dag_run),
)
}


def get_airflow_run_facet(
dag_run: DagRun,
dag: DAG,
Expand Down
32 changes: 25 additions & 7 deletions tests/providers/openlineage/plugins/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,10 @@
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.openlineage.conf import (
namespace,
)
from airflow.providers.openlineage.conf import namespace
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.plugins.adapter import _PRODUCER, OpenLineageAdapter
from airflow.providers.openlineage.plugins.facets import (
AirflowStateRunFacet,
)
from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowStateRunFacet
from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
from airflow.utils.task_group import TaskGroup
from tests.test_utils.config import conf_vars
Expand Down Expand Up @@ -518,6 +514,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
run_id=run_id,
start_date=event_time,
execution_date=event_time,
data_interval=(event_time, event_time),
)
dag_run.dag = dag
generate_static_uuid.return_value = random_uuid
Expand All @@ -544,7 +541,28 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
"nominalTime": NominalTimeRunFacet(
nominalStartTime=event_time.isoformat(),
nominalEndTime=event_time.isoformat(),
)
),
"airflowDagRun": AirflowDagRunFacet(
dag={
"timetable": {"delta": 86400.0},
"dag_id": dag_id,
"description": "dag desc",
"owner": "airflow",
"schedule_interval": "86400.0 seconds",
"start_date": "2024-06-01T00:00:00+00:00",
"tags": [],
},
dagRun={
"conf": {},
"dag_id": "dag_id",
"data_interval_start": event_time.isoformat(),
"data_interval_end": event_time.isoformat(),
"external_trigger": None,
"run_id": run_id,
"run_type": None,
"start_date": event_time.isoformat(),
},
),
},
),
job=Job(
Expand Down
21 changes: 19 additions & 2 deletions tests/providers/openlineage/plugins/test_facets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from airflow.providers.openlineage.plugins.facets import AirflowRunFacet
from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowRunFacet


def test_airflow_run_facet():
Expand All @@ -27,11 +27,28 @@ def test_airflow_run_facet():
task_uuid = "XXX"

airflow_run_facet = AirflowRunFacet(
dag=dag, dagRun=dag_run, task=task, taskInstance=task_instance, taskUuid=task_uuid
dag=dag,
dagRun=dag_run,
task=task,
taskInstance=task_instance,
taskUuid=task_uuid,
)

assert airflow_run_facet.dag == dag
assert airflow_run_facet.dagRun == dag_run
assert airflow_run_facet.task == task
assert airflow_run_facet.taskInstance == task_instance
assert airflow_run_facet.taskUuid == task_uuid


def test_airflow_dag_run_facet():
dag = {"dag_id": "123"}
dag_run = {"dag_run_id": "456"}

airflow_dag_run_facet = AirflowDagRunFacet(
dag=dag,
dagRun=dag_run,
)

assert airflow_dag_run_facet.dag == dag
assert airflow_dag_run_facet.dagRun == dag_run
58 changes: 56 additions & 2 deletions tests/providers/openlineage/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@
from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import BaseOperator
from airflow.models.dagrun import DagRun
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskinstance import TaskInstance
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.openlineage.plugins.facets import AirflowJobFacet
from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowJobFacet
from airflow.providers.openlineage.utils.utils import (
_get_parsed_dag_tree,
_get_task_groups_details,
_get_tasks_details,
_safe_get_dag_tree_view,
get_airflow_dag_run_facet,
get_airflow_job_facet,
get_custom_facets,
get_fully_qualified_class_name,
Expand All @@ -42,6 +44,7 @@
)
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.types import DagRunType
from tests.test_utils.mock_operators import MockOperator


Expand All @@ -62,7 +65,7 @@ def test_get_airflow_job_facet():

task_0 >> task_10

dagrun_mock = MagicMock()
dagrun_mock = MagicMock(DagRun)
dagrun_mock.dag = dag

result = get_airflow_job_facet(dagrun_mock)
Expand Down Expand Up @@ -104,6 +107,57 @@ def test_get_airflow_job_facet():
}


def test_get_airflow_dag_run_facet():
with DAG(
dag_id="dag",
schedule="@once",
start_date=datetime.datetime(2024, 6, 1),
tags=["test"],
) as dag:
task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")

with TaskGroup("section_1", prefix_group_id=True):
task_10 = PythonOperator(task_id="task_3", python_callable=lambda: 1)

task_0 >> task_10

dagrun_mock = MagicMock(DagRun)
dagrun_mock.dag = dag
dagrun_mock.conf = {}
dagrun_mock.dag_id = dag.dag_id
dagrun_mock.data_interval_start = datetime.datetime(2024, 6, 1, 1, 2, 3, tzinfo=datetime.timezone.utc)
dagrun_mock.data_interval_end = datetime.datetime(2024, 6, 1, 2, 3, 4, tzinfo=datetime.timezone.utc)
dagrun_mock.external_trigger = True
dagrun_mock.run_id = "manual_2024-06-01T00:00:00+00:00"
dagrun_mock.run_type = DagRunType.MANUAL
dagrun_mock.start_date = datetime.datetime(2024, 6, 1, 1, 2, 4, tzinfo=datetime.timezone.utc)

result = get_airflow_dag_run_facet(dagrun_mock)
assert result == {
"airflowDagRun": AirflowDagRunFacet(
dag={
"dag_id": "dag",
"description": None,
"owner": "airflow",
"timetable": {},
"schedule_interval": "@once",
"start_date": "2024-06-01T00:00:00+00:00",
"tags": ["test"],
},
dagRun={
"conf": {},
"dag_id": "dag",
"data_interval_start": "2024-06-01T01:02:03+00:00",
"data_interval_end": "2024-06-01T02:03:04+00:00",
"external_trigger": True,
"run_id": "manual_2024-06-01T00:00:00+00:00",
"run_type": "manual",
"start_date": "2024-06-01T01:02:04+00:00",
},
)
}


def test_get_fully_qualified_class_name_serialized_operator():
op_module_path = "airflow.operators.bash"
op_name = "BashOperator"
Expand Down

0 comments on commit 9ec9eb7

Please sign in to comment.