Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion providers/amazon/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ dependencies = [
"apache-airflow-providers-mongo"
]
"openlineage" = [
"apache-airflow-providers-openlineage"
"apache-airflow-providers-openlineage>=2.3.0"
]
"salesforce" = [
"apache-airflow-providers-salesforce"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ def test_inject_composite_openlineage_config_to_spark(
mock_ti.try_number = 1
mock_ti.dag_run.logical_date = DEFAULT_DATE
mock_ti.dag_run.run_after = DEFAULT_DATE
mock_ti.dag_run.clear_number = 0
mock_ti.logical_date = DEFAULT_DATE
mock_ti.map_index = -1
mock_get_batch_state.return_value = BatchState.SUCCESS
Expand All @@ -472,6 +473,9 @@ def test_inject_composite_openlineage_config_to_spark(
"spark.openlineage.parentJobName": "test_dag_id.spark_submit_job",
"spark.openlineage.parentJobNamespace": "default",
"spark.openlineage.parentRunId": "01595753-6400-710b-8a12-9e978335a56d",
"spark.openlineage.rootParentJobName": "test_dag_id",
"spark.openlineage.rootParentJobNamespace": "default",
"spark.openlineage.rootParentRunId": "01595753-6400-71fe-a08c-aaed126ab6fb",
"spark.openlineage.transport.type": "composite",
"spark.openlineage.transport.continueOnFailure": "True",
"spark.openlineage.transport.transports.test1.type": "http",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ def test_inject_composite_openlineage_config_to_spark(self, mock_get_openlineage
mock_ti.try_number = 1
mock_ti.dag_run.logical_date = DEFAULT_DATE
mock_ti.dag_run.run_after = DEFAULT_DATE
mock_ti.dag_run.clear_number = 0
mock_ti.logical_date = DEFAULT_DATE
mock_ti.map_index = -1

Expand All @@ -386,6 +387,9 @@ def test_inject_composite_openlineage_config_to_spark(self, mock_get_openlineage
"spark.openlineage.parentJobName": "test_dag_id.spark_submit_job",
"spark.openlineage.parentJobNamespace": "default",
"spark.openlineage.parentRunId": "01595753-6400-710b-8a12-9e978335a56d",
"spark.openlineage.rootParentJobName": "test_dag_id",
"spark.openlineage.rootParentJobNamespace": "default",
"spark.openlineage.rootParentRunId": "01595753-6400-71fe-a08c-aaed126ab6fb",
"spark.openlineage.transport.type": "composite",
"spark.openlineage.transport.continueOnFailure": "True",
"spark.openlineage.transport.transports.test1.type": "http",
Expand Down
2 changes: 1 addition & 1 deletion providers/dbt/cloud/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ dependencies = [
[project.optional-dependencies]
# pip install apache-airflow-providers-dbt-cloud[openlineage]
"openlineage" = [
"apache-airflow-providers-openlineage>=2.0.0",
"apache-airflow-providers-openlineage>=2.3.0",
]

[dependency-groups]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _get_logical_date(task_instance):
return date


@require_openlineage_version(provider_min_version="2.0.0")
@require_openlineage_version(provider_min_version="2.3.0")
def generate_openlineage_events_from_dbt_cloud_run(
operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor, task_instance: TaskInstance
) -> OperatorLineage:
Expand Down Expand Up @@ -141,10 +141,19 @@ async def get_artifacts_for_steps(steps, artifacts):
map_index=task_instance.map_index,
)

root_parent_run_id = OpenLineageAdapter.build_dag_run_id(
dag_id=task_instance.dag_id,
logical_date=_get_logical_date(task_instance),
clear_number=task_instance.dag_run.clear_number,
)

parent_job = ParentRunMetadata(
run_id=parent_run_id,
job_name=f"{task_instance.dag_id}.{task_instance.task_id}",
job_namespace=namespace(),
root_parent_run_id=root_parent_run_id,
root_parent_job_name=task_instance.dag_id,
root_parent_job_namespace=namespace(),
)
client = get_openlineage_listener().adapter.get_or_create_openlineage_client()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
TASK_ID = "dbt_test"
DAG_ID = "dbt_dag"
TASK_UUID = "01481cfa-0ff7-3692-9bba-79417cf498c2"
DAG_UUID = "01481cfa-1a1a-2b2b-3c3c-79417cf498c2"


class MockResponse:
Expand Down Expand Up @@ -88,30 +89,48 @@ def get_dbt_artifact(*args, **kwargs):
return None


def test_previous_version_openlineage_provider():
"""When using OpenLineage, the dbt-cloud provider now depends on openlineage provider >= 2.0"""
@pytest.mark.parametrize(
"value, is_error",
[
("1.99.0", True),
("2.0.0", True),
("2.3.0", False),
("2.99.0", False),
],
)
def test_previous_version_openlineage_provider(value, is_error):
"""When using OpenLineage, the dbt-cloud provider now depends on openlineage provider >= 2.3"""

def _mock_version(package):
if package == "apache-airflow-providers-openlineage":
return "1.99.0"
return value
raise Exception("Unexpected package")

mock_operator = MagicMock()
mock_task_instance = MagicMock()

expected_err = (
"OpenLineage provider version `1.99.0` is lower than required `2.0.0`, "
f"OpenLineage provider version `{value}` is lower than required `2.3.0`, "
"skipping function `generate_openlineage_events_from_dbt_cloud_run` execution"
)

with patch("importlib.metadata.version", side_effect=_mock_version):
with pytest.raises(AirflowOptionalProviderFeatureException, match=expected_err):
generate_openlineage_events_from_dbt_cloud_run(mock_operator, mock_task_instance)
if is_error:
with patch("importlib.metadata.version", side_effect=_mock_version):
with pytest.raises(AirflowOptionalProviderFeatureException, match=expected_err):
generate_openlineage_events_from_dbt_cloud_run(mock_operator, mock_task_instance)
else:
with patch("importlib.metadata.version", side_effect=_mock_version):
# Error that would certainly not happen on version checking
mock_operator.hook.get_job_run.side_effect = ZeroDivisionError("error for test")
with pytest.raises(ZeroDivisionError, match="error for test"):
generate_openlineage_events_from_dbt_cloud_run(mock_operator, mock_task_instance)


class TestGenerateOpenLineageEventsFromDbtCloudRun:
@patch("importlib.metadata.version", return_value="2.3.0")
@patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
@patch("airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter.build_task_instance_run_id")
@patch("airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter.build_dag_run_id")
@patch.object(DbtCloudHook, "get_job_run")
@patch.object(DbtCloudHook, "get_project")
@patch.object(DbtCloudHook, "get_job_run_artifact")
Expand All @@ -120,8 +139,10 @@ def test_generate_events(
mock_get_job_run_artifact,
mock_get_project,
mock_get_job_run,
mock_build_dag_run_id,
mock_build_task_instance_run_id,
mock_get_openlineage_listener,
mock_version,
):
mock_operator = MagicMock(spec=DbtCloudRunJobOperator)
mock_operator.account_id = None
Expand Down Expand Up @@ -154,6 +175,7 @@ def test_generate_events(
mock_task_instance = MagicMock()
mock_task_instance.task_id = TASK_ID
mock_task_instance.dag_id = DAG_ID
mock_task_instance.dag_run.clear_number = 0

mock_client = MagicMock()

Expand All @@ -163,6 +185,7 @@ def test_generate_events(
)

mock_build_task_instance_run_id.return_value = TASK_UUID
mock_build_dag_run_id.return_value = DAG_UUID
generate_openlineage_events_from_dbt_cloud_run(mock_operator, task_instance=mock_task_instance)
assert mock_client.emit.call_count == 4

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,13 @@
EXAMPLE_CONTEXT = {
"ti": MagicMock(
dag_id="dag_id",
dag_run=MagicMock(run_after=dt.datetime(2024, 11, 11), logical_date=dt.datetime(2024, 11, 11)),
task_id="task_id",
try_number=1,
map_index=1,
logical_date=dt.datetime(2024, 11, 11),
dag_run=MagicMock(
run_after=dt.datetime(2024, 11, 11), logical_date=dt.datetime(2024, 11, 11), clear_number=0
),
)
}
OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG = {
Expand Down Expand Up @@ -142,6 +144,9 @@
"spark.openlineage.parentJobName": "dag_id.task_id",
"spark.openlineage.parentJobNamespace": "default",
"spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
"spark.openlineage.rootParentJobName": "dag_id",
"spark.openlineage.rootParentJobNamespace": "default",
"spark.openlineage.rootParentRunId": "01931885-2800-799d-8041-88a263ffa0d8",
}


Expand Down Expand Up @@ -1034,6 +1039,9 @@ def test_inject_openlineage_properties_into_dataproc_workflow_template_parent_in
"spark.openlineage.parentJobName": "dag_id.task_id",
"spark.openlineage.parentJobNamespace": "default",
"spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
"spark.openlineage.rootParentJobName": "dag_id",
"spark.openlineage.rootParentJobNamespace": "default",
"spark.openlineage.rootParentRunId": "01931885-2800-799d-8041-88a263ffa0d8",
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@
try_number=1,
map_index=1,
logical_date=dt.datetime(2024, 11, 11),
dag_run=MagicMock(logical_date=dt.datetime(2024, 11, 11), clear_number=0),
)
}
OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG = {
Expand Down Expand Up @@ -436,6 +437,9 @@
"spark.openlineage.parentJobName": "dag_id.task_id",
"spark.openlineage.parentJobNamespace": "default",
"spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
"spark.openlineage.rootParentJobName": "dag_id",
"spark.openlineage.rootParentJobNamespace": "default",
"spark.openlineage.rootParentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
}


Expand Down Expand Up @@ -1464,6 +1468,9 @@ def test_execute_openlineage_parent_job_info_injection(
"spark.openlineage.parentJobName": "dag_id.task_id",
"spark.openlineage.parentJobNamespace": "default",
"spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
"spark.openlineage.rootParentJobName": "dag_id",
"spark.openlineage.rootParentJobNamespace": "default",
"spark.openlineage.rootParentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
},
},
}
Expand Down Expand Up @@ -2646,6 +2653,9 @@ def test_execute_openlineage_parent_job_info_injection(
"spark.openlineage.parentJobName": "dag_id.task_id",
"spark.openlineage.parentJobNamespace": "default",
"spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
"spark.openlineage.rootParentJobName": "dag_id",
"spark.openlineage.rootParentJobNamespace": "default",
"spark.openlineage.rootParentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
job_type_job,
nominal_time_run,
ownership_job,
parent_run,
source_code_location_job,
)
from openlineage.client.uuid import generate_static_uuid
Expand Down Expand Up @@ -188,8 +187,6 @@ def start_task(
job_name: str,
job_description: str,
event_time: str,
parent_job_name: str | None,
parent_run_id: str | None,
code_location: str | None,
nominal_start_time: str | None,
nominal_end_time: str | None,
Expand All @@ -204,9 +201,6 @@ def start_task(
:param job_name: globally unique identifier of task in dag
:param job_description: user provided description of job
:param event_time:
:param parent_job_name: the name of the parent job (typically the DAG,
but possibly a task group)
:param parent_run_id: identifier of job spawning this task
:param code_location: file path or URL of DAG file
:param nominal_start_time: scheduled time of dag run
:param nominal_end_time: following schedule of dag run
Expand All @@ -223,9 +217,6 @@ def start_task(
eventTime=event_time,
run=self._build_run(
run_id=run_id,
job_name=job_name,
parent_job_name=parent_job_name,
parent_run_id=parent_run_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets=run_facets,
Expand All @@ -248,8 +239,6 @@ def complete_task(
self,
run_id: str,
job_name: str,
parent_job_name: str | None,
parent_run_id: str | None,
end_time: str,
task: OperatorLineage,
run_facets: dict[str, RunFacet] | None = None,
Expand All @@ -259,9 +248,6 @@ def complete_task(

:param run_id: globally unique identifier of task in dag run
:param job_name: globally unique identifier of task between dags
:param parent_job_name: the name of the parent job (typically the DAG,
but possibly a task group)
:param parent_run_id: identifier of job spawning this task
:param end_time: time of task completion
:param task: metadata container with information extracted from operator
:param run_facets: additional run facets
Expand All @@ -275,9 +261,6 @@ def complete_task(
eventTime=end_time,
run=self._build_run(
run_id=run_id,
job_name=job_name,
parent_job_name=parent_job_name,
parent_run_id=parent_run_id,
run_facets=run_facets,
),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets),
Expand All @@ -291,8 +274,6 @@ def fail_task(
self,
run_id: str,
job_name: str,
parent_job_name: str | None,
parent_run_id: str | None,
end_time: str,
task: OperatorLineage,
error: str | BaseException | None = None,
Expand All @@ -303,9 +284,6 @@ def fail_task(

:param run_id: globally unique identifier of task in dag run
:param job_name: globally unique identifier of task between dags
:param parent_job_name: the name of the parent job (typically the DAG,
but possibly a task group)
:param parent_run_id: identifier of job spawning this task
:param end_time: time of task completion
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
Expand All @@ -332,9 +310,6 @@ def fail_task(
eventTime=end_time,
run=self._build_run(
run_id=run_id,
job_name=job_name,
parent_job_name=parent_job_name,
parent_run_id=parent_run_id,
run_facets=run_facets,
),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets),
Expand Down Expand Up @@ -372,7 +347,6 @@ def dag_started(
run_id=self.build_dag_run_id(
dag_id=dag_id, logical_date=logical_date, clear_number=clear_number
),
job_name=dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets={**run_facets, **get_airflow_debug_facet(), **get_processing_engine_facet()},
Expand Down Expand Up @@ -473,9 +447,6 @@ def dag_failed(
@staticmethod
def _build_run(
run_id: str,
job_name: str,
parent_job_name: str | None = None,
parent_run_id: str | None = None,
nominal_start_time: str | None = None,
nominal_end_time: str | None = None,
run_facets: dict[str, RunFacet] | None = None,
Expand All @@ -485,13 +456,6 @@ def _build_run(
facets.update(
{"nominalTime": nominal_time_run.NominalTimeRunFacet(nominal_start_time, nominal_end_time)}
)
if parent_run_id:
parent_run_facet = parent_run.ParentRunFacet(
run=parent_run.Run(runId=parent_run_id),
job=parent_run.Job(namespace=conf.namespace(), name=parent_job_name or job_name),
)
facets.update({"parent": parent_run_facet})

if run_facets:
facets.update(run_facets)

Expand Down
Loading