Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[openlineage] Update conf retrieval docstring and adjust pool_size #39721

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module provides functions for safely retrieving and handling OpenLineage configurations.

To prevent errors caused by invalid user-provided configuration values, we use ``conf.get()``
to fetch values as strings and perform safe conversions using custom functions.

Any invalid configuration values should be treated as incorrect and replaced with default values.
For example, if the default for the boolean ``custom_ol_var`` is False, then any provided value that is
not recognized as true, such as ``"asdf"``, ``12345``, ``{"key": 1}`` or an empty string, results in
False being used.

By using default values for invalid configuration values, we ensure that the configurations are handled
safely, preventing potential runtime errors due to conversion issues.
"""

from __future__ import annotations

Expand All @@ -30,6 +43,13 @@ def _is_true(arg: Any) -> bool:
return str(arg).lower().strip() in ("true", "1", "t")


def _safe_int_convert(arg: Any, default: int) -> int:
try:
return int(arg)
except (ValueError, TypeError):
return default


@cache
def config_path(check_legacy_env_var: bool = True) -> str:
"""[openlineage] config_path."""
Expand Down Expand Up @@ -108,5 +128,5 @@ def is_disabled() -> bool:
@cache
def dag_state_change_process_pool_size() -> int:
    """
    [openlineage] dag_state_change_process_pool_size.

    The option is fetched as a string and converted with ``_safe_int_convert``, so a missing,
    blank or non-integer value yields the default of 1 instead of a conversion error.
    """
    option = conf.get(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback="")
    # str() normalizes any non-string value returned by conf.get() before the integer conversion.
    return _safe_int_convert(str(option).strip(), default=1)
56 changes: 56 additions & 0 deletions tests/providers/openlineage/test_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@

from airflow.providers.openlineage.conf import (
_is_true,
_safe_int_convert,
config_path,
custom_extractors,
dag_state_change_process_pool_size,
disabled_operators,
is_disabled,
is_source_enabled,
Expand All @@ -49,6 +51,7 @@
_CONFIG_OPTION_DISABLED = "disabled"
_VAR_URL = "OPENLINEAGE_URL"
_CONFIG_OPTION_SELECTIVE_ENABLE = "selective_enable"
_CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE = "dag_state_change_process_pool_size"

_BOOL_PARAMS = (
("1", True),
Expand Down Expand Up @@ -76,6 +79,7 @@ def clear_cache():
transport.cache_clear()
is_disabled.cache_clear()
selective_enable.cache_clear()
dag_state_change_process_pool_size.cache_clear()
try:
yield
finally:
Expand All @@ -87,6 +91,7 @@ def clear_cache():
transport.cache_clear()
is_disabled.cache_clear()
selective_enable.cache_clear()
dag_state_change_process_pool_size.cache_clear()


@pytest.mark.parametrize(
Expand All @@ -103,6 +108,35 @@ def test_is_true(var_string, expected):
assert _is_true(var_string) is expected


@pytest.mark.parametrize(
    ("input_value", "expected"),
    (
        ("123", 123),
        (456, 456),
        ("789", 789),
        (0, 0),
        ("0", 0),
    ),
)
def test_safe_int_convert(input_value, expected):
    """Inputs that represent valid integers are converted; the default is not used."""
    converted = _safe_int_convert(input_value, default=1)
    assert converted == expected


@pytest.mark.parametrize(
    ("input_value", "default"),
    (
        ("abc", 1),
        ("", 2),
        (None, 3),
        ("123abc", 4),
        ([], 5),
        ("1.2", 6),
    ),
)
def test_safe_int_convert_erroneous_values(input_value, default):
    """Inputs that cannot be parsed as an integer yield the supplied default."""
    converted = _safe_int_convert(input_value, default)
    assert converted == default


@env_vars({_VAR_CONFIG_PATH: "env_var_path"})
@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_CONFIG_PATH): None})
def test_config_path_legacy_env_var_is_used_when_no_conf_option_set():
Expand Down Expand Up @@ -456,3 +490,25 @@ def test_is_disabled_empty_conf_option():
)
def test_is_disabled_do_not_fail_if_conf_option_missing():
assert is_disabled() is True


@pytest.mark.parametrize(
    "var_string, expected",
    [
        ("1", 1),
        ("2 ", 2),
        (" 3", 3),
        ("4.56", 1),  # default
        ("asdf", 1),  # default
        ("true", 1),  # default
        ("false", 1),  # default
        ("None", 1),  # default
        ("", 1),  # default
        (" ", 1),  # default
        (None, 1),  # default
    ],
)
def test_dag_state_change_process_pool_size(var_string, expected):
    """Integer-like option values are used; any other value yields the default pool size of 1."""
    option_key = (_CONFIG_SECTION, _CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE)
    with conf_vars({option_key: var_string}):
        pool_size = dag_state_change_process_pool_size()
    assert pool_size == expected