Skip to content

Commit

Permalink
openlineage: add config to include 'full' task info based on conf set…
Browse files Browse the repository at this point in the history
…ting

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Jul 3, 2024
1 parent c5c50cc commit d1295e3
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 2 deletions.
5 changes: 5 additions & 0 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,8 @@ def execution_timeout() -> int:
"""[openlineage] execution_timeout."""
option = conf.get(_CONFIG_SECTION, "execution_timeout", fallback="")
return _safe_int_convert(str(option).strip(), default=10)


@cache
def include_full_task_info() -> bool:
    """[openlineage] include_full_task_info."""
    # A missing option falls back to the string "False" before being parsed.
    option = conf.get(_CONFIG_SECTION, "include_full_task_info", fallback="False")
    return _is_true(option)
7 changes: 7 additions & 0 deletions airflow/providers/openlineage/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,10 @@ config:
example: ~
type: integer
version_added: 1.9.0
include_full_task_info:
description: |
If true, OpenLineage event will include full task info - potentially containing large fields.
default: "False"
example: ~
type: boolean
version_added: 1.10.0
21 changes: 20 additions & 1 deletion airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,25 @@ class TaskInfo(InfoJsonEncodable):
}


class TaskInfoComplete(TaskInfo):
    """
    Defines encoding BaseOperator/AbstractOperator object to JSON used when user enables full task info.

    Selected in place of ``TaskInfo`` when the ``include_full_task_info`` option of the
    OpenLineage provider configuration evaluates to true.
    """

    # No explicit allow-list.
    # NOTE(review): presumably an empty ``includes`` means every attribute that is not
    # listed in ``excludes`` gets serialized - confirm against InfoJsonEncodable.
    includes = []
    # Attribute names that are left out of the encoded task info.
    excludes = [
        "_BaseOperator__instantiated",
        "_dag",
        "_hook",
        "_log",
        "_outlets",
        "_inlets",
        "_lock_for_execution",
        "handler",
        "params",
        "python_callable",
        "retry_delay",
    ]


class TaskGroupInfo(InfoJsonEncodable):
"""Defines encoding TaskGroup object to JSON."""

Expand Down Expand Up @@ -297,7 +316,7 @@ def get_airflow_run_facet(
dag=DagInfo(dag),
dagRun=DagRunInfo(dag_run),
taskInstance=TaskInstanceInfo(task_instance),
task=TaskInfo(task),
task=TaskInfoComplete(task) if conf.include_full_task_info() else TaskInfo(task),
taskUuid=task_uuid,
)
}
Expand Down
39 changes: 38 additions & 1 deletion tests/providers/openlineage/plugins/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import uuid
from json import JSONEncoder
from typing import Any
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest
from attrs import define
Expand All @@ -34,6 +34,7 @@
InfoJsonEncodable,
OpenLineageRedactor,
_is_name_redactable,
get_airflow_run_facet,
get_fully_qualified_class_name,
is_operator_disabled,
)
Expand Down Expand Up @@ -227,3 +228,39 @@ def test_is_operator_disabled(mock_disabled_operators):
"airflow.operators.python.PythonOperator",
}
assert is_operator_disabled(op) is True


@patch("airflow.providers.openlineage.conf.include_full_task_info")
def test_includes_full_task_info(mock_include_full_task_info):
    """With ``include_full_task_info`` patched to return True, the task facet exposes ``bash_command``."""
    # Imported locally, mirroring test_does_not_include_full_task_info, so this test
    # does not rely on a module-level BashOperator import.
    from airflow.operators.bash import BashOperator

    mock_include_full_task_info.return_value = True
    # There should be no 'bash_command' in excludes and it's not in includes - so
    # it's a good choice for checking TaskInfo vs TaskInfoComplete
    run_facet = get_airflow_run_facet(
        MagicMock(),
        MagicMock(),
        MagicMock(),
        BashOperator(task_id="bash_op", bash_command="sleep 1"),
        MagicMock(),
    )
    assert "bash_command" in run_facet["airflow"].task


@patch("airflow.providers.openlineage.conf.include_full_task_info")
def test_does_not_include_full_task_info(mock_include_full_task_info):
    """With ``include_full_task_info`` patched to return False, ``bash_command`` is absent from the task facet."""
    from airflow.operators.bash import BashOperator

    mock_include_full_task_info.return_value = False
    # 'bash_command' is in neither includes nor excludes, which makes it a good
    # probe for telling TaskInfo apart from TaskInfoComplete.
    operator = BashOperator(task_id="bash_op", bash_command="sleep 1")
    run_facet = get_airflow_run_facet(MagicMock(), MagicMock(), MagicMock(), operator, MagicMock())
    task_info = run_facet["airflow"].task
    assert "bash_command" not in task_info
19 changes: 19 additions & 0 deletions tests/providers/openlineage/test_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
custom_extractors,
dag_state_change_process_pool_size,
disabled_operators,
include_full_task_info,
is_disabled,
is_source_enabled,
namespace,
Expand All @@ -52,6 +53,7 @@
_VAR_URL = "OPENLINEAGE_URL"
_CONFIG_OPTION_SELECTIVE_ENABLE = "selective_enable"
_CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE = "dag_state_change_process_pool_size"
_CONFIG_OPTION_INCLUDE_FULL_TASK_INFO = "include_full_task_info"

_BOOL_PARAMS = (
("1", True),
Expand Down Expand Up @@ -487,3 +489,20 @@ def test_dag_state_change_process_pool_size(var_string, expected):
with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE): var_string}):
result = dag_state_change_process_pool_size()
assert result == expected


@pytest.mark.parametrize(
    ("var", "expected"),
    (
        ("False", False),
        ("True", True),
        ("t", True),
        ("a", False),
        ("asdf", False),
        ("31", False),
        ("true", True),
    ),
)
def test_include_full_task_info_reads_config(var, expected):
    """Check how ``include_full_task_info`` interprets the raw config string."""
    overrides = {(_CONFIG_SECTION, _CONFIG_OPTION_INCLUDE_FULL_TASK_INFO): var}
    with conf_vars(overrides):
        result = include_full_task_info()
        assert result is expected

0 comments on commit d1295e3

Please sign in to comment.