diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py index ad8bc7c7b069..76b8214cb998 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py @@ -425,6 +425,14 @@ def _do_retryable_operation( self._container_id, last_exception, ) + if isinstance(last_exception, OperationTimeoutError): + description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\ + "use max_wait_time on the ServiceBusReceiver to control the"\ + " timeout." + error = OperationTimeoutError( + message=description, + ) + raise error from last_exception raise last_exception from None self._backoff( retried_times=retried_times, @@ -461,6 +469,15 @@ def _backoff( entity_name, last_exception, ) + if isinstance(last_exception, OperationTimeoutError): + description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\ + "use max_wait_time on the ServiceBusReceiver to control the"\ + " timeout." + error = OperationTimeoutError( + message=description, + ) + + raise error from last_exception raise last_exception def _mgmt_request_response( diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py index 204895a17475..752af614e481 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py @@ -61,6 +61,7 @@ CONSUMER_IDENTIFIER = VENDOR + b":receiver-name" UAMQP_LIBRARY = "uamqp" PYAMQP_LIBRARY = "pyamqp" +OPERATION_TIMEOUT = VENDOR + b":timeout" MANAGEMENT_PATH_SUFFIX = "/$management" diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index 7306f002b61f..262845606dab 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -356,10 +356,12 @@ def get_queue_receiver( will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default receive_mode is PEEK_LOCK. :paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str] - :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the - receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection - errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See - the `socket_timeout` optional parameter for more details. + :keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent + messages to arrive. If no messages arrive, and no timeout is specified, this call will not return + until the connection is closed. The default value is None, meaning no timeout. On a sessionful + queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting. + If connection errors are occurring due to write timing out,the connection timeout + value may need to be adjusted. See the `socket_timeout` optional parameter for more details. :keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead. @@ -546,10 +548,12 @@ def get_subscription_receiver( will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if the client fails to process the message. The default receive_mode is PEEK_LOCK. :paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str] - :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the - receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection - errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See - the `socket_timeout` optional parameter for more details. + :keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent + messages to arrive. If no messages arrive, and no timeout is specified, this call will not return + until the connection is closed. The default value is None, meaning no timeout. On a sessionful + queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting. + If connection errors are occurring due to write timing out,the connection timeout + value may need to be adjusted. See the `socket_timeout` optional parameter for more details. :keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index a3635c007aa3..39d8661ad612 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -105,8 +105,12 @@ class ServiceBusReceiver( the client connects to. :keyword str subscription_name: The path of specific Service Bus Subscription under the specified Topic the client connects to. - :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the - receiver will automatically stop receiving. The default value is None, meaning no timeout. + :keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent + messages to arrive. If no messages arrive, and no timeout is specified, this call will not return + until the connection is closed. The default value is None, meaning no timeout. On a sessionful + queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting + to a session. If connection errors are occurring due to write timing out,the connection timeout + value may need to be adjusted. See the `socket_timeout` optional parameter for more details. :keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE @@ -296,8 +300,12 @@ def _from_connection_string( if the client fails to process the message. The default mode is PEEK_LOCK. :paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str] - :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the - receiver will automatically stop receiving. The default value is None, meaning no timeout. + :keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent + messages to arrive. If no messages arrive, and no timeout is specified, this call will not return + until the connection is closed. The default value is None, meaning no timeout. On a sessionful + queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting + to a session. If connection errors are occurring due to write timing out,the connection timeout + value may need to be adjusted. See the `socket_timeout` optional parameter for more details. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword transport_type: The type of transport protocol that will be used for communicating with the Service Bus service. Default is `TransportType.Amqp`. @@ -668,8 +676,10 @@ def receive_messages( Setting to None will fully depend on the prefetch config. The default value is 1. :param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive. If no messages arrive, and no timeout is specified, this call will not return - until the connection is closed. If specified, an no messages arrive within the - timeout period, an empty list will be returned. + until the connection is closed. If specified, and no messages arrive within the + timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages + when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session. + Please use max_wait_time on the constructor to set the timeout for connecting to a session. :return: A list of messages received. If no messages are available, this will be an empty list. :rtype: List[~azure.servicebus.ServiceBusReceivedMessage] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py index a78ca613c6e5..0231b54467e4 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py @@ -5,6 +5,8 @@ # pylint: disable=too-many-lines import functools import time +import math +import random import datetime from datetime import timezone from typing import Optional, Tuple, cast, List, TYPE_CHECKING, Any, Callable, Dict, Union, Iterator, Type @@ -66,6 +68,8 @@ ERROR_CODE_ENTITY_ALREADY_EXISTS, ERROR_CODE_PRECONDITION_FAILED, ServiceBusReceiveMode, + OPERATION_TIMEOUT, + NEXT_AVAILABLE_SESSION, ) from ..exceptions import ( @@ -576,6 +580,28 @@ def create_receive_client( config = receiver._config # pylint: disable=protected-access source = kwargs.pop("source") receive_mode = kwargs.pop("receive_mode") + link_properties = kwargs.pop("link_properties") + + # When NEXT_AVAILABLE_SESSION is set, the default time to wait to connect to a session is 65 seconds. + # If there are no messages in the topic/queue the client will wait for 65 seconds for an AttachFrame + # frame from the service before raising an OperationTimeoutError due to failure to connect. + # max_wait_time, if specified, will allow the user to wait for fewer or more than 65 seconds to + # connect to a session. + if receiver._session_id == NEXT_AVAILABLE_SESSION and receiver._max_wait_time: # pylint: disable=protected-access + timeout_in_ms = receiver._max_wait_time * 1000 # pylint: disable=protected-access + open_receive_link_base_jitter_in_ms = 100 + open_recieve_link_buffer_in_ms = 20 + open_receive_link_buffer_threshold_in_ms = 1000 + jitter_base_in_ms = min(timeout_in_ms * 0.01, open_receive_link_base_jitter_in_ms) + timeout_in_ms = math.floor(timeout_in_ms - jitter_base_in_ms * random.random()) + if timeout_in_ms >= open_receive_link_buffer_threshold_in_ms: + timeout_in_ms -= open_recieve_link_buffer_in_ms + + # If we have specified a client-side timeout, assure that it is encoded as an uint + link_properties[OPERATION_TIMEOUT] = amqp_uint_value(timeout_in_ms) + + kwargs["link_properties"] = link_properties + return ReceiveClient( config.hostname, source, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py index b22328b04f24..727de125cd28 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py @@ -274,6 +274,14 @@ async def _do_retryable_operation( self._container_id, last_exception, ) + if isinstance(last_exception, OperationTimeoutError): + description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\ + "use max_wait_time on the ServiceBusReceiver to control the"\ + " timeout." + error = OperationTimeoutError( + message=description, + ) + raise error from last_exception raise last_exception from None await self._backoff( retried_times=retried_times, @@ -306,6 +314,14 @@ async def _backoff( entity_name, last_exception, ) + if isinstance(last_exception, OperationTimeoutError): + description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\ + "use max_wait_time on the ServiceBusReceiver to control the"\ + " timeout." + error = OperationTimeoutError( + message=description, + ) + raise error from last_exception raise last_exception async def _mgmt_request_response( diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index 32fb1576eb66..d55b37e22ade 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -341,10 +341,12 @@ def get_queue_receiver( will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PEEK_LOCK. :paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str] - :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the - receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection - errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See - the `socket_timeout` optional parameter for more details. + :keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent + messages to arrive. If no messages arrive, and no timeout is specified, this call will not return + until the connection is closed. The default value is None, meaning no timeout. On a sessionful + queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting. + If connection errors are occurring due to write timing out,the connection timeout + value may need to be adjusted. See the `socket_timeout` optional parameter for more details. :keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead. @@ -520,10 +522,12 @@ def get_subscription_receiver( will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PEEK_LOCK. :paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str] - :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the - receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection - errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See - the `socket_timeout` optional parameter for more details. + :keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent + messages to arrive. If no messages arrive, and no timeout is specified, this call will not return + until the connection is closed. The default value is None, meaning no timeout. On a sessionful + queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting. + If connection errors are occurring due to write timing out,the connection timeout + value may need to be adjusted. See the `socket_timeout` optional parameter for more details. :keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index 74311b4ccf32..74b8690bc9c1 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -117,8 +117,12 @@ class ServiceBusReceiver(AsyncIterator, BaseHandler, ReceiverMixin): if the client fails to process the message. The default mode is PEEK_LOCK. :paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str] - :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the receiver - will automatically stop receiving. The default value is None, meaning no timeout. + :keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent + messages to arrive. If no messages arrive, and no timeout is specified, this call will not return + until the connection is closed. The default value is None, meaning no timeout. On a sessionful + queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting + to a session. If connection errors are occurring due to write timing out,the connection timeout + value may need to be adjusted. See the `socket_timeout` optional parameter for more details. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword transport_type: The type of transport protocol that will be used for communicating with the Service Bus service. Default is `TransportType.Amqp`. @@ -294,8 +298,12 @@ def _from_connection_string( if the client fails to process the message. The default mode is PEEK_LOCK. :paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str] - :keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the - receiver will automatically stop receiving. The default value is None, meaning no timeout. + :keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent + messages to arrive. If no messages arrive, and no timeout is specified, this call will not return + until the connection is closed. The default value is None, meaning no timeout. On a sessionful + queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting + to a session. If connection errors are occurring due to write timing out,the connection timeout + value may need to be adjusted. See the `socket_timeout` optional parameter for more details. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword transport_type: The type of transport protocol that will be used for communicating with the Service Bus service. Default is `TransportType.Amqp`. @@ -649,7 +657,9 @@ async def receive_messages( :param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive within the - timeout period, an empty list will be returned. + timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages + when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session. + Please use max_wait_time on the constructor to set the timeout for connecting to a session. :return: A list of messages received. If no messages are available, this will be an empty list. :rtype: list[~azure.servicebus.aio.ServiceBusReceivedMessage] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py index fac721d810b8..eb5c9cfddb20 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py @@ -7,10 +7,12 @@ import functools from typing import TYPE_CHECKING, Optional, Any, Callable, Union, AsyncIterator, cast import time +import math +import random from ..._pyamqp import constants from ..._pyamqp.message import BatchMessage -from ..._pyamqp.utils import amqp_string_value +from ..._pyamqp.utils import amqp_string_value, amqp_uint_value from ..._pyamqp.aio import SendClientAsync, ReceiveClientAsync from ..._pyamqp.aio._authentication_async import JWTTokenAuthAsync from ..._pyamqp.aio._connection_async import Connection as ConnectionAsync @@ -35,6 +37,8 @@ MESSAGE_DEFER, MESSAGE_DEAD_LETTER, ServiceBusReceiveMode, + OPERATION_TIMEOUT, + NEXT_AVAILABLE_SESSION, ) from ..._transport._pyamqp_transport import PyamqpTransport from ...exceptions import ( @@ -179,6 +183,27 @@ def create_receive_client_async( config = receiver._config # pylint: disable=protected-access source = kwargs.pop("source") receive_mode = kwargs.pop("receive_mode") + link_properties = kwargs.pop("link_properties") + + # When NEXT_AVAILABLE_SESSION is set, the default time to wait to connect to a session is 65 seconds. + # If there are no messages in the topic/queue the client will wait for 65 seconds for an AttachFrame + # frame from the service before raising an OperationTimeoutError due to failure to connect. + # max_wait_time, if specified, will allow the user to wait for fewer or more than 65 seconds to + # connect to a session. + if receiver._session_id == NEXT_AVAILABLE_SESSION and receiver._max_wait_time: # pylint: disable=protected-access + timeout_in_ms = receiver._max_wait_time * 1000 # pylint: disable=protected-access + open_receive_link_base_jitter_in_ms = 100 + open_recieve_link_buffer_in_ms = 20 + open_receive_link_buffer_threshold_in_ms = 1000 + jitter_base_in_ms = min(timeout_in_ms * 0.01, open_receive_link_base_jitter_in_ms) + timeout_in_ms = math.floor(timeout_in_ms - jitter_base_in_ms * random.random()) + if timeout_in_ms >= open_receive_link_buffer_threshold_in_ms: + timeout_in_ms -= open_recieve_link_buffer_in_ms + + # If we have specified a client-side timeout, assure that it is encoded as an uint + link_properties[OPERATION_TIMEOUT] = amqp_uint_value(timeout_in_ms) + + kwargs["link_properties"] = link_properties return ReceiveClientAsync( config.hostname, diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index 98f33b42d537..c26ea49b082c 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -1170,4 +1170,40 @@ async def test_async_session_non_session_send_to_session_queue_should_fail(self, async with sb_client.get_queue_sender(servicebus_queue.name) as sender: with pytest.raises(ServiceBusError): message = ServiceBusMessage("Handler message") - await sender.send_messages(message) \ No newline at end of file + await sender.send_messages(message) + + @pytest.mark.asyncio + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True) + @pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids) + @ArgPasserAsync() + async def test_async_next_available_session_timeout_value(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs): + if uamqp_transport: + pytest.skip("This test is for pyamqp only") + async with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False, uamqp_transport=uamqp_transport) as sb_client: + + receiver = sb_client.get_queue_receiver(servicebus_queue.name, session_id=NEXT_AVAILABLE_SESSION, max_wait_time=10) + start_time = time.time() + with pytest.raises(OperationTimeoutError): + await receiver.receive_messages(max_wait_time=5) + assert time.time() - start_time < 65 # Default service timeout value is 65 seconds + start_time2 = time.time() + with pytest.raises(OperationTimeoutError): + async for msg in receiver: + pass + assert time.time() - start_time2 < 65 # Default service timeout value is 65 seconds + + receiver = sb_client.get_queue_receiver(servicebus_queue.name, session_id=NEXT_AVAILABLE_SESSION, max_wait_time=70) + start_time = time.time() + with pytest.raises(OperationTimeoutError): + await receiver.receive_messages(max_wait_time=5) + assert time.time() - start_time > 65 # Default service timeout value is 65 seconds + start_time2 = time.time() + with pytest.raises(OperationTimeoutError): + async for msg in receiver: + pass + assert time.time() - start_time2 > 65 # Default service timeout value is 65 seconds \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index 494e85ec2b3e..c9febcb66e87 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -1327,3 +1327,43 @@ def test_session_id_str_bytes(self, uamqp_transport, *, servicebus_namespace_con messages = receiver.receive_messages(max_wait_time=10) assert len(messages) == 1 assert messages[0].session_id == session_id + + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @ServiceBusQueuePreparer(name_prefix='servicebustest', requires_session=True) + @pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids) + @ArgPasser() + def test_next_available_session_timeout_value(self, uamqp_transport, *, servicebus_namespace_connection_string=None, servicebus_queue=None, **kwargs): + if uamqp_transport: + pytest.skip("This test is for pyamqp only") + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False, uamqp_transport=uamqp_transport, retry_total=1) as sb_client: + + receiver = sb_client.get_queue_receiver(servicebus_queue.name, session_id=NEXT_AVAILABLE_SESSION, max_wait_time=10,) + + start_time = time.time() + with pytest.raises(OperationTimeoutError): + receiver.receive_messages(max_wait_time=5) + assert time.time() - start_time < 65 # Default service operation timeout is 65 seconds + start_time2 = time.time() + with pytest.raises(OperationTimeoutError): + for msg in receiver: + pass + + assert time.time() - start_time2 < 65 # Default service operation timeout is 65 seconds + + receiver = sb_client.get_queue_receiver(servicebus_queue.name, session_id=NEXT_AVAILABLE_SESSION, max_wait_time=70) + + start_time = time.time() + with pytest.raises(OperationTimeoutError): + receiver.receive_messages(max_wait_time=5) + assert time.time() - start_time > 65 # Default service operation timeout is 65 seconds + start_time2 = time.time() + with pytest.raises(OperationTimeoutError): + for msg in receiver: + pass + + assert time.time() - start_time2 > 65 # Default service operation timeout is 65 seconds \ No newline at end of file