Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Added Type hints" #3

Merged
merged 1 commit into from
Jun 14, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 45 additions & 54 deletions cogs/afterhours/afterhours.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,35 @@

A special cog to handle the special cases for this channel.
"""
from asyncio.tasks import Task
import os
import re
import logging
import asyncio
from typing import List, Optional, Tuple, Union
from typing import List, Optional, Tuple
from datetime import datetime, timedelta
from cogs.starboard.starboard_entry import StarboardEntry
import discord
from discord.ext import commands
from discord.guild import Guild
from discord.message import Message
from discord.role import Role
from redbot.core import Config, checks, commands, data_manager
from redbot.core.bot import Red
from redbot.core.commands.commands import Cog
from redbot.core.commands.context import Context
from redbot.core.config import Group, Value
from redbot.core.utils.chat_formatting import humanize_timedelta

# Basic constants
AH_CHANNEL: str = "after-hours"
KEY_CTX_CHANNEL_ID: str = "channelId"
KEY_CHANNEL_IDS: str = "channelIds"
KEY_ROLE_ID: str = "roleId"
STARBOARD: str = "highlights"
DELETE_TIME: int = 32 * 60 * 60
SLEEP_TIME: int = 60 * 60
AH_CHANNEL = "after-hours"
KEY_CTX_CHANNEL_ID = "channelId"
KEY_CHANNEL_IDS = "channelIds"
KEY_ROLE_ID = "roleId"
STARBOARD = "highlights"
DELETE_TIME = 32 * 60 * 60
SLEEP_TIME = 60 * 60

# Logging
KEY_LAST_MSG_TIMESTAMPS: str = "lastMsgTimestamps"
KEY_LAST_MSG_TIMESTAMPS = "lastMsgTimestamps"

# Auto-purging
KEY_AUTO_PURGE: str = "autoPurge"
KEY_BACKGROUND_LOOP: str = "backgroundLoop"
KEY_INACTIVE_DURATION: str = "inactiveDuration" # unit of time: seconds
KEY_AUTO_PURGE = "autoPurge"
KEY_BACKGROUND_LOOP = "backgroundLoop"
KEY_INACTIVE_DURATION = "inactiveDuration" # unit of time: seconds

# Default guild settings
DEFAULT_GUILD = {
Expand All @@ -56,23 +49,21 @@ class AfterHours(commands.Cog):
"""Special casing galore!"""

def __init__(self, bot: Red):
self.bot: Red = bot
self.config: Config = Config.get_conf(self, identifier=5842647, force_registration=True)
self.bot = bot
self.config = Config.get_conf(self, identifier=5842647, force_registration=True)
self.config.register_guild(**DEFAULT_GUILD)

# Initialize logger, and save to cog folder.
saveFolder = data_manager.cog_data_path(cog_instance=self)
self.logger: logging.Logger = logging.getLogger("red.luicogs.AfterHours")
if self.logger.level == 0:
# Prevents the self.logger from being loaded again in case of module reload.
self.logger.setLevel(logging.INFO)
logPath: str = os.path.join(saveFolder, "info.log")
handler: logging.FileHandler = logging.FileHandler(filename=logPath, encoding="utf-8", mode="a")
self.logger = logging.getLogger("red.luicogs.AfterHours")
if not self.logger.handlers:
logPath = os.path.join(saveFolder, "info.log")
handler = logging.FileHandler(filename=logPath, encoding="utf-8", mode="a")
handler.setFormatter(
logging.Formatter("%(asctime)s %(message)s", datefmt="[%d/%m/%Y %H:%M:%S]")
)
self.logger.addHandler(handler)
self.bgTask: Task = self.bot.loop.create_task(self.backgroundLoop())
self.bgTask = self.bot.loop.create_task(self.backgroundLoop())

# Cancel the background task on cog unload.
def __unload(self): # pylint: disable=invalid-name
Expand All @@ -98,15 +89,15 @@ async def checkGarbageCollect(self):
for guild in self.bot.guilds:
self.logger.debug("Checking guild %s", guild.id)
async with self.config.guild(guild).get_attr(KEY_CHANNEL_IDS)() as channels:
staleIds: list = []
staleIds = []
for channelId, data in channels.items():
self.logger.debug("Checking channel ID %s", channelId)
channel: discord.abc.GuildChannel = discord.utils.get(guild.channels, id=int(channelId))
channel = discord.utils.get(guild.channels, id=int(channelId))
if not channel:
self.logger.error("Channel ID %s doesn't exist!", channelId)
staleIds.append(channelId)
continue
creationTime: datetime = datetime.fromtimestamp(data["time"])
creationTime = datetime.fromtimestamp(data["time"])
self.logger.debug("Time difference = %s", datetime.now() - creationTime)
if datetime.now() - creationTime > timedelta(seconds=DELETE_TIME):
try:
Expand All @@ -130,9 +121,9 @@ async def checkGarbageCollect(self):

async def doAutoPurge(self, forced=False):
for guild in self.bot.guilds:
guildConfig: Group = self.config.guild(guild) #??? may be wrong group
autoPurgeConfig: Union[Value, Group] = guildConfig.get_attr(KEY_AUTO_PURGE)
autoPurgeInactiveDurationConfig: Union[Value, Group] = autoPurgeConfig.get_attr(KEY_INACTIVE_DURATION)
guildConfig = self.config.guild(guild)
autoPurgeConfig = guildConfig.get_attr(KEY_AUTO_PURGE)
autoPurgeInactiveDurationConfig = autoPurgeConfig.get_attr(KEY_INACTIVE_DURATION)

if not forced and await autoPurgeConfig.get_attr(KEY_BACKGROUND_LOOP)() is False:
self.logger.debug(
Expand Down Expand Up @@ -172,11 +163,11 @@ async def doAutoPurge(self, forced=False):
)

async with guildConfig.get_attr(KEY_LAST_MSG_TIMESTAMPS)() as lastMsgTimestamps:
for member in ahRole.members:
for member in ahRole.members:
if not member.bot:
memberId = str(member.id)
if memberId in lastMsgTimestamps:
lastMsgTime: datetime = datetime.fromtimestamp(lastMsgTimestamps[memberId])
lastMsgTime = datetime.fromtimestamp(lastMsgTimestamps[memberId])
if datetime.now() - lastMsgTime > inactiveDurationTimeDelta:
inactiveMembers.append((member, lastMsgTime))
else:
Expand All @@ -192,8 +183,8 @@ async def doAutoPurge(self, forced=False):
async with guildConfig.get_attr(KEY_LAST_MSG_TIMESTAMPS)() as lastMsgTimestamps:
for inactiveMember, lastMsgTime in inactiveMembers:
# obtain information
memberName: str = inactiveMember.name
memberDiscriminator: str = inactiveMember.discriminator
memberName = inactiveMember.name
memberDiscriminator = inactiveMember.discriminator
memberId = str(inactiveMember.id)
# purge this inactive member
await inactiveMember.remove_roles(ahRole, reason="AfterHours auto-purge")
Expand Down Expand Up @@ -230,14 +221,14 @@ async def getContext(self, channel: discord.TextChannel):
ctx: Context
The context needed to send messages and invoke methods from other cogs.
"""
ctxGuild: Guild = channel.guild
ctxChannelId: Union[Value, Group] = await self.config.guild(ctxGuild).get_attr(KEY_CTX_CHANNEL_ID)()
ctxChannel: discord.abc.GuildChannel = discord.utils.get(ctxGuild.channels, id=ctxChannelId)
ctxGuild = channel.guild
ctxChannelId = await self.config.guild(ctxGuild).get_attr(KEY_CTX_CHANNEL_ID)()
ctxChannel = discord.utils.get(ctxGuild.channels, id=ctxChannelId)
if not ctxChannel:
self.logger.error("Cannot find channel to construct context!")
return None
async for message in ctxChannel.history(limit=1):
lastMessage: Message = message # may be a string?
lastMessage = message
return await self.bot.get_context(lastMessage)

async def makeHighlightChanges(
Expand All @@ -255,7 +246,7 @@ async def makeHighlightChanges(
Indicate whether we want to remove the changes. Defaults to False.
"""
self.logger.info("Applying/removing Highlight exceptions, remove=%s", remove)
cog: Cog = self.bot.get_cog("Highlight")
cog = self.bot.get_cog("Highlight")
if not cog:
self.logger.error("Highlight not loaded. skipping")
return
Expand All @@ -280,13 +271,13 @@ async def makeStarboardChanges(
Indicate whether we want to remove the changes. Defaults to False.
"""
self.logger.info("Applying/removing Starboard exceptions, remove=%s", remove)
sbCog: Cog = self.bot.get_cog("Starboard")
sbCog = self.bot.get_cog("Starboard")
if not sbCog:
self.logger.error("Starboard not loaded. skipping")
return

try:
starboard: StarboardEntry = sbCog.starboards[ctx.guild.id]["highlights"] # may be wrong type
starboard = sbCog.starboards[ctx.guild.id]["highlights"]
except KeyError:
self.logger.error("Cannot get the starboard!")
else:
Expand All @@ -297,7 +288,7 @@ async def makeStarboardChanges(
else:
await ctx.invoke(sbCog.blacklist_add, starboard=starboard, channel_or_role=channel)

async def notifyChannel(self, ctx: Context, remove=False):
async def notifyChannel(self, ctx, remove=False):
if remove:
await ctx.send(f":information_source: **{AH_CHANNEL} removed, removing exceptions**")
else:
Expand All @@ -318,7 +309,7 @@ async def makeWordFilterChanges(
Indicate whether we want to remove the changes. Defaults to False.
"""
self.logger.info("Applying/removing WordFilter exceptions, remove=%s", remove)
cog: Cog = self.bot.get_cog("WordFilter")
cog = self.bot.get_cog("WordFilter")
if not cog:
self.logger.error("WordFilter not loaded. skipping")
return
Expand Down Expand Up @@ -373,7 +364,7 @@ async def handleChannelDelete(self, channel: discord.abc.GuildChannel):
del channelIds[str(channel.id)]

async def saveMessageTimestamp(self, message: discord.Message, timestamp: float):
guildConfig: Group = self.config.guild(message.guild)
guildConfig = self.config.guild(message.guild)

async with guildConfig.get_attr(KEY_CHANNEL_IDS)() as channels:
if str(message.channel.id) not in channels:
Expand Down Expand Up @@ -447,7 +438,7 @@ async def afterHoursRemoveRole(self, ctx: Context):
return

# check if user has roles
rolesList: List[Role] = ctx.author.roles
rolesList = ctx.author.roles
if role not in rolesList:
await ctx.send(f"You do not have the role {role.name}")
return
Expand Down Expand Up @@ -523,9 +514,9 @@ async def afterHoursAutoPurgeInactiveDuration(self, ctx: Context, *, duration: O
Leave blank to check the current settings.
"""

guildConfig: Group = self.config.guild(ctx.guild)
autoPurgeConfig: Union[Value, Group] = guildConfig.get_attr(KEY_AUTO_PURGE)
autoPurgeInactiveDurationConfig: Union[Value, Group] = autoPurgeConfig.get_attr(KEY_INACTIVE_DURATION)
guildConfig = self.config.guild(ctx.guild)
autoPurgeConfig = guildConfig.get_attr(KEY_AUTO_PURGE)
autoPurgeInactiveDurationConfig = autoPurgeConfig.get_attr(KEY_INACTIVE_DURATION)

if duration:
if duration == "0":
Expand Down Expand Up @@ -556,12 +547,12 @@ async def afterHoursAutoPurgeInactiveDuration(self, ctx: Context, *, duration: O
await ctx.send("Invalid duration.")
return
else:
groups: Tuple = match.groups()
groups = match.groups()
years, months, weeks, days, hours, minutes, seconds = [
int(x) if x else 0 for x in groups
]

totalSeconds: int = (
totalSeconds = (
years * 365 * 24 * 60 * 60
+ months * 30 * 24 * 60 * 60
+ weeks * 7 * 24 * 60 * 60
Expand Down