diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage_defer_simple_dag.py b/providers/openlineage/tests/system/openlineage/example_openlineage_defer_simple_dag.py index 74275ee11d5c0..145fdca650555 100644 --- a/providers/openlineage/tests/system/openlineage/example_openlineage_defer_simple_dag.py +++ b/providers/openlineage/tests/system/openlineage/example_openlineage_defer_simple_dag.py @@ -24,14 +24,12 @@ from __future__ import annotations -import warnings from datetime import datetime, timedelta from airflow import DAG -from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import Variable from airflow.providers.standard.operators.python import PythonOperator -from airflow.providers.standard.sensors.time_delta import TimeDeltaSensorAsync +from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor from system.openlineage.expected_events import get_expected_event_file_path from system.openlineage.operator import OpenLineageTestOperator @@ -54,9 +52,7 @@ def check_events_number_func(): ) as dag: # Timedelta is compared to the DAGRun start timestamp, which can occur long before a worker picks up the # task. We need to ensure the sensor gets deferred at least once, so setting 180s. - with warnings.catch_warnings(): # TODO Switch to TimeDeltaSensor when deferrable is released - warnings.simplefilter("ignore", AirflowProviderDeprecationWarning) - wait = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(seconds=180)) + wait = TimeDeltaSensor(task_id="wait", delta=timedelta(seconds=180), deferrable=True) check_events_number = PythonOperator( task_id="check_events_number", python_callable=check_events_number_func @@ -65,7 +61,7 @@ def check_events_number_func(): check_events = OpenLineageTestOperator( task_id="check_events", file_path=get_expected_event_file_path(DAG_ID), - allow_duplicate_events=True, + allow_duplicate_events_regex="openlineage_defer_simple_dag.wait.event.start", ) wait >> check_events_number >> check_events diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_simple_dag.py b/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_simple_dag.py index 1e73d4edcf906..a80608edd233b 100644 --- a/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_simple_dag.py +++ b/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_simple_dag.py @@ -78,7 +78,9 @@ def sum_it(values): ) check_events = OpenLineageTestOperator( - task_id="check_events", file_path=get_expected_event_file_path(DAG_ID), allow_duplicate_events=True + task_id="check_events", + file_path=get_expected_event_file_path(DAG_ID), + allow_duplicate_events_regex="openlineage_mapped_simple_dag.add_one.event.(start|complete)", ) sum_it(added_values) >> check_events_number >> check_events diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag_deferrable.py b/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag_deferrable.py new file mode 100644 index 0000000000000..4a4f8ee934bd8 --- /dev/null +++ b/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag_deferrable.py @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Simple DAG that triggers another simple DAG in deferrable mode. + +It checks: + - task's trigger_dag_id, trigger_run_id, deferrable attribute + - DAGRun START and COMPLETE events, for the triggered DAG + - automatic injection of OL parent and root info to DAGRun conf + - multiple levels of triggering +""" + +from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator + +from system.openlineage.expected_events import get_expected_event_file_path +from system.openlineage.operator import OpenLineageTestOperator + +DAG_ID = "openlineage_trigger_dag_deferrable" + +with DAG( + dag_id=DAG_ID, + start_date=datetime(2021, 1, 1), + schedule=None, + catchup=False, + default_args={"retries": 0}, +) as dag: + trigger_dagrun = TriggerDagRunOperator( + task_id="trigger_dagrun", + trigger_dag_id="openlineage_trigger_dag_deferrable_child__notrigger", + wait_for_completion=True, + conf={"some_config": "value1"}, + poke_interval=5, + deferrable=True, + ) + + check_events = OpenLineageTestOperator( + task_id="check_events", + file_path=get_expected_event_file_path(DAG_ID), + allow_duplicate_events_regex="openlineage_trigger_dag_deferrable.trigger_dagrun.event.start", + ) + + trigger_dagrun >> check_events + + +with DAG( + dag_id="openlineage_trigger_dag_deferrable_child__notrigger", + start_date=datetime(2021, 1, 1), + schedule=None, + catchup=False, + default_args={"retries": 0}, +) as child_dag: + trigger_dagrun2 = TriggerDagRunOperator( + task_id="trigger_dagrun2", + trigger_dag_id="openlineage_trigger_dag_deferrable_child2__notrigger", + wait_for_completion=True, + poke_interval=5, + ) + + +with DAG( + dag_id="openlineage_trigger_dag_deferrable_child2__notrigger", + start_date=datetime(2021, 1, 1), + schedule=None, + catchup=False, + default_args={"retries": 0}, +) as child_dag2: + do_nothing_task = BashOperator(task_id="do_nothing_task", bash_command="sleep 10;") + + +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json index 3695408b28fbd..a32c8659a815d 100644 --- a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json +++ b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json @@ -43,24 +43,24 @@ "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, "parent": { - "job": { - "namespace": "prod_biz", - "name": "get_files" - }, - "run": { - "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072" - }, - "root": { "job": { - "name": "generate_report_sales_e2e", - "namespace": "prod_analytics" + "namespace": "prod_biz", + "name": "get_files" }, "run": { - "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e" - } - }, - "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", - "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072" + }, + "root": { + "job": { + "name": "generate_report_sales_e2e", + "namespace": "prod_analytics" + }, + "run": { + "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" }, "nominalTime": { "nominalEndTime": "{{ is_datetime(result) }}", @@ -82,10 +82,10 @@ "name": "openlineage_trigger_dag_child__notrigger", "facets": { "documentation": { - "description": "MD DAG doc", - "contentType": "text/markdown", - "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", - "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" + "description": "MD DAG doc", + "contentType": "text/markdown", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" }, "jobType": { "integration": "AIRFLOW", @@ -104,30 +104,30 @@ "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, "tags": { - "tags": [ - { - "key": "first", - "value": "first", - "source": "AIRFLOW" - }, - { - "key": "second@", - "value": "second@", - "source": "AIRFLOW" - }, - { - "key": "with'quote", - "value": "with'quote", - "source": "AIRFLOW" - }, - { - "key": "z\"e", - "value": "z\"e", - "source": "AIRFLOW" - } - ], - "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", - "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" }, "airflow": { "taskGroups": {}, @@ -204,24 +204,24 @@ "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" }, "parent": { - "job": { - "namespace": "prod_biz", - "name": "get_files" - }, - "run": { - "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072" - }, - "root": { "job": { - "name": "generate_report_sales_e2e", - "namespace": "prod_analytics" + "namespace": "prod_biz", + "name": "get_files" }, "run": { - "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e" - } - }, - "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", - "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072" + }, + "root": { + "job": { + "name": "generate_report_sales_e2e", + "namespace": "prod_analytics" + }, + "run": { + "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" }, "nominalTime": { "nominalEndTime": "{{ is_datetime(result) }}", @@ -243,10 +243,10 @@ "name": "openlineage_trigger_dag_child__notrigger", "facets": { "documentation": { - "description": "MD DAG doc", - "contentType": "text/markdown", - "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", - "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" + "description": "MD DAG doc", + "contentType": "text/markdown", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\") }}" }, "ownership": { "owners": [ @@ -258,30 +258,30 @@ "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" }, "tags": { - "tags": [ - { - "key": "first", - "value": "first", - "source": "AIRFLOW" - }, - { - "key": "second@", - "value": "second@", - "source": "AIRFLOW" - }, - { - "key": "with'quote", - "value": "with'quote", - "source": "AIRFLOW" - }, - { - "key": "z\"e", - "value": "z\"e", - "source": "AIRFLOW" - } - ], - "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", - "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" + "tags": [ + { + "key": "first", + "value": "first", + "source": "AIRFLOW" + }, + { + "key": "second@", + "value": "second@", + "source": "AIRFLOW" + }, + { + "key": "with'quote", + "value": "with'quote", + "source": "AIRFLOW" + }, + { + "key": "z\"e", + "value": "z\"e", + "source": "AIRFLOW" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\") }}" }, "jobType": { "integration": "AIRFLOW", @@ -298,24 +298,24 @@ "run": { "facets": { "parent": { - "job": { - "namespace": "{{ result is string }}", - "name": "openlineage_trigger_dag_child__notrigger" - }, - "run": { - "runId": "{{ is_uuid(result) }}" - }, - "root": { "job": { - "name": "generate_report_sales_e2e", - "namespace": "prod_analytics" + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_child__notrigger" }, "run": { - "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e" - } - }, - "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", - "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "generate_report_sales_e2e", + "namespace": "prod_analytics" + }, + "run": { + "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" } } }, @@ -328,24 +328,24 @@ "run": { "facets": { "parent": { - "job": { - "namespace": "{{ result is string }}", - "name": "openlineage_trigger_dag_child__notrigger" - }, - "run": { - "runId": "{{ is_uuid(result) }}" - }, - "root": { "job": { - "name": "generate_report_sales_e2e", - "namespace": "prod_analytics" + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_child__notrigger" }, "run": { - "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e" - } - }, - "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", - "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "generate_report_sales_e2e", + "namespace": "prod_analytics" + }, + "run": { + "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" } } }, @@ -359,28 +359,29 @@ "facets": { "airflow": { "task": { - "trigger_dag_id": "openlineage_trigger_dag_child__notrigger" + "trigger_dag_id": "openlineage_trigger_dag_child__notrigger", + "trigger_run_id": "{{ result.startswith('openlineage_trigger_dag_triggering_child_202') }}" } }, "parent": { - "job": { - "namespace": "{{ result is string }}", - "name": "openlineage_trigger_dag" - }, - "run": { - "runId": "{{ is_uuid(result) }}" - }, - "root": { "job": { - "name": "openlineage_trigger_dag", - "namespace": "{{ result is string }}" + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag" }, "run": { - "runId": "{{ is_uuid(result) }}" - } - }, - "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", - "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" } } }, @@ -394,28 +395,29 @@ "facets": { "airflow": { "task": { - "trigger_dag_id": "openlineage_trigger_dag_child__notrigger" + "trigger_dag_id": "openlineage_trigger_dag_child__notrigger", + "trigger_run_id": "{{ result.startswith('openlineage_trigger_dag_triggering_child_202') }}" } }, "parent": { - "job": { - "namespace": "{{ result is string }}", - "name": "openlineage_trigger_dag" - }, - "run": { - "runId": "{{ is_uuid(result) }}" - }, - "root": { "job": { - "name": "openlineage_trigger_dag", - "namespace": "{{ result is string }}" + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag" }, "run": { - "runId": "{{ is_uuid(result) }}" - } - }, - "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", - "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" } } }, diff --git a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag_deferrable.json b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag_deferrable.json new file mode 100644 index 0000000000000..fc274dd93dc80 --- /dev/null +++ b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag_deferrable.json @@ -0,0 +1,660 @@ +[ + { + "eventType": "START", + "eventTime": "{{ is_datetime(result) }}", + "producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunEvent$\") }}", + "inputs": [], + "outputs": [], + "run": { + "runId": "{{ is_uuid(result) }}", + "facets": { + "airflowDagRun": { + "dag": { + "dag_id": "openlineage_trigger_dag_deferrable_child__notrigger", + "fileloc": "{{ result.endswith('openlineage/example_openlineage_trigger_dag_deferrable.py') }}", + "owner": "airflow", + "owner_links": {}, + "start_date": "{{ is_datetime(result) }}", + "timetable": {} + }, + "dagRun": { + "conf": { + "some_config": "value1", + "openlineage": { + "parentRunId": "{{ is_uuid(result) }}", + "parentJobNamespace": "{{ result is string }}", + "parentJobName": "openlineage_trigger_dag_deferrable.trigger_dagrun", + "rootParentRunId": "{{ is_uuid(result) }}", + "rootParentJobNamespace": "{{ result is string }}", + "rootParentJobName": "openlineage_trigger_dag_deferrable" + } + }, + "dag_id": "openlineage_trigger_dag_deferrable_child__notrigger", + "data_interval_end": "{{ is_datetime(result) }}", + "data_interval_start": "{{ is_datetime(result) }}", + "logical_date": "{{ is_datetime(result) }}", + "run_id": "{{ result.startswith('manual__202') }}", + "run_type": "manual", + "start_date": "{{ is_datetime(result) }}" + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" + }, + "parent": { + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable.trigger_dagrun" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag_deferrable", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, + "processing_engine": { + "name": "Airflow", + "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", + "version": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ProcessingEngineRunFacet.json\\#\\/\\$defs\\/ProcessingEngineRunFacet$\") }}" + } + } + }, + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable_child__notrigger", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "DAG", + "processingType": "BATCH", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\") }}" + }, + "ownership": { + "owners": [ + { + "name": "airflow" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" + }, + "airflow": { + "taskGroups": {}, + "taskTree": {}, + "tasks": { + "trigger_dagrun2": { + "downstream_task_ids": [], + "emits_ol_events": "{{ result == true }}", + "is_setup": false, + "is_teardown": false, + "operator": "airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator", + "ui_label": "trigger_dagrun2" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/JobFacet\") }}" + } + } + } + }, + { + "eventType": "COMPLETE", + "eventTime": "{{ is_datetime(result) }}", + "producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunEvent$\") }}", + "inputs": [], + "outputs": [], + "run": { + "runId": "{{ is_uuid(result) }}", + "facets": { + "airflowState": { + "dagRunState": "success", + "tasksState": { + "trigger_dagrun2": "success" + }, + "tasksDuration": { + "trigger_dagrun2": "{{ result is number }}" + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" + }, + "airflowDagRun": { + "dag": { + "dag_id": "openlineage_trigger_dag_deferrable_child__notrigger", + "fileloc": "{{ result.endswith('openlineage/example_openlineage_trigger_dag_deferrable.py') }}", + "owner": "airflow", + "owner_links": {}, + "start_date": "{{ is_datetime(result) }}", + "timetable": {} + }, + "dagRun": { + "conf": { + "some_config": "value1", + "openlineage": { + "parentRunId": "{{ is_uuid(result) }}", + "parentJobNamespace": "{{ result is string }}", + "parentJobName": "openlineage_trigger_dag_deferrable.trigger_dagrun", + "rootParentRunId": "{{ is_uuid(result) }}", + "rootParentJobNamespace": "{{ result is string }}", + "rootParentJobName": "openlineage_trigger_dag_deferrable" + } + }, + "dag_id": "openlineage_trigger_dag_deferrable_child__notrigger", + "data_interval_end": "{{ is_datetime(result) }}", + "data_interval_start": "{{ is_datetime(result) }}", + "logical_date": "{{ is_datetime(result) }}", + "run_id": "{{ result.startswith('manual__202') }}", + "run_type": "manual", + "start_date": "{{ is_datetime(result) }}", + "duration": "{{ result is number }}" + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" + }, + "parent": { + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable.trigger_dagrun" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag_deferrable", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, + "processing_engine": { + "name": "Airflow", + "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", + "version": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ProcessingEngineRunFacet.json\\#\\/\\$defs\\/ProcessingEngineRunFacet$\") }}" + } + } + }, + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable_child__notrigger", + "facets": { + "ownership": { + "owners": [ + { + "name": "airflow" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" + }, + "jobType": { + "integration": "AIRFLOW", + "jobType": "DAG", + "processingType": "BATCH", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\") }}" + } + } + } + }, + { + "eventType": "START", + "run": { + "facets": { + "airflow": { + "task": { + "trigger_dag_id": "openlineage_trigger_dag_deferrable_child2__notrigger" + } + }, + "parent": { + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable_child__notrigger" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag_deferrable", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + } + } + }, + "job": { + "name": "openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2" + } + }, + { + "eventType": "COMPLETE", + "run": { + "facets": { + "airflow": { + "task": { + "trigger_dag_id": "openlineage_trigger_dag_deferrable_child2__notrigger", + "trigger_run_id": "{{ result.startswith('manual__202') }}" + } + }, + "parent": { + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable_child__notrigger" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag_deferrable", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + } + } + }, + "job": { + "name": "openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2" + } + }, + { + "eventType": "START", + "eventTime": "{{ is_datetime(result) }}", + "producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunEvent$\") }}", + "inputs": [], + "outputs": [], + "run": { + "runId": "{{ is_uuid(result) }}", + "facets": { + "airflowDagRun": { + "dag": { + "dag_id": "openlineage_trigger_dag_deferrable_child2__notrigger", + "fileloc": "{{ result.endswith('openlineage/example_openlineage_trigger_dag_deferrable.py') }}", + "owner": "airflow", + "owner_links": {}, + "start_date": "{{ is_datetime(result) }}", + "timetable": {} + }, + "dagRun": { + "conf": { + "openlineage": { + "parentRunId": "{{ is_uuid(result) }}", + "parentJobNamespace": "{{ result is string }}", + "parentJobName": "openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2", + "rootParentRunId": "{{ is_uuid(result) }}", + "rootParentJobNamespace": "{{ result is string }}", + "rootParentJobName": "openlineage_trigger_dag_deferrable" + } + }, + "dag_id": "openlineage_trigger_dag_deferrable_child2__notrigger", + "data_interval_end": "{{ is_datetime(result) }}", + "data_interval_start": "{{ is_datetime(result) }}", + "logical_date": "{{ is_datetime(result) }}", + "run_id": "{{ result.startswith('manual__202') }}", + "run_type": "manual", + "start_date": "{{ is_datetime(result) }}" + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" + }, + "parent": { + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag_deferrable", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, + "processing_engine": { + "name": "Airflow", + "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", + "version": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ProcessingEngineRunFacet.json\\#\\/\\$defs\\/ProcessingEngineRunFacet$\") }}" + } + } + }, + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable_child2__notrigger", + "facets": { + "jobType": { + "integration": "AIRFLOW", + "jobType": "DAG", + "processingType": "BATCH", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\") }}" + }, + "ownership": { + "owners": [ + { + "name": "airflow" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" + }, + "airflow": { + "taskGroups": {}, + "taskTree": {}, + "tasks": { + "do_nothing_task": { + "downstream_task_ids": [], + "emits_ol_events": "{{ result == true }}", + "is_setup": false, + "is_teardown": false, + "operator": "airflow.providers.standard.operators.bash.BashOperator", + "ui_label": "do_nothing_task" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/JobFacet\") }}" + } + } + } + }, + { + "eventType": "COMPLETE", + "eventTime": "{{ is_datetime(result) }}", + "producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunEvent$\") }}", + "inputs": [], + "outputs": [], + "run": { + "runId": "{{ is_uuid(result) }}", + "facets": { + "airflowState": { + "dagRunState": "success", + "tasksState": { + "do_nothing_task": "success" + }, + "tasksDuration": { + "do_nothing_task": "{{ result is number }}" + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" + }, + "airflowDagRun": { + "dag": { + "dag_id": "openlineage_trigger_dag_deferrable_child2__notrigger", + "fileloc": "{{ result.endswith('openlineage/example_openlineage_trigger_dag_deferrable.py') }}", + "owner": "airflow", + "owner_links": {}, + "start_date": "{{ is_datetime(result) }}", + "timetable": {} + }, + "dagRun": { + "conf": { + "openlineage": { + "parentRunId": "{{ is_uuid(result) }}", + "parentJobNamespace": "{{ result is string }}", + "parentJobName": "openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2", + "rootParentRunId": "{{ is_uuid(result) }}", + "rootParentJobNamespace": "{{ result is string }}", + "rootParentJobName": "openlineage_trigger_dag_deferrable" + } + }, + "dag_id": "openlineage_trigger_dag_deferrable_child2__notrigger", + "data_interval_end": "{{ is_datetime(result) }}", + "data_interval_start": "{{ is_datetime(result) }}", + "logical_date": "{{ is_datetime(result) }}", + "run_id": "{{ result.startswith('manual__202') }}", + "run_type": "manual", + "start_date": "{{ is_datetime(result) }}", + "duration": "{{ result is number }}" + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\") }}" + }, + "parent": { + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag_deferrable", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + }, + "nominalTime": { + "nominalEndTime": "{{ is_datetime(result) }}", + "nominalStartTime": "{{ is_datetime(result) }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\") }}" + }, + "processing_engine": { + "name": "Airflow", + "openlineageAdapterVersion": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", + "version": "{{ regex_match(result, \"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ProcessingEngineRunFacet.json\\#\\/\\$defs\\/ProcessingEngineRunFacet$\") }}" + } + } + }, + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable_child2__notrigger", + "facets": { + "ownership": { + "owners": [ + { + "name": "airflow" + } + ], + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\") }}" + }, + "jobType": { + "integration": "AIRFLOW", + "jobType": "DAG", + "processingType": "BATCH", + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\") }}" + } + } + } + }, + { + "eventType": "START", + "run": { + "facets": { + "parent": { + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable_child2__notrigger" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag_deferrable", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + } + } + }, + "job": { + "name": "openlineage_trigger_dag_deferrable_child2__notrigger.do_nothing_task" + } + }, + { + "eventType": "COMPLETE", + "run": { + "facets": { + "parent": { + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable_child2__notrigger" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag_deferrable", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + } + } + }, + "job": { + "name": "openlineage_trigger_dag_deferrable_child2__notrigger.do_nothing_task" + } + }, + { + "eventType": "START", + "run": { + "facets": { + "airflow": { + "task": { + "deferrable": "{{ result == true }}", + "trigger_dag_id": "openlineage_trigger_dag_deferrable_child__notrigger" + } + }, + "parent": { + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag_deferrable", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + } + } + }, + "job": { + "name": "openlineage_trigger_dag_deferrable.trigger_dagrun" + } + }, + { + "eventType": "COMPLETE", + "run": { + "facets": { + "airflow": { + "task": { + "deferrable": "{{ result == true }}", + "trigger_dag_id": "openlineage_trigger_dag_deferrable_child__notrigger", + "trigger_run_id": "{{ result.startswith('manual__') }}" + } + }, + "parent": { + "job": { + "namespace": "{{ result is string }}", + "name": "openlineage_trigger_dag_deferrable" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "root": { + "job": { + "name": "openlineage_trigger_dag_deferrable", + "namespace": "{{ result is string }}" + }, + "run": { + "runId": "{{ is_uuid(result) }}" + } + }, + "_producer": "{{ regex_match(result, \"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\") }}", + "_schemaURL": "{{ regex_match(result, \"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\") }}" + } + } + }, + "job": { + "name": "openlineage_trigger_dag_deferrable.trigger_dagrun" + } + } +] diff --git a/providers/openlineage/tests/system/openlineage/operator.py b/providers/openlineage/tests/system/openlineage/operator.py index 000696a95b1f0..0bc8aa7c003b5 100644 --- a/providers/openlineage/tests/system/openlineage/operator.py +++ b/providers/openlineage/tests/system/openlineage/operator.py @@ -188,7 +188,7 @@ class OpenLineageTestOperator(BaseOperator): :param event_templates: dictionary where key is the key used by VariableTransport in format of ..event., and value is event template (fragment) that need to be in received events. :param file_path: alternatively, file_path pointing to file with event templates will be used :param env: jinja environment used to render event templates - :param allow_duplicate_events: if set to True, allows multiple events for the same key + :param allow_duplicate_events_regex: regex pattern; keys matching it are allowed to have multiple events. :param clear_variables: if set to True, clears only variables to be checked after all events are checked or if any check fails :raises: ValueError if the received events do not match with expected ones. """ @@ -198,7 +198,7 @@ def __init__( event_templates: dict[str, dict] | None = None, file_path: str | None = None, env: Environment = setup_jinja(), - allow_duplicate_events: bool = False, + allow_duplicate_events_regex: str | None = None, clear_variables: bool = True, **kwargs, ): @@ -206,8 +206,8 @@ def __init__( self.event_templates = event_templates self.file_path = file_path self.env = env - self.multiple_events = allow_duplicate_events - self.delete = clear_variables + self.allow_duplicate_events_regex = allow_duplicate_events_regex + self.clear_variables = clear_variables if self.event_templates and self.file_path: raise ValueError("Can't pass both event_templates and file_path") @@ -234,13 +234,15 @@ def execute(self, context: Context) -> None: ) if len(actual_events) == 0: raise ValueError(f"No event for key {key}") - if len(actual_events) != 1 and not self.multiple_events: - raise ValueError(f"Expected one event for key {key}, got {len(actual_events)}") + if len(actual_events) != 1: + regex = self.allow_duplicate_events_regex + if regex is None or not re.fullmatch(regex, key): + raise ValueError(f"Expected one event for key {key}, got {len(actual_events)}") # Last event is checked against the template, this will allow to f.e. check change in try_num if not match(template, json.loads(actual_events[-1]), self.env): raise ValueError("Event received does not match one specified in test") finally: - if self.delete: + if self.clear_variables: for key in self.event_templates: # type: ignore[union-attr] log.info("Removing variable `%s`", key) Variable.delete(key=key)