Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Issues that were encountered during usage:
AmqpConnection
was closed due to a read/write error with the client, theAmqpConnection
object stayed registered in theAmqpServer
, polluting the server with dead connection objects.AmqpServer
had severalAmqpConnections
, during shutdown, it only properly closed half of the connections and left the other half with pending tasks, which caused very long error messages when shutting down the app.Improvements:
To resolve the first issue, the connection automatically unregisters itself when it's closed (due to the reader being closed). There are a few conditions under which a connection should be closed:
AmqpServer
is shutting down and notifies theAmqpConnection
withclose()
.To handle all of these cases properly:
AmqpServer
starts a connection, it is now done with an async callback rather than a sync callback. The async callback starts the connection and awaitsconnection.run_until_complete()
.connection.close()
is done for each connection, which will simply cancel the reader.AmqpConnection.run_until_closed()
, which awaits the reader. If the reader task was finished due to the loop ending (reader got an EOF) or an exception, the error is logged here, and a call to its ownconnection.close()
is done. If the reader was finished due to an external call toconnection.close()
, thenrun_until_done()
simply returns.The second issue was due to the
for connection in self._connections:
loop during shutdown, where theconnection.close()
would end up removing itself from theself._connections
list, basically invalidating the iterator and causing some connections to be skipped. To resolve this, tasks are created all at once and then awaited simultaneously withasyncio.gather()
.