Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def start_task(
code_location: str | None,
nominal_start_time: str | None,
nominal_end_time: str | None,
owners: list[str],
owners: list[str] | None,
task: OperatorLineage | None,
run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
Expand All @@ -204,7 +204,7 @@ def start_task(
:param code_location: file path or URL of DAG file
:param nominal_start_time: scheduled time of dag run
:param nominal_end_time: following schedule of dag run
:param owners: list of owners of DAG
:param owners: list of owners
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
"""
Expand Down Expand Up @@ -241,6 +241,7 @@ def complete_task(
job_name: str,
end_time: str,
task: OperatorLineage,
owners: list[str] | None,
run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
"""
Expand All @@ -250,6 +251,7 @@ def complete_task(
:param job_name: globally unique identifier of task between dags
:param end_time: time of task completion
:param task: metadata container with information extracted from operator
:param owners: list of owners
:param run_facets: additional run facets
"""
run_facets = run_facets or {}
Expand All @@ -263,7 +265,7 @@ def complete_task(
run_id=run_id,
run_facets=run_facets,
),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets, owners=owners),
inputs=task.inputs,
outputs=task.outputs,
producer=_PRODUCER,
Expand All @@ -276,6 +278,7 @@ def fail_task(
job_name: str,
end_time: str,
task: OperatorLineage,
owners: list[str] | None,
error: str | BaseException | None = None,
run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
Expand All @@ -287,6 +290,7 @@ def fail_task(
:param end_time: time of task completion
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
:param owners: list of owners
:param error: error
:param run_facets: additional run facets
"""
Expand All @@ -312,7 +316,7 @@ def fail_task(
run_id=run_id,
run_facets=run_facets,
),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets),
job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets, owners=owners),
inputs=task.inputs,
outputs=task.outputs,
producer=_PRODUCER,
Expand All @@ -326,7 +330,7 @@ def dag_started(
start_date: datetime,
nominal_start_time: str,
nominal_end_time: str,
owners: list[str],
owners: list[str] | None,
run_facets: dict[str, RunFacet],
clear_number: int,
description: str | None = None,
Expand Down Expand Up @@ -371,13 +375,14 @@ def dag_success(
clear_number: int,
dag_run_state: DagRunState,
task_ids: list[str],
owners: list[str] | None,
run_facets: dict[str, RunFacet],
):
try:
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=end_date.isoformat(),
job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG, owners=owners),
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_id, logical_date=logical_date, clear_number=clear_number
Expand Down Expand Up @@ -409,14 +414,15 @@ def dag_failed(
clear_number: int,
dag_run_state: DagRunState,
task_ids: list[str],
owners: list[str] | None,
msg: str,
run_facets: dict[str, RunFacet],
):
try:
event = RunEvent(
eventType=RunState.FAIL,
eventTime=end_date.isoformat(),
job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG, owners=owners),
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_id,
Expand Down Expand Up @@ -488,7 +494,7 @@ def _build_job(
facets.update(
{
"ownership": ownership_job.OwnershipJobFacet(
owners=[ownership_job.Owner(name=owner) for owner in owners]
owners=[ownership_job.Owner(name=owner) for owner in sorted(owners)]
)
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ def on_running():
code_location=None,
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
owners=dag.owner.split(", "),
# If the task owner is the default ("airflow"), fall back to the DAG owner, which may carry more details
owners=[x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")],
task=task_metadata,
run_facets={
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
Expand Down Expand Up @@ -316,6 +317,8 @@ def on_success():
job_name=get_job_name(task),
end_time=end_date.isoformat(),
task=task_metadata,
# If the task owner is the default ("airflow"), fall back to the DAG owner, which may carry more details
owners=[x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")],
run_facets={
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
**get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS),
Expand Down Expand Up @@ -436,6 +439,8 @@ def on_failure():
end_time=end_date.isoformat(),
task=task_metadata,
error=error,
# If the task owner is the default ("airflow"), fall back to the DAG owner, which may carry more details
owners=[x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")],
run_facets={
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
**get_user_provided_run_facets(task_instance, TaskInstanceState.FAILED),
Expand Down Expand Up @@ -641,6 +646,7 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None:
end_date=dag_run.end_date,
logical_date=date,
clear_number=dag_run.clear_number,
owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
task_ids=task_ids,
dag_run_state=dag_run.get_state(),
run_facets={**get_airflow_dag_run_facet(dag_run)},
Expand Down Expand Up @@ -677,6 +683,7 @@ def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None:
end_date=dag_run.end_date,
logical_date=date,
clear_number=dag_run.clear_number,
owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
dag_run_state=dag_run.get_state(),
task_ids=task_ids,
msg=msg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,7 @@ def test_emit_complete_event(mock_stats_incr, mock_stats_timer):
run_id = str(uuid.uuid4())
event_time = datetime.datetime.now().isoformat()
adapter.complete_task(
run_id=run_id,
end_time=event_time,
job_name="job",
task=OperatorLineage(),
run_id=run_id, end_time=event_time, job_name="job", task=OperatorLineage(), owners=[]
)

assert (
Expand Down Expand Up @@ -367,6 +364,7 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s
run_id=run_id,
end_time=event_time,
job_name="job",
owners=["owner1", "owner2"],
task=OperatorLineage(
inputs=[Dataset(namespace="bigquery", name="a.b.c"), Dataset(namespace="bigquery", name="x.y.z")],
outputs=[Dataset(namespace="gs://bucket", name="exported_folder")],
Expand Down Expand Up @@ -423,6 +421,12 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s
namespace="default",
name="job",
facets={
"ownership": ownership_job.OwnershipJobFacet(
owners=[
ownership_job.Owner(name="owner1", type=None),
ownership_job.Owner(name="owner2", type=None),
]
),
"sql": sql_job.SQLJobFacet(query="SELECT 1;"),
"jobType": job_type_job.JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="TASK"
Expand Down Expand Up @@ -457,6 +461,7 @@ def test_emit_failed_event(mock_stats_incr, mock_stats_timer):
end_time=event_time,
job_name="job",
task=OperatorLineage(),
owners=[],
)

assert (
Expand Down Expand Up @@ -506,6 +511,7 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
run_id=run_id,
end_time=event_time,
job_name="job",
owners=["owner1", "owner2"],
task=OperatorLineage(
inputs=[Dataset(namespace="bigquery", name="a.b.c"), Dataset(namespace="bigquery", name="x.y.z")],
outputs=[Dataset(namespace="gs://bucket", name="exported_folder")],
Expand Down Expand Up @@ -565,6 +571,12 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
namespace="default",
name="job",
facets={
"ownership": ownership_job.OwnershipJobFacet(
owners=[
ownership_job.Owner(name="owner1", type=None),
ownership_job.Owner(name="owner2", type=None),
]
),
"sql": sql_job.SQLJobFacet(query="SELECT 1;"),
"jobType": job_type_job.JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="TASK"
Expand Down Expand Up @@ -663,7 +675,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
clear_number=0,
nominal_start_time=event_time.isoformat(),
nominal_end_time=event_time.isoformat(),
owners=["airflow"],
owners=["owner1", "owner2"],
description=dag.description,
run_facets={
"parent": parent_run.ParentRunFacet(
Expand Down Expand Up @@ -725,7 +737,8 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
"documentation": documentation_job.DocumentationJobFacet(description="dag desc"),
"ownership": ownership_job.OwnershipJobFacet(
owners=[
ownership_job.Owner(name="airflow", type=None),
ownership_job.Owner(name="owner1", type=None),
ownership_job.Owner(name="owner2", type=None),
]
),
**job_facets,
Expand Down Expand Up @@ -796,6 +809,7 @@ def test_emit_dag_complete_event(
clear_number=0,
dag_run_state=DagRunState.SUCCESS,
task_ids=["task_0", "task_1", "task_2.test"],
owners=["owner1", "owner2"],
run_facets={
"parent": parent_run.ParentRunFacet(
run=parent_run.Run(runId=random_uuid),
Expand Down Expand Up @@ -843,9 +857,15 @@ def test_emit_dag_complete_event(
namespace=namespace(),
name=dag_id,
facets={
"ownership": ownership_job.OwnershipJobFacet(
owners=[
ownership_job.Owner(name="owner1", type=None),
ownership_job.Owner(name="owner2", type=None),
]
),
"jobType": job_type_job.JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="DAG"
)
),
},
),
producer=_PRODUCER,
Expand Down Expand Up @@ -910,6 +930,7 @@ def test_emit_dag_failed_event(
dag_run_state=DagRunState.FAILED,
task_ids=["task_0", "task_1", "task_2.test"],
msg="error msg",
owners=["owner1", "owner2"],
run_facets={
"parent": parent_run.ParentRunFacet(
run=parent_run.Run(runId=random_uuid),
Expand Down Expand Up @@ -960,9 +981,15 @@ def test_emit_dag_failed_event(
namespace=namespace(),
name=dag_id,
facets={
"ownership": ownership_job.OwnershipJobFacet(
owners=[
ownership_job.Owner(name="owner1", type=None),
ownership_job.Owner(name="owner2", type=None),
]
),
"jobType": job_type_job.JobTypeJobFacet(
processingType="BATCH", integration="AIRFLOW", jobType="DAG"
)
),
},
),
producer=_PRODUCER,
Expand Down
Loading
Loading