diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py index 8f3c2b3571c7e..0e4af0a70bc04 100644 --- a/airflow/providers/openlineage/conf.py +++ b/airflow/providers/openlineage/conf.py @@ -145,3 +145,8 @@ def execution_timeout() -> int: """[openlineage] execution_timeout.""" option = conf.get(_CONFIG_SECTION, "execution_timeout", fallback="") return _safe_int_convert(str(option).strip(), default=10) + + +@cache +def include_full_task_info() -> bool: + return conf.getboolean(_CONFIG_SECTION, "include_full_task_info", fallback="False") diff --git a/airflow/providers/openlineage/provider.yaml b/airflow/providers/openlineage/provider.yaml index 9622226c7f907..cfa001fdbaae5 100644 --- a/airflow/providers/openlineage/provider.yaml +++ b/airflow/providers/openlineage/provider.yaml @@ -153,3 +153,10 @@ config: example: ~ type: integer version_added: 1.9.0 + include_full_task_info: + description: | + If true, OpenLineage event will include full task info - potentially containing large fields. + default: "False" + example: ~ + type: boolean + version_added: 1.10.0 diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index 0c6f4bdb8766b..ea3a663f7784d 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -272,6 +272,25 @@ class TaskInfo(InfoJsonEncodable): } +class TaskInfoComplete(TaskInfo): + """Defines encoding BaseOperator/AbstractOperator object to JSON used when user enables full task info.""" + + includes = [] + excludes = [ + "_BaseOperator__instantiated", + "_dag", + "_hook", + "_log", + "_outlets", + "_inlets", + "_lock_for_execution", + "handler", + "params", + "python_callable", + "retry_delay", + ] + + class TaskGroupInfo(InfoJsonEncodable): """Defines encoding TaskGroup object to JSON.""" @@ -300,7 +319,7 @@ def get_airflow_run_facet( dag=DagInfo(dag), dagRun=DagRunInfo(dag_run), taskInstance=TaskInstanceInfo(task_instance), - task=TaskInfo(task), + task=TaskInfoComplete(task) if conf.include_full_task_info() else TaskInfo(task), taskUuid=task_uuid, ) } diff --git a/docs/apache-airflow-providers-openlineage/guides/user.rst b/docs/apache-airflow-providers-openlineage/guides/user.rst index e7decec1f1539..437da6d0fac19 100644 --- a/docs/apache-airflow-providers-openlineage/guides/user.rst +++ b/docs/apache-airflow-providers-openlineage/guides/user.rst @@ -246,6 +246,28 @@ full import paths of Airflow Operators to disable as ``disabled_for_operators`` AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator' +Full Task Info +^^^^^^^^^^^^^^ + +By default, OpenLineage integration's AirflowRunFacet - attached on START event for every task instance event - does +not contain full serialized task information (parameters to given operator), but only includes select parameters. + +However, we allow users to set OpenLineage integration to include full task information. By doing this, rather than +serializing only a few known attributes, we exclude certain non-serializable elements and send everything else. + +.. code-block:: ini + + [openlineage] + transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"} + include_full_task_info = true + +``AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO`` environment variable is an equivalent. + +.. warning:: + + By setting this variable to true, OpenLineage integration does not control the size of event you sent. It can potentially include elements that are megabytes in size or larger, depending on the size of data you pass to the task. + + Custom Extractors ^^^^^^^^^^^^^^^^^ diff --git a/tests/providers/openlineage/plugins/test_utils.py b/tests/providers/openlineage/plugins/test_utils.py index 016bb99d4e6b0..8ca245d1f3d01 100644 --- a/tests/providers/openlineage/plugins/test_utils.py +++ b/tests/providers/openlineage/plugins/test_utils.py @@ -21,7 +21,7 @@ import uuid from json import JSONEncoder from typing import Any -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest from attrs import define @@ -34,6 +34,7 @@ InfoJsonEncodable, OpenLineageRedactor, _is_name_redactable, + get_airflow_run_facet, get_fully_qualified_class_name, is_operator_disabled, ) @@ -227,3 +228,39 @@ def test_is_operator_disabled(mock_disabled_operators): "airflow.operators.python.PythonOperator", } assert is_operator_disabled(op) is True + + +@patch("airflow.providers.openlineage.conf.include_full_task_info") +def test_includes_full_task_info(mock_include_full_task_info): + mock_include_full_task_info.return_value = True + # There should be no 'bash_command' in excludes and it's not in includes - so + # it's a good choice for checking TaskInfo vs TaskInfoComplete + assert ( + "bash_command" + in get_airflow_run_facet( + MagicMock(), + MagicMock(), + MagicMock(), + BashOperator(task_id="bash_op", bash_command="sleep 1"), + MagicMock(), + )["airflow"].task + ) + + +@patch("airflow.providers.openlineage.conf.include_full_task_info") +def test_does_not_include_full_task_info(mock_include_full_task_info): + from airflow.operators.bash import BashOperator + + mock_include_full_task_info.return_value = False + # There should be no 'bash_command' in excludes and it's not in includes - so + # it's a good choice for checking TaskInfo vs TaskInfoComplete + assert ( + "bash_command" + not in get_airflow_run_facet( + MagicMock(), + MagicMock(), + MagicMock(), + BashOperator(task_id="bash_op", bash_command="sleep 1"), + MagicMock(), + )["airflow"].task + ) diff --git a/tests/providers/openlineage/test_conf.py b/tests/providers/openlineage/test_conf.py index 60060b001c6d6..7e0a1c85a78b2 100644 --- a/tests/providers/openlineage/test_conf.py +++ b/tests/providers/openlineage/test_conf.py @@ -21,6 +21,7 @@ import pytest +from airflow.exceptions import AirflowConfigException from airflow.providers.openlineage.conf import ( _is_true, _safe_int_convert, @@ -28,6 +29,7 @@ custom_extractors, dag_state_change_process_pool_size, disabled_operators, + include_full_task_info, is_disabled, is_source_enabled, namespace, @@ -52,6 +54,7 @@ _VAR_URL = "OPENLINEAGE_URL" _CONFIG_OPTION_SELECTIVE_ENABLE = "selective_enable" _CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE = "dag_state_change_process_pool_size" +_CONFIG_OPTION_INCLUDE_FULL_TASK_INFO = "include_full_task_info" _BOOL_PARAMS = ( ("1", True), @@ -487,3 +490,33 @@ def test_dag_state_change_process_pool_size(var_string, expected): with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE): var_string}): result = dag_state_change_process_pool_size() assert result == expected + + +@pytest.mark.parametrize( + ("var", "expected"), + ( + ("False", False), + ("True", True), + ("t", True), + ("true", True), + ), +) +def test_include_full_task_info_reads_config(var, expected): + with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_INCLUDE_FULL_TASK_INFO): var}): + assert include_full_task_info() is expected + + +@pytest.mark.parametrize( + "var", + [ + "a", + "asdf", + "31", + "", + " ", + ], +) +def test_include_full_task_info_raises_exception(var): + with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_INCLUDE_FULL_TASK_INFO): var}): + with pytest.raises(AirflowConfigException): + include_full_task_info()