Skip to content

Commit

Permalink
[servicebus] uamqp custom endpoint support (#24582)
Browse files Browse the repository at this point in the history
* starting to add ce capability for sync

* sync ce pass through

* add ce async

* async test

* update sync test

* update sync test

* trailing whitespace

* updating changelog

* pr comments

* removing fix for other pr
  • Loading branch information
l0lawrence authored Jun 2, 2022
1 parent 62ffa83 commit 169e608
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 1 deletion.
9 changes: 9 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@

- Fixed bug to make AMQP exceptions retryable by default, if condition is not non-retryable, to ensure that InternalServerErrors are retried.

### Features Added

- The `ServiceBusClient` constructor now accepts optional `custom_endpoint_address` argument
which allows for specifying a custom endpoint to use when communicating with the Service Bus service,
and is useful when your network does not allow communicating to the standard Service Bus endpoint.
- The `ServiceBusClient`constructor now accepts optional `connection_verify` argument
which allows for specifying the path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate
the identity of the connection endpoint.

## 7.6.1 (2022-04-11)

### Other Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Optional, Dict, Any
from urllib.parse import urlparse

from uamqp.constants import TransportType
from uamqp.constants import TransportType, DEFAULT_AMQP_WSS_PORT, DEFAULT_AMQPS_PORT
from azure.core.pipeline.policies import RetryMode


Expand All @@ -19,6 +20,12 @@ def __init__(self, **kwargs):
self.retry_backoff_max = kwargs.get("retry_backoff_max", 120) # type: int
self.logging_enable = kwargs.get("logging_enable", False) # type: bool
self.http_proxy = kwargs.get("http_proxy") # type: Optional[Dict[str, Any]]

self.custom_endpoint_address = kwargs.get("custom_endpoint_address") # type: Optional[str]
self.connection_verify = kwargs.get("connection_verify") # type: Optional[str]
self.connection_port = DEFAULT_AMQPS_PORT
self.custom_endpoint_hostname = None

self.transport_type = (
TransportType.AmqpOverWebsocket
if self.http_proxy
Expand All @@ -30,3 +37,19 @@ def __init__(self, **kwargs):
self.auto_reconnect = kwargs.get("auto_reconnect", True)
self.keep_alive = kwargs.get("keep_alive", 30)
self.timeout = kwargs.get("timeout", 60) # type: float

if self.http_proxy or self.transport_type == TransportType.AmqpOverWebsocket:
self.transport_type = TransportType.AmqpOverWebsocket
self.connection_port = DEFAULT_AMQP_WSS_PORT

# custom end point
if self.custom_endpoint_address:
# if the custom_endpoint_address doesn't include the schema,
# we prepend a default one to make urlparse work
if self.custom_endpoint_address.find("//") == -1:
self.custom_endpoint_address = "sb://" + self.custom_endpoint_address
endpoint = urlparse(self.custom_endpoint_address)
self.transport_type = TransportType.AmqpOverWebsocket
self.custom_endpoint_hostname = endpoint.hostname
# in case proxy and custom endpoint are both provided, we default port to 443 if it's not provided
self.connection_port = endpoint.port or DEFAULT_AMQP_WSS_PORT
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def create_authentication(client):
timeout=client._config.auth_timeout,
http_proxy=client._config.http_proxy,
transport_type=client._config.transport_type,
custom_endpoint_hostname=client._config.custom_endpoint_hostname,
port=client._config.connection_port,
verify=client._config.connection_verify
)
auth.update_token()
return auth
Expand All @@ -185,6 +188,9 @@ def create_authentication(client):
http_proxy=client._config.http_proxy,
transport_type=client._config.transport_type,
refresh_window=300,
custom_endpoint_hostname=client._config.custom_endpoint_hostname,
port=client._config.connection_port,
verify=client._config.connection_verify
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ class ServiceBusClient(object): # pylint: disable=client-accepts-api-version-key
:keyword retry_mode: The delay behavior between retry attempts. Supported values are "fixed" or "exponential",
where default is "exponential".
:paramtype retry_mode: str
:keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Service Bus service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
authenticate the identity of the connection endpoint.
Default is None in which case `certifi.where()` will be used.
.. admonition:: Example:
Expand Down Expand Up @@ -124,6 +132,9 @@ def __init__(
self._connection_sharing = False
self._handlers = WeakSet() # type: WeakSet

self._custom_endpoint_address = kwargs.get('custom_endpoint_address')
self._connection_verify = kwargs.get("connection_verify")

def __enter__(self):
if self._connection_sharing:
self._create_uamqp_connection()
Expand Down Expand Up @@ -196,6 +207,14 @@ def from_connection_string(
:keyword retry_mode: The delay behavior between retry attempts. Supported values are 'fixed' or 'exponential',
where default is 'exponential'.
:paramtype retry_mode: str
:keyword str custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Service Bus service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the custom_endpoint_address, by default port 443 will be used.
:keyword str connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
authenticate the identity of the connection endpoint.
Default is None in which case `certifi.where()` will be used.
:rtype: ~azure.servicebus.ServiceBusClient
.. admonition:: Example:
Expand Down Expand Up @@ -264,6 +283,8 @@ def get_queue_sender(self, queue_name, **kwargs):
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
**kwargs
)
self._handlers.add(handler)
Expand Down Expand Up @@ -373,6 +394,8 @@ def get_queue_receiver(
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
**kwargs
)
self._handlers.add(handler)
Expand Down Expand Up @@ -415,6 +438,8 @@ def get_topic_sender(self, topic_name, **kwargs):
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
**kwargs
)
self._handlers.add(handler)
Expand Down Expand Up @@ -523,6 +548,8 @@ def get_subscription_receiver(
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
**kwargs
)
except ValueError:
Expand Down Expand Up @@ -550,6 +577,8 @@ def get_subscription_receiver(
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
**kwargs
)
self._handlers.add(handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ def __init__(
self._connection_sharing = False
self._handlers = WeakSet() # type: WeakSet

self._custom_endpoint_address = kwargs.get("custom_endpoint_address")
self._connection_verify = kwargs.get("connection_verify")

async def __aenter__(self):
if self._connection_sharing:
await self._create_uamqp_connection()
Expand Down Expand Up @@ -253,6 +256,8 @@ def get_queue_sender(self, queue_name: str, **kwargs: Any) -> ServiceBusSender:
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
**kwargs
)
self._handlers.add(handler)
Expand Down Expand Up @@ -361,6 +366,8 @@ def get_queue_receiver(
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
**kwargs
)
self._handlers.add(handler)
Expand Down Expand Up @@ -402,6 +409,8 @@ def get_topic_sender(self, topic_name: str, **kwargs: Any) -> ServiceBusSender:
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
**kwargs
)
self._handlers.add(handler)
Expand Down Expand Up @@ -510,6 +519,8 @@ def get_subscription_receiver(
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
**kwargs
)
except ValueError:
Expand Down Expand Up @@ -537,6 +548,8 @@ def get_subscription_receiver(
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
custom_endpoint_address=self._custom_endpoint_address,
connection_verify=self._connection_verify,
**kwargs
)
self._handlers.add(handler)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Examples to show how to create async EventHubProducerClient and EventHubConsumerClient that connect to custom endpoint.
"""

import os
import asyncio
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient


CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]
# The custom endpoint address to use for establishing a connection to the Service Bus service,
# allowing network requests to be routed through any application gateways
# or other paths needed for the host environment.
CUSTOM_ENDPOINT_ADDRESS = 'sb://<custom_endpoint_hostname>:<custom_endpoint_port>'
# The optional absolute path to the custom certificate file used by client to authenticate the
# identity of the connection endpoint in the case that endpoint has its own issued CA.
# If not set, the certifi library will be used to load certificates.
CUSTOM_CA_BUNDLE_PATH = '<your_custom_ca_bundle_file_path>'

async def send_single_message(sender):
message = ServiceBusMessage("Single Message")
sender.send_messages(message)

async def main():
servicebus_client = ServiceBusClient.from_connection_string(
conn_str=CONNECTION_STR,
custom_endpoint_address=CUSTOM_ENDPOINT_ADDRESS,
connection_verify=CUSTOM_CA_BUNDLE_PATH
)
async with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
async with sender:
await send_single_message(sender)
print("Send message is done.")


asyncio.run(main())

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Examples to show how to create async EventHubProducerClient and EventHubConsumerClient that connect to custom endpoint.
"""

import os
from azure.servicebus import ServiceBusClient, ServiceBusMessage


CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]
# The custom endpoint address to use for establishing a connection to the Service Bus service,
# allowing network requests to be routed through any application gateways
# or other paths needed for the host environment.
CUSTOM_ENDPOINT_ADDRESS = 'sb://<custom_endpoint_hostname>:<custom_endpoint_port>'
# The optional absolute path to the custom certificate file used by client to authenticate the
# identity of the connection endpoint in the case that endpoint has its own issued CA.
# If not set, the certifi library will be used to load certificates.
CUSTOM_CA_BUNDLE_PATH = '<your_custom_ca_bundle_file_path>'

def send_single_message(sender):
message = ServiceBusMessage("Single Message")
sender.send_messages(message)

servicebus_client = ServiceBusClient.from_connection_string(
conn_str=CONNECTION_STR,
logging_enable=True,
custom_endpoint_address=CUSTOM_ENDPOINT_ADDRESS,
connection_verify=CUSTOM_CA_BUNDLE_PATH
)
with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
with sender:
send_single_message(sender)

0 comments on commit 169e608

Please sign in to comment.