diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py index 7e3c5c84f7483..d20dc52e009e7 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py @@ -19,7 +19,7 @@ import os import traceback from contextlib import ExitStack -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal import yaml from openlineage.client import OpenLineageClient, set_producer @@ -32,7 +32,7 @@ job_type_job, nominal_time_run, ownership_job, - source_code_location_job, + tags_job, ) from openlineage.client.uuid import generate_static_uuid @@ -63,11 +63,8 @@ set_producer(_PRODUCER) -# https://openlineage.io/docs/spec/facets/job-facets/job-type -# They must be set after the `set_producer(_PRODUCER)` -# otherwise the `JobTypeJobFacet._producer` will be set with the default value -_JOB_TYPE_DAG = job_type_job.JobTypeJobFacet(jobType="DAG", integration="AIRFLOW", processingType="BATCH") -_JOB_TYPE_TASK = job_type_job.JobTypeJobFacet(jobType="TASK", integration="AIRFLOW", processingType="BATCH") +_JOB_TYPE_DAG: Literal["DAG"] = "DAG" +_JOB_TYPE_TASK: Literal["TASK"] = "TASK" class OpenLineageAdapter(LoggingMixin): @@ -187,10 +184,10 @@ def start_task( job_name: str, job_description: str, event_time: str, - code_location: str | None, nominal_start_time: str | None, nominal_end_time: str | None, owners: list[str] | None, + tags: list[str] | None, task: OperatorLineage | None, run_facets: dict[str, RunFacet] | None = None, ) -> RunEvent: @@ -201,17 +198,16 @@ def start_task( :param job_name: globally unique identifier of task in dag :param job_description: user provided description of job :param event_time: - :param code_location: file path or URL of DAG file :param nominal_start_time: scheduled time of dag run :param nominal_end_time: following schedule of dag run :param owners: list of owners + :param tags: list of tags :param task: metadata container with information extracted from operator :param run_facets: custom run facets """ run_facets = run_facets or {} if task: run_facets = {**task.run_facets, **run_facets} - run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore event = RunEvent( eventType=RunState.START, eventTime=event_time, @@ -225,8 +221,8 @@ def start_task( job_name=job_name, job_type=_JOB_TYPE_TASK, job_description=job_description, - code_location=code_location, - owners=owners, + job_owners=owners, + job_tags=tags, job_facets=task.job_facets if task else None, ), inputs=task.inputs if task else [], @@ -242,6 +238,7 @@ def complete_task( end_time: str, task: OperatorLineage, owners: list[str] | None, + tags: list[str] | None, run_facets: dict[str, RunFacet] | None = None, ) -> RunEvent: """ @@ -250,6 +247,7 @@ def complete_task( :param run_id: globally unique identifier of task in dag run :param job_name: globally unique identifier of task between dags :param end_time: time of task completion + :param tags: list of tags :param task: metadata container with information extracted from operator :param owners: list of owners :param run_facets: additional run facets @@ -257,7 +255,6 @@ def complete_task( run_facets = run_facets or {} if task: run_facets = {**task.run_facets, **run_facets} - run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore event = RunEvent( eventType=RunState.COMPLETE, eventTime=end_time, @@ -265,7 +262,13 @@ def complete_task( run_id=run_id, run_facets=run_facets, ), - job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets, owners=owners), + job=self._build_job( + job_name, + job_type=_JOB_TYPE_TASK, + job_facets=task.job_facets, + job_owners=owners, + job_tags=tags, + ), inputs=task.inputs, outputs=task.outputs, producer=_PRODUCER, @@ -279,6 +282,7 @@ def fail_task( end_time: str, task: OperatorLineage, owners: list[str] | None, + tags: list[str] | None, error: str | BaseException | None = None, run_facets: dict[str, RunFacet] | None = None, ) -> RunEvent: @@ -290,6 +294,7 @@ def fail_task( :param end_time: time of task completion :param task: metadata container with information extracted from operator :param run_facets: custom run facets + :param tags: list of tags :param owners: list of owners :param error: error :param run_facets: additional run facets @@ -297,7 +302,6 @@ def fail_task( run_facets = run_facets or {} if task: run_facets = {**task.run_facets, **run_facets} - run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore if error: stack_trace = None @@ -316,7 +320,13 @@ def fail_task( run_id=run_id, run_facets=run_facets, ), - job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets, owners=owners), + job=self._build_job( + job_name, + job_type=_JOB_TYPE_TASK, + job_facets=task.job_facets, + job_owners=owners, + job_tags=tags, + ), inputs=task.inputs, outputs=task.outputs, producer=_PRODUCER, @@ -328,9 +338,10 @@ def dag_started( dag_id: str, logical_date: datetime, start_date: datetime, - nominal_start_time: str, - nominal_end_time: str, + nominal_start_time: str | None, + nominal_end_time: str | None, owners: list[str] | None, + tags: list[str], run_facets: dict[str, RunFacet], clear_number: int, description: str | None = None, @@ -344,8 +355,9 @@ def dag_started( job_name=dag_id, job_type=_JOB_TYPE_DAG, job_description=description, - owners=owners, + job_owners=owners, job_facets=job_facets, + job_tags=tags, ), run=self._build_run( run_id=self.build_dag_run_id( @@ -353,7 +365,7 @@ def dag_started( ), nominal_start_time=nominal_start_time, nominal_end_time=nominal_end_time, - run_facets={**run_facets, **get_airflow_debug_facet(), **get_processing_engine_facet()}, + run_facets={**run_facets, **get_airflow_debug_facet()}, ), inputs=[], outputs=[], @@ -372,6 +384,7 @@ def dag_success( run_id: str, end_date: datetime, logical_date: datetime, + tags: list[str] | None, clear_number: int, dag_run_state: DagRunState, task_ids: list[str], @@ -382,15 +395,19 @@ def dag_success( event = RunEvent( eventType=RunState.COMPLETE, eventTime=end_date.isoformat(), - job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG, owners=owners), - run=Run( - runId=self.build_dag_run_id( + job=self._build_job( + job_name=dag_id, + job_type=_JOB_TYPE_DAG, + job_owners=owners, + job_tags=tags, + ), + run=self._build_run( + run_id=self.build_dag_run_id( dag_id=dag_id, logical_date=logical_date, clear_number=clear_number ), - facets={ + run_facets={ **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state), **get_airflow_debug_facet(), - **get_processing_engine_facet(), **run_facets, }, ), @@ -411,6 +428,7 @@ def dag_failed( run_id: str, end_date: datetime, logical_date: datetime, + tags: list[str] | None, clear_number: int, dag_run_state: DagRunState, task_ids: list[str], @@ -422,20 +440,22 @@ def dag_failed( event = RunEvent( eventType=RunState.FAIL, eventTime=end_date.isoformat(), - job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG, owners=owners), - run=Run( - runId=self.build_dag_run_id( - dag_id=dag_id, - logical_date=logical_date, - clear_number=clear_number, + job=self._build_job( + job_name=dag_id, + job_type=_JOB_TYPE_DAG, + job_owners=owners, + job_tags=tags, + ), + run=self._build_run( + run_id=self.build_dag_run_id( + dag_id=dag_id, logical_date=logical_date, clear_number=clear_number ), - facets={ + run_facets={ "errorMessage": error_message_run.ErrorMessageRunFacet( message=msg, programmingLanguage="python" ), **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state), **get_airflow_debug_facet(), - **get_processing_engine_facet(), **run_facets, }, ), @@ -457,10 +477,16 @@ def _build_run( nominal_end_time: str | None = None, run_facets: dict[str, RunFacet] | None = None, ) -> Run: - facets: dict[str, RunFacet] = {} + facets: dict[str, RunFacet] = get_processing_engine_facet() # type: ignore[assignment] if nominal_start_time: facets.update( - {"nominalTime": nominal_time_run.NominalTimeRunFacet(nominal_start_time, nominal_end_time)} + { + "nominalTime": nominal_time_run.NominalTimeRunFacet( + nominalStartTime=nominal_start_time, + nominalEndTime=nominal_end_time, + producer=_PRODUCER, + ) + } ) if run_facets: facets.update(run_facets) @@ -470,37 +496,51 @@ def _build_run( @staticmethod def _build_job( job_name: str, - job_type: job_type_job.JobTypeJobFacet, + job_type: Literal["DAG", "TASK"], job_description: str | None = None, - code_location: str | None = None, - owners: list[str] | None = None, + job_owners: list[str] | None = None, + job_tags: list[str] | None = None, job_facets: dict[str, JobFacet] | None = None, ): - facets: dict[str, JobFacet] = {} - + facets: dict[str, JobFacet] = { + "jobType": job_type_job.JobTypeJobFacet( + jobType=job_type, integration="AIRFLOW", processingType="BATCH", producer=_PRODUCER + ) + } if job_description: facets.update( - {"documentation": documentation_job.DocumentationJobFacet(description=job_description)} + { + "documentation": documentation_job.DocumentationJobFacet( + description=job_description, producer=_PRODUCER + ) + } ) - if code_location: + if job_owners: facets.update( { - "sourceCodeLocation": source_code_location_job.SourceCodeLocationJobFacet( - "", url=code_location + "ownership": ownership_job.OwnershipJobFacet( + owners=[ownership_job.Owner(name=owner) for owner in sorted(job_owners)], + producer=_PRODUCER, ) } ) - if owners: + if job_tags: facets.update( { - "ownership": ownership_job.OwnershipJobFacet( - owners=[ownership_job.Owner(name=owner) for owner in sorted(owners)] + "tags": tags_job.TagsJobFacet( + tags=[ + tags_job.TagsJobFacetFields( + key=tag, + value=tag, + source="AIRFLOW", + ) + for tag in sorted(job_tags) + ], + producer=_PRODUCER, ) } ) if job_facets: - facets = {**facets, **job_facets} - - facets.update({"jobType": job_type}) + facets.update(job_facets) - return Job(conf.namespace(), job_name, facets) + return Job(namespace=conf.namespace(), name=job_name, facets=facets) diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index a8b90611e9b0e..2b6bc10078e2e 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -151,7 +151,6 @@ def _on_task_instance_running( ) return - # Needs to be calculated outside of inner method so that it gets cached for usage in fork processes data_interval_start = dagrun.data_interval_start if isinstance(data_interval_start, datetime): data_interval_start = data_interval_start.isoformat() @@ -163,6 +162,7 @@ def _on_task_instance_running( if hasattr(dagrun, "clear_number"): clear_number = dagrun.clear_number + # Needs to be calculated outside of inner method so that it gets cached for usage in fork processes debug_facet = get_airflow_debug_facet() @print_warning(self.log) @@ -202,11 +202,11 @@ def on_running(): job_name=get_job_name(task), job_description=dag.description, event_time=start_date.isoformat(), - code_location=None, nominal_start_time=data_interval_start, nominal_end_time=data_interval_end, # If task owner is default ("airflow"), use DAG owner instead that may have more details owners=[x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")], + tags=dag.tags, task=task_metadata, run_facets={ **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), @@ -319,6 +319,7 @@ def on_success(): task=task_metadata, # If task owner is default ("airflow"), use DAG owner instead that may have more details owners=[x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")], + tags=dag.tags, run_facets={ **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), **get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS), @@ -439,6 +440,7 @@ def on_failure(): end_time=end_date.isoformat(), task=task_metadata, error=error, + tags=dag.tags, # If task owner is default ("airflow"), use DAG owner instead that may have more details owners=[x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")], run_facets={ @@ -610,6 +612,7 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None: clear_number=dag_run.clear_number, owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None, description=dag_run.dag.description if dag_run.dag else None, + tags=dag_run.dag.tags if dag_run.dag else [], # AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects, # as it causes lack of some TaskGroup attributes and crashes event emission. job_facets=get_airflow_job_facet(dag_run=dag_run), @@ -647,6 +650,7 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None: logical_date=date, clear_number=dag_run.clear_number, owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None, + tags=dag_run.dag.tags if dag_run.dag else [], task_ids=task_ids, dag_run_state=dag_run.get_state(), run_facets={**get_airflow_dag_run_facet(dag_run)}, @@ -684,6 +688,7 @@ def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None: logical_date=date, clear_number=dag_run.clear_number, owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None, + tags=dag_run.dag.tags if dag_run.dag else [], dag_run_state=dag_run.get_state(), task_ids=task_ids, msg=msg, diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py index 86f16e639d05f..45d060cadfa2f 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py @@ -35,6 +35,7 @@ parent_run, processing_engine_run, sql_job, + tags_job, ) from airflow import DAG @@ -154,10 +155,10 @@ def test_emit_start_event(mock_stats_incr, mock_stats_timer): job_name="job", job_description="description", event_time=event_time, - code_location=None, nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(), nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(), owners=[], + tags=[], task=None, run_facets=None, ) @@ -215,10 +216,10 @@ def test_emit_start_event_with_additional_information(mock_stats_incr, mock_stat job_name="job", job_description="description", event_time=event_time, - code_location=None, nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(), nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(), owners=["owner1", "owner2"], + tags=["tag1", "tag2"], task=OperatorLineage( inputs=[Dataset(namespace="bigquery", name="a.b.c"), Dataset(namespace="bigquery", name="x.y.z")], outputs=[Dataset(namespace="gs://bucket", name="exported_folder")], @@ -288,6 +289,12 @@ def test_emit_start_event_with_additional_information(mock_stats_incr, mock_stat "jobType": job_type_job.JobTypeJobFacet( processingType="BATCH", integration="AIRFLOW", jobType="TASK" ), + "tags": tags_job.TagsJobFacet( + tags=[ + tags_job.TagsJobFacetFields(key="tag1", value="tag1", source="AIRFLOW"), + tags_job.TagsJobFacetFields(key="tag2", value="tag2", source="AIRFLOW"), + ] + ), }, ), producer=_PRODUCER, @@ -314,7 +321,7 @@ def test_emit_complete_event(mock_stats_incr, mock_stats_timer): run_id = str(uuid.uuid4()) event_time = datetime.datetime.now().isoformat() adapter.complete_task( - run_id=run_id, end_time=event_time, job_name="job", task=OperatorLineage(), owners=[] + run_id=run_id, end_time=event_time, job_name="job", task=OperatorLineage(), owners=[], tags=[] ) assert ( @@ -365,6 +372,7 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s end_time=event_time, job_name="job", owners=["owner1", "owner2"], + tags=["tag1", "tag2"], task=OperatorLineage( inputs=[Dataset(namespace="bigquery", name="a.b.c"), Dataset(namespace="bigquery", name="x.y.z")], outputs=[Dataset(namespace="gs://bucket", name="exported_folder")], @@ -431,6 +439,12 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s "jobType": job_type_job.JobTypeJobFacet( processingType="BATCH", integration="AIRFLOW", jobType="TASK" ), + "tags": tags_job.TagsJobFacet( + tags=[ + tags_job.TagsJobFacetFields(key="tag1", value="tag1", source="AIRFLOW"), + tags_job.TagsJobFacetFields(key="tag2", value="tag2", source="AIRFLOW"), + ] + ), }, ), producer=_PRODUCER, @@ -462,6 +476,7 @@ def test_emit_failed_event(mock_stats_incr, mock_stats_timer): job_name="job", task=OperatorLineage(), owners=[], + tags=[], ) assert ( @@ -512,6 +527,7 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta end_time=event_time, job_name="job", owners=["owner1", "owner2"], + tags=["tag1", "tag2"], task=OperatorLineage( inputs=[Dataset(namespace="bigquery", name="a.b.c"), Dataset(namespace="bigquery", name="x.y.z")], outputs=[Dataset(namespace="gs://bucket", name="exported_folder")], @@ -581,6 +597,12 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta "jobType": job_type_job.JobTypeJobFacet( processingType="BATCH", integration="AIRFLOW", jobType="TASK" ), + "tags": tags_job.TagsJobFacet( + tags=[ + tags_job.TagsJobFacetFields(key="tag1", value="tag1", source="AIRFLOW"), + tags_job.TagsJobFacetFields(key="tag2", value="tag2", source="AIRFLOW"), + ] + ), }, ), producer=_PRODUCER, @@ -613,6 +635,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat schedule=datetime.timedelta(days=1), start_date=datetime.datetime(2024, 6, 1), description="dag desc", + tags=["mytag1"], ) as dag: tg = TaskGroup(group_id="tg1") tg2 = TaskGroup(group_id="tg2", parent_group=tg) @@ -647,7 +670,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat "description": "dag desc", "owner": "airflow", "start_date": "2024-06-01T00:00:00+00:00", - "tags": "[]", + "tags": "['mytag1']", "fileloc": pathlib.Path(__file__).resolve().as_posix(), } if hasattr(dag, "schedule_interval"): # Airflow 2 compat. @@ -677,6 +700,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat nominal_end_time=event_time.isoformat(), owners=["owner1", "owner2"], description=dag.description, + tags=["tag1", "tag2"], run_facets={ "parent": parent_run.ParentRunFacet( run=parent_run.Run(runId=random_uuid), @@ -741,6 +765,12 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat ownership_job.Owner(name="owner2", type=None), ] ), + "tags": tags_job.TagsJobFacet( + tags=[ + tags_job.TagsJobFacetFields(key="tag1", value="tag1", source="AIRFLOW"), + tags_job.TagsJobFacetFields(key="tag2", value="tag2", source="AIRFLOW"), + ] + ), **job_facets, "jobType": job_type_job.JobTypeJobFacet( processingType="BATCH", integration="AIRFLOW", jobType="DAG" @@ -810,6 +840,7 @@ def test_emit_dag_complete_event( dag_run_state=DagRunState.SUCCESS, task_ids=["task_0", "task_1", "task_2.test"], owners=["owner1", "owner2"], + tags=["tag1", "tag2"], run_facets={ "parent": parent_run.ParentRunFacet( run=parent_run.Run(runId=random_uuid), @@ -863,6 +894,12 @@ def test_emit_dag_complete_event( ownership_job.Owner(name="owner2", type=None), ] ), + "tags": tags_job.TagsJobFacet( + tags=[ + tags_job.TagsJobFacetFields(key="tag1", value="tag1", source="AIRFLOW"), + tags_job.TagsJobFacetFields(key="tag2", value="tag2", source="AIRFLOW"), + ] + ), "jobType": job_type_job.JobTypeJobFacet( processingType="BATCH", integration="AIRFLOW", jobType="DAG" ), @@ -929,6 +966,7 @@ def test_emit_dag_failed_event( clear_number=0, dag_run_state=DagRunState.FAILED, task_ids=["task_0", "task_1", "task_2.test"], + tags=["tag1", "tag2"], msg="error msg", owners=["owner1", "owner2"], run_facets={ @@ -987,6 +1025,12 @@ def test_emit_dag_failed_event( ownership_job.Owner(name="owner2", type=None), ] ), + "tags": tags_job.TagsJobFacet( + tags=[ + tags_job.TagsJobFacetFields(key="tag1", value="tag1", source="AIRFLOW"), + tags_job.TagsJobFacetFields(key="tag2", value="tag2", source="AIRFLOW"), + ] + ), "jobType": job_type_job.JobTypeJobFacet( processingType="BATCH", integration="AIRFLOW", jobType="DAG" ), diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py index f5e1c02798087..e6625c6304d41 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py @@ -249,6 +249,7 @@ def mock_task_id(dag_id, task_id, try_number, logical_date, map_index): task_instance.task.dag.dag_id = "dag_id" task_instance.task.dag.description = "Test DAG Description" task_instance.task.dag.owner = "Test Owner" + task_instance.task.dag.tags = ["tag1", "tag2"] task_instance.task.owner = "task_owner" task_instance.task.inlets = [] task_instance.task.outlets = [] @@ -312,10 +313,10 @@ def test_adapter_start_task_is_called_with_proper_arguments( job_name="job_name", job_description="Test DAG Description", event_time="2023-01-01T13:01:01+00:00", - code_location=None, nominal_start_time=None, nominal_end_time=None, owners=["task_owner"], + tags=["tag1", "tag2"], task=listener.extractor_manager.extract_metadata(), run_facets={ "parent": 4, @@ -409,6 +410,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( job_name="job_name", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", owners=["task_owner"], + tags=["tag1", "tag2"], task=listener.extractor_manager.extract_metadata(), run_facets={ "parent": 4, @@ -507,6 +509,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( run_id=f"2020-01-01T01:01:01+00:00.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}.-1", task=listener.extractor_manager.extract_metadata(), owners=["task_owner"], + tags=["tag1", "tag2"], run_facets={ "parent": 4, "custom_user_facet": 2, @@ -985,6 +988,7 @@ def _create_listener_and_task_instance( dag = DAG( dag_id="dag_id", description="Test DAG Description", + tags=["tag1", "tag2"], ) task = EmptyOperator(task_id="task_id", dag=dag, owner="task_owner") task2 = EmptyOperator(task_id="task_id2", dag=dag, owner="another_owner") # noqa: F841 @@ -1090,10 +1094,10 @@ def test_adapter_start_task_is_called_with_proper_arguments( job_name="job_name", job_description="Test DAG Description", event_time="2023-01-01T13:01:01+00:00", - code_location=None, nominal_start_time=None, nominal_end_time=None, owners=["task_owner"], + tags={"tag1", "tag2"}, task=listener.extractor_manager.extract_metadata(), run_facets={ "mapped_facet": 1, @@ -1187,6 +1191,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=listener.extractor_manager.extract_metadata(), owners=["task_owner"], + tags={"tag1", "tag2"}, run_facets={ "parent": 4, "custom_user_facet": 2, @@ -1331,6 +1336,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=listener.extractor_manager.extract_metadata(), owners=["task_owner"], + tags={"tag1", "tag2"}, run_facets={ "parent": 4, "custom_user_facet": 2,