Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PyAMQP] Kashif Client refactoring changes #25451

Merged
49 commits merged into from
Sep 6, 2022
Merged
Changes from 12 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
912c993
init changes to client
kashifkhan Jul 18, 2022
1a5faf2
fix doctring
kashifkhan Jul 18, 2022
20fbe03
remove error
kashifkhan Jul 18, 2022
18599d5
cleanup
kashifkhan Jul 27, 2022
943f0d6
further clean up
kashifkhan Jul 27, 2022
179321a
fix tests
kashifkhan Jul 27, 2022
e880b9c
tweaks
kashifkhan Jul 27, 2022
cf66a14
fix import
kashifkhan Jul 27, 2022
7424d4a
fix unit tests
kashifkhan Jul 27, 2022
592046d
fix var
kashifkhan Jul 27, 2022
65c628c
looking into removing hostname on SDK side -- new branch so not messy
l0lawrence Jul 28, 2022
a7599a4
test-fix
l0lawrence Jul 28, 2022
07dd588
switch to kwargs
l0lawrence Jul 28, 2022
ba4b04d
SendClient had no docstring
l0lawrence Aug 1, 2022
ec307a5
adding back in hostname
l0lawrence Aug 1, 2022
45d9587
making send client param keyword instead
l0lawrence Aug 1, 2022
31a5380
making param to keyword in recieveClient docstring
l0lawrence Aug 1, 2022
293da9d
final uamqp docstring comments - need advice
l0lawrence Aug 1, 2022
f1f2f1b
renaming mesage to message
l0lawrence Aug 1, 2022
03bf52c
removing comment about typo
l0lawrence Aug 1, 2022
00b1815
hostname type string
l0lawrence Aug 1, 2022
f498d10
fixing tests
l0lawrence Aug 18, 2022
3a43101
removing print
l0lawrence Aug 18, 2022
306e097
adding hostname
l0lawrence Aug 18, 2022
88be0cc
Merge branch 'feature/eventhub/pyproto' of https://github.com/Azure/a…
l0lawrence Aug 18, 2022
72662d9
Merge branch 'feature/eventhub/pyproto' of https://github.com/Azure/a…
l0lawrence Aug 22, 2022
1dad77d
retry spelled wrong
l0lawrence Aug 22, 2022
33b18eb
Merge branch 'feature/eventhub/pyproto' into kashif_client_changes
l0lawrence Aug 25, 2022
f687953
git conflict
l0lawrence Aug 25, 2022
44c4279
removing some todos
l0lawrence Aug 25, 2022
50211ef
async
l0lawrence Aug 25, 2022
266dd2f
syncing together docs
l0lawrence Aug 25, 2022
d97ff8a
adding in InternalError on client_ready()
l0lawrence Aug 26, 2022
b26d4c6
made private var public
l0lawrence Aug 26, 2022
838383e
removing pylint disable to see
l0lawrence Aug 26, 2022
75598b3
remove auto add
l0lawrence Aug 26, 2022
6fb6ad8
initial pr fixes
l0lawrence Aug 29, 2022
019350a
removing desired_capabilities for now, spelling fix adding keywords f…
l0lawrence Aug 29, 2022
5e08d07
adding back in desired_cap -- used by consumer
l0lawrence Aug 29, 2022
8272c83
adding docstring patch for review
l0lawrence Aug 30, 2022
4bd01ae
auth as keyword
l0lawrence Aug 30, 2022
9c518f1
renaming prefetch to link_credit to match code
l0lawrence Aug 30, 2022
b67fa9c
swathi pr comments
l0lawrence Aug 30, 2022
8a83c0e
making pr comment changes
l0lawrence Aug 31, 2022
b1f34b4
nonetype err
l0lawrence Aug 31, 2022
6c4e933
fixing docstring async
l0lawrence Aug 31, 2022
e817f4b
making this uniform with sync side
l0lawrence Aug 31, 2022
ced95bc
fixing leftover doc issue
l0lawrence Aug 31, 2022
975db59
str->bytes, removing __doc__ override
l0lawrence Sep 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py
Original file line number Diff line number Diff line change
@@ -374,7 +374,8 @@ def _management_request(self, mgmt_msg, op_type):
if custom_endpoint_address:
custom_endpoint_address += '/$servicebus/websocket/'
mgmt_client = AMQPClient(
hostname,
# Do we want this to be hostname or remote_address
# hostname,
auth=mgmt_auth,
network_trace=self._config.network_tracing,
transport_type=self._config.transport_type,
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py
Original file line number Diff line number Diff line change
@@ -159,7 +159,7 @@ def _create_handler(self, auth):
custom_endpoint_address += '/$servicebus/websocket/'

self._handler = ReceiveClient(
hostname,
# hostname,
source,
auth=auth,
idle_timeout=self._idle_timeout,
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_producer.py
Original file line number Diff line number Diff line change
@@ -133,7 +133,7 @@ def _create_handler(self, auth):
if custom_endpoint_address:
custom_endpoint_address += '/$servicebus/websocket/'
self._handler = SendClient(
hostname,
# hostname,
self._target,
auth=auth,
idle_timeout=self._idle_timeout,
Original file line number Diff line number Diff line change
@@ -121,6 +121,7 @@ def _get_partitions(self):
for p_id in cast(List[str], self._partition_ids):
self._producers[p_id] = None

# this is spelled wrong here -- do we change - or were the changes elsewhere
def _get_max_mesage_size(self):
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
# type: () -> None
# pylint: disable=protected-access,line-too-long
91 changes: 49 additions & 42 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@
from .session import Session
from .sender import SenderLink
from .receiver import ReceiverLink
from .sasl import SASLTransport
from .sasl import SASLAnonymousCredential, SASLTransport
from .endpoints import Source, Target
from .error import (
AMQPConnectionError,
@@ -59,14 +59,14 @@ class AMQPClient(object):

:param remote_address: The AMQP endpoint to connect to. This could be a send target
or a receive source.
:type remote_address: str, bytes or ~uamqp.address.Address
:param auth: Authentication for the connection. This should be one of the subclasses of
uamqp.authentication.AMQPAuth. Currently this includes:
- uamqp.authentication.SASLAnonymous
- uamqp.authentication.SASLPlain
- uamqp.authentication.SASTokenAuth
:type remote_address: str, bytes or ~pyamqp.endpoint.Target or ~pyamqp.endpoint.Source
:param auth: Authentication for the connection. This should be one of the following:
- pyamqp.authentication.SASLAnonymous
- pyamqp.authentication.SASLPlain
- pyamqp.authentication.SASTokenAuth
- pyamqp.authentication.JWTTokenAuth
If no authentication is supplied, SASLAnnoymous will be used by default.
:type auth: ~uamqp.authentication.common.AMQPAuth
:type auth: ~pyamqp.authentication
:param client_name: The name for the client, also known as the Container ID.
If no name is provided, a random GUID will be used.
:type client_name: str or bytes
@@ -75,7 +75,7 @@ class AMQPClient(object):
:type debug: bool
:param retry_policy: A policy for parsing errors on link, connection and message
disposition to determine whether the error should be retryable.
:type retry_policy: ~uamqp.errors.RetryPolicy
:type retry_policy: ~pyamqp.error.RetryPolicy
:param keep_alive_interval: If set, a thread will be started to keep the connection
alive during periods of user inactivity. The value will determine how long the
thread will sleep (in seconds) between pinging the connection. If 0 or None, no
@@ -105,27 +105,29 @@ class AMQPClient(object):
:type handle_max: int
:param on_attach: A callback function to be run on receipt of an ATTACH frame.
The function must take 4 arguments: source, target, properties and error.
:type on_attach: func[~uamqp.address.Source, ~uamqp.address.Target, dict, ~uamqp.errors.AMQPConnectionError]
:type on_attach: func[~pyamqp.endpoint.Source, ~pyamqp.endpoint.Target, dict, ~pyamqp.error.AMQPConnectionError]
:param send_settle_mode: The mode by which to settle message send
operations. If set to `Unsettled`, the client will wait for a confirmation
from the service that the message was successfully sent. If set to 'Settled',
the client will not wait for confirmation and assume success.
:type send_settle_mode: ~uamqp.constants.SenderSettleMode
:type send_settle_mode: ~pyamqp.constants.SenderSettleMode
:param receive_settle_mode: The mode by which to settle message receive
operations. If set to `PeekLock`, the receiver will lock a message once received until
the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service
will assume successful receipt of the message and clear it from the queue. The
default is `PeekLock`.
:type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:type receive_settle_mode: ~pyamqp.constants.ReceiverSettleMode
:param encoding: The encoding to use for parameters supplied as strings.
Default is 'UTF-8'
:type encoding: str
"""

def __init__(self, hostname, auth=None, **kwargs):
self._hostname = hostname
def __init__(self, remote_address, auth=None, client_name=None, debug=False, retry_policy=None,
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
keep_alive_interval=None, **kwargs):
# I think these are just strings not instances of target or source
self._remote_address = remote_address.address if (isinstance(remote_address, Source) or isinstance(remote_address, Target)) else remote_address
self._auth = auth
self._name = kwargs.pop("client_name", str(uuid.uuid4()))
self._name = client_name if client_name else str(uuid.uuid4())

self._shutdown = False
self._connection = None
@@ -136,19 +138,23 @@ def __init__(self, hostname, auth=None, **kwargs):
self._cbs_authenticator = None
self._auth_timeout = kwargs.pop("auth_timeout", DEFAULT_AUTH_TIMEOUT)
self._mgmt_links = {}
self._retry_policy = kwargs.pop("retry_policy", RetryPolicy())
self._retry_policy = retry_policy if retry_policy else RetryPolicy()
self._keep_alive_interval = int(keep_alive_interval) if keep_alive_interval else 0

# Connection settings
self._max_frame_size = kwargs.pop('max_frame_size', None) or MAX_FRAME_SIZE_BYTES
self._channel_max = kwargs.pop('channel_max', None) or 65535
self._idle_timeout = kwargs.pop('idle_timeout', None)
self._properties = kwargs.pop('properties', None)
self._network_trace = kwargs.pop("network_trace", False)
self._remote_idle_timeout_empty_frame_send_ratio = kwargs.pop(
'remote_idle_timeout_empty_frame_send_ratio', None)
self._network_trace = debug
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved

# Session settings
self._outgoing_window = kwargs.pop('outgoing_window', None) or OUTGOING_WIDNOW
self._incoming_window = kwargs.pop('incoming_window', None) or INCOMING_WINDOW
self._handle_max = kwargs.pop('handle_max', None)
self._on_attach = kwargs.pop('on_attach', None)

# Link settings
self._send_settle_mode = kwargs.pop('send_settle_mode', SenderSettleMode.Unsettled)
@@ -234,15 +240,15 @@ def open(self):

:param connection: An existing Connection that may be shared between
multiple clients.
:type connetion: ~uamqp.Connection
:type connection: ~pyamqp.Connection
"""
# pylint: disable=protected-access
if self._session:
return # already open.
_logger.debug("Opening client connection.")
if not self._connection:
self._connection = Connection(
"amqps://" + self._hostname,
"amqps://" + self._remote_address,
sasl_credential=self._auth.sasl,
ssl={'ca_certs':self._connection_verify or certifi.where()},
container_id=self._name,
@@ -332,7 +338,7 @@ def do_work(self, **kwargs):
to be shut down.

:rtype: bool
:raises: TimeoutError or ~uamqp.errors.ClientTimeout if CBS authentication timeout reached.
:raises: TimeoutError if CBS authentication timeout reached.
"""
if self._shutdown:
return False
@@ -343,7 +349,7 @@ def do_work(self, **kwargs):
def mgmt_request(self, message, **kwargs):
"""
:param message: The message to send in the management request.
:type message: ~uamqp.message.Message
:type message: ~pyamqp.message.Message
:keyword str operation: The type of operation to be performed. This value will
be service-specific, but common values include READ, CREATE and UPDATE.
This value will be added as an application property on the message.
@@ -353,7 +359,7 @@ def mgmt_request(self, message, **kwargs):
:keyword str node: The target node. Default node is `$management`.
:keyword float timeout: Provide an optional timeout in seconds within which a response
to the management request must be received.
:rtype: ~uamqp.message.Message
:rtype: ~pyamqp.message.Message
"""

# The method also takes "status_code_field" and "status_description_field"
@@ -385,13 +391,13 @@ def mgmt_request(self, message, **kwargs):


class SendClient(AMQPClient):
def __init__(self, hostname, target, auth=None, **kwargs):
def __init__(self, target, auth=None, **kwargs):
self.target = target
# Sender and Link settings
self._max_message_size = kwargs.pop('max_message_size', None) or MAX_FRAME_SIZE_BYTES
self._link_properties = kwargs.pop('link_properties', None)
self._link_credit = kwargs.pop('link_credit', None)
super(SendClient, self).__init__(hostname, auth=auth, **kwargs)
super(SendClient, self).__init__(target, auth=auth, **kwargs)

def _client_ready(self):
"""Determine whether the client is ready to start receiving messages.
@@ -519,7 +525,7 @@ def _send_message_impl(self, message, **kwargs):

def send_message(self, message, **kwargs):
"""
:param ~uamqp.message.Message message:
:param ~pyamqp.message.Message message:
:keyword float timeout: timeout in seconds. If set to
0, the client will continue to wait until the message is sent or error happens. The
default is 0.
@@ -530,16 +536,17 @@ def send_message(self, message, **kwargs):
class ReceiveClient(AMQPClient):
"""An AMQP client for receiving messages.

:param target: The source AMQP service endpoint. This can either be the URI as
a string or a ~uamqp.address.Source object.
:type target: str, bytes or ~uamqp.address.Source
:param source: The source AMQP service endpoint. This can either be the URI as
a string or a ~pyamqp.endpoint.Source object.
:type source: str, bytes or ~pyamqp.endpoint.Source
:param auth: Authentication for the connection. This should be one of the subclasses of
uamqp.authentication.AMQPAuth. Currently this includes:
- uamqp.authentication.SASLAnonymous
- uamqp.authentication.SASLPlain
- uamqp.authentication.SASTokenAuth
pyamqp.authentication.AMQPAuth. Currently this includes:
- pyamqp.authentication.SASLAnonymous
- pyamqp.authentication.SASLPlain
- pyamqp.authentication.SASTokenAuth
If no authentication is supplied, SASLAnnoymous will be used by default.
:type auth: ~uamqp.authentication.common.AMQPAuth
:type auth: ~pyamqp.authentication.SASLAnonymous or pyamqp.authentication.SASLPlain
or pyamqp.authentication.SASTokenAuth
:param client_name: The name for the client, also known as the Container ID.
If no name is provided, a random GUID will be used.
:type client_name: str or bytes
@@ -554,7 +561,7 @@ class ReceiveClient(AMQPClient):
:type auto_complete: bool
:param retry_policy: A policy for parsing errors on link, connection and message
disposition to determine whether the error should be retryable.
:type retry_policy: ~uamqp.errors.RetryPolicy
:type retry_policy: ~pyamqp.errors.RetryPolicy
:param keep_alive_interval: If set, a thread will be started to keep the connection
alive during periods of user inactivity. The value will determine how long the
thread will sleep (in seconds) between pinging the connection. If 0 or None, no
@@ -564,13 +571,13 @@ class ReceiveClient(AMQPClient):
operations. If set to `Unsettled`, the client will wait for a confirmation
from the service that the message was successfully sent. If set to 'Settled',
the client will not wait for confirmation and assume success.
:type send_settle_mode: ~uamqp.constants.SenderSettleMode
:type send_settle_mode: ~pyamqp.constants.SenderSettleMode
:param receive_settle_mode: The mode by which to settle message receive
operations. If set to `PeekLock`, the receiver will lock a message once received until
the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service
will assume successful receipt of the message and clear it from the queue. The
default is `PeekLock`.
:type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:type receive_settle_mode: ~pyamqp.constants.ReceiverSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create an desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
@@ -605,13 +612,13 @@ class ReceiveClient(AMQPClient):
:type handle_max: int
:param on_attach: A callback function to be run on receipt of an ATTACH frame.
The function must take 4 arguments: source, target, properties and error.
:type on_attach: func[~uamqp.address.Source, ~uamqp.address.Target, dict, ~uamqp.errors.AMQPConnectionError]
:type on_attach: func[~pyamqp.endpoint.Source, ~pyamqp.endpoint.Target, dict, ~pyamqp.errors.AMQPConnectionError]
:param encoding: The encoding to use for parameters supplied as strings.
Default is 'UTF-8'
:type encoding: str
"""

def __init__(self, hostname, source, auth=None, **kwargs):
def __init__(self, source, auth=None, **kwargs):
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
self.source = source
self._streaming_receive = kwargs.pop("streaming_receive", False) # TODO: whether public?
self._received_messages = queue.Queue()
@@ -621,7 +628,7 @@ def __init__(self, hostname, source, auth=None, **kwargs):
self._max_message_size = kwargs.pop('max_message_size', None) or MAX_FRAME_SIZE_BYTES
self._link_properties = kwargs.pop('link_properties', None)
self._link_credit = kwargs.pop('link_credit', 300)
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
super(ReceiveClient, self).__init__(hostname, auth=auth, **kwargs)
super(ReceiveClient, self).__init__(source, auth=auth, **kwargs)

def _client_ready(self):
"""Determine whether the client is ready to start receiving messages.
@@ -673,7 +680,7 @@ def _message_received(self, message):
or iterator, the message will be added to an internal queue.

:param message: Received message.
:type message: ~uamqp.message.Message
:type message: ~pyamqp.message.Message
"""
if self._message_received_callback:
self._message_received_callback(message)
@@ -748,8 +755,8 @@ def receive_message_batch(self, **kwargs):
the prefetch value will be used.
:type max_batch_size: int
:param on_message_received: A callback to process messages as they arrive from the
service. It takes a single argument, a ~uamqp.message.Message object.
:type on_message_received: callable[~uamqp.message.Message]
service. It takes a single argument, a ~pyamqp.message.Message object.
:type on_message_received: callable[~pyamqp.message.Message]
:param timeout: I timeout in milliseconds for which to wait to receive any messages.
If no messages are received in this time, an empty list will be returned. If set to
0, the client will continue to wait until at least one message is received. The
Original file line number Diff line number Diff line change
@@ -9,7 +9,6 @@
def test_send_client_creation():

sender = SendClient(
"fake.host.com",
"fake_eh",
"my_fake_auth"
)
@@ -20,7 +19,6 @@ def test_send_client_creation():
def test_receive_client_creation():

receiver = ReceiveClient(
"fake.host.com",
"fake_eh",
"my_fake_auth"
)
Original file line number Diff line number Diff line change
@@ -9,10 +9,9 @@

def test_client_creation_exceptions():
with pytest.raises(TypeError):
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
sender = SendClient(
"fake.host.com",
)
assert sender._hostname == "fake.host.com"
sender = SendClient()
# assert sender._remote_address == "fake.host.com"
# We can't test this now b/c the assert would have to be against source and would pass -- maybe remove this test

def test_connection_endpoint_exceptions():
with pytest.raises(AMQPConnectionError):
@@ -34,7 +33,7 @@ def test_connection_sas_authentication_exception():
password=""
)
with pytest.raises(AttributeError):
sender = SendClient("fake.host.com", target, auth=sas_auth)
sender = SendClient(target, auth=sas_auth)
sender.client_ready()

def test_connection_sasl_annon_authentication_exception():
@@ -45,5 +44,5 @@ def test_connection_sasl_annon_authentication_exception():

sas_auth = authentication.SASLAnonymousCredential()
with pytest.raises(AttributeError):
sender = SendClient("fake.host.com", target, auth=sas_auth)
sender = SendClient(target, auth=sas_auth)
sender.client_ready()