Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(silence): allow to silence threads #3122

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
61 changes: 38 additions & 23 deletions bot/exts/moderation/silence.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from collections import OrderedDict
from contextlib import suppress
from datetime import UTC, datetime, timedelta
from typing import Any

from async_rediscache import RedisCache
from discord import Guild, PermissionOverwrite, TextChannel, Thread, VoiceChannel
from discord.abc import Messageable
from discord.ext import commands, tasks
from discord.ext.commands import Context
from discord.utils import MISSING
Expand Down Expand Up @@ -33,6 +35,7 @@
MSG_UNSILENCE_SUCCESS = f"{constants.Emojis.check_mark} unsilenced {{channel}}."

TextOrVoiceChannel = TextChannel | VoiceChannel
SilenceableChannel = TextChannel | VoiceChannel | Thread

VOICE_CHANNELS = {
constants.Channels.code_help_voice_0: constants.Channels.code_help_chat_0,
Expand All @@ -57,17 +60,17 @@ def __init__(self, alert_channel: TextChannel):
time=MISSING,
name=None
)
self._silenced_channels = {}
self._silenced_channels: dict[SilenceableChannel, int] = {}
self._alert_channel = alert_channel

def add_channel(self, channel: TextOrVoiceChannel) -> None:
def add_channel(self, channel: SilenceableChannel) -> None:
"""Add channel to `_silenced_channels` and start loop if not launched."""
if not self._silenced_channels:
self.start()
log.info("Starting notifier loop.")
self._silenced_channels[channel] = self._current_loop

def remove_channel(self, channel: TextChannel) -> None:
def remove_channel(self, channel: SilenceableChannel) -> None:
"""Remove channel from `_silenced_channels` and stop loop if no channels remain."""
with suppress(KeyError):
del self._silenced_channels[channel]
Expand All @@ -92,7 +95,7 @@ async def _notifier(self) -> None:
)


async def _select_lock_channel(args: OrderedDict[str, any]) -> TextOrVoiceChannel:
async def _select_lock_channel(args: OrderedDict[str, Any]) -> SilenceableChannel:
"""Passes the channel to be silenced to the resource lock."""
channel, _ = Silence.parse_silence_args(args["ctx"], args["duration_or_channel"], args["duration"])
return channel
Expand Down Expand Up @@ -130,8 +133,8 @@ async def cog_load(self) -> None:
async def send_message(
self,
message: str,
source_channel: TextChannel,
target_channel: TextOrVoiceChannel,
source_channel: Messageable,
target_channel: SilenceableChannel,
*,
alert_target: bool = False
) -> None:
Expand Down Expand Up @@ -159,7 +162,7 @@ async def send_message(
async def silence(
self,
ctx: Context,
duration_or_channel: TextOrVoiceChannel | HushDurationConverter = None,
duration_or_channel: SilenceableChannel | HushDurationConverter | None = None,
duration: HushDurationConverter = 10,
*,
kick: bool = False
Expand All @@ -178,12 +181,6 @@ async def silence(
channel_info = f"#{channel} ({channel.id})"
log.debug(f"{ctx.author} is silencing channel {channel_info}.")

# Since threads don't have specific overrides, we cannot silence them individually.
# The parent channel has to be muted or the thread should be archived.
if isinstance(channel, Thread):
await ctx.send(":x: Threads cannot be silenced.")
return

if not await self._set_silence_overwrites(channel, kick=kick):
log.info(f"Tried to silence channel {channel_info} but the channel was already silenced.")
await self.send_message(MSG_SILENCE_FAIL, ctx.channel, channel, alert_target=False)
Expand All @@ -210,12 +207,12 @@ async def silence(
@staticmethod
def parse_silence_args(
ctx: Context,
duration_or_channel: TextOrVoiceChannel | int,
duration_or_channel: SilenceableChannel | int | None,
duration: HushDurationConverter
) -> tuple[TextOrVoiceChannel, int | None]:
) -> tuple[SilenceableChannel, int | None]:
"""Helper method to parse the arguments of the silence command."""
if duration_or_channel:
if isinstance(duration_or_channel, TextChannel | VoiceChannel):
if isinstance(duration_or_channel, TextChannel | VoiceChannel | Thread):
channel = duration_or_channel
else:
channel = ctx.channel
Expand All @@ -228,8 +225,11 @@ def parse_silence_args(

return channel, duration

async def _set_silence_overwrites(self, channel: TextOrVoiceChannel, *, kick: bool = False) -> bool:
async def _set_silence_overwrites(self, channel: SilenceableChannel, *, kick: bool = False) -> bool:
"""Set silence permission overwrites for `channel` and return True if successful."""
if channel.id in self.scheduler:
return False

# Get the original channel overwrites
if isinstance(channel, TextChannel):
role = self._everyone_role
Expand All @@ -242,6 +242,11 @@ async def _set_silence_overwrites(self, channel: TextOrVoiceChannel, *, kick: bo
send_messages_in_threads=overwrite.send_messages_in_threads
)

elif isinstance(channel, Thread):
log.info(f"Silencing thread #{channel.parent}/{channel} ({channel.parent.id}/{channel.id}).")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, logging this is handled in _silence() instead so this wasn't necessary.

Suggested change
log.info(f"Silencing thread #{channel.parent}/{channel} ({channel.parent.id}/{channel.id}).")

await channel.edit(locked=True)
Snipy7374 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the thread is already locked, this will just schedule an unlock in the set amount of time. Failing is probably better to keep consistent with how it behaves with text channels.

return True

else:
role = self._verified_voice_role
overwrite = channel.overwrites_for(role)
Expand All @@ -250,7 +255,7 @@ async def _set_silence_overwrites(self, channel: TextOrVoiceChannel, *, kick: bo
prev_overwrites.update(connect=overwrite.connect)

# Stop if channel was already silenced
if channel.id in self.scheduler or all(val is False for val in prev_overwrites.values()):
if all(val is False for val in prev_overwrites.values()):
return False

# Set new permissions, store
Expand All @@ -260,7 +265,7 @@ async def _set_silence_overwrites(self, channel: TextOrVoiceChannel, *, kick: bo

return True

async def _schedule_unsilence(self, ctx: Context, channel: TextOrVoiceChannel, duration: int | None) -> None:
async def _schedule_unsilence(self, ctx: Context, channel: SilenceableChannel, duration: int | None) -> None:
"""Schedule `ctx.channel` to be unsilenced if `duration` is not None."""
if duration is None:
await self.unsilence_timestamps.set(channel.id, -1)
Expand All @@ -270,7 +275,7 @@ async def _schedule_unsilence(self, ctx: Context, channel: TextOrVoiceChannel, d
await self.unsilence_timestamps.set(channel.id, unsilence_time.timestamp())

@commands.command(aliases=("unhush",))
async def unsilence(self, ctx: Context, *, channel: TextOrVoiceChannel = None) -> None:
async def unsilence(self, ctx: Context, *, channel: SilenceableChannel | None = None) -> None:
"""
Unsilence the given channel if given, else the current one.

Expand All @@ -282,7 +287,7 @@ async def unsilence(self, ctx: Context, *, channel: TextOrVoiceChannel = None) -
await self._unsilence_wrapper(channel, ctx)

@lock_arg(LOCK_NAMESPACE, "channel", raise_error=True)
async def _unsilence_wrapper(self, channel: TextOrVoiceChannel, ctx: Context | None = None) -> None:
async def _unsilence_wrapper(self, channel: SilenceableChannel, ctx: Context | None = None) -> None:
"""
Unsilence `channel` and send a success/failure message to ctx.channel.

Expand All @@ -297,6 +302,10 @@ async def _unsilence_wrapper(self, channel: TextOrVoiceChannel, ctx: Context | N
if isinstance(channel, VoiceChannel):
overwrite = channel.overwrites_for(self._verified_voice_role)
has_channel_overwrites = overwrite.speak is False

elif isinstance(channel, Thread):
has_channel_overwrites = False

else:
overwrite = channel.overwrites_for(self._everyone_role)
has_channel_overwrites = overwrite.send_messages is False or overwrite.add_reactions is False
Expand All @@ -310,15 +319,15 @@ async def _unsilence_wrapper(self, channel: TextOrVoiceChannel, ctx: Context | N
else:
await self.send_message(MSG_UNSILENCE_SUCCESS, msg_channel, channel, alert_target=True)

async def _unsilence(self, channel: TextOrVoiceChannel) -> bool:
async def _unsilence(self, channel: SilenceableChannel) -> bool:
"""
Unsilence `channel`.

If `channel` has a silence task scheduled or has its previous overwrites cached, unsilence
it, cancel the task, and remove it from the notifier. Notify admins if it has a task but
not cached overwrites.

Return `True` if channel permissions were changed, `False` otherwise.
Return `True` if channel was unsilenced, `False` otherwise.
"""
# Get stored overwrites, and return if channel is unsilenced
prev_overwrites = await self.previous_overwrites.get(channel.id)
Expand All @@ -331,6 +340,12 @@ async def _unsilence(self, channel: TextOrVoiceChannel) -> bool:
role = self._everyone_role
overwrite = channel.overwrites_for(role)
permissions = "`Send Messages` and `Add Reactions`"
elif isinstance(channel, Thread):
await channel.edit(locked=False)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: if the thread was manually unlocked before the scheduled time, there won't be any indication of it.

log.info(f"Unsilenced thread #{channel.parent}/{channel} ({channel.parent_id}/{channel.id}).")
self.scheduler.cancel(channel.id)
self.notifier.remove_channel(channel)
return True
else:
role = self._verified_voice_role
overwrite = channel.overwrites_for(role)
Expand Down