diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py
index 70a291eab89b..e11cc6830243 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py
@@ -18,11 +18,15 @@
 except ImportError:
     from urllib.parse import urlparse, quote_plus
 
+
+from .pyamqp.client import AMQPClient as PyAMQPClient
+from .pyamqp.authentication import _generate_sas_token as Py_generate_sas_token
+from .pyamqp.message import Message as PyMessage, Properties as PyMessageProperties
 from uamqp import authentication
 from .pyamqp import constants, error as errors, utils
 from .pyamqp.authentication import JWTTokenAuth as PyJWTTokenAuth
-from .pyamqp.client import AMQPClient
-from .pyamqp.message import Message
+
+
 import six
 from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential
 from azure.core.utils import parse_connection_string as core_parse_connection_string
@@ -175,7 +179,9 @@ def get_token(self, *scopes, **kwargs):  # pylint:disable=unused-argument
         # type: (str, Any) -> _AccessToken
         if not scopes:
             raise ValueError("No token scope provided.")
-        return _generate_sas_token(scopes[0], self.policy, self.key)
+
+        return Py_generate_sas_token(scopes[0], self.policy, self.key)
+
 
 class EventhubAzureNamedKeyTokenCredential(object):
     """The named key credential used for authentication.
@@ -344,7 +350,7 @@ def _management_request(self, mgmt_msg, op_type):
         last_exception = None
         while retried_times <= self._config.max_retries:
             mgmt_auth = self._create_auth()
-            mgmt_client = AMQPClient(
+            mgmt_client = PyAMQPClient(
                 self._address.hostname, auth=mgmt_auth, debug=self._config.network_tracing
             )
             try:
@@ -446,7 +452,7 @@ def _get_partition_ids(self):
 
     def _get_partition_properties(self, partition_id):
         # type:(str) -> Dict[str, Any]
-        mgmt_msg = Message(
+        mgmt_msg = PyMessage(
             application_properties={
                 "name": self.eventhub_name,
                 "partition": partition_id,
@@ -507,15 +513,15 @@ def _open(self):
             auth = self._client._create_auth()
             self._create_handler(auth)
             self._handler.open(
-                connection=self._client._conn_manager.get_connection(
-                    self._client._address.hostname, auth
-                )  # pylint: disable=protected-access
+                # connection=self._client._conn_manager.get_connection(
+                #     self._client._address.hostname, auth
+                # )  # pylint: disable=protected-access
             )
             while not self._handler.client_ready():
                 time.sleep(0.05)
             self._max_message_size_on_link = (
-                self._handler.message_handler._link.peer_max_message_size
-                or MAX_MESSAGE_LENGTH_BYTES
+                self._handler._link.remote_max_message_size
+                or constants.MAX_FRAME_SIZE_BYTES
             )  # pylint: disable=protected-access
             self.running = True
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
index 09c3c2e9c258..751b085d0c5f 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
@@ -17,12 +17,8 @@
     TYPE_CHECKING,
     cast,
 )
-
 import six
 
-from .pyamqp import constants, _encode as encode
-from .pyamqp.message import BatchMessage, Message
-
 from ._utils import (
     set_message_partition_key,
     trace_message,
@@ -57,6 +53,9 @@
     AmqpMessageProperties,
 )
 
+from .pyamqp import constants, utils as pyutils
+from .pyamqp.message import BatchMessage, Message
+
 if TYPE_CHECKING:
     import datetime
 
@@ -108,8 +107,8 @@ def __init__(self, body=None):
         # Internal usage only for transforming AmqpAnnotatedMessage to outgoing EventData
         self._raw_amqp_message = AmqpAnnotatedMessage(  # type: ignore
-            data_body=body, annotations={}, application_properties={}
-        )
+            data_body=[body], annotations={}, application_properties={}
+            )
         self.message = (self._raw_amqp_message._message)  # pylint:disable=protected-access
         self._raw_amqp_message.header = AmqpMessageHeader()
         self._raw_amqp_message.properties = AmqpMessageProperties()
@@ -483,13 +482,14 @@ def __init__(self, max_size_in_bytes=None, partition_id=None, partition_key=None
                 "partition_key to only be string type, they might fail to parse the non-string value."
             )
-        self.max_size_in_bytes = max_size_in_bytes #TODO: FIND REPLACEMENT
-        or constants.MAX_MESSAGE_LENGTH_BYTES
-        self.message = BatchMessage(data=[], multi_messages=False, properties=None)
+        self.max_size_in_bytes = max_size_in_bytes or constants.MAX_FRAME_SIZE_BYTES
+        self.message = BatchMessage(data=[])
         self._partition_id = partition_id
        self._partition_key = partition_key
-
-        set_message_partition_key(self.message, self._partition_key)
-        self._size = len(encode.encode_payload(b"", self.message))
+        # TODO: test whether we need to set the partition key on the batch message, or whether setting it on each single message is enough
+        # this is performance related
+        #set_message_partition_key(self.message, self._partition_key)
+        self._size = pyutils.get_message_encoded_size(self.message)
         self._count = 0
 
     def __repr__(self):
@@ -562,8 +562,7 @@ def add(self, event_data):
             )
 
         trace_message(outgoing_event_data)
-        event_data_size = outgoing_event_data.message.get_message_encoded_size()
-
+        event_data_size = pyutils.get_message_encoded_size(outgoing_event_data.message)
         # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that
         # message into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes.
         size_after_add = (
@@ -579,7 +578,7 @@ def add(self, event_data):
                 )
             )
 
-        self.message._body_gen.append(outgoing_event_data)  # pylint: disable=protected-access
+        pyutils.add_batch(self.message, outgoing_event_data.message)
         self._size = size_after_add
         self._count += 1
 
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer.py
index 75498fc0bf37..427bf2b26c34 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer.py
@@ -34,6 +34,7 @@
     transform_outbound_single_message,
 )
 from ._constants import TIMEOUT_SYMBOL
+from .pyamqp import SendClient as PySendClient
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -120,17 +121,12 @@ def __init__(self, client, target, **kwargs):
 
     def _create_handler(self, auth):
         # type: (JWTTokenAuth) -> None
-        self._handler = SendClient(
+        self._handler = PySendClient(
+            self._client._address.hostname,
             self._target,
             auth=auth,
-            debug=self._client._config.network_tracing,  # pylint:disable=protected-access
-            msg_timeout=self._timeout * 1000,
-            idle_timeout=self._idle_timeout,
-            error_policy=self._retry_policy,
-            keep_alive_interval=self._keep_alive,
-            client_name=self._name,
-            link_properties=self._link_properties,
-            properties=create_properties(self._client._config.user_agent),  # pylint: disable=protected-access
+            idle_timeout=10,
+            network_trace=self._client._config.network_tracing
         )
 
     def _open_with_retry(self):
@@ -156,14 +152,14 @@ def _send_event_data(self, timeout_time=None, last_exception=None):
         if self._unsent_events:
             self._open()
             self._set_msg_timeout(timeout_time, last_exception)
-            self._handler.queue_message(*self._unsent_events)  # type: ignore
-            self._handler.wait()  # type: ignore
-            self._unsent_events = self._handler.pending_messages  # type: ignore
-            if self._outcome != constants.MessageSendResult.Ok:
-                if self._outcome == constants.MessageSendResult.Timeout:
-                    self._condition = OperationTimeoutError("Send operation timed out")
-                if self._condition:
-                    raise self._condition
+            self._handler.send_message(self._unsent_events[0])
+            self._unsent_events = None
+            # self._unsent_events = self._handler.pending_messages  # type: ignore
+            # if self._outcome != constants.MessageSendResult.Ok:
+            #     if self._outcome == constants.MessageSendResult.Timeout:
+            #         self._condition = OperationTimeoutError("Send operation timed out")
+            #     if self._condition:
+            #         raise self._condition
 
     def _send_event_data_with_retry(self, timeout=None):
         # type: (Optional[float]) -> None
@@ -205,7 +201,8 @@ def _wrap_eventdata(
             raise ValueError(
                 "The partition_key does not match the one of the EventDataBatch"
             )
-        for event in event_data.message._body_gen:  # pylint: disable=protected-access
+
+        for event in event_data.message.data:  # pylint: disable=protected-access
             trace_message(event, span)
         wrapper_event_data = event_data  # type:ignore
     else:
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
index 39f44773202d..4854b93f6fc2 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
@@ -120,7 +120,7 @@ def _get_max_mesage_size(self):
         self._max_message_size_on_link = (
             self._producers[  # type: ignore
                 ALL_PARTITIONS
-            ]._handler.message_handler._link.peer_max_message_size
+            ]._handler._link.remote_max_message_size
             or constants.MAX_MESSAGE_LENGTH_BYTES
         )
 
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/amqp/_amqp_message.py b/sdk/eventhub/azure-eventhub/azure/eventhub/amqp/_amqp_message.py
index f27e6c3354b6..893a910613f3 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/amqp/_amqp_message.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/amqp/_amqp_message.py
@@ -9,6 +9,7 @@
 import uamqp
 
 from ._constants import AMQP_MESSAGE_BODY_TYPE_MAP, AmqpMessageBodyType
+from ..pyamqp.message import Message as PyMessage, Header as PyHeader, Properties as PyProperties
 
 
 class DictMixin(object):
@@ -136,7 +137,7 @@ def __init__(self, **kwargs):
             self._body = kwargs.get("value_body")
             self._body_type = uamqp.MessageBodyType.Value
 
-        self._message = uamqp.message.Message(body=self._body, body_type=self._body_type)
+        #self._message = uamqp.message.Message(body=self._body, body_type=self._body_type)
         header_dict = cast(Mapping, kwargs.get("header"))
         self._header = AmqpMessageHeader(**header_dict) if "header" in kwargs else None
         self._footer = kwargs.get("footer")
@@ -214,17 +215,18 @@ def _from_amqp_message(self, message):
 
     def _to_outgoing_amqp_message(self):
         message_header = None
-        if self.header:
-            message_header = uamqp.message.MessageHeader()
-            message_header.delivery_count = self.header.delivery_count
-            message_header.time_to_live = self.header.time_to_live
-            message_header.first_acquirer = self.header.first_acquirer
-            message_header.durable = self.header.durable
-            message_header.priority = self.header.priority
+        if self.header and any(self.header.values()):
+            message_header = PyHeader(
+                delivery_count=self.header.delivery_count,
+                ttl=self.header.time_to_live,
+                first_acquirer=self.header.first_acquirer,
+                durable=self.header.durable,
+                priority=self.header.priority
+            )
 
         message_properties = None
-        if self.properties:
-            message_properties = uamqp.message.MessageProperties(
+        if self.properties and any(self.properties.values()):
+            message_properties = PyProperties(
                 message_id=self.properties.message_id,
                 user_id=self.properties.user_id,
                 to=self.properties.to,
@@ -238,33 +240,43 @@ def _to_outgoing_amqp_message(self):
                 if self.properties.absolute_expiry_time else None,
                 group_id=self.properties.group_id,
                 group_sequence=self.properties.group_sequence,
-                reply_to_group_id=self.properties.reply_to_group_id,
-                encoding=self._encoding
+                reply_to_group_id=self.properties.reply_to_group_id
             )
 
-        amqp_body = self._message._body  # pylint: disable=protected-access
-        if isinstance(amqp_body, uamqp.message.DataBody):
-            amqp_body_type = uamqp.MessageBodyType.Data
-            amqp_body = list(amqp_body.data)
-        elif isinstance(amqp_body, uamqp.message.SequenceBody):
-            amqp_body_type = uamqp.MessageBodyType.Sequence
-            amqp_body = list(amqp_body.data)
-        else:
-            # amqp_body is type of uamqp.message.ValueBody
-            amqp_body_type = uamqp.MessageBodyType.Value
-            amqp_body = amqp_body.data
-
-        return uamqp.message.Message(
-            body=amqp_body,
-            body_type=amqp_body_type,
+        # TODO: let's only support data body for prototyping
+        return PyMessage(
+            data=self._body,
             header=message_header,
             properties=message_properties,
             application_properties=self.application_properties,
-            annotations=self.annotations,
+            message_annotations=self.annotations,
             delivery_annotations=self.delivery_annotations,
             footer=self.footer
         )
+        # amqp_body = self._message._body  # pylint: disable=protected-access
+        # if isinstance(amqp_body, uamqp.message.DataBody):
+        #     amqp_body_type = uamqp.MessageBodyType.Data
+        #     amqp_body = list(amqp_body.data)
+        # elif isinstance(amqp_body, uamqp.message.SequenceBody):
+        #     amqp_body_type = uamqp.MessageBodyType.Sequence
+        #     amqp_body = list(amqp_body.data)
+        # else:
+        #     # amqp_body is type of uamqp.message.ValueBody
+        #     amqp_body_type = uamqp.MessageBodyType.Value
+        #     amqp_body = amqp_body.data
+        #
+        # return uamqp.message.Message(
+        #     body=amqp_body,
+        #     body_type=amqp_body_type,
+        #     header=message_header,
+        #     properties=message_properties,
+        #     application_properties=self.application_properties,
+        #     annotations=self.annotations,
+        #     delivery_annotations=self.delivery_annotations,
+        #     footer=self.footer
+        # )
+
     @property
     def body(self):
         # type: () -> Any
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/client.py
index 8792ab2d63d9..23f3ec052e99 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/client.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/client.py
@@ -22,7 +22,6 @@
 from .receiver import ReceiverLink
 from .sasl import SASLTransport
 from .endpoints import Source, Target
-from .error import AMQPConnectionError
 
 from .constants import (
     MessageDeliveryState,
@@ -33,6 +32,9 @@
     SEND_DISPOSITION_ACCEPT,
     SEND_DISPOSITION_REJECT,
     AUTH_TYPE_CBS,
+    MAX_FRAME_SIZE_BYTES,
+    INCOMING_WINDOW,
+    OUTGOING_WIDNOW,
     DEFAULT_AUTH_TIMEOUT,
     MESSAGE_DELIVERY_DONE_STATES
 )
@@ -43,7 +45,6 @@
 
 _logger = logging.getLogger(__name__)
 
-_MAX_FRAME_SIZE_BYTES = 64 * 1024
 
 
 class AMQPClient(object):
@@ -129,15 +130,15 @@ def __init__(self, hostname, auth=None, **kwargs):
         self._mgmt_links = {}
 
         # Connection settings
-        self._max_frame_size = kwargs.pop('max_frame_size', None) or _MAX_FRAME_SIZE_BYTES
+        self._max_frame_size = kwargs.pop('max_frame_size', None) or MAX_FRAME_SIZE_BYTES
         self._channel_max = kwargs.pop('channel_max', None) or 65535
         self._idle_timeout = kwargs.pop('idle_timeout', None)
         self._properties = kwargs.pop('properties', None)
         self._network_trace = kwargs.pop("network_trace", False)
 
         # Session settings
-        self._outgoing_window = kwargs.pop('outgoing_window', None) or _MAX_FRAME_SIZE_BYTES
-        self._incoming_window = kwargs.pop('incoming_window', None) or _MAX_FRAME_SIZE_BYTES
+        self._outgoing_window = kwargs.pop('outgoing_window', None) or OUTGOING_WIDNOW
+        self._incoming_window = kwargs.pop('incoming_window', None) or INCOMING_WINDOW
         self._handle_max = kwargs.pop('handle_max', None)
 
         # Link settings
@@ -303,10 +304,6 @@ def mgmt_request(self, message, operation=None, operation_type=None, node='$mana
             operation_type=operation_type,
             timeout=timeout
         )
-        status_code_field = kwargs.pop('status_code_field', b'statusCode')
-        description_field = kwargs.pop('description_field', b'statusDescription')
-        response.application_properties[status_code_field] = status
-        response.application_properties[description_field] = description
         if parse_response:
             return parse_response(status, response, description)
         return response
@@ -316,7 +313,7 @@ class SendClient(AMQPClient):
     def __init__(self, hostname, target, auth=None, **kwargs):
         self.target = target
         # Sender and Link settings
-        self._max_message_size = kwargs.pop('max_message_size', None) or _MAX_FRAME_SIZE_BYTES
+        self._max_message_size = kwargs.pop('max_message_size', None) or MAX_FRAME_SIZE_BYTES
         self._link_properties = kwargs.pop('link_properties', None)
         self._link_credit = kwargs.pop('link_credit', None)
         super(SendClient, self).__init__(hostname, auth=auth, **kwargs)
@@ -503,7 +500,7 @@ def __init__(self, hostname, source, auth=None, **kwargs):
         self._message_received_callback = None
 
         # Sender and Link settings
-        self._max_message_size = kwargs.pop('max_message_size', None) or _MAX_FRAME_SIZE_BYTES
+        self._max_message_size = kwargs.pop('max_message_size', None) or MAX_FRAME_SIZE_BYTES
         self._link_properties = kwargs.pop('link_properties', None)
         self._link_credit = kwargs.pop('link_credit', None)
         super(ReceiveClient, self).__init__(hostname, auth=auth, **kwargs)
@@ -562,6 +559,10 @@ def _message_received(self, message):
             self._message_received_callback(message)
         if not self._streaming_receive:
             self._received_messages.put(message)
+        # TODO: do we need settled property for a message?
+        #elif not message.settled:
+        #    # Message was received with callback processing and wasn't settled.
+        #    _logger.info("Message was not settled.")
 
     def receive_message_batch(self, max_batch_size=None, on_message_received=None, timeout=0):
         """Receive a batch of messages. Messages returned in the batch have already been
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/constants.py b/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/constants.py
index 3b72e1da506a..bb8df7b2b81a 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/constants.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/constants.py
@@ -42,7 +42,7 @@
 #: frame size can be set. By defining this value, the peers can guarantee that they can send frames of up to this
 #: size until they have agreed a definitive maximum frame size for that Connection.
 MIN_MAX_FRAME_SIZE = 512
-MAX_FRAME_SIZE_BYTES = 64 * 1024
+MAX_FRAME_SIZE_BYTES = 1024 * 1024
 MAX_CHANNELS = 65535
 INCOMING_WINDOW = 64 * 1024
 OUTGOING_WIDNOW = 64 * 1024
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/sender.py b/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/sender.py
index 9b3143d02955..f1eb7a228c73 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/sender.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/sender.py
@@ -113,8 +113,6 @@ def _outgoing_transfer(self, delivery):
         self._outgoing_flow()
 
     def _incoming_disposition(self, frame):
-        if self.network_trace:
-            _LOGGER.info("<- %r", DispositionFrame(*frame), extra=self.network_trace_params)
         if not frame[3]:  # settled
             return
         range_end = (frame[2] or frame[1]) + 1  # first or last
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/session.py b/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/session.py
index c45e60922924..7f4f1a60d6fc 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/session.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/session.py
@@ -238,6 +238,8 @@ def _outgoing_disposition(self, frame):
         self._connection._process_outgoing_frame(self.channel, frame)
 
     def _incoming_disposition(self, frame):
+        if self.network_trace:
+            _LOGGER.info("<- %r", DispositionFrame(*frame), extra=self.network_trace_params)
         for link in self._input_handles.values():
             link._incoming_disposition(frame)
 
diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/utils.py b/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/utils.py
index da4289f981bb..a3906b28e908 100644
--- a/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/utils.py
+++ b/sdk/eventhub/azure-eventhub/azure/eventhub/pyamqp/utils.py
@@ -89,3 +89,7 @@ def generate_sas_token(audience, policy, key, expiry=None):
 def add_batch(batch, message):
     # Add a message to a batch
     batch.data.append(encode_payload(b"", message))
+
+
+def get_message_encoded_size(message):
+    return len(encode_payload(b"", message))
diff --git a/sdk/eventhub/azure-eventhub/stress/pyamqp/sync_receive.py b/sdk/eventhub/azure-eventhub/stress/pyamqp/sync_receive.py
new file mode 100644
index 000000000000..c6e448d0e27f
--- /dev/null
+++ b/sdk/eventhub/azure-eventhub/stress/pyamqp/sync_receive.py
@@ -0,0 +1,66 @@
+# --------------------------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for license information.
+# --------------------------------------------------------------------------------------------
+
+import os
+import threading
+import time
+
+from azure.eventhub import EventHubConsumerClient
+
+
+CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
+EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
+
+
+def test_receive_fixed_time_interval():
+
+    consumer_client = EventHubConsumerClient.from_connection_string(CONNECTION_STR, consumer_group="$Default", eventhub_name=EVENTHUB_NAME)
+
+    last_received_count = [0]
+    received_count = [0]
+    run_flag = [True]
+    all_perf_records = []
+    check_interval = 5
+    run_duration = 120
+
+    def on_event(partition_context, event):
+        received_count[0] += 1
+
+    def monitor():
+        while run_flag[0]:
+            snap = received_count[0]
+            perf = (snap - last_received_count[0]) / check_interval
+            last_received_count[0] = snap
+            all_perf_records.append(perf)
+            time.sleep(check_interval)
+
+    thread = threading.Thread(
+        target=consumer_client.receive,
+        kwargs={
+            "on_event": on_event,
+            "partition_id": "0",
+            "starting_position": "-1",  # "-1" is from the beginning of the partition.
+        }
+    )
+
+    monitor_thread = threading.Thread(
+        target=monitor
+    )
+
+    thread.daemon = True
+    monitor_thread.daemon = True
+
+    thread.start()
+    monitor_thread.start()
+    time.sleep(run_duration)
+    consumer_client.close()
+    run_flag[0] = False
+
+    valid_perf_records = all_perf_records[10:]  # skip the first 10 records to let the receiving program be stable
+    avg_perf = sum(valid_perf_records) / len(valid_perf_records)
+    print("The average performance is {} events/s".format(avg_perf))
+
+
+test_receive_fixed_time_interval()
diff --git a/sdk/eventhub/azure-eventhub/stress/pyamqp/sync_send.py b/sdk/eventhub/azure-eventhub/stress/pyamqp/sync_send.py
new file mode 100644
index 000000000000..83a53db8394e
--- /dev/null
+++ b/sdk/eventhub/azure-eventhub/stress/pyamqp/sync_send.py
@@ -0,0 +1,47 @@
+# --------------------------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for license information.
+# --------------------------------------------------------------------------------------------
+
+import os
+import time
+
+from azure.eventhub import EventHubProducerClient, EventData
+
+CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
+EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
+
+
+def test_send_small_message_fixed_amount():
+
+    client = EventHubProducerClient.from_connection_string(conn_str=CONNECTION_STR, eventhub_name=EVENTHUB_NAME)
+
+    run_times = 5
+    iter_count = 200
+    batch_count = 100
+    single_message_size = 1
+    perf_records = []
+    client.create_batch()  # precall to retrieve sender link settings
+
+    for _ in range(run_times):  # run run_times and calculate the avg performance
+        start_time = time.time()
+
+        for _ in range(iter_count):
+            event_data_batch = client.create_batch()
+            for j in range(batch_count):
+                ed = EventData(b"d" * single_message_size)
+                event_data_batch.add(ed)
+            client.send_batch(event_data_batch)
+
+        end_time = time.time()
+
+        total_amount = iter_count * batch_count
+        total_time = end_time - start_time
+        speed = total_amount / total_time
+        perf_records.append(speed)
+
+    avg_perf = sum(perf_records) / len(perf_records)
+    print("The average performance is {} events/s".format(avg_perf))
+
+
+test_send_small_message_fixed_amount()
\ No newline at end of file
diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_consumer.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_consumer.py
new file mode 100644
index 000000000000..67d781c96b9b
--- /dev/null
+++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_consumer.py
@@ -0,0 +1,43 @@
+# --------------------------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for license information.
+# --------------------------------------------------------------------------------------------
+
+import threading
+import time
+import pytest
+
+from azure.eventhub import EventHubConsumerClient, EventHubProducerClient, EventData
+
+
+@pytest.mark.liveTest
+def test_receive_from_single_partition(live_eventhub):
+    producer_client = EventHubProducerClient.from_connection_string(live_eventhub["connection_str"])
+    consumer_client = EventHubConsumerClient.from_connection_string(live_eventhub["connection_str"], consumer_group=live_eventhub["consumer_group"])
+
+    to_send_count = 10
+    received_count = [0]
+
+    def on_event(partition_context, event):
+        received_count[0] += 1
+
+    batch = producer_client.create_batch(partition_id="0")
+    for _ in range(to_send_count):
+        batch.add(EventData(b'data'))
+
+    producer_client.send_batch(batch)
+
+    thread = threading.Thread(
+        target=consumer_client.receive,
+        kwargs={
+            "on_event": on_event,
+            "partition_id": "0",
+            "starting_position": "-1",  # "-1" is from the beginning of the partition.
+        }
+    )
+    thread.daemon = True
+    thread.start()
+    time.sleep(15)
+    consumer_client.close()
+    thread.join()
+    assert to_send_count == received_count[0]
diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_producer.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_producer.py
new file mode 100644
index 000000000000..e0dfdb5c8fe4
--- /dev/null
+++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/synctests/test_producer.py
@@ -0,0 +1,43 @@
+import pytest
+
+from azure.eventhub import EventHubProducerClient, EventData
+
+
+@pytest.mark.liveTest
+def test_send_batch_event(live_eventhub):
+    client = EventHubProducerClient.from_connection_string(live_eventhub["connection_str"])
+    batch = client.create_batch()
+
+    # TODO: there is a bug: currently a batch message of size > 64*1024 cannot be sent via a single transfer frame.
+    # We probably need to support splitting a single message into multiple frames when the message is large (which is what the uamqp library does),
+    # or check whether tweaking link/session/connection frame settings allows us to send a large message.
+    # while True:
+    #     try:
+    #         batch.add(EventData(b'test'))
+    #     except ValueError:
+    #         break
+    # client.send_batch(batch)
+
+    for _ in range(100):
+        batch.add(EventData(b'test' * 60))
+    client.send_batch(batch)
+
+
+@pytest.mark.liveTest
+def test_send_batch_event_to_partition(live_eventhub):
+    client = EventHubProducerClient.from_connection_string(live_eventhub["connection_str"])
+    batch = client.create_batch(partition_id="0")
+
+    # TODO: there is a bug: currently a batch message of size > 64*1024 cannot be sent via a single transfer frame.
+    # We probably need to support splitting a single message into multiple frames when the message is large (which is what the uamqp library does),
+    # or check whether tweaking link/session/connection frame settings allows us to send a large message.
+    # while True:
+    #     try:
+    #         batch.add(EventData(b'test'))
+    #     except ValueError:
+    #         break
+    # client.send_batch(batch)
+
+    for _ in range(100):
+        batch.add(EventData(b'test' * 60))
+    client.send_batch(batch)
\ No newline at end of file
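
Illustrative sketch (not part of the patch above): the EventDataBatch.add path now measures events with pyamqp.utils.get_message_encoded_size and appends them with pyamqp.utils.add_batch, keeping the running size check described in the inline comments (an extra 5 bytes of batching overhead for events that encode to fewer than 256 bytes, 8 bytes otherwise). Below is a minimal stand-alone approximation of that bookkeeping, assuming a plain bytes payload stands in for an encoded pyamqp message; the helper names _batch_overhead and try_add are hypothetical and used only for illustration.

    def _batch_overhead(encoded_size):
        # Mirrors the comment in EventDataBatch.add: batching adds 5 bytes of
        # framing for events encoding to < 256 bytes, and 8 bytes otherwise.
        return 5 if encoded_size < 256 else 8

    def try_add(batch_payloads, current_size, max_size_in_bytes, encoded_event):
        # Reject the event if adding it (plus its framing overhead) would push
        # the batch past the maximum batch size.
        size_after_add = current_size + len(encoded_event) + _batch_overhead(len(encoded_event))
        if size_after_add > max_size_in_bytes:
            raise ValueError("EventDataBatch has reached its size limit: {}".format(max_size_in_bytes))
        batch_payloads.append(encoded_event)
        return size_after_add

    # Example: accumulate small payloads until an (artificially small) limit is hit.
    payloads, size = [], 0
    try:
        while True:
            size = try_add(payloads, size, max_size_in_bytes=1024, encoded_event=b"d" * 100)
    except ValueError:
        pass
    print("batched {} events, {} bytes".format(len(payloads), size))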