diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py index d20dc52e009e7..fd73a56a917ac 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py @@ -237,6 +237,8 @@ def complete_task( job_name: str, end_time: str, task: OperatorLineage, + nominal_start_time: str | None, + nominal_end_time: str | None, owners: list[str] | None, tags: list[str] | None, run_facets: dict[str, RunFacet] | None = None, @@ -248,6 +250,8 @@ def complete_task( :param job_name: globally unique identifier of task between dags :param end_time: time of task completion :param tags: list of tags + :param nominal_start_time: scheduled time of dag run + :param nominal_end_time: following schedule of dag run :param task: metadata container with information extracted from operator :param owners: list of owners :param run_facets: additional run facets @@ -260,6 +264,8 @@ def complete_task( eventTime=end_time, run=self._build_run( run_id=run_id, + nominal_start_time=nominal_start_time, + nominal_end_time=nominal_end_time, run_facets=run_facets, ), job=self._build_job( @@ -281,6 +287,8 @@ def fail_task( job_name: str, end_time: str, task: OperatorLineage, + nominal_start_time: str | None, + nominal_end_time: str | None, owners: list[str] | None, tags: list[str] | None, error: str | BaseException | None = None, @@ -295,6 +303,8 @@ def fail_task( :param task: metadata container with information extracted from operator :param run_facets: custom run facets :param tags: list of tags + :param nominal_start_time: scheduled time of dag run + :param nominal_end_time: following schedule of dag run :param owners: list of owners :param error: error :param run_facets: additional run facets @@ -318,6 +328,8 @@ def fail_task( eventTime=end_time, run=self._build_run( run_id=run_id, + nominal_start_time=nominal_start_time, + nominal_end_time=nominal_end_time, run_facets=run_facets, ), job=self._build_job( @@ -384,6 +396,8 @@ def dag_success( run_id: str, end_date: datetime, logical_date: datetime, + nominal_start_time: str | None, + nominal_end_time: str | None, tags: list[str] | None, clear_number: int, dag_run_state: DagRunState, @@ -405,6 +419,8 @@ def dag_success( run_id=self.build_dag_run_id( dag_id=dag_id, logical_date=logical_date, clear_number=clear_number ), + nominal_start_time=nominal_start_time, + nominal_end_time=nominal_end_time, run_facets={ **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state), **get_airflow_debug_facet(), @@ -428,6 +444,8 @@ def dag_failed( run_id: str, end_date: datetime, logical_date: datetime, + nominal_start_time: str | None, + nominal_end_time: str | None, tags: list[str] | None, clear_number: int, dag_run_state: DagRunState, @@ -450,6 +468,8 @@ def dag_failed( run_id=self.build_dag_run_id( dag_id=dag_id, logical_date=logical_date, clear_number=clear_number ), + nominal_start_time=nominal_start_time, + nominal_end_time=nominal_end_time, run_facets={ "errorMessage": error_message_run.ErrorMessageRunFacet( message=msg, programmingLanguage="python" diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 2b6bc10078e2e..76b35974c538b 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -151,17 +151,6 @@ def _on_task_instance_running( ) return - data_interval_start = dagrun.data_interval_start - if isinstance(data_interval_start, datetime): - data_interval_start = data_interval_start.isoformat() - data_interval_end = dagrun.data_interval_end - if isinstance(data_interval_end, datetime): - data_interval_end = data_interval_end.isoformat() - - clear_number = 0 - if hasattr(dagrun, "clear_number"): - clear_number = dagrun.clear_number - # Needs to be calculated outside of inner method so that it gets cached for usage in fork processes debug_facet = get_airflow_debug_facet() @@ -176,6 +165,10 @@ def on_running(): if AIRFLOW_V_3_0_PLUS and date is None: date = dagrun.run_after + clear_number = 0 + if hasattr(dagrun, "clear_number"): + clear_number = dagrun.clear_number + parent_run_id = self.adapter.build_dag_run_id( dag_id=dag.dag_id, logical_date=date, @@ -192,6 +185,13 @@ def on_running(): event_type = RunState.RUNNING.value.lower() operator_name = task.task_type.lower() + data_interval_start = dagrun.data_interval_start + if isinstance(data_interval_start, datetime): + data_interval_start = data_interval_start.isoformat() + data_interval_end = dagrun.data_interval_end + if isinstance(data_interval_end, datetime): + data_interval_end = data_interval_end.isoformat() + with Stats.timer(f"ol.extract.{event_type}.{operator_name}"): task_metadata = self.extractor_manager.extract_metadata( dagrun=dagrun, task=task, task_instance_state=TaskInstanceState.RUNNING @@ -304,6 +304,13 @@ def on_success(): event_type = RunState.COMPLETE.value.lower() operator_name = task.task_type.lower() + data_interval_start = dagrun.data_interval_start + if isinstance(data_interval_start, datetime): + data_interval_start = data_interval_start.isoformat() + data_interval_end = dagrun.data_interval_end + if isinstance(data_interval_end, datetime): + data_interval_end = data_interval_end.isoformat() + with Stats.timer(f"ol.extract.{event_type}.{operator_name}"): task_metadata = self.extractor_manager.extract_metadata( dagrun=dagrun, @@ -320,6 +327,8 @@ def on_success(): # If task owner is default ("airflow"), use DAG owner instead that may have more details owners=[x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")], tags=dag.tags, + nominal_start_time=data_interval_start, + nominal_end_time=data_interval_end, run_facets={ **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), **get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS), @@ -426,6 +435,13 @@ def on_failure(): event_type = RunState.FAIL.value.lower() operator_name = task.task_type.lower() + data_interval_start = dagrun.data_interval_start + if isinstance(data_interval_start, datetime): + data_interval_start = data_interval_start.isoformat() + data_interval_end = dagrun.data_interval_end + if isinstance(data_interval_end, datetime): + data_interval_end = data_interval_end.isoformat() + with Stats.timer(f"ol.extract.{event_type}.{operator_name}"): task_metadata = self.extractor_manager.extract_metadata( dagrun=dagrun, @@ -440,6 +456,8 @@ def on_failure(): end_time=end_date.isoformat(), task=task_metadata, error=error, + nominal_start_time=data_interval_start, + nominal_end_time=data_interval_end, tags=dag.tags, # If task owner is default ("airflow"), use DAG owner instead that may have more details owners=[x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")], @@ -642,11 +660,18 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None: if AIRFLOW_V_3_0_PLUS and date is None: date = dag_run.run_after + data_interval_start = ( + dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None + ) + data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None + self.submit_callable( self.adapter.dag_success, dag_id=dag_run.dag_id, run_id=dag_run.run_id, end_date=dag_run.end_date, + nominal_start_time=data_interval_start, + nominal_end_time=data_interval_end, logical_date=date, clear_number=dag_run.clear_number, owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None, @@ -680,11 +705,18 @@ def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None: if AIRFLOW_V_3_0_PLUS and date is None: date = dag_run.run_after + data_interval_start = ( + dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None + ) + data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None + self.submit_callable( self.adapter.dag_failed, dag_id=dag_run.dag_id, run_id=dag_run.run_id, end_date=dag_run.end_date, + nominal_start_time=data_interval_start, + nominal_end_time=data_interval_end, logical_date=date, clear_number=dag_run.clear_number, owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None, diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage_base_complex_dag.py b/providers/openlineage/tests/system/openlineage/example_openlineage_base_complex_dag.py index a8cadc73fef72..43ddaab341a6f 100644 --- a/providers/openlineage/tests/system/openlineage/example_openlineage_base_complex_dag.py +++ b/providers/openlineage/tests/system/openlineage/example_openlineage_base_complex_dag.py @@ -76,15 +76,15 @@ def __init__(self, **kwargs): catchup=False, description="OpenLineage complex DAG description", owner_links={"airflow": "https://airflow.apache.org/"}, - tags=["first", "second@", "with'quote"], + tags=["first", "second@", "with'quote", 'z"e'], default_args={"retries": 0}, ) as dag: # task_0 will not emit any events, but the owner will be picked up and added to DAG - task_0 = EmptyOperator(task_id="task_0", owner="owner1") + task_0 = EmptyOperator(task_id="task_0", owner='owner"1') task_1 = BashOperator( task_id="task_1.id.with.dots", bash_command="exit 0;", - owner="owner2", + owner="owner'2", execution_timeout=timedelta(seconds=456), ) task_2 = PythonOperator( diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py b/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py index 4eac16070628d..f290d0454a688 100644 --- a/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py +++ b/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag.py @@ -63,6 +63,7 @@ start_date=datetime(2021, 1, 1), schedule=None, catchup=False, + tags=["first", "second@", "with'quote", 'z"e'], default_args={"retries": 0}, ) as child_dag: do_nothing_task = BashOperator(task_id="do_nothing_task", bash_command="sleep 10;") diff --git a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_complex_dag__af2.json b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_complex_dag__af2.json index 6f4e72ab601c0..d43f0462376c6 100644 --- a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_complex_dag__af2.json +++ b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_complex_dag__af2.json @@ -14,12 +14,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -62,13 +62,52 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "documentation": { "description": "OpenLineage complex DAG description", "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" }, "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, @@ -227,12 +266,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -266,7 +305,7 @@ "operator_class": "BashOperator", "operator_class_path": "airflow.providers.standard.operators.bash.BashOperator", "outlets": "[]", - "owner": "owner2", + "owner": "owner'2", "priority_weight": 1, "queue": "{{ result is string }}", "retries": "{{ result is number }}", @@ -315,12 +354,38 @@ "ownership": { "owners": [ { - "name": "owner2" + "name": "owner'2" } ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "sourceCode": { "language": "bash", "sourceCode": "exit 0;", @@ -365,12 +430,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -405,7 +470,7 @@ "operator_class": "BashOperator", "operator_class_path": "airflow.providers.standard.operators.bash.BashOperator", "outlets": "[]", - "owner": "owner2", + "owner": "owner'2", "priority_weight": 1, "queue": "{{ result is string }}", "retries": "{{ result is number }}", @@ -420,6 +485,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", @@ -436,12 +507,38 @@ "ownership": { "owners": [ { - "name": "owner2" + "name": "owner'2" } ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "jobType": { "integration": "AIRFLOW", "jobType": "TASK", @@ -487,12 +584,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -573,10 +670,49 @@ "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" }, "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "sourceCode": { "language": "python", "sourceCode": "def do_nothing():\n pass\n", @@ -621,12 +757,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -676,6 +812,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", @@ -690,10 +832,49 @@ "name": "openlineage_base_complex_dag.task_2", "facets": { "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "jobType": { "integration": "AIRFLOW", "jobType": "TASK", @@ -739,12 +920,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -823,8 +1004,47 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" } @@ -866,12 +1086,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -920,6 +1140,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", @@ -934,10 +1160,49 @@ "name": "openlineage_base_complex_dag.task_3", "facets": { "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "jobType": { "integration": "AIRFLOW", "jobType": "TASK", @@ -983,12 +1248,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -1067,6 +1332,32 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "documentation": { "description": "OpenLineage complex DAG description", "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", @@ -1119,12 +1410,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -1178,6 +1469,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", @@ -1200,6 +1497,32 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "jobType": { "integration": "AIRFLOW", "jobType": "TASK", @@ -1245,12 +1568,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -1334,13 +1657,52 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "documentation": { "description": "OpenLineage complex DAG description", "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" }, "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" } @@ -1382,12 +1744,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -1446,6 +1808,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", @@ -1460,10 +1828,49 @@ "name": "openlineage_base_complex_dag.section_1.section_2.task_6", "facets": { "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "jobType": { "integration": "AIRFLOW", "jobType": "TASK", diff --git a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_complex_dag__af3.json b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_complex_dag__af3.json index 7d2212f7c7a36..458ca1d38ac3e 100644 --- a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_complex_dag__af3.json +++ b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_complex_dag__af3.json @@ -14,12 +14,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {}, "timetable_summary": "None" }, @@ -69,10 +69,49 @@ "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" }, "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "airflow": { "taskGroups": { "section_1": { @@ -228,12 +267,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {}, "timetable_summary": "None" }, @@ -263,7 +302,7 @@ "operator_class": "BashOperator", "operator_class_path": "airflow.providers.standard.operators.bash.BashOperator", "outlets": "[]", - "owner": "owner2", + "owner": "owner'2", "priority_weight": 1, "queue": "{{ result is string }}", "retries": "{{ result is number }}", @@ -312,12 +351,38 @@ "ownership": { "owners": [ { - "name": "owner2" + "name": "owner'2" } ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "sourceCode": { "language": "bash", "sourceCode": "exit 0;", @@ -362,12 +427,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {}, "timetable_summary": "None" }, @@ -397,7 +462,7 @@ "operator_class": "BashOperator", "operator_class_path": "airflow.providers.standard.operators.bash.BashOperator", "outlets": "[]", - "owner": "owner2", + "owner": "owner'2", "priority_weight": 1, "queue": "{{ result is string }}", "retries": "{{ result is number }}", @@ -412,6 +477,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", @@ -428,12 +499,38 @@ "ownership": { "owners": [ { - "name": "owner2" + "name": "owner'2" } ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "jobType": { "integration": "AIRFLOW", "jobType": "TASK", @@ -479,12 +576,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {}, "timetable_summary": "None" }, @@ -561,10 +658,49 @@ "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" }, "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "sourceCode": { "language": "python", "sourceCode": "def do_nothing():\n pass\n", @@ -609,12 +745,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {}, "timetable_summary": "None" }, @@ -659,6 +795,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", @@ -673,10 +815,49 @@ "name": "openlineage_base_complex_dag.task_2", "facets": { "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "jobType": { "integration": "AIRFLOW", "jobType": "TASK", @@ -722,12 +903,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {}, "timetable_summary": "None" }, @@ -802,8 +983,47 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" } @@ -845,12 +1065,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {}, "timetable_summary": "None" }, @@ -894,6 +1114,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", @@ -908,10 +1134,49 @@ "name": "openlineage_base_complex_dag.task_3", "facets": { "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "jobType": { "integration": "AIRFLOW", "jobType": "TASK", @@ -957,12 +1222,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {}, "timetable_summary": "None" }, @@ -1037,6 +1302,32 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "documentation": { "description": "OpenLineage complex DAG description", "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", @@ -1089,12 +1380,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {}, "timetable_summary": "None" }, @@ -1143,6 +1434,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", @@ -1165,6 +1462,32 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "jobType": { "integration": "AIRFLOW", "jobType": "TASK", @@ -1210,12 +1533,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {}, "timetable_summary": "None" }, @@ -1300,8 +1623,47 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" } @@ -1343,12 +1705,12 @@ "dag_id": "openlineage_base_complex_dag", "description": "OpenLineage complex DAG description", "fileloc": "{{ result.endswith('openlineage/example_openlineage_base_complex_dag.py') }}", - "owner": "{{ result.split(', ') | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owner": "{{ result.split(', ') | sort == ['airflow', 'owner\"1', 'owner\\'2', 'owner3'] }}", "owner_links": { "airflow": "https://airflow.apache.org/" }, "start_date": "{{ is_datetime(result) }}", - "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\"] }}", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {}, "timetable_summary": "None" }, @@ -1402,6 +1764,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", @@ -1416,10 +1784,49 @@ "name": "openlineage_base_complex_dag.section_1.section_2.task_6", "facets": { "ownership": { - "owners": "{{ result | map(attribute='name') | list | sort == ['airflow', 'owner1', 'owner2', 'owner3'] }}", + "owners": [ + { + "name": "airflow" + }, + { + "name": "owner\"1" + }, + { + "name": "owner'2" + }, + { + "name": "owner3" + } + ], "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "jobType": { "integration": "AIRFLOW", "jobType": "TASK", diff --git a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_simple_dag__af2.json b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_simple_dag__af2.json index 884829fe21cfd..f1545ce3f25e8 100644 --- a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_simple_dag__af2.json +++ b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_simple_dag__af2.json @@ -306,6 +306,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", diff --git a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_simple_dag__af3.json b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_simple_dag__af3.json index 37e2aa47af27e..8474ce7fced50 100644 --- a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_simple_dag__af3.json +++ b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_base_simple_dag__af3.json @@ -298,6 +298,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", diff --git a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json index 0589caae18e4f..1b24b97fd3a49 100644 --- a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json +++ b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json @@ -16,7 +16,7 @@ "owner": "airflow", "owner_links": {}, "start_date": "{{ is_datetime(result) }}", - "tags": "[]", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -69,6 +69,32 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "airflow": { "taskGroups": {}, "taskTree": {}, @@ -113,7 +139,7 @@ "owner": "airflow", "owner_links": {}, "start_date": "{{ is_datetime(result) }}", - "tags": "[]", + "tags": "{{ result[1:-1].split(', ') | sort == ['\"with\\'quote\"', \"'first'\", \"'second@'\", '\\'z\"e\\''] }}", "timetable": {} }, "dagRun": { @@ -132,6 +158,12 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, "processing_engine": { "name": "Airflow", "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", @@ -154,6 +186,32 @@ "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, + "tags": { + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + }, "jobType": { "integration": "AIRFLOW", "jobType": "DAG", diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py index 45d060cadfa2f..a1b3ecb467561 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py @@ -321,7 +321,14 @@ def test_emit_complete_event(mock_stats_incr, mock_stats_timer): run_id = str(uuid.uuid4()) event_time = datetime.datetime.now().isoformat() adapter.complete_task( - run_id=run_id, end_time=event_time, job_name="job", task=OperatorLineage(), owners=[], tags=[] + run_id=run_id, + end_time=event_time, + job_name="job", + task=OperatorLineage(), + owners=[], + tags=[], + nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(), + nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(), ) assert ( @@ -334,7 +341,11 @@ def test_emit_complete_event(mock_stats_incr, mock_stats_timer): facets={ "processing_engine": processing_engine_run.ProcessingEngineRunFacet( version=ANY, name="Airflow", openlineageAdapterVersion=ANY - ) + ), + "nominalTime": nominal_time_run.NominalTimeRunFacet( + nominalStartTime="2022-01-01T00:00:00", + nominalEndTime="2022-01-01T00:00:00", + ), }, ), job=Job( @@ -373,6 +384,8 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s job_name="job", owners=["owner1", "owner2"], tags=["tag1", "tag2"], + nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(), + nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(), task=OperatorLineage( inputs=[Dataset(namespace="bigquery", name="a.b.c"), Dataset(namespace="bigquery", name="x.y.z")], outputs=[Dataset(namespace="gs://bucket", name="exported_folder")], @@ -414,6 +427,10 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), ), ), + "nominalTime": nominal_time_run.NominalTimeRunFacet( + nominalStartTime="2022-01-01T00:00:00", + nominalEndTime="2022-01-01T00:00:00", + ), "processing_engine": processing_engine_run.ProcessingEngineRunFacet( version=ANY, name="Airflow", openlineageAdapterVersion=ANY ), @@ -477,6 +494,8 @@ def test_emit_failed_event(mock_stats_incr, mock_stats_timer): task=OperatorLineage(), owners=[], tags=[], + nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(), + nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(), ) assert ( @@ -489,7 +508,11 @@ def test_emit_failed_event(mock_stats_incr, mock_stats_timer): facets={ "processing_engine": processing_engine_run.ProcessingEngineRunFacet( version=ANY, name="Airflow", openlineageAdapterVersion=ANY - ) + ), + "nominalTime": nominal_time_run.NominalTimeRunFacet( + nominalStartTime="2022-01-01T00:00:00", + nominalEndTime="2022-01-01T00:00:00", + ), }, ), job=Job( @@ -528,6 +551,8 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta job_name="job", owners=["owner1", "owner2"], tags=["tag1", "tag2"], + nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(), + nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(), task=OperatorLineage( inputs=[Dataset(namespace="bigquery", name="a.b.c"), Dataset(namespace="bigquery", name="x.y.z")], outputs=[Dataset(namespace="gs://bucket", name="exported_folder")], @@ -569,6 +594,10 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), ), ), + "nominalTime": nominal_time_run.NominalTimeRunFacet( + nominalStartTime="2022-01-01T00:00:00", + nominalEndTime="2022-01-01T00:00:00", + ), "processing_engine": processing_engine_run.ProcessingEngineRunFacet( version=ANY, name="Airflow", openlineageAdapterVersion=ANY ), @@ -841,6 +870,8 @@ def test_emit_dag_complete_event( task_ids=["task_0", "task_1", "task_2.test"], owners=["owner1", "owner2"], tags=["tag1", "tag2"], + nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(), + nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(), run_facets={ "parent": parent_run.ParentRunFacet( run=parent_run.Run(runId=random_uuid), @@ -869,6 +900,10 @@ def test_emit_dag_complete_event( job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), ), ), + "nominalTime": nominal_time_run.NominalTimeRunFacet( + nominalStartTime="2022-01-01T00:00:00", + nominalEndTime="2022-01-01T00:00:00", + ), "airflowState": AirflowStateRunFacet( dagRunState=DagRunState.SUCCESS, tasksState={ @@ -969,6 +1004,8 @@ def test_emit_dag_failed_event( tags=["tag1", "tag2"], msg="error msg", owners=["owner1", "owner2"], + nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(), + nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(), run_facets={ "parent": parent_run.ParentRunFacet( run=parent_run.Run(runId=random_uuid), @@ -997,6 +1034,10 @@ def test_emit_dag_failed_event( job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), ), ), + "nominalTime": nominal_time_run.NominalTimeRunFacet( + nominalStartTime="2022-01-01T00:00:00", + nominalEndTime="2022-01-01T00:00:00", + ), "errorMessage": error_message_run.ErrorMessageRunFacet( message="error msg", programmingLanguage="python" ), diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py index e6625c6304d41..931083044c071 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py @@ -411,6 +411,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments( run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", owners=["task_owner"], tags=["tag1", "tag2"], + nominal_start_time=None, + nominal_end_time=None, task=listener.extractor_manager.extract_metadata(), run_facets={ "parent": 4, @@ -510,6 +512,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments( task=listener.extractor_manager.extract_metadata(), owners=["task_owner"], tags=["tag1", "tag2"], + nominal_start_time=None, + nominal_end_time=None, run_facets={ "parent": 4, "custom_user_facet": 2, @@ -1192,6 +1196,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments( task=listener.extractor_manager.extract_metadata(), owners=["task_owner"], tags={"tag1", "tag2"}, + nominal_start_time=None, + nominal_end_time=None, run_facets={ "parent": 4, "custom_user_facet": 2, @@ -1337,6 +1343,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments( task=listener.extractor_manager.extract_metadata(), owners=["task_owner"], tags={"tag1", "tag2"}, + nominal_start_time=None, + nominal_end_time=None, run_facets={ "parent": 4, "custom_user_facet": 2,