Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] Pure Python AMQP Sycn Implementation Integration #22397

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 49 additions & 67 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
from urllib.parse import urlparse, quote_plus


from .pyamqp.client import AMQPClient as PyAMQPClient
from .pyamqp.authentication import _generate_sas_token as Py_generate_sas_token
from .pyamqp.message import Message as PyMessage, Properties as PyMessageProperties
from uamqp import authentication
from .pyamqp import constants, error as errors, utils
from .pyamqp.authentication import JWTTokenAuth as PyJWTTokenAuth
from .pyamqp.client import AMQPClient
from .pyamqp.message import Message, Properties
from .pyamqp import constants, error as errors, utils as pyamqp_utils
from .pyamqp.authentication import JWTTokenAuth


import six
Expand Down Expand Up @@ -147,7 +145,7 @@ def _generate_sas_token(uri, policy, key, expiry=None):

abs_expiry = int(time.time()) + expiry.seconds

token = utils.generate_sas_token(uri, policy, key, abs_expiry)
token = pyamqp_utils.generate_sas_token(uri, policy, key, abs_expiry)
return _AccessToken(token=token, expires_on=abs_expiry)


Expand Down Expand Up @@ -180,7 +178,7 @@ def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
if not scopes:
raise ValueError("No token scope provided.")

return Py_generate_sas_token(scopes[0], self.policy, self.key)
return _generate_sas_token(scopes[0], self.policy, self.key)


class EventhubAzureNamedKeyTokenCredential(object):
Expand Down Expand Up @@ -226,6 +224,7 @@ def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
"""
return AccessToken(self.token, self.expiry)


class EventhubAzureSasTokenCredential(object):
"""The shared access token credential used for authentication
when AzureSasCredential is provided.
Expand Down Expand Up @@ -303,8 +302,8 @@ def _create_auth(self):
except AttributeError:
token_type = b"jwt"
if token_type == b"servicebus.windows.net:sastoken":
return PyJWTTokenAuth(self._auth_uri, self._auth_uri, functools.partial(self._credential.get_token, self._auth_uri))
return authentication.JWTTokenAuth(
return JWTTokenAuth(self._auth_uri, self._auth_uri, functools.partial(self._credential.get_token, self._auth_uri))
return JWTTokenAuth(
self._auth_uri,
self._auth_uri,
functools.partial(self._credential.get_token, JWT_TOKEN_SCOPE),
Expand Down Expand Up @@ -350,21 +349,18 @@ def _management_request(self, mgmt_msg, op_type):
last_exception = None
while retried_times <= self._config.max_retries:
mgmt_auth = self._create_auth()
mgmt_client = PyAMQPClient(
mgmt_client = AMQPClient(
self._address.hostname, auth=mgmt_auth, debug=self._config.network_tracing
)
try:
conn = self._conn_manager.get_connection(
self._mgmt_target
) # pylint:disable=assignment-from-none
mgmt_client.open()
while not mgmt_client.client_ready():
time.sleep(0.05)
mgmt_msg.application_properties["security_token"] = mgmt_auth.get_token()
response = mgmt_client.mgmt_request(
mgmt_msg,
operation=READ_OPERATION,
operation_type=op_type,
operation=READ_OPERATION.decode(),
operation_type=op_type.decode(),
status_code_field=MGMT_STATUS_CODE,
description_fields=MGMT_STATUS_DESC,
)
Expand All @@ -375,35 +371,27 @@ def _management_request(self, mgmt_msg, op_type):
if status_code < 400:
return response
if status_code in [401]:
raise errors.AMQPException(
status_code,
description,
{}
raise errors.AuthenticationException(
errors.ErrorCodes.UnauthorizedAccess,
description="Management authentication failed. Status code: {}, Description: {!r}".format(
status_code,
description
)
)
# TODO: FEATURE PARITY
#raise errors.AuthenticationException(
# "Management authentication failed. Status code: {}, Description: {!r}".format(
# status_code,
# description
# )
#)
if status_code in [404]:
raise errors.AMQPException(
status_code,
description,
{}
raise errors.AMQPConnectionError(
errors.ErrorCodes.NotFound,
description="Management connection failed. Status code: {}, Description: {!r}".format(
status_code,
description
)
)
# TODO: FEATURE PARITY
#raise ConnectError(
# "Management connection failed. Status code: {}, Description: {!r}".format(
# status_code,
# description
# )
#)
raise errors.AMQPConnectionError(
status_code,
description,
{}
errors.ErrorCodes.UnknownError,
description="Management operation failed. Status code: {}, Description: {!r}".format(
status_code,
description
)
)
except Exception as exception: # pylint: disable=broad-except
last_exception = _handle_exception(exception, self)
Expand All @@ -427,23 +415,18 @@ def _add_span_request_attributes(self, span):

def _get_eventhub_properties(self):
# type:() -> Dict[str, Any]
# TODO: amqp mgmt support missing
#mgmt_msg = Message(application_properties={"name": self.eventhub_name})
#response = self._management_request(mgmt_msg, op_type=MGMT_OPERATION)
#output = {}
#eh_info = response.data # type: Dict[bytes, Any]
#if eh_info:
# output["eventhub_name"] = eh_info[b"name"].decode("utf-8")
# output["created_at"] = utc_from_timestamp(
# float(eh_info[b"created_at"]) / 1000
# )
# output["partition_ids"] = [
# p.decode("utf-8") for p in eh_info[b"partition_ids"]
# ]
output = {
# 32 is the max allowed partition count on azure portal
"partition_ids": [str(i) for i in range(32)]
}
mgmt_msg = Message(application_properties={"name": self.eventhub_name})
response = self._management_request(mgmt_msg, op_type=MGMT_OPERATION)
output = {}
eh_info = response.value # type: Dict[bytes, Any]
if eh_info:
output["eventhub_name"] = eh_info[b"name"].decode("utf-8")
output["created_at"] = utc_from_timestamp(
float(eh_info[b"created_at"]) / 1000
)
output["partition_ids"] = [
p.decode("utf-8") for p in eh_info[b"partition_ids"]
]
return output

def _get_partition_ids(self):
Expand All @@ -452,14 +435,14 @@ def _get_partition_ids(self):

def _get_partition_properties(self, partition_id):
# type:(str) -> Dict[str, Any]
mgmt_msg = PyMessage(
mgmt_msg = Message(
application_properties={
"name": self.eventhub_name,
"partition": partition_id,
}
)
response = self._management_request(mgmt_msg, op_type=MGMT_PARTITION_OPERATION)
partition_info = response.get_data() # type: Dict[bytes, Any]
partition_info = response.value # type: Dict[bytes, Any]
output = {}
if partition_info:
output["eventhub_name"] = partition_info[b"name"].decode("utf-8")
Expand Down Expand Up @@ -512,11 +495,7 @@ def _open(self):
self._handler.close()
auth = self._client._create_auth()
self._create_handler(auth)
self._handler.open(
# connection=self._client._conn_manager.get_connection(
# self._client._address.hostname, auth
# ) # pylint: disable=protected-access
)
self._handler.open()
while not self._handler.client_ready():
time.sleep(0.05)
self._max_message_size_on_link = (
Expand All @@ -535,8 +514,11 @@ def _close_connection(self):
self._client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception):
#if not self.running and isinstance(exception, compat.TimeoutException):
# exception = errors.AuthenticationException("Authorization timeout.")
if not self.running and isinstance(exception, TimeoutError):
exception = errors.AuthenticationException(
errors.ErrorCodes.InternalError,
description="Authorization timeout."
)
return _handle_exception(exception, self)

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
Expand Down
19 changes: 8 additions & 11 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
PROP_SEQ_NUMBER,
PROP_OFFSET,
PROP_PARTITION_KEY,
PROP_PARTITION_KEY_AMQP_SYMBOL,
PROP_TIMESTAMP,
PROP_ABSOLUTE_EXPIRY_TIME,
PROP_CONTENT_ENCODING,
Expand Down Expand Up @@ -107,7 +106,7 @@ def __init__(self, body=None):

# Internal usage only for transforming AmqpAnnotatedMessage to outgoing EventData
self._raw_amqp_message = AmqpAnnotatedMessage( # type: ignore
data_body=[body], annotations={}, application_properties={}
data_body=body, annotations={}, application_properties={}
)
self.message = (self._raw_amqp_message._message) # pylint:disable=protected-access
self._raw_amqp_message.header = AmqpMessageHeader()
Expand Down Expand Up @@ -193,13 +192,13 @@ def _encode_message(self):
def _decode_non_data_body_as_str(self, encoding="UTF-8"):
# type: (str) -> str
# pylint: disable=protected-access
body = self.raw_amqp_message._message._body
body = self.raw_amqp_message.body
if self.body_type == AmqpMessageBodyType.VALUE:
if not body.data:
if not body:
return ""
return str(decode_with_recurse(body.data, encoding))
return str(decode_with_recurse(body, encoding))

seq_list = [d for seq_section in body.data for d in seq_section]
seq_list = [d for seq_section in body for d in seq_section]
return str(decode_with_recurse(seq_list, encoding))

def _to_outgoing_message(self):
Expand Down Expand Up @@ -254,7 +253,7 @@ def partition_key(self):
:rtype: bytes
"""
try:
return self._raw_amqp_message.annotations[PROP_PARTITION_KEY_AMQP_SYMBOL]
return self._raw_amqp_message.annotations[PROP_PARTITION_KEY]
except KeyError:
return self._raw_amqp_message.annotations.get(PROP_PARTITION_KEY, None)

Expand Down Expand Up @@ -486,9 +485,7 @@ def __init__(self, max_size_in_bytes=None, partition_id=None, partition_key=None
self.message = BatchMessage(data=[])
self._partition_id = partition_id
self._partition_key = partition_key
# TODO: test whether we need to set partition key of a batch message, or setting each single message if enough
# this is performance related
#set_message_partition_key(self.message, self._partition_key)
self.message = set_message_partition_key(self.message, self._partition_key)
self._size = pyutils.get_message_encoded_size(self.message)
self._count = 0

Expand Down Expand Up @@ -557,7 +554,7 @@ def add(self, event_data):
"The partition key of event_data does not match the partition key of this batch."
)
if not outgoing_event_data.partition_key:
set_message_partition_key(
outgoing_event_data.message = set_message_partition_key(
outgoing_event_data.message, self._partition_key
)

Expand Down
14 changes: 10 additions & 4 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
from __future__ import unicode_literals

from enum import Enum
from .pyamqp import types


PROP_SEQ_NUMBER = b"x-opt-sequence-number"
PROP_OFFSET = b"x-opt-offset"
PROP_PARTITION_KEY = b"x-opt-partition-key"
PROP_PARTITION_KEY_AMQP_SYMBOL = PROP_PARTITION_KEY # TODO: FIND REPLACEMENT - types.AMQPSymbol(PROP_PARTITION_KEY)
PROP_TIMESTAMP = b"x-opt-enqueued-time"
PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b"last_enqueued_sequence_number"
PROP_LAST_ENQUEUED_OFFSET = b"last_enqueued_offset"
Expand Down Expand Up @@ -47,13 +45,21 @@
MGMT_STATUS_DESC = b'status-description'
USER_AGENT_PREFIX = "azsdk-python-eventhubs"

NO_RETRY_ERRORS = (
NO_RETRY_ERRORS = [
b"com.microsoft:argument-out-of-range",
b"com.microsoft:entity-disabled",
b"com.microsoft:auth-failed",
b"com.microsoft:precondition-failed",
b"com.microsoft:argument-error",
)
]

CUSTOM_CONDITION_BACKOFF = {
b"com.microsoft:server-busy": 4,
b"com.microsoft:timeout": 2,
b"com.microsoft:operation-cancelled": 0,
b"com.microsoft:container-close": 4
}


## all below - previously uamqp
class TransportType(Enum):
Expand Down
Loading