Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Settlement move from Message to Receiver #14681

Merged
merged 18 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@

* Added support for `timeout` parameter on the following operations:
- `ServiceBusSender`: `send_messages`, `schedule_messages` and `cancel_scheduled_messages`
- `ServiceBusReceiver`: `receive_deferred_messages` and `peek_messages`
- `ServiceBusReceiver`: `receive_deferred_messages`, `peek_messages` and `renew_message_lock`
- `ServiceBusSession`: `get_state`, `set_state` and `renew_lock`
- `ReceivedMessage`: `renew_lock`
* `azure.servicebus.exceptions.ServiceBusError` now inherits from `azure.core.exceptions.AzureError`.

**Breaking Changes**
Expand All @@ -22,10 +21,11 @@
* Removed error `azure.servicebus.exceptions.ServiceBusResourceNotFound` as `azure.core.exceptions.ResourceNotFoundError` is now raised when a Service Bus
resource does not exist when using the `ServiceBusAdministrationClient`.
* Renamed `Message` to `ServiceBusMessage`.
* Renamed `PeekedMessage` to `ServiceBusPeekedMessage`.
* Renamed `ReceivedMessage` to `ServiceBusReceivedMessage`.
* Renamed `BatchMessage` to `ServiceBusMessageBatch`.
- Renamed method `add` to `add_message` on the class.
* Removed class `PeekedMessage`.
* Removed class `ReceivedMessage` under module `azure.servicebus.aio`.
* Renamed `ServiceBusSender.create_batch` to `ServiceBusSender.create_message_batch`.
* Removed class `ServiceBusSessionReceiver` which is now unified within class `ServiceBusReceiver`.
- Removed methods `ServiceBusClient.get_queue_session_receiver` and `ServiceBusClient.get_subscription_session_receiver`.
Expand All @@ -36,7 +36,15 @@ now raise more concrete exception other than `MessageSettleFailed` and `ServiceB
* Exceptions `MessageSendFailed`, `MessageSettleFailed` and `MessageLockExpired`
now inherit from `azure.servicebus.exceptions.MessageError`.
* `get_state` in `ServiceBusSession` now returns `bytes` instead of a `string`.
* `encoding` support is removed from `ServiceBusMessage`
* Message settlement methods are moved from `ServiceBusMessage` to `ServiceBusReceiver`:
- Use `ServiceBusReceiver.complete_message` instead of `ServiceBusReceivedMessage.complete` to complete a message.
- Use `ServiceBusReceiver.abandon_message` instead of `ServiceBusReceivedMessage.abandon` to abandon a message.
- Use `ServiceBusReceiver.defer_message` instead of `ServiceBusReceivedMessage.defer` to defer a message.
- Use `ServiceBusReceiver.dead_letter_message` instead of `ServiceBusReceivedMessage.dead_letter` to dead letter a message.
* Message `renew_lock` method is moved from `ServiceBusMessage` to `ServiceBusReceiver`:
- Changed `ServiceBusReceivedMessage.renew_lock` to `ServiceBusReceiver.renew_message_lock`
* `AutoLockRenewer.register` now takes `ServiceBusReceiver` as a positional parameter.
* Removed `encoding` support from `ServiceBusMessage`.

**BugFixes**

Expand Down
65 changes: 43 additions & 22 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ To interact with these resources, one should be familiar with the following SDK

* [Receiver][receiver_reference]: To receive messages from a Queue or Subscription, one would use the corresponding `get_queue_receiver` or `get_subscription_receiver` method off of a `ServiceBusClient` instance as seen [here](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples/sync_samples/receive_queue.py).

* [Message][message_reference]: When sending, this is the type you will construct to contain your payload. When receiving, this is where you will access the payload and control how the message is "settled" (completed, dead-lettered, etc); these functions are only available on a received message.
* [Message][message_reference]: When sending, this is the type you will construct to contain your payload. When receiving, this is where you will access the payload.

## Examples

Expand Down Expand Up @@ -151,7 +151,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
```

> **NOTE:** Any message received with `mode=PeekLock` (this is the default, with the alternative ReceiveAndDelete removing the message from the queue immediately on receipt)
> has a lock that must be renewed via `message.renew_lock()` before it expires if processing would take longer than the lock duration.
> has a lock that must be renewed via `receiver.renew_message_lock` before it expires if processing would take longer than the lock duration.
> See [AutoLockRenewer](#automatically-renew-message-or-session-locks) for a helper to perform this in the background automatically.
> Lock duration is set in Azure on the queue or topic itself.

Expand Down Expand Up @@ -236,12 +236,12 @@ with ServiceBusClient.from_connection_string(connstr) as client:

When receiving from a queue, you have multiple actions you can take on the messages you receive.

> **NOTE**: You can only settle `ReceivedMessage` objects which are received in `ReceiveMode.PeekLock` mode (this is the default).
> `ReceiveMode.ReceiveAndDelete` mode removes the message from the queue on receipt. `PeekedMessage` messages
> returned from `peek()` cannot be settled, as the message lock is not taken like it is in the aforementioned receive methods. Sessionful messages have a similar limitation.
> **NOTE**: You can only settle `ServiceBusReceivedMessage` objects which are received in `ReceiveMode.PeekLock` mode (this is the default).
> `ReceiveMode.ReceiveAndDelete` mode removes the message from the queue on receipt. `ServiceBusReceivedMessage` messages
> returned from `peek_messages()` cannot be settled, as the message lock is not taken like it is in the aforementioned receive methods. Sessionful messages have a similar limitation.

If the message has a lock as mentioned above, settlement will fail if the message lock has expired.
If processing would take longer than the lock duration, it must be maintained via `message.renew_lock()` before it expires.
If processing would take longer than the lock duration, it must be maintained via `receiver.renew_message_lock` before it expires.
Lock duration is set in Azure on the queue or topic itself.
See [AutoLockRenewer](#automatically-renew-message-or-session-locks) for a helper to perform this in the background automatically.

Expand All @@ -260,7 +260,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
msg.complete()
receiver.complete_message(msg)
```

#### [Abandon][abandon_reference]
Expand All @@ -278,7 +278,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
msg.abandon()
receiver.abandon_message(receiver)
```

#### [DeadLetter][deadletter_reference]
Expand All @@ -296,7 +296,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
msg.dead_letter()
receiver.dead_letter_message(msg)
```

#### [Defer][defer_reference]
Expand All @@ -315,32 +315,53 @@ with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver:
print(str(msg))
msg.defer()
receiver.defer_message(msg)
```

### [Automatically renew Message or Session locks][autolockrenew_reference]

`AutoLockRenewer` is a simple method for ensuring your message or session remains locked even over long periods of time, if calling `renew_lock()` is impractical or undesired.
Internally, it is not much more than shorthand for creating a concurrent watchdog to call `renew_lock()` if the object is nearing expiry.
`AutoLockRenewer` is a simple method for ensuring your message or session remains locked even over long periods of time, if calling `receiver.renew_message_lock`/`receiver.session.renew_lock` is impractical or undesired.
Internally, it is not much more than shorthand for creating a concurrent watchdog to do lock renewal if the object is nearing expiry.
It should be used as follows:

* Message lock automatic renewing

```python
from azure.servicebus import ServiceBusClient, AutoLockRenewer

import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

# Can also be called via "with AutoLockRenewer() as renewer" to automate closing.
renewer = AutoLockRenewer()
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name) as receiver:
for msg in receiver.receive_messages():
renewer.register(receiver, msg, timeout=60)
# Do your application logic here
receiver.complete_message(msg)
renewer.close()
```

* Session lock automatic renewing

```python
from azure.servicebus import ServiceBusClient, AutoLockRenewer

import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
session_queue_name = os.environ['SERVICE_BUS_SESSION_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']

# Can also be called via "with AutoLockRenewer() as renewer" to automate closing.
renewer = AutoLockRenewer()
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_receiver(queue_name, session_id=session_id) as receiver:
renewer.register(receiver.session, timeout=300) # Timeout for how long to maintain the lock for, in seconds.
with client.get_queue_receiver(session_queue_name, session_id=session_id) as receiver:
renewer.register(receiver, receiver.session, timeout=300) # Timeout for how long to maintain the lock for, in seconds.
for msg in receiver.receive_messages():
renewer.register(msg, timeout=60)
# Do your application logic here
msg.complete()
receiver.complete_message(msg)
renewer.close()
```

Expand All @@ -364,7 +385,7 @@ link will extend this timeout.
- max_wait_time: Provided on creation of a receiver or when calling `receive_messages()` or `get_streaming_message_iter()`, the time after which receiving messages will halt after no traffic. This applies both to the imperative `receive_messages()` function as well as the length
a generator-style receive will run for before exiting if there are no messages. Passing None (default) will wait forever, up until the 10 minute threshold if no other action is taken.

> **NOTE:** If processing of a message or session is sufficiently long as to cause timeouts, as an alternative to calling `renew_lock()` manually, one can
> **NOTE:** If processing of a message or session is sufficiently long as to cause timeouts, as an alternative to calling `receiver.renew_message_lock`/`receiver.session.renew_lock` manually, one can
> leverage the `AutoLockRenewer` functionality detailed [above](#automatically-renew-message-or-session-locks).

### Common Exceptions
Expand Down Expand Up @@ -408,7 +429,7 @@ You should be aware of the lock duration of a session and keep renewing the lock
This could happen when the receiver used by `AutoLockRenerer` is closed or the lock of the renewable has expired.
It is recommended to re-register the renewable message or session by receiving the message or connect to the sessionful entity again.
- **AutoLockRenewTimeout:** The time allocated to renew the message or session lock has elapsed. You could re-register the object that wants be auto lock renewed or extend the timeout in advance.
- **MessageError:** Operation on message failed because the message is in a wrong state. It is the root error class of message related errors described above.
- **ServiceBusMessageError:** Operation on message failed because the message is in a wrong state. It is the root error class of message related errors described above.
- **ServiceBusError:** All other Service Bus related errors. It is the root error class of all the errors described above.

Please view the [exceptions reference docs][exception_reference] for detailed descriptions of our common Exception types.
Expand Down Expand Up @@ -479,10 +500,10 @@ contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additio
[streaming_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=get_streaming_message_iter#azure.servicebus.ServiceBusReceiver.get_streaming_message_iter
[session_receive_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=receive#azure.servicebus.ServiceBusSessionReceiver.receive_messages
[session_send_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=session_id#azure.servicebus.ServiceBusMessage.session_id
[complete_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=complete#azure.servicebus.ServiceBusReceivedMessage.complete
[abandon_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=abandon#azure.servicebus.ServiceBusReceivedMessage.abandon
[defer_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=defer#azure.servicebus.ServiceBusReceivedMessage.defer
[deadletter_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=dead_letter#azure.servicebus.ServiceBusReceivedMessage.dead_letter
[complete_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=complete_message#azure.servicebus.ServiceBusReceiver.complete_message
[abandon_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=abandon_message#azure.servicebus.ServiceBusReceiver.abandon_message
[defer_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=defer_message#azure.servicebus.ServiceBusReceiver.defer_message
[deadletter_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html?highlight=dead_letter_message#azure.servicebus.ServiceBusReceiver.dead_letter_message
[autolockrenew_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html#azure.servicebus.AutoLockRenewer
[exception_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.html#module-azure.servicebus.exceptions
[subscription_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/azure.servicebus.aio.html?highlight=subscription#azure.servicebus.aio.ServiceBusClient.get_subscription_receiver
Expand Down
8 changes: 1 addition & 7 deletions sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@
from ._servicebus_sender import ServiceBusSender
from ._servicebus_receiver import ServiceBusReceiver
from ._servicebus_session import ServiceBusSession
from ._common.message import (
ServiceBusMessage,
ServiceBusMessageBatch,
ServiceBusPeekedMessage,
ServiceBusReceivedMessage
)
from ._common.message import ServiceBusMessage, ServiceBusMessageBatch, ServiceBusReceivedMessage
from ._common.constants import ReceiveMode, SubQueue, NEXT_AVAILABLE_SESSION
from ._common.auto_lock_renewer import AutoLockRenewer

Expand All @@ -26,7 +21,6 @@
__all__ = [
'ServiceBusMessage',
'ServiceBusMessageBatch',
'ServiceBusPeekedMessage',
'ServiceBusReceivedMessage',
'NEXT_AVAILABLE_SESSION',
'SubQueue',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING

from .._servicebus_receiver import ServiceBusReceiver
from .._servicebus_session import ServiceBusSession
from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError
from .utils import renewable_start_time, utc_now
Expand All @@ -20,9 +21,11 @@
from .message import ServiceBusReceivedMessage
LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ServiceBusReceivedMessage],
Optional[Exception]], None]
Renewable = Union[ServiceBusSession, ServiceBusReceivedMessage]

_log = logging.getLogger(__name__)


class AutoLockRenewer(object):
"""Auto renew locks for messages and sessions using a background thread pool.

Expand Down Expand Up @@ -90,19 +93,24 @@ def _renewable(self, renewable):
return False
return True

def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=None):
def _auto_lock_renew(self, receiver, renewable, starttime, timeout, on_lock_renew_failure=None):
# pylint: disable=protected-access
_log.debug("Running lock auto-renew thread for %r seconds", timeout)
error = None
clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc)
clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc)
try:
while self._renewable(renewable):
if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout):
_log.debug("Reached auto lock renew timeout - letting lock expire.")
raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout))
if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self._renew_period):
_log.debug("%r seconds or less until lock expires - auto renewing.", self._renew_period)
renewable.renew_lock()
try:
# Renewable is a session
renewable.renew_lock() # type: ignore
except AttributeError:
# Renewable is a message
receiver.renew_message_lock(renewable) # type: ignore
time.sleep(self._sleep_time)
clean_shutdown = not renewable._lock_expired
except AutoLockRenewTimeout as e:
Expand All @@ -119,10 +127,13 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=
if on_lock_renew_failure and not clean_shutdown:
on_lock_renew_failure(renewable, error)

def register(self, renewable, timeout=300, on_lock_renew_failure=None):
# type: (Union[ServiceBusReceivedMessage, ServiceBusSession], float, Optional[LockRenewFailureCallback]) -> None
def register(self, receiver, renewable, timeout=300, on_lock_renew_failure=None):
# type: (ServiceBusReceiver, Renewable, float, Optional[LockRenewFailureCallback]) -> None
"""Register a renewable entity for automatic lock renewal.

:param receiver: The ServiceBusReceiver instance that is associated with the message or the session to
be auto-lock-renewed.
:type receiver: ~azure.servicebus.ServiceBusReceiver
:param renewable: A locked entity that needs to be renewed.
:type renewable: Union[~azure.servicebus.ServiceBusReceivedMessage, ~azure.servicebus.ServiceBusSession]
:param timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes).
Expand All @@ -137,7 +148,7 @@ def register(self, renewable, timeout=300, on_lock_renew_failure=None):
raise ServiceBusError("The AutoLockRenewer has already been shutdown. Please create a new instance for"
" auto lock renewing.")
starttime = renewable_start_time(renewable)
self._executor.submit(self._auto_lock_renew, renewable, starttime, timeout, on_lock_renew_failure)
self._executor.submit(self._auto_lock_renew, receiver, renewable, starttime, timeout, on_lock_renew_failure)

def close(self, wait=True):
"""Cease autorenewal by shutting down the thread pool to clean up any remaining lock renewal threads.
Expand Down
Loading