diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index e3b0fe19c3acb..8d10e78de39cf 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -6,6 +6,15 @@ - Fixed bug to make AMQP exceptions retryable by default, if condition is not non-retryable, to ensure that InternalServerErrors are retried. +### Features Added + +- The `ServiceBusClient` constructor now accepts optional `custom_endpoint_address` argument +which allows for specifying a custom endpoint to use when communicating with the Service Bus service, +and is useful when your network does not allow communicating to the standard Service Bus endpoint. +- The `ServiceBusClient`constructor now accepts optional `connection_verify` argument +which allows for specifying the path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate +the identity of the connection endpoint. + ## 7.6.1 (2022-04-11) ### Other Changes diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py index 7eb6de1017b37..a445e497b6120 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py @@ -3,8 +3,9 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from typing import Optional, Dict, Any +from urllib.parse import urlparse -from uamqp.constants import TransportType +from uamqp.constants import TransportType, DEFAULT_AMQP_WSS_PORT, DEFAULT_AMQPS_PORT from azure.core.pipeline.policies import RetryMode @@ -19,6 +20,12 @@ def __init__(self, **kwargs): self.retry_backoff_max = kwargs.get("retry_backoff_max", 120) # type: int self.logging_enable = kwargs.get("logging_enable", False) # type: bool self.http_proxy = kwargs.get("http_proxy") # type: Optional[Dict[str, Any]] + + self.custom_endpoint_address = kwargs.get("custom_endpoint_address") # type: Optional[str] + self.connection_verify = kwargs.get("connection_verify") # type: Optional[str] + self.connection_port = DEFAULT_AMQPS_PORT + self.custom_endpoint_hostname = None + self.transport_type = ( TransportType.AmqpOverWebsocket if self.http_proxy @@ -30,3 +37,19 @@ def __init__(self, **kwargs): self.auto_reconnect = kwargs.get("auto_reconnect", True) self.keep_alive = kwargs.get("keep_alive", 30) self.timeout = kwargs.get("timeout", 60) # type: float + + if self.http_proxy or self.transport_type == TransportType.AmqpOverWebsocket: + self.transport_type = TransportType.AmqpOverWebsocket + self.connection_port = DEFAULT_AMQP_WSS_PORT + + # custom end point + if self.custom_endpoint_address: + # if the custom_endpoint_address doesn't include the schema, + # we prepend a default one to make urlparse work + if self.custom_endpoint_address.find("//") == -1: + self.custom_endpoint_address = "sb://" + self.custom_endpoint_address + endpoint = urlparse(self.custom_endpoint_address) + self.transport_type = TransportType.AmqpOverWebsocket + self.custom_endpoint_hostname = endpoint.hostname + # in case proxy and custom endpoint are both provided, we default port to 443 if it's not provided + self.connection_port = endpoint.port or DEFAULT_AMQP_WSS_PORT diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index 8c2783d96bd30..96e10df347c20 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -173,6 +173,9 @@ def create_authentication(client): timeout=client._config.auth_timeout, http_proxy=client._config.http_proxy, transport_type=client._config.transport_type, + custom_endpoint_hostname=client._config.custom_endpoint_hostname, + port=client._config.connection_port, + verify=client._config.connection_verify ) auth.update_token() return auth @@ -185,6 +188,9 @@ def create_authentication(client): http_proxy=client._config.http_proxy, transport_type=client._config.transport_type, refresh_window=300, + custom_endpoint_hostname=client._config.custom_endpoint_hostname, + port=client._config.connection_port, + verify=client._config.connection_verify ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index b8804ffdb8cda..e51204bc5a387 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -76,6 +76,14 @@ class ServiceBusClient(object): # pylint: disable=client-accepts-api-version-key :keyword retry_mode: The delay behavior between retry attempts. Supported values are "fixed" or "exponential", where default is "exponential". :paramtype retry_mode: str + :keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to + the Service Bus service, allowing network requests to be routed through any application gateways or + other paths needed for the host environment. Default is None. + The format would be like "sb://:". + If port is not specified in the `custom_endpoint_address`, by default port 443 will be used. + :keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to + authenticate the identity of the connection endpoint. + Default is None in which case `certifi.where()` will be used. .. admonition:: Example: @@ -124,6 +132,9 @@ def __init__( self._connection_sharing = False self._handlers = WeakSet() # type: WeakSet + self._custom_endpoint_address = kwargs.get('custom_endpoint_address') + self._connection_verify = kwargs.get("connection_verify") + def __enter__(self): if self._connection_sharing: self._create_uamqp_connection() @@ -196,6 +207,14 @@ def from_connection_string( :keyword retry_mode: The delay behavior between retry attempts. Supported values are 'fixed' or 'exponential', where default is 'exponential'. :paramtype retry_mode: str + :keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to + the Service Bus service, allowing network requests to be routed through any application gateways or + other paths needed for the host environment. Default is None. + The format would be like "sb://:". + If port is not specified in the custom_endpoint_address, by default port 443 will be used. + :keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to + authenticate the identity of the connection endpoint. + Default is None in which case `certifi.where()` will be used. :rtype: ~azure.servicebus.ServiceBusClient .. admonition:: Example: @@ -264,6 +283,8 @@ def get_queue_sender(self, queue_name, **kwargs): retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, + custom_endpoint_address=self._custom_endpoint_address, + connection_verify=self._connection_verify, **kwargs ) self._handlers.add(handler) @@ -373,6 +394,8 @@ def get_queue_receiver( max_wait_time=max_wait_time, auto_lock_renewer=auto_lock_renewer, prefetch_count=prefetch_count, + custom_endpoint_address=self._custom_endpoint_address, + connection_verify=self._connection_verify, **kwargs ) self._handlers.add(handler) @@ -415,6 +438,8 @@ def get_topic_sender(self, topic_name, **kwargs): retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, + custom_endpoint_address=self._custom_endpoint_address, + connection_verify=self._connection_verify, **kwargs ) self._handlers.add(handler) @@ -523,6 +548,8 @@ def get_subscription_receiver( max_wait_time=max_wait_time, auto_lock_renewer=auto_lock_renewer, prefetch_count=prefetch_count, + custom_endpoint_address=self._custom_endpoint_address, + connection_verify=self._connection_verify, **kwargs ) except ValueError: @@ -550,6 +577,8 @@ def get_subscription_receiver( max_wait_time=max_wait_time, auto_lock_renewer=auto_lock_renewer, prefetch_count=prefetch_count, + custom_endpoint_address=self._custom_endpoint_address, + connection_verify=self._connection_verify, **kwargs ) self._handlers.add(handler) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index c601d92d68f26..8f70f6cb49209 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -116,6 +116,9 @@ def __init__( self._connection_sharing = False self._handlers = WeakSet() # type: WeakSet + self._custom_endpoint_address = kwargs.get("custom_endpoint_address") + self._connection_verify = kwargs.get("connection_verify") + async def __aenter__(self): if self._connection_sharing: await self._create_uamqp_connection() @@ -253,6 +256,8 @@ def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender: retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, + custom_endpoint_address=self._custom_endpoint_address, + connection_verify=self._connection_verify, **kwargs ) self._handlers.add(handler) @@ -361,6 +366,8 @@ def get_queue_receiver( max_wait_time=max_wait_time, auto_lock_renewer=auto_lock_renewer, prefetch_count=prefetch_count, + custom_endpoint_address=self._custom_endpoint_address, + connection_verify=self._connection_verify, **kwargs ) self._handlers.add(handler) @@ -402,6 +409,8 @@ def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender: retry_total=self._config.retry_total, retry_backoff_factor=self._config.retry_backoff_factor, retry_backoff_max=self._config.retry_backoff_max, + custom_endpoint_address=self._custom_endpoint_address, + connection_verify=self._connection_verify, **kwargs ) self._handlers.add(handler) @@ -510,6 +519,8 @@ def get_subscription_receiver( max_wait_time=max_wait_time, auto_lock_renewer=auto_lock_renewer, prefetch_count=prefetch_count, + custom_endpoint_address=self._custom_endpoint_address, + connection_verify=self._connection_verify, **kwargs ) except ValueError: @@ -537,6 +548,8 @@ def get_subscription_receiver( max_wait_time=max_wait_time, auto_lock_renewer=auto_lock_renewer, prefetch_count=prefetch_count, + custom_endpoint_address=self._custom_endpoint_address, + connection_verify=self._connection_verify, **kwargs ) self._handlers.add(handler) diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/connection_to_custom_endpoint_address_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/connection_to_custom_endpoint_address_async.py new file mode 100644 index 0000000000000..c13a55520dcff --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/connection_to_custom_endpoint_address_async.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Examples to show how to create async EventHubProducerClient and EventHubConsumerClient that connect to custom endpoint. +""" + +import os +import asyncio +from azure.servicebus import ServiceBusMessage +from azure.servicebus.aio import ServiceBusClient + + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] +# The custom endpoint address to use for establishing a connection to the Service Bus service, +# allowing network requests to be routed through any application gateways +# or other paths needed for the host environment. +CUSTOM_ENDPOINT_ADDRESS = 'sb://:' +# The optional absolute path to the custom certificate file used by client to authenticate the +# identity of the connection endpoint in the case that endpoint has its own issued CA. +# If not set, the certifi library will be used to load certificates. +CUSTOM_CA_BUNDLE_PATH = '' + +async def send_single_message(sender): + message = ServiceBusMessage("Single Message") + sender.send_messages(message) + +async def main(): + servicebus_client = ServiceBusClient.from_connection_string( + conn_str=CONNECTION_STR, + custom_endpoint_address=CUSTOM_ENDPOINT_ADDRESS, + connection_verify=CUSTOM_CA_BUNDLE_PATH + ) + async with servicebus_client: + sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) + async with sender: + await send_single_message(sender) + print("Send message is done.") + + +asyncio.run(main()) + diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/connection_to_custom_endpoint_address.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/connection_to_custom_endpoint_address.py new file mode 100644 index 0000000000000..e27b9e20be6f9 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/connection_to_custom_endpoint_address.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Examples to show how to create async EventHubProducerClient and EventHubConsumerClient that connect to custom endpoint. +""" + +import os +from azure.servicebus import ServiceBusClient, ServiceBusMessage + + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] +# The custom endpoint address to use for establishing a connection to the Service Bus service, +# allowing network requests to be routed through any application gateways +# or other paths needed for the host environment. +CUSTOM_ENDPOINT_ADDRESS = 'sb://:' +# The optional absolute path to the custom certificate file used by client to authenticate the +# identity of the connection endpoint in the case that endpoint has its own issued CA. +# If not set, the certifi library will be used to load certificates. +CUSTOM_CA_BUNDLE_PATH = '' + +def send_single_message(sender): + message = ServiceBusMessage("Single Message") + sender.send_messages(message) + +servicebus_client = ServiceBusClient.from_connection_string( + conn_str=CONNECTION_STR, + logging_enable=True, + custom_endpoint_address=CUSTOM_ENDPOINT_ADDRESS, + connection_verify=CUSTOM_CA_BUNDLE_PATH + ) +with servicebus_client: + sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) + with sender: + send_single_message(sender)