Skip to content

Commit

Permalink
[Pyamqp] Pyamqp fix conn (#26568)
Browse files Browse the repository at this point in the history
* remove unnecessary pops

* fix var name + remove unnecessary pop

* fix
  • Loading branch information
kashifkhan authored Oct 1, 2022
1 parent 5305372 commit ce4b257
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,6 @@ def _incoming_end(self, channel, frame):
description="Invalid channel number received"
))
return
self._incoming_endpoints.pop(channel)
self._outgoing_endpoints.pop(channel)

def _process_incoming_frame(self, channel, frame): # pylint:disable=too-many-return-statements
# type: (int, Optional[Union[bytes, Tuple[int, Tuple[Any, ...]]]]) -> bool
Expand Down
45 changes: 23 additions & 22 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,28 @@ def _connect(self, host, port, timeout):
try:
entries = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, SOL_TCP)
entries_num = len(entries)
# now that we have address(es) for the hostname, connect to broker
for i, res in enumerate(entries):
af, socktype, proto, _, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
try:
set_cloexec(self.sock, True)
except NotImplementedError:
pass
self.sock.settimeout(timeout)
self.sock.connect(sa)
except socket.error as ex:
e = ex
if self.sock is not None:
self.sock.close()
self.sock = None
# we may have depleted all our options
if i + 1 >= entries_num and n + 1 >= addr_types_num:
raise
else:
# hurray, we established connection
return
except socket.gaierror:
# we may have depleted all our options
if n + 1 >= addr_types_num:
Expand All @@ -294,28 +316,7 @@ def _connect(self, host, port, timeout):
raise e if e is not None else socket.error("failed to resolve broker hostname")
continue # pragma: no cover

# now that we have address(es) for the hostname, connect to broker
for i, res in enumerate(entries):
af, socktype, proto, _, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
try:
set_cloexec(self.sock, True)
except NotImplementedError:
pass
self.sock.settimeout(timeout)
self.sock.connect(sa)
except socket.error as ex:
e = ex
if self.sock is not None:
self.sock.close()
self.sock = None
# we may have depleted all our options
if i + 1 >= entries_num and n + 1 >= addr_types_num:
raise
else:
# hurray, we established connection
return


def _init_socket(self, socket_settings, read_timeout, write_timeout):
self.sock.settimeout(None) # set socket back to blocking mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,8 @@ async def _incoming_end(self, channel, frame):
"""
try:
await self._incoming_endpoints[channel]._incoming_end(frame) # pylint:disable=protected-access
self.incoming_endpoints.pop(channel)
self.outgoing_endpoints.pop(channel)
self._incoming_endpoints.pop(channel)
self._outgoing_endpoints.pop(channel)
except KeyError:
#close the connection
await self.close(
Expand All @@ -480,8 +480,6 @@ async def _incoming_end(self, channel, frame):
description="Invalid channel number received"
))
return
self._incoming_endpoints.pop(channel)
self._outgoing_endpoints.pop(channel)

async def _process_incoming_frame(self, channel, frame): # pylint:disable=too-many-return-statements
# type: (int, Optional[Union[bytes, Tuple[int, Tuple[Any, ...]]]]) -> bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,28 @@ async def _connect(self, host, port, timeout):
try:
entries = await self.loop.getaddrinfo(host, port, family=family, type=socket.SOCK_STREAM, proto=SOL_TCP)
entries_num = len(entries)
# now that we have address(es) for the hostname, connect to broker
for i, res in enumerate(entries):
af, socktype, proto, _, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
try:
set_cloexec(self.sock, True)
except NotImplementedError:
pass
self.sock.settimeout(timeout)
await self.loop.sock_connect(self.sock, sa)
except socket.error as ex:
e = ex
if self.sock is not None:
self.sock.close()
self.sock = None
# we may have depleted all our options
if i + 1 >= entries_num and n + 1 >= addr_types_num:
raise
else:
# hurray, we established connection
return
except socket.gaierror:
# we may have depleted all our options
if n + 1 >= addr_types_num:
Expand All @@ -272,28 +294,7 @@ async def _connect(self, host, port, timeout):
# relevant to users
raise (e if e is not None else socket.error("failed to resolve broker hostname"))

# now that we have address(es) for the hostname, connect to broker
for i, res in enumerate(entries):
af, socktype, proto, _, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
try:
set_cloexec(self.sock, True)
except NotImplementedError:
pass
self.sock.settimeout(timeout)
await self.loop.sock_connect(self.sock, sa)
except socket.error as ex:
e = ex
if self.sock is not None:
self.sock.close()
self.sock = None
# we may have depleted all our options
if i + 1 >= entries_num and n + 1 >= addr_types_num:
raise
else:
# hurray, we established connection
return


def _init_socket(self, socket_settings, read_timeout, write_timeout):
self.sock.settimeout(None) # set socket back to blocking mode
Expand Down

0 comments on commit ce4b257

Please sign in to comment.