Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: cleanup unneeded references to typing module #63

Merged
merged 5 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions python/idsse_common/idsse/common/aws_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import logging
import fnmatch
import os
from collections.abc import Sequence
from datetime import datetime, timedelta, UTC
from typing import Sequence, Set, Tuple, Optional

from .path_builder import PathBuilder
from .utils import TimeDelta, datetime_gen, exec_cmd
Expand Down Expand Up @@ -91,15 +91,15 @@ def aws_cp(self, path: str, dest: str) -> bool:
finally:
pass

def check_for(self, issue: datetime, valid: datetime) -> Optional[Tuple[datetime, str]]:
def check_for(self, issue: datetime, valid: datetime) -> tuple[datetime, str] | None:
"""Checks if an object passed issue/valid exists

Args:
issue (datetime): The issue date/time used to format the path to the object's location
valid (datetime): The valid date/time used to format the path to the object's location

Returns:
Optional[Tuple[datetime, str]]: A tuple of the valid date/time (indicated by object's
[tuple[datetime, str] | None]: A tuple of the valid date/time (indicated by object's
location) and location (path) of a object, or None
if object does not exist
"""
Expand All @@ -117,7 +117,7 @@ def check_for(self, issue: datetime, valid: datetime) -> Optional[Tuple[datetime

def get_issues(self,
num_issues: int = 1,
issue_start: Optional[datetime] = None,
issue_start: datetime | None = None,
issue_end: datetime = datetime.now(UTC),
time_delta: timedelta = timedelta(hours=1)
) -> Sequence[datetime]:
Expand All @@ -136,7 +136,7 @@ def get_issues(self,
if time_delta == zero_time_delta:
raise ValueError('Time delta must be non zero')

issues_set: Set[datetime] = set()
issues_set: set[datetime] = set()
if issue_start:
datetimes = datetime_gen(issue_end, time_delta, issue_start, num_issues)
else:
Expand All @@ -161,19 +161,19 @@ def get_issues(self,

def get_valids(self,
issue: datetime,
valid_start: Optional[datetime] = None,
valid_end: Optional[datetime] = None) -> Sequence[Tuple[datetime, str]]:
valid_start: datetime | None = None,
valid_end: datetime | None = None) -> Sequence[tuple[datetime, str]]:
"""Get all objects consistent with the passed issue date/time and filter by valid range

Args:
issue (datetime): The issue date/time used to format the path to the object's location
valid_start (datetime, optional): All returned objects will be for
valid_start (datetime | None, optional): All returned objects will be for
valids >= valid_start. Defaults to None.
valid_end (datetime, optional): All returned objects will be for valids <= valid_end.
Defaults to None.
valid_end (datetime | None, optional): All returned objects will be for
valids <= valid_end. Defaults to None.

Returns:
Sequence[Tuple[datetime, str]]: A sequence of tuples with valid date/time (indicated by
Sequence[tuple[datetime, str]]: A sequence of tuples with valid date/time (indicated by
object's location) and the object's location (path).
Empty Sequence if no valids found for given time range.
"""
Expand Down
11 changes: 6 additions & 5 deletions python/idsse_common/idsse/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@
import glob
import json
import logging
from collections.abc import Iterable
from inspect import signature
from typing import Self, Union, List, Optional
from typing import Self

logger = logging.getLogger(__name__)


class Config:
"""Configuration data class"""
def __init__(self,
config: Union[dict, List[dict], str],
keys: Optional[Union[list, str]] = None,
config: dict | Iterable[dict] | str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I normally use Sequence but Iterable is likely more correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought I read somewhere that Iterable was preferred for arguments because it gives the most flexibility to the caller, whereas Sequence was preferred for return values because it's more specific, and you (hopefully) know what exactly you're returning.

In general I too prefer Sequence.

keys: Iterable | str | None = None,
recursive: bool = False,
ignore_missing: bool = False) -> None:

Expand Down Expand Up @@ -90,7 +91,7 @@ def _load_from_filepath(self, filepath: str) -> dict:
with open(filepath, 'r', encoding='utf8') as file:
return json.load(file)

def _from_filepaths(self, filepaths: List[str], keys: str) -> Self:
def _from_filepaths(self, filepaths: Iterable[str], keys: str) -> Self:
config_dicts = [self._load_from_filepath(filepath)
for filepath in filepaths]
self._from_config_dicts(config_dicts, keys)
Expand All @@ -106,7 +107,7 @@ def _from_config_dict(self, config_dict: dict, keys: str) -> Self:
# update the instance dictionary to hold all configuration attributes
return self.__dict__.update(config_dict)

def _from_config_dicts(self, config_dicts: List[dict], keys: str) -> Self:
def _from_config_dicts(self, config_dicts: Iterable[dict], keys: str) -> Self:
self._from_config_dict(config_dicts[0], keys)
for config_dict in config_dicts[1:]:
# if inherited class takes only one argument
Expand Down
25 changes: 13 additions & 12 deletions python/idsse_common/idsse/common/json_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,24 @@
# ------------------------------------------------------------------------------

import json
from typing import Any, Dict, List, Optional, Tuple, Union
from collections.abc import Sequence
from typing import Any
from uuid import UUID, uuid4

Json = Union[Dict[str, Any], List[Any], int, str, float, bool, type[None]]
Json = dict[str, Any] | Sequence[Any] | int | str | float | bool | None


def get_corr_id(
message: Union[str, dict]
) -> Optional[Tuple[Optional[str], Optional[Union[UUID, str]], Optional[str]]]:
message: str | dict
) -> tuple[str | None, UUID | str | None, str | None] | None:
"""Extract the correlation id from a json message.
The correlation id is made of three parts: originator, uuid, issue date/time

Args:
message (Union[str, json]): The message to be searched as either a string or json obj
message (str | json]): The message to be searched as either a string or json obj

Returns:
Optional[Tuple[Optional[str], Optional[Union[UUID, str]], Optional[str]]]:
tuple[str | None, UUID | str | None, str | None] | None:
A tuple containing originator, uuid, and issue date/time, or None if a given part
was not found. Returns simply None if no parts found
"""
Expand All @@ -46,17 +47,17 @@ def get_corr_id(
return None


def add_corr_id(message: Union[dict, str],
def add_corr_id(message: dict | str,
originator: str,
uuid_: Optional[Union[UUID, str]] = None,
issue_dt: Optional[str] = None) -> dict:
uuid_: UUID | str | None = None,
issue_dt: str | None = None) -> dict:
"""Add (or overwrites) the three part correlation id to a json message

Args:
message (Union[dict, str]): The message to be updated
message (dict | str): The message to be updated
originator (str): String representation of the originating service
uuid_ (Union[UUID, str], optional): A UUID. Defaults to None.
issue_dt (str, optional): The specific issue date/time associated with the message.
uuid_ (UUID | str | None, optional): A UUID. Defaults to None.
issue_dt (str | None, optional): The specific issue date/time associated with the message.
Defaults to None.

Returns:
Expand Down
18 changes: 8 additions & 10 deletions python/idsse_common/idsse/common/log_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import logging
import time
import uuid
from collections.abc import Sequence
from contextvars import ContextVar
from datetime import datetime
from typing import Union, Optional, List

from .utils import to_iso

Expand All @@ -28,21 +28,18 @@

def set_corr_id_context_var(
originator: str,
key: Optional[uuid.UUID] = None,
issue_dt: Optional[Union[str, datetime]] = None
key: uuid.UUID = uuid.uuid4(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uuid4() is new to me, I'm guessing that is what we should be using everywhere, I'll try to remember this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just copied code from inside this function, which I think was using uuid4(). I've seen places in our code that alias uuid4() as just uuid(), because it's the most common version people think of when they say "UUID". The format is xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.

There are other versions of the UUID standard, such as version 1 which uses the current time and the MAC address of the computer to generate a probably unique ID, but I've only ever used v4.

https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_(random)

issue_dt: str | datetime | None = None
) -> None:
"""
Build and set correlation ID ContextVar for logging module, based on originator and
key (or generated UUID). Include issue_dt in correlation ID if provided.

Args:
originator (str): Function, class, service name, etc. that is using logging module
key (Optional[uuid.UUID]): a UUID. Default: randomly generated UUID.
issue_dt (Optional[Union[str, datetime]]): Datetime when a relevant forecast was issued
key (uuid.UUID, optional): a UUID. Default: randomly generated UUID.
issue_dt (str | datetime | None, optional): Datetime when a relevant forecast was issued
"""
if not key:
key = uuid.uuid4()

if issue_dt:
if not isinstance(issue_dt, str):
issue_dt = to_iso(issue_dt)
Expand All @@ -56,7 +53,7 @@ def get_corr_id_context_var_str() -> str:
return corr_id_context_var.get()


def get_corr_id_context_var_parts() -> List[str]:
def get_corr_id_context_var_parts() -> Sequence[str]:
"""Split correlation ID ContextVar into its parts, such as [originator, key, issue_datetime]"""
return corr_id_context_var.get().split(';')

Expand Down Expand Up @@ -159,7 +156,8 @@ def get_default_log_config(level: str,
'loggers': {
'': {
'level': level,
'handlers': ['default', 'rabbit']
# 'handlers': ['default', 'rabbit']
'handlers': ['default']
},
}
}
66 changes: 33 additions & 33 deletions python/idsse_common/idsse/common/path_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import os
import re
from datetime import datetime, timedelta, UTC
from typing import Dict, Self, Union
from typing import Self

from .utils import TimeDelta

Expand Down Expand Up @@ -115,17 +115,17 @@ def path_fmt(self):
return os.path.join(self.dir_fmt, self.filename_fmt)

def build_dir(self,
issue: datetime = None,
valid: datetime = None,
lead: Union[timedelta, TimeDelta] = None) -> str:
issue: datetime | None = None,
valid: datetime | None = None,
lead: timedelta | TimeDelta | None = None) -> str:
"""Attempts to build the directory with provided arguments

Args:
issue (datetime, optional): Issue datetime, should be provided is the
issue (datetime | None, optional): Issue datetime, should be provided is the
directory is dependant on it. Defaults to None.
valid (datetime, optional): Valid datetime, should be provided is the
valid (datetime | None, optional): Valid datetime, should be provided is the
directory is dependant on it. . Defaults to None.
lead (Union[timedelta, TimeDelta], optional): Lead can be provided in addition
lead (timedelta | TimeDelta | None, optional): Lead can be provided in addition
to issue or valid. Defaults to None.

Returns:
Expand All @@ -137,17 +137,17 @@ def build_dir(self,
return self.dir_fmt.format(issue=issue, valid=valid, lead=lead)

def build_filename(self,
issue: datetime = None,
valid: datetime = None,
lead: Union[timedelta, TimeDelta] = None) -> str:
issue: datetime | None = None,
valid: datetime | None = None,
lead: timedelta | TimeDelta | None = None) -> str:
"""Attempts to build the filename with provided arguments

Args:
issue (datetime, optional): Issue datetime, should be provided is the
issue (datetime | None, optional): Issue datetime, should be provided is the
filename is dependant on it. Defaults to None.
valid (datetime, optional): Valid datetime, should be provided is the
valid (datetime | None, optional): Valid datetime, should be provided is the
filename is dependant on it. . Defaults to None.
lead (Union[timedelta, TimeDelta], optional): Lead can be provided in addition
lead (timedelta | TimeDelta | None, optional): Lead can be provided in addition
to issue or valid. Defaults to None.

Returns:
Expand All @@ -157,17 +157,17 @@ def build_filename(self,
return self.filename_fmt.format(issue=issue, valid=valid, lead=lead)

def build_path(self,
issue: datetime = None,
valid: datetime = None,
lead: Union[timedelta, TimeDelta] = None) -> str:
issue: datetime | None = None,
valid: datetime | None = None,
lead: timedelta | TimeDelta | None = None) -> str:
"""Attempts to build the path with provided arguments

Args:
issue (datetime, optional): Issue datetime, should be provided is the
issue (datetime | None, optional): Issue datetime, should be provided is the
path is dependant on it. Defaults to None.
valid (datetime, optional): Valid datetime, should be provided is the
valid (datetime | None, optional): Valid datetime, should be provided is the
path is dependant on it. . Defaults to None.
lead (Union[timedelta, TimeDelta], optional): Lead can be provided in addition
lead (timedelta | TimeDelta | None, optional): Lead can be provided in addition
to issue or valid. Defaults to None.

Returns:
Expand Down Expand Up @@ -235,17 +235,17 @@ def get_valid(self, path: str) -> datetime:
return self.get_valid_from_time_args(time_args)

@staticmethod
def get_issue_from_time_args(parsed_args: Dict,
valid: datetime = None,
lead: timedelta = None) -> datetime:
def get_issue_from_time_args(parsed_args: dict,
valid: datetime | None = None,
lead: timedelta | None = None) -> datetime:
"""Static method for creating an issue date/time from parsed arguments and optional inputs

Args:
parsed_args (dict): A dictionary of issue, valid and/or lead info resulting
from parsing a path, dir, or filename
valid (datetime, optional): Depending on info found during parsing, valid date/time
can be useful. Defaults to None.
lead (timedelta, optional): Depending on info found during parsing, lead time
valid (datetime | None, optional): Depending on info found during parsing,
valid date/time can be useful. Defaults to None.
lead (timedelta | None, optional): Depending on info found during parsing, lead time
can be useful. . Defaults to None.

Returns:
Expand Down Expand Up @@ -274,17 +274,17 @@ def get_issue_from_time_args(parsed_args: Dict,

@staticmethod
def get_valid_from_time_args(parsed_args: dict,
issue: datetime = None,
lead: timedelta = None) -> datetime:
issue: datetime | None = None,
lead: timedelta | None = None) -> datetime:
"""Static method for creating a valid date/time from parsed arguments and optional inputs

Args:
parsed_args (dict): A dictionary of issue, valid and/or lead info resulting
from parsing a path, dir, or filename
issue (datetime, optional): Depending on info found during parsing, issue date/time
issue (datetime | None, optional): Depending on info found during parsing,
issue date/time can be useful. Defaults to None.
lead (timedelta | None, optional): Depending on info found during parsing, lead time
can be useful. Defaults to None.
lead (timedelta, optional): Depending on info found during parsing, lead time
can be useful. . Defaults to None.

Returns:
datetime: Valid date/time
Expand Down Expand Up @@ -329,7 +329,7 @@ def get_lead_from_time_args(time_args: dict) -> timedelta:
@staticmethod
def _ensure_lead(issue: datetime,
valid: datetime,
lead: Union[timedelta, TimeDelta]) -> TimeDelta:
lead: timedelta | TimeDelta) -> TimeDelta:
if lead:
if isinstance(lead, timedelta):
return TimeDelta(lead)
Expand All @@ -338,8 +338,8 @@ def _ensure_lead(issue: datetime,
return TimeDelta(valid-issue)
return None

def _parse_times(self, string: str, format_str: str) -> Dict:
def parse_args(key: str, value: str, result: Dict):
def _parse_times(self, string: str, format_str: str) -> dict:
def parse_args(key: str, value: str, result: dict):
for arg in key.split('{')[1:]:
var_name, var_size = arg.split(':')
var_type = var_size[2:3]
Expand Down
Loading
Loading