Skip to content

Commit

Permalink
chore: Update conf retrieval docstring and adjust pool_size (#39721)
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <mudakacper@gmail.com>
  • Loading branch information
kacpermuda authored May 21, 2024
1 parent f509b0a commit a81504e
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 2 deletions.
24 changes: 22 additions & 2 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module provides functions for safely retrieving and handling OpenLineage configurations.
To prevent errors caused by invalid user-provided configuration values, we use ``conf.get()``
to fetch values as strings and perform safe conversions using custom functions.
Any invalid configuration values should be treated as incorrect and replaced with default values.
For example, if the default for boolean ``custom_ol_var`` is False, any non-true value provided:
``"asdf"``, ``12345``, ``{"key": 1}`` or empty string, will result in False being used.
By using default values for invalid configuration values, we ensure that the configurations are handled
safely, preventing potential runtime errors due to conversion issues.
"""

from __future__ import annotations

Expand All @@ -30,6 +43,13 @@ def _is_true(arg: Any) -> bool:
return str(arg).lower().strip() in ("true", "1", "t")


def _safe_int_convert(arg: Any, default: int) -> int:
try:
return int(arg)
except (ValueError, TypeError):
return default


@cache
def config_path(check_legacy_env_var: bool = True) -> str:
"""[openlineage] config_path."""
Expand Down Expand Up @@ -108,5 +128,5 @@ def is_disabled() -> bool:
@cache
def dag_state_change_process_pool_size() -> int:
"""[openlineage] dag_state_change_process_pool_size."""
option = conf.getint(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback=1)
return option
option = conf.get(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback="")
return _safe_int_convert(str(option).strip(), default=1)
56 changes: 56 additions & 0 deletions tests/providers/openlineage/test_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@

from airflow.providers.openlineage.conf import (
_is_true,
_safe_int_convert,
config_path,
custom_extractors,
dag_state_change_process_pool_size,
disabled_operators,
is_disabled,
is_source_enabled,
Expand All @@ -49,6 +51,7 @@
_CONFIG_OPTION_DISABLED = "disabled"
_VAR_URL = "OPENLINEAGE_URL"
_CONFIG_OPTION_SELECTIVE_ENABLE = "selective_enable"
_CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE = "dag_state_change_process_pool_size"

_BOOL_PARAMS = (
("1", True),
Expand Down Expand Up @@ -76,6 +79,7 @@ def clear_cache():
transport.cache_clear()
is_disabled.cache_clear()
selective_enable.cache_clear()
dag_state_change_process_pool_size.cache_clear()
try:
yield
finally:
Expand All @@ -87,6 +91,7 @@ def clear_cache():
transport.cache_clear()
is_disabled.cache_clear()
selective_enable.cache_clear()
dag_state_change_process_pool_size.cache_clear()


@pytest.mark.parametrize(
Expand All @@ -103,6 +108,35 @@ def test_is_true(var_string, expected):
assert _is_true(var_string) is expected


@pytest.mark.parametrize(
"input_value, expected",
[
("123", 123),
(456, 456),
("789", 789),
(0, 0),
("0", 0),
],
)
def test_safe_int_convert(input_value, expected):
assert _safe_int_convert(input_value, default=1) == expected


@pytest.mark.parametrize(
"input_value, default",
[
("abc", 1),
("", 2),
(None, 3),
("123abc", 4),
([], 5),
("1.2", 6),
],
)
def test_safe_int_convert_erroneous_values(input_value, default):
assert _safe_int_convert(input_value, default) == default


@env_vars({_VAR_CONFIG_PATH: "env_var_path"})
@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_CONFIG_PATH): None})
def test_config_path_legacy_env_var_is_used_when_no_conf_option_set():
Expand Down Expand Up @@ -456,3 +490,25 @@ def test_is_disabled_empty_conf_option():
)
def test_is_disabled_do_not_fail_if_conf_option_missing():
assert is_disabled() is True


@pytest.mark.parametrize(
("var_string", "expected"),
(
("1", 1),
("2 ", 2),
(" 3", 3),
("4.56", 1), # default
("asdf", 1), # default
("true", 1), # default
("false", 1), # default
("None", 1), # default
("", 1), # default
(" ", 1), # default
(None, 1), # default
),
)
def test_dag_state_change_process_pool_size(var_string, expected):
with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE): var_string}):
result = dag_state_change_process_pool_size()
assert result == expected

0 comments on commit a81504e

Please sign in to comment.