-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PyAMQP] Connections TODO #25802
[PyAMQP] Connections TODO #25802
Conversation
API change check APIView has identified API level changes in this PR and created following API reviews. |
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
@@ -507,10 +510,13 @@ def _incoming_end(self, channel, frame): | |||
""" | |||
try: | |||
self._incoming_endpoints[channel]._incoming_end(frame) # pylint:disable=protected-access | |||
self._incoming_endpoints.pop(channel) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious: For an outgoing_close, could we receive a response from the service on that channel to confirm it is closing? (i.e from the incoming_endpoint channel). Edit: Or is that not relevant since they mutually close?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if the entire Connection
was being closed, then the state would be set to END after sending after sending a CLOSE to the other side. In this case while this channel has closed, there might be other channels still open. Unless I missed it here, doesnt seem to be the case in channels http://docs.oasis-open.org/amqp/core/v1.0/csprd01/amqp-core-transport-v1.0-csprd01.html#doc-idp191152
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's actually a really good question - and why this was a TODO in the first place.
It's hard to know where to pop the channel/session/link because if it's initiated by us, we don't always wait around for the service response.
So we need to test/be able to handle:
- If we get an incoming close/end/detach, then is this in response to one we already sent? If not, we can probably just send the response frame then pop it.
- If we are sending a close/end/detach, if we pop it then we need to be able to handle additional incoming frames that are sent to it. Otherwise, do we wait until we explicitly receive the response? That can be slow and messy if there's other things going on in the connection.
/azp run python - eventhub - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
pass # TODO: channel error | ||
#self._incoming_endpoints.pop(channel) # TODO | ||
#self._outgoing_endpoints.pop(channel) # TODO | ||
end_error = AMQPError(condition=ErrorCondition.InvalidField, description=f"Invalid channel {channel}", info=None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is InvalidField the best error condition here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed it to ClientError
since if a channel is missing from the dictionary (either when the frame comes from the client itself or the service) its something the SDK didn't handle on close properly.
sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_connection.py
Outdated
Show resolved
Hide resolved
#self._outgoing_endpoints.pop(channel) # TODO | ||
end_error = AMQPError(condition=ErrorCondition.ClientError, description=f"Invalid channel {channel}", info=None) | ||
_LOGGER.error(f"Invalid channel {channel} ") | ||
self.close(error=end_error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ErrorCondition.ClientError is not a protocol-supported condition - so only use if for errors generated for the sole purpose of raising exceptions to users.
If we are actually building an AMQPError object to send in a CLOSE frame - it should use one of the protocol conditions.
@@ -305,7 +305,11 @@ async def _incoming_open(self, channel, frame): | |||
await self._outgoing_open() | |||
await self._set_state(ConnectionState.OPENED) | |||
else: | |||
pass # TODO what now...? | |||
# TODO what now...? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could remove the todo?
This pull request is protected by Check Enforcer. What is Check Enforcer?Check Enforcer helps ensure all pull requests are covered by at least one check-run (typically an Azure Pipeline). When all check-runs associated with this pull request pass then Check Enforcer itself will pass. Why am I getting this message?You are getting this message because Check Enforcer did not detect any check-runs being associated with this pull request within five minutes. This may indicate that your pull request is not covered by any pipelines and so Check Enforcer is correctly blocking the pull request being merged. What should I do now?If the check-enforcer check-run is not passing and all other check-runs associated with this PR are passing (excluding license-cla) then you could try telling Check Enforcer to evaluate your pull request again. You can do this by adding a comment to this pull request as follows: What if I am onboarding a new service?Often, new services do not have validation pipelines associated with them, in order to bootstrap pipelines for a new service, you can issue the following command as a pull request comment: |
This PR tries to address 2 TODOs for Connection
_incoming_open
: if a connection flow during Open receives an illegal state, how do we handle. Todayuamqp
&pyamqp
both ignore it. Right now I've set it that if any other state appears while opening, close. Looking at some other libraries and the amqp specs, there doesnt seem to be any defined thing that should happen if the Open process goes off path, so it might be open to the implementer ?_incoming_close
: How to handle a close frame on an invalid channel. I moved the todos around popping the channels on both incoming and outgoing since the spec mentions either party can close and then they mutually close. If the channel specified is not found, then is it an log and move on or close the entire thing with an error. Ive chosen the latter now