diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py index fd73a56a917ac..613a71b4fdc8b 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py @@ -497,7 +497,9 @@ def _build_run( nominal_end_time: str | None = None, run_facets: dict[str, RunFacet] | None = None, ) -> Run: - facets: dict[str, RunFacet] = get_processing_engine_facet() # type: ignore[assignment] + facets: dict[str, RunFacet] = {} + if run_facets: + facets.update(run_facets) if nominal_start_time: facets.update( { @@ -508,8 +510,7 @@ def _build_run( ) } ) - if run_facets: - facets.update(run_facets) + facets.update(get_processing_engine_facet()) return Run(run_id, facets) @@ -522,11 +523,9 @@ def _build_job( job_tags: list[str] | None = None, job_facets: dict[str, JobFacet] | None = None, ): - facets: dict[str, JobFacet] = { - "jobType": job_type_job.JobTypeJobFacet( - jobType=job_type, integration="AIRFLOW", processingType="BATCH", producer=_PRODUCER - ) - } + facets: dict[str, JobFacet] = {} + if job_facets: + facets.update(job_facets) if job_description: facets.update( { @@ -560,7 +559,12 @@ def _build_job( ) } ) - if job_facets: - facets.update(job_facets) + facets.update( + { + "jobType": job_type_job.JobTypeJobFacet( + jobType=job_type, integration="AIRFLOW", processingType="BATCH", producer=_PRODUCER + ) + } + ) return Job(namespace=conf.namespace(), name=job_name, facets=facets) diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py index a1b3ecb467561..f59a9b8d62ce2 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py @@ -1316,3 +1316,66 @@ def test_configuration_precedence_when_creating_ol_client(): ): client = OpenLineageAdapter().get_or_create_openlineage_client() assert client.transport.kind == "console" + + +def test_adapter_build_run(): + run_id = str(uuid.uuid4()) + result = OpenLineageAdapter._build_run( + run_id=run_id, + nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(), + nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(), + run_facets={ + "my_custom_facet": external_query_run.ExternalQueryRunFacet( + externalQueryId="123", source="source" + ), + "processing_engine": "this_should_be_gone", + }, + ) + assert result.runId == run_id + assert result.facets == { + "my_custom_facet": external_query_run.ExternalQueryRunFacet(externalQueryId="123", source="source"), + "nominalTime": nominal_time_run.NominalTimeRunFacet( + nominalStartTime="2022-01-01T00:00:00", + nominalEndTime="2022-01-01T00:00:00", + ), + "processing_engine": processing_engine_run.ProcessingEngineRunFacet( + version=ANY, name="Airflow", openlineageAdapterVersion=ANY + ), + } + + +def test_adapter_build_job(): + result = OpenLineageAdapter._build_job( + job_name="job_name", + job_type="TASK", + job_description="job_description", + job_owners=["def", "abc"], + job_tags=["tag2", "tag1"], + job_facets={ + "my_custom_facet": sql_job.SQLJobFacet(query="sql"), + "jobType": "this_should_be_gone", + "documentation": "this_should_be_gone", + "ownership": "this_should_be_gone", + "tags": "this_should_be_gone", + }, + ) + assert result.name == "job_name" + assert result.facets == { + "my_custom_facet": sql_job.SQLJobFacet(query="sql"), + "documentation": documentation_job.DocumentationJobFacet(description="job_description"), + "ownership": ownership_job.OwnershipJobFacet( + owners=[ + ownership_job.Owner(name="abc", type=None), + ownership_job.Owner(name="def", type=None), + ] + ), + "tags": tags_job.TagsJobFacet( + tags=[ + tags_job.TagsJobFacetFields(key="tag1", value="tag1", source="AIRFLOW"), + tags_job.TagsJobFacetFields(key="tag2", value="tag2", source="AIRFLOW"), + ] + ), + "jobType": job_type_job.JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="TASK" + ), + }