Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ def complete_task(
job_name: str,
end_time: str,
task: OperatorLineage,
nominal_start_time: str | None,
nominal_end_time: str | None,
owners: list[str] | None,
tags: list[str] | None,
run_facets: dict[str, RunFacet] | None = None,
Expand All @@ -248,6 +250,8 @@ def complete_task(
:param job_name: globally unique identifier of task between dags
:param end_time: time of task completion
:param tags: list of tags
:param nominal_start_time: scheduled time of dag run
:param nominal_end_time: next scheduled time following the dag run
:param task: metadata container with information extracted from operator
:param owners: list of owners
:param run_facets: additional run facets
Expand All @@ -260,6 +264,8 @@ def complete_task(
eventTime=end_time,
run=self._build_run(
run_id=run_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets=run_facets,
),
job=self._build_job(
Expand All @@ -281,6 +287,8 @@ def fail_task(
job_name: str,
end_time: str,
task: OperatorLineage,
nominal_start_time: str | None,
nominal_end_time: str | None,
owners: list[str] | None,
tags: list[str] | None,
error: str | BaseException | None = None,
Expand All @@ -295,6 +303,8 @@ def fail_task(
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
:param tags: list of tags
:param nominal_start_time: scheduled time of dag run
:param nominal_end_time: next scheduled time following the dag run
:param owners: list of owners
:param error: error
:param run_facets: additional run facets
Expand All @@ -318,6 +328,8 @@ def fail_task(
eventTime=end_time,
run=self._build_run(
run_id=run_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets=run_facets,
),
job=self._build_job(
Expand Down Expand Up @@ -384,6 +396,8 @@ def dag_success(
run_id: str,
end_date: datetime,
logical_date: datetime,
nominal_start_time: str | None,
nominal_end_time: str | None,
tags: list[str] | None,
clear_number: int,
dag_run_state: DagRunState,
Expand All @@ -405,6 +419,8 @@ def dag_success(
run_id=self.build_dag_run_id(
dag_id=dag_id, logical_date=logical_date, clear_number=clear_number
),
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets={
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**get_airflow_debug_facet(),
Expand All @@ -428,6 +444,8 @@ def dag_failed(
run_id: str,
end_date: datetime,
logical_date: datetime,
nominal_start_time: str | None,
nominal_end_time: str | None,
tags: list[str] | None,
clear_number: int,
dag_run_state: DagRunState,
Expand All @@ -450,6 +468,8 @@ def dag_failed(
run_id=self.build_dag_run_id(
dag_id=dag_id, logical_date=logical_date, clear_number=clear_number
),
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets={
"errorMessage": error_message_run.ErrorMessageRunFacet(
message=msg, programmingLanguage="python"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,6 @@ def _on_task_instance_running(
)
return

data_interval_start = dagrun.data_interval_start
if isinstance(data_interval_start, datetime):
data_interval_start = data_interval_start.isoformat()
data_interval_end = dagrun.data_interval_end
if isinstance(data_interval_end, datetime):
data_interval_end = data_interval_end.isoformat()

clear_number = 0
if hasattr(dagrun, "clear_number"):
clear_number = dagrun.clear_number

# Must be computed outside the inner function so that the result is cached for use in forked processes
debug_facet = get_airflow_debug_facet()

Expand All @@ -176,6 +165,10 @@ def on_running():
if AIRFLOW_V_3_0_PLUS and date is None:
date = dagrun.run_after

clear_number = 0
if hasattr(dagrun, "clear_number"):
clear_number = dagrun.clear_number

parent_run_id = self.adapter.build_dag_run_id(
dag_id=dag.dag_id,
logical_date=date,
Expand All @@ -192,6 +185,13 @@ def on_running():
event_type = RunState.RUNNING.value.lower()
operator_name = task.task_type.lower()

data_interval_start = dagrun.data_interval_start
if isinstance(data_interval_start, datetime):
data_interval_start = data_interval_start.isoformat()
data_interval_end = dagrun.data_interval_end
if isinstance(data_interval_end, datetime):
data_interval_end = data_interval_end.isoformat()

with Stats.timer(f"ol.extract.{event_type}.{operator_name}"):
task_metadata = self.extractor_manager.extract_metadata(
dagrun=dagrun, task=task, task_instance_state=TaskInstanceState.RUNNING
Expand Down Expand Up @@ -304,6 +304,13 @@ def on_success():
event_type = RunState.COMPLETE.value.lower()
operator_name = task.task_type.lower()

data_interval_start = dagrun.data_interval_start
if isinstance(data_interval_start, datetime):
data_interval_start = data_interval_start.isoformat()
data_interval_end = dagrun.data_interval_end
if isinstance(data_interval_end, datetime):
data_interval_end = data_interval_end.isoformat()

with Stats.timer(f"ol.extract.{event_type}.{operator_name}"):
task_metadata = self.extractor_manager.extract_metadata(
dagrun=dagrun,
Expand All @@ -320,6 +327,8 @@ def on_success():
# If the task owner is the default ("airflow"), use the DAG owner instead, as it may have more details
owners=[x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")],
tags=dag.tags,
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
run_facets={
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
**get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS),
Expand Down Expand Up @@ -426,6 +435,13 @@ def on_failure():
event_type = RunState.FAIL.value.lower()
operator_name = task.task_type.lower()

data_interval_start = dagrun.data_interval_start
if isinstance(data_interval_start, datetime):
data_interval_start = data_interval_start.isoformat()
data_interval_end = dagrun.data_interval_end
if isinstance(data_interval_end, datetime):
data_interval_end = data_interval_end.isoformat()

with Stats.timer(f"ol.extract.{event_type}.{operator_name}"):
task_metadata = self.extractor_manager.extract_metadata(
dagrun=dagrun,
Expand All @@ -440,6 +456,8 @@ def on_failure():
end_time=end_date.isoformat(),
task=task_metadata,
error=error,
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
tags=dag.tags,
# If the task owner is the default ("airflow"), use the DAG owner instead, as it may have more details
owners=[x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")],
Expand Down Expand Up @@ -642,11 +660,18 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None:
if AIRFLOW_V_3_0_PLUS and date is None:
date = dag_run.run_after

data_interval_start = (
dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
)
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None

self.submit_callable(
self.adapter.dag_success,
dag_id=dag_run.dag_id,
run_id=dag_run.run_id,
end_date=dag_run.end_date,
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
logical_date=date,
clear_number=dag_run.clear_number,
owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
Expand Down Expand Up @@ -680,11 +705,18 @@ def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None:
if AIRFLOW_V_3_0_PLUS and date is None:
date = dag_run.run_after

data_interval_start = (
dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
)
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None

self.submit_callable(
self.adapter.dag_failed,
dag_id=dag_run.dag_id,
run_id=dag_run.run_id,
end_date=dag_run.end_date,
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
logical_date=date,
clear_number=dag_run.clear_number,
owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ def __init__(self, **kwargs):
catchup=False,
description="OpenLineage complex DAG description",
owner_links={"airflow": "https://airflow.apache.org/"},
tags=["first", "second@", "with'quote"],
tags=["first", "second@", "with'quote", 'z"e'],
default_args={"retries": 0},
) as dag:
# task_0 will not emit any events, but the owner will be picked up and added to DAG
task_0 = EmptyOperator(task_id="task_0", owner="owner1")
task_0 = EmptyOperator(task_id="task_0", owner='owner"1')
task_1 = BashOperator(
task_id="task_1.id.with.dots",
bash_command="exit 0;",
owner="owner2",
owner="owner'2",
execution_timeout=timedelta(seconds=456),
)
task_2 = PythonOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
start_date=datetime(2021, 1, 1),
schedule=None,
catchup=False,
tags=["first", "second@", "with'quote", 'z"e'],
default_args={"retries": 0},
) as child_dag:
do_nothing_task = BashOperator(task_id="do_nothing_task", bash_command="sleep 10;")
Expand Down
Loading
Loading