Skip to content

Commit

Permalink
Fix deprecation warning raised by out of loop asyncio.gather calls (#954
Browse files Browse the repository at this point in the history
)
  • Loading branch information
FasterSpeeding authored Jan 1, 2022
1 parent 8ec1cd2 commit 81efb9f
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 115 deletions.
1 change: 1 addition & 0 deletions changes/954.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix deprecation warnings raised by usage of asyncio.gather outside of an active event loop in GatewayBot.run
151 changes: 79 additions & 72 deletions hikari/impl/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,82 @@
_LOGGER: typing.Final[logging.Logger] = logging.getLogger("hikari.bot")


async def _gather(coros: typing.Iterator[typing.Awaitable[typing.Any]]) -> None:
# Calling asyncio.gather outside of a running event loop isn't safe and
# will lead to RuntimeErrors in later versions of python, so this call is
# kept within a coroutine function.
await asyncio.gather(*coros)


def _destroy_loop(loop: asyncio.AbstractEventLoop) -> None:
async def murder(future: asyncio.Future[typing.Any]) -> None:
# These include _GatheringFuture which must be awaited if the children
# throw an asyncio.CancelledError, otherwise it will spam logs with warnings
# about exceptions not being retrieved before GC.
try:
_LOGGER.log(ux.TRACE, "killing %s", future)
future.cancel()
await future
except asyncio.CancelledError:
pass
except Exception as ex:
loop.call_exception_handler(
{
"message": "Future raised unexpected exception after requesting cancellation",
"exception": ex,
"future": future,
}
)

remaining_tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]

if remaining_tasks:
_LOGGER.debug("terminating %s remaining tasks forcefully", len(remaining_tasks))
loop.run_until_complete(_gather((murder(task) for task in remaining_tasks)))
else:
_LOGGER.debug("No remaining tasks exist, good job!")

if sys.version_info >= (3, 9):
_LOGGER.debug("shutting down default executor")
try:
# This seems to raise a NotImplementedError when running with uvloop.
loop.run_until_complete(loop.shutdown_default_executor())
except NotImplementedError:
pass

_LOGGER.debug("shutting down asyncgens")
loop.run_until_complete(loop.shutdown_asyncgens())

_LOGGER.debug("closing event loop")
loop.close()
# Closed loops cannot be re-used so it should also be un-set.
asyncio.set_event_loop(None)


def _validate_activity(activity: undefined.UndefinedNoneOr[presences.Activity]) -> None:
# This seems to cause confusion for a lot of people, so lets add some warnings into the mix.

if activity is undefined.UNDEFINED or activity is None:
return

# If you ever change where this is called from, make sure to check the stacklevels are correct
# or the code preview in the warning will be wrong...
if activity.type is presences.ActivityType.CUSTOM:
warnings.warn(
"The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have "
"any effect if used.",
category=errors.HikariWarning,
stacklevel=3,
)
elif activity.type is presences.ActivityType.STREAMING and activity.url is None:
warnings.warn(
"The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video "
"URL to be specified on the activity for the presence update to have any effect.",
category=errors.HikariWarning,
stacklevel=3,
)


class GatewayBot(traits.GatewayBotAware):
"""Basic auto-sharding bot implementation.
Expand Down Expand Up @@ -811,7 +887,7 @@ def handle_os_interrupt(signum: int, frame: typing.Optional[types.FrameType]) ->
pass

if close_loop:
self._destroy_loop(loop)
_destroy_loop(loop)

_LOGGER.info("successfully terminated")

Expand Down Expand Up @@ -883,7 +959,7 @@ async def start(
if shard_ids is not None and shard_count is None:
raise TypeError("'shard_ids' must be passed with 'shard_count'")

self._validate_activity(activity)
_validate_activity(activity)

# Dispatch the update checker, the sharding requirements checker, and dispatch
# the starting event together to save a little time on startup.
Expand Down Expand Up @@ -1204,7 +1280,7 @@ async def update_presence(
afk: undefined.UndefinedOr[bool] = undefined.UNDEFINED,
) -> None:
self._check_if_alive()
self._validate_activity(activity)
_validate_activity(activity)

coros = [
s.update_presence(status=status, activity=activity, idle_since=idle_since, afk=afk)
Expand Down Expand Up @@ -1290,72 +1366,3 @@ async def _start_one_shard(
return new_shard

raise errors.GatewayError(f"shard {shard_id} shut down immediately when starting")

@staticmethod
def _destroy_loop(loop: asyncio.AbstractEventLoop) -> None:
async def murder(future: asyncio.Future[typing.Any]) -> None:
# These include _GatheringFuture which must be awaited if the children
# throw an asyncio.CancelledError, otherwise it will spam logs with warnings
# about exceptions not being retrieved before GC.
try:
_LOGGER.log(ux.TRACE, "killing %s", future)
future.cancel()
await future
except asyncio.CancelledError:
pass
except Exception as ex:
loop.call_exception_handler(
{
"message": "Future raised unexpected exception after requesting cancellation",
"exception": ex,
"future": future,
}
)

remaining_tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]

if remaining_tasks:
_LOGGER.debug("terminating %s remaining tasks forcefully", len(remaining_tasks))
loop.run_until_complete(asyncio.gather(*(murder(task) for task in remaining_tasks)))
else:
_LOGGER.debug("No remaining tasks exist, good job!")

if sys.version_info >= (3, 9):
_LOGGER.debug("shutting down default executor")
try:
# This seems to raise a NotImplementedError when running with uvloop.
loop.run_until_complete(loop.shutdown_default_executor())
except NotImplementedError:
pass

_LOGGER.debug("shutting down asyncgens")
loop.run_until_complete(loop.shutdown_asyncgens())

_LOGGER.debug("closing event loop")
loop.close()
# Closed loops cannot be re-used so it should also be un-set.
asyncio.set_event_loop(None)

@staticmethod
def _validate_activity(activity: undefined.UndefinedNoneOr[presences.Activity]) -> None:
# This seems to cause confusion for a lot of people, so lets add some warnings into the mix.

if activity is undefined.UNDEFINED or activity is None:
return

# If you ever change where this is called from, make sure to check the stacklevels are correct
# or the code preview in the warning will be wrong...
if activity.type is presences.ActivityType.CUSTOM:
warnings.warn(
"The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have "
"any effect if used.",
category=errors.HikariWarning,
stacklevel=3,
)
elif activity.type is presences.ActivityType.STREAMING and activity.url is None:
warnings.warn(
"The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video "
"URL to be specified on the activity for the presence update to have any effect.",
category=errors.HikariWarning,
stacklevel=3,
)
90 changes: 47 additions & 43 deletions tests/hikari/impl/test_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,51 @@
from hikari.internal import ux


@pytest.mark.parametrize("activity", [undefined.UNDEFINED, None])
def test_validate_activity_when_no_activity(activity):
with mock.patch.object(warnings, "warn") as warn:
bot_impl._validate_activity(activity)

warn.assert_not_called()


def test_validate_activity_when_type_is_custom():
activity = presences.Activity(name="test", type=presences.ActivityType.CUSTOM)

with mock.patch.object(warnings, "warn") as warn:
bot_impl._validate_activity(activity)

warn.assert_called_once_with(
"The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have "
"any effect if used.",
category=errors.HikariWarning,
stacklevel=3,
)


def test_validate_activity_when_type_is_streaming_but_no_url():
activity = presences.Activity(name="test", url=None, type=presences.ActivityType.STREAMING)

with mock.patch.object(warnings, "warn") as warn:
bot_impl._validate_activity(activity)

warn.assert_called_once_with(
"The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video "
"URL to be specified on the activity for the presence update to have any effect.",
category=errors.HikariWarning,
stacklevel=3,
)


def test_validate_activity_when_no_warning():
activity = presences.Activity(name="test", type=presences.ActivityType.PLAYING)

with mock.patch.object(warnings, "warn") as warn:
bot_impl._validate_activity(activity)

warn.assert_not_called()


class TestGatewayBot:
@pytest.fixture()
def cache(self):
Expand Down Expand Up @@ -575,7 +620,7 @@ def test_run_with_close_loop(self, bot):
stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "start", new=mock.Mock()))
stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "join", new=mock.Mock()))
stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "_close", new=mock.Mock()))
destroy_loop = stack.enter_context(mock.patch.object(bot_impl.GatewayBot, "_destroy_loop"))
destroy_loop = stack.enter_context(mock.patch.object(bot_impl, "_destroy_loop"))
loop = stack.enter_context(mock.patch.object(aio, "get_or_make_loop")).return_value

with stack:
Expand Down Expand Up @@ -726,7 +771,7 @@ async def test_update_presence(self, bot):

with mock.patch.object(bot_impl.GatewayBot, "_check_if_alive") as check_if_alive:
with mock.patch.object(aio, "all_of") as all_of:
with mock.patch.object(bot_impl.GatewayBot, "_validate_activity") as validate_activity:
with mock.patch.object(bot_impl, "_validate_activity") as validate_activity:
await bot.update_presence(status=status, activity=activity, idle_since=idle_since, afk=afk)

check_if_alive.assert_called_once_with()
Expand Down Expand Up @@ -857,44 +902,3 @@ async def test_start_one_shard_when_not_alive(self, bot):
url="https://some.website",
closing_event=bot._closing_event,
)

@pytest.mark.parametrize("activity", [undefined.UNDEFINED, None])
def test_validate_activity_when_no_activity(self, bot, activity):
with mock.patch.object(warnings, "warn") as warn:
bot._validate_activity(activity)

warn.assert_not_called()

def test_validate_activity_when_type_is_custom(self, bot):
activity = presences.Activity(name="test", type=presences.ActivityType.CUSTOM)

with mock.patch.object(warnings, "warn") as warn:
bot._validate_activity(activity)

warn.assert_called_once_with(
"The CUSTOM activity type is not supported by bots at the time of writing, and may therefore not have "
"any effect if used.",
category=errors.HikariWarning,
stacklevel=3,
)

def test_validate_activity_when_type_is_streaming_but_no_url(self, bot):
activity = presences.Activity(name="test", url=None, type=presences.ActivityType.STREAMING)

with mock.patch.object(warnings, "warn") as warn:
bot._validate_activity(activity)

warn.assert_called_once_with(
"The STREAMING activity type requires a 'url' parameter pointing to a valid Twitch or YouTube video "
"URL to be specified on the activity for the presence update to have any effect.",
category=errors.HikariWarning,
stacklevel=3,
)

def test_validate_activity_when_no_warning(self, bot):
activity = presences.Activity(name="test", type=presences.ActivityType.PLAYING)

with mock.patch.object(warnings, "warn") as warn:
bot._validate_activity(activity)

warn.assert_not_called()

0 comments on commit 81efb9f

Please sign in to comment.