Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deprecation warning raised by out of loop asyncio.gather calls #954

Merged
merged 8 commits into from
Jan 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/954.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix deprecation warnings raised by usage of asyncio.gather outside of an active event loop in GatewayBot.run
151 changes: 79 additions & 72 deletions hikari/impl/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,82 @@
_LOGGER: typing.Final[logging.Logger] = logging.getLogger("hikari.bot")


async def _gather(coros: typing.Iterator[typing.Awaitable[typing.Any]]) -> None:
# Calling asyncio.gather outside of a running event loop isn't safe and
# will lead to RuntimeErrors in later versions of python, so this call is
# kept within a coroutine function.
await asyncio.gather(*coros)


def _destroy_loop(loop: asyncio.AbstractEventLoop) -> None:
async def murder(future: asyncio.Future[typing.Any]) -> None:
# These include _GatheringFuture which must be awaited if the children
# throw an asyncio.CancelledError, otherwise it will spam logs with warnings
# about exceptions not being retrieved before GC.
try:
_LOGGER.log(ux.TRACE, "killing %s", future)
future.cancel()
await future
except asyncio.CancelledError:
pass
except Exception as ex:
loop.call_exception_handler(
{
"message": "Future raised unexpected exception after requesting cancellation",
"exception": ex,
"future": future,
}
)

remaining_tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]

if remaining_tasks:
_LOGGER.debug("terminating %s remaining tasks forcefully", len(remaining_tasks))
loop.run_until_complete(_gather((murder(task) for task in remaining_tasks)))
else:
_LOGGER.debug("No remaining tasks exist, good job!")

if sys.version_info >= (3, 9):
_LOGGER.debug("shutting down default executor")
try:
# This seems to raise a NotImplementedError when running with uvloop.
loop.run_until_complete(loop.shutdown_default_executor())
except NotImplementedError:
pass

_LOGGER.debug("shutting down asyncgens")
loop.run_until_complete(loop.shutdown_asyncgens())

_LOGGER.debug("closing event loop")
loop.close()
# Closed loops cannot be re-used so it should also be un-set.
asyncio.set_event_loop(None)


def _validate_activity(activity: undefined.UndefinedNoneOr[presences.Activity]) -> None:
# This seems to cause confusion for a lot of people, so lets add some warnings into the mix.

if activity is undefined.UNDEFINED or activity is None:
return

# If you ever change where this is called from, make sure to check the stacklevels are correct
# or the code preview in the warning will be wrong...
if activity.type is presences.ActivityType.CUSTOM:
warnings.warn(
"The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have "
"any effect if used.",
category=errors.HikariWarning,
stacklevel=3,
)
elif activity.type is presences.ActivityType.STREAMING and activity.url is None:
warnings.warn(
"The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video "
"URL to be specified on the activity for the presence update to have any effect.",
category=errors.HikariWarning,
stacklevel=3,
)


class GatewayBot(traits.GatewayBotAware):
"""Basic auto-sharding bot implementation.

Expand Down Expand Up @@ -811,7 +887,7 @@ def handle_os_interrupt(signum: int, frame: typing.Optional[types.FrameType]) ->
pass

if close_loop:
self._destroy_loop(loop)
_destroy_loop(loop)

_LOGGER.info("successfully terminated")

Expand Down Expand Up @@ -883,7 +959,7 @@ async def start(
if shard_ids is not None and shard_count is None:
raise TypeError("'shard_ids' must be passed with 'shard_count'")

self._validate_activity(activity)
_validate_activity(activity)

# Dispatch the update checker, the sharding requirements checker, and dispatch
# the starting event together to save a little time on startup.
Expand Down Expand Up @@ -1204,7 +1280,7 @@ async def update_presence(
afk: undefined.UndefinedOr[bool] = undefined.UNDEFINED,
) -> None:
self._check_if_alive()
self._validate_activity(activity)
_validate_activity(activity)

coros = [
s.update_presence(status=status, activity=activity, idle_since=idle_since, afk=afk)
Expand Down Expand Up @@ -1290,72 +1366,3 @@ async def _start_one_shard(
return new_shard

raise errors.GatewayError(f"shard {shard_id} shut down immediately when starting")

@staticmethod
def _destroy_loop(loop: asyncio.AbstractEventLoop) -> None:
async def murder(future: asyncio.Future[typing.Any]) -> None:
# These include _GatheringFuture which must be awaited if the children
# throw an asyncio.CancelledError, otherwise it will spam logs with warnings
# about exceptions not being retrieved before GC.
try:
_LOGGER.log(ux.TRACE, "killing %s", future)
future.cancel()
await future
except asyncio.CancelledError:
pass
except Exception as ex:
loop.call_exception_handler(
{
"message": "Future raised unexpected exception after requesting cancellation",
"exception": ex,
"future": future,
}
)

remaining_tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]

if remaining_tasks:
_LOGGER.debug("terminating %s remaining tasks forcefully", len(remaining_tasks))
loop.run_until_complete(asyncio.gather(*(murder(task) for task in remaining_tasks)))
else:
_LOGGER.debug("No remaining tasks exist, good job!")

if sys.version_info >= (3, 9):
_LOGGER.debug("shutting down default executor")
try:
# This seems to raise a NotImplementedError when running with uvloop.
loop.run_until_complete(loop.shutdown_default_executor())
except NotImplementedError:
pass

_LOGGER.debug("shutting down asyncgens")
loop.run_until_complete(loop.shutdown_asyncgens())

_LOGGER.debug("closing event loop")
loop.close()
# Closed loops cannot be re-used so it should also be un-set.
asyncio.set_event_loop(None)

@staticmethod
def _validate_activity(activity: undefined.UndefinedNoneOr[presences.Activity]) -> None:
# This seems to cause confusion for a lot of people, so lets add some warnings into the mix.

if activity is undefined.UNDEFINED or activity is None:
return

# If you ever change where this is called from, make sure to check the stacklevels are correct
# or the code preview in the warning will be wrong...
if activity.type is presences.ActivityType.CUSTOM:
warnings.warn(
"The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have "
"any effect if used.",
category=errors.HikariWarning,
stacklevel=3,
)
elif activity.type is presences.ActivityType.STREAMING and activity.url is None:
warnings.warn(
"The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video "
"URL to be specified on the activity for the presence update to have any effect.",
category=errors.HikariWarning,
stacklevel=3,
)
90 changes: 47 additions & 43 deletions tests/hikari/impl/test_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,51 @@
from hikari.internal import ux


@pytest.mark.parametrize("activity", [undefined.UNDEFINED, None])
def test_validate_activity_when_no_activity(activity):
with mock.patch.object(warnings, "warn") as warn:
bot_impl._validate_activity(activity)

warn.assert_not_called()


def test_validate_activity_when_type_is_custom():
activity = presences.Activity(name="test", type=presences.ActivityType.CUSTOM)

with mock.patch.object(warnings, "warn") as warn:
bot_impl._validate_activity(activity)

warn.assert_called_once_with(
"The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have "
"any effect if used.",
category=errors.HikariWarning,
stacklevel=3,
)


def test_validate_activity_when_type_is_streaming_but_no_url():
activity = presences.Activity(name="test", url=None, type=presences.ActivityType.STREAMING)

with mock.patch.object(warnings, "warn") as warn:
bot_impl._validate_activity(activity)

warn.assert_called_once_with(
"The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video "
"URL to be specified on the activity for the presence update to have any effect.",
category=errors.HikariWarning,
stacklevel=3,
)


def test_validate_activity_when_no_warning():
activity = presences.Activity(name="test", type=presences.ActivityType.PLAYING)

with mock.patch.object(warnings, "warn") as warn:
bot_impl._validate_activity(activity)

warn.assert_not_called()


class TestGatewayBot:
@pytest.fixture()
def cache(self):
Expand Down Expand Up @@ -575,7 +620,7 @@ def test_run_with_close_loop(self, bot):
stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "start", new=mock.Mock()))
stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "join", new=mock.Mock()))
stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "_close", new=mock.Mock()))
destroy_loop = stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "_destroy_loop"))
destroy_loop = stack.enter_context(mock.patch.object(bot_impl, "_destroy_loop"))
loop = stack.enter_context(mock.patch.object(aio, "get_or_make_loop")).return_value

with stack:
Expand Down Expand Up @@ -726,7 +771,7 @@ async def test_update_presence(self, bot):

with mock.patch.object(bot_impl.GatewayBot, "_check_if_alive") as check_if_alive:
with mock.patch.object(aio, "all_of") as all_of:
with mock.patch.object(bot_impl.GatewayBot, "_validate_activity") as validate_activity:
with mock.patch.object(bot_impl, "_validate_activity") as validate_activity:
await bot.update_presence(status=status, activity=activity, idle_since=idle_since, afk=afk)

check_if_alive.assert_called_once_with()
Expand Down Expand Up @@ -857,44 +902,3 @@ async def test_start_one_shard_when_not_alive(self, bot):
url="https://some.website",
closing_event=bot._closing_event,
)

@pytest.mark.parametrize("activity", [undefined.UNDEFINED, None])
def test_validate_activity_when_no_activity(self, bot, activity):
with mock.patch.object(warnings, "warn") as warn:
bot._validate_activity(activity)

warn.assert_not_called()

def test_validate_activity_when_type_is_custom(self, bot):
activity = presences.Activity(name="test", type=presences.ActivityType.CUSTOM)

with mock.patch.object(warnings, "warn") as warn:
bot._validate_activity(activity)

warn.assert_called_once_with(
"The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have "
"any effect if used.",
category=errors.HikariWarning,
stacklevel=3,
)

def test_validate_activity_when_type_is_streaming_but_no_url(self, bot):
activity = presences.Activity(name="test", url=None, type=presences.ActivityType.STREAMING)

with mock.patch.object(warnings, "warn") as warn:
bot._validate_activity(activity)

warn.assert_called_once_with(
"The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video "
"URL to be specified on the activity for the presence update to have any effect.",
category=errors.HikariWarning,
stacklevel=3,
)

def test_validate_activity_when_no_warning(self, bot):
activity = presences.Activity(name="test", type=presences.ActivityType.PLAYING)

with mock.patch.object(warnings, "warn") as warn:
bot._validate_activity(activity)

warn.assert_not_called()