Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] kwargs/error testing #27065

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,6 @@ def receive(self, batch=False, max_batch_size=300, max_wait_time=None):
break
except Exception as exception: # pylint: disable=broad-except
self._amqp_transport.check_link_stolen(self, exception)
if not self.running: # exit by close
return
if self._last_received_event:
self._offset = self._last_received_event.offset
last_exception = self._handle_exception(exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ def _do_receive(self, partition_id, consumer):
error,
)
self._process_error(self._partition_contexts[partition_id], error)
# TODO: close consumer if non-retryable. does OWNERSHIP_LOST make sense for all errors?
self._close_consumer(partition_id, consumer, CloseReason.OWNERSHIP_LOST)

def start(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from __future__ import absolute_import, unicode_literals

import errno
from multiprocessing import AuthenticationError
import re
import socket
import ssl
Expand All @@ -57,6 +58,7 @@
TransportType,
AMQP_WS_SUBPROTOCOL,
)
from .error import AuthenticationException, ErrorCondition


try:
Expand Down Expand Up @@ -495,7 +497,11 @@ def __init__(

def _setup_transport(self):
"""Wrap the socket in an SSL object."""
self.sock = self._wrap_socket(self.sock, **self.sslopts)
try:
self.sock = self._wrap_socket(self.sock, **self.sslopts)
except FileNotFoundError as exc:
# TODO: invalid connection_verify, should we raise some other error?
raise
self.sock.do_handshake()
self._quick_recv = self.sock.recv

Expand Down Expand Up @@ -684,7 +690,7 @@ def connect(self):
if username or password:
http_proxy_auth = (username, password)
try:
from websocket import create_connection
from websocket import create_connection, WebSocketAddressException

self.ws = create_connection(
url="wss://{}".format(self._custom_endpoint or self._host),
Expand All @@ -696,6 +702,12 @@ def connect(self):
http_proxy_port=http_proxy_port,
http_proxy_auth=http_proxy_auth,
)
except WebSocketAddressException as exc:
raise AuthenticationException(
ErrorCondition.SocketError, # TODO: ClientError?
description="Failed to authenticate the connection due to exception: " + str(exc),
error=exc,
)
except ImportError:
raise ValueError(
"Please install websocket-client library to use websocket transport."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
AMQP_PORT,
TIMEOUT_INTERVAL,
)
from ..error import AuthenticationException, ErrorCondition


_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -464,7 +465,7 @@ async def connect(self):
password = self._http_proxy.get("password", None)

try:
from aiohttp import ClientSession
from aiohttp import ClientSession, ClientConnectorError
from urllib.parse import urlsplit

if username or password:
Expand All @@ -480,15 +481,25 @@ async def connect(self):
parsed_url = urlsplit(url)
url = f"{parsed_url.scheme}://{parsed_url.netloc}:{self.port}{parsed_url.path}"

self.ws = await self.session.ws_connect(
url=url,
timeout=self._connect_timeout,
protocols=[AMQP_WS_SUBPROTOCOL],
autoclose=False,
proxy=http_proxy_host,
proxy_auth=http_proxy_auth,
ssl=self.sslopts,
)
try:
self.ws = await self.session.ws_connect(
url=url,
timeout=self._connect_timeout,
protocols=[AMQP_WS_SUBPROTOCOL],
autoclose=False,
proxy=http_proxy_host,
proxy_auth=http_proxy_auth,
ssl=self.sslopts,
)
except ClientConnectorError as exc:
if self._custom_endpoint:
raise AuthenticationException(
ErrorCondition.SocketError, # TODO: ClientError?
description="Failed to authenticate the connection due to exception: " + str(exc),
error=exc,
)
else:
raise
self.connected = True

except ImportError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ def _create_eventhub_exception(exception):
error = AuthenticationError(str(exception), exception)
elif isinstance(exception, errors.AMQPLinkError):
error = ConnectError(str(exception), exception)
# TODO: do we need MessageHanlderError in amqp any more
# TODO: do we need MessageHandlerError in amqp any more
# if connection/session/link error are enough?
# elif isinstance(exception, errors.MessageHandlerError):
# error = ConnectionLostError(str(exception), exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
from azure.identity.aio import EnvironmentCredential
from azure.eventhub import EventData
from azure.eventhub.exceptions import ConnectError, AuthenticationError, EventHubError
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient, EventHubSharedKeyCredential
from azure.eventhub.aio._client_base_async import EventHubSASTokenCredential

Expand Down Expand Up @@ -134,3 +135,164 @@ async def test_client_azure_named_key_credential_async(live_eventhub, uamqp_tran

credential.update(live_eventhub['key_name'], live_eventhub['access_key'])
assert (await consumer_client.get_eventhub_properties()) is not None

@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_client_invalid_credential_async(live_eventhub, uamqp_transport):

async def on_event(partition_context, event):
pass

async def on_error(partition_context, error):
on_error.err = error

env_credential = EnvironmentCredential()
producer_client = EventHubProducerClient(fully_qualified_namespace="fakeeventhub.servicebus.windows.net",
eventhub_name=live_eventhub['event_hub'],
credential=env_credential,
user_agent='customized information',
retry_total=1,
retry_mode='exponential',
retry_backoff=0.02,
uamqp_transport=uamqp_transport)
consumer_client = EventHubConsumerClient(fully_qualified_namespace="fakeeventhub.servicebus.windows.net",
eventhub_name=live_eventhub['event_hub'],
credential=env_credential,
user_agent='customized information',
consumer_group='$Default',
retry_total=1,
retry_mode='exponential',
retry_backoff=0.02,
uamqp_transport=uamqp_transport)
async with producer_client:
with pytest.raises(ConnectError):
await producer_client.create_batch(partition_id='0')

on_error.err = None
async with consumer_client:
task = asyncio.ensure_future(consumer_client.receive(on_event,
starting_position= "-1", on_error=on_error))
await asyncio.sleep(15)
await task
assert isinstance(on_error.err, ConnectError)

producer_client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name='fakehub',
credential=env_credential,
uamqp_transport=uamqp_transport)

consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name='fakehub',
credential=env_credential,
consumer_group='$Default',
retry_total=0,
uamqp_transport=uamqp_transport)

async with producer_client:
with pytest.raises(ConnectError):
await producer_client.create_batch(partition_id='0')

on_error.err = None
async with consumer_client:
task = asyncio.ensure_future(consumer_client.receive(on_event,
starting_position= "-1", on_error=on_error))
await asyncio.sleep(15)
await task
assert isinstance(on_error.err, AuthenticationError)

credential = EventHubSharedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
auth_uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub'])
token = await credential.get_token(auth_uri)
producer_client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token.token, time.time() + 5),
uamqp_transport=uamqp_transport)
time.sleep(6)
# expired credential
# uamqp: EventHubError('expected bytes, AccessToken found\nexpected bytes, AccessToken found')
# pyamqp: AuthenticationError
async with producer_client:
with pytest.raises(AuthenticationError):
await producer_client.create_batch(partition_id='0')

consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token, time.time() + 7),
consumer_group='$Default',
retry_total=0,
uamqp_transport=uamqp_transport)
on_error.err = None
async with consumer_client:
task = asyncio.ensure_future(consumer_client.receive(on_event,
starting_position= "-1", on_error=on_error))
await asyncio.sleep(15)
await task

# expired credential
# uamqp: EventHubError('expected bytes, AccessToken found\nexpected bytes, AccessToken found')
# pyamqp: AuthenticationError
assert isinstance(on_error.err, AuthenticationError)

credential = EventHubSharedKeyCredential('fakekey', live_eventhub['access_key'])
producer_client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=credential,
uamqp_transport=uamqp_transport)

async with producer_client:
with pytest.raises(AuthenticationError):
await producer_client.create_batch(partition_id='0')

producer_client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=env_credential,
connection_verify="cacert.pem",
uamqp_transport=uamqp_transport)

# uamqp: EventHubError
# pyamqp: Not raising error
async with producer_client:
with pytest.raises(EventHubError):
await producer_client.create_batch(partition_id='0')

consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
consumer_group='$Default',
credential=env_credential,
retry_total=0,
connection_verify="cacert.pem",
uamqp_transport=uamqp_transport)
async with consumer_client:
task = asyncio.ensure_future(consumer_client.receive(on_event,
starting_position= "-1", on_error=on_error))
await asyncio.sleep(5)
await task

# uamqp: FileNotFoundError
# pyamqp: Not raising error
assert isinstance(on_error.err, FileNotFoundError)

producer_client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=env_credential,
custom_endpoint_address="fakeaddr",
uamqp_transport=uamqp_transport)

async with producer_client:
with pytest.raises(AuthenticationError):
await producer_client.create_batch(partition_id='0')

consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
consumer_group='$Default',
credential=env_credential,
retry_total=0,
custom_endpoint_address="fakeaddr",
uamqp_transport=uamqp_transport)
async with consumer_client:
task = asyncio.ensure_future(consumer_client.receive(on_event,
starting_position= "-1", on_error=on_error))
await asyncio.sleep(15)
await task

assert isinstance(on_error.err, AuthenticationError)
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,30 @@ async def on_error(events, error, pid):
uamqp_transport=uamqp_transport
)

def on_success_missing_params(events):
on_success_missing_params.events = events

def on_error_missing_params(events, pid):
on_error_missing_params.events = events

producer = EventHubProducerClient.from_connection_string(
connection_str,
buffered_mode=True,
buffer_concurrency=2,
on_success=on_success_missing_params,
on_error=on_error_missing_params,
uamqp_transport=uamqp_transport,
)

on_success_missing_params.events = None
on_error_missing_params.events = None

# successfully send, but don't enter invalid callback
async with producer:
await producer.send_event(EventData('Single data'))

assert not on_success_missing_params.events
assert not on_error_missing_params.events

@pytest.mark.liveTest
@pytest.mark.asyncio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,11 @@ async def test_send_multiple_partition_with_app_prop_async(connstr_receivers, ua
app_prop_key = "raw_prop"
app_prop_value = "raw_value"
app_prop = {app_prop_key: app_prop_value}
client = EventHubProducerClient.from_connection_string(connection_str, uamqp_transport=uamqp_transport)
client = EventHubProducerClient.from_connection_string(
connection_str,
uamqp_transport=uamqp_transport,
transport_type=TransportType.Amqp
)
async with client:
ed0 = EventData(b"Message 0")
ed0.properties = app_prop
Expand Down Expand Up @@ -408,7 +412,11 @@ async def test_send_with_create_event_batch_async(connstr_receivers, uamqp_trans
@pytest.mark.asyncio
async def test_send_list_async(connstr_receivers, uamqp_transport, timeout_factor):
connection_str, receivers = connstr_receivers
client = EventHubProducerClient.from_connection_string(connection_str, uamqp_transport=uamqp_transport)
client = EventHubProducerClient.from_connection_string(
connection_str,
uamqp_transport=uamqp_transport,
transport_type=uamqp_TransportType.Amqp
)
payload = "A1"
async with client:
await client.send_batch([EventData(payload)])
Expand Down
Loading