diff --git a/.github/workflows/python-dev.yml b/.github/workflows/python-dev.yml index afe86cf..fe0672f 100644 --- a/.github/workflows/python-dev.yml +++ b/.github/workflows/python-dev.yml @@ -23,6 +23,7 @@ jobs: uses: actions/setup-python@v2 with: python-version: ${{ matrix.python-version }} + cache: 'pip' - name: Install base deps run: | python -m pip install pyflakes mypy diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index f4a352d..88940a5 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -24,6 +24,7 @@ jobs: uses: actions/setup-python@v2 with: python-version: ${{ matrix.python-version }} + cache: 'pip' - name: Install base deps run: | python -m pip install -r requirements.buildtime.txt # dont pin linter versions @@ -56,4 +57,4 @@ jobs: python -m pip install mariadb - name: linting pass run: | - pylint */ -E -j4 + pylint **/*.py -E -j4 diff --git a/Makefile b/Makefile index 7059eeb..5e60921 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,10 @@ GOCMD=go GOversion=2.0.0-DEV.3 compilefolder=./libs/compiled +typecheck: + pyflakes . + mypy . --strict --ignore-missing-imports --warn-unreachable + all: ${compilefolder} ./libs/sfdbcloader.c ${CC} -fPIC -shared -Wall -Wextra -Werror -O3 -o ${compilefolder}/sonnet.${version}.so ./libs/sfdbcloader.c diff --git a/build_tools/autotest.py b/build_tools/autotest.py index 059c56b..b6f2c60 100644 --- a/build_tools/autotest.py +++ b/build_tools/autotest.py @@ -60,10 +60,10 @@ def main() -> None: tests: Dict[str, Union[str, Shell]] = { "pyflakes": "pyflakes .", - "mypy": "mypy . --ignore-missing-imports --strict --warn-unreachable", + "mypy": "mypy . --ignore-missing-imports --strict --warn-unreachable --python-version 3.8", "yapf": "yapf -drp .", - "pylint": Shell("pylint */ -E -j4"), - "pytype": "pytype .", + "pylint": Shell("pylint **/*.py -E -j4"), + #"pytype": "pytype .", } nottest = set(sys.argv[1:]) diff --git a/build_tools/cmds_to_html.py b/build_tools/cmds_to_html.py index 032898c..35fcf55 100644 --- a/build_tools/cmds_to_html.py +++ b/build_tools/cmds_to_html.py @@ -5,6 +5,7 @@ import importlib import os import sys +import inspect from typing import Dict, List, cast @@ -59,7 +60,10 @@ if cmd.permission in ["everyone", "moderator", "administrator", "owner"]: continue elif isinstance(cmd.permission, (tuple, list)): - if isinstance(cmd.permission[0], str) and cmd.permission[1]: + if isinstance(cmd.permission[0], str) and callable(cmd.permission[1]): + spec = inspect.getfullargspec(cmd.permission[1]) + if len(spec.args) > 2: # Support for object instances with self first argument + raise SyntaxError(f"ERROR IN [{cmd.execute.__module__} : {command}] PERMISSION FUNCTION({cmd.permission[1]}) IS NOT VALID (EXPECTED ONE ARGUMENT)") continue raise SyntaxError(f"ERROR IN [{cmd.execute.__module__} : {command}] PERMISSION TYPE({cmd.permission}) IS NOT VALID") diff --git a/build_tools/wordlist_adder.py b/build_tools/wordlist_adder.py index 2c591a4..5c6502e 100644 --- a/build_tools/wordlist_adder.py +++ b/build_tools/wordlist_adder.py @@ -11,7 +11,7 @@ data = input("> ") if data in used: print("Already taken") - elif data and all([(o := ord(i)) >= ord('a') and o <= ord('z') for i in data]): + elif data and all('a' <= i <= 'z' for i in data): used.add(data) else: print("Nil input/contains non a-z") diff --git a/cmds/cmd_automod.py b/cmds/cmd_automod.py index c8b485c..e1fbff5 100644 --- a/cmds/cmd_automod.py +++ b/cmds/cmd_automod.py @@ -29,7 +29,11 @@ from typing import Final # pytype: disable=import-error import lib_constants as constants -re: Any = importlib.import_module(REGEX_VERSION) +# Import re to trick type checker into using re stubs +import re + +# Import into globals hashmap to ignore pyflakes redefinition errors +globals()["re"] = importlib.import_module(REGEX_VERSION) wb_allowedrunes = string.ascii_lowercase + string.digits + "," urlb_allowedrunes = string.ascii_lowercase + string.digits + "-,." @@ -685,4 +689,4 @@ async def add_joinrule(message: discord.Message, args: List[str], client: discor }, } -version_info: str = "1.2.10" +version_info: str = "1.2.11" diff --git a/cmds/cmd_moderation.py b/cmds/cmd_moderation.py index 9619ef9..1949208 100644 --- a/cmds/cmd_moderation.py +++ b/cmds/cmd_moderation.py @@ -3,7 +3,7 @@ import importlib -import discord, time, asyncio, math, io +import discord, time, asyncio, math, io, shlex import lib_db_obfuscator @@ -26,27 +26,41 @@ import lib_sonnetconfig importlib.reload(lib_sonnetconfig) +import lib_sonnetcommands + +importlib.reload(lib_sonnetcommands) +import lib_tparse + +importlib.reload(lib_tparse) from lib_goparsers import MustParseDuration from lib_loaders import generate_infractionid, load_embed_color, embed_colors, datetime_now, datetime_unix from lib_db_obfuscator import db_hlapi -from lib_parsers import grab_files, generate_reply_field, parse_channel_message, parse_user_member, format_duration +from lib_parsers import grab_files, generate_reply_field, parse_channel_message_noexcept, parse_user_member, format_duration from lib_compatibility import user_avatar_url -from lib_sonnetconfig import BOT_NAME +from lib_sonnetconfig import BOT_NAME, REGEX_VERSION +from lib_sonnetcommands import CommandCtx +from lib_tparse import Parser import lib_constants as constants from typing import List, Tuple, Any, Awaitable, Optional, Callable, Union, cast import lib_lexdpyk_h as lexdpyk +# Import re to trick type checker into using re stubs +import re + +# Import into globals hashmap to ignore pyflakes redefinition errors +globals()["re"] = importlib.import_module(REGEX_VERSION) + # Catches error if the bot cannot message the user async def catch_dm_error(user: Union[discord.User, discord.Member], contents: discord.Embed, log_channel: Optional[discord.TextChannel]) -> None: try: await user.send(embed=contents) except (AttributeError, discord.errors.HTTPException): - if log_channel: + if log_channel is not None: try: - await log_channel.send(f"ERROR: {user.mention}:{user.id} Could not DM user", allowed_mentions=discord.AllowedMentions.none()) + asyncio.create_task(log_channel.send(f"ERROR: {user.mention}:{user.id} Could not DM user", allowed_mentions=discord.AllowedMentions.none())) except discord.errors.Forbidden: pass @@ -172,36 +186,28 @@ async def process_infraction( return (member, user, reason, infraction_id, dm_sent) -async def warn_user(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: - - ramfs: lexdpyk.ram_filesystem = kwargs["ramfs"] - automod: bool = kwargs["automod"] - verbose: bool = kwargs["verbose"] +async def warn_user(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Any: try: - _, user, reason, _, _ = await process_infraction(message, args, client, "warn", ramfs, automod=automod) + _, user, reason, _, _ = await process_infraction(message, args, client, "warn", ctx.ramfs, automod=ctx.automod) except InfractionGenerationError: return 1 - if verbose and user: + if ctx.verbose and user: await message.channel.send(f"Warned {user.mention} with ID {user.id} for {reason}", allowed_mentions=discord.AllowedMentions.none()) elif not user: await message.channel.send("User does not exist") return 1 -async def note_user(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: - - ramfs: lexdpyk.ram_filesystem = kwargs["ramfs"] - automod: bool = kwargs["automod"] - verbose: bool = kwargs["verbose"] +async def note_user(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Any: try: - _, user, reason, _, _ = await process_infraction(message, args, client, "note", ramfs, infraction=False, automod=automod) + _, user, reason, _, _ = await process_infraction(message, args, client, "note", ctx.ramfs, infraction=False, automod=ctx.automod) except InfractionGenerationError: return 1 - if verbose and user: + if ctx.verbose and user: await message.channel.send(f"Put a note on {user.mention} with ID {user.id}: {reason}", allowed_mentions=discord.AllowedMentions.none()) elif not user: await message.channel.send("User does not exist") @@ -228,7 +234,7 @@ async def kick_user(message: discord.Message, args: List[str], client: discord.C await dm_sent # Wait for dm to be sent before kicking await message.guild.kick((member), reason=reason[:512]) except discord.errors.Forbidden: - await message.channel.send("The bot does not have permission to kick this user.") + await message.channel.send(f"{BOT_NAME} does not have permission to kick this user.") return 1 else: await message.channel.send("User is not in this guild") @@ -237,28 +243,40 @@ async def kick_user(message: discord.Message, args: List[str], client: discord.C if verbose: await message.channel.send(f"Kicked {member.mention} with ID {member.id} for {reason}", allowed_mentions=discord.AllowedMentions.none()) -async def ban_user(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: +async def ban_user(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Any: if not message.guild: return 1 - ramfs: lexdpyk.ram_filesystem = kwargs["ramfs"] - automod: bool = kwargs["automod"] - verbose: bool = kwargs["verbose"] + if len(args) >= 3 and args[1] in ["-d", "--days"]: + try: + delete_days = int(args[2]) + del args[2] + del args[1] + except ValueError: + delete_days = 0 + else: + delete_days = 0 + + # bounds check (docs say 0 is min and 7 is max) + if delete_days > 7: delete_days = 7 + elif delete_days < 0: delete_days = 0 try: - member, user, reason, _, dm_sent = await process_infraction(message, args, client, "ban", ramfs, automod=automod) + member, user, reason, _, dm_sent = await process_infraction(message, args, client, "ban", ctx.ramfs, automod=ctx.automod) except InfractionGenerationError: return 1 try: if member and dm_sent: await dm_sent # Wait for dm to be sent before banning - await message.guild.ban(user, delete_message_days=0, reason=reason[:512]) + await message.guild.ban(user, delete_message_days=delete_days, reason=reason[:512]) except discord.errors.Forbidden: - await message.channel.send("The bot does not have permission to ban this user.") + await message.channel.send(f"{BOT_NAME} does not have permission to ban this user.") return 1 - if verbose: await message.channel.send(f"Banned {user.mention} with ID {user.id} for {reason}", allowed_mentions=discord.AllowedMentions.none()) + delete_str = f", and deleted {delete_days} day{'s'*(delete_days!=1)} of messages," * bool(delete_days) + + if ctx.verbose: await message.channel.send(f"Banned {user.mention} with ID {user.id}{delete_str} for {reason}", allowed_mentions=discord.AllowedMentions.none()) async def unban_user(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: @@ -278,7 +296,7 @@ async def unban_user(message: discord.Message, args: List[str], client: discord. try: await message.guild.unban(user, reason=reason[:512]) except discord.errors.Forbidden: - await message.channel.send("The bot does not have permission to unban this user.") + await message.channel.send(f"{BOT_NAME} does not have permission to unban this user.") return 1 except discord.errors.NotFound: await message.channel.send("This user is not banned") @@ -358,7 +376,7 @@ async def mute_user(message: discord.Message, args: List[str], client: discord.C try: await member.add_roles(mute_role) except discord.errors.Forbidden: - await message.channel.send("The bot does not have permission to mute this user.") + await message.channel.send(f"{BOT_NAME} does not have permission to mute this user.") return 1 if verbose and not mutetime: @@ -408,7 +426,7 @@ async def unmute_user(message: discord.Message, args: List[str], client: discord try: await member.remove_roles(mute_role) except discord.errors.Forbidden: - await message.channel.send("The bot does not have permission to unmute this user.") + await message.channel.send(f"{BOT_NAME} does not have permission to unmute this user.") return 1 # Unmute in DB @@ -418,60 +436,74 @@ async def unmute_user(message: discord.Message, args: List[str], client: discord if verbose: await message.channel.send(f"Unmuted {member.mention} with ID {member.id} for {reason}", allowed_mentions=discord.AllowedMentions.none()) -async def search_infractions_by_user(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: +def get_user_id(s: str) -> int: + return int(s.strip("<@!>")) + + +async def search_infractions_by_user(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Any: if not message.guild: return 1 tstart = time.monotonic() # Reparse args - args = (" ".join(args)).replace("=", " ").split() + args = shlex.split(" ".join(args)) # Parse flags - selected_chunk: int = 0 - responsible_mod: Optional[int] = None - infraction_type: Optional[str] = None - per_page: int = 20 - user_affected: Optional[int] = None - automod: bool = True - for index, item in enumerate(args): - try: - if item in ["-p", "--page"]: - selected_chunk = int(args[index + 1]) - 1 - elif item in ["-m", "--mod"]: - responsible_mod = int(args[index + 1].strip("<@!>")) - elif item in ["-u", "--user"]: - user_affected = int(args[index + 1].strip("<@!>")) - elif item in ["-i", "--infractioncount"]: - per_page = int(args[index + 1]) - elif item in ["-t", "--type"]: - infraction_type = (args[index + 1]) - elif item == "--no-automod": - automod = False - except (ValueError, IndexError): - await message.channel.send("Invalid flags supplied") - return 1 + parser = Parser("search-infractions") + + selected_chunk_f = parser.add_arg(["-p", "--page"], lambda s: int(s) - 1) + responsible_mod_f = parser.add_arg(["-m", "--mod"], get_user_id) + user_affected_f = parser.add_arg(["-u", "--user"], get_user_id) + per_page_f = parser.add_arg(["-i", "--infractioncount"], int) + infraction_type_f = parser.add_arg(["-t", "--type"], str) + filtering_f = parser.add_arg(["-f", "--filter"], str) + automod_f = lib_tparse.add_true_false_flag(parser, "automod") + + try: + parser.parse(args, stderr=io.StringIO(), exit_on_fail=False, lazy=True) + except lib_tparse.ParseFailureError: + await message.channel.send("Failed to parse flags") + return 1 + + selected_chunk = selected_chunk_f.get(0) + per_page = per_page_f.get(20) + user_affected = user_affected_f.get() # Default to user if no user/mod flags are supplied - if None is responsible_mod is user_affected: + if None is responsible_mod_f.get() is user_affected: try: - user_affected = int(args[0].strip("<@!>")) + user_affected = get_user_id(args[0]) except (IndexError, ValueError): pass - if not 5 <= per_page <= 40: + if not 5 <= per_page <= 40: # pytype: disable=unsupported-operands await message.channel.send("ERROR: Cannot exeed range 5-40 infractions per page") return 1 + refilter: "Optional[re.Pattern[str]]" + + if (f := filtering_f.get()) is not None: + try: + refilter = re.compile(f) + except re.error: + raise lib_sonnetcommands.CommandError("ERROR: Filter regex is invalid") + else: + refilter = None + with db_hlapi(message.guild.id) as db: - if user_affected or responsible_mod: - infractions = cast(List[Tuple[str, str, str, str, str, int]], db.grab_filter_infractions(user=user_affected, moderator=responsible_mod, itype=infraction_type, automod=automod)) + if user_affected or responsible_mod_f.get(): + infractions = db.grab_filter_infractions(user=user_affected, moderator=responsible_mod_f.get(), itype=infraction_type_f.get(), automod=automod_f.get()) + assert isinstance(infractions, list) else: await message.channel.send("Please specify a user or moderator") return 1 + if refilter is not None: + infractions = [i for i in infractions if refilter.findall(i[4])] + # Sort newest first - infractions = sorted(infractions, reverse=True, key=lambda a: a[5]) + infractions.sort(reverse=True, key=lambda a: a[5]) # Return if no infractions, this is not an error as it returned a valid status if not infractions: @@ -484,11 +516,11 @@ async def search_infractions_by_user(message: discord.Message, args: List[str], if selected_chunk == -1: # ik it says page 0 but it does -1 on it up above so the user would have entered 0 await message.channel.send("ERROR: Cannot go to page 0") return 1 - elif selected_chunk < -1: + elif selected_chunk < -1: # pytype: disable=unsupported-operands selected_chunk %= cpagecount selected_chunk += 1 - if not 0 <= selected_chunk < cpagecount: + if not 0 <= selected_chunk < cpagecount: # pytype: disable=unsupported-operands await message.channel.send(f"ERROR: No such page {selected_chunk+1}") return 1 @@ -550,12 +582,10 @@ async def search_infractions_by_user(message: discord.Message, args: List[str], await message.channel.send(f"Page {selected_chunk+1} / {cpagecount} ({len(infractions)} infraction{'s'*(len(infractions)!=1)}) ({tprint:.1f}ms)\n```css\nID, Type, Reason\n{writer.getvalue()}```") -async def get_detailed_infraction(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: +async def get_detailed_infraction(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Any: if not message.guild: return 1 - ramfs: lexdpyk.ram_filesystem = kwargs["ramfs"] - if args: with db_hlapi(message.guild.id) as db: infraction = db.grab_infraction(args[0]) @@ -570,7 +600,7 @@ async def get_detailed_infraction(message: discord.Message, args: List[str], cli # pylint: disable=E0633 infraction_id, user_id, moderator_id, infraction_type, reason, timestamp = infraction - infraction_embed = discord.Embed(title="Infraction Search", description=f"Infraction for <@{user_id}>:", color=load_embed_color(message.guild, embed_colors.primary, ramfs)) + infraction_embed = discord.Embed(title="Infraction Search", description=f"Infraction for <@{user_id}>:", color=load_embed_color(message.guild, embed_colors.primary, ctx.ramfs)) infraction_embed.add_field(name="Infraction ID", value=infraction_id) infraction_embed.add_field(name="Moderator", value=f"<@{moderator_id}>") infraction_embed.add_field(name="Type", value=infraction_type) @@ -586,13 +616,10 @@ async def get_detailed_infraction(message: discord.Message, args: List[str], cli return 1 -async def delete_infraction(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: +async def delete_infraction(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Any: if not message.guild: return 1 - verbose: bool = kwargs["verbose"] - ramfs: lexdpyk.ram_filesystem = kwargs["ramfs"] - if args: with db_hlapi(message.guild.id) as db: infraction = db.grab_infraction(args[0]) @@ -605,13 +632,13 @@ async def delete_infraction(message: discord.Message, args: List[str], client: d await message.channel.send("ERROR: No argument supplied") return 1 - if not verbose: + if not ctx.verbose: return # pylint: disable=E0633 infraction_id, user_id, moderator_id, infraction_type, reason, timestamp = infraction - infraction_embed = discord.Embed(title="Infraction Deleted", description=f"Infraction for <@{user_id}>:", color=load_embed_color(message.guild, embed_colors.deletion, ramfs)) + infraction_embed = discord.Embed(title="Infraction Deleted", description=f"Infraction for <@{user_id}>:", color=load_embed_color(message.guild, embed_colors.deletion, ctx.ramfs)) infraction_embed.add_field(name="Infraction ID", value=infraction_id) infraction_embed.add_field(name="Moderator", value=f"<@{moderator_id}>") infraction_embed.add_field(name="Type", value=infraction_type) @@ -628,39 +655,43 @@ async def delete_infraction(message: discord.Message, args: List[str], client: d return 1 -async def grab_guild_message(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: +async def grab_guild_message(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Any: if not message.guild: return 1 - ramfs: lexdpyk.ram_filesystem = kwargs["ramfs"] - kernel_ramfs: lexdpyk.ram_filesystem = kwargs["kernel_ramfs"] - - try: - discord_message, _ = await parse_channel_message(message, args, client) - except lib_parsers.errors.message_parse_failure: - return 1 + discord_message, nargs = await parse_channel_message_noexcept(message, args, client) if not discord_message.guild: await message.channel.send("ERROR: Message not in any guild") return 1 + sendraw = False + for arg in args[nargs:]: + if arg in ["-r", "--raw"]: + sendraw = True + break + # Generate replies message_content = generate_reply_field(discord_message) # Message has been grabbed, start generating embed - message_embed = discord.Embed(title=f"Message in #{discord_message.channel}", description=message_content, color=load_embed_color(message.guild, embed_colors.primary, ramfs)) + message_embed = discord.Embed(title=f"Message in #{discord_message.channel}", description=message_content, color=load_embed_color(message.guild, embed_colors.primary, ctx.ramfs)) message_embed.set_author(name=str(discord_message.author), icon_url=user_avatar_url(discord_message.author)) message_embed.timestamp = discord_message.created_at # Grab files from cache - fileobjs = grab_files(discord_message.guild.id, discord_message.id, kernel_ramfs) + fileobjs = grab_files(discord_message.guild.id, discord_message.id, ctx.kernel_ramfs) # Grab files async if not in cache if not fileobjs: awaitobjs = [asyncio.create_task(i.to_file()) for i in discord_message.attachments] fileobjs = [await i for i in awaitobjs] + if sendraw: + file_content = io.BytesIO(discord_message.content.encode("utf8")) + fileobjs.append(discord.File(file_content, filename=f"{discord_message.id}.at.{int(datetime_now().timestamp())}.txt")) + try: await message.channel.send(embed=message_embed, files=fileobjs) except discord.errors.HTTPException: @@ -816,8 +847,8 @@ async def remove_mutedb(message: discord.Message, args: List[str], client: disco 'execute': kick_user }, 'ban': { - 'pretty_name': 'ban [reason]', - 'description': 'Ban a user', + 'pretty_name': 'ban [-d DAYS] [reason]', + 'description': 'Ban a user, optionally delete messages with -d', 'permission': 'moderator', 'execute': ban_user }, @@ -850,8 +881,8 @@ async def remove_mutedb(message: discord.Message, args: List[str], client: disco }, 'search-infractions': { - 'pretty_name': 'search-infractions <-u USER | -m MOD> [-t TYPE] [-p PAGE] [-i INF PER PAGE] [--no-automod]', - 'description': 'Grab infractions of a user', + 'pretty_name': 'search-infractions <-u USER | -m MOD> [-t TYPE] [-p PAGE] [-i INF PER PAGE] [--[no-]automod] [-f FILTER]', + 'description': 'Grab infractions of a user, -f uses regex', 'rich_description': 'Supports negative indexing in pager, flags are unix like', 'permission': 'moderator', 'execute': search_infractions_by_user @@ -883,12 +914,13 @@ async def remove_mutedb(message: discord.Message, args: List[str], client: disco 'get-message': { 'alias': 'grab-message' }, - 'grab-message': { - 'pretty_name': 'grab-message ', - 'description': 'Grab a message and show its contents', - 'permission': 'moderator', - 'execute': grab_guild_message - }, + 'grab-message': + { + 'pretty_name': 'grab-message [-r]', + 'description': 'Grab a message and show its contents, specify -r to get message content as a file', + 'permission': 'moderator', + 'execute': grab_guild_message + }, 'purge': { 'pretty_name': 'purge [user]', @@ -899,4 +931,4 @@ async def remove_mutedb(message: discord.Message, args: List[str], client: disco } } -version_info: str = "1.2.10" +version_info: str = "1.2.11" diff --git a/cmds/cmd_scripting.py b/cmds/cmd_scripting.py index 76c9fbe..c13a813 100644 --- a/cmds/cmd_scripting.py +++ b/cmds/cmd_scripting.py @@ -19,7 +19,7 @@ importlib.reload(lib_sonnetcommands) from lib_parsers import parse_permissions -from lib_sonnetcommands import SonnetCommand +from lib_sonnetcommands import SonnetCommand, CommandCtx from typing import List, Any, Tuple, Awaitable, Dict import lib_lexdpyk_h as lexdpyk @@ -53,15 +53,15 @@ def do_cache_sweep(cache: str, ramfs: lexdpyk.ram_filesystem, guild: discord.Gui pass -async def sonnet_sh(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: +async def sonnet_sh(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Any: if not message.guild: return 1 tstart: int = time.monotonic_ns() arguments: List[str] = message.content.split("\n") - verbose: bool = kwargs["verbose"] - cmds_dict: lexdpyk.cmd_modules_dict = kwargs["cmds_dict"] + verbose = ctx.verbose + cmds_dict = ctx.cmds_dict try: shellargs = shlex.split(arguments[0]) @@ -69,7 +69,7 @@ async def sonnet_sh(message: discord.Message, args: List[str], client: discord.C await message.channel.send("ERROR: shlex parser could not parse args") return 1 - self_name: str = shellargs[0][len(kwargs["conf_cache"]["prefix"]):] + self_name: str = shellargs[0][len(ctx.conf_cache["prefix"]):] if verbose == False: await message.channel.send(f"ERROR: {self_name}: detected anomalous command execution") @@ -112,11 +112,14 @@ async def sonnet_sh(message: discord.Message, args: List[str], client: discord.C cache_args: List[str] = [] + newctx = pycopy.copy(ctx) + newctx.verbose = False + for totalcommand in commandsparse: command = totalcommand[0] arguments = totalcommand[1] - message.content = f'{kwargs["conf_cache"]["prefix"]}{totalcommand[0]} ' + " ".join(totalcommand[1]) + message.content = f'{ctx.conf_cache["prefix"]}{totalcommand[0]} ' + " ".join(totalcommand[1]) if command in cmds_dict: if "alias" in cmds_dict[command]: @@ -124,28 +127,15 @@ async def sonnet_sh(message: discord.Message, args: List[str], client: discord.C cmd = SonnetCommand(cmds_dict[command]) - permission = await parse_permissions(message, kwargs["conf_cache"], cmd['permission']) + permission = await parse_permissions(message, ctx.conf_cache, cmd['permission']) if permission: - suc = ( - await cmd['execute']( - message, - arguments, - client, - stats=kwargs["stats"], - cmds=kwargs["cmds"], - ramfs=kwargs["ramfs"], - bot_start=kwargs["bot_start"], - dlibs=kwargs["dlibs"], - main_version=kwargs["main_version"], - kernel_ramfs=kwargs["kernel_ramfs"], - conf_cache=kwargs["conf_cache"], - automod=kwargs["automod"], - cmds_dict=cmds_dict, - verbose=False, - ) - ) or 0 + try: + suc = (await cmd.execute_ctx(message, arguments, client, newctx)) or 0 + except lib_sonnetcommands.CommandError as ce: + await message.channel.send(ce) + suc = 1 # Stop processing if error if suc != 0: @@ -157,7 +147,7 @@ async def sonnet_sh(message: discord.Message, args: List[str], client: discord.C else: return 1 - ramfs: lexdpyk.ram_filesystem = kwargs["ramfs"] + ramfs: lexdpyk.ram_filesystem = ctx.ramfs for i in cache_args: do_cache_sweep(i, ramfs, message.guild) @@ -221,15 +211,15 @@ async def map_preprocessor(message: discord.Message, args: List[str], client: di return targs, targlen, cmd, command, endlargs -async def sonnet_map(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: +async def sonnet_map(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Any: if not message.guild: return 1 tstart: int = time.monotonic_ns() - cmds_dict: lexdpyk.cmd_modules_dict = kwargs["cmds_dict"] + cmds_dict = ctx.cmds_dict try: - targs, targlen, cmd, command, endlargs = await map_preprocessor(message, args, client, cmds_dict, kwargs["conf_cache"]) + targs, targlen, cmd, command, endlargs = await map_preprocessor(message, args, client, cmds_dict, ctx.conf_cache) except MapProcessError: return 1 @@ -237,99 +227,84 @@ async def sonnet_map(message: discord.Message, args: List[str], client: discord. keepref = message.content try: + newctx = pycopy.copy(ctx) + newctx.verbose = False + for i in targs[targlen:]: - message.content = f'{kwargs["conf_cache"]["prefix"]}{command} {i} {" ".join(endlargs)}' - - suc = ( - await cmd['execute']( - message, - i.split() + endlargs, - client, - stats=kwargs["stats"], - cmds=kwargs["cmds"], - ramfs=kwargs["ramfs"], - bot_start=kwargs["bot_start"], - dlibs=kwargs["dlibs"], - main_version=kwargs["main_version"], - kernel_ramfs=kwargs["kernel_ramfs"], - conf_cache=kwargs["conf_cache"], - automod=kwargs["automod"], - cmds_dict=cmds_dict, - verbose=False, - ) - ) or 0 + message.content = f'{ctx.conf_cache["prefix"]}{command} {i} {" ".join(endlargs)}' + + try: + suc = (await cmd.execute_ctx(message, i.split() + endlargs, client, newctx)) or 0 + except lib_sonnetcommands.CommandError as ce: + await message.channel.send(ce) + suc = 1 if suc != 0: await message.channel.send(f"ERROR: command `{command}` exited with non success status") return 1 # Do cache sweep on command - do_cache_sweep(cmd['cache'], kwargs["ramfs"], message.guild) + do_cache_sweep(cmd.cache, ctx.ramfs, message.guild) tend: int = time.monotonic_ns() fmttime: int = (tend - tstart) // 1000 // 1000 - if kwargs["verbose"]: await message.channel.send(f"Completed execution of {len(targs[targlen:])} instances of {command} in {fmttime}ms") + if ctx.verbose: await message.channel.send(f"Completed execution of {len(targs[targlen:])} instances of {command} in {fmttime}ms") finally: message.content = keepref -async def sonnet_async_map(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: +async def wrapasyncerror(cmd: SonnetCommand, message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> None: + try: + await cmd.execute_ctx(message, args, client, ctx) + except lib_sonnetcommands.CommandError as ce: # catch CommandError to print message + try: + await message.channel.send(ce) + except discord.errors.Forbidden: + pass + + +async def sonnet_async_map(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Any: if not message.guild: return 1 tstart: int = time.monotonic_ns() - cmds_dict: lexdpyk.cmd_modules_dict = kwargs["cmds_dict"] + cmds_dict: lexdpyk.cmd_modules_dict = ctx.cmds_dict try: - targs, targlen, cmd, command, endlargs = await map_preprocessor(message, args, client, cmds_dict, kwargs["conf_cache"]) + targs, targlen, cmd, command, endlargs = await map_preprocessor(message, args, client, cmds_dict, ctx.conf_cache) except MapProcessError: return 1 promises: List[Awaitable[Any]] = [] + newctx = pycopy.copy(ctx) + newctx.verbose = False + for i in targs[targlen:]: # We need to copy the message object to avoid race conditions since all the commands run at once # All attrs are readonly except for content which we modify the pointer to, so avoiding a deepcopy is possible newmsg: discord.Message = pycopy.copy(message) - newmsg.content = f'{kwargs["conf_cache"]["prefix"]}{command} {i} {" ".join(endlargs)}' - - promises.append( - asyncio.create_task( - cmd['execute']( - newmsg, - i.split() + endlargs, - client, - stats=kwargs["stats"], - cmds=kwargs["cmds"], - ramfs=kwargs["ramfs"], - bot_start=kwargs["bot_start"], - dlibs=kwargs["dlibs"], - main_version=kwargs["main_version"], - kernel_ramfs=kwargs["kernel_ramfs"], - conf_cache=kwargs["conf_cache"], - automod=kwargs["automod"], - cmds_dict=cmds_dict, - verbose=False, - ) - ) - ) + newmsg.content = f'{ctx.conf_cache["prefix"]}{command} {i} {" ".join(endlargs)}' + + # Call error handler over command to allow catching CommandError in async + promises.append(asyncio.create_task(wrapasyncerror(cmd, newmsg, i.split() + endlargs, client, newctx))) for p in promises: await p # Do a cache sweep after running - do_cache_sweep(cmd['cache'], kwargs["ramfs"], message.guild) + do_cache_sweep(cmd['cache'], ctx.ramfs, message.guild) tend: int = time.monotonic_ns() fmttime: int = (tend - tstart) // 1000 // 1000 - if kwargs["verbose"]: await message.channel.send(f"Completed execution of {len(targs[targlen:])} instances of {command} in {fmttime}ms") + if ctx.verbose: await message.channel.send(f"Completed execution of {len(targs[targlen:])} instances of {command} in {fmttime}ms") category_info = {'name': 'scripting', 'pretty_name': 'Scripting', 'description': 'Scripting tools for all your shell like needs'} @@ -370,4 +345,4 @@ async def sonnet_async_map(message: discord.Message, args: List[str], client: di } } -version_info: str = "1.2.10" +version_info: str = "1.2.11" diff --git a/cmds/cmd_utils.py b/cmds/cmd_utils.py index 616cf87..b1c3796 100644 --- a/cmds/cmd_utils.py +++ b/cmds/cmd_utils.py @@ -1,11 +1,14 @@ # Utility Commands # Funey, 2020 -# Predefined dictionaries. - import importlib -import discord, time, asyncio, random +import discord + +import asyncio +import random +import time +import io from datetime import datetime import lib_db_obfuscator @@ -32,18 +35,22 @@ import lib_sonnetconfig importlib.reload(lib_sonnetconfig) +import lib_tparse + +importlib.reload(lib_tparse) +from lib_compatibility import discord_datetime_now, user_avatar_url from lib_db_obfuscator import db_hlapi -from lib_parsers import parse_permissions, parse_boolean, parse_user_member -from lib_loaders import load_embed_color, embed_colors, datetime_now -from lib_compatibility import user_avatar_url, discord_datetime_now -from lib_sonnetcommands import SonnetCommand +from lib_loaders import datetime_now, embed_colors, load_embed_color +from lib_parsers import (parse_boolean, parse_permissions, parse_user_member_noexcept) +from lib_sonnetcommands import CallCtx, CommandCtx, SonnetCommand from lib_sonnetconfig import BOT_NAME +from lib_tparse import Parser import lib_constants as constants - -from typing import List, Any, Optional, cast import lib_lexdpyk_h as lexdpyk +from typing import Any, List, Optional, Tuple, cast + def add_timestamp(embed: discord.Embed, name: str, start: int, end: int) -> None: embed.add_field(name=name, value=f"{(end - start) / 100}ms", inline=False) @@ -88,10 +95,7 @@ async def profile_function(message: discord.Message, args: List[str], client: di if not message.guild: return 1 - try: - user, member = await parse_user_member(message, args, client, default_self=True) - except lib_parsers.errors.user_parse_error: - return 1 + user, member = await parse_user_member_noexcept(message, args, client, default_self=True) # Status hashmap status_map = {"online": "🟢 (online)", "offline": "⚫ (offline)", "idle": "🟡 (idle)", "dnd": "🔴 (dnd)", "do_not_disturb": "🔴 (dnd)", "invisible": "⚫ (offline)"} @@ -118,18 +122,14 @@ async def profile_function(message: discord.Message, args: List[str], client: di try: await message.channel.send(embed=embed) except discord.errors.Forbidden: - await message.channel.send(constants.sonnet.error_embed) - return 1 + raise lib_sonnetcommands.CommandError(constants.sonnet.error_embed) async def avatar_function(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: if not message.guild: return 1 - try: - user, _ = await parse_user_member(message, args, client, default_self=True) - except lib_parsers.errors.user_parse_error: - return 1 + user, _ = await parse_user_member_noexcept(message, args, client, default_self=True) embed = discord.Embed(description=f"{user.mention}'s Avatar", color=load_embed_color(message.guild, embed_colors.primary, kwargs["ramfs"])) embed.set_image(url=user_avatar_url(user)) @@ -137,103 +137,165 @@ async def avatar_function(message: discord.Message, args: List[str], client: dis try: await message.channel.send(embed=embed) except discord.errors.Forbidden: - await message.channel.send(constants.sonnet.error_embed) - return 1 + raise lib_sonnetcommands.CommandError(constants.sonnet.error_embed) + + +class HelpHelper: + __slots__ = "guild", "args", "client", "ctx", "prefix", "helpname", "message" + + def __init__(self, message: discord.Message, guild: discord.Guild, args: List[str], client: discord.Client, ctx: CommandCtx, prefix: str, helpname: str): + self.message = message + self.guild = guild + self.args = args + self.client = client + self.ctx = ctx + self.prefix = prefix + self.helpname = helpname + + # Builds a single command + async def single_command(self, cmd_name: str) -> discord.Embed: + + cmds_dict = self.ctx.cmds_dict + + if "alias" in cmds_dict[cmd_name]: + cmd_name = cmds_dict[cmd_name]["alias"] + + command = SonnetCommand(cmds_dict[cmd_name]) + + cmd_embed = discord.Embed(title=f'Command "{cmd_name}"', description=command.description, color=load_embed_color(self.guild, embed_colors.primary, self.ctx.ramfs)) + cmd_embed.set_author(name=self.helpname) + + cmd_embed.add_field(name="Usage:", value=self.prefix + command.pretty_name, inline=False) + + if "rich_description" in command: + cmd_embed.add_field(name="Detailed information:", value=command.rich_description, inline=False) + + if isinstance(command.permission, str): + perms = command.permission + elif isinstance(command["permission"], (tuple, list)): + perms = command.permission[0] + else: + perms = "NULL" + + hasperm = await parse_permissions(self.message, self.ctx.conf_cache, command.permission, verbose=False) + permstr = f" (You {'do not '*(not hasperm)}have this perm)" + + cmd_embed.add_field(name="Permission level:", value=perms + permstr) + + aliases = ", ".join(filter(lambda c: "alias" in cmds_dict[c] and cmds_dict[c]["alias"] == cmd_name, cmds_dict)) + if aliases: + cmd_embed.add_field(name="Aliases:", value=aliases, inline=False) + + return cmd_embed + + # Builds help for a category + async def category_help(self, category_name: str) -> Tuple[str, List[Tuple[str, str]], lexdpyk.cmd_module]: + + curmod = next(mod for mod in self.ctx.cmds if mod.category_info["name"] == category_name) + nonAliasCommands = list(filter(lambda c: "alias" not in curmod.commands[c], curmod.commands)) + description = curmod.category_info["description"] + override_commands: Optional[List[Tuple[str, str]]] = None + + if (override := getattr(curmod, "__help_override__", None)) is not None: + newhelp: Optional[Tuple[str, List[Tuple[str, str]]]] = await CallCtx(override)(self.message, self.args, self.client, self.ctx) + + if newhelp is not None: + description = newhelp[0] + override_commands = newhelp[1] + + if override_commands is None: + normal_commands: List[Tuple[str, str]] = [] + for i in sorted(nonAliasCommands): + normal_commands.append((self.prefix + curmod.commands[i]['pretty_name'], curmod.commands[i]['description'])) + return description, normal_commands, curmod + else: + return description, override_commands, curmod + async def full_help(self, page: int, per_page: int) -> discord.Embed: -async def help_function(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: + cmds = self.ctx.cmds + cmds_dict = self.ctx.cmds_dict + + if page < 0 or page >= (len(self.ctx.cmds) + (per_page - 1)) // per_page: + raise lib_sonnetcommands.CommandError(f"ERROR: No such page {page+1}") + + cmd_embed = discord.Embed(title=f"Category Listing (Page {page+1} / {(len(cmds) + (per_page-1))//per_page})", color=load_embed_color(self.guild, embed_colors.primary, self.ctx.ramfs)) + cmd_embed.set_author(name=self.helpname) + + total = 0 + # Total counting is seperate due to pagination not counting all modules + for cmd in cmds_dict: + if 'alias' not in cmds_dict[cmd]: + total += 1 + + for module in sorted(cmds, key=lambda m: m.category_info['pretty_name'])[(page * per_page):(page * per_page) + per_page]: + mnames = [f"`{i}`" for i in module.commands if 'alias' not in module.commands[i]] + + helptext = ', '.join(mnames) if mnames else module.category_info['description'] + cmd_embed.add_field(name=f"{module.category_info['pretty_name']} ({module.category_info['name']})", value=helptext, inline=False) + + cmd_embed.set_footer(text=f"Total Commands: {total} | Total Endpoints: {len(cmds_dict)}") + + return cmd_embed + + +async def help_function(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Any: if not message.guild: return 1 helpname: str = f"{BOT_NAME} Help" + per_page: int = 10 - cmds: List[lexdpyk.cmd_module] = kwargs["cmds"] - cmds_dict: lexdpyk.cmd_modules_dict = kwargs["cmds_dict"] + cmds = ctx.cmds + cmds_dict = ctx.cmds_dict - page: int = 0 - per_page: int = 10 + parser = Parser("help") + pageP = parser.add_arg(["-p", "--page"], lambda s: int(s) - 1) + commandonlyP = parser.add_arg("-c", lib_tparse.store_true, flag=True) - # TODO(ultrabear): make this look less horrible, it works at least - if len(args) > 1: - try: - if args[0] in ["-p", "--page"]: - page = int(args[1]) - 1 - args = args[2:] - elif len(args) > 2 and args[1] in ["-p", "--page"]: - page = int(args[2]) - 1 - except ValueError: - await message.channel.send("ERROR: Page not valid int") - return 1 + try: + parser.parse(args, stderr=io.StringIO(), exit_on_fail=False, lazy=True, consume=True) + except lib_tparse.ParseFailureError: + raise lib_sonnetcommands.CommandError("Could not parse pagecount") + + page = pageP.get(0) + commandonly = commandonlyP.get() is True + + prefix = ctx.conf_cache["prefix"] + help_helper = HelpHelper(message, message.guild, args, client, ctx, prefix, helpname) if args: modules = {mod.category_info["name"] for mod in cmds} - PREFIX = kwargs["conf_cache"]["prefix"] # Per module help - if (a := args[0].lower()) in modules: + if (a := args[0].lower()) in modules and not commandonly: - curmod = [mod for mod in cmds if mod.category_info["name"] == a][0] - nonAliasCommands = list(filter(lambda c: "alias" not in curmod.commands[c], curmod.commands)) - pagecount = (len(nonAliasCommands) + (per_page - 1)) // per_page + description, commands, curmod = await help_helper.category_help(a) + pagecount = (len(commands) + (per_page - 1)) // per_page cmd_embed = discord.Embed( - title=f"{curmod.category_info['pretty_name']} (Page {page+1} / {pagecount})", - description=curmod.category_info["description"], - color=load_embed_color(message.guild, embed_colors.primary, kwargs["ramfs"]) + title=f"{curmod.category_info['pretty_name']} (Page {page+1} / {pagecount})", description=description, color=load_embed_color(message.guild, embed_colors.primary, ctx.ramfs) ) cmd_embed.set_author(name=helpname) - if page < 0 or page >= pagecount: - if page == 0: - await message.channel.send(embed=cmd_embed) - return 0 - await message.channel.send(f"ERROR: No such page {page+1}") - return 1 + if not (0 <= page < pagecount): # pytype: disable=unsupported-operands + raise lib_sonnetcommands.CommandError(f"ERROR: No such page {page+1}") - for i in sorted(nonAliasCommands)[page * per_page:(page * per_page) + per_page]: - cmd_embed.add_field(name=PREFIX + curmod.commands[i]['pretty_name'], value=curmod.commands[i]['description'], inline=False) + for name, desc in commands[page * per_page:(page * per_page) + per_page]: + cmd_embed.add_field(name=name, value=desc, inline=False) try: await message.channel.send(embed=cmd_embed) except discord.errors.Forbidden: - await message.channel.send(constants.sonnet.error_embed) - return 1 + raise lib_sonnetcommands.CommandError(constants.sonnet.error_embed) # Per command help elif a in cmds_dict: - if "alias" in cmds_dict[a]: - a = cmds_dict[a]["alias"] - - cmd_name = a - command = SonnetCommand(cmds_dict[a]) - - cmd_embed = discord.Embed(title=f'Command "{cmd_name}"', description=command.description, color=load_embed_color(message.guild, embed_colors.primary, kwargs["ramfs"])) - cmd_embed.set_author(name=helpname) - - cmd_embed.add_field(name="Usage:", value=PREFIX + command.pretty_name, inline=False) - - if "rich_description" in command: - cmd_embed.add_field(name="Detailed information:", value=command.rich_description, inline=False) - - if isinstance(command.permission, str): - perms = command.permission - elif isinstance(command["permission"], (tuple, list)): - perms = command.permission[0] - else: - perms = "NULL" - - cmd_embed.add_field(name="Permission level:", value=perms) - - aliases = ", ".join(filter(lambda c: "alias" in cmds_dict[c] and cmds_dict[c]["alias"] == a, cmds_dict)) - if aliases: - cmd_embed.add_field(name="Aliases:", value=aliases, inline=False) - try: - await message.channel.send(embed=cmd_embed) + await message.channel.send(embed=await help_helper.single_command(a)) except discord.errors.Forbidden: - await message.channel.send(constants.sonnet.error_embed) - return 1 + raise lib_sonnetcommands.CommandError(constants.sonnet.error_embed) # Do not echo user input else: @@ -244,44 +306,20 @@ async def help_function(message: discord.Message, args: List[str], client: disco except ValueError: probably_tried_paging = False - no_command_text: str = "No command or command module with that name" + no_command_text: str = f"No command {'or command module '*(not commandonly)}with that name" if probably_tried_paging: - await message.channel.send(f"{no_command_text} (did you mean `{PREFIX}help -p {int(args[0])}`?)") - else: - await message.channel.send(no_command_text) + raise lib_sonnetcommands.CommandError(f"{no_command_text} (did you mean `{prefix}help -p {int(args[0])}`?)") - return 1 + raise lib_sonnetcommands.CommandError(no_command_text) # Total help else: - if page < 0 or page >= (len(cmds) + (per_page - 1)) // per_page: - await message.channel.send(f"ERROR: No such page {page+1}") - return 1 - - cmd_embed = discord.Embed(title=f"Category Listing (Page {page+1} / {(len(cmds) + (per_page-1))//per_page})", color=load_embed_color(message.guild, embed_colors.primary, kwargs["ramfs"])) - cmd_embed.set_author(name=helpname) - - total = 0 - # Total counting is seperate due to pagination not counting all modules - for cmd in cmds_dict: - if 'alias' not in cmds_dict[cmd]: - total += 1 - - for module in sorted(cmds, key=lambda m: m.category_info['pretty_name'])[(page * per_page):(page * per_page) + per_page]: - mnames = [f"`{i}`" for i in module.commands if 'alias' not in module.commands[i]] - - helptext = ', '.join(mnames) if mnames else module.category_info['description'] - cmd_embed.add_field(name=f"{module.category_info['pretty_name']} ({module.category_info['name']})", value=helptext, inline=False) - - cmd_embed.set_footer(text=f"Total Commands: {total} | Total Endpoints: {len(cmds_dict)}") - try: - await message.channel.send(embed=cmd_embed) + await message.channel.send(embed=await help_helper.full_help(page, per_page)) except discord.errors.Forbidden: - await message.channel.send(constants.sonnet.error_embed) - return 1 + raise lib_sonnetcommands.CommandError(constants.sonnet.error_embed) async def grab_guild_info(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: @@ -306,8 +344,7 @@ async def grab_guild_info(message: discord.Message, args: List[str], client: dis try: await message.channel.send(embed=guild_embed) except discord.errors.Forbidden: - await message.channel.send(constants.sonnet.error_embed) - return 1 + raise lib_sonnetcommands.CommandError(constants.sonnet.error_embed) async def initialise_poll(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: @@ -316,11 +353,9 @@ async def initialise_poll(message: discord.Message, args: List[str], client: dis await message.add_reaction("👍") await message.add_reaction("👎") except discord.errors.Forbidden: - await message.channel.send("ERROR: The bot does not have permissions to add a reaction here") - return 1 + raise lib_sonnetcommands.CommandError("ERROR: The bot does not have permissions to add a reaction here") except discord.errors.NotFound: - await message.channel.send("ERROR: Could not find the message [404]") - return 1 + raise lib_sonnetcommands.CommandError("ERROR: Could not find the message [404]") async def coinflip(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Any: @@ -355,8 +390,8 @@ async def coinflip(message: discord.Message, args: List[str], client: discord.Cl }, 'help': { - 'pretty_name': 'help [category|command] [-p PAGE]', - 'description': 'Print helptext', + 'pretty_name': 'help [category|command] [-p PAGE] [-c]', + 'description': 'Print helptext, `-c` designates to only look for a command', 'rich_description': 'Gives permission level, aliases (if any), and detailed information (if any) on specific command lookups', 'execute': help_function }, @@ -396,4 +431,4 @@ async def coinflip(message: discord.Message, args: List[str], client: discord.Cl } } -version_info: str = "1.2.10" +version_info: str = "1.2.11" diff --git a/cmds/cmd_version.py b/cmds/cmd_version.py index 7e87d1a..836fc27 100644 --- a/cmds/cmd_version.py +++ b/cmds/cmd_version.py @@ -4,9 +4,9 @@ import importlib import discord -import sys import io import time +import platform import lib_loaders @@ -71,7 +71,7 @@ async def print_version_info(message: discord.Message, args: List[str], client: modules: List[lexdpyk.cmd_module] = kwargs["cmds"] base_versions = [] - base_versions.append(["Python", sys.version.split(" ")[0]]) + base_versions.append([platform.python_implementation(), platform.python_version()]) base_versions.append(["Wrapper", discord.__version__]) base_versions.append(["Kernel", kwargs['main_version']]) base = "\n".join(prettyprint(base_versions)) @@ -184,4 +184,4 @@ async def print_stats(message: discord.Message, args: List[str], client: discord } } -version_info: str = "1.2.9" +version_info: str = "1.2.11" diff --git a/common/wordlist.txt b/common/wordlist.txt index 123eeaf..2a05cb1 100644 --- a/common/wordlist.txt +++ b/common/wordlist.txt @@ -105,10 +105,12 @@ alive alternative always amazing +among ampersand amplifier analog analysis +and angle angled animal @@ -121,6 +123,8 @@ archive area art asterisk +asynchronous +attribute audio autobiography auxiliary @@ -134,6 +138,7 @@ baker bakery balance ban +banned bass bathroom battlegrounds @@ -144,6 +149,7 @@ beats bedrock better bids +big billion biography bioluminescent @@ -169,11 +175,13 @@ cable caffeine calculator calories +can cancel cancelled cancelling candy cannon +cannot cans capital car @@ -186,12 +194,13 @@ casting cat changer channel +char charisma chat chicken chilli -cholerstorol chronicles +class clay clean cleaner @@ -217,10 +226,13 @@ competitive complex comprehension comprehensive +compress +compressed compression computation computer console +consultant control controller conversation @@ -253,6 +265,7 @@ destroys developer deviant diamond +dictionary diet differentiaton difficult @@ -268,6 +281,7 @@ doctor dog door dot +double down dusk dust @@ -289,6 +303,7 @@ electronic eleven eleventh elite +else emerald emulator engineer @@ -329,6 +344,8 @@ eventuation eventuations evenwise evenworthy +ever +evergreen every everyone evil @@ -415,6 +432,7 @@ excoct excoction extravagant eye +far fast faster fastest @@ -440,6 +458,7 @@ flares flatbed flavor flip +float floppy flowers folding @@ -466,6 +485,7 @@ gamer garden gender generation +getter ghost gift glass @@ -473,6 +493,7 @@ glorious glow gold good +grab graphics grass gravity @@ -507,6 +528,8 @@ hub hundred hundredth hyper +if +imposter in incredible information @@ -515,6 +538,8 @@ insert inside instrument instrumental +int +integer integrated integration intellect @@ -553,7 +578,9 @@ limbo lime link lion +list locked +long made magenta magnitude @@ -561,6 +588,8 @@ main make mall manager +map +maria market marketing mars @@ -592,8 +621,10 @@ mouse multiplication music mute +muted nacho neckbeard +necromancy negative neptune netbook @@ -694,7 +725,6 @@ nonegregious nonegregiously nonegregiousness noneidetic -nonejaculatory nonejecting nonejection nonejective @@ -738,6 +768,7 @@ nonelite not notebook notepad +nothing number nurse nutrition @@ -755,6 +786,7 @@ onion only open opossum +or orange ordered ordering @@ -803,8 +835,10 @@ property protein protocol public +pull purgatory purple +push pyramid pythagoras quadrillion @@ -814,10 +848,10 @@ quick quintuple rabbit racer -raven reader reading reboot +record records red redstone @@ -855,6 +889,8 @@ second sensation sensations server +set +setter seven seventeen seventeenth @@ -863,10 +899,13 @@ seventieth seventy shasta shift +short showcase siding signaller simulator +since +sink six sixteen sixteenth @@ -879,6 +918,7 @@ sled sledgehammer sleep sleeping +slept slow slower slowest @@ -889,6 +929,7 @@ someone something sometimes sonic +sounds space speed speedrun @@ -907,17 +948,21 @@ stone stop stream street +string +structure subscribe subtraction subtractor sun sundried +super supply supposed suspicious sweets switch tank +tap tea television temple @@ -942,7 +987,7 @@ tilde time times toast -tomtoes +tomatoes tower train tram @@ -953,6 +998,7 @@ treble triangle trio trust +tune tuner tungsten turntable @@ -964,7 +1010,9 @@ twist two ultra underscore +union unit +unordered unreferenced unreferred unrefilled @@ -1012,6 +1060,7 @@ unrefused unrefusing unrefusingly unregainable +unsigned up uranus username @@ -1028,17 +1077,24 @@ vinyl vista volume wallet +warn +warned watch watching water ways +we weapon website +while whitelist +who +why win window winning won +yellow yes zebra zer0 diff --git a/dlibs/dlib_messages.py b/dlibs/dlib_messages.py index 25f3102..be60066 100644 --- a/dlibs/dlib_messages.py +++ b/dlibs/dlib_messages.py @@ -3,7 +3,8 @@ import importlib -import time, asyncio, os, hashlib, io +import time, asyncio, os, hashlib +import copy as pycopy import discord, lz4.frame @@ -37,7 +38,7 @@ from lib_parsers import parse_blacklist, parse_skip_message, parse_permissions, grab_files, generate_reply_field from lib_encryption_wrapper import encrypted_writer from lib_compatibility import user_avatar_url, discord_datetime_now -from lib_sonnetcommands import SonnetCommand +from lib_sonnetcommands import SonnetCommand, CommandCtx, CallCtx from typing import List, Any, Dict, Optional, Callable, Tuple import lib_lexdpyk_h as lexdpyk @@ -192,9 +193,24 @@ async def on_message_edit(old_message: discord.Message, message: discord.Message broke_blacklist, notify, infraction_type = parse_blacklist((message, mconf, ramfs), ) if broke_blacklist: + + command_ctx = CommandCtx( + stats={}, + cmds=kargs["command_modules"][0], + ramfs=ramfs, + bot_start=kargs["bot_start"], + dlibs=kargs["dynamiclib_modules"][0], + main_version=kargs["kernel_version"], + kernel_ramfs=kernel_ramfs, + conf_cache={}, + verbose=False, + cmds_dict=kargs["command_modules"][1], + automod=True + ) + asyncio.create_task(attempt_message_delete(message)) execargs = [str(message.author.id), "[AUTOMOD]", ", ".join(infraction_type), "Blacklist"] - await kargs["command_modules"][1][mconf["blacklist-action"]]['execute'](message, execargs, client, verbose=False, ramfs=ramfs, automod=True) + await CallCtx(kargs["command_modules"][1][mconf["blacklist-action"]]['execute'])(message, execargs, client, command_ctx) if notify: asyncio.create_task(grab_an_adult(message, message.guild, client, mconf, kargs["ramfs"])) @@ -328,14 +344,14 @@ async def log_message_files(message: discord.Message, kernel_ramfs: lexdpyk.ram_ ramfs_path = f"{message.guild.id}/files/{message.id}/{hashlib.sha256(fname).hexdigest()}" - namefile = kernel_ramfs.create_f(f"{ramfs_path}/name", f_type=io.BytesIO) + namefile = kernel_ramfs.create_f(f"{ramfs_path}/name") namefile.write(fname) - keyfile = kernel_ramfs.create_f(f"{ramfs_path}/key", f_type=io.BytesIO) + keyfile = kernel_ramfs.create_f(f"{ramfs_path}/key") keyfile.write(key := os.urandom(32)) keyfile.write(iv := os.urandom(16)) - pointerfile = kernel_ramfs.create_f(f"{ramfs_path}/pointer", f_type=io.BytesIO) + pointerfile = kernel_ramfs.create_f(f"{ramfs_path}/pointer") pointer = hashlib.sha256(fname + key + iv).hexdigest() file_loc = f"./datastore/{message.guild.id}-{pointer}.cache.db" pointerfile.write(file_loc.encode("utf8")) @@ -378,13 +394,31 @@ async def on_message(message: discord.Message, **kargs: Any) -> None: message_deleted: bool = False + command_ctx = CommandCtx( + stats=stats, + cmds=command_modules, + ramfs=ramfs, + bot_start=bot_start_time, + dlibs=kargs["dynamiclib_modules"][0], + main_version=main_version_info, + kernel_ramfs=kargs["kernel_ramfs"], + conf_cache=mconf, + verbose=True, + cmds_dict=command_modules_dict, + automod=False + ) + + automod_ctx = pycopy.copy(command_ctx) + automod_ctx.verbose = False + automod_ctx.automod = True + # If blacklist broken generate infraction broke_blacklist, notify, infraction_type = parse_blacklist((message, mconf, ramfs), ) if broke_blacklist: message_deleted = True asyncio.create_task(attempt_message_delete(message)) execargs = [str(message.author.id), "[AUTOMOD]", ", ".join(infraction_type), "Blacklist"] - asyncio.create_task(command_modules_dict[mconf["blacklist-action"]]['execute'](message, execargs, client, verbose=False, ramfs=ramfs, automod=True)) + asyncio.create_task(CallCtx(command_modules_dict[mconf["blacklist-action"]]['execute'])(message, execargs, client, automod_ctx)) if spammer: message_deleted = True @@ -392,7 +426,7 @@ async def on_message(message: discord.Message, **kargs: Any) -> None: with db_hlapi(message.guild.id) as db: if not db.is_muted(userid=message.author.id): execargs = [str(message.author.id), mconf["antispam-time"], "[AUTOMOD]", spamstr] - asyncio.create_task(command_modules_dict["mute"]['execute'](message, execargs, client, verbose=False, ramfs=ramfs, automod=True)) + asyncio.create_task(CallCtx(command_modules_dict["mute"]["execute"])(message, execargs, client, automod_ctx)) if notify: asyncio.create_task(grab_an_adult(message, message.guild, client, mconf, ramfs)) @@ -433,22 +467,13 @@ async def on_message(message: discord.Message, **kargs: Any) -> None: try: stats["end"] = round(time.time() * 100000) - await cmd.execute( - message, - arguments, - client, - stats=stats, - cmds=command_modules, - ramfs=ramfs, - bot_start=bot_start_time, - dlibs=kargs["dynamiclib_modules"][0], - main_version=main_version_info, - kernel_ramfs=kargs["kernel_ramfs"], - conf_cache=mconf, - verbose=True, - cmds_dict=command_modules_dict, - automod=False - ) + try: + await cmd.execute_ctx(message, arguments, client, command_ctx) + except lib_sonnetcommands.CommandError as ce: + try: + await message.channel.send(ce) + except discord.errors.Forbidden: + pass # Regenerate cache if cmd.cache in ["purge", "regenerate"]: @@ -497,4 +522,4 @@ async def on_message(message: discord.Message, **kargs: Any) -> None: "on-message-delete": on_message_delete, } -version_info: str = "1.2.10" +version_info: str = "1.2.11" diff --git a/dlibs/dlib_starboard.py b/dlibs/dlib_starboard.py index 87d18bf..7c88bdc 100644 --- a/dlibs/dlib_starboard.py +++ b/dlibs/dlib_starboard.py @@ -41,18 +41,16 @@ async def on_reaction_add(reaction: discord.Reaction, user: discord.User, **karg if (channel_id := mconf["starboard-channel"]) and (channel := client.get_channel(int(channel_id))) and isinstance(channel, discord.TextChannel): with db_hlapi(message.guild.id) as db: - db.inject_enum("starboard", [ - ("messageID", str), - ]) - if not (db.grab_enum("starboard", str(message.id))) and not (int(channel_id) == message.channel.id): + with db.inject_enum_context("starboard", [("messageID", str)]) as starboard: + if not (starboard.grab(str(message.id))) and not (int(channel_id) == message.channel.id): - # Add to starboard - db.set_enum("starboard", [str(message.id)]) + # Add to starboard + starboard.set([str(message.id)]) - try: - await channel.send(embed=(await build_starboard_embed(message))) - except discord.errors.Forbidden: - pass + try: + await channel.send(embed=(await build_starboard_embed(message))) + except discord.errors.Forbidden: + pass category_info = {'name': 'Starboard'} @@ -61,4 +59,4 @@ async def on_reaction_add(reaction: discord.Reaction, user: discord.User, **karg "on-reaction-add": on_reaction_add, } -version_info: str = "1.2.8" +version_info: str = "1.2.11" diff --git a/libs/lib_encryption_wrapper.py b/libs/lib_encryption_wrapper.py index 8b0e98b..79ea6cd 100644 --- a/libs/lib_encryption_wrapper.py +++ b/libs/lib_encryption_wrapper.py @@ -8,7 +8,7 @@ import io -from typing import Generator, Any, Union +from typing import Generator, Any, Union, Protocol class errors: @@ -23,18 +23,15 @@ class NotSonnetAESError(FileNotFoundError): # Never actually called class crypto_typing: - class encryptor_decryptor: - def __init__(self) -> None: - pass - + class encryptor_decryptor(Protocol): def update(self, buf: Union[bytes, bytearray]) -> bytes: - return bytes() + ... def update_into(self, bufin: Union[bytes, bytearray], bufout: Union[bytes, bytearray]) -> None: - pass + ... def finalize(self) -> bytes: - return bytes() + ... def directBinNumber(inData: int, length: int) -> bytes: diff --git a/libs/lib_lexdpyk_h.py b/libs/lib_lexdpyk_h.py index 412b5dc..179d020 100644 --- a/libs/lib_lexdpyk_h.py +++ b/libs/lib_lexdpyk_h.py @@ -1,13 +1,14 @@ # Headers for the kernel ramfs -from typing import Optional, List, Any, Tuple, Dict, Callable, Coroutine, Type, TypeVar +import io + +from typing import Optional, List, Any, Tuple, Dict, Callable, Coroutine, Type, TypeVar, Protocol, overload Obj = TypeVar("Obj") # Define ramfs headers -class ram_filesystem: - # pytype: disable=bad-return-type +class ram_filesystem(Protocol): def mkdir(self, dirstr: Optional[str] = None, dirlist: Optional[List[str]] = None) -> "ram_filesystem": ... @@ -17,9 +18,24 @@ def remove_f(self, dirstr: Optional[str] = None, dirlist: Optional[List[str]] = def read_f(self, dirstr: Optional[str] = None, dirlist: Optional[List[str]] = None) -> Any: ... + # pytype: disable=not-callable + @overload + def create_f(self, dirstr: Optional[str] = None, dirlist: Optional[List[str]] = None) -> io.BytesIO: + ... + + @overload + def create_f(self, dirstr: Optional[str] = None, dirlist: Optional[List[str]] = None, f_type: Optional[Callable[[Any], Obj]] = None, f_args: Optional[List[Any]] = None) -> Obj: + ... + + @overload + def create_f(self, dirstr: Optional[str] = None, dirlist: Optional[List[str]] = None, f_type: Optional[Callable[[], Obj]] = None) -> Obj: + ... + + @overload def create_f(self, dirstr: Optional[str] = None, dirlist: Optional[List[str]] = None, f_type: Optional[Type[Obj]] = None, f_args: Optional[List[Any]] = None) -> Obj: ... + # pytype: enable=not-callable def rmdir(self, dirstr: Optional[str] = None, dirlist: Optional[List[str]] = None) -> None: ... @@ -29,10 +45,8 @@ def ls(self, dirstr: Optional[str] = None, dirlist: Optional[List[str]] = None) def tree(self, dirstr: Optional[str] = None, dirlist: Optional[List[str]] = None) -> Tuple[List[str], Dict[str, Tuple[Any]]]: ... - # pytype: enable=bad-return-type - -class cmd_module: +class cmd_module(Protocol): __name__: str category_info: Dict[str, str] commands: Dict[str, Dict[str, Any]] @@ -42,7 +56,7 @@ class cmd_module: cmd_modules_dict = Dict[str, Dict[str, Any]] -class dlib_module: +class dlib_module(Protocol): __name__: str category_info: Dict[str, str] commands: Dict[str, Callable[..., Coroutine[Any, Any, None]]] diff --git a/libs/lib_loaders.py b/libs/lib_loaders.py index 9243490..3055096 100644 --- a/libs/lib_loaders.py +++ b/libs/lib_loaders.py @@ -25,7 +25,7 @@ from lib_db_obfuscator import db_hlapi from lib_sonnetconfig import CLIB_LOAD, GLOBAL_PREFIX, BLACKLIST_ACTION -from typing import Any, Tuple, Optional, Union, cast, Type, Dict +from typing import Any, Tuple, Optional, Union, cast, Type, Dict, Protocol import lib_lexdpyk_h as lexdpyk @@ -59,7 +59,7 @@ def _wrap(self, funcname: str) -> None: loader = DotHeaders(ctypes.CDLL(clib_name)).lib except OSError: try: - if subprocess.run(["make"]).returncode == 0: + if subprocess.run(["make", "all"]).returncode == 0: loader = DotHeaders(ctypes.CDLL(clib_name)).lib else: clib_exists = False @@ -85,13 +85,23 @@ def directBinNumber(inData: int, length: int) -> Tuple[int, ...]: } +class Reader(Protocol): + def read(self, size: int = -1) -> bytes: + ... + + +class Writer(Protocol): + def write(self, data: bytes) -> int: + ... + + # Read a vnum from a file stream -def read_vnum(fileobj: io.BufferedReader) -> int: +def read_vnum(fileobj: Reader) -> int: return int.from_bytes(fileobj.read(int.from_bytes(fileobj.read(1), "little")), "little") # Write a vnum to a file stream -def write_vnum(fileobj: io.BufferedWriter, number: int) -> None: +def write_vnum(fileobj: Writer, number: int) -> None: vnum_count = (number.bit_length() + 7) // 8 fileobj.write(bytes([vnum_count])) fileobj.write(bytes(directBinNumber(number, vnum_count))) @@ -109,9 +119,9 @@ def load_message_config(guild_id: int, ramfs: lexdpyk.ram_filesystem, datatypes: try: # Loads fileio object - blacklist_cache = ramfs.read_f(f"{guild_id}/caches/{datatypes[0]}") + blacklist_cache: io.BytesIO = ramfs.read_f(f"{guild_id}/caches/{datatypes[0]}") blacklist_cache.seek(0) - message_config: dict[str, Any] = {} + message_config: Dict[str, Any] = {} # Imports csv style data for i in datatypes["csv"]: diff --git a/libs/lib_parsers.py b/libs/lib_parsers.py index 9173b40..f710492 100644 --- a/libs/lib_parsers.py +++ b/libs/lib_parsers.py @@ -19,16 +19,23 @@ import lib_constants importlib.reload(lib_constants) +import lib_sonnetcommands + +importlib.reload(lib_sonnetcommands) from lib_sonnetconfig import REGEX_VERSION from lib_db_obfuscator import db_hlapi from lib_encryption_wrapper import encrypted_reader import lib_constants as constants -from typing import Callable, Iterable, Optional, Any, Tuple, Dict, Union, List, cast +from typing import Callable, Iterable, Optional, Any, Tuple, Dict, Union, List import lib_lexdpyk_h as lexdpyk -re: Any = importlib.import_module(REGEX_VERSION) +# Import re here to trick type checker into using re stubs even if importlib grabs re2, they (should) have the same stubs +import re + +# Place this in the globals scope by hand to avoid pyflakes saying its a redefinition +globals()["re"] = importlib.import_module(REGEX_VERSION) class errors: @@ -61,6 +68,11 @@ def _formatregexfind(gex: List[Any]) -> str: return ", ".join(i if isinstance(i, str) else "".join(i) for i in gex) +# This exists because type checkers cant infer lambda return types or something +def returnsNone() -> None: + ... + + # Run a blacklist pass over a messages content and files def parse_blacklist(indata: _parse_blacklist_inputs) -> tuple[bool, bool, list[str]]: """ @@ -101,7 +113,7 @@ def parse_blacklist(indata: _parse_blacklist_inputs) -> tuple[bool, bool, list[s if blacklist["url-blacklist"]: ramfs.create_f(f"{message.guild.id}/regex/url", f_type=re.compile, f_args=[_compileurl(blacklist["url-blacklist"])]) else: - ramfs.create_f(f"{message.guild.id}/regex/url", f_type=cast(Any, lambda: None), f_args=[]) + ramfs.create_f(f"{message.guild.id}/regex/url", f_type=returnsNone) blacklist["regex-blacklist"] = [ramfs.read_f(f"{message.guild.id}/regex/regex-blacklist/{i}") for i in ramfs.ls(f"{message.guild.id}/regex/regex-blacklist")[0]] blacklist["regex-notifier"] = [ramfs.read_f(f"{message.guild.id}/regex/regex-notifier/{i}") for i in ramfs.ls(f"{message.guild.id}/regex/regex-notifier")[0]] @@ -277,7 +289,7 @@ def _parse_role_perms(author: discord.Member, permrole: str) -> bool: # Parse user permissions to run a command -async def parse_permissions(message: discord.Message, mconf: dict[str, str], perms: Permtype, verbose: bool = True) -> bool: +async def parse_permissions(message: discord.Message, mconf: Dict[str, str], perms: Permtype, verbose: bool = True) -> bool: """ Parse the permissions of the given member object to check if they meet the required permtype Verbosity can be set to not print if the perm check failed @@ -476,17 +488,16 @@ async def parse_role(message: discord.Message, args: list[str], db_entry: str, v # Grab a message object from a link or message mention -async def parse_channel_message(message: discord.Message, args: list[str], client: discord.Client) -> tuple[discord.Message, int]: +async def parse_channel_message_noexcept(message: discord.Message, args: list[str], client: discord.Client) -> tuple[discord.Message, int]: """ Parse a channel message from a url, #channel messageid, or channelid-messageid field :returns: Tuple[discord.Message, int] -- The message and the amount of args the message grabbing took - :raises: errors.message_parse_failure -- The message did not exist or the function had invalid inputs + :raises: lib_sonnetcommands.CommandError -- The message did not exist or the function had invalid inputs """ if not message.guild: - await message.channel.send("ERROR: Not a guild message") - raise errors.message_parse_failure("ERROR: Not a guild message") + raise lib_sonnetcommands.CommandError("ERROR: Not a guild message") try: message_link = args[0].replace("-", "/").split("/") @@ -499,69 +510,77 @@ async def parse_channel_message(message: discord.Message, args: list[str], clien message_id = args[1] nargs = 2 except IndexError: - await message.channel.send(constants.sonnet.error_args.not_enough) - raise errors.message_parse_failure + raise lib_sonnetcommands.CommandError(constants.sonnet.error_args.not_enough) try: log_channel = int(log_channel) except ValueError: - await message.channel.send(constants.sonnet.error_channel.invalid) - raise errors.message_parse_failure + raise lib_sonnetcommands.CommandError(constants.sonnet.error_channel.invalid) discord_channel = client.get_channel(log_channel) if not discord_channel: - await message.channel.send(constants.sonnet.error_channel.invalid) - raise errors.message_parse_failure + raise lib_sonnetcommands.CommandError(constants.sonnet.error_channel.invalid) if not isinstance(discord_channel, discord.TextChannel): - await message.channel.send(constants.sonnet.error_channel.scope) - raise errors.message_parse_failure + raise lib_sonnetcommands.CommandError(constants.sonnet.error_channel.scope) if discord_channel.guild.id != message.guild.id: - await message.channel.send(constants.sonnet.error_channel.scope) - raise errors.message_parse_failure + raise lib_sonnetcommands.CommandError(constants.sonnet.error_channel.scope) try: discord_message = await discord_channel.fetch_message(int(message_id)) except (ValueError, discord.errors.HTTPException): - await message.channel.send(constants.sonnet.error_message.invalid) - raise errors.message_parse_failure + raise lib_sonnetcommands.CommandError(constants.sonnet.error_message.invalid) if not discord_message: - await message.channel.send(constants.sonnet.error_message.invalid) - raise errors.message_parse_failure + raise lib_sonnetcommands.CommandError(constants.sonnet.error_message.invalid) return (discord_message, nargs) -async def parse_user_member(message: discord.Message, - args: list[str], - client: discord.Client, - argindex: int = 0, - default_self: bool = False) -> tuple[discord.Member | discord.User, Optional[discord.Member]]: +async def parse_channel_message(message: discord.Message, args: List[str], client: discord.Client) -> Tuple[discord.Message, int]: + """ + Parse a channel message from a url, #channel messageid, or channelid-messageid field + + :returns: Tuple[discord.Message, int] -- The message and the amount of args the message grabbing took + :raises: errors.message_parse_failure -- The message did not exist or the function had invalid inputs + """ + try: + return await parse_channel_message_noexcept(message, args, client) + except lib_sonnetcommands.CommandError as ce: + await message.channel.send(ce) + raise errors.message_parse_failure(ce) + + +UserInterface = Union[discord.User, discord.Member] + + +async def parse_user_member_noexcept(message: discord.Message, + args: List[str], + client: discord.Client, + argindex: int = 0, + default_self: bool = False) -> Tuple[UserInterface, Optional[discord.Member]]: """ Parse a user and member object from a potential user string Always returns a user, only returns member if the user is in the guild User returned might be a member, do not rely on this. - :returns: tuple[discord.User | discord.Member, Optional[discord.Member]] -- A discord user and optional member - :raises: errors.user_parse_error -- Could not find the user or input invalid + :returns: Tuple[Union[discord.User, discord.Member], Optional[discord.Member]] -- A discord user and optional member + :raises: lib_sonnetcommands.CommandError -- Could not find the user or input invalid """ if not message.guild or not isinstance(message.author, discord.Member): - raise errors.user_parse_error("Not a guild message") + raise lib_sonnetcommands.CommandError("Not a guild message") try: uid = int(args[argindex].strip("<@!>")) except ValueError: - await message.channel.send("Invalid UserID") - raise errors.user_parse_error("Invalid User") + raise lib_sonnetcommands.CommandError("Invalid UserID") except IndexError: if default_self: return message.author, message.author else: - await message.channel.send("No user specified") - raise errors.user_parse_error("No user specified") + raise lib_sonnetcommands.CommandError("No user specified") member: Optional[discord.Member] user: Optional[discord.User | discord.Member] @@ -571,12 +590,27 @@ async def parse_user_member(message: discord.Message, if not (user := client.get_user(uid)): user = await client.fetch_user(uid) except (discord.errors.NotFound, discord.errors.HTTPException): - await message.channel.send("User does not exist") - raise errors.user_parse_error("User does not exist") + raise lib_sonnetcommands.CommandError("User does not exist") return user, member +async def parse_user_member(message: discord.Message, args: List[str], client: discord.Client, argindex: int = 0, default_self: bool = False) -> Tuple[UserInterface, Optional[discord.Member]]: + """ + Parse a user and member object from a potential user string + Always returns a user, only returns member if the user is in the guild + User returned might be a member, do not rely on this. + + :returns: tuple[discord.User | discord.Member, Optional[discord.Member]] -- A discord user and optional member + :raises: errors.user_parse_error -- Could not find the user or input invalid + """ + try: + return await parse_user_member_noexcept(message, args, client, argindex=argindex, default_self=default_self) + except lib_sonnetcommands.CommandError as ce: + await message.channel.send(ce) + raise errors.user_parse_error(ce) + + def format_duration(durationSeconds: Union[int, float]) -> str: """ Returns an end user formatted duration from a seconds duration up to decades diff --git a/libs/lib_sonnetcommands.py b/libs/lib_sonnetcommands.py index 3712393..7599826 100644 --- a/libs/lib_sonnetcommands.py +++ b/libs/lib_sonnetcommands.py @@ -1,9 +1,12 @@ # Command backwards compatible api wrapper that allows new endpoints # Ultrabear 2021 -import discord +import inspect +from typing import (Any, Callable, Coroutine, Dict, List, Protocol, Tuple, Union, cast) +from typing_extensions import TypeGuard # pytype: disable=not-supported-yet -from typing import Any, List, Callable, Coroutine, Union, Tuple, Protocol, cast +import discord +import lib_lexdpyk_h as lexdpyk class ExecutableT(Protocol): @@ -11,6 +14,11 @@ def __call__(self, message: discord.Message, args: List[str], client: discord.Cl ... +class ExecutableCtxT(Protocol): + def __call__(self, message: discord.Message, args: List[str], client: discord.Client, ctx: "CommandCtx") -> Coroutine[None, None, Any]: + ... + + PermissionT = Union[str, Tuple[str, Callable[[discord.Message], bool]]] _allowpool = { @@ -19,10 +27,93 @@ def __call__(self, message: discord.Message, args: List[str], client: discord.Cl } +class CommandError(Exception): + """ + CommandError is an error that can be raised by a command + + It is treated specially by the exception handler such that it + will print the error string to the current channel instead of raising to the kernel, and have the same effect as return != 1 + it should be used for cleaner user error handling + +` `await message.channel.send("error"); return 1` + may be replaced with + `raise CommandError("error")` + for the same effect + """ + __slots__ = () + + +class CommandCtx: + __slots__ = "stats", "cmds", "ramfs", "kernel_ramfs", "bot_start", "dlibs", "main_version", "conf_cache", "verbose", "cmds_dict", "automod" + + def __init__(self, CtxToKwargdata: Dict[str, Any] = {}, **askwargs: Any) -> None: + + kwargdata = askwargs if askwargs else CtxToKwargdata + + self.stats: Dict[str, int] = kwargdata["stats"] + self.cmds: List[lexdpyk.cmd_module] = kwargdata["cmds"] + self.ramfs: lexdpyk.ram_filesystem = kwargdata["ramfs"] + self.kernel_ramfs: lexdpyk.ram_filesystem = kwargdata["kernel_ramfs"] + self.bot_start: float = kwargdata["bot_start"] + self.dlibs: List[lexdpyk.dlib_module] = kwargdata["dlibs"] + self.main_version: str = kwargdata["main_version"] + self.conf_cache: Dict[str, Any] = kwargdata["conf_cache"] + self.verbose: bool = kwargdata["verbose"] + self.cmds_dict: lexdpyk.cmd_modules_dict = kwargdata["cmds_dict"] + self.automod: bool = kwargdata["automod"] + + def to_dict(self) -> Dict[str, Any]: + return { + "stats": self.stats, + "cmds": self.cmds, + "ramfs": self.ramfs, + "kernel_ramfs": self.kernel_ramfs, + "bot_start": self.bot_start, + "dlibs": self.dlibs, + "main_version": self.main_version, + "conf_cache": self.conf_cache, + "verbose": self.verbose, + "cmds_dict": self.cmds_dict, + "automod": self.automod, + } + + +def _iskwargcallable(func: Union[ExecutableT, ExecutableCtxT]) -> TypeGuard[ExecutableT]: + spec = inspect.getfullargspec(func) + return len(spec.args) == 3 and spec.varkw is not None + + +def CallKwargs(func: Union[ExecutableT, ExecutableCtxT]) -> ExecutableT: + if _iskwargcallable(func): + return func + else: + # Closures go brr + def KwargsToCtx(message: discord.Message, args: List[str], client: discord.Client, **kwargs: Any) -> Coroutine[None, None, Any]: + ctx = CommandCtx(kwargs) + return cast(ExecutableCtxT, func)(message, args, client, ctx) + + return KwargsToCtx + + +def CallCtx(func: Union[ExecutableCtxT, ExecutableT]) -> ExecutableCtxT: + if _iskwargcallable(func): + + def CtxToKwargs(message: discord.Message, args: List[str], client: discord.Client, ctx: CommandCtx) -> Coroutine[None, None, Any]: + kwargs = ctx.to_dict() + return func(message, args, client, **kwargs) + + return CtxToKwargs + else: + return cast(ExecutableCtxT, func) + + class SonnetCommand(dict): # type: ignore[type-arg] __slots__ = () def __getitem__(self, item: Any) -> Any: + # override execute to return a CallKwargs to maintain backwards compat + if item == "execute": + return CallKwargs(super().__getitem__(item)) try: return super().__getitem__(item) except KeyError: @@ -44,6 +135,14 @@ def __contains__(self, item: Any) -> bool: def execute(self) -> ExecutableT: return cast(ExecutableT, self["execute"]) + @property + def execute_kwargs(self) -> ExecutableT: + return CallKwargs(super().__getitem__("execute")) + + @property + def execute_ctx(self) -> ExecutableCtxT: + return CallCtx(super().__getitem__("execute")) + @property def cache(self) -> str: return cast(str, self["cache"]) diff --git a/libs/lib_sonnetdb.py b/libs/lib_sonnetdb.py index b63da31..93a1016 100644 --- a/libs/lib_sonnetdb.py +++ b/libs/lib_sonnetdb.py @@ -74,7 +74,7 @@ def __init__(self, guild_id: Optional[int], lock: Optional[threading.Lock] = Non self.database = self._db # Deprecated name self.guild: Optional[int] = guild_id - self.hlapi_version = (1, 2, 9) + self.hlapi_version = (1, 2, 11) self._sonnet_db_version = self._get_db_version() if lock is not None: @@ -193,7 +193,7 @@ def set_enum(self, name: str, cpush: List[Union[str, int]]) -> None: self.create_guild_db() self._db.add_to_table(f"{self.guild}_{name}", push) - def delete_enum(self, enumname: str, key: str) -> None: + def delete_enum(self, enumname: str, key: Union[str, int]) -> None: """ Deletes a row in an enums table based on primary key @@ -227,6 +227,23 @@ def list_enum(self, enumName: str) -> List[Union[str, int]]: except db_error.OperationalError: return [] + def enum_context(self, enumName: str) -> "_enum_context": + """ + Returns a context manager to access db_enums functions in a safer and more concise way + + :returns: _enum_context - An enum context manager + """ + return _enum_context(self, enumName) + + def inject_enum_context(self, enumName: str, schema: List[Tuple[str, Type[Union[str, int]]]]) -> "_enum_context": + """ + A combination of inject_enum and enum_context that returns an enum context of the just injected enum + + :returns: _enum_context - An enum context manager + """ + self.inject_enum(enumName, schema) + return _enum_context(self, enumName) + def create_guild_db(self) -> None: """ Create a guilds database @@ -295,7 +312,12 @@ def grab_moderator_infractions(self, moderatorid: Union[int, str]) -> Tuple[Any, return data - def grab_filter_infractions(self, user: Optional[int] = None, moderator: Optional[int] = None, itype: Optional[str] = None, automod: bool = True, count: bool = False) -> Union[InfractionT, int]: + def grab_filter_infractions(self, + user: Optional[int] = None, + moderator: Optional[int] = None, + itype: Optional[str] = None, + automod: Optional[bool] = None, + count: bool = False) -> Union[List[InfractionT], int]: schm: List[List[str]] = [] if user is not None: @@ -304,23 +326,22 @@ def grab_filter_infractions(self, user: Optional[int] = None, moderator: Optiona schm.append(["moderatorID", str(moderator)]) if itype is not None: schm.append(["type", itype]) - if not automod: - schm.append(["reason", "[AUTOMOD]%", "NOT LIKE"]) - db_type = Union[InfractionT, int] + if automod is False: + schm.append(["reason", "[AUTOMOD]%", "NOT LIKE"]) + elif automod is True: + schm.append(["reason", "[AUTOMOD]%", "LIKE"]) try: if self._db.TEXT_KEY: self._db.make_new_index(f"{self.guild}_infractions", f"{self.guild}_infractions_users", ["userID"]) self._db.make_new_index(f"{self.guild}_infractions", f"{self.guild}_infractions_moderators", ["moderatorID"]) if count: - data = cast(db_type, self._db.multicount_rows_from_table(f"{self.guild}_infractions", schm)) + return self._db.multicount_rows_from_table(f"{self.guild}_infractions", schm) else: - data = cast(db_type, self._db.multifetch_rows_from_table(f"{self.guild}_infractions", schm)) + return cast(List[InfractionT], list(self._db.multifetch_rows_from_table(f"{self.guild}_infractions", schm))) except db_error.OperationalError: - data = cast(db_type, tuple()) if not count else 0 - - return data + return 0 if count else list() # Check if a message is on the starboard already def in_starboard(self, message_id: int) -> bool: @@ -531,3 +552,39 @@ def __exit__(self, err_type: Optional[Type[Exception]], err_value: Optional[str] self._db.commit() if err_type: raise err_type(err_value) + + +class _enum_context: + __slots__ = "_hlapi", "_name" + + def __init__(self, hlapi: db_hlapi, enum_name: str) -> None: + + self._hlapi: db_hlapi = hlapi + self._name: str = enum_name + + def __enter__(self) -> "_enum_context": + return self + + def __exit__(self, err_type: Optional[Type[Exception]], err_value: Optional[str], err_traceback: Any) -> None: + if err_type: + raise err_type(err_value) + + def grab(self, name: Union[str, int]) -> Optional[List[Union[str, int]]]: + self._hlapi.grab_enum.__doc__ + + return self._hlapi.grab_enum(self._name, name) + + def set(self, cpush: List[Union[str, int]]) -> None: + self._hlapi.set_enum.__doc__ + + return self._hlapi.set_enum(self._name, cpush) + + def delete(self, name: Union[str, int]) -> None: + self._hlapi.delete_enum.__doc__ + + return self._hlapi.delete_enum(self._name, name) + + def list(self) -> List[Union[str, int]]: + self._hlapi.list_enum.__doc__ + + return self._hlapi.list_enum(self._name) diff --git a/libs/lib_tparse.py b/libs/lib_tparse.py new file mode 100644 index 0000000..8533664 --- /dev/null +++ b/libs/lib_tparse.py @@ -0,0 +1,243 @@ +# Typed argument parsing library for sonnet (or the world idk) +# Ultrabear 2022 + +import sys + +# Python types +from typing import List, Dict, Tuple, Type +# Predefined types +from typing import Optional, Iterator, Sequence, Callable +# Constructs +from typing import Any, Generic, Protocol, TypeVar, Union, Literal, overload + +__all__ = [ + "StringWriter", + "TParseError", + "NotParsedError", + "ParseFailureError", + "Promise", + "Parser", + "store_true", + "store_false", + "add_true_false_flag", + ] + + +class StringWriter(Protocol): + """ + A StringWriter is an interface for any object (T) containing a T.write(s: str) -> int method + """ + def write(self, s: str) -> int: + ... + + +class TParseError(Exception): + """ + An exception that is the BaseException of the tparse library + """ + __slots__ = () + + +class NotParsedError(TParseError): + """ + NotParsedError is raised when a Promises value is attempted to be gotten without having been parsed + """ + __slots__ = () + + +class ParseFailureError(TParseError): + """ + ParseFailureError is raised when Parser.parse() fails to parse arguments and exit_on_fail is set to False + """ + __slots__ = () + + +# Promise Type +_PT = TypeVar("_PT") +# Parser Argument Type +_PAT = TypeVar("_PAT") +# Parser Type +_ParserT = TypeVar("_ParserT") +# C Iterator Type +_CIT = TypeVar("_CIT") + + +class _IteratorCtx: + __slots__ = "i", + + def __init__(self) -> None: + self.i = -1 + + +class _CIterator(Generic[_CIT]): + """ + CIterator is a closer to C style for loop, allowing (i = 0; i < len(T); i++) where i is mutable + """ + __slots__ = "_sequence", "_state" + + def __init__(self, iterator: Sequence[_CIT]) -> None: + self._sequence = iterator + self._state = _IteratorCtx() + + def __iter__(self) -> Iterator[Tuple[_IteratorCtx, _CIT]]: + return self + + def __next__(self) -> Tuple[_IteratorCtx, _CIT]: + self._state.i += 1 + + if self._state.i >= len(self._sequence): + raise StopIteration + + return self._state, self._sequence[self._state.i] + + +class Promise(Generic[_PT]): + """ + A Promise is similar to the concept of an async promise, + an argument will be in the promise after parsing has completed, + but attempting to look before parsing has completed will error. + + As such the argument parser returns typed promise objects per argument that will return data once it has completed parsing, but not before. + You may also construct a Promise directly with Promise[T]() or with Promise(T) for py3.8 users, and pass it to add_arg(store=) to parse to for multi argument parsing. + Correct typing is not gauranteed at runtime, but by mypy type checking, code that fails mypy type checking will produce unpredictable runtime behavior. + """ + __slots__ = "_parsed", "_data" + + def __init__(self, typ: Optional[Type[_PT]] = None) -> None: + self._parsed: bool = False + self._data: Optional[_PT] = None + + @overload + def get(self, default: _PT) -> _PT: + ... + + @overload + def get(self, default: Optional[_PT] = None) -> Optional[_PT]: + ... + + def get(self, default: Optional[_PT] = None) -> Optional[_PT]: + if not self._parsed: + raise NotParsedError("This promised argument has not been parsed") + # Default override + return default if self._data is None else self._data + + +class _ParserArgument(Generic[_PAT]): + __slots__ = "names", "func", "flag", "store", "helpstr" + + def __init__(self, names: Union[str, List[str]], func: Callable[[str], _PAT], flag: bool, store: Promise[_PAT], helpstr: Optional[str]) -> None: + self.names = names + self.func = func + self.flag = flag + self.store = store + self.helpstr = helpstr + + +class Parser: + """ + Parser is the core of the argument parsing library, it facilitates parsing arguments and making good on promises + """ + __slots__ = "_arguments", "_arghash", "_name", "_buildhelp" + + def __init__(self, name: str = sys.argv[0], buildhelp: bool = False) -> None: + + self._arguments: List[_ParserArgument[Any]] = [] + self._arghash: Dict[str, _ParserArgument[Any]] = {} + self._name = name + self._buildhelp = buildhelp + # TODO(ultrabear): yeah put this in lmfao + if buildhelp is True: raise NotImplementedError("Help building is not yet implemented") + + def add_arg( + self, + names: Union[List[str], str], + func: Callable[[str], _ParserT], + flag: bool = False, + store: Optional[Promise[_ParserT]] = None, + helpstr: Optional[str] = None, + ) -> Promise[_ParserT]: + """add_arg returns a Promise for a passed argument and stores it internally for parsing""" + + if store is None: + store = Promise() + + # Pointers are fun + arg = _ParserArgument(names, func, flag, store, helpstr) + + self._arguments.append(arg) + + # Put all names into argument hashmap pointing to same arg for fast lookup + if isinstance(names, str): + self._arghash[names] = arg + else: + for name in names: + self._arghash[name] = arg + + return store + + def _error(self, errstr: str, exit_on_fail: bool, stderr: StringWriter) -> None: + + stderr.write(errstr) + stderr.write("\n") + + if exit_on_fail: + sys.exit(1) + else: + raise ParseFailureError(errstr) + + def parse(self, args: List[str] = sys.argv[1:], stderr: StringWriter = sys.stderr, exit_on_fail: bool = True, lazy: bool = False, consume: bool = False) -> None: + """ + parse will parse either given args or sys.argv and output errors to stderr or a given StringWriter, exiting or raising an exception based on exit_on_fail + lazy defines whether or not the parser will ignore garbage arguments + """ + + garbage: List[int] = [] + + for idx, val in _CIterator(args): + if val in self._arghash: + garbage.append(idx.i) + arg = self._arghash[val] + + if arg.flag is True: + arg.store._data = arg.func("") + else: + idx.i += 1 + garbage.append(idx.i) + if idx.i >= len(args): + self._error(f"Failure to parse argument {val}, expected parameter, reached EOL", exit_on_fail, stderr) + + try: + arg.store._data = arg.func(args[idx.i]) + except ValueError as ve: + self._error(f"Failed to parse argument {val}; ValueError: {ve}", exit_on_fail, stderr) + + elif not lazy: + self._error("Recieved garbage argument", exit_on_fail, stderr) + + if consume: + garbage.reverse() + for di in garbage: + del args[di] + + for i in self._arguments: + i.store._parsed = True + + +def store_true(s: str) -> Literal[True]: + """Returns true when called, can be used as func for add_arg flagtypes""" + return True + + +def store_false(s: str) -> Literal[False]: + """Returns false when called, can be used as func for add_arg flagtypes""" + return False + + +def add_true_false_flag(p: Parser, name: str) -> Promise[bool]: + + pr = Promise(bool) + + p.add_arg(f"--{name}", store_true, flag=True, store=pr) + p.add_arg(f"--no-{name}", store_false, flag=True, store=pr) + + return pr diff --git a/main.py b/main.py index d254467..0109d00 100644 --- a/main.py +++ b/main.py @@ -3,14 +3,19 @@ import warnings warnings.warn("LeXdPyK is not meant to be imported") +# Measure boot time +import time + +kernel_start = time.monotonic() + # Intro print("Booting LeXdPyK") # Import core systems -import os, importlib, sys, io, time, traceback +import os, importlib, sys, io, traceback # Import sub dependencies -import glob, json, hashlib, logging, getpass, datetime +import glob, json, hashlib, logging, getpass, datetime, argparse # Import typing support from typing import List, Optional, Any, Tuple, Dict, Union, Type, Protocol @@ -18,16 +23,11 @@ # Start Discord.py import discord, asyncio -if __name__ == "__main__": - # Start Logging - logger = logging.getLogger('discord') - logger.setLevel(logging.INFO) - handler = logging.FileHandler(filename='discord.log', encoding='utf-8', mode='w') - handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s')) - logger.addHandler(handler) - - # Get token from environment variables. - TOKEN: Optional[str] = os.environ.get('SONNET_TOKEN') or os.environ.get('RHEA_TOKEN') +# Initialize logger +logger = logging.getLogger('discord') +handler = logging.FileHandler(filename='discord.log', encoding='utf-8', mode='w') +handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s')) +logger.addHandler(handler) # Initialize kernel workspace sys.path.insert(1, os.getcwd() + '/cmds') @@ -78,9 +78,7 @@ def _decrypt(self, data: bytes) -> bytes: def encrypt(self, indata: str) -> bytes: - if type(indata) != str: raise TypeError(f"encrypt only accepts type 'str', not type `{type(indata).__name__}`") - - data: bytes = indata.encode("utf8") + data = indata.encode("utf8") data = self._encrypt(data)[::-1] data = self._encrypt(data) @@ -90,8 +88,6 @@ def encrypt(self, indata: str) -> bytes: def decrypt(self, data: bytes) -> Optional[str]: - if type(data) != bytes: raise TypeError(f"decrypt only accepts type 'bytes', not type `{type(data).__name__}`") - data = self._decrypt(data)[::-1] data = self._decrypt(data) data = self._decrypt(data)[::-1] @@ -246,20 +242,19 @@ def tree(self, dirstr: Optional[str] = None, dirlist: Optional[List[str]] = None return datamap -if __name__ == "__main__": - # Import blacklist - try: - with open("common/blacklist.json", "r", encoding="utf-8") as blacklist_file: - blacklist = json.load(blacklist_file) +# Import blacklist +try: + with open("common/blacklist.json", "r", encoding="utf-8") as blacklist_file: + blacklist = json.load(blacklist_file) - # Ensures blacklist properly init - assert isinstance(blacklist["guild"], list) - assert isinstance(blacklist["user"], list) + # Ensures blacklist properly init + assert isinstance(blacklist["guild"], list) + assert isinstance(blacklist["user"], list) - except FileNotFoundError: - blacklist = {"guild": [], "user": []} - with open("common/blacklist.json", "w", encoding="utf-8") as blacklist_file: - json.dump(blacklist, blacklist_file) +except FileNotFoundError: + blacklist = {"guild": [], "user": []} + with open("common/blacklist.json", "w", encoding="utf-8") as blacklist_file: + json.dump(blacklist, blacklist_file) # Define debug commands command_modules: List[Any] = [] @@ -493,28 +488,6 @@ def __call__(self, args: List[str] = []) -> Optional[Tuple[str, List[Exception]] debug_commands["debug-drop-commands"] = kernel_drop_cmds debug_commands["debug-toggle-logging"] = logging_toggle -if __name__ == "__main__": - # Generate tokenfile - if len(sys.argv) >= 2 and "--generate-token" in sys.argv: - tokenfile = open(".tokenfile", "wb") - encryptor = miniflip(getpass.getpass("Enter TOKEN password: ")) - tokenfile.write(encryptor.encrypt(TOKEN := getpass.getpass("Enter TOKEN: "))) - tokenfile.close() - - # Load token - if not TOKEN and os.path.isfile(".tokenfile"): - tokenfile = open(".tokenfile", "rb") - encryptor = miniflip(getpass.getpass("Enter TOKEN password: ")) - TOKEN = encryptor.decrypt(tokenfile.read()) - tokenfile.close() - if not TOKEN: - print("Invalid TOKEN password") - sys.exit(1) - - # Load command modules - if e := kernel_load_command_modules(): - print(e[0]) - # A object used to pass error messages from the kernel callers to the event handlers class errtype: @@ -845,24 +818,82 @@ async def on_member_unban(guild: discord.Guild, user: discord.User) -> None: await event_call("on-member-unban", guild, user) -# Define version info and start time -version_info: str = "LeXdPyK 1.4.6" -bot_start_time: float = time.time() +def gentoken() -> str: + + TOKEN = getpass.getpass("Enter TOKEN: ") + + passwd = getpass.getpass("Enter TOKEN password: ") + if passwd != getpass.getpass("Confirm TOKEN password: "): + print("ERROR: passwords do not match") + raise ValueError + + with open(".tokenfile", "wb") as tokenfile: + encryptor = miniflip(passwd) + tokenfile.write(encryptor.encrypt(TOKEN)) + + return TOKEN + + +# Main function, handles userland startup +def main(args: List[str]) -> int: + + parser = argparse.ArgumentParser() + parser.add_argument("--log-debug", action="store_true", help="Makes the logging module start in debug mode") + parser.add_argument("--generate-token", action="store_true", help="Discards the current token file if there is one, and generates a new encrypted tokenfile") + parsed = parser.parse_args() + + # Set Loglevel + loglevel = logging.DEBUG if parsed.log_debug else logging.INFO + logger.setLevel(loglevel) + + # Get token from environment variables. + TOKEN: Optional[str] = os.environ.get('SONNET_TOKEN') or os.environ.get('RHEA_TOKEN') + + # Generate tokenfile + if parsed.generate_token: + try: + TOKEN = gentoken() + except ValueError: + return 1 + + # Load token + if TOKEN is None and os.path.isfile(".tokenfile"): + tokenfile = open(".tokenfile", "rb") + encryptor = miniflip(getpass.getpass("Enter TOKEN password: ")) + TOKEN = encryptor.decrypt(tokenfile.read()) + tokenfile.close() + if TOKEN is None: + print("Invalid TOKEN password") + return 1 + + # Load command modules + if e := kernel_load_command_modules(): + print(e[0]) -if __name__ == "__main__": # Start bot if TOKEN: try: Client.run(TOKEN, reconnect=True) except discord.errors.LoginFailure: print("Invalid token passed") - sys.exit(1) + return 1 else: print("You need a token set in SONNET_TOKEN or RHEA_TOKEN environment variables, or a encrypted token in .tokenfile, to use sonnet") - sys.exit(1) + return 1 # Clear cache at exit for i in glob.glob("datastore/*.cache.db"): os.remove(i) print("\rCache Cleared, Thank you for Using Sonnet") + + return 0 + + +# Define version info and start time +version_info: str = "LeXdPyK 1.4.8" +bot_start_time: float = time.time() + +if __name__ == "__main__": + print(f"Booted kernel in {(time.monotonic()-kernel_start)*1000:.0f}ms") + sys.exit(main(sys.argv)) diff --git a/requirements.txt b/requirements.txt index 3f6eec2..56b0179 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ discord.py >= 1.7.3 discord.py-stubs >= 1.7.3 -cryptography >= 35.0.0 -lz4 >= 3.1.3 +cryptography >= 36.0.1 +lz4 >= 3.1.10 +typing-extensions >= 3.7.4