diff --git a/lagrange/__init__.py b/lagrange/__init__.py index a70a98a..febc07b 100644 --- a/lagrange/__init__.py +++ b/lagrange/__init__.py @@ -2,8 +2,8 @@ import asyncio from .client.client import Client as Client -from .client.server_push.msg import msg_push_handler -from .client.server_push.service import server_kick_handler +# from .client.server_push.msg import msg_push_handler +# from .client.server_push.service import server_kick_handler from .utils.log import log as log from .utils.log import install_loguru as install_loguru from .utils.sign import sign_provider diff --git a/lagrange/client/base.py b/lagrange/client/base.py index ac13896..c1d221a 100644 --- a/lagrange/client/base.py +++ b/lagrange/client/base.py @@ -1,7 +1,8 @@ import asyncio import hashlib import time -from typing import Callable, Coroutine, Dict, Optional, Tuple, Union, overload +from typing import Callable, Optional, Union, overload +from collections.abc import Coroutine from typing_extensions import Literal @@ -60,7 +61,7 @@ def __init__( self._captcha_info = ["", "", ""] # ticket, rand_str, aid self._server_push_queue: asyncio.Queue[SSOPacket] = asyncio.Queue() - self._tasks: Dict[str, asyncio.Task] = {} + self._tasks: dict[str, asyncio.Task] = {} self._network = ClientNetwork( sig_info, self._server_push_queue, @@ -70,8 +71,8 @@ def __init__( ) self._sign_provider = sign_provider - self._t106 = bytes() - self._t16a = bytes() + self._t106 = b"" + self._t16a = b"" self._online = asyncio.Event() @@ -214,7 +215,7 @@ async def send_uni_packet(self, cmd, buf, send_only: bool = False, timeout=10): return await self._network.send(packet, wait_seq=-1, timeout=timeout) return await self._network.send(packet, wait_seq=seq, timeout=timeout) - async def fetch_qrcode(self) -> Union[int, Tuple[bytes, str]]: + async def fetch_qrcode(self) -> Union[int, tuple[bytes, str]]: tlv = QrCodeTlvBuilder() body = ( PacketBuilder() diff --git a/lagrange/client/client.py b/lagrange/client/client.py index e9ce010..02198d9 100644 --- a/lagrange/client/client.py +++ b/lagrange/client/client.py @@ -5,13 +5,12 @@ from typing import ( BinaryIO, Callable, - Coroutine, - List, Optional, Union, overload, Literal, ) +from collections.abc import Coroutine from lagrange.info import AppInfo, DeviceInfo, SigInfo from lagrange.pb.message.msg_push import MsgPushBody @@ -176,7 +175,7 @@ async def send_oidb_svc( rsp = OidbResponse.decode( ( await self.send_uni_packet( - "OidbSvcTrpcTcp.0x{:0>2X}_{}".format(cmd, sub_cmd), + f"OidbSvcTrpcTcp.0x{cmd:0>2X}_{sub_cmd}", OidbRequest( cmd=cmd, sub_cmd=sub_cmd, data=bytes(buf), is_uid=is_uid ).encode(), @@ -219,7 +218,7 @@ async def _send_msg_raw(self, pb: dict, *, grp_id=0, uid="") -> SendMsgRsp: packet = await self.send_uni_packet("MessageSvc.PbSendMsg", proto_encode(body)) return SendMsgRsp.decode(packet.data) - async def send_grp_msg(self, msg_chain: List[Element], grp_id: int) -> int: + async def send_grp_msg(self, msg_chain: list[Element], grp_id: int) -> int: result = await self._send_msg_raw( {1: build_message(msg_chain).encode()}, grp_id=grp_id ) @@ -227,7 +226,7 @@ async def send_grp_msg(self, msg_chain: List[Element], grp_id: int) -> int: raise AssertionError(result.ret_code, result.err_msg) return result.seq - async def send_friend_msg(self, msg_chain: List[Element], uid: str) -> int: + async def send_friend_msg(self, msg_chain: list[Element], uid: str) -> int: result = await self._send_msg_raw( {1: build_message(msg_chain).encode()}, uid=uid ) @@ -257,7 +256,7 @@ async def upload_grp_audio(self, voice: BinaryIO, grp_id: int) -> Audio: async def upload_friend_audio(self, voice: BinaryIO, uid: str) -> Audio: return await self._highway.upload_voice(voice, uid=uid) - async def fetch_audio_url(self, file_key: str, uid=None, gid=None): + async def fetch_audio_url(self, file_key: str, gid: int = 0, uid: str = ""): return await self._highway.get_audio_down_url(file_key, uid=uid, gid=gid) async def down_grp_audio(self, audio: Audio, grp_id: int) -> BytesIO: @@ -267,7 +266,7 @@ async def down_friend_audio(self, audio: Audio) -> BytesIO: return await self._highway.download_audio(audio, uid=self.uid) async def fetch_image_url( - self, bus_type: Literal[10, 20], node: "IndexNode", uid=None, gid=None + self, bus_type: Literal[10, 20], node: "IndexNode", gid: int = 0, uid: str = "" ): if bus_type == 10: return await self._get_pri_img_url(uid, node) @@ -327,7 +326,7 @@ async def get_grp_members( async def get_grp_msg( self, grp_id: int, start: int, end: int = 0, filter_deleted_msg=True - ) -> List[GroupMessage]: + ) -> list[GroupMessage]: if not end: end = start payload = GetGrpMsgRsp.decode( @@ -354,9 +353,9 @@ async def get_grp_msg( return [*filter(lambda msg: msg.rand != -1, rsp)] return rsp - async def get_friend_list(self) -> List[BotFriend]: - nextuin_cache: List[GetFriendListUin] = [] - rsp: List[BotFriend] = [] + async def get_friend_list(self) -> list[BotFriend]: + nextuin_cache: list[GetFriendListUin] = [] + rsp: list[BotFriend] = [] frist_send = GetFriendListRsp.decode( (await self.send_oidb_svc(0xFD4, 1, PBGetFriendListRequest().encode())).data ) @@ -416,7 +415,7 @@ async def recall_grp_msg(self, grp_id: int, seq: int): PBGroupRecallRequest.build(grp_id, seq).encode(), ) result = proto_decode(payload.data) - if result[2] != b"Success": + if result.into(2, bytes) != b"Success": raise AssertionError(result) async def rename_grp_name(self, grp_id: int, name: str) -> int: # not test @@ -550,11 +549,11 @@ async def set_grp_request( async def get_user_info(self, uid: str) -> UserInfo: ... @overload - async def get_user_info(self, uid: List[str]) -> List[UserInfo]: ... + async def get_user_info(self, uid: list[str]) -> list[UserInfo]: ... async def get_user_info( - self, uid: Union[str, List[str]] - ) -> Union[UserInfo, List[UserInfo]]: + self, uid: Union[str, list[str]] + ) -> Union[UserInfo, list[UserInfo]]: if isinstance(uid, str): uid = [uid] rsp = GetInfoFromUidRsp.decode( @@ -615,7 +614,7 @@ def _gtk_1(self, skey_or_pskey: str): _hash += (_hash << 5) + ord(skey_or_pskey[i]) return _hash & 2147483647 - async def get_cookies(self, domains: list[str]) -> List[str]: + async def get_cookies(self, domains: list[str]) -> list[str]: """pskey""" return [ i.value.decode() @@ -631,8 +630,14 @@ async def get_cookies(self, domains: list[str]) -> List[str]: ] async def get_skey(self) -> str: - jump = "https%3A%2F%2Fh5.qzone.qq.com%2Fqqnt%2Fqzoneinpcqq%2Ffriend%3Frefresh%3D0%26clientuin%3D0%26darkMode%3D0&keyindex=19&random=2599" - url = f"https://ssl.ptlogin2.qq.com/jump?ptlang=1033&clientuin={self.uin}&clientkey={await self._get_client_key()}&u1={jump}" + jump = ( + "https%3A%2F%2Fh5.qzone.qq.com%2Fqqnt%2Fqzoneinpcqq%2F" + "friend%3Frefresh%3D0%26clientuin%3D0%26darkMode%3D0&keyindex=19&random=2599" + ) + url = ( + f"https://ssl.ptlogin2.qq.com/jump?ptlang=1033&clientuin={self.uin}" + f"&clientkey={await self._get_client_key()}&u1={jump}" + ) resp = await HttpCat.request("GET", url, follow_redirect=False) return resp.cookies["skey"] diff --git a/lagrange/client/event.py b/lagrange/client/event.py index 9595e8e..64175f2 100644 --- a/lagrange/client/event.py +++ b/lagrange/client/event.py @@ -1,5 +1,6 @@ import asyncio -from typing import TYPE_CHECKING, Any, Callable, Awaitable, Dict, Set, Type, TypeVar +from typing import TYPE_CHECKING, Any, Callable, TypeVar +from collections.abc import Awaitable from lagrange.utils.log import log @@ -13,18 +14,18 @@ class Events: def __init__(self): - self._task_group: Set[asyncio.Task] = set() - self._handle_map: Dict[Type["BaseEvent"], EVENT_HANDLER] = {} + self._task_group: set[asyncio.Task] = set() + self._handle_map: dict[type["BaseEvent"], EVENT_HANDLER] = {} - def subscribe(self, event: Type[T], handler: EVENT_HANDLER[T]): + def subscribe(self, event: type[T], handler: EVENT_HANDLER[T]): if event not in self._handle_map: self._handle_map[event] = handler else: raise AssertionError( - "Event already subscribed to {}".format(self._handle_map[event]) + f"Event already subscribed to {self._handle_map[event]}" ) - def unsubscribe(self, event: Type["BaseEvent"]): + def unsubscribe(self, event: type["BaseEvent"]): return self._handle_map.pop(event) async def _task_exec(self, client: "Client", event: "BaseEvent", handler: EVENT_HANDLER): @@ -32,7 +33,7 @@ async def _task_exec(self, client: "Client", event: "BaseEvent", handler: EVENT_ await handler(client, event) except Exception as e: log.root.error( - "Unhandled exception on task {}".format(event), exc_info=e + f"Unhandled exception on task {event}", exc_info=e ) def emit(self, event: "BaseEvent", client: "Client"): diff --git a/lagrange/client/events/friend.py b/lagrange/client/events/friend.py index 9800b11..3b79b6f 100644 --- a/lagrange/client/events/friend.py +++ b/lagrange/client/events/friend.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass -from typing import TYPE_CHECKING, List +from typing import TYPE_CHECKING from . import BaseEvent if TYPE_CHECKING: @@ -22,4 +22,4 @@ class FriendMessage(FriendEvent): msg_id: int timestamp: int msg: str - msg_chain: List[Element] + msg_chain: list[Element] diff --git a/lagrange/client/events/group.py b/lagrange/client/events/group.py index 462e0fa..4bdee02 100644 --- a/lagrange/client/events/group.py +++ b/lagrange/client/events/group.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import TYPE_CHECKING, List, Optional, Union, Dict +from typing import TYPE_CHECKING from . import BaseEvent @@ -30,7 +30,7 @@ class GroupMessage(GroupEvent, MessageInfo): sub_id: int = field(repr=False) # client ver identify sender_type: int = field(repr=False) msg: str - msg_chain: List[Element] + msg_chain: list[Element] @property def is_bot(self) -> bool: @@ -48,7 +48,7 @@ class GroupNudge(GroupEvent): target_uin: int action: str suffix: str - attrs: Dict[str, Union[str, int]] = field(repr=False) + attrs: dict[str, str | int] = field(repr=False) attrs_xml: str = field(repr=False) @@ -59,7 +59,7 @@ class GroupSign(GroupEvent): uin: int nickname: str timestamp: int - attrs: Dict[str, Union[str, int]] = field(repr=False) + attrs: dict[str, str | int] = field(repr=False) attrs_xml: str = field(repr=False) @@ -75,8 +75,8 @@ class GroupMuteMember(GroupEvent): @dataclass class GroupMemberJoinRequest(GroupEvent): uid: str - invitor_uid: Optional[str] = None - answer: Optional[str] = None # 问题:(.*)答案:(.*) + invitor_uid: str | None = None + answer: str | None = None # 问题:(.*)答案:(.*) @dataclass @@ -96,7 +96,7 @@ class GroupMemberQuit(GroupEvent): @property def is_kicked(self) -> bool: return self.exit_type in [3, 131] - + @property def is_kicked_self(self) -> bool: return self.exit_type == 3 diff --git a/lagrange/client/highway/encoders.py b/lagrange/client/highway/encoders.py index 204c7ee..6bb30cb 100644 --- a/lagrange/client/highway/encoders.py +++ b/lagrange/client/highway/encoders.py @@ -83,8 +83,8 @@ def encode_upload_img_req( fn = f"{md5.hex().upper()}.{info.name or 'jpg'}" c2c_info = None grp_info = None - c2c_pb = bytes() - grp_pb = bytes() + c2c_pb = b"" + grp_pb = b"" if grp_id: scene_type = 2 grp_info = GroupInfo(grp_id=grp_id) diff --git a/lagrange/client/highway/frame.py b/lagrange/client/highway/frame.py index b97fd7e..5f0e01c 100644 --- a/lagrange/client/highway/frame.py +++ b/lagrange/client/highway/frame.py @@ -1,5 +1,5 @@ import struct -from typing import BinaryIO, Tuple +from typing import BinaryIO from lagrange.pb.highway.head import HighwayTransRespHead @@ -16,7 +16,7 @@ def write_frame(head: bytes, body: bytes) -> bytes: def read_frame( reader: BinaryIO, -) -> Tuple[HighwayTransRespHead, bytes]: +) -> tuple[HighwayTransRespHead, bytes]: head = reader.read(9) if len(head) != 9 and head[0] != 0x28: raise ValueError("Invalid frame head", head) diff --git a/lagrange/client/highway/highway.py b/lagrange/client/highway/highway.py index 0085edc..9c4d28d 100644 --- a/lagrange/client/highway/highway.py +++ b/lagrange/client/highway/highway.py @@ -2,7 +2,7 @@ import time from hashlib import md5 from io import BytesIO -from typing import TYPE_CHECKING, BinaryIO, List, Optional, Tuple, overload, Union +from typing import TYPE_CHECKING, BinaryIO, Optional, Union from lagrange.client.message.elems import Audio, Image from lagrange.pb.highway.comm import IndexNode @@ -37,7 +37,7 @@ def __init__(self, client: "Client"): self._client = client self._session_sig: Optional[bytes] = None self._session_key: Optional[bytes] = None - self._session_addr_list: List[Tuple[str, int]] = [] + self._session_addr_list: list[tuple[str, int]] = [] async def _get_bdh_session(self): rsp = await self._client.send_uni_packet( @@ -76,7 +76,7 @@ async def upload_controller( cmd_id: int, ticket: bytes, ext=None, - addrs: Optional[List[Tuple[str, int]]] = None, + addrs: Optional[list[tuple[str, int]]] = None, bs=65535, ) -> Optional[bytes]: if not addrs: @@ -108,8 +108,8 @@ async def upload_controller( async def _bdh_uploader( self, cmd: str, - addr: Tuple[str, int], - files: List[BinaryIO], + addr: tuple[str, int], + files: list[BinaryIO], cmd_id: int, ticket: bytes, ext: Optional[bytes] = None, @@ -217,15 +217,10 @@ async def upload_image(self, file: BinaryIO, gid=0, uid="") -> Image: ) w, h = info.width, info.height if gid: - fileid: int = proto_decode(ret.upload.compat_qmsg)[7] - url = "https://gchat.qpic.cn/gchatpic_new/{uin}/{gid}-{file_id}-{fmd5}/0?term=2".format( - uin=self._client.uin, - gid=gid, - file_id=fileid, - fmd5=fmd5.hex().upper(), - ) + fileid = proto_decode(ret.upload.compat_qmsg).into(7, int) + url = f"https://gchat.qpic.cn/gchatpic_new/{self._client.uin}/{gid}-{fileid}-{fmd5.hex().upper()}/0?term=2" else: - path = proto_decode(ret.upload.compat_qmsg)[29][30] + path = proto_decode(ret.upload.compat_qmsg).into(29, dict[int, bytes])[30] fileid = 0 url = "https://multimedia.nt.qq.com.cn/" + path.decode() @@ -256,6 +251,7 @@ async def get_grp_img_url(self, grp_id: int, node: "IndexNode") -> str: ).data ) body = ret.download + assert body, "Internal error, check log for more detail" return f"https://{body.info.domain}{body.info.url_path}{body.rkey}" async def get_pri_img_url(self, uid: str, node: IndexNode) -> str: @@ -272,6 +268,7 @@ async def get_pri_img_url(self, uid: str, node: IndexNode) -> str: ).data ) body = ret.download + assert body, "Internal error, check log for more detail" return f"https://{body.info.domain}{body.info.url_path}{body.rkey}" async def upload_voice(self, file: BinaryIO, gid=0, uid="") -> Audio: @@ -321,14 +318,14 @@ async def upload_voice(self, file: BinaryIO, gid=0, uid="") -> Audio: bs=1048576, ) - compat = proto_decode(ret.upload.compat_qmsg, 0)[4] + compat = proto_decode(ret.upload.compat_qmsg, 0).into(4, bytes) + pt = proto_decode(compat, 0) if gid: - pd = proto_decode(compat, 0) - file_id: int = pd[8] - file_key = pd[18] + file_id = pt.into(8, int) + file_key = pt.into(18, bytes) else: file_id = 0 - file_key = proto_decode(compat, 0)[3] + file_key = pt.into(3, bytes) # print(f"https://grouptalk.c2c.qq.com/?ver=0&rkey={compat[18].hex()}&filetype=4%voice_codec=0") return Audio( @@ -343,13 +340,7 @@ async def upload_voice(self, file: BinaryIO, gid=0, uid="") -> Audio: url=await self.get_audio_down_url(file_key.decode(), gid, uid), ) - @overload - async def get_audio_down_url(self, file_key_or_audio: str, gid: int = 0, uid: str = "") -> str: ... - - @overload - async def get_audio_down_url(self, file_key_or_audio: Audio, gid: int = 0, uid: str = "") -> str: ... - - async def get_audio_down_url(self, file_key_or_audio: Union[str, Audio], gid: int = 0, uid="") -> str: + async def get_audio_down_url(self, file_key_or_audio: Union[str, Audio], gid: int = 0, uid: str = "") -> str: if not self._session_addr_list: await self._get_bdh_session() diff --git a/lagrange/client/highway/utils.py b/lagrange/client/highway/utils.py index f4bd171..1da5f89 100644 --- a/lagrange/client/highway/utils.py +++ b/lagrange/client/highway/utils.py @@ -1,9 +1,10 @@ import time from hashlib import md5, sha1 -from typing import Any, Awaitable, BinaryIO, Tuple +from typing import Any, BinaryIO +from collections.abc import Awaitable -def calc_file_hash_and_length(*files: BinaryIO, bs=4096) -> Tuple[bytes, bytes, int]: +def calc_file_hash_and_length(*files: BinaryIO, bs=4096) -> tuple[bytes, bytes, int]: fm, fs, length = md5(), sha1(), 0 for f in files: try: @@ -26,7 +27,7 @@ def itoa(i: int) -> str: # int to address(str) return ".".join([str(p) for p in i.to_bytes(4, "big", signed=signed)]) -async def timeit(func: Awaitable) -> Tuple[float, Any]: +async def timeit(func: Awaitable) -> tuple[float, Any]: start = time.time() result = await func return time.time() - start, result diff --git a/lagrange/client/message/decoder.py b/lagrange/client/message/decoder.py index 4b3a1f2..9dde5e8 100644 --- a/lagrange/client/message/decoder.py +++ b/lagrange/client/message/decoder.py @@ -1,5 +1,6 @@ import zlib -from typing import List, Tuple, Sequence, TYPE_CHECKING, cast, Literal, Union +from typing import TYPE_CHECKING, cast, Literal, Union +from collections.abc import Sequence from lagrange.client.events.group import GroupMessage from lagrange.client.events.friend import FriendMessage @@ -17,7 +18,7 @@ from lagrange.client.client import Client -def parse_msg_info(pb: MsgPushBody) -> Tuple[int, str, int, int, int]: +def parse_msg_info(pb: MsgPushBody) -> tuple[int, str, int, int, int]: user_id = pb.response_head.from_uin uid = pb.response_head.from_uid seq = pb.content_head.seq @@ -27,7 +28,7 @@ def parse_msg_info(pb: MsgPushBody) -> Tuple[int, str, int, int, int]: return user_id, uid, seq, time, rand -def parse_friend_info(pkg: MsgPushBody) -> Tuple[int, str, int, str]: +def parse_friend_info(pkg: MsgPushBody) -> tuple[int, str, int, str]: info = pkg.response_head from_uin = info.from_uin from_uid = info.from_uid @@ -68,8 +69,8 @@ async def parse_msg_new(client: "Client", pkg: MsgPushBody, url=await client.fetch_audio_url(file_key, uid=fri_id, gid=grp_id) ) ] - el: List[Elems] = rich.content - msg_chain: List[Element] = [] + el: list[Elems] = rich.content + msg_chain: list[Element] = [] ignore_next = False for raw in el: if not raw or raw == Elems(): diff --git a/lagrange/client/message/encoder.py b/lagrange/client/message/encoder.py index c7f734b..bdc525e 100644 --- a/lagrange/client/message/encoder.py +++ b/lagrange/client/message/encoder.py @@ -1,6 +1,6 @@ import struct import zlib -from typing import List, Optional +from typing import Optional from lagrange.pb.message.rich_text import Elems, RichText from lagrange.pb.message.rich_text.elems import ( @@ -37,10 +37,10 @@ from .types import Element -def build_message(msg_chain: List[Element], compatible=True) -> RichText: +def build_message(msg_chain: list[Element], compatible=True) -> RichText: if not msg_chain: raise ValueError("Message chain is empty") - msg_pb: List[Elems] = [] + msg_pb: list[Elems] = [] msg_ptt: Optional[Ptt] = None if not isinstance(msg_chain[0], Audio): for msg in msg_chain: diff --git a/lagrange/client/network.py b/lagrange/client/network.py index 1248c75..3560662 100644 --- a/lagrange/client/network.py +++ b/lagrange/client/network.py @@ -5,7 +5,8 @@ import asyncio import ipaddress import sys -from typing import Dict, Callable, Coroutine, Tuple, overload, Optional +from typing import Callable, overload, Optional +from collections.abc import Coroutine from typing_extensions import Literal from lagrange.info import SigInfo @@ -27,7 +28,7 @@ def __init__( disconnect_cb: Callable[[bool], Coroutine], use_v6=False, *, - manual_address: Optional[Tuple[str, int]] = None, + manual_address: Optional[tuple[str, int]] = None, ): if not manual_address: host, port = self.V6UPSTREAM if use_v6 else self.V4UPSTREAM @@ -40,7 +41,7 @@ def __init__( self._push_store = push_store self._reconnect_cb = reconnect_cb self._disconnect_cb = disconnect_cb - self._wait_fut_map: Dict[int, asyncio.Future[SSOPacket]] = {} + self._wait_fut_map: dict[int, asyncio.Future[SSOPacket]] = {} self._connected = False self._sig = sig_info @@ -91,7 +92,7 @@ async def on_connected(self): self._using_v6 = False log.network.info(f"Connected to {host}:{port}") if self._connected and not self._stop_flag: - t = asyncio.create_task(self._reconnect_cb(), name="reconnect_cb") + asyncio.create_task(self._reconnect_cb(), name="reconnect_cb") else: self._connected = True @@ -99,7 +100,7 @@ async def on_close(self): self.conn_event.clear() log.network.warning("Connection closed") self._cancel_all_task() - t = asyncio.create_task(self._disconnect_cb(False), name="disconnect_cb") + asyncio.create_task(self._disconnect_cb(False), name="disconnect_cb") async def on_error(self) -> bool: _, err, _ = sys.exc_info() @@ -113,7 +114,7 @@ async def on_error(self) -> bool: log.network.error(f"Connection got an unexpected error: {repr(err)}") recover = False self._cancel_all_task() - t = asyncio.create_task(self._disconnect_cb(recover), name="disconnect_cb") + asyncio.create_task(self._disconnect_cb(recover), name="disconnect_cb") return recover async def on_message(self, message_length: int): diff --git a/lagrange/client/server_push/binder.py b/lagrange/client/server_push/binder.py index 140bea5..1bed361 100644 --- a/lagrange/client/server_push/binder.py +++ b/lagrange/client/server_push/binder.py @@ -1,4 +1,5 @@ -from typing import Any, Callable, Coroutine, Dict, TYPE_CHECKING +from typing import Any, Callable, TYPE_CHECKING +from collections.abc import Coroutine from lagrange.client.wtlogin.sso import SSOPacket @@ -11,7 +12,7 @@ class PushDeliver: def __init__(self, client: "Client"): self._client = client - self._handle_map: Dict[ + self._handle_map: dict[ str, Callable[["Client", SSOPacket], Coroutine[None, None, Any]] ] = {} diff --git a/lagrange/client/server_push/msg.py b/lagrange/client/server_push/msg.py index d63cc55..8d9bdf7 100644 --- a/lagrange/client/server_push/msg.py +++ b/lagrange/client/server_push/msg.py @@ -1,7 +1,7 @@ import json import re from urllib.parse import parse_qsl -from typing import TYPE_CHECKING, Type, Tuple, TypeVar, Union, Dict +from typing import TYPE_CHECKING, TypeVar, Union from lagrange.client.message.decoder import parse_grp_msg, parse_friend_msg from lagrange.pb.message.msg_push import MsgPush @@ -42,7 +42,7 @@ T = TypeVar("T", bound=ProtoStruct) -def unpack(buf2: bytes, decoder: Type[T]) -> Tuple[int, T]: +def unpack(buf2: bytes, decoder: type[T]) -> tuple[int, T]: reader = Reader(buf2) grp_id = reader.read_u32() reader.read_u8() @@ -54,7 +54,7 @@ async def msg_push_handler(client: "Client", sso: SSOPacket): typ = pkg.content_head.type sub_typ = pkg.content_head.sub_type - logger.debug("msg_push received, type: {}.{}".format(typ, sub_typ)) + logger.debug(f"msg_push received, type: {typ}.{sub_typ}") if typ == 82: # grp msg return await parse_grp_msg(client, pkg) elif typ in [166, 208, 529]: # frd msg @@ -89,13 +89,14 @@ async def msg_push_handler(client: "Client", sso: SSOPacket): grp_id=inn.grp_id, uid=inn.uid, invitor_uid=inn.invitor_uid ) elif typ == 0x210: # friend event / group file upload notice event - logger.debug("unhandled friend event / group file upload notice event: %s" % pkg) # TODO: paste + logger.debug(f"unhandled friend event / group file upload notice event: {pkg}") # TODO: paste elif typ == 0x2DC: # grp event, 732 if sub_typ == 20: # nudge and group_sign(群打卡) if pkg.message: grp_id, pb = unpack(pkg.message.buf2, GroupSub20Head) - attrs: Dict[str, Union[str, int]] = {} - for x in pb.body.attrs: # type: dict[bytes, bytes] + attrs: dict[str, Union[str, int]] = {} + for x in pb.body.attrs: + x: dict[bytes, bytes] k, v = x.values() if isinstance(v, dict): v = proto_encode(v) @@ -201,10 +202,10 @@ async def msg_push_handler(client: "Client", sso: SSOPacket): elif sub_typ == 12: # mute info = proto_decode(pkg.message.buf2) return GroupMuteMember( - grp_id=info[1], - operator_uid=info[4].decode(), - target_uid=unpack_dict(info, "5.3.1", b"").decode(), - duration=unpack_dict(info, "5.3.2"), + grp_id=info.into(1, int), + operator_uid=info.into(4, bytes).decode(), + target_uid=unpack_dict(info.proto, "5.3.1", b"").decode(), + duration=unpack_dict(info.proto, "5.3.2"), ) elif sub_typ == 21: # set/unset essence msg pass # todo diff --git a/lagrange/client/wtlogin/ntlogin.py b/lagrange/client/wtlogin/ntlogin.py index 3f32e76..fca4329 100644 --- a/lagrange/client/wtlogin/ntlogin.py +++ b/lagrange/client/wtlogin/ntlogin.py @@ -48,7 +48,7 @@ def parse_ntlogin_response( response: bytes, sig: SigInfo, captcha: list ) -> LoginErrorCode: frame = proto_decode(response, 0) - rsp = NTLoginRsp.decode(aes_gcm_decrypt(frame[3], sig.exchange_key)) + rsp = NTLoginRsp.decode(aes_gcm_decrypt(frame.into(3, bytes), sig.exchange_key)) if not rsp.head.error and rsp.body and rsp.body.credentials: cr = rsp.body.credentials diff --git a/lagrange/client/wtlogin/oicq.py b/lagrange/client/wtlogin/oicq.py index 3ad2cf8..55da81f 100644 --- a/lagrange/client/wtlogin/oicq.py +++ b/lagrange/client/wtlogin/oicq.py @@ -142,11 +142,11 @@ def decode_login_response(buf: bytes, sig: SigInfo): sig.d2_key = tlv.get(0x305) or sig.d2_key sig.tgtgt = hashlib.md5(sig.d2_key).digest() sig.temp_pwd = tlv[0x106] - sig.uid = proto_decode(tlv[0x543])[9][11][1].decode() # type: ignore + sig.uid = proto_decode(tlv[0x543]).into((9, 11, 1), bytes).decode() sig.info_updated() log.login.debug("SigInfo got") - log.login.info("Login success, username: %s" % tlv[0x11A][5:].decode()) + log.login.info(f"Login success, username: {tlv[0x11A][5:].decode()}") return True elif 0x146 in tlv: diff --git a/lagrange/client/wtlogin/sso.py b/lagrange/client/wtlogin/sso.py index ecb65b4..d96c5c8 100644 --- a/lagrange/client/wtlogin/sso.py +++ b/lagrange/client/wtlogin/sso.py @@ -2,7 +2,6 @@ import zlib from dataclasses import dataclass, field from io import BytesIO -from typing import Tuple from lagrange.utils.binary.reader import Reader from lagrange.utils.crypto.ecdh import ecdh @@ -24,7 +23,7 @@ def parse_lv(buffer: BytesIO): # u32 len only return buffer.read(length - 4) -def parse_sso_header(raw: bytes, d2_key: bytes) -> Tuple[int, str, bytes]: +def parse_sso_header(raw: bytes, d2_key: bytes) -> tuple[int, str, bytes]: buf = BytesIO(raw) # parse sso header buf.read(4) diff --git a/lagrange/info/sig.py b/lagrange/info/sig.py index 5f6245b..82d0d0e 100644 --- a/lagrange/info/sig.py +++ b/lagrange/info/sig.py @@ -31,16 +31,16 @@ def info_updated(self): def new(cls, seq=8830) -> "SigInfo": return cls( sequence=seq, - tgtgt=bytes(), - tgt=bytes(), - d2=bytes(), + tgtgt=b"", + tgt=b"", + d2=b"", d2_key=bytes(16), - qrsig=bytes(), - exchange_key=bytes(), - key_sig=bytes(), + qrsig=b"", + exchange_key=b"", + key_sig=b"", cookies="", - unusual_sig=bytes(), - temp_pwd=bytes(), + unusual_sig=b"", + temp_pwd=b"", uin=0, uid="", nickname="", diff --git a/lagrange/pb/highway/comm.py b/lagrange/pb/highway/comm.py index 5f50caa..e928728 100644 --- a/lagrange/pb/highway/comm.py +++ b/lagrange/pb/highway/comm.py @@ -11,14 +11,14 @@ class CommonHead(ProtoStruct): class PicExtInfo(ProtoStruct): biz_type: Optional[int] = proto_field(1, default=None) summary: Optional[str] = proto_field(2, default=None) - c2c_reserved: bytes = proto_field(11, default=bytes()) - troop_reserved: bytes = proto_field(12, default=bytes()) + c2c_reserved: bytes = proto_field(11, default=b"") + troop_reserved: bytes = proto_field(12, default=b"") class VideoExtInfo(ProtoStruct): from_scene: Optional[int] = proto_field(1, default=None) to_scene: Optional[int] = proto_field(2, default=None) - pb_reserved: bytes = proto_field(3, default=bytes()) + pb_reserved: bytes = proto_field(3, default=b"") class AudioExtInfo(ProtoStruct): @@ -28,9 +28,9 @@ class AudioExtInfo(ProtoStruct): change_voice: Optional[int] = proto_field(4, default=None) waveform: Optional[bytes] = proto_field(5, default=None) audio_convert_text: Optional[int] = proto_field(6, default=None) - bytes_reserved: bytes = proto_field(11, default=bytes()) - pb_reserved: bytes = proto_field(12, default=bytes()) - general_flags: bytes = proto_field(13, default=bytes()) + bytes_reserved: bytes = proto_field(11, default=b"") + pb_reserved: bytes = proto_field(12, default=b"") + general_flags: bytes = proto_field(13, default=b"") class ExtBizInfo(ProtoStruct): diff --git a/lagrange/pb/highway/head.py b/lagrange/pb/highway/head.py index 237376e..5154163 100644 --- a/lagrange/pb/highway/head.py +++ b/lagrange/pb/highway/head.py @@ -12,7 +12,7 @@ class DataHighwayHead(ProtoStruct): app_id: int = proto_field(6) data_flag: int = proto_field(7, default=16) command_id: int = proto_field(8) - build_ver: bytes = proto_field(9, default=bytes()) + build_ver: bytes = proto_field(9, default=b"") class SegHead(ProtoStruct): @@ -21,7 +21,7 @@ class SegHead(ProtoStruct): data_offset: int = proto_field(3) data_length: int = proto_field(4) ret_code: int = proto_field(5, default=0) - ticket: bytes = proto_field(6, default=bytes()) + ticket: bytes = proto_field(6, default=b"") md5: bytes = proto_field(8) file_md5: bytes = proto_field(9) cache_addr: Optional[int] = proto_field(10, default=None) @@ -30,14 +30,14 @@ class SegHead(ProtoStruct): class LoginSigHead(ProtoStruct): login_sig_type: int = proto_field(1) - login_sig: bytes = proto_field(2, default=bytes()) + login_sig: bytes = proto_field(2, default=b"") app_id: int = proto_field(3) class HighwayTransReqHead(ProtoStruct): msg_head: Optional[DataHighwayHead] = proto_field(1, default=None) seg_head: Optional[SegHead] = proto_field(2, default=None) - req_ext_info: bytes = proto_field(3, default=bytes()) + req_ext_info: bytes = proto_field(3, default=b"") timestamp: int = proto_field(4) login_head: Optional[LoginSigHead] = proto_field(5, default=None) @@ -49,7 +49,7 @@ class HighwayTransRespHead(ProtoStruct): allow_retry: int = proto_field(4) cache_cost: Optional[int] = proto_field(5, default=None) ht_cost: Optional[int] = proto_field(6, default=None) - ext_info: bytes = proto_field(7, default=bytes()) + ext_info: bytes = proto_field(7, default=b"") timestamp: Optional[int] = proto_field(8, default=None) range: Optional[int] = proto_field(9, default=None) is_reset: Optional[int] = proto_field(10, default=None) diff --git a/lagrange/pb/highway/httpconn.py b/lagrange/pb/highway/httpconn.py index b334564..b98f319 100644 --- a/lagrange/pb/highway/httpconn.py +++ b/lagrange/pb/highway/httpconn.py @@ -11,7 +11,7 @@ class X501ReqBody(ProtoStruct): field_4: int = proto_field(4, default=1) tgt_hex: str = proto_field(5) field_6: int = proto_field(6, default=3) - field_7: list[int] = proto_field(7, default=[1, 5, 10, 21]) + field_7: list[int] = proto_field(7, default_factory=lambda: [1, 5, 10, 21]) field_9: int = proto_field(9, default=2) field_10: int = proto_field(10, default=9) field_11: int = proto_field(11, default=8) @@ -54,14 +54,14 @@ def ip(self) -> str: class ServerInfo(ProtoStruct): service_type: int = proto_field(1) - v4_addr: list[ServerV4Address] = proto_field(2, default=[]) - v6_addr: list[ServerV6Address] = proto_field(5, default=[]) + v4_addr: list[ServerV4Address] = proto_field(2, default_factory=list) + v6_addr: list[ServerV6Address] = proto_field(5, default_factory=list) class X501RspBody(ProtoStruct): sig_session: bytes = proto_field(1) sig_key: bytes = proto_field(2) - servers: list[ServerInfo] = proto_field(3, default=[]) + servers: list[ServerInfo] = proto_field(3, default_factory=list) class HttpConn0x6ffRsp(ProtoStruct): diff --git a/lagrange/pb/highway/req.py b/lagrange/pb/highway/req.py index 5c4a9c4..5740913 100644 --- a/lagrange/pb/highway/req.py +++ b/lagrange/pb/highway/req.py @@ -29,7 +29,7 @@ class SceneInfo(ProtoStruct): class MultiMediaReqHead(ProtoStruct): common: CommonHead = proto_field(1) scene: SceneInfo = proto_field(2) - meta: ClientMeta = proto_field(3, default=ClientMeta()) + meta: ClientMeta = proto_field(3, default_factory=ClientMeta) class UploadInfo(ProtoStruct): diff --git a/lagrange/pb/highway/rsp.py b/lagrange/pb/highway/rsp.py index df69759..07e79c2 100644 --- a/lagrange/pb/highway/rsp.py +++ b/lagrange/pb/highway/rsp.py @@ -32,9 +32,9 @@ class UploadRsp(ProtoStruct): v6_addrs: list[IPv6] = proto_field(4) msg_seq: int = proto_field(5, default=0) msg_info: MsgInfo = proto_field(6) - ext: list[RichMediaStorageTransInfo] = proto_field(7, default=[]) + ext: list[RichMediaStorageTransInfo] = proto_field(7, default_factory=list) compat_qmsg: bytes = proto_field(8) - sub_file_info: list[SubFileInfo] = proto_field(10, default=[]) + sub_file_info: list[SubFileInfo] = proto_field(10, default_factory=list) class DownloadInfo(ProtoStruct): diff --git a/lagrange/pb/message/msg.py b/lagrange/pb/message/msg.py index 22f0901..25e6d8b 100644 --- a/lagrange/pb/message/msg.py +++ b/lagrange/pb/message/msg.py @@ -7,5 +7,5 @@ class Message(ProtoStruct): body: Optional[RichText] = proto_field(1, default=None) - buf2: bytes = proto_field(2, default=bytes()) - buf3: bytes = proto_field(3, default=bytes()) + buf2: bytes = proto_field(2, default=b"") + buf3: bytes = proto_field(3, default=b"") diff --git a/lagrange/pb/service/friend.py b/lagrange/pb/service/friend.py index 3ee3768..28e0642 100644 --- a/lagrange/pb/service/friend.py +++ b/lagrange/pb/service/friend.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import Optional from lagrange.utils.binary.protobuf import ProtoStruct, proto_field @@ -9,7 +9,7 @@ class FriendProperty(ProtoStruct): class FriendLayer1(ProtoStruct): - properties: List[FriendProperty] = proto_field(2, default=None) + properties: list[FriendProperty] = proto_field(2, default=None) class FriendAdditional(ProtoStruct): @@ -21,11 +21,11 @@ class FriendInfo(ProtoStruct): uid: str = proto_field(1) custom_group: Optional[int] = proto_field(2, default=None) uin: int = proto_field(3) - additional: List[FriendAdditional] = proto_field(10001) + additional: list[FriendAdditional] = proto_field(10001) class GetFriendNumbers(ProtoStruct): - f1: List[int] = proto_field(1) + f1: list[int] = proto_field(1) class GetFriendBody(ProtoStruct): @@ -43,14 +43,14 @@ class PBGetFriendListRequest(ProtoStruct): next_uin: Optional[GetFriendListUin] = proto_field(5, default=None) f6: int = proto_field(6, default=1) f7: int = proto_field(7, default=2147483647) # MaxValue - body: List[GetFriendBody] = proto_field( + body: list[GetFriendBody] = proto_field( 10001, default=[ GetFriendBody(type=1, f2=GetFriendNumbers(f1=[103, 102, 20002, 27394])), GetFriendBody(type=4, f2=GetFriendNumbers(f1=[100, 101, 102])), ], ) - f10002: List[int] = proto_field(10002, default=[13578, 13579, 13573, 13572, 13568]) + f10002: list[int] = proto_field(10002, default_factory=lambda: [13578, 13579, 13573, 13572, 13568]) f10003: int = proto_field(10003, default=4051) """ * GetFriendNumbers里是要拿到的东西 @@ -66,8 +66,8 @@ class GetFriendListRsp(ProtoStruct): display_friend_count: int = proto_field(3) timestamp: int = proto_field(6) self_uin: int = proto_field(7) - friend_list: List[FriendInfo] = proto_field(101) + friend_list: list[FriendInfo] = proto_field(101) -def propertys(properties: List[FriendProperty]): +def propertys(properties: list[FriendProperty]): return {prop.code: prop.value for prop in properties} diff --git a/lagrange/pb/service/group.py b/lagrange/pb/service/group.py index d811f39..4696834 100644 --- a/lagrange/pb/service/group.py +++ b/lagrange/pb/service/group.py @@ -33,7 +33,7 @@ class PBGroupRecallRequest(ProtoStruct): type: int = proto_field(1, default=1) grp_id: int = proto_field(2) field3: RecallRequestF3 = proto_field(3) - field4: dict = proto_field(4, default={1: 0}) + field4: dict = proto_field(4, default_factory=lambda: {1: 0}) @classmethod def build(cls, grp_id: int, seq: int) -> "PBGroupRecallRequest": @@ -81,7 +81,7 @@ class GetGrpMsgRspBody(ProtoStruct): grp_id: int = proto_field(3) start_seq: int = proto_field(4) end_seq: int = proto_field(5) - elems: list[bytes] = proto_field(6, default=[]) + elems: list[bytes] = proto_field(6, default_factory=list) class GetGrpMsgRsp(ProtoStruct): @@ -307,7 +307,7 @@ def build( "500158016001680170017801800101a00101a00601a80601c00601c80601c00c01" ), # 10-16, 20, 100, 101, 104, 105, 200 account=account, - next_key=next_key, + next_key=next_key.encode() if next_key else None, ) @@ -444,7 +444,7 @@ class Oidb88D0Args(ProtoStruct): class GetGrpLastSeqReqBody(ProtoStruct): grp_id: int = proto_field(1) - args: Oidb88D0Args = proto_field(2, default=Oidb88D0Args(seq=0)) + args: Oidb88D0Args = proto_field(2, default_factory=lambda: Oidb88D0Args(seq=0)) class PBGetGrpLastSeq(ProtoStruct): diff --git a/lagrange/pb/status/group.py b/lagrange/pb/status/group.py index 8ad90ed..7885075 100644 --- a/lagrange/pb/status/group.py +++ b/lagrange/pb/status/group.py @@ -22,7 +22,7 @@ class MemberJoinRequest(ProtoStruct): uid: str = proto_field(3) src: int = proto_field(4) request_field: str = proto_field(5, default="") - field_9: bytes = proto_field(9, default=bytes()) + field_9: bytes = proto_field(9, default=b"") class InviteInner(ProtoStruct): @@ -118,7 +118,7 @@ class GroupSub20Body(ProtoStruct): # f2: int = proto_field(2) # 1061 # f3: int = proto_field(3) # 7 # f6: int = proto_field(6) # 1132 - attrs: list[dict] = proto_field(7, default={}) + attrs: list[dict] = proto_field(7, default_factory=list) attrs_xml: str = proto_field(8, default=None) f10: int = proto_field(10) # rand? diff --git a/lagrange/utils/binary/builder.py b/lagrange/utils/binary/builder.py index 6c1884b..859e20c 100644 --- a/lagrange/utils/binary/builder.py +++ b/lagrange/utils/binary/builder.py @@ -1,7 +1,8 @@ import struct from typing import Union -from typing_extensions import Optional, Self, TypeAlias +from typing_extensions import Self, TypeAlias +from typing import Optional from lagrange.utils.crypto.tea import qqtea_encrypt diff --git a/lagrange/utils/binary/protobuf/coder.py b/lagrange/utils/binary/protobuf/coder.py index 81eb808..cb8d128 100644 --- a/lagrange/utils/binary/protobuf/coder.py +++ b/lagrange/utils/binary/protobuf/coder.py @@ -1,20 +1,38 @@ -from typing import Dict, List, Union, TYPE_CHECKING, cast - +from typing import Union, TypeVar, TYPE_CHECKING, cast +from collections.abc import Mapping, Sequence from typing_extensions import Self, TypeAlias from lagrange.utils.binary.builder import Builder from lagrange.utils.binary.reader import Reader -Proto: TypeAlias = Dict[int, "ProtoEncodable"] +Proto: TypeAlias = dict[int, "ProtoEncodable"] LengthDelimited: TypeAlias = Union[str, "Proto", bytes] ProtoEncodable: TypeAlias = Union[ int, float, bool, LengthDelimited, - List["ProtoEncodable"], - Dict[int, "ProtoEncodable"], + Sequence["ProtoEncodable"], + Mapping[int, "ProtoEncodable"], ] +TProtoEncodable = TypeVar("TProtoEncodable", bound="ProtoEncodable") + + +class ProtoDecoded: + def __init__(self, proto: Proto): + self.proto = proto + + def __getitem__(self, item: int) -> "ProtoEncodable": + return self.proto[item] + + def into(self, field: Union[int, tuple[int, ...]], tp: type[TProtoEncodable]) -> TProtoEncodable: + if isinstance(field, int): + return cast(tp, self.proto[field]) + else: + data = self.proto + for f in field: + data = data[f] # type: ignore + return cast(tp, data) class ProtoBuilder(Builder): @@ -105,7 +123,7 @@ def _encode(builder: ProtoBuilder, tag: int, value: ProtoEncodable): raise AssertionError -def proto_decode(data: bytes, max_layer=-1) -> Proto: +def proto_decode(data: bytes, max_layer=-1) -> ProtoDecoded: reader = ProtoReader(data) proto = {} @@ -123,7 +141,7 @@ def proto_decode(data: bytes, max_layer=-1) -> Proto: if max_layer > 0 or max_layer < 0 and len(value) > 1: try: # serialize nested - value = proto_decode(value, max_layer - 1) + value = proto_decode(value, max_layer - 1).proto except Exception: pass elif wire_type == 5: @@ -138,7 +156,7 @@ def proto_decode(data: bytes, max_layer=-1) -> Proto: else: proto[tag] = value - return proto + return ProtoDecoded(proto) def proto_encode(proto: Proto) -> bytes: diff --git a/lagrange/utils/binary/protobuf/models.py b/lagrange/utils/binary/protobuf/models.py index 8bce841..3075a0e 100644 --- a/lagrange/utils/binary/protobuf/models.py +++ b/lagrange/utils/binary/protobuf/models.py @@ -1,31 +1,39 @@ import inspect +from dataclasses import MISSING from types import GenericAlias -from typing import cast, Dict, List, Tuple, Type, TypeVar, Union, Generic, Any, Callable, Mapping, overload -from typing_extensions import Optional, Self, TypeAlias, dataclass_transform, get_origin, get_args +from typing import cast, TypeVar, Union, Any, Callable, overload +from collections.abc import Mapping +from typing_extensions import Self, TypeAlias, dataclass_transform +from typing import Optional from .coder import Proto, proto_decode, proto_encode _ProtoTypes = Union[str, list, dict, bytes, int, float, bool, "ProtoStruct"] -T = TypeVar("T", str, list, dict, bytes, int, float, bool, "ProtoStruct") +T = TypeVar("T", bound=_ProtoTypes) V = TypeVar("V") -NT: TypeAlias = Dict[int, Union[_ProtoTypes, "NT"]] +NT: TypeAlias = dict[int, Union[_ProtoTypes, "NT"]] NoneType = type(None) -class ProtoField(Generic[T]): - def __init__(self, tag: int, default: T): +class ProtoField: + def __init__(self, tag: int, default: Any, default_factory: Any): if tag <= 0: raise ValueError("Tag must be a positive integer") self._tag = tag self._default = default + self._default_factory = default_factory @property def tag(self) -> int: return self._tag - def get_default(self) -> T: - return self._default + def get_default(self) -> Any: + if self._default is not MISSING: + return self._default + elif self._default_factory is not MISSING: + return self._default_factory() + return MISSING @overload # `default` and `default_factory` are optional and mutually exclusive. @@ -69,23 +77,23 @@ def proto_field( def proto_field( tag: int, *, - default: Optional[Any] = ..., - default_factory: Optional[Any] = ..., + default: Optional[Any] = MISSING, + default_factory: Optional[Any] = MISSING, init: bool = True, repr: bool = True, metadata: Optional[Mapping[Any, Any]] = None, kw_only: bool = False, ) -> "Any": - return ProtoField(tag, default) + return ProtoField(tag, default, default_factory) @dataclass_transform(kw_only_default=True, field_specifiers=(proto_field,)) class ProtoStruct: - _anno_map: Dict[str, Tuple[Type[_ProtoTypes], ProtoField[Any]]] + _anno_map: dict[str, tuple[type[_ProtoTypes], ProtoField]] _proto_debug: bool def __init__(self, *args, **kwargs): - undefined_params: List[str] = [] + undefined_params: list[str] = [] args = list(args) for name, (typ, field) in self._anno_map.items(): if args: @@ -93,13 +101,13 @@ def __init__(self, *args, **kwargs): elif name in kwargs: self._set_attr(name, typ, kwargs.pop(name)) else: - if field.get_default() is not ...: + if field.get_default() is not MISSING: self._set_attr(name, typ, field.get_default()) else: undefined_params.append(name) if undefined_params: raise AttributeError( - "Undefined parameters in '{}': {}".format(self, undefined_params) + f"Undefined parameters in '{self}': {undefined_params}" ) def __init_subclass__(cls, **kwargs): @@ -113,22 +121,22 @@ def __repr__(self) -> str: attrs += f"{k}={v}, " return f"{self.__class__.__name__}({attrs[:-2]})" - def _set_attr(self, name: str, data_typ: Type[V], value: V) -> None: + def _set_attr(self, name: str, data_typ: type[V], value: V) -> None: # if get_origin(data_typ) is Union: # data_typ = (typ for typ in get_args(data_typ) if typ is not NoneType) # type: ignore if isinstance(data_typ, GenericAlias): # force ignore pass elif not isinstance(value, data_typ) and value is not None: raise TypeError( - "'{}' is not a instance of type '{}'".format(value, data_typ) + f"'{value}' is not a instance of type '{data_typ}'" ) setattr(self, name, value) @classmethod def _get_annotations( cls, - ) -> Dict[str, Tuple[Type[_ProtoTypes], "ProtoField"]]: # Name: (ReturnType, ProtoField) - annotations: Dict[str, Tuple[Type[_ProtoTypes], "ProtoField"]] = {} + ) -> dict[str, tuple[type[_ProtoTypes], "ProtoField"]]: # Name: (ReturnType, ProtoField) + annotations: dict[str, tuple[type[_ProtoTypes], "ProtoField"]] = {} for obj in reversed(inspect.getmro(cls)): if obj in (ProtoStruct, object): # base object, ignore continue @@ -149,14 +157,14 @@ def _get_annotations( return annotations @classmethod - def _get_field_mapping(cls) -> Dict[int, Tuple[str, Type[_ProtoTypes]]]: # Tag, (Name, Type) - field_mapping: Dict[int, Tuple[str, Type[_ProtoTypes]]] = {} + def _get_field_mapping(cls) -> dict[int, tuple[str, type[_ProtoTypes]]]: # Tag, (Name, Type) + field_mapping: dict[int, tuple[str, type[_ProtoTypes]]] = {} for name, (typ, field) in cls._anno_map.items(): field_mapping[field.tag] = (name, typ) return field_mapping - def _get_stored_mapping(self) -> Dict[str, NT]: - stored_mapping: Dict[str, NT] = {} + def _get_stored_mapping(self) -> dict[str, NT]: + stored_mapping: dict[str, NT] = {} for name, (_, _) in self._anno_map.items(): stored_mapping[name] = getattr(self, name) return stored_mapping @@ -164,7 +172,7 @@ def _get_stored_mapping(self) -> Dict[str, NT]: def _encode(self, v: _ProtoTypes) -> NT: if isinstance(v, ProtoStruct): v = v.encode() - return v + return v # type: ignore def encode(self) -> bytes: pb_dict: NT = {} @@ -180,16 +188,16 @@ def encode(self) -> bytes: return proto_encode(cast(Proto, pb_dict)) @classmethod - def _decode(cls, typ: Type[_ProtoTypes], value): + def _decode(cls, typ: type[_ProtoTypes], value): if issubclass(typ, ProtoStruct): return typ.decode(value) - elif typ == str: + elif typ is str: return value.decode(errors="ignore") - elif typ == dict: - return proto_decode(value) - elif typ == bool: + elif typ is dict: + return proto_decode(value).proto + elif typ is bool: return value == 1 - elif typ == list: + elif typ is list: if not isinstance(value, list): return [value] return value @@ -212,21 +220,21 @@ def _decode(cls, typ: Type[_ProtoTypes], value): def decode(cls, data: bytes) -> Self: if not data: return None # type: ignore - pb_dict: Proto = proto_decode(data, 0) + pb_dict: Proto = proto_decode(data, 0).proto mapping = cls._get_field_mapping() kwargs = {} for tag, (name, typ) in mapping.items(): if tag not in pb_dict: _, field = cls._anno_map[name] - if field.get_default() is not ...: + if field.get_default() is not MISSING: kwargs[name] = field.get_default() continue raise KeyError(f"tag {tag} not found in '{cls.__name__}'") kwargs[name] = cls._decode(typ, pb_dict.pop(tag)) if pb_dict and cls._proto_debug: # unhandled tags - print(f"unhandled tags on '{cls.__name__}': {pb_dict}") + pass return cls(**kwargs) diff --git a/lagrange/utils/binary/reader.py b/lagrange/utils/binary/reader.py index 14e0dc3..eb19d43 100644 --- a/lagrange/utils/binary/reader.py +++ b/lagrange/utils/binary/reader.py @@ -1,5 +1,5 @@ import struct -from typing import Any, Dict, Tuple, Union +from typing import Any, Union from typing_extensions import TypeAlias, Literal @@ -38,7 +38,7 @@ def read_u64(self) -> int: self._pos += 8 return struct.unpack(">Q", v)[0] - def read_struct(self, format: str) -> Tuple[Any, ...]: + def read_struct(self, format: str) -> tuple[Any, ...]: size = struct.calcsize(format) v = self._buffer[self._pos : self._pos + size] self._pos += size @@ -78,7 +78,7 @@ def read_bytes_with_length(self, prefix: LENGTH_PREFIX, with_prefix=True) -> byt def read_string_with_length(self, prefix: LENGTH_PREFIX, with_prefix=True) -> str: return self.read_bytes_with_length(prefix, with_prefix).decode("utf-8") - def read_tlv(self) -> Dict[int, bytes]: + def read_tlv(self) -> dict[int, bytes]: result = {} count = self.read_u16() diff --git a/lagrange/utils/httpcat.py b/lagrange/utils/httpcat.py index 28419ba..441cdaa 100644 --- a/lagrange/utils/httpcat.py +++ b/lagrange/utils/httpcat.py @@ -3,7 +3,7 @@ import json import zlib from dataclasses import dataclass -from typing import Dict, Optional, Tuple, overload, Literal +from typing import Optional, overload, Literal from urllib import parse from .log import log @@ -15,9 +15,9 @@ class HttpResponse: code: int status: str - header: Dict[str, str] + header: dict[str, str] body: bytes - cookies: Dict[str, str] + cookies: dict[str, str] @property def decompressed_body(self) -> bytes: @@ -51,16 +51,16 @@ def __init__( self, host: str, port: int, - headers: Optional[Dict[str, str]] = None, - cookies: Optional[Dict[str, str]] = None, + headers: Optional[dict[str, str]] = None, + cookies: Optional[dict[str, str]] = None, ssl=False, timeout=5, ): self.host = host self.port = port self.ssl = ssl - self.header: Dict[str, str] = headers or {} - self.cookie: Dict[str, str] = cookies or {} + self.header: dict[str, str] = headers or {} + self.cookie: dict[str, str] = cookies or {} self._reader: Optional[asyncio.StreamReader] = None self._writer: Optional[asyncio.StreamWriter] = None self._stop_flag = True @@ -69,7 +69,7 @@ def __init__( @classmethod def _encode_header( - cls, method: str, path: str, header: Dict[str, str], *, protocol="HTTP/1.1" + cls, method: str, path: str, header: dict[str, str], *, protocol="HTTP/1.1" ) -> bytearray: ret = bytearray() ret += f"{method.upper()} {path} {protocol}\r\n".encode() @@ -83,7 +83,7 @@ async def _read_line(reader: asyncio.StreamReader) -> str: return (await reader.readline()).rstrip(b"\r\n").decode() @staticmethod - def _parse_url(url: str) -> Tuple[Tuple[str, int], str, bool]: + def _parse_url(url: str) -> tuple[tuple[str, int], str, bool]: purl = parse.urlparse(url) if purl.scheme not in ("http", "https"): raise ValueError("unsupported scheme:", purl.scheme) @@ -155,9 +155,9 @@ async def _request( writer: asyncio.StreamWriter, method: str, path: str, - header: Optional[Dict[str, str]] = None, + header: Optional[dict[str, str]] = None, body: Optional[bytes] = None, - cookies: Optional[Dict[str, str]] = None, + cookies: Optional[dict[str, str]] = None, wait_rsp: Literal[True] = True, loop: Optional[asyncio.AbstractEventLoop] = None, ) -> HttpResponse: @@ -172,9 +172,9 @@ async def _request( writer: asyncio.StreamWriter, method: str, path: str, - header: Optional[Dict[str, str]] = None, + header: Optional[dict[str, str]] = None, body: Optional[bytes] = None, - cookies: Optional[Dict[str, str]] = None, + cookies: Optional[dict[str, str]] = None, wait_rsp: Literal[False] = False, loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: @@ -188,9 +188,9 @@ async def _request( writer: asyncio.StreamWriter, method: str, path: str, - header: Optional[Dict[str, str]] = None, + header: Optional[dict[str, str]] = None, body: Optional[bytes] = None, - cookies: Optional[Dict[str, str]] = None, + cookies: Optional[dict[str, str]] = None, wait_rsp: bool = True, loop: Optional[asyncio.AbstractEventLoop] = None, ) -> Optional[HttpResponse]: @@ -224,9 +224,9 @@ async def request( cls, method: str, url: str, - header: Optional[Dict[str, str]] = None, + header: Optional[dict[str, str]] = None, body: Optional[bytes] = None, - cookies: Optional[Dict[str, str]] = None, + cookies: Optional[dict[str, str]] = None, follow_redirect=True, conn_timeout=0, loop: Optional[asyncio.AbstractEventLoop] = None, diff --git a/lagrange/utils/network.py b/lagrange/utils/network.py index 22d4c08..96d789b 100644 --- a/lagrange/utils/network.py +++ b/lagrange/utils/network.py @@ -1,5 +1,4 @@ import asyncio -import socket import traceback from typing import Optional @@ -104,7 +103,7 @@ async def loop(self): try: await self.connect() fail = False - except (ConnectionError, socket.error) as e: + except (OSError, ConnectionError) as e: if fail: _logger.debug(f"connect retry fail: {repr(e)}") else: diff --git a/lagrange/utils/sign.py b/lagrange/utils/sign.py index ff1fe45..8420dfc 100644 --- a/lagrange/utils/sign.py +++ b/lagrange/utils/sign.py @@ -52,7 +52,7 @@ async def get_sign(cmd: str, seq: int, buf: bytes) -> dict: return {} params = {"cmd": cmd, "seq": seq, "src": buf.hex()} - body = json.dumps(params).encode('utf-8') + body = json.dumps(params).encode("utf-8") headers = { "Content-Type": "application/json" } diff --git a/main.py b/main.py index c79aa58..e84ba33 100644 --- a/main.py +++ b/main.py @@ -46,7 +46,7 @@ async def handle_grp_sign(client: "Client", event: "GroupSign"): if uid: break if kk.next_key: - k = kk.next_key + k = kk.next_key.decode() else: raise ValueError(f"cannot find member: {event.uin}") diff --git a/pyproject.toml b/pyproject.toml index bd050a0..906dce9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,3 +45,19 @@ dev = [ [build-system] requires = ["pdm-backend"] build-backend = "pdm.backend" + +[tool.ruff] +line-length = 120 +target-version = "py39" +exclude = ["pdm_build.py"] + +[tool.ruff.lint] +select = ["E", "W", "F", "UP", "C", "T", "Q"] +ignore = ["E402", "F403", "F405", "C901", "UP037"] + +[tool.pyright] +pythonPlatform = "All" +pythonVersion = "3.9" +typeCheckingMode = "basic" +reportShadowedImports = false +disableBytesTypePromotions = true