Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] set timeout on link properties #30832

Merged
merged 50 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
e4792ff
attempt to set timeout on link properties
l0lawrence Jun 21, 2023
36e2927
jitter math
l0lawrence Jun 22, 2023
6a43cb8
update receiver logic
l0lawrence Jul 5, 2023
07cf442
update encode
l0lawrence Jul 6, 2023
0c47666
move logic to pyamqp_transport
l0lawrence Jul 6, 2023
e1a4762
update docstring
l0lawrence Jul 6, 2023
66a8d8a
add tests
l0lawrence Jul 6, 2023
8449724
update if statement
l0lawrence Jul 7, 2023
91b58cb
update if statements
l0lawrence Jul 7, 2023
d53386b
whitespace
l0lawrence Jul 7, 2023
4e28e4f
trailing whitespace
l0lawrence Jul 7, 2023
647aa6e
move logic into pyamqp_transport
l0lawrence Jul 7, 2023
f03df02
pylint
l0lawrence Jul 7, 2023
2aee2f9
update kwargs
l0lawrence Jul 10, 2023
1be333a
Update sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_r…
l0lawrence Jul 25, 2023
ef0a3eb
update async docstring too
l0lawrence Jul 25, 2023
70023ed
update docstrings
l0lawrence Aug 1, 2023
fb63534
add back in jitter logic to align with other lang
l0lawrence Aug 1, 2023
21f69a5
update client wording
l0lawrence Oct 2, 2023
63697ee
update other receiver docstring
l0lawrence Oct 2, 2023
5baf6f9
remove spacing
l0lawrence Oct 2, 2023
d08d928
pylint
l0lawrence Oct 3, 2023
8f10e07
add comment
l0lawrence Oct 24, 2023
87a7f4e
update client
l0lawrence Oct 24, 2023
5e78600
fix pylint
l0lawrence Oct 30, 2023
c2a83e7
update max_wait_time docstring
l0lawrence Jan 18, 2024
51f066f
align iterator and receive_messages() behavior
l0lawrence Jan 18, 2024
6a64272
docstring updates
l0lawrence Jan 18, 2024
21d33fe
asyn align iterator and receive_messages()
l0lawrence Jan 18, 2024
3579629
add deprecation
l0lawrence Jan 18, 2024
94ae2a6
pylint fixes
l0lawrence Jan 19, 2024
9863d15
add timeout kwarg
l0lawrence Jan 19, 2024
b6eb85c
update docstring
l0lawrence Jan 24, 2024
b819c1d
client docstring
l0lawrence Jan 24, 2024
92ba1a2
add to OperationTimeoutError logic
l0lawrence Jan 25, 2024
e5b940f
remove timeout from mock
l0lawrence Jan 25, 2024
e25204d
nit
l0lawrence Jan 25, 2024
a698395
nit
l0lawrence Jan 25, 2024
cd1eaab
revert doc
l0lawrence Jan 25, 2024
096a201
update error msg
l0lawrence Jan 25, 2024
a08c004
nit
l0lawrence Jan 25, 2024
7005955
base handler async handling
l0lawrence Jan 26, 2024
ac50d44
doc
l0lawrence Jan 26, 2024
26014a2
pylint
l0lawrence Jan 26, 2024
715901a
pylint
l0lawrence Jan 26, 2024
5e96081
conn str doc update
l0lawrence Jan 29, 2024
6d29fca
remove ==True
l0lawrence Jan 31, 2024
4631f4e
add >65 to test
l0lawrence Jan 31, 2024
7d994a3
add warning
l0lawrence Jan 31, 2024
77c8c41
move note to docstring
l0lawrence Jan 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,14 @@ def _do_retryable_operation(
self._container_id,
last_exception,
)
if isinstance(last_exception, OperationTimeoutError):
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
"use max_wait_time on the ServiceBusReceiver to control the"\
" timeout."
error = OperationTimeoutError(
message=description,
)
raise error from last_exception
raise last_exception from None
self._backoff(
retried_times=retried_times,
Expand Down Expand Up @@ -461,6 +469,15 @@ def _backoff(
entity_name,
last_exception,
)
if isinstance(last_exception, OperationTimeoutError):
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
"use max_wait_time on the ServiceBusReceiver to control the"\
" timeout."
error = OperationTimeoutError(
message=description,
)

raise error from last_exception
raise last_exception

def _mgmt_request_response(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
CONSUMER_IDENTIFIER = VENDOR + b":receiver-name"
UAMQP_LIBRARY = "uamqp"
PYAMQP_LIBRARY = "pyamqp"
OPERATION_TIMEOUT = VENDOR + b":timeout"
swathipil marked this conversation as resolved.
Show resolved Hide resolved

MANAGEMENT_PATH_SUFFIX = "/$management"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,12 @@ def get_queue_receiver(
will be immediately removed from the queue, and cannot be subsequently rejected or re-received if
the client fails to process the message. The default receive_mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
the `socket_timeout` optional parameter for more details.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer
can be provided such that messages are automatically registered on receipt. If the receiver is a session
receiver, it will apply to the session instead.
Expand Down Expand Up @@ -546,10 +548,12 @@ def get_subscription_receiver(
will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if
the client fails to process the message. The default receive_mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
the `socket_timeout` optional parameter for more details.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword Optional[~azure.servicebus.AutoLockRenewer] auto_lock_renewer: An ~azure.servicebus.AutoLockRenewer
can be provided such that messages are automatically registered on receipt. If the receiver is a session
receiver, it will apply to the session instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ class ServiceBusReceiver(
the client connects to.
:keyword str subscription_name: The path of specific Service Bus Subscription under the
specified Topic the client connects to.
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
to a session. If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword receive_mode: The mode with which messages will be retrieved from the entity. The two options
are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given
lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE
Expand Down Expand Up @@ -296,8 +300,12 @@ def _from_connection_string(
if the client fails to process the message.
The default mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
to a session. If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Service Bus service. Default is `TransportType.Amqp`.
Expand Down Expand Up @@ -668,8 +676,10 @@ def receive_messages(
Setting to None will fully depend on the prefetch config. The default value is 1.
:param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive.
If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. If specified, an no messages arrive within the
l0lawrence marked this conversation as resolved.
Show resolved Hide resolved
timeout period, an empty list will be returned.
until the connection is closed. If specified, and no messages arrive within the
timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages
when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session.
Please use max_wait_time on the constructor to set the timeout for connecting to a session.
:return: A list of messages received. If no messages are available, this will be an empty list.
:rtype: List[~azure.servicebus.ServiceBusReceivedMessage]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
# pylint: disable=too-many-lines
import functools
import time
import math
import random
import datetime
from datetime import timezone
from typing import Optional, Tuple, cast, List, TYPE_CHECKING, Any, Callable, Dict, Union, Iterator, Type
Expand Down Expand Up @@ -66,6 +68,8 @@
ERROR_CODE_ENTITY_ALREADY_EXISTS,
ERROR_CODE_PRECONDITION_FAILED,
ServiceBusReceiveMode,
OPERATION_TIMEOUT,
NEXT_AVAILABLE_SESSION,
)

from ..exceptions import (
Expand Down Expand Up @@ -576,6 +580,28 @@ def create_receive_client(
config = receiver._config # pylint: disable=protected-access
source = kwargs.pop("source")
receive_mode = kwargs.pop("receive_mode")
link_properties = kwargs.pop("link_properties")

# When NEXT_AVAILABLE_SESSION is set, the default time to wait to connect to a session is 65 seconds.
# If there are no messages in the topic/queue the client will wait for 65 seconds for an AttachFrame
# frame from the service before raising an OperationTimeoutError due to failure to connect.
# max_wait_time, if specified, will allow the user to wait for fewer or more than 65 seconds to
# connect to a session.
if receiver._session_id == NEXT_AVAILABLE_SESSION and receiver._max_wait_time: # pylint: disable=protected-access
swathipil marked this conversation as resolved.
Show resolved Hide resolved
timeout_in_ms = receiver._max_wait_time * 1000 # pylint: disable=protected-access
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved
open_receive_link_base_jitter_in_ms = 100
open_recieve_link_buffer_in_ms = 20
open_receive_link_buffer_threshold_in_ms = 1000
jitter_base_in_ms = min(timeout_in_ms * 0.01, open_receive_link_base_jitter_in_ms)
timeout_in_ms = math.floor(timeout_in_ms - jitter_base_in_ms * random.random())
if timeout_in_ms >= open_receive_link_buffer_threshold_in_ms:
timeout_in_ms -= open_recieve_link_buffer_in_ms

# If we have specified a client-side timeout, assure that it is encoded as an uint
link_properties[OPERATION_TIMEOUT] = amqp_uint_value(timeout_in_ms)

kwargs["link_properties"] = link_properties

return ReceiveClient(
config.hostname,
source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,14 @@ async def _do_retryable_operation(
self._container_id,
last_exception,
)
if isinstance(last_exception, OperationTimeoutError):
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
"use max_wait_time on the ServiceBusReceiver to control the"\
" timeout."
error = OperationTimeoutError(
message=description,
)
raise error from last_exception
raise last_exception from None
await self._backoff(
retried_times=retried_times,
Expand Down Expand Up @@ -306,6 +314,14 @@ async def _backoff(
entity_name,
last_exception,
)
if isinstance(last_exception, OperationTimeoutError):
description = "If trying to receive from NEXT_AVAILABLE_SESSION, "\
"use max_wait_time on the ServiceBusReceiver to control the"\
" timeout."
error = OperationTimeoutError(
message=description,
)
raise error from last_exception
raise last_exception

async def _mgmt_request_response(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,12 @@ def get_queue_receiver(
will be immediately removed from the queue, and cannot be subsequently rejected or re-received if
the client fails to process the message. The default mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
the `socket_timeout` optional parameter for more details.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An
~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on
receipt. If the receiver is a session receiver, it will apply to the session instead.
Expand Down Expand Up @@ -520,10 +522,12 @@ def get_subscription_receiver(
will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if
the client fails to process the message. The default mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout. If connection
errors are occurring due to write timing out, the connection timeout value may need to be adjusted. See
the `socket_timeout` optional parameter for more details.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting.
If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword Optional[~azure.servicebus.aio.AutoLockRenewer] auto_lock_renewer: An
~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on
receipt. If the receiver is a session receiver, it will apply to the session instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,12 @@ class ServiceBusReceiver(AsyncIterator, BaseHandler, ReceiverMixin):
if the client fails to process the message.
The default mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the receiver
will automatically stop receiving. The default value is None, meaning no timeout.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
to a session. If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Service Bus service. Default is `TransportType.Amqp`.
Expand Down Expand Up @@ -294,8 +298,12 @@ def _from_connection_string(
if the client fails to process the message.
The default mode is PEEK_LOCK.
:paramtype receive_mode: Union[~azure.servicebus.ServiceBusReceiveMode, str]
:keyword Optional[float] max_wait_time: The timeout in seconds between received messages after which the
receiver will automatically stop receiving. The default value is None, meaning no timeout.
:keyword Optional[float] max_wait_time: The timeout in seconds to wait for the first and subsequent
messages to arrive. If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. The default value is None, meaning no timeout. On a sessionful
queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting
to a session. If connection errors are occurring due to write timing out,the connection timeout
value may need to be adjusted. See the `socket_timeout` optional parameter for more details.
:keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`.
:keyword transport_type: The type of transport protocol that will be used for communicating with
the Service Bus service. Default is `TransportType.Amqp`.
Expand Down Expand Up @@ -649,7 +657,9 @@ async def receive_messages(
:param Optional[float] max_wait_time: Maximum time to wait in seconds for the first message to arrive.
If no messages arrive, and no timeout is specified, this call will not return
until the connection is closed. If specified, and no messages arrive within the
timeout period, an empty list will be returned.
timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages
when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session.
Please use max_wait_time on the constructor to set the timeout for connecting to a session.
:return: A list of messages received. If no messages are available, this will be an empty list.
:rtype: list[~azure.servicebus.aio.ServiceBusReceivedMessage]

Expand Down
Loading
Loading