diff --git a/changes/954.bugfix.md b/changes/954.bugfix.md new file mode 100644 index 0000000000..f99b1f2191 --- /dev/null +++ b/changes/954.bugfix.md @@ -0,0 +1 @@ +Fix deprecation warnings raised by usage of asyncio.gather outside of an active event loop in GatewayBot.run diff --git a/hikari/impl/bot.py b/hikari/impl/bot.py index fb895cf8b8..8a9557fa71 100644 --- a/hikari/impl/bot.py +++ b/hikari/impl/bot.py @@ -74,6 +74,82 @@ _LOGGER: typing.Final[logging.Logger] = logging.getLogger("hikari.bot") +async def _gather(coros: typing.Iterator[typing.Awaitable[typing.Any]]) -> None: + # Calling asyncio.gather outside of a running event loop isn't safe and + # will lead to RuntimeErrors in later versions of python, so this call is + # kept within a coroutine function. + await asyncio.gather(*coros) + + +def _destroy_loop(loop: asyncio.AbstractEventLoop) -> None: + async def murder(future: asyncio.Future[typing.Any]) -> None: + # These include _GatheringFuture which must be awaited if the children + # throw an asyncio.CancelledError, otherwise it will spam logs with warnings + # about exceptions not being retrieved before GC. + try: + _LOGGER.log(ux.TRACE, "killing %s", future) + future.cancel() + await future + except asyncio.CancelledError: + pass + except Exception as ex: + loop.call_exception_handler( + { + "message": "Future raised unexpected exception after requesting cancellation", + "exception": ex, + "future": future, + } + ) + + remaining_tasks = [t for t in asyncio.all_tasks(loop) if not t.done()] + + if remaining_tasks: + _LOGGER.debug("terminating %s remaining tasks forcefully", len(remaining_tasks)) + loop.run_until_complete(_gather((murder(task) for task in remaining_tasks))) + else: + _LOGGER.debug("No remaining tasks exist, good job!") + + if sys.version_info >= (3, 9): + _LOGGER.debug("shutting down default executor") + try: + # This seems to raise a NotImplementedError when running with uvloop. + loop.run_until_complete(loop.shutdown_default_executor()) + except NotImplementedError: + pass + + _LOGGER.debug("shutting down asyncgens") + loop.run_until_complete(loop.shutdown_asyncgens()) + + _LOGGER.debug("closing event loop") + loop.close() + # Closed loops cannot be re-used so it should also be un-set. + asyncio.set_event_loop(None) + + +def _validate_activity(activity: undefined.UndefinedNoneOr[presences.Activity]) -> None: + # This seems to cause confusion for a lot of people, so lets add some warnings into the mix. + + if activity is undefined.UNDEFINED or activity is None: + return + + # If you ever change where this is called from, make sure to check the stacklevels are correct + # or the code preview in the warning will be wrong... + if activity.type is presences.ActivityType.CUSTOM: + warnings.warn( + "The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have " + "any effect if used.", + category=errors.HikariWarning, + stacklevel=3, + ) + elif activity.type is presences.ActivityType.STREAMING and activity.url is None: + warnings.warn( + "The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video " + "URL to be specified on the activity for the presence update to have any effect.", + category=errors.HikariWarning, + stacklevel=3, + ) + + class GatewayBot(traits.GatewayBotAware): """Basic auto-sharding bot implementation. @@ -811,7 +887,7 @@ def handle_os_interrupt(signum: int, frame: typing.Optional[types.FrameType]) -> pass if close_loop: - self._destroy_loop(loop) + _destroy_loop(loop) _LOGGER.info("successfully terminated") @@ -883,7 +959,7 @@ async def start( if shard_ids is not None and shard_count is None: raise TypeError("'shard_ids' must be passed with 'shard_count'") - self._validate_activity(activity) + _validate_activity(activity) # Dispatch the update checker, the sharding requirements checker, and dispatch # the starting event together to save a little time on startup. @@ -1204,7 +1280,7 @@ async def update_presence( afk: undefined.UndefinedOr[bool] = undefined.UNDEFINED, ) -> None: self._check_if_alive() - self._validate_activity(activity) + _validate_activity(activity) coros = [ s.update_presence(status=status, activity=activity, idle_since=idle_since, afk=afk) @@ -1290,72 +1366,3 @@ async def _start_one_shard( return new_shard raise errors.GatewayError(f"shard {shard_id} shut down immediately when starting") - - @staticmethod - def _destroy_loop(loop: asyncio.AbstractEventLoop) -> None: - async def murder(future: asyncio.Future[typing.Any]) -> None: - # These include _GatheringFuture which must be awaited if the children - # throw an asyncio.CancelledError, otherwise it will spam logs with warnings - # about exceptions not being retrieved before GC. - try: - _LOGGER.log(ux.TRACE, "killing %s", future) - future.cancel() - await future - except asyncio.CancelledError: - pass - except Exception as ex: - loop.call_exception_handler( - { - "message": "Future raised unexpected exception after requesting cancellation", - "exception": ex, - "future": future, - } - ) - - remaining_tasks = [t for t in asyncio.all_tasks(loop) if not t.done()] - - if remaining_tasks: - _LOGGER.debug("terminating %s remaining tasks forcefully", len(remaining_tasks)) - loop.run_until_complete(asyncio.gather(*(murder(task) for task in remaining_tasks))) - else: - _LOGGER.debug("No remaining tasks exist, good job!") - - if sys.version_info >= (3, 9): - _LOGGER.debug("shutting down default executor") - try: - # This seems to raise a NotImplementedError when running with uvloop. - loop.run_until_complete(loop.shutdown_default_executor()) - except NotImplementedError: - pass - - _LOGGER.debug("shutting down asyncgens") - loop.run_until_complete(loop.shutdown_asyncgens()) - - _LOGGER.debug("closing event loop") - loop.close() - # Closed loops cannot be re-used so it should also be un-set. - asyncio.set_event_loop(None) - - @staticmethod - def _validate_activity(activity: undefined.UndefinedNoneOr[presences.Activity]) -> None: - # This seems to cause confusion for a lot of people, so lets add some warnings into the mix. - - if activity is undefined.UNDEFINED or activity is None: - return - - # If you ever change where this is called from, make sure to check the stacklevels are correct - # or the code preview in the warning will be wrong... - if activity.type is presences.ActivityType.CUSTOM: - warnings.warn( - "The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have " - "any effect if used.", - category=errors.HikariWarning, - stacklevel=3, - ) - elif activity.type is presences.ActivityType.STREAMING and activity.url is None: - warnings.warn( - "The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video " - "URL to be specified on the activity for the presence update to have any effect.", - category=errors.HikariWarning, - stacklevel=3, - ) diff --git a/tests/hikari/impl/test_bot.py b/tests/hikari/impl/test_bot.py index f9b37dc0a1..95521b887e 100644 --- a/tests/hikari/impl/test_bot.py +++ b/tests/hikari/impl/test_bot.py @@ -46,6 +46,51 @@ from hikari.internal import ux +@pytest.mark.parametrize("activity", [undefined.UNDEFINED, None]) +def test_validate_activity_when_no_activity(activity): + with mock.patch.object(warnings, "warn") as warn: + bot_impl._validate_activity(activity) + + warn.assert_not_called() + + +def test_validate_activity_when_type_is_custom(): + activity = presences.Activity(name="test", type=presences.ActivityType.CUSTOM) + + with mock.patch.object(warnings, "warn") as warn: + bot_impl._validate_activity(activity) + + warn.assert_called_once_with( + "The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have " + "any effect if used.", + category=errors.HikariWarning, + stacklevel=3, + ) + + +def test_validate_activity_when_type_is_streaming_but_no_url(): + activity = presences.Activity(name="test", url=None, type=presences.ActivityType.STREAMING) + + with mock.patch.object(warnings, "warn") as warn: + bot_impl._validate_activity(activity) + + warn.assert_called_once_with( + "The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video " + "URL to be specified on the activity for the presence update to have any effect.", + category=errors.HikariWarning, + stacklevel=3, + ) + + +def test_validate_activity_when_no_warning(): + activity = presences.Activity(name="test", type=presences.ActivityType.PLAYING) + + with mock.patch.object(warnings, "warn") as warn: + bot_impl._validate_activity(activity) + + warn.assert_not_called() + + class TestGatewayBot: @pytest.fixture() def cache(self): @@ -575,7 +620,7 @@ def test_run_with_close_loop(self, bot): stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "start", new=mock.Mock())) stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "join", new=mock.Mock())) stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "_close", new=mock.Mock())) - destroy_loop = stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "_destroy_loop")) + destroy_loop = stack.enter_context(mock.patch.object(bot_impl, "_destroy_loop")) loop = stack.enter_context(mock.patch.object(aio, "get_or_make_loop")).return_value with stack: @@ -726,7 +771,7 @@ async def test_update_presence(self, bot): with mock.patch.object(bot_impl.GatewayBot, "_check_if_alive") as check_if_alive: with mock.patch.object(aio, "all_of") as all_of: - with mock.patch.object(bot_impl.GatewayBot, "_validate_activity") as validate_activity: + with mock.patch.object(bot_impl, "_validate_activity") as validate_activity: await bot.update_presence(status=status, activity=activity, idle_since=idle_since, afk=afk) check_if_alive.assert_called_once_with() @@ -857,44 +902,3 @@ async def test_start_one_shard_when_not_alive(self, bot): url="https://some.website", closing_event=bot._closing_event, ) - - @pytest.mark.parametrize("activity", [undefined.UNDEFINED, None]) - def test_validate_activity_when_no_activity(self, bot, activity): - with mock.patch.object(warnings, "warn") as warn: - bot._validate_activity(activity) - - warn.assert_not_called() - - def test_validate_activity_when_type_is_custom(self, bot): - activity = presences.Activity(name="test", type=presences.ActivityType.CUSTOM) - - with mock.patch.object(warnings, "warn") as warn: - bot._validate_activity(activity) - - warn.assert_called_once_with( - "The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have " - "any effect if used.", - category=errors.HikariWarning, - stacklevel=3, - ) - - def test_validate_activity_when_type_is_streaming_but_no_url(self, bot): - activity = presences.Activity(name="test", url=None, type=presences.ActivityType.STREAMING) - - with mock.patch.object(warnings, "warn") as warn: - bot._validate_activity(activity) - - warn.assert_called_once_with( - "The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video " - "URL to be specified on the activity for the presence update to have any effect.", - category=errors.HikariWarning, - stacklevel=3, - ) - - def test_validate_activity_when_no_warning(self, bot): - activity = presences.Activity(name="test", type=presences.ActivityType.PLAYING) - - with mock.patch.object(warnings, "warn") as warn: - bot._validate_activity(activity) - - warn.assert_not_called()