Skip to content

Commit

Permalink
add basic system tests for OpenLineage
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Nov 19, 2024
1 parent 86c4c6f commit 3a97b27
Show file tree
Hide file tree
Showing 12 changed files with 588 additions and 2 deletions.
5 changes: 5 additions & 0 deletions airflow/models/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ def get(
mask_secret(var_val, key)
return var_val

@staticmethod
@provide_session
def list(session: Session = None) -> list[Variable]:
    """
    Return every ``Variable`` row reachable through ``session``.

    :param session: SQLAlchemy session the query is issued on. It defaults to
        ``None``; presumably ``provide_session`` injects one when the caller
        omits it -- confirm against that decorator.
    :return: the result of querying all ``Variable`` rows.
    """
    variables_query = session.query(Variable)
    return variables_query.all()

@staticmethod
@provide_session
def set(
Expand Down
8 changes: 8 additions & 0 deletions docs/apache-airflow-providers-openlineage/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@
PyPI Repository <https://pypi.org/project/apache-airflow-providers-openlineage/>
Installing from sources <installing-providers-from-sources>

.. toctree::
:hidden:
:maxdepth: 1
:caption: System tests

System Tests <_api/tests/system/openlineage/index>


.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ def emit(self, event: RunEvent):
stack.enter_context(Stats.timer("ol.emit.attempts"))
self._client.emit(redacted_event)
self.log.debug("Successfully emitted OpenLineage event of id %s", event.run.runId)
except Exception as e:
except Exception:
Stats.incr("ol.emit.failed")
self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
self.log.debug("OpenLineage emission failure: %s", e)
self.log.debug("OpenLineage emission failure: %s", exc_info=True)

return redacted_event

Expand Down
16 changes: 16 additions & 0 deletions providers/src/airflow/providers/openlineage/transport/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
48 changes: 48 additions & 0 deletions providers/src/airflow/providers/openlineage/transport/variable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from openlineage.client.serde import Serde
from openlineage.client.transport import Transport

from airflow.models.variable import Variable
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from openlineage.client.client import Event


class VariableTransport(Transport, LoggingMixin):
    """
    OpenLineage transport that stores emitted events in Airflow Variables.

    Events are kept as a JSON list under the key ``<JOB_NAME>.event.<EVENT_TYPE>``
    (event type lower-cased); for task events the job name is expected to be
    ``<DAG_ID>.<TASK_ID>``.
    It's made to be used in system tests, stored data read by OpenLineageTestOperator.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def emit(self, event: Event) -> None:
        """
        Serialize ``event`` and append it to the Variable for its job and event type.

        :param event: OpenLineage event; its ``job.name`` and ``eventType`` build the key.
        """
        job_name = event.job.name  # type: ignore[union-attr]
        event_type = event.eventType.value.lower()  # type: ignore[union-attr]
        key = f"{job_name}.event.{event_type}"
        serialized = Serde.to_json(event)

        stored = Variable.get(key=key, default_var=None, deserialize_json=True)
        # A missing Variable means this is the first event for the key;
        # otherwise the new event is appended to what is already stored.
        events = [serialized] if stored is None else stored + [serialized]
        Variable.set(key=key, value=events, serialize_json=True)
16 changes: 16 additions & 0 deletions providers/tests/system/openlineage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
35 changes: 35 additions & 0 deletions providers/tests/system/openlineage/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import pytest

from airflow.listeners.listener import get_listener_manager
from airflow.providers.openlineage.plugins.listener import OpenLineageListener
from airflow.providers.openlineage.transport.variable import VariableTransport


@pytest.fixture(autouse=True)
def set_transport_variable():
    """
    Register an OpenLineage listener whose client emits through ``VariableTransport``.

    The listener manager is cleared before the listener is added and cleared
    again once the test using this (autouse) fixture has finished.
    """
    manager = get_listener_manager()
    manager.clear()

    ol_listener = OpenLineageListener()
    adapter = ol_listener.adapter
    # Pin the client on the adapter first, then swap its transport.
    adapter._client = adapter.get_or_create_openlineage_client()
    adapter._client.transport = VariableTransport()

    manager.add_listener(ol_listener)
    yield
    manager.clear()
38 changes: 38 additions & 0 deletions providers/tests/system/openlineage/example_openlineage.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[
{
"eventType": "START",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_basic_dag.do_nothing_task",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
},
{
"eventType": "COMPLETE",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_basic_dag.do_nothing_task",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
}
]
53 changes: 53 additions & 0 deletions providers/tests/system/openlineage/example_openlineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

from providers.tests.system.openlineage.operator import OpenLineageTestOperator


def do_nothing():
    """Do nothing; used as the callable of the example task."""
    return None


default_args = {"start_date": datetime(2021, 1, 1), "retries": 1}

# The expected-events file lives next to this module. Resolve it from __file__
# so the DAG does not depend on the working directory of whichever process
# parses or runs it (previously the path only worked from the repository root).
EXPECTED_EVENTS_PATH = Path(__file__).resolve().parent / "example_openlineage.json"

# Instantiate the DAG
with DAG(
    "openlineage_basic_dag",
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # Task whose START/COMPLETE OpenLineage events are the subject of the check.
    nothing_task = PythonOperator(task_id="do_nothing_task", python_callable=do_nothing)

    # Passed as a plain string, as the original call did.
    check_events = OpenLineageTestOperator(
        task_id="check_events", file_path=str(EXPECTED_EVENTS_PATH)
    )

    # The check must run only after the task under test has finished.
    nothing_task >> check_events


# Imported after the DAG definition rather than at the top of the module,
# hence the E402 suppression on this line.
from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
[
{
"eventType": "START",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
},
{
"eventType": "COMPLETE",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
},
{
"eventType": "START",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
},

{
"eventType": "COMPLETE",
"eventTime": "{{ is_datetime(result) }}",
"run": {
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
"integration": "AIRFLOW",
"jobType": "TASK",
"processingType": "BATCH"
}
}
}
}
]
Loading

0 comments on commit 3a97b27

Please sign in to comment.