Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import os
import traceback
from contextlib import ExitStack
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal

import yaml
from openlineage.client import OpenLineageClient, set_producer
Expand All @@ -32,7 +32,7 @@
job_type_job,
nominal_time_run,
ownership_job,
source_code_location_job,
tags_job,
)
from openlineage.client.uuid import generate_static_uuid

Expand Down Expand Up @@ -63,11 +63,8 @@

set_producer(_PRODUCER)

# https://openlineage.io/docs/spec/facets/job-facets/job-type
# They must be set after the `set_producer(_PRODUCER)`
# otherwise the `JobTypeJobFacet._producer` will be set with the default value
_JOB_TYPE_DAG = job_type_job.JobTypeJobFacet(jobType="DAG", integration="AIRFLOW", processingType="BATCH")
_JOB_TYPE_TASK = job_type_job.JobTypeJobFacet(jobType="TASK", integration="AIRFLOW", processingType="BATCH")
_JOB_TYPE_DAG: Literal["DAG"] = "DAG"
_JOB_TYPE_TASK: Literal["TASK"] = "TASK"


class OpenLineageAdapter(LoggingMixin):
Expand Down Expand Up @@ -187,10 +184,10 @@ def start_task(
job_name: str,
job_description: str,
event_time: str,
code_location: str | None,
nominal_start_time: str | None,
nominal_end_time: str | None,
owners: list[str] | None,
tags: list[str] | None,
task: OperatorLineage | None,
run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
Expand All @@ -201,17 +198,16 @@ def start_task(
:param job_name: globally unique identifier of task in dag
:param job_description: user provided description of job
:param event_time:
:param code_location: file path or URL of DAG file
:param nominal_start_time: scheduled time of dag run
:param nominal_end_time: following schedule of dag run
:param owners: list of owners
:param tags: list of tags
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
"""
run_facets = run_facets or {}
if task:
run_facets = {**task.run_facets, **run_facets}
run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore
event = RunEvent(
eventType=RunState.START,
eventTime=event_time,
Expand All @@ -225,8 +221,8 @@ def start_task(
job_name=job_name,
job_type=_JOB_TYPE_TASK,
job_description=job_description,
code_location=code_location,
owners=owners,
job_owners=owners,
job_tags=tags,
job_facets=task.job_facets if task else None,
),
inputs=task.inputs if task else [],
Expand All @@ -242,6 +238,7 @@ def complete_task(
end_time: str,
task: OperatorLineage,
owners: list[str] | None,
tags: list[str] | None,
run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
"""
Expand All @@ -250,22 +247,28 @@ def complete_task(
:param run_id: globally unique identifier of task in dag run
:param job_name: globally unique identifier of task between dags
:param end_time: time of task completion
:param tags: list of tags
:param task: metadata container with information extracted from operator
:param owners: list of owners
:param run_facets: additional run facets
"""
run_facets = run_facets or {}
if task:
run_facets = {**task.run_facets, **run_facets}
run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=end_time,
run=self._build_run(
run_id=run_id,
run_facets=run_facets,
),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets, owners=owners),
job=self._build_job(
job_name,
job_type=_JOB_TYPE_TASK,
job_facets=task.job_facets,
job_owners=owners,
job_tags=tags,
),
inputs=task.inputs,
outputs=task.outputs,
producer=_PRODUCER,
Expand All @@ -279,6 +282,7 @@ def fail_task(
end_time: str,
task: OperatorLineage,
owners: list[str] | None,
tags: list[str] | None,
error: str | BaseException | None = None,
run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
Expand All @@ -290,14 +294,14 @@ def fail_task(
:param end_time: time of task completion
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
:param tags: list of tags
:param owners: list of owners
:param error: error
:param run_facets: additional run facets
"""
run_facets = run_facets or {}
if task:
run_facets = {**task.run_facets, **run_facets}
run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore

if error:
stack_trace = None
Expand All @@ -316,7 +320,13 @@ def fail_task(
run_id=run_id,
run_facets=run_facets,
),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets, owners=owners),
job=self._build_job(
job_name,
job_type=_JOB_TYPE_TASK,
job_facets=task.job_facets,
job_owners=owners,
job_tags=tags,
),
inputs=task.inputs,
outputs=task.outputs,
producer=_PRODUCER,
Expand All @@ -328,9 +338,10 @@ def dag_started(
dag_id: str,
logical_date: datetime,
start_date: datetime,
nominal_start_time: str,
nominal_end_time: str,
nominal_start_time: str | None,
nominal_end_time: str | None,
owners: list[str] | None,
tags: list[str],
run_facets: dict[str, RunFacet],
clear_number: int,
description: str | None = None,
Expand All @@ -344,16 +355,17 @@ def dag_started(
job_name=dag_id,
job_type=_JOB_TYPE_DAG,
job_description=description,
owners=owners,
job_owners=owners,
job_facets=job_facets,
job_tags=tags,
),
run=self._build_run(
run_id=self.build_dag_run_id(
dag_id=dag_id, logical_date=logical_date, clear_number=clear_number
),
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets={**run_facets, **get_airflow_debug_facet(), **get_processing_engine_facet()},
run_facets={**run_facets, **get_airflow_debug_facet()},
),
inputs=[],
outputs=[],
Expand All @@ -372,6 +384,7 @@ def dag_success(
run_id: str,
end_date: datetime,
logical_date: datetime,
tags: list[str] | None,
clear_number: int,
dag_run_state: DagRunState,
task_ids: list[str],
Expand All @@ -382,15 +395,19 @@ def dag_success(
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=end_date.isoformat(),
job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG, owners=owners),
run=Run(
runId=self.build_dag_run_id(
job=self._build_job(
job_name=dag_id,
job_type=_JOB_TYPE_DAG,
job_owners=owners,
job_tags=tags,
),
run=self._build_run(
run_id=self.build_dag_run_id(
dag_id=dag_id, logical_date=logical_date, clear_number=clear_number
),
facets={
run_facets={
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**get_airflow_debug_facet(),
**get_processing_engine_facet(),
**run_facets,
},
),
Expand All @@ -411,6 +428,7 @@ def dag_failed(
run_id: str,
end_date: datetime,
logical_date: datetime,
tags: list[str] | None,
clear_number: int,
dag_run_state: DagRunState,
task_ids: list[str],
Expand All @@ -422,20 +440,22 @@ def dag_failed(
event = RunEvent(
eventType=RunState.FAIL,
eventTime=end_date.isoformat(),
job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG, owners=owners),
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_id,
logical_date=logical_date,
clear_number=clear_number,
job=self._build_job(
job_name=dag_id,
job_type=_JOB_TYPE_DAG,
job_owners=owners,
job_tags=tags,
),
run=self._build_run(
run_id=self.build_dag_run_id(
dag_id=dag_id, logical_date=logical_date, clear_number=clear_number
),
facets={
run_facets={
"errorMessage": error_message_run.ErrorMessageRunFacet(
message=msg, programmingLanguage="python"
),
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**get_airflow_debug_facet(),
**get_processing_engine_facet(),
**run_facets,
},
),
Expand All @@ -457,10 +477,16 @@ def _build_run(
nominal_end_time: str | None = None,
run_facets: dict[str, RunFacet] | None = None,
) -> Run:
facets: dict[str, RunFacet] = {}
facets: dict[str, RunFacet] = get_processing_engine_facet() # type: ignore[assignment]
if nominal_start_time:
facets.update(
{"nominalTime": nominal_time_run.NominalTimeRunFacet(nominal_start_time, nominal_end_time)}
{
"nominalTime": nominal_time_run.NominalTimeRunFacet(
nominalStartTime=nominal_start_time,
nominalEndTime=nominal_end_time,
producer=_PRODUCER,
)
}
)
if run_facets:
facets.update(run_facets)
Expand All @@ -470,37 +496,51 @@ def _build_run(
@staticmethod
def _build_job(
job_name: str,
job_type: job_type_job.JobTypeJobFacet,
job_type: Literal["DAG", "TASK"],
job_description: str | None = None,
code_location: str | None = None,
owners: list[str] | None = None,
job_owners: list[str] | None = None,
job_tags: list[str] | None = None,
job_facets: dict[str, JobFacet] | None = None,
):
facets: dict[str, JobFacet] = {}

facets: dict[str, JobFacet] = {
"jobType": job_type_job.JobTypeJobFacet(
jobType=job_type, integration="AIRFLOW", processingType="BATCH", producer=_PRODUCER
)
}
if job_description:
facets.update(
{"documentation": documentation_job.DocumentationJobFacet(description=job_description)}
{
"documentation": documentation_job.DocumentationJobFacet(
description=job_description, producer=_PRODUCER
)
}
)
if code_location:
if job_owners:
facets.update(
{
"sourceCodeLocation": source_code_location_job.SourceCodeLocationJobFacet(
"", url=code_location
"ownership": ownership_job.OwnershipJobFacet(
owners=[ownership_job.Owner(name=owner) for owner in sorted(job_owners)],
producer=_PRODUCER,
)
}
)
if owners:
if job_tags:
facets.update(
{
"ownership": ownership_job.OwnershipJobFacet(
owners=[ownership_job.Owner(name=owner) for owner in sorted(owners)]
"tags": tags_job.TagsJobFacet(
tags=[
tags_job.TagsJobFacetFields(
key=tag,
value=tag,
source="AIRFLOW",
)
for tag in sorted(job_tags)
],
producer=_PRODUCER,
)
}
)
if job_facets:
facets = {**facets, **job_facets}

facets.update({"jobType": job_type})
facets.update(job_facets)

return Job(conf.namespace(), job_name, facets)
return Job(namespace=conf.namespace(), name=job_name, facets=facets)
Loading
Loading