Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Resend received message #12457

Merged
merged 11 commits into from
Jul 23, 2020
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

* Added new properties to Message, PeekMessage and ReceivedMessage: `content_type`, `correlation_id`, `label`,
`message_id`, `reply_to`, `reply_to_session_id` and `to`. Please refer to the docstring for further information.

* Add new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`,
* Added new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`,
`dead_letter_reason`, `dead_letter_source`, `delivery_count` and `expires_at_utc`. Please refer to the docstring for further information.
* Added support for sending received messages via `ServiceBusSender.send_messages`.

**Breaking Changes**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
SessionLockExpired,
MessageSettleFailed,
MessageContentTooLarge)
from .utils import utc_from_timestamp, utc_now
from .utils import utc_from_timestamp, utc_now, copy_messages_to_sendable_if_needed
if TYPE_CHECKING:
from .._servicebus_receiver import ServiceBusReceiver
from .._servicebus_session_receiver import ServiceBusSessionReceiver
Expand Down Expand Up @@ -540,6 +540,7 @@ def add(self, message):
:rtype: None
:raises: :class: ~azure.servicebus.exceptions.MessageContentTooLarge, when exceeding the size limit.
"""
message = copy_messages_to_sendable_if_needed(message)
message_size = message.message.get_message_encoded_size()

# For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that
Expand Down Expand Up @@ -575,6 +576,35 @@ class PeekMessage(Message):
def __init__(self, message):
super(PeekMessage, self).__init__(None, message=message)

def _to_outgoing_message(self):
# type: () -> Message
amqp_message = self.message
amqp_body = amqp_message._body # pylint: disable=protected-access

if isinstance(amqp_body, uamqp.message.DataBody):
body = b''.join(amqp_body.data)
else:
# amqp_body is type of uamqp.message.ValueBody
body = amqp_body.data

return Message(
body=body,
content_type=self.content_type,
correlation_id=self.correlation_id,
label=self.label,
message_id=self.message_id,
partition_key=self.partition_key,
properties=self.properties,
reply_to=self.reply_to,
reply_to_session_id=self.reply_to_session_id,
session_id=self.session_id,
scheduled_enqueue_time_utc=self.scheduled_enqueue_time_utc,
time_to_live=self.time_to_live,
to=self.to,
via_partition_key=self.via_partition_key
)


@property
def dead_letter_error_description(self):
# type: () -> Optional[str]
Expand Down
20 changes: 20 additions & 0 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,26 @@ def generate_dead_letter_entity_name(
return entity_name


def copy_messages_to_sendable_if_needed(messages):
"""
This method is to convert single/multiple received messages to sendable messages to enable message resending.
"""
# pylint: disable=protected-access
try:
msgs_to_return = []
for each in messages:
try:
msgs_to_return.append(each._to_outgoing_message())
except AttributeError:
msgs_to_return.append(each)
return msgs_to_return
except TypeError:
try:
return messages._to_outgoing_message()
except AttributeError:
return messages


class AutoLockRenew(object):
"""Auto renew locks for messages and sessions using a background thread pool.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
OperationTimeoutError,
_ServiceBusErrorPolicy,
)
from ._common.utils import create_authentication
from ._common.utils import create_authentication, copy_messages_to_sendable_if_needed
from ._common.constants import (
REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION,
REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION,
Expand Down Expand Up @@ -68,6 +68,7 @@ def _build_schedule_request(cls, schedule_time_utc, *messages):
if not isinstance(message, Message):
raise ValueError("Scheduling batch messages only supports iterables containing Message Objects."
" Received instead: {}".format(message.__class__.__name__))
message = copy_messages_to_sendable_if_needed(message)
message.scheduled_enqueue_time_utc = schedule_time_utc
message_data = {}
message_data[MGMT_REQUEST_MESSAGE_ID] = message.message_id
Expand Down Expand Up @@ -326,6 +327,7 @@ def send_messages(self, message):
:caption: Send message.

"""
message = copy_messages_to_sendable_if_needed(message)
try:
batch = self.create_batch()
batch._from_list(message) # pylint: disable=protected-access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
MGMT_REQUEST_SEQUENCE_NUMBERS
)
from .._common import mgmt_handlers
from .._common.utils import copy_messages_to_sendable_if_needed
from ._async_utils import create_authentication

if TYPE_CHECKING:
Expand Down Expand Up @@ -267,6 +268,7 @@ async def send_messages(self, message):
:caption: Send message.

"""
message = copy_messages_to_sendable_if_needed(message)
try:
batch = await self.create_batch()
batch._from_list(message) # pylint: disable=protected-access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,20 +993,27 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
messages = []
async with sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20) as receiver:
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
content = str(uuid.uuid4())
message_id_a = uuid.uuid4()
message_a = Message(content)
message_a.message_id = message_id_a
message_id_b = uuid.uuid4()
message_b = Message(content)
message_b.message_id = message_id_b
tokens = await sender.schedule_messages([message_a, message_b], enqueue_time)
assert len(tokens) == 2
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20)
sender = sb_client.get_queue_sender(servicebus_queue.name)
async with sender, receiver:
content = str(uuid.uuid4())
message_id_a = uuid.uuid4()
message_a = Message(content)
message_a.message_id = message_id_a
message_id_b = uuid.uuid4()
message_b = Message(content)
message_b.message_id = message_id_b

recv = await receiver.receive_messages(max_wait_time=120)
messages.extend(recv)
await sender.send_messages([message_a, message_b])

received_messages = await receiver.receive_messages(max_batch_size=2, max_wait_time=5)
for message in received_messages:
await message.complete()

tokens = await sender.schedule_messages(received_messages, enqueue_time)
assert len(tokens) == 2

messages = await receiver.receive_messages(max_wait_time=120)
recv = await receiver.receive_messages(max_wait_time=5)
messages.extend(recv)
if messages:
Expand Down Expand Up @@ -1156,13 +1163,34 @@ def message_content():
for i in range(20):
yield Message("Message no. {}".format(i))

async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name)

async with sender, receiver:
message = BatchMessage()
for each in message_content():
message.add(each)
await sender.send_messages(message)

async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
if not messages:
break
receive_counter += 1
message_received_cnt += len(messages)
for m in messages:
print_message(_logger, m)
await sender.send_messages(message)
await m.complete()

assert message_received_cnt == 20
# Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration
assert receive_counter < 10 # Dynamic link credit issuing come info effect

# received resent messages

receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
Expand Down
Loading