Skip to content

Commit

Permalink
[ServiceBus] improve memory usage of service bus client (#19915)
Browse files Browse the repository at this point in the history
* improve memory usage of service bus client

* remove set

* Update sdk/servicebus/azure-servicebus/CHANGELOG.md

Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com>

* revert set

* update impl to avoid access private ivars

* fix bug

* use weakref

* fix pylint

Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com>
  • Loading branch information
yunhaoling and swathipil authored Sep 1, 2021
1 parent c935c3d commit 356d974
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 27 deletions.
9 changes: 2 additions & 7 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
# Release History

## 7.3.3 (Unreleased)

### Features Added

### Breaking Changes
## 7.3.3 (2021-09-08)

### Bugs Fixed

### Other Changes

- Improved memory usage of `ServiceBusClient` to automatically discard spawned `ServiceBusSender` or `ServiceBusReceiver` from its handler set when no strong reference to the sender or receiver exists anymore.
- Reduced CPU load of `azure.servicebus.AutoLockRenewer` during lock renewal.

## 7.3.2 (2021-08-10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Any, List, Union, TYPE_CHECKING
from typing import Any, Union, TYPE_CHECKING
import logging
from weakref import WeakSet

import uamqp

from ._base_handler import (
_parse_conn_str,
ServiceBusSharedKeyCredential,
ServiceBusSASTokenCredential,
BaseHandler,
)
from ._servicebus_sender import ServiceBusSender
from ._servicebus_receiver import ServiceBusReceiver
Expand Down Expand Up @@ -89,7 +89,7 @@ def __init__(self, fully_qualified_namespace, credential, **kwargs):
self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name)
# Internal flag for switching whether to apply connection sharing, pending fix in uamqp library
self._connection_sharing = False
self._handlers = [] # type: List[BaseHandler]
self._handlers = WeakSet() # type: WeakSet

def __enter__(self):
if self._connection_sharing:
Expand Down Expand Up @@ -124,7 +124,8 @@ def close(self):
handler._container_id, # pylint: disable=protected-access
exception,
)
del self._handlers[:]

self._handlers.clear()

if self._connection_sharing and self._connection:
self._connection.destroy()
Expand Down Expand Up @@ -216,7 +217,7 @@ def get_queue_sender(self, queue_name, **kwargs):
retry_backoff_max=self._config.retry_backoff_max,
**kwargs
)
self._handlers.append(handler)
self._handlers.add(handler)
return handler

def get_queue_receiver(self, queue_name, **kwargs):
Expand Down Expand Up @@ -307,7 +308,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
retry_backoff_max=self._config.retry_backoff_max,
**kwargs
)
self._handlers.append(handler)
self._handlers.add(handler)
return handler

def get_topic_sender(self, topic_name, **kwargs):
Expand Down Expand Up @@ -348,7 +349,7 @@ def get_topic_sender(self, topic_name, **kwargs):
retry_backoff_max=self._config.retry_backoff_max,
**kwargs
)
self._handlers.append(handler)
self._handlers.add(handler)
return handler

def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
Expand Down Expand Up @@ -457,5 +458,5 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
retry_backoff_max=self._config.retry_backoff_max,
**kwargs
)
self._handlers.append(handler)
self._handlers.add(handler)
return handler
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Any, List, Union, TYPE_CHECKING
from typing import Any, Union, TYPE_CHECKING
import logging
from weakref import WeakSet

import uamqp
from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
Expand All @@ -12,7 +13,6 @@
from ._base_handler_async import (
ServiceBusSharedKeyCredential,
ServiceBusSASTokenCredential,
BaseHandler,
)
from ._servicebus_sender_async import ServiceBusSender
from ._servicebus_receiver_async import ServiceBusReceiver
Expand Down Expand Up @@ -90,7 +90,7 @@ def __init__(
self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name)
# Internal flag for switching whether to apply connection sharing, pending fix in uamqp library
self._connection_sharing = False
self._handlers = [] # type: List[BaseHandler]
self._handlers = WeakSet() # type: WeakSet

async def __aenter__(self):
if self._connection_sharing:
Expand Down Expand Up @@ -171,7 +171,7 @@ async def close(self) -> None:
handler._container_id, # pylint: disable=protected-access
exception,
)
del self._handlers[:]
self._handlers.clear()

if self._connection_sharing and self._connection:
await self._connection.destroy_async()
Expand Down Expand Up @@ -214,7 +214,7 @@ def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender:
retry_backoff_max=self._config.retry_backoff_max,
**kwargs
)
self._handlers.append(handler)
self._handlers.add(handler)
return handler

def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiver:
Expand Down Expand Up @@ -303,7 +303,7 @@ def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiv
retry_backoff_max=self._config.retry_backoff_max,
**kwargs
)
self._handlers.append(handler)
self._handlers.add(handler)
return handler

def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender:
Expand Down Expand Up @@ -343,7 +343,7 @@ def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender:
retry_backoff_max=self._config.retry_backoff_max,
**kwargs
)
self._handlers.append(handler)
self._handlers.add(handler)
return handler

def get_subscription_receiver(
Expand Down Expand Up @@ -453,5 +453,5 @@ def get_subscription_receiver(
retry_backoff_max=self._config.retry_backoff_max,
**kwargs
)
self._handlers.append(handler)
self._handlers.add(handler)
return handler
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
ServiceBusQueuePreparer,
ServiceBusNamespaceAuthorizationRulePreparer,
ServiceBusQueueAuthorizationRulePreparer,
CachedServiceBusQueuePreparer
CachedServiceBusQueuePreparer,
CachedServiceBusTopicPreparer,
CachedServiceBusSubscriptionPreparer
)
from utilities import get_logger

Expand Down Expand Up @@ -137,7 +139,9 @@ async def test_sb_client_writeonly_credentials_async(self, servicebus_authorizat
@CachedResourceGroupPreparer()
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
@CachedServiceBusTopicPreparer(name_prefix='servicebustest')
@CachedServiceBusSubscriptionPreparer(name_prefix='servicebustest')
async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, servicebus_topic, servicebus_subscription, **kwargs):
client = ServiceBusClient.from_connection_string(servicebus_namespace_connection_string)

await client.close()
Expand Down Expand Up @@ -174,6 +178,51 @@ async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace
assert not receiver._handler and not receiver._running
assert len(client._handlers) == 0

queue_sender = client.get_queue_sender(servicebus_queue.name)
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
assert len(client._handlers) == 2
queue_sender = client.get_queue_sender(servicebus_queue.name)
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
# the previous sender/receiver can not longer be referenced, there might be a delay in CPython
# to remove the reference, so len of handlers should be less than 4
assert len(client._handlers) < 4
await client.close()

queue_sender = client.get_queue_sender(servicebus_queue.name)
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
assert len(client._handlers) == 2
queue_sender = None
queue_receiver = None
assert len(client._handlers) < 2

await client.close()
topic_sender = client.get_topic_sender(servicebus_topic.name)
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
assert len(client._handlers) == 2
topic_sender = None
subscription_receiver = None
# the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4
assert len(client._handlers) < 4

await client.close()
topic_sender = client.get_topic_sender(servicebus_topic.name)
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
assert len(client._handlers) == 2
topic_sender = client.get_topic_sender(servicebus_topic.name)
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
# the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4
assert len(client._handlers) < 4

await client.close()
for _ in range(5):
queue_sender = client.get_queue_sender(servicebus_queue.name)
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
topic_sender = client.get_topic_sender(servicebus_topic.name)
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name,
servicebus_subscription.name)
assert len(client._handlers) < 15
await client.close()

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
Expand Down
53 changes: 51 additions & 2 deletions sdk/servicebus/azure-servicebus/tests/test_sb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
ServiceBusQueuePreparer,
ServiceBusNamespaceAuthorizationRulePreparer,
ServiceBusQueueAuthorizationRulePreparer,
CachedServiceBusQueuePreparer
CachedServiceBusQueuePreparer,
CachedServiceBusTopicPreparer,
CachedServiceBusSubscriptionPreparer
)

class ServiceBusClientTests(AzureMgmtTestCase):
Expand Down Expand Up @@ -201,7 +203,9 @@ def test_sb_client_incorrect_queue_conn_str(self, servicebus_queue_authorization
@CachedResourceGroupPreparer()
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
def test_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
@CachedServiceBusTopicPreparer(name_prefix='servicebustest')
@CachedServiceBusSubscriptionPreparer(name_prefix='servicebustest')
def test_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, servicebus_topic, servicebus_subscription, **kwargs):
client = ServiceBusClient.from_connection_string(servicebus_namespace_connection_string)

client.close()
Expand Down Expand Up @@ -238,6 +242,51 @@ def test_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_
assert not receiver._handler and not receiver._running
assert len(client._handlers) == 0

queue_sender = client.get_queue_sender(servicebus_queue.name)
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
assert len(client._handlers) == 2
queue_sender = client.get_queue_sender(servicebus_queue.name)
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
# the previous sender/receiver can not longer be referenced, there might be a delay in CPython
# to remove the reference, so len of handlers should be less than 4
assert len(client._handlers) < 4
client.close()

queue_sender = client.get_queue_sender(servicebus_queue.name)
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
assert len(client._handlers) == 2
queue_sender = None
queue_receiver = None
assert len(client._handlers) < 2

client.close()
topic_sender = client.get_topic_sender(servicebus_topic.name)
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
assert len(client._handlers) == 2
topic_sender = None
subscription_receiver = None
# the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4
assert len(client._handlers) < 4

client.close()
topic_sender = client.get_topic_sender(servicebus_topic.name)
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
assert len(client._handlers) == 2
topic_sender = client.get_topic_sender(servicebus_topic.name)
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name)
# the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4
assert len(client._handlers) < 4

client.close()
for _ in range(5):
queue_sender = client.get_queue_sender(servicebus_queue.name)
queue_receiver = client.get_queue_receiver(servicebus_queue.name)
topic_sender = client.get_topic_sender(servicebus_topic.name)
subscription_receiver = client.get_subscription_receiver(servicebus_topic.name,
servicebus_subscription.name)
assert len(client._handlers) < 15
client.close()

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer()
Expand Down

0 comments on commit 356d974

Please sign in to comment.