Skip to content

Commit

Permalink
[EventHubs] Pure Python AMQP Sycn Implementation Integration (Azure#2…
Browse files Browse the repository at this point in the history
…2397)

* copy amqp changes

* eh python amqp integration

* fix time unit

* rename module pyamqp to _pyamqp

* more pyamqp to _pyamqp

* simplify todo
  • Loading branch information
yunhaoling authored and kashifkhan committed May 10, 2022
1 parent 92abbc0 commit dd96d59
Show file tree
Hide file tree
Showing 54 changed files with 1,467 additions and 917 deletions.
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from .pyamqp import constants
from ._pyamqp import constants
from ._common import EventData, EventDataBatch
from ._version import VERSION

Expand Down
115 changes: 48 additions & 67 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
from urllib.parse import urlparse, quote_plus


from .pyamqp.client import AMQPClient as PyAMQPClient
from .pyamqp.authentication import _generate_sas_token as Py_generate_sas_token
from .pyamqp.message import Message as PyMessage, Properties as PyMessageProperties
from uamqp import authentication
from .pyamqp import constants, error as errors, utils
from .pyamqp.authentication import JWTTokenAuth as PyJWTTokenAuth
from ._pyamqp.client import AMQPClient
from ._pyamqp.message import Message, Properties
from ._pyamqp import constants, error as errors, utils as pyamqp_utils
from ._pyamqp.authentication import JWTTokenAuth


import six
Expand Down Expand Up @@ -153,7 +151,7 @@ def _generate_sas_token(uri, policy, key, expiry=None):

abs_expiry = int(time.time()) + expiry.seconds

token = utils.generate_sas_token(uri, policy, key, abs_expiry)
token = pyamqp_utils.generate_sas_token(uri, policy, key, abs_expiry)
return _AccessToken(token=token, expires_on=abs_expiry)


Expand Down Expand Up @@ -194,7 +192,7 @@ def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
if not scopes:
raise ValueError("No token scope provided.")

return Py_generate_sas_token(scopes[0], self.policy, self.key)
return _generate_sas_token(scopes[0], self.policy, self.key)



Expand Down Expand Up @@ -333,8 +331,8 @@ def _create_auth(self):
except AttributeError:
token_type = b"jwt"
if token_type == b"servicebus.windows.net:sastoken":
return PyJWTTokenAuth(self._auth_uri, self._auth_uri, functools.partial(self._credential.get_token, self._auth_uri))
return authentication.JWTTokenAuth(
return JWTTokenAuth(self._auth_uri, self._auth_uri, functools.partial(self._credential.get_token, self._auth_uri))
return JWTTokenAuth(
self._auth_uri,
self._auth_uri,
functools.partial(self._credential.get_token, JWT_TOKEN_SCOPE),
Expand Down Expand Up @@ -387,21 +385,18 @@ def _management_request(self, mgmt_msg, op_type):
last_exception = None
while retried_times <= self._config.max_retries:
mgmt_auth = self._create_auth()
mgmt_client = PyAMQPClient(
mgmt_client = AMQPClient(
self._address.hostname, auth=mgmt_auth, debug=self._config.network_tracing
)
try:
conn = self._conn_manager.get_connection(
self._mgmt_target
) # pylint:disable=assignment-from-none
mgmt_client.open()
while not mgmt_client.client_ready():
time.sleep(0.05)
mgmt_msg.application_properties["security_token"] = mgmt_auth.get_token()
response = mgmt_client.mgmt_request(
mgmt_msg,
operation=READ_OPERATION,
operation_type=op_type,
operation=READ_OPERATION.decode(),
operation_type=op_type.decode(),
status_code_field=MGMT_STATUS_CODE,
description_fields=MGMT_STATUS_DESC,
)
Expand All @@ -414,35 +409,27 @@ def _management_request(self, mgmt_msg, op_type):
if status_code < 400:
return response
if status_code in [401]:
raise errors.AMQPException(
status_code,
description,
{}
raise errors.AuthenticationException(
errors.ErrorCodes.UnauthorizedAccess,
description="Management authentication failed. Status code: {}, Description: {!r}".format(
status_code,
description
)
)
# TODO: FEATURE PARITY
#raise errors.AuthenticationException(
# "Management authentication failed. Status code: {}, Description: {!r}".format(
# status_code,
# description
# )
#)
if status_code in [404]:
raise errors.AMQPException(
status_code,
description,
{}
raise errors.AMQPConnectionError(
errors.ErrorCodes.NotFound,
description="Management connection failed. Status code: {}, Description: {!r}".format(
status_code,
description
)
)
# TODO: FEATURE PARITY
#raise ConnectError(
# "Management connection failed. Status code: {}, Description: {!r}".format(
# status_code,
# description
# )
#)
raise errors.AMQPConnectionError(
status_code,
description,
{}
errors.ErrorCodes.UnknownError,
description="Management operation failed. Status code: {}, Description: {!r}".format(
status_code,
description
)
)
except Exception as exception: # pylint: disable=broad-except
last_exception = _handle_exception(exception, self)
Expand All @@ -466,23 +453,18 @@ def _add_span_request_attributes(self, span):

def _get_eventhub_properties(self):
# type:() -> Dict[str, Any]
# TODO: amqp mgmt support missing
#mgmt_msg = Message(application_properties={"name": self.eventhub_name})
#response = self._management_request(mgmt_msg, op_type=MGMT_OPERATION)
#output = {}
#eh_info = response.data # type: Dict[bytes, Any]
#if eh_info:
# output["eventhub_name"] = eh_info[b"name"].decode("utf-8")
# output["created_at"] = utc_from_timestamp(
# float(eh_info[b"created_at"]) / 1000
# )
# output["partition_ids"] = [
# p.decode("utf-8") for p in eh_info[b"partition_ids"]
# ]
output = {
# 32 is the max allowed partition count on azure portal
"partition_ids": [str(i) for i in range(32)]
}
mgmt_msg = Message(application_properties={"name": self.eventhub_name})
response = self._management_request(mgmt_msg, op_type=MGMT_OPERATION)
output = {}
eh_info = response.value # type: Dict[bytes, Any]
if eh_info:
output["eventhub_name"] = eh_info[b"name"].decode("utf-8")
output["created_at"] = utc_from_timestamp(
float(eh_info[b"created_at"]) / 1000
)
output["partition_ids"] = [
p.decode("utf-8") for p in eh_info[b"partition_ids"]
]
return output

def _get_partition_ids(self):
Expand All @@ -491,14 +473,14 @@ def _get_partition_ids(self):

def _get_partition_properties(self, partition_id):
# type:(str) -> Dict[str, Any]
mgmt_msg = PyMessage(
mgmt_msg = Message(
application_properties={
"name": self.eventhub_name,
"partition": partition_id,
}
)
response = self._management_request(mgmt_msg, op_type=MGMT_PARTITION_OPERATION)
partition_info = response.get_data() # type: Dict[bytes, Any]
partition_info = response.value # type: Dict[bytes, Any]
output = {}
if partition_info:
output["eventhub_name"] = partition_info[b"name"].decode("utf-8")
Expand Down Expand Up @@ -549,11 +531,7 @@ def _open(self):
self._handler.close()
auth = self._client._create_auth()
self._create_handler(auth)
self._handler.open(
# connection=self._client._conn_manager.get_connection(
# self._client._address.hostname, auth
# ) # pylint: disable=protected-access
)
self._handler.open()
while not self._handler.client_ready():
time.sleep(0.05)
self._max_message_size_on_link = (
Expand All @@ -572,8 +550,11 @@ def _close_connection(self):
self._client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception):
#if not self.running and isinstance(exception, compat.TimeoutException):
# exception = errors.AuthenticationException("Authorization timeout.")
if not self.running and isinstance(exception, TimeoutError):
exception = errors.AuthenticationException(
errors.ErrorCodes.InternalError,
description="Authorization timeout."
)
return _handle_exception(exception, self)

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
Expand Down
23 changes: 10 additions & 13 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
PROP_SEQ_NUMBER,
PROP_OFFSET,
PROP_PARTITION_KEY,
PROP_PARTITION_KEY_AMQP_SYMBOL,
PROP_TIMESTAMP,
PROP_ABSOLUTE_EXPIRY_TIME,
PROP_CONTENT_ENCODING,
Expand All @@ -54,8 +53,8 @@
AmqpMessageProperties,
)

from .pyamqp import constants, utils as pyutils
from .pyamqp.message import BatchMessage, Message
from ._pyamqp import constants, utils as pyutils
from ._pyamqp.message import BatchMessage, Message

if TYPE_CHECKING:
import datetime
Expand Down Expand Up @@ -124,7 +123,7 @@ def __init__(

# Internal usage only for transforming AmqpAnnotatedMessage to outgoing EventData
self._raw_amqp_message = AmqpAnnotatedMessage( # type: ignore
data_body=[body], annotations={}, application_properties={}
data_body=body, annotations={}, application_properties={}
)
self.message = (self._raw_amqp_message._message) # pylint:disable=protected-access
self._raw_amqp_message.header = AmqpMessageHeader()
Expand Down Expand Up @@ -243,13 +242,13 @@ def _encode_message(self):
def _decode_non_data_body_as_str(self, encoding="UTF-8"):
# type: (str) -> str
# pylint: disable=protected-access
body = self.raw_amqp_message._message._body
body = self.raw_amqp_message.body
if self.body_type == AmqpMessageBodyType.VALUE:
if not body.data:
if not body:
return ""
return str(decode_with_recurse(body.data, encoding))
return str(decode_with_recurse(body, encoding))

seq_list = [d for seq_section in body.data for d in seq_section]
seq_list = [d for seq_section in body for d in seq_section]
return str(decode_with_recurse(seq_list, encoding))

def _to_outgoing_message(self):
Expand Down Expand Up @@ -306,7 +305,7 @@ def partition_key(self):
:rtype: bytes
"""
try:
return self._raw_amqp_message.annotations[PROP_PARTITION_KEY_AMQP_SYMBOL]
return self._raw_amqp_message.annotations[PROP_PARTITION_KEY]
except KeyError:
return self._raw_amqp_message.annotations.get(PROP_PARTITION_KEY, None)

Expand Down Expand Up @@ -543,9 +542,7 @@ def __init__(self, max_size_in_bytes=None, partition_id=None, partition_key=None
self.message = BatchMessage(data=[])
self._partition_id = partition_id
self._partition_key = partition_key
# TODO: test whether we need to set partition key of a batch message, or setting each single message if enough
# this is performance related
#set_message_partition_key(self.message, self._partition_key)
self.message = set_message_partition_key(self.message, self._partition_key)
self._size = pyutils.get_message_encoded_size(self.message)
self._count = 0

Expand Down Expand Up @@ -616,7 +613,7 @@ def add(self, event_data):
"The partition key of event_data does not match the partition key of this batch."
)
if not outgoing_event_data.partition_key:
set_message_partition_key(
outgoing_event_data.message = set_message_partition_key(
outgoing_event_data.message, self._partition_key
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from typing import TYPE_CHECKING

from .pyamqp._connection import Connection, _CLOSING_STATES
from ._pyamqp._connection import Connection, _CLOSING_STATES
from ._constants import TransportType

if TYPE_CHECKING:
Expand Down
14 changes: 10 additions & 4 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
from __future__ import unicode_literals

from enum import Enum
from .pyamqp import types


PROP_SEQ_NUMBER = b"x-opt-sequence-number"
PROP_OFFSET = b"x-opt-offset"
PROP_PARTITION_KEY = b"x-opt-partition-key"
PROP_PARTITION_KEY_AMQP_SYMBOL = PROP_PARTITION_KEY # TODO: FIND REPLACEMENT - types.AMQPSymbol(PROP_PARTITION_KEY)
PROP_TIMESTAMP = b"x-opt-enqueued-time"
PROP_LAST_ENQUEUED_SEQUENCE_NUMBER = b"last_enqueued_sequence_number"
PROP_LAST_ENQUEUED_OFFSET = b"last_enqueued_offset"
Expand Down Expand Up @@ -47,13 +45,21 @@
MGMT_STATUS_DESC = b'status-description'
USER_AGENT_PREFIX = "azsdk-python-eventhubs"

NO_RETRY_ERRORS = (
NO_RETRY_ERRORS = [
b"com.microsoft:argument-out-of-range",
b"com.microsoft:entity-disabled",
b"com.microsoft:auth-failed",
b"com.microsoft:precondition-failed",
b"com.microsoft:argument-error",
)
]

CUSTOM_CONDITION_BACKOFF = {
b"com.microsoft:server-busy": 4,
b"com.microsoft:timeout": 2,
b"com.microsoft:operation-cancelled": 0,
b"com.microsoft:container-close": 4
}


## all below - previously uamqp
class TransportType(Enum):
Expand Down
Loading

0 comments on commit dd96d59

Please sign in to comment.