Skip to content

Commit

Permalink
[ServiceBus] set timeout on link properties (Azure#30832)
Browse files Browse the repository at this point in the history
* attempt to set timeout on link properties

* jitter math

* update receiver logic

* update encode

* move logic to pyamqp_transport

* update docstring

* add tests

* update if statement

* update if statements

* whitespace

* trailing whitespace

* move logic into pyamqp_transport

* pylint

* update kwargs

* Update sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Co-authored-by: Kashif Khan <361477+kashifkhan@users.noreply.github.com>

* update async docstring too

* update docstrings

* add back in jitter logic to align with other lang

* update client wording

* update other receiver docstring

* remove spacing

* pylint

* add comment

* update client

* fix pylint

* update max_wait_time docstring

* align iterator and receive_messages() behavior

* docstring updates

* asyn align iterator and receive_messages()

* add deprecation

* pylint fixes

* add timeout kwarg

* update docstring

* client docstring

* add to OperationTimeoutError logic

* remove timeout from mock

* nit

* nit

* revert doc

* update error msg

* nit

* base handler async handling

* doc

* pylint

* pylint

* conn str doc update

* remove ==True

Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com>

* add >65 to test

* add warning

* move note to docstring

---------

Co-authored-by: Kashif Khan <361477+kashifkhan@users.noreply.github.com>
Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com>
  • Loading branch information
3 people authored and sofiar-msft committed Feb 16, 2024
1 parent b8af7be commit ecb5c66
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 29 deletions.
17 changes: 17 additions & 0 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,14 @@ def _do_retryable_operation(
self._container_id,
last_exception,
)
if isinstance(last_exception, OperationTimeoutError):
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
"use max_wait_time on the ServiceBusReceiver to control the"\
" timeout."
error = OperationTimeoutError(
message=description,
)
raise error from last_exception
raise last_exception from None
self._backoff(
retried_times=retried_times,
Expand Down Expand Up @@ -461,6 +469,15 @@ def _backoff(
entity_name,
last_exception,
)
if isinstance(last_exception, OperationTimeoutError):
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
"use max_wait_time on the ServiceBusReceiver to control the"\
" timeout."
error = OperationTimeoutError(
message=description,
)

raise error from last_exception
raise last_exception

def _mgmt_request_response(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
CONSUMER_IDENTIFIER = VENDOR + b":receiver-name"
UAMQP_LIBRARY = "uamqp"
PYAMQP_LIBRARY = "pyamqp"
OPERATION_TIMEOUT = VENDOR + b":timeout"

MANAGEMENT_PATH_SUFFIX = "/$management"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,12 @@ def get_queue_receiver(
will be immediately removed from the queue, and cannot be subsequently rejected or re-received if
the client fails to process the message. The default receive_mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
the `socket_timeout` optional parameter for more details.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer
can be provided such that messages are automatically registered on receipt. If the receiver is a session
receiver, it will apply to the session instead.
Expand Down Expand Up @@ -546,10 +548,12 @@ def get_subscription_receiver(
will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if
the client fails to process the message. The default receive_mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
the `socket_timeout` optional parameter for more details.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer
can be provided such that messages are automatically registered on receipt. If the receiver is a session
receiver, it will apply to the session instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ class ServiceBusReceiver(
the client connects to.
:keyword str subscription_name: The path of specific Service Bus Subscription under the
specified Topic the client connects to.
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
to a session. If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE
Expand Down Expand Up @@ -296,8 +300,12 @@ def _from_connection_string(
if the client fails to process the message.
The default mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
to a session. If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Service Bus service. Default is `TransportType.Amqp`.
Expand Down Expand Up @@ -668,8 +676,10 @@ def receive_messages(
Setting to None will fully depend on the prefetch config. The default value is 1.
:param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive.
If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. If specified, an no messages arrive within the
timeout period, an empty list will be returned.
until the connection is closed. If specified, and no messages arrive within the
timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages
when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session.
Please use max_wait_time on the constructor to set the timeout for connecting to a session.
:return: A list of messages received. If no messages are available, this will be an empty list.
:rtype: List[~azure.servicebus.ServiceBusReceivedMessage]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
# pylint: disable=too-many-lines
import functools
import time
import math
import random
import datetime
from datetime import timezone
from typing import Optional, Tuple, cast, List, TYPE_CHECKING, Any, Callable, Dict, Union, Iterator, Type
Expand Down Expand Up @@ -66,6 +68,8 @@
ERROR_CODE_ENTITY_ALREADY_EXISTS,
ERROR_CODE_PRECONDITION_FAILED,
ServiceBusReceiveMode,
OPERATION_TIMEOUT,
NEXT_AVAILABLE_SESSION,
)

from ..exceptions import (
Expand Down Expand Up @@ -576,6 +580,28 @@ def create_receive_client(
config = receiver._config # pylint: disable=protected-access
source = kwargs.pop("source")
receive_mode = kwargs.pop("receive_mode")
link_properties = kwargs.pop("link_properties")

# When NEXT_AVAILABLE_SESSION is set, the default time to wait to connect to a session is 65 seconds.
# If there are no messages in the topic/queue the client will wait for 65 seconds for an AttachFrame
# frame from the service before raising an OperationTimeoutError due to failure to connect.
# max_wait_time, if specified, will allow the user to wait for fewer or more than 65 seconds to
# connect to a session.
if receiver._session_id == NEXT_AVAILABLE_SESSION and receiver._max_wait_time: # pylint: disable=protected-access
timeout_in_ms = receiver._max_wait_time * 1000 # pylint: disable=protected-access
open_receive_link_base_jitter_in_ms = 100
open_recieve_link_buffer_in_ms = 20
open_receive_link_buffer_threshold_in_ms = 1000
jitter_base_in_ms = min(timeout_in_ms * 0.01, open_receive_link_base_jitter_in_ms)
timeout_in_ms = math.floor(timeout_in_ms - jitter_base_in_ms * random.random())
if timeout_in_ms >= open_receive_link_buffer_threshold_in_ms:
timeout_in_ms -= open_recieve_link_buffer_in_ms

# If we have specified a client-side timeout, assure that it is encoded as an uint
link_properties[OPERATION_TIMEOUT] = amqp_uint_value(timeout_in_ms)

kwargs["link_properties"] = link_properties

return ReceiveClient(
config.hostname,
source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,14 @@ async def _do_retryable_operation(
self._container_id,
last_exception,
)
if isinstance(last_exception, OperationTimeoutError):
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
"use max_wait_time on the ServiceBusReceiver to control the"\
" timeout."
error = OperationTimeoutError(
message=description,
)
raise error from last_exception
raise last_exception from None
await self._backoff(
retried_times=retried_times,
Expand Down Expand Up @@ -306,6 +314,14 @@ async def _backoff(
entity_name,
last_exception,
)
if isinstance(last_exception, OperationTimeoutError):
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
"use max_wait_time on the ServiceBusReceiver to control the"\
" timeout."
error = OperationTimeoutError(
message=description,
)
raise error from last_exception
raise last_exception

async def _mgmt_request_response(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,12 @@ def get_queue_receiver(
will be immediately removed from the queue, and cannot be subsequently rejected or re-received if
the client fails to process the message. The default mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
the `socket_timeout` optional parameter for more details.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An
~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on
receipt. If the receiver is a session receiver, it will apply to the session instead.
Expand Down Expand Up @@ -520,10 +522,12 @@ def get_subscription_receiver(
will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if
the client fails to process the message. The default mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
the `socket_timeout` optional parameter for more details.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An
~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on
receipt. If the receiver is a session receiver, it will apply to the session instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,12 @@ class ServiceBusReceiver(AsyncIterator, BaseHandler, ReceiverMixin):
if the client fails to process the message.
The default mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the receiver
will automatically stop receiving. The default value is None, meaning no timeout.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
to a session. If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Service Bus service. Default is `TransportType.Amqp`.
Expand Down Expand Up @@ -294,8 +298,12 @@ def _from_connection_string(
if the client fails to process the message.
The default mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
to a session. If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Service Bus service. Default is `TransportType.Amqp`.
Expand Down Expand Up @@ -649,7 +657,9 @@ async def receive_messages(
:param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive.
If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. If specified, and no messages arrive within the
timeout period, an empty list will be returned.
timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages
when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session.
Please use max_wait_time on the constructor to set the timeout for connecting to a session.
:return: A list of messages received. If no messages are available, this will be an empty list.
:rtype: list[~azure.servicebus.aio.ServiceBusReceivedMessage]
Expand Down
Loading

0 comments on commit ecb5c66

Please sign in to comment.