From 356d974f81cf1a066ecc269fd1bd5c8ebdf6947b Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Wed, 1 Sep 2021 08:53:34 -0700 Subject: [PATCH] [ServiceBus] improve memory usage of service bus client (#19915) * improve memory usage of service bus client * remove set * Update sdk/servicebus/azure-servicebus/CHANGELOG.md Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> * revert set * update impl to avoid access private ivars * fix bug * use weakref * fix pylint Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com> --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 9 +--- .../azure/servicebus/_servicebus_client.py | 17 +++--- .../aio/_servicebus_client_async.py | 16 +++--- .../tests/async_tests/test_sb_client_async.py | 53 ++++++++++++++++++- .../azure-servicebus/tests/test_sb_client.py | 53 ++++++++++++++++++- 5 files changed, 121 insertions(+), 27 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 37acf091c94e..98649d9dc29a 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -1,15 +1,10 @@ # Release History -## 7.3.3 (Unreleased) - -### Features Added - -### Breaking Changes +## 7.3.3 (2021-09-08) ### Bugs Fixed -### Other Changes - +- Improved memory usage of `ServiceBusClient` to automatically discard spawned `ServiceBusSender` or `ServiceBusReceiver` from its handler set when no strong reference to the sender or receiver exists anymore. - Reduced CPU load of `azure.servicebus.AutoLockRenewer` during lock renewal. ## 7.3.2 (2021-08-10) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index 79cd76996abe..28db78a31cc9 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -2,8 +2,9 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -from typing import Any, List, Union, TYPE_CHECKING +from typing import Any, Union, TYPE_CHECKING import logging +from weakref import WeakSet import uamqp @@ -11,7 +12,6 @@ _parse_conn_str, ServiceBusSharedKeyCredential, ServiceBusSASTokenCredential, - BaseHandler, ) from ._servicebus_sender import ServiceBusSender from ._servicebus_receiver import ServiceBusReceiver @@ -89,7 +89,7 @@ def __init__(self, fully_qualified_namespace, credential, **kwargs): self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name) # Internal flag for switching whether to apply connection sharing, pending fix in uamqp library self._connection_sharing = False - self._handlers = [] # type: List[BaseHandler] + self._handlers = WeakSet() # type: WeakSet def __enter__(self): if self._connection_sharing: @@ -124,7 +124,8 @@ def close(self): handler._container_id, # pylint: disable=protected-access exception, ) - del self._handlers[:] + + self._handlers.clear() if self._connection_sharing and self._connection: self._connection.destroy() @@ -216,7 +217,7 @@ def get_queue_sender(self, queue_name, **kwargs): retry_backoff_max=self._config.retry_backoff_max, **kwargs ) - self._handlers.append(handler) + self._handlers.add(handler) return handler def get_queue_receiver(self, queue_name, **kwargs): @@ -307,7 +308,7 @@ def get_queue_receiver(self, queue_name, **kwargs): retry_backoff_max=self._config.retry_backoff_max, **kwargs ) - self._handlers.append(handler) + self._handlers.add(handler) return handler def get_topic_sender(self, topic_name, **kwargs): @@ -348,7 +349,7 @@ def get_topic_sender(self, topic_name, **kwargs): retry_backoff_max=self._config.retry_backoff_max, **kwargs ) - self._handlers.append(handler) + self._handlers.add(handler) return handler def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): @@ -457,5 +458,5 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): retry_backoff_max=self._config.retry_backoff_max, **kwargs ) - self._handlers.append(handler) + self._handlers.add(handler) return handler diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index 5be437a0452a..2f7f76c338e0 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -2,8 +2,9 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -from typing import Any, List, Union, TYPE_CHECKING +from typing import Any, Union, TYPE_CHECKING import logging +from weakref import WeakSet import uamqp from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential @@ -12,7 +13,6 @@ from ._base_handler_async import ( ServiceBusSharedKeyCredential, ServiceBusSASTokenCredential, - BaseHandler, ) from ._servicebus_sender_async import ServiceBusSender from ._servicebus_receiver_async import ServiceBusReceiver @@ -90,7 +90,7 @@ def __init__( self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name) # Internal flag for switching whether to apply connection sharing, pending fix in uamqp library self._connection_sharing = False - self._handlers = [] # type: List[BaseHandler] + self._handlers = WeakSet() # type: WeakSet async def __aenter__(self): if self._connection_sharing: @@ -171,7 +171,7 @@ async def close(self) -> None: handler._container_id, # pylint: disable=protected-access exception, ) - del self._handlers[:] + self._handlers.clear() if self._connection_sharing and self._connection: await self._connection.destroy_async() @@ -214,7 +214,7 @@ def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender: retry_backoff_max=self._config.retry_backoff_max, **kwargs ) - self._handlers.append(handler) + self._handlers.add(handler) return handler def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiver: @@ -303,7 +303,7 @@ def get_queue_receiver(self, queue_name: str, **kwargs: Any) -> ServiceBusReceiv retry_backoff_max=self._config.retry_backoff_max, **kwargs ) - self._handlers.append(handler) + self._handlers.add(handler) return handler def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender: @@ -343,7 +343,7 @@ def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender: retry_backoff_max=self._config.retry_backoff_max, **kwargs ) - self._handlers.append(handler) + self._handlers.add(handler) return handler def get_subscription_receiver( @@ -453,5 +453,5 @@ def get_subscription_receiver( retry_backoff_max=self._config.retry_backoff_max, **kwargs ) - self._handlers.append(handler) + self._handlers.add(handler) return handler diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py index 9ba977b8bf4a..ad294793755e 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py @@ -25,7 +25,9 @@ ServiceBusQueuePreparer, ServiceBusNamespaceAuthorizationRulePreparer, ServiceBusQueueAuthorizationRulePreparer, - CachedServiceBusQueuePreparer + CachedServiceBusQueuePreparer, + CachedServiceBusTopicPreparer, + CachedServiceBusSubscriptionPreparer ) from utilities import get_logger @@ -137,7 +139,9 @@ async def test_sb_client_writeonly_credentials_async(self, servicebus_authorizat @CachedResourceGroupPreparer() @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') @CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) - async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + @CachedServiceBusTopicPreparer(name_prefix='servicebustest') + @CachedServiceBusSubscriptionPreparer(name_prefix='servicebustest') + async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, servicebus_topic, servicebus_subscription, **kwargs): client = ServiceBusClient.from_connection_string(servicebus_namespace_connection_string) await client.close() @@ -174,6 +178,51 @@ async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace assert not receiver._handler and not receiver._running assert len(client._handlers) == 0 + queue_sender = client.get_queue_sender(servicebus_queue.name) + queue_receiver = client.get_queue_receiver(servicebus_queue.name) + assert len(client._handlers) == 2 + queue_sender = client.get_queue_sender(servicebus_queue.name) + queue_receiver = client.get_queue_receiver(servicebus_queue.name) + # the previous sender/receiver can not longer be referenced, there might be a delay in CPython + # to remove the reference, so len of handlers should be less than 4 + assert len(client._handlers) < 4 + await client.close() + + queue_sender = client.get_queue_sender(servicebus_queue.name) + queue_receiver = client.get_queue_receiver(servicebus_queue.name) + assert len(client._handlers) == 2 + queue_sender = None + queue_receiver = None + assert len(client._handlers) < 2 + + await client.close() + topic_sender = client.get_topic_sender(servicebus_topic.name) + subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name) + assert len(client._handlers) == 2 + topic_sender = None + subscription_receiver = None + # the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4 + assert len(client._handlers) < 4 + + await client.close() + topic_sender = client.get_topic_sender(servicebus_topic.name) + subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name) + assert len(client._handlers) == 2 + topic_sender = client.get_topic_sender(servicebus_topic.name) + subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name) + # the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4 + assert len(client._handlers) < 4 + + await client.close() + for _ in range(5): + queue_sender = client.get_queue_sender(servicebus_queue.name) + queue_receiver = client.get_queue_receiver(servicebus_queue.name) + topic_sender = client.get_topic_sender(servicebus_topic.name) + subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, + servicebus_subscription.name) + assert len(client._handlers) < 15 + await client.close() + @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') diff --git a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py index 7a945818533a..b1d09ed30189 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py @@ -29,7 +29,9 @@ ServiceBusQueuePreparer, ServiceBusNamespaceAuthorizationRulePreparer, ServiceBusQueueAuthorizationRulePreparer, - CachedServiceBusQueuePreparer + CachedServiceBusQueuePreparer, + CachedServiceBusTopicPreparer, + CachedServiceBusSubscriptionPreparer ) class ServiceBusClientTests(AzureMgmtTestCase): @@ -201,7 +203,9 @@ def test_sb_client_incorrect_queue_conn_str(self, servicebus_queue_authorization @CachedResourceGroupPreparer() @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') @CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) - def test_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + @CachedServiceBusTopicPreparer(name_prefix='servicebustest') + @CachedServiceBusSubscriptionPreparer(name_prefix='servicebustest') + def test_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, servicebus_topic, servicebus_subscription, **kwargs): client = ServiceBusClient.from_connection_string(servicebus_namespace_connection_string) client.close() @@ -238,6 +242,51 @@ def test_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_ assert not receiver._handler and not receiver._running assert len(client._handlers) == 0 + queue_sender = client.get_queue_sender(servicebus_queue.name) + queue_receiver = client.get_queue_receiver(servicebus_queue.name) + assert len(client._handlers) == 2 + queue_sender = client.get_queue_sender(servicebus_queue.name) + queue_receiver = client.get_queue_receiver(servicebus_queue.name) + # the previous sender/receiver can not longer be referenced, there might be a delay in CPython + # to remove the reference, so len of handlers should be less than 4 + assert len(client._handlers) < 4 + client.close() + + queue_sender = client.get_queue_sender(servicebus_queue.name) + queue_receiver = client.get_queue_receiver(servicebus_queue.name) + assert len(client._handlers) == 2 + queue_sender = None + queue_receiver = None + assert len(client._handlers) < 2 + + client.close() + topic_sender = client.get_topic_sender(servicebus_topic.name) + subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name) + assert len(client._handlers) == 2 + topic_sender = None + subscription_receiver = None + # the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4 + assert len(client._handlers) < 4 + + client.close() + topic_sender = client.get_topic_sender(servicebus_topic.name) + subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name) + assert len(client._handlers) == 2 + topic_sender = client.get_topic_sender(servicebus_topic.name) + subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, servicebus_subscription.name) + # the previous sender/receiver can not longer be referenced, so len of handlers should just be 2 instead of 4 + assert len(client._handlers) < 4 + + client.close() + for _ in range(5): + queue_sender = client.get_queue_sender(servicebus_queue.name) + queue_receiver = client.get_queue_receiver(servicebus_queue.name) + topic_sender = client.get_topic_sender(servicebus_topic.name) + subscription_receiver = client.get_subscription_receiver(servicebus_topic.name, + servicebus_subscription.name) + assert len(client._handlers) < 15 + client.close() + @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer()