Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pyamqp] Intial TODOS Clean Up #25630

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,15 @@ def _incoming_open(self, channel, frame):
self._remote_idle_timeout_send_frame = self._idle_timeout_empty_frame_send_ratio * self._remote_idle_timeout

if frame[2] < 512: # Ensure minimum max frame size.
pass # TODO: error
#Close with error
#Codes_S_R_S_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.]
#Codes_S_R_S_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.]
self.close(error=AMQPConnectionError(
condition=ErrorCondition.InvalidField,
description="connection_endpoint_frame_received::failed parsing OPEN frame"))
_LOGGER.error("connection_endpoint_frame_received::failed parsing OPEN frame")
else:
self._remote_max_frame_size = frame[2]
self._remote_max_frame_size = frame[2]
if self.state == ConnectionState.OPEN_SENT:
self._set_state(ConnectionState.OPENED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,12 @@ def close(self):
except Exception as exc:
# TODO: shutdown could raise OSError, Transport endpoint is not connected if the endpoint is already
swathipil marked this conversation as resolved.
Show resolved Hide resolved
# disconnected. can we safely ignore the errors since the close operation is initiated by us.
_LOGGER.info("An error occurred when shutting down the socket: %r", exc)
_LOGGER.info("Transport endpoint is already disconnected: %r", exc)
self.sock.close()
self.sock = None
self.connected = False

def read(self, verify_frame_type=0, **kwargs): # TODO: verify frame type?
def read(self, verify_frame_type=0, **kwargs):
read = self._read
read_frame_buffer = BytesIO()
try:
Expand Down Expand Up @@ -446,7 +446,6 @@ def receive_frame(self, *args, **kwargs):
decoded = decode_empty_frame(header)
else:
decoded = decode_frame(payload)
# TODO: Catch decode error and return amqp:decode-error
return channel, decoded
except (socket.timeout, TimeoutError):
return None, None
Expand Down Expand Up @@ -508,14 +507,7 @@ def _wrap_socket_sni(self, sock, keyfile=None, certfile=None,
# Setup the right SSL version; default to optimal versions across
# ssl implementations
if ssl_version is None:
# older versions of python 2.7 and python 2.6 do not have the
# ssl.PROTOCOL_TLS defined the equivalent is ssl.PROTOCOL_SSLv23
# we default to PROTOCOL_TLS and fallback to PROTOCOL_SSLv23
# TODO: Drop this once we drop Python 2.7 support
if hasattr(ssl, 'PROTOCOL_TLS'):
ssl_version = ssl.PROTOCOL_TLS
else:
ssl_version = ssl.PROTOCOL_SSLv23
ssl_version = ssl.PROTOCOL_TLS

opts = {
'sock': sock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,9 @@ async def _do_retryable_operation_async(self, operation, *args, **kwargs):
await asyncio.sleep(self._retry_policy.get_backoff_time(retry_settings, exc))
if exc.condition == ErrorCondition.LinkDetachForced:
await self._close_link_async() # if link level error, close and open a new link
# TODO: check if there's any other code that we want to close link?
if exc.condition in (ErrorCondition.ConnectionCloseForced, ErrorCondition.SocketError):
# if connection detach or socket error, close and open a new connection
await self.close_async()
# TODO: check if there's any other code we want to close connection
except Exception:
raise
finally:
Expand Down Expand Up @@ -383,8 +381,6 @@ async def _transfer_message_async(self, message_delivery, timeout=0):
raise RuntimeError("Message is not sent.")

async def _on_send_complete_async(self, message_delivery, reason, state):
# TODO: check whether the callback would be called in case of message expiry or link going down
# and if so handle the state in the callback
message_delivery.reason = reason
if reason == LinkDeliverySettleReason.DISPOSITION_RECEIVED:
if state and SEND_DISPOSITION_ACCEPT in state:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,15 @@ async def _incoming_open(self, channel, frame):
self.remote_idle_timeout_send_frame = self.idle_timeout_empty_frame_send_ratio * self.remote_idle_timeout

if frame[2] < 512:
pass # TODO: error
self._remote_max_frame_size = frame[2]
#Close with error
#Codes_S_R_S_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.]
#Codes_S_R_S_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.]
await self.close(error=AMQPConnectionError(
condition=ErrorCondition.InvalidField,
description="connection_endpoint_frame_received::failed parsing OPEN frame"))
_LOGGER.error("connection_endpoint_frame_received::failed parsing OPEN frame")
else:
self._remote_max_frame_size = frame[2]
if self.state == ConnectionState.OPEN_SENT:
await self._set_state(ConnectionState.OPENED)
elif self.state == ConnectionState.HDR_EXCH:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async def _set_state(self, new_state):
except Exception as e: # pylint: disable=broad-except
_LOGGER.error("Link state change callback failed: '%r'", e, extra=self.network_trace_params)

async def _remove_pending_deliveries(self): # TODO: move to sender
async def _remove_pending_deliveries(self):
futures = []
for delivery in self._pending_deliveries.values():
futures.append(asyncio.ensure_future(delivery.on_settled(LinkDeliverySettleReason.NOT_DELIVERED, None)))
Expand Down Expand Up @@ -189,10 +189,10 @@ async def _incoming_attach(self, frame):
_LOGGER.info("<- %r", AttachFrame(*frame), extra=self.network_trace_params)
if self._is_closed:
raise ValueError("Invalid link")
elif not frame[5] or not frame[6]: # TODO: not sure if we should check here
elif not frame[5] or not frame[6]:
_LOGGER.info("Cannot get source or target. Detaching link")
await self._remove_pending_deliveries()
await self._set_state(LinkState.DETACHED) # TODO: Send detach now?
await self._set_state(LinkState.DETACHED)
raise ValueError("Invalid link")
self.remote_handle = frame[1]
self.remote_max_message_size = frame[10]
Expand Down Expand Up @@ -264,7 +264,7 @@ async def detach(self, close=False, error=None):
return
try:
await self._check_if_closed()
await self._remove_pending_deliveries() # TODO: Keep?
await self._remove_pending_deliveries()
if self.state in [LinkState.ATTACH_SENT, LinkState.ATTACH_RCVD]:
await self._outgoing_detach(close=close, error=error)
await self._set_state(LinkState.DETACHED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,12 @@ async def receive_frame(self, *args, **kwargs):
decoded = decode_empty_frame(header)
else:
decoded = decode_frame(payload)
# TODO: Catch decode error and return amqp:decode-error
#_LOGGER.info("ICH%d <- %r", channel, decoded)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not part of the TODO, but do we want to uncomment or delete this line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ill uncomment it for now and we can review during the 'logging review' portion of the work

return channel, decoded
except (TimeoutError, socket.timeout, asyncio.IncompleteReadError, asyncio.TimeoutError):
return None, None

async def read(self, verify_frame_type=0, **kwargs): # TODO: verify frame type?
async def read(self, verify_frame_type=0, **kwargs):
async with self.socket_lock:
read_frame_buffer = BytesIO()
try:
Expand Down Expand Up @@ -483,11 +482,8 @@ async def _read(self, n, buffer=None, **kwargs): # pylint: disable=unused-argume

def close(self):
"""Do any preliminary work in shutting down the connection."""
# TODO: async close doesn't:
# 1) shutdown socket and close. --> self.sock.shutdown(socket.SHUT_RDWR) and self.sock.close()
swathipil marked this conversation as resolved.
Show resolved Hide resolved
# 2) set self.connected = False
# I think we need to do this, like in sync
self.ws.close()
self.connected = False

async def write(self, s):
"""Completely write a string to the peer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,9 @@ def _do_retryable_operation(self, operation, *args, **kwargs):
time.sleep(self._retry_policy.get_backoff_time(retry_settings, exc))
if exc.condition == ErrorCondition.LinkDetachForced:
self._close_link() # if link level error, close and open a new link
# TODO: check if there's any other code that we want to close link?
if exc.condition in (ErrorCondition.ConnectionCloseForced, ErrorCondition.SocketError):
# if connection detach or socket error, close and open a new connection
self.close()
# TODO: check if there's any other code we want to close connection
except Exception:
raise
finally:
Expand Down Expand Up @@ -457,8 +455,6 @@ def _process_send_error(message_delivery, condition, description=None, info=None
message_delivery.error = error

def _on_send_complete(self, message_delivery, reason, state):
# TODO: check whether the callback would be called in case of message expiry or link going down
# and if so handle the state in the callback
message_delivery.reason = reason
if reason == LinkDeliverySettleReason.DISPOSITION_RECEIVED:
if state and SEND_DISPOSITION_ACCEPT in state:
Expand Down
8 changes: 4 additions & 4 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _set_state(self, new_state):
except Exception as e: # pylint: disable=broad-except
_LOGGER.error("Link state change callback failed: '%r'", e, extra=self.network_trace_params)

def _remove_pending_deliveries(self): # TODO: move to sender
def _remove_pending_deliveries(self):
for delivery in self._pending_deliveries.values():
delivery.on_settled(LinkDeliverySettleReason.NOT_DELIVERED, None)
self._pending_deliveries = {}
Expand Down Expand Up @@ -190,10 +190,10 @@ def _incoming_attach(self, frame):
_LOGGER.info("<- %r", AttachFrame(*frame), extra=self.network_trace_params)
if self._is_closed:
raise ValueError("Invalid link")
elif not frame[5] or not frame[6]: # TODO: not sure if we should source + target check here
elif not frame[5] or not frame[6]:
_LOGGER.info("Cannot get source or target. Detaching link")
self._remove_pending_deliveries()
self._set_state(LinkState.DETACHED) # TODO: Send detach now?
self._set_state(LinkState.DETACHED)
raise ValueError("Invalid link")
self.remote_handle = frame[1] # handle
self.remote_max_message_size = frame[10] # max_message_size
Expand Down Expand Up @@ -265,7 +265,7 @@ def detach(self, close=False, error=None):
return
try:
self._check_if_closed()
self._remove_pending_deliveries() # TODO: Keep?
self._remove_pending_deliveries()
if self.state in [LinkState.ATTACH_SENT, LinkState.ATTACH_RCVD]:
self._outgoing_detach(close=close, error=error)
self._set_state(LinkState.DETACHED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ def _outgoing_transfer(self, delivery):
'payload': output
}
if self.network_trace:
# TODO: whether we should move frame tracing into centralized place e.g. connection.py
_LOGGER.info("-> %r", TransferFrame(delivery_id='<pending>', **delivery.frame), extra=self.network_trace_params)
self._session._outgoing_transfer(delivery)
if delivery.transfer_state == SessionTransferState.OKAY:
Expand All @@ -131,7 +130,7 @@ def _incoming_disposition(self, frame):
if delivery:
delivery.on_settled(LinkDeliverySettleReason.DISPOSITION_RECEIVED, frame[4]) # state

def _update_pending_delivery_status(self): # TODO
def _update_pending_delivery_status(self):
now = time.time()
expired = []
for delivery in self._pending_deliveries.values():
Expand Down