Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ def on_running():
tags=dag.tags,
task=task_metadata,
run_facets={
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
**get_user_provided_run_facets(task_instance, TaskInstanceState.RUNNING),
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
**get_airflow_mapped_task_facet(task_instance),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
**debug_facet,
Expand Down Expand Up @@ -349,8 +349,8 @@ def on_success():
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
run_facets={
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
**get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS),
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
**get_airflow_debug_facet(),
},
Expand Down Expand Up @@ -487,8 +487,8 @@ def on_failure():
job_description=doc,
job_description_type=doc_type,
run_facets={
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
**get_user_provided_run_facets(task_instance, TaskInstanceState.FAILED),
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id),
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
**get_airflow_debug_facet(),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ def mock_task_id(dag_id, task_id, try_number, logical_date, map_index):
)
def test_adapter_start_task_is_called_with_proper_arguments(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -332,7 +332,7 @@ def test_adapter_start_task_is_called_with_proper_arguments(

listener, task_instance = self._create_listener_and_task_instance()
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down Expand Up @@ -371,8 +371,8 @@ def test_adapter_start_task_is_called_with_proper_arguments(
)
def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -381,7 +381,7 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default
):
listener, task_instance = self._create_listener_and_task_instance()
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand All @@ -405,8 +405,8 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default
def test_adapter_start_task_is_called_with_dag_description_when_task_doc_is_empty(
self,
mock_get_job_name,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -416,7 +416,7 @@ def test_adapter_start_task_is_called_with_dag_description_when_task_doc_is_empt
listener, task_instance = self._create_listener_and_task_instance()
mock_get_job_name.return_value = "job_name"
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down Expand Up @@ -457,7 +457,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments(

listener, task_instance = self._create_listener_and_task_instance()
task_instance.logical_date = timezone.datetime(2020, 1, 1, 1, 1, 1)
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down Expand Up @@ -499,8 +499,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
)
def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -509,7 +509,7 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default(
):
listener, task_instance = self._create_listener_and_task_instance()
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand All @@ -534,8 +534,8 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default(
)
def test_adapter_fail_task_is_called_with_dag_description_when_task_doc_is_empty(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -544,7 +544,7 @@ def test_adapter_fail_task_is_called_with_dag_description_when_task_doc_is_empty
):
listener, task_instance = self._create_listener_and_task_instance()
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down Expand Up @@ -588,7 +588,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments(
time_machine.move_to(timezone.datetime(2023, 1, 3, 13, 1, 1))

listener, task_instance = self._create_listener_and_task_instance()
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down Expand Up @@ -630,8 +630,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments(
)
def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_default(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -640,7 +640,7 @@ def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_defa
):
listener, task_instance = self._create_listener_and_task_instance()
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand All @@ -662,8 +662,8 @@ def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_defa
)
def test_adapter_complete_task_is_called_with_dag_description_when_task_doc_is_empty(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -672,7 +672,7 @@ def test_adapter_complete_task_is_called_with_dag_description_when_task_doc_is_e
):
listener, task_instance = self._create_listener_and_task_instance()
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down Expand Up @@ -1198,8 +1198,8 @@ def mock_task_id(dag_id, task_id, try_number, logical_date, map_index):
)
def test_adapter_start_task_is_called_with_proper_arguments(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -1217,7 +1217,7 @@ def test_adapter_start_task_is_called_with_proper_arguments(
listener, task_instance = self._create_listener_and_task_instance()

mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down Expand Up @@ -1256,8 +1256,8 @@ def test_adapter_start_task_is_called_with_proper_arguments(
)
def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -1266,7 +1266,7 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default
):
listener, task_instance = self._create_listener_and_task_instance()
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand All @@ -1291,8 +1291,8 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default
def test_adapter_start_task_is_called_with_dag_description_when_task_doc_is_empty(
self,
mock_get_job_name,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -1302,7 +1302,7 @@ def test_adapter_start_task_is_called_with_dag_description_when_task_doc_is_empt
listener, task_instance = self._create_listener_and_task_instance()
mock_get_job_name.return_value = "job_name"
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down Expand Up @@ -1343,7 +1343,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments(

listener, task_instance = self._create_listener_and_task_instance()
task_instance.get_template_context()["dag_run"].logical_date = timezone.datetime(2020, 1, 1, 1, 1, 1)
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down Expand Up @@ -1383,8 +1383,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
)
def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -1393,7 +1393,7 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default(
):
listener, task_instance = self._create_listener_and_task_instance()
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand All @@ -1418,8 +1418,8 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default(
)
def test_adapter_fail_task_is_called_with_dag_description_when_task_doc_is_empty(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -1428,7 +1428,7 @@ def test_adapter_fail_task_is_called_with_dag_description_when_task_doc_is_empty
):
listener, task_instance = self._create_listener_and_task_instance()
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down Expand Up @@ -1526,7 +1526,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments(
time_machine.move_to(timezone.datetime(2023, 1, 3, 13, 1, 1), tick=False)

listener, task_instance = self._create_listener_and_task_instance()
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down Expand Up @@ -1566,8 +1566,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments(
)
def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_default(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -1576,7 +1576,7 @@ def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_defa
):
listener, task_instance = self._create_listener_and_task_instance()
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand All @@ -1600,8 +1600,8 @@ def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_defa
)
def test_adapter_complete_task_is_called_with_dag_description_when_task_doc_is_empty(
self,
mock_get_airflow_mapped_task_facet,
mock_get_user_provided_run_facets,
mock_get_airflow_mapped_task_facet,
mock_get_airflow_run_facet,
mock_get_task_parent_run_facet,
mock_disabled,
Expand All @@ -1610,7 +1610,7 @@ def test_adapter_complete_task_is_called_with_dag_description_when_task_doc_is_e
):
listener, task_instance = self._create_listener_and_task_instance()
mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99}
mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3}
mock_get_task_parent_run_facet.return_value = {"parent": 4}
mock_debug_facet.return_value = {"debug": "packages"}
Expand Down