Skip to content

Commit

Permalink
[AMQP Python] Eventhub Pyamqp tests (#24895)
Browse files Browse the repository at this point in the history
* starting tests

* updates to websocket sync

* moving around format - unittest and live test

* live test + unittests starting

* websocket async passing

* eol

* assert not return

* assert not return

* fixed assert

* auth tests

* auth unittest pyamqp

* replicating uamqp tests

* keep_alive_thread

* skip for now - no keep alive

* pickle/deepcopy, might not want to keep all

* stopping here for now - need tls on rabbitmq

* cleaning up tests - pickle

* removing and editing uneeded tests

* removing unused test

* added receive amqp tests

* exceptions with pytest.raises, not live

* moving around tests

* testing mgmt calls like _start_producer

* Use --no-cone in pipeline sparse checkout script (#25165)

Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>

* unused imports

Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com>
Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>
  • Loading branch information
3 people authored Jul 14, 2022
1 parent dc73ebe commit 4eb5772
Show file tree
Hide file tree
Showing 11 changed files with 599 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import pytest
import asyncio
import logging

from azure.eventhub._pyamqp.aio import _authentication_async
from azure.eventhub._pyamqp.aio import ReceiveClientAsync, SendClientAsync
from azure.eventhub._pyamqp.constants import TransportType
from azure.eventhub._pyamqp.message import Message


async def send_message(live_eventhub):
uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub'])
sas_auth = _authentication_async.SASTokenAuthAsync(
uri=uri,
audience=uri,
username=live_eventhub['key_name'],
password=live_eventhub['access_key']
)

target = "amqps://{}/{}/Partitions/{}".format(
live_eventhub['hostname'],
live_eventhub['event_hub'],
live_eventhub['partition'])

message = Message(value="Single Message")

async with SendClientAsync(live_eventhub['hostname'], target, auth=sas_auth, debug=True, transport_type=TransportType.Amqp) as send_client:
await send_client.send_message_async(message)

@pytest.mark.asyncio
async def test_event_hubs_client_amqp_async(live_eventhub):
uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub'])
sas_auth = _authentication_async.SASTokenAuthAsync(
uri=uri,
audience=uri,
username=live_eventhub['key_name'],
password=live_eventhub['access_key']
)

source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
live_eventhub['hostname'],
live_eventhub['event_hub'],
live_eventhub['consumer_group'],
live_eventhub['partition'])

await send_message(live_eventhub=live_eventhub)

async with ReceiveClientAsync(live_eventhub['hostname'], source, auth=sas_auth, debug=False, timeout=500, prefetch=1, transport_type=TransportType.Amqp) as receive_client:
messages = await receive_client.receive_message_batch_async(max_batch_size=1)
assert len(messages) > 0

Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,50 @@
import asyncio
import logging

from azure.eventhub._pyamqp import authentication
from azure.eventhub._pyamqp.aio import ReceiveClientAsync
from azure.eventhub._pyamqp.aio import _authentication_async
from azure.eventhub._pyamqp.aio import ReceiveClientAsync, SendClientAsync
from azure.eventhub._pyamqp.constants import TransportType
from azure.eventhub._pyamqp.message import Message


async def send_message(live_eventhub):
uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub'])
sas_auth = _authentication_async.SASTokenAuthAsync(
uri=uri,
audience=uri,
username=live_eventhub['key_name'],
password=live_eventhub['access_key']
)

target = "amqps://{}/{}/Partitions/{}".format(
live_eventhub['hostname'],
live_eventhub['event_hub'],
live_eventhub['partition'])

message = Message(value="Single Message")

async with SendClientAsync(live_eventhub['hostname'], target, auth=sas_auth, debug=True, transport_type=TransportType.Amqp) as send_client:
await send_client.send_message_async(message)

@pytest.mark.asyncio
@pytest.mark.skip()
async def test_event_hubs_client_web_socket(eventhub_config):
uri = "sb://{}/{}".format(eventhub_config['hostname'], eventhub_config['event_hub'])
sas_auth = SASTokenAuthAsync(
async def test_event_hubs_client_web_socket_async(live_eventhub):
uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub'])
sas_auth = _authentication_async.SASTokenAuthAsync(
uri=uri,
audience=uri,
username=eventhub_config['key_name'],
password=eventhub_config['access_key']
username=live_eventhub['key_name'],
password=live_eventhub['access_key']
)

source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
eventhub_config['hostname'],
eventhub_config['event_hub'],
eventhub_config['consumer_group'],
eventhub_config['partition'])

receive_client = ReceiveClientAsync(eventhub_config['hostname'] + '/$servicebus/websocket/', source, auth=sas_auth, debug=False, timeout=5000, prefetch=50, transport_type=TransportType.AmqpOverWebsocket)
await receive_client.open_async()
while not await receive_client.client_ready_async():
await asyncio.sleep(0.05)
messages = await receive_client.receive_message_batch_async(max_batch_size=1)
logging.info(len(messages))
logging.info(messages[0])
await receive_client.close_async()
live_eventhub['hostname'],
live_eventhub['event_hub'],
live_eventhub['consumer_group'],
live_eventhub['partition'])

await send_message(live_eventhub=live_eventhub)

async with ReceiveClientAsync(live_eventhub['hostname'] + '/$servicebus/websocket/', source, auth=sas_auth, debug=False, timeout=500, prefetch=1, transport_type=TransportType.AmqpOverWebsocket) as receive_client:
messages = await receive_client.receive_message_batch_async(max_batch_size=1)
assert len(messages) > 0

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import pytest
import time
from azure.identity import EnvironmentCredential, DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential
from azure.eventhub._client_base import EventHubSASTokenCredential
from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential

@pytest.mark.livetest
def test_mgmt_call_conn_str(connstr_receivers):
connection_str, receivers = connstr_receivers
client = EventHubProducerClient.from_connection_string(connection_str)
client._start_producer("0",60)

@pytest.mark.livetest
def test_mgmt_call_default_azure_credential(live_eventhub):
credential = DefaultAzureCredential()
client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=credential,
user_agent='customized information')
client._start_producer("0",60)

@pytest.mark.livetest
def test_mgmt_call_credential(live_eventhub):
credential = EnvironmentCredential()
client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=credential,
user_agent='customized information')
client._start_producer("0",60)

@pytest.mark.livetest
def test_mgmt_call_sas(live_eventhub):
hostname = live_eventhub["hostname"]
credential = EventHubSharedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
auth_uri = "sb://{}/{}".format(hostname, live_eventhub['event_hub'])
token = credential.get_token(auth_uri).token
client = EventHubProducerClient(fully_qualified_namespace=hostname,
eventhub_name=live_eventhub['event_hub'],
credential=EventHubSASTokenCredential(token, time.time() + 3000))
client._start_producer("0",60)
assert True

@pytest.mark.livetest
def test_mgmt_call_sas_credential(live_eventhub):
hostname = live_eventhub["hostname"]
credential = EventHubSharedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
auth_uri = "sb://{}/{}".format(hostname, live_eventhub['event_hub'])
token = credential.get_token(auth_uri).token.decode()
client = EventHubProducerClient(fully_qualified_namespace=hostname,
eventhub_name=live_eventhub['event_hub'],
credential=AzureSasCredential(token))
client._start_producer("0",60)
assert True

@pytest.mark.livetest
def test_mgmt_call_azure_named_key_credential(live_eventhub):
credential = AzureNamedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
eventhub_name=live_eventhub['event_hub'],
credential=credential)

client._start_producer("0",60)
assert True
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import pytest

from azure.eventhub._pyamqp import authentication, ReceiveClient, SendClient
from azure.eventhub._pyamqp.constants import TransportType
from azure.eventhub._pyamqp.message import Message


def send_message(live_eventhub):
uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub'])
sas_auth = authentication.SASTokenAuth(
uri=uri,
audience=uri,
username=live_eventhub['key_name'],
password=live_eventhub['access_key']
)

target = "amqps://{}/{}/Partitions/{}".format(
live_eventhub['hostname'],
live_eventhub['event_hub'],
live_eventhub['partition'])

message = Message(value="Single Message")

with SendClient(live_eventhub['hostname'], target, auth=sas_auth, debug=True, transport_type=TransportType.Amqp) as send_client:
send_client.send_message(message)


def test_event_hubs_client_amqp(live_eventhub):
uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub'])
sas_auth = authentication.SASTokenAuth(
uri=uri,
audience=uri,
username=live_eventhub['key_name'],
password=live_eventhub['access_key']
)

source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
live_eventhub['hostname'],
live_eventhub['event_hub'],
live_eventhub['consumer_group'],
live_eventhub['partition'])

send_message(live_eventhub=live_eventhub)

with ReceiveClient(live_eventhub['hostname'], source, auth=sas_auth, debug=False, timeout=500, prefetch=1, transport_type=TransportType.Amqp) as receive_client:
messages = receive_client.receive_message_batch(max_batch_size=1)
assert len(messages) > 0
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,31 @@

import pytest

from azure.eventhub._pyamqp import authentication, ReceiveClient
from azure.eventhub._pyamqp import authentication, ReceiveClient, SendClient
from azure.eventhub._pyamqp.constants import TransportType
from azure.eventhub._pyamqp.message import Message


def send_message(live_eventhub):
uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub'])
sas_auth = authentication.SASTokenAuth(
uri=uri,
audience=uri,
username=live_eventhub['key_name'],
password=live_eventhub['access_key']
)

target = "amqps://{}/{}/Partitions/{}".format(
live_eventhub['hostname'],
live_eventhub['event_hub'],
live_eventhub['partition'])

message = Message(value="Single Message")

with SendClient(live_eventhub['hostname'], target, auth=sas_auth, debug=True, transport_type=TransportType.Amqp) as send_client:
send_client.send_message(message)


@pytest.mark.skip()
def test_event_hubs_client_web_socket(live_eventhub):
uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub'])
sas_auth = authentication.SASTokenAuth(
Expand All @@ -24,5 +45,8 @@ def test_event_hubs_client_web_socket(live_eventhub):
live_eventhub['consumer_group'],
live_eventhub['partition'])

with ReceiveClient(live_eventhub['hostname'] + '/$servicebus/websocket/', source, auth=sas_auth, debug=False, timeout=5000, prefetch=50, transport_type=TransportType.AmqpOverWebsocket) as receive_client:
receive_client.receive_message_batch(max_batch_size=10)
send_message(live_eventhub=live_eventhub)

with ReceiveClient(live_eventhub['hostname'] + '/$servicebus/websocket/', source, auth=sas_auth, debug=False, timeout=500, prefetch=1, transport_type=TransportType.AmqpOverWebsocket) as receive_client:
messages = receive_client.receive_message_batch(max_batch_size=1)
assert len(messages) > 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
from unittest.mock import Mock
from base64 import encode
import os
import sys
import pytest
import uuid

root_path = os.path.realpath('.')
sys.path.append(root_path)

from azure.eventhub._pyamqp.types import AMQPTypes
from azure.eventhub._pyamqp.utils import amqp_uint_value, amqp_long_value, amqp_string_value

def test_uint_value():
value = amqp_uint_value(255)
assert value.get("VALUE") == 255
assert value.get("TYPE") == AMQPTypes.uint


def test_long_value():
value = amqp_long_value(255)
assert value.get("VALUE") == 255
assert value.get("TYPE") == AMQPTypes.long

def test_string_value():
value = amqp_string_value("hello")
assert value.get("VALUE") == "hello"
assert value.get("TYPE") == AMQPTypes.string
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@

import pytest
import functools

from unittest.mock import Mock
from azure.eventhub._pyamqp.sasl import SASLAnonymousCredential, SASLPlainCredential
from azure.eventhub._pyamqp.authentication import SASLPlainAuth, JWTTokenAuth, SASTokenAuth


def test_sasl_plain_auth():
auth = SASLPlainAuth(
authcid="authcid",
passwd="passwd",
authzid="Some Authzid"
)
assert auth.auth_type=="AUTH_SASL_PLAIN"
assert auth.sasl.mechanism==b"PLAIN"
assert auth.sasl.start() == b'Some Authzid\x00authcid\x00passwd'

def test_jwt_token_auth():
credential = Mock()
attr = {"get_token.return_value": "my_token"}
credential.configure_mock(**attr)
auth = JWTTokenAuth(
uri="my_uri",
audience="my_audience_field",
get_token=functools.partial(credential.get_token, "my_auth_uri")
)

assert auth.uri == "my_uri"
assert auth.audience == "my_audience_field"

def test_sas_token_auth():
auth = SASTokenAuth(
uri="my_uri",
audience="my_audience",
username="username",
password="password"
)

assert auth.uri == "my_uri"
assert auth.audience == "my_audience"
assert auth.username == "username"
assert auth.password == "password"
Loading

0 comments on commit 4eb5772

Please sign in to comment.