Skip to content

Commit

Permalink
feat: mipac.utilをmipac.utils配下に分離
Browse files Browse the repository at this point in the history
  • Loading branch information
yupix committed Mar 30, 2023
1 parent f2b32f2 commit 381ad6e
Show file tree
Hide file tree
Showing 7 changed files with 449 additions and 21 deletions.
60 changes: 39 additions & 21 deletions mipac/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import aiohttp

from mipac.utils.util import deprecated as new_deprecated

try:
import orjson # type: ignore
except ModuleNotFoundError:
Expand Down Expand Up @@ -49,6 +51,29 @@
DEFAULT_CACHE_VALUE: dict[str, Any] = {}


@new_deprecated
def deprecated(func):
"""
This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emitted
when the function is used.
"""

@functools.wraps(func)
def new_func(*args, **kwargs):
warnings.simplefilter('always', DeprecationWarning) # turn off filter
warnings.warn(
'Call to deprecated function {}.'.format(func.__name__),
category=DeprecationWarning,
stacklevel=2,
)
warnings.simplefilter('default', DeprecationWarning) # reset filter
return func(*args, **kwargs)

return new_func


@new_deprecated
def snake_to_camel(snake_str: str, replace_list: dict[str, str]) -> str:
components: list[str] = snake_str.split('_')
for i in range(len(components)):
Expand All @@ -57,6 +82,7 @@ def snake_to_camel(snake_str: str, replace_list: dict[str, str]) -> str:
return components[0] + ''.join(x.title() for x in components[1:])


@new_deprecated
def convert_dict_keys_to_camel(
data: Mapping[Any, Any], replace_list: dict[str, str] | None = None
) -> Mapping[Any, Any]:
Expand All @@ -69,6 +95,7 @@ def convert_dict_keys_to_camel(
return new_dict


@new_deprecated
def str_to_datetime(data: str, format: str = '%Y-%m-%dT%H:%M:%S.%fZ') -> datetime:
"""
Parameters
Expand All @@ -86,33 +113,14 @@ def str_to_datetime(data: str, format: str = '%Y-%m-%dT%H:%M:%S.%fZ') -> datetim
return datetime.strptime(data, format)


def deprecated(func):
"""
This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emitted
when the function is used.
"""

@functools.wraps(func)
def new_func(*args, **kwargs):
warnings.simplefilter('always', DeprecationWarning) # turn off filter
warnings.warn(
'Call to deprecated function {}.'.format(func.__name__),
category=DeprecationWarning,
stacklevel=2,
)
warnings.simplefilter('default', DeprecationWarning) # reset filter
return func(*args, **kwargs)

return new_func


@new_deprecated
class MiTime:
def __init__(self, start: timedelta, end: datetime):
self.start = start
self.end = end


@new_deprecated
class AuthClient:
"""
Tokenの取得を手助けするクラス
Expand Down Expand Up @@ -225,6 +233,7 @@ async def check_auth(self) -> str:
return data['token'] if self.__use_miauth else data['accessToken']


@new_deprecated
def set_cache(group: str, key: str, value: Any):
if len(DEFAULT_CACHE.get(group, [])) > 50:
del DEFAULT_CACHE[group][-1]
Expand All @@ -236,6 +245,7 @@ def set_cache(group: str, key: str, value: Any):
DEFAULT_CACHE_VALUE[key] = value


@new_deprecated
def cache(group: str = 'default', override: bool = False):
def decorator(func):
async def wrapper(self, *args, **kwargs):
Expand All @@ -253,6 +263,7 @@ async def wrapper(self, *args, **kwargs):
return decorator


@new_deprecated
def get_cache_key(func):
async def decorator(self, *args, **kwargs):
ordered_kwargs = sorted(kwargs.items())
Expand All @@ -262,6 +273,7 @@ async def decorator(self, *args, **kwargs):
return decorator


@new_deprecated
def key_builder(func, cls, *args, **kwargs):
ordered_kwargs = sorted(kwargs.items())
key = (
Expand All @@ -270,6 +282,7 @@ def key_builder(func, cls, *args, **kwargs):
return key


@new_deprecated
def check_multi_arg(*args: Any) -> bool:
"""
複数の値を受け取り値が存在するかをboolで返します
Expand All @@ -287,6 +300,7 @@ def check_multi_arg(*args: Any) -> bool:
return bool([i for i in args if i])


@new_deprecated
def remove_list_empty(data: list[Any]) -> list[Any]:
"""
Parameters
Expand All @@ -302,6 +316,7 @@ def remove_list_empty(data: list[Any]) -> list[Any]:
return [k for k in data if k]


@new_deprecated
def remove_dict_empty(data: dict[str, Any]) -> dict[str, Any]:
"""
Parameters
Expand All @@ -319,6 +334,7 @@ def remove_dict_empty(data: dict[str, Any]) -> dict[str, Any]:
return _data


@new_deprecated
def upper_to_lower(
data: dict[str, Any],
field: Optional[dict[str, Any]] = None,
Expand Down Expand Up @@ -368,6 +384,7 @@ def upper_to_lower(
return field


@new_deprecated
def str_lower(text: str):
pattern = re.compile('[A-Z]')
large = [i.group().lower() for i in pattern.finditer(text)]
Expand All @@ -377,6 +394,7 @@ def str_lower(text: str):
return ''.join(result)


@new_deprecated
def bool_to_string(boolean: bool) -> str:
"""
boolを小文字にして文字列として返します
Expand Down
Empty file added mipac/utils/__init__.py
Empty file.
119 changes: 119 additions & 0 deletions mipac/utils/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import asyncio
import uuid
from urllib.parse import urlencode

import aiohttp

from mipac.utils.format import remove_dict_empty


class AuthClient:
"""
Tokenの取得を手助けするクラス
"""

def __init__(
self,
instance_uri: str,
name: str,
description: str,
permissions: list[str] | None = None,
*,
icon: str | None = None,
use_miauth: bool = False,
):
"""
Parameters
----------
instance_uri : str
アプリケーションを作成したいインスタンスのURL
name : str
アプリケーションの名前
description : str
アプリケーションの説明
permissions : Optional[list[str]], default=None
アプリケーションが要求する権限
icon: str | None, default=None
アプリケーションのアイコン画像URL
use_miauth: bool, default=False
MiAuthを使用するか
"""
if permissions is None:
permissions = ['read:account']
self.__client_session = aiohttp.ClientSession()
self.__instance_uri: str = instance_uri
self.__name: str = name
self.__description: str = description
self.__permissions: list[str] = permissions
self.__icon: str | None = icon
self.__use_miauth: bool = use_miauth
self.__session_token: uuid.UUID
self.__secret: str

async def get_auth_url(self) -> str:
"""
認証に使用するURLを取得します
Returns
-------
str
認証に使用するURL
"""
field = remove_dict_empty(
{'name': self.__name, 'description': self.__description, 'icon': self.__icon}
)
if self.__use_miauth:
field['permissions'] = self.__permissions
query = urlencode(field)
self.__session_token = uuid.uuid4()
return f'{self.__instance_uri}/miauth/{self.__session_token}?{query}'
else:
field['permission'] = self.__permissions
async with self.__client_session.post(
f'{self.__instance_uri}/api/app/create', json=field
) as res:
data = await res.json()
self.__secret = data['secret']
async with self.__client_session.post(
f'{self.__instance_uri}/api/auth/session/generate',
json={'appSecret': self.__secret},
) as res:
data = await res.json()
self.__session_token = data['token']
return data['url']

async def wait_miauth(self) -> str:
url = f'{self.__instance_uri}/api/miauth/{self.__session_token}/check'
while True:
async with self.__client_session.post(url) as res:
data = await res.json()
if data.get('ok') is True:
return data
await asyncio.sleep(1)

async def wait_oldauth(self) -> None:
while True:
async with self.__client_session.post(
f'{self.__instance_uri}/api/auth/session/userkey',
json={'appSecret': self.__secret, 'token': self.__session_token},
) as res:
data = await res.json()
if data.get('error', {}).get('code') != 'PENDING_SESSION':
break
await asyncio.sleep(1)

async def check_auth(self) -> str:
"""
認証が完了するまで待機し、完了した場合はTokenを返します
Returns
-------
str
Token
"""
if self.__use_miauth:
data = await self.wait_miauth()
else:
data = await self.wait_oldauth()
await self.__client_session.close()
return data['token'] if self.__use_miauth else data['accessToken']
47 changes: 47 additions & 0 deletions mipac/utils/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from functools import lru_cache
from typing import Any

DEFAULT_CACHE: dict[str, list[str]] = {}
DEFAULT_CACHE_VALUE: dict[str, Any] = {}


def set_cache(group: str, key: str, value: Any):
if len(DEFAULT_CACHE.get(group, [])) > 50:
del DEFAULT_CACHE[group][-1]
del DEFAULT_CACHE_VALUE[key]

if DEFAULT_CACHE.get(group) is None:
DEFAULT_CACHE[group] = []
DEFAULT_CACHE[group].append(key)
DEFAULT_CACHE_VALUE[key] = value


def cache(group: str = 'default', override: bool = False):
def decorator(func):
async def wrapper(self, *args, **kwargs):
key = cache_key_builder(func, self, *args, **kwargs)
hit_item = DEFAULT_CACHE_VALUE.get(key)
if hit_item and override is False and kwargs.get('cache_override') is None:
return hit_item
res = await func(self, *args, **kwargs)
set_cache(group, key, res)
return res

return wrapper

return decorator


def get_cache_key(func):
async def decorator(self, *args, **kwargs):
key = cache_key_builder(func, self, *args, **kwargs)
return await func(self, *args, **kwargs, cache_key=key)

return decorator


@lru_cache
def cache_key_builder(func, cls, *args, **kwargs):
ordered_kwargs = sorted(kwargs.items())
key = (func.__module__ or '') + '.{0}' + f'{cls}' + str(args) + str(ordered_kwargs)
return key
Loading

0 comments on commit 381ad6e

Please sign in to comment.