diff --git a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py index 1a5232ae35d74..a3744602132c1 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py +++ b/providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py @@ -134,7 +134,7 @@ def extract_metadata( task_metadata.inputs = inputs task_metadata.outputs = outputs else: - self.extract_inlets_and_outlets(task_metadata, task.inlets, task.outlets) + self.extract_inlets_and_outlets(task_metadata, task) return task_metadata except Exception as e: @@ -156,9 +156,7 @@ def extract_metadata( task_metadata = OperatorLineage( run_facets=get_unknown_source_attribute_run_facet(task=task), ) - inlets = task.get_inlet_defs() - outlets = task.get_outlet_defs() - self.extract_inlets_and_outlets(task_metadata, inlets, outlets) + self.extract_inlets_and_outlets(task_metadata, task) return task_metadata return OperatorLineage() @@ -183,19 +181,14 @@ def _get_extractor(self, task: Operator) -> BaseExtractor | None: return extractor(task) return None - def extract_inlets_and_outlets( - self, - task_metadata: OperatorLineage, - inlets: list, - outlets: list, - ): - if inlets or outlets: + def extract_inlets_and_outlets(self, task_metadata: OperatorLineage, task) -> None: + if task.inlets or task.outlets: self.log.debug("Manually extracting lineage metadata from inlets and outlets") - for i in inlets: + for i in task.inlets: d = self.convert_to_ol_dataset(i) if d: task_metadata.inputs.append(d) - for o in outlets: + for o in task.outlets: d = self.convert_to_ol_dataset(o) if d: task_metadata.outputs.append(d) diff --git a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py index ab5ba39e4dee9..9f80f2d8a962c 100644 --- a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py +++ b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py @@ -41,7 +41,7 @@ from airflow.providers.openlineage.utils.utils import Asset from airflow.utils.state import State -from tests_common.test_utils.compat import PythonOperator +from tests_common.test_utils.compat import DateTimeSensor, PythonOperator from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS @@ -527,3 +527,29 @@ def use_read(): assert len(datasets.outputs) == 1 assert datasets.outputs[0].asset == Asset(uri=path) + + +def test_extract_inlets_and_outlets_with_operator(): + inlets = [OpenLineageDataset(namespace="namespace1", name="name1")] + outlets = [OpenLineageDataset(namespace="namespace2", name="name2")] + + extractor_manager = ExtractorManager() + task = PythonOperator(task_id="task_id", python_callable=lambda x: x, inlets=inlets, outlets=outlets) + lineage = OperatorLineage() + extractor_manager.extract_inlets_and_outlets(lineage, task) + assert lineage.inputs == inlets + assert lineage.outputs == outlets + + +def test_extract_inlets_and_outlets_with_sensor(): + inlets = [OpenLineageDataset(namespace="namespace1", name="name1")] + outlets = [OpenLineageDataset(namespace="namespace2", name="name2")] + + extractor_manager = ExtractorManager() + task = DateTimeSensor( + task_id="task_id", target_time="2025-04-04T08:48:13.713922+00:00", inlets=inlets, outlets=outlets + ) + lineage = OperatorLineage() + extractor_manager.extract_inlets_and_outlets(lineage, task) + assert lineage.inputs == inlets + assert lineage.outputs == outlets