Skip to content

Commit

Permalink
Merge pull request #448 from HopkinsIDD/feature/365/convenience-click…
Browse files Browse the repository at this point in the history
…-types

Duration/Memory Convenience Click Types
  • Loading branch information
TimothyWillard authored Jan 13, 2025
2 parents 5eee4a6 + 4071d37 commit 533fcd9
Show file tree
Hide file tree
Showing 4 changed files with 349 additions and 4 deletions.
211 changes: 211 additions & 0 deletions flepimop/gempyor_pkg/src/gempyor/_click.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
__all__ = []


from datetime import timedelta
from math import ceil
import re
from typing import Any, Literal

import click


class DurationParamType(click.ParamType):
"""
A custom Click parameter type for parsing duration strings into `timedelta` objects.
Attributes:
name: The name of the parameter type.
Examples:
>>> from gempyor._click import DurationParamType
>>> DurationParamType(False, "minutes").convert("23min", None, None)
datetime.timedelta(seconds=1380)
>>> DurationParamType(False, None).convert("2.5hr", None, None)
datetime.timedelta(seconds=9000)
>>> DurationParamType(False, "minutes").convert("-2", None, None)
datetime.timedelta(days=-1, seconds=86280)
"""

name = "duration"
_abbreviations = {
"s": "seconds",
"sec": "seconds",
"secs": "seconds",
"second": "seconds",
"seconds": "seconds",
"m": "minutes",
"min": "minutes",
"mins": "minutes",
"minute": "minutes",
"minutes": "minutes",
"h": "hours",
"hr": "hours",
"hrs": "hours",
"hour": "hours",
"hours": "hours",
"d": "days",
"day": "days",
"days": "days",
"w": "weeks",
"week": "weeks",
"weeks": "weeks",
}

def __init__(
self,
nonnegative: bool,
default_unit: Literal["seconds", "minutes", "hours", "days", "weeks"] | None,
) -> None:
"""
Initialize the instance based on parameter settings.
Args:
nonnegative: If `True` negative durations are not allowed.
default_unit: The default unit to use if no unit is specified in the input
string. If `None` a unitless duration is not allowed.
Notes:
It's on the user of this param type to document in their CLI help text what
the default unit is if they set it to a non-`None` value.
"""
super().__init__()
self._nonnegative = nonnegative
self._duration_regex = re.compile(
rf"^((-)?([0-9]+)?(\.[0-9]+)?)({'|'.join(self._abbreviations.keys())})?$",
flags=re.IGNORECASE,
)
self._default_unit = default_unit

def convert(
self, value: Any, param: click.Parameter | None, ctx: click.Context | None
) -> timedelta:
"""
Converts a string representation of a duration into a `timedelta` object.
Args:
value: The value to convert, expected to be a string like representation of
a duration. Allowed durations are limited to seconds, minutes, hours,
days, and weeks.
param: The Click parameter object for context in errors.
ctx: The Click context object for context in errors.
Returns:
The converted duration as a `timedelta` object.
Raises:
click.BadParameter: If the value is not a valid duration based on the
format.
click.BadParameter: If the duration is negative and the class was
initialized with `nonnegative` set to `True`.
click.BadParameter: If the duration is unitless and the class was
initialized with `default_unit` set to `None`.
"""
value = str(value).strip()
if (m := self._duration_regex.match(value)) is None:
self.fail(f"{value!r} is not a valid duration", param, ctx)
number, posneg, _, _, unit = m.groups()
if self._nonnegative and posneg == "-":
self.fail(f"{value!r} is a negative duration", param, ctx)
if unit is None:
if self._default_unit is None:
self.fail(f"{value!r} is a unitless duration", param, ctx)
unit = self._default_unit
kwargs = {}
kwargs[self._abbreviations.get(unit.lower())] = float(number)
return timedelta(**kwargs)


class MemoryParamType(click.ParamType):
"""
A custom Click parameter type for parsing memory strings.
Attributes:
name: The name of the parameter type.
Examples:
>>> from gempyor._click import MemoryParamType
>>> MemoryParamType(False, "mb", False).convert("12.34MB", None, None)
12.34
>>> MemoryParamType(True, "mb", True).convert("78.9", None, None)
79
>>> MemoryParamType(False, "gb", False).convert("123kb", None, None)
0.00011730194091796875
"""

name = "memory"
_units = {
"kb": 1024.0**1.0,
"k": 1024.0**1.0,
"mb": 1024.0**2.0,
"m": 1024.0**2.0,
"gb": 1024.0**3.0,
"g": 1024.0**3.0,
"t": 1024.0**4.0,
"tb": 1024.0**4.0,
}

def __init__(self, as_int: bool, unit: str, allow_unitless: bool) -> None:
"""
Initialize the instance based on parameter settings.
Args:
as_int: if `True` the `convert` method returns an integer instead of a
float.
unit: The output unit to use in the `convert` method.
Raises:
ValueError: If `unit` is not a valid memory unit size.
"""
super().__init__()
if (unit := unit.lower()) not in self._units.keys():
raise ValueError(
f"The `unit` given is not valid, given '{unit}' and "
f"must be one of: {', '.join(self._units.keys())}."
)
self._unit = unit
self._regex = re.compile(
rf"^(([0-9]+)?(\.[0-9]+)?)({'|'.join(self._units.keys())})?$",
flags=re.IGNORECASE,
)
self._as_int = as_int
self._allow_unitless = allow_unitless

def convert(
self, value: Any, param: click.Parameter | None, ctx: click.Context | None
) -> float | int:
"""
Converts a string representation of a memory size into a numeric.
Args:
value: The value to convert, expected to be a string like representation of
memory size.
param: The Click parameter object for context in errors.
ctx: The Click context object for context in errors.
Returns:
The converted memory size as a numeric. Specifically an integer if the
`as_int` attribute is `True` and float otherwise.
Raises:
click.BadParameter: If the value is not a valid memory size based on the
format.
click.BadParameter: If the memory size is unitless and the class was
initialized with `allow_unitless` set to `False`.
"""
value = str(value).strip()
if (m := self._regex.match(value)) is None:
self.fail(f"{value!r} is not a valid memory size.", param, ctx)
number, _, _, unit = m.groups()
if unit is None:
if not self._allow_unitless:
self.fail(f"{value!r} is a unitless memory size.", param, ctx)
unit = self._unit
else:
unit = unit.lower()
if unit == self._unit:
result = float(number)
else:
result = (self._units.get(unit, self._unit) * float(number)) / (
self._units.get(self._unit)
)
return ceil(result) if self._as_int else result
9 changes: 5 additions & 4 deletions flepimop/gempyor_pkg/src/gempyor/shared_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@
supported options for config file overrides, and custom click decorators.
"""

__all__ = []


import multiprocessing
import pathlib

from typing import Any, Callable
import warnings
from typing import Callable, Any
import re

import click
import confuse

from .logging import get_script_logger
from .utils import config, as_list

__all__ = []


@click.group()
@click.pass_context
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from datetime import timedelta
from typing import Literal

from click.exceptions import BadParameter
import pytest

from gempyor._click import DurationParamType


@pytest.mark.parametrize("nonnegative", (True, False))
@pytest.mark.parametrize("value", ("abc", "$12.34", "12..3", "12years", "12.a2"))
def test_invalid_duration_bad_parameter(nonnegative: bool, value: str) -> None:
duration = DurationParamType(nonnegative=nonnegative, default_unit="seconds")
with pytest.raises(BadParameter, match="^'.*' is not a valid duration$"):
duration.convert(value, None, None)


@pytest.mark.parametrize("value", ("-1", "-123", "-99.45", "-.9"))
def test_negative_duration_bad_parameter(value: str) -> None:
duration = DurationParamType(nonnegative=True, default_unit="seconds")
with pytest.raises(BadParameter, match="^'.*' is a negative duration$"):
duration.convert(value, None, None)


@pytest.mark.parametrize("value", ("1", "-123", "99.45", "-.9"))
def test_unitless_duration_bad_paramter(value: str) -> None:
duration = DurationParamType(nonnegative=False, default_unit=None)
with pytest.raises(BadParameter, match="^'.*' is a unitless duration$"):
duration.convert(value, None, None)


@pytest.mark.parametrize(
("value", "default_unit", "expected"),
(
("1", "minutes", timedelta(minutes=1)),
("1", "days", timedelta(days=1)),
("2s", None, timedelta(seconds=2)),
("3hrs", None, timedelta(hours=3)),
("-4min", None, timedelta(minutes=-4)),
("-5d", None, timedelta(days=-5)),
("12.3", "seconds", timedelta(seconds=12.3)),
("12.3", "hours", timedelta(hours=12.3)),
("12.3", "weeks", timedelta(weeks=12.3)),
("-45.6h", None, timedelta(hours=-45.6)),
("-.1w", None, timedelta(weeks=-0.1)),
("0.0Weeks", "days", timedelta(weeks=0)),
),
)
def test_exact_results_for_select_inputs(
value: str,
default_unit: Literal["seconds", "minutes", "hours", "days", "weeks"] | None,
expected: timedelta,
) -> None:
duration = DurationParamType(nonnegative=False, default_unit=default_unit)
assert duration.convert(value, None, None) == expected
assert duration.convert(value.upper(), None, None) == expected
assert duration.convert(value.lower(), None, None) == expected
76 changes: 76 additions & 0 deletions flepimop/gempyor_pkg/tests/_click/test_memory_param_type_class.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import random

from click.exceptions import BadParameter
import pytest

from gempyor._click import MemoryParamType


@pytest.mark.parametrize("unit", ("Nope", "NO CHANCE", "wrong", "bb"))
def test_invalid_unit_value_error(unit: str) -> None:
with pytest.raises(
ValueError,
match=(
"^The `unit` given is not valid, given "
f"'{unit.lower()}' and must be one of:.*.$"
),
):
MemoryParamType(False, unit, True)


@pytest.mark.parametrize("value", ("1..2MB", "3.4cb", "56.abc", "-1GB"))
def test_invalid_value_bad_parameter(value: str) -> None:
memory = MemoryParamType(False, "mb", True)
with pytest.raises(BadParameter, match="^.* is not a valid memory size.$"):
memory.convert(value, None, None)


@pytest.mark.parametrize("value", ("1", "123", "99.45", ".9"))
def test_unitless_value_bad_parameter(value: str) -> None:
memory = MemoryParamType(False, "mb", False)
with pytest.raises(BadParameter, match="^'.*' is a unitless memory size.$"):
memory.convert(value, None, None)


@pytest.mark.parametrize("unit", MemoryParamType._units.keys())
@pytest.mark.parametrize("as_int", (True, False))
@pytest.mark.parametrize(
"number",
[random.randint(1, 1000) for _ in range(3)] # int
+ [random.random() for _ in range(3)] # float without numbers left of decimal
+ [
random.randint(1, 25) + random.random() for _ in range(3)
], # float with numbers left of the decimal
)
def test_convert_acts_as_identity(unit: str, as_int: bool, number: int | float) -> None:
memory = MemoryParamType(as_int, unit, True)
for u in (unit, unit.upper()):
result = memory.convert(f"{number}{u}".lstrip("0"), None, None)
assert isinstance(result, int if as_int else float)
assert abs(result - number) <= 1 if as_int else result == number


@pytest.mark.parametrize(
("as_int", "unit", "allow_unitless", "value", "expected"),
(
(False, "gb", False, "1.2gb", 1.2),
(True, "gb", False, "1.2gb", 2),
(False, "kb", False, "1mb", 1024.0),
(True, "kb", False, "1mb", 1024),
(False, "gb", False, "30mb", 30.0 / 1024.0),
(True, "gb", False, "30mb", 1),
(False, "kb", False, "2tb", 2.0 * (1024.0**3.0)),
(True, "kb", False, "2tb", 2147483648),
(False, "mb", False, "0.1gb", 0.1 * 1024.0),
(True, "mb", False, "0.1gb", 103),
(False, "gb", True, "4", 4.0),
(True, "gb", True, "4", 4),
(False, "mb", True, "1234.56", 1234.56),
(True, "mb", True, "1234.56", 1235),
),
)
def test_exact_results_for_select_inputs(
unit: str, as_int: bool, allow_unitless: bool, value: str, expected: float | int
) -> None:
memory = MemoryParamType(as_int, unit, allow_unitless)
assert memory.convert(value, None, None) == expected

0 comments on commit 533fcd9

Please sign in to comment.