From aa1e41f5a8e126c3319b0fbce78b52c15310fbcd Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Wed, 3 Jul 2024 17:49:45 +0200 Subject: [PATCH 1/4] openlineage: add config to include 'full' task info based on conf setting Signed-off-by: Maciej Obuchowski --- airflow/providers/openlineage/conf.py | 5 +++ airflow/providers/openlineage/provider.yaml | 7 ++++ airflow/providers/openlineage/utils/utils.py | 21 +++++++++- .../guides/user.rst | 22 +++++++++++ .../openlineage/plugins/test_utils.py | 39 ++++++++++++++++++- tests/providers/openlineage/test_conf.py | 33 ++++++++++++++++ 6 files changed, 125 insertions(+), 2 deletions(-) diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py index 8f3c2b3571c7e..0e4af0a70bc04 100644 --- a/airflow/providers/openlineage/conf.py +++ b/airflow/providers/openlineage/conf.py @@ -145,3 +145,8 @@ def execution_timeout() -> int: """[openlineage] execution_timeout.""" option = conf.get(_CONFIG_SECTION, "execution_timeout", fallback="") return _safe_int_convert(str(option).strip(), default=10) + + +@cache +def include_full_task_info() -> bool: + return conf.getboolean(_CONFIG_SECTION, "include_full_task_info", fallback="False") diff --git a/airflow/providers/openlineage/provider.yaml b/airflow/providers/openlineage/provider.yaml index 9622226c7f907..cfa001fdbaae5 100644 --- a/airflow/providers/openlineage/provider.yaml +++ b/airflow/providers/openlineage/provider.yaml @@ -153,3 +153,10 @@ config: example: ~ type: integer version_added: 1.9.0 + include_full_task_info: + description: | + If true, OpenLineage event will include full task info - potentially containing large fields. + default: "False" + example: ~ + type: boolean + version_added: 1.10.0 diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index 0c6f4bdb8766b..ea3a663f7784d 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -272,6 +272,25 @@ class TaskInfo(InfoJsonEncodable): } +class TaskInfoComplete(TaskInfo): + """Defines encoding BaseOperator/AbstractOperator object to JSON used when user enables full task info.""" + + includes = [] + excludes = [ + "_BaseOperator__instantiated", + "_dag", + "_hook", + "_log", + "_outlets", + "_inlets", + "_lock_for_execution", + "handler", + "params", + "python_callable", + "retry_delay", + ] + + class TaskGroupInfo(InfoJsonEncodable): """Defines encoding TaskGroup object to JSON.""" @@ -300,7 +319,7 @@ def get_airflow_run_facet( dag=DagInfo(dag), dagRun=DagRunInfo(dag_run), taskInstance=TaskInstanceInfo(task_instance), - task=TaskInfo(task), + task=TaskInfoComplete(task) if conf.include_full_task_info() else TaskInfo(task), taskUuid=task_uuid, ) } diff --git a/docs/apache-airflow-providers-openlineage/guides/user.rst b/docs/apache-airflow-providers-openlineage/guides/user.rst index e7decec1f1539..b940024b7f366 100644 --- a/docs/apache-airflow-providers-openlineage/guides/user.rst +++ b/docs/apache-airflow-providers-openlineage/guides/user.rst @@ -246,6 +246,28 @@ full import paths of Airflow Operators to disable as ``disabled_for_operators`` AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator' +Full Task Info +^^^^^^^^^^^^^^ + +By default, OpenLineage integration's AirflowRunFacet - attached on START event for every task instance event - does +not contain full serialized task information (parameters to given operator), but only includes selected parameters. + +However, we allow users to set OpenLineage integration to include full task information. By doing this, rather than +serializing only few known attributes, we operate in exclude mode - and exclude certain non-serializable elements. + +.. code-block:: ini + + [openlineage] + transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"} + include_full_task_info = true + +``AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO`` environment variable is an equivalent. + +.. warning:: + + By setting this variable to true, OpenLineage integration does not control the size of event you sent. It can potentially include elements weighting megabytes in size, depending on the size of data you pass to the task. + + Custom Extractors ^^^^^^^^^^^^^^^^^ diff --git a/tests/providers/openlineage/plugins/test_utils.py b/tests/providers/openlineage/plugins/test_utils.py index 016bb99d4e6b0..8ca245d1f3d01 100644 --- a/tests/providers/openlineage/plugins/test_utils.py +++ b/tests/providers/openlineage/plugins/test_utils.py @@ -21,7 +21,7 @@ import uuid from json import JSONEncoder from typing import Any -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest from attrs import define @@ -34,6 +34,7 @@ InfoJsonEncodable, OpenLineageRedactor, _is_name_redactable, + get_airflow_run_facet, get_fully_qualified_class_name, is_operator_disabled, ) @@ -227,3 +228,39 @@ def test_is_operator_disabled(mock_disabled_operators): "airflow.operators.python.PythonOperator", } assert is_operator_disabled(op) is True + + +@patch("airflow.providers.openlineage.conf.include_full_task_info") +def test_includes_full_task_info(mock_include_full_task_info): + mock_include_full_task_info.return_value = True + # There should be no 'bash_command' in excludes and it's not in includes - so + # it's a good choice for checking TaskInfo vs TaskInfoComplete + assert ( + "bash_command" + in get_airflow_run_facet( + MagicMock(), + MagicMock(), + MagicMock(), + BashOperator(task_id="bash_op", bash_command="sleep 1"), + MagicMock(), + )["airflow"].task + ) + + +@patch("airflow.providers.openlineage.conf.include_full_task_info") +def test_does_not_include_full_task_info(mock_include_full_task_info): + from airflow.operators.bash import BashOperator + + mock_include_full_task_info.return_value = False + # There should be no 'bash_command' in excludes and it's not in includes - so + # it's a good choice for checking TaskInfo vs TaskInfoComplete + assert ( + "bash_command" + not in get_airflow_run_facet( + MagicMock(), + MagicMock(), + MagicMock(), + BashOperator(task_id="bash_op", bash_command="sleep 1"), + MagicMock(), + )["airflow"].task + ) diff --git a/tests/providers/openlineage/test_conf.py b/tests/providers/openlineage/test_conf.py index 60060b001c6d6..7e0a1c85a78b2 100644 --- a/tests/providers/openlineage/test_conf.py +++ b/tests/providers/openlineage/test_conf.py @@ -21,6 +21,7 @@ import pytest +from airflow.exceptions import AirflowConfigException from airflow.providers.openlineage.conf import ( _is_true, _safe_int_convert, @@ -28,6 +29,7 @@ custom_extractors, dag_state_change_process_pool_size, disabled_operators, + include_full_task_info, is_disabled, is_source_enabled, namespace, @@ -52,6 +54,7 @@ _VAR_URL = "OPENLINEAGE_URL" _CONFIG_OPTION_SELECTIVE_ENABLE = "selective_enable" _CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE = "dag_state_change_process_pool_size" +_CONFIG_OPTION_INCLUDE_FULL_TASK_INFO = "include_full_task_info" _BOOL_PARAMS = ( ("1", True), @@ -487,3 +490,33 @@ def test_dag_state_change_process_pool_size(var_string, expected): with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE): var_string}): result = dag_state_change_process_pool_size() assert result == expected + + +@pytest.mark.parametrize( + ("var", "expected"), + ( + ("False", False), + ("True", True), + ("t", True), + ("true", True), + ), +) +def test_include_full_task_info_reads_config(var, expected): + with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_INCLUDE_FULL_TASK_INFO): var}): + assert include_full_task_info() is expected + + +@pytest.mark.parametrize( + "var", + [ + "a", + "asdf", + "31", + "", + " ", + ], +) +def test_include_full_task_info_raises_exception(var): + with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_INCLUDE_FULL_TASK_INFO): var}): + with pytest.raises(AirflowConfigException): + include_full_task_info() From 5bf5574e0a8a059d5cff839a548960fdaf6b32b9 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Tue, 16 Jul 2024 00:14:45 +0200 Subject: [PATCH 2/4] Update docs/apache-airflow-providers-openlineage/guides/user.rst Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- docs/apache-airflow-providers-openlineage/guides/user.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-openlineage/guides/user.rst b/docs/apache-airflow-providers-openlineage/guides/user.rst index b940024b7f366..ee3bf1389fa01 100644 --- a/docs/apache-airflow-providers-openlineage/guides/user.rst +++ b/docs/apache-airflow-providers-openlineage/guides/user.rst @@ -250,7 +250,7 @@ Full Task Info ^^^^^^^^^^^^^^ By default, OpenLineage integration's AirflowRunFacet - attached on START event for every task instance event - does -not contain full serialized task information (parameters to given operator), but only includes selected parameters. +not contain full serialized task information (parameters to given operator), but only includes select parameters. However, we allow users to set OpenLineage integration to include full task information. By doing this, rather than serializing only few known attributes, we operate in exclude mode - and exclude certain non-serializable elements. From fb325ae85797639d1e78cb941c47882787bd1f2c Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Tue, 16 Jul 2024 00:15:02 +0200 Subject: [PATCH 3/4] Update docs/apache-airflow-providers-openlineage/guides/user.rst Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- docs/apache-airflow-providers-openlineage/guides/user.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-openlineage/guides/user.rst b/docs/apache-airflow-providers-openlineage/guides/user.rst index ee3bf1389fa01..7a63507897f7f 100644 --- a/docs/apache-airflow-providers-openlineage/guides/user.rst +++ b/docs/apache-airflow-providers-openlineage/guides/user.rst @@ -253,7 +253,7 @@ By default, OpenLineage integration's AirflowRunFacet - attached on START event not contain full serialized task information (parameters to given operator), but only includes select parameters. However, we allow users to set OpenLineage integration to include full task information. By doing this, rather than -serializing only few known attributes, we operate in exclude mode - and exclude certain non-serializable elements. +serializing only a few known attributes, we exclude certain non-serializable elements and send everything else. .. code-block:: ini From f8454202f22728dd14899f83276f7ecfe675bf1b Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Tue, 16 Jul 2024 00:15:10 +0200 Subject: [PATCH 4/4] Update docs/apache-airflow-providers-openlineage/guides/user.rst Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- docs/apache-airflow-providers-openlineage/guides/user.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow-providers-openlineage/guides/user.rst b/docs/apache-airflow-providers-openlineage/guides/user.rst index 7a63507897f7f..437da6d0fac19 100644 --- a/docs/apache-airflow-providers-openlineage/guides/user.rst +++ b/docs/apache-airflow-providers-openlineage/guides/user.rst @@ -265,7 +265,7 @@ serializing only a few known attributes, we exclude certain non-serializable ele .. warning:: - By setting this variable to true, OpenLineage integration does not control the size of event you sent. It can potentially include elements weighting megabytes in size, depending on the size of data you pass to the task. + By setting this variable to true, OpenLineage integration does not control the size of event you sent. It can potentially include elements that are megabytes in size or larger, depending on the size of data you pass to the task. Custom Extractors